Posted to commits@cassandra.apache.org by be...@apache.org on 2015/05/17 14:51:07 UTC

[5/7] cassandra git commit: Introduce Transactional API for internal state changes

http://git-wip-us.apache.org/repos/asf/cassandra/blob/8704006b/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java b/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java
index 087e57a..fa17c20 100644
--- a/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java
@@ -47,8 +47,10 @@ import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.FilterFactory;
 import org.apache.cassandra.utils.IFilter;
-import org.apache.cassandra.utils.Pair;
 import org.apache.cassandra.utils.StreamingHistogram;
+import org.apache.cassandra.utils.concurrent.Transactional;
+
+import static org.apache.cassandra.utils.Throwables.merge;
 
 public class BigTableWriter extends SSTableWriter
 {
@@ -57,7 +59,7 @@ public class BigTableWriter extends SSTableWriter
     // not very random, but the only value that can't be mistaken for a legal column-name length
     public static final int END_OF_ROW = 0x0000;
 
-    private IndexWriter iwriter;
+    private final IndexWriter iwriter;
     private SegmentedFile.Builder dbuilder;
     private final SequentialWriter dataFile;
     private DecoratedKey lastWrittenKey;
@@ -270,47 +272,6 @@ public class BigTableWriter extends SSTableWriter
         return currentPosition;
     }
 
-    /**
-     * After failure, attempt to close the index writer and data file before deleting all temp components for the sstable
-     */
-    public void abort()
-    {
-        assert descriptor.type.isTemporary;
-        if (iwriter == null && dataFile == null)
-            return;
-
-        if (iwriter != null)
-            iwriter.abort();
-
-        if (dataFile!= null)
-            dataFile.abort();
-
-        if (dbuilder != null)
-            dbuilder.close();
-
-        Set<Component> components = SSTable.componentsFor(descriptor);
-        try
-        {
-            if (!components.isEmpty())
-                SSTable.delete(descriptor, components);
-        }
-        catch (FSWriteError e)
-        {
-            logger.error(String.format("Failed deleting temp components for %s", descriptor), e);
-            throw e;
-        }
-    }
-
-    // we use this method to ensure any managed data we may have retained references to during the write are no
-    // longer referenced, so that we do not need to enclose the expensive call to closeAndOpenReader() in a transaction
-    public void isolateReferences()
-    {
-        // currently we only maintain references to first/last/lastWrittenKey from the data provided; all other
-        // data retention is done through copying
-        first = getMinimalKey(first);
-        last = lastWrittenKey = getMinimalKey(last);
-    }
-
     private Descriptor makeTmpLinks()
     {
         // create temp links if they don't already exist
@@ -323,17 +284,14 @@ public class BigTableWriter extends SSTableWriter
         return link;
     }
 
-    public SSTableReader openEarly(long maxDataAge)
+    public SSTableReader openEarly()
     {
-        StatsMetadata sstableMetadata = (StatsMetadata) metadataCollector.finalizeMetadata(partitioner.getClass().getCanonicalName(),
-                                                  metadata.getBloomFilterFpChance(),
-                                                  repairedAt).get(MetadataType.STATS);
-
         // find the max (exclusive) readable key
         IndexSummaryBuilder.ReadableBoundary boundary = iwriter.getMaxReadable();
         if (boundary == null)
             return null;
 
+        StatsMetadata stats = statsMetadata();
         assert boundary.indexLength > 0 && boundary.dataLength > 0;
         Descriptor link = makeTmpLinks();
         // open the reader early, giving it a FINAL descriptor type so that it is indistinguishable for other consumers
@@ -343,7 +301,7 @@ public class BigTableWriter extends SSTableWriter
                                                            components, metadata,
                                                            partitioner, ifile,
                                                            dfile, iwriter.summary.build(partitioner, boundary),
-                                                           iwriter.bf.sharedCopy(), maxDataAge, sstableMetadata, SSTableReader.OpenReason.EARLY);
+                                                           iwriter.bf.sharedCopy(), maxDataAge, stats, SSTableReader.OpenReason.EARLY);
 
         // now it's open, find the ACTUAL last readable key (i.e. for which the data file has also been flushed)
         sstable.first = getMinimalKey(first);
@@ -351,31 +309,23 @@ public class BigTableWriter extends SSTableWriter
         return sstable;
     }
 
-    public SSTableReader closeAndOpenReader()
+    public SSTableReader openFinalEarly()
     {
-        return closeAndOpenReader(System.currentTimeMillis());
+        // we must ensure the data is completely flushed to disk
+        dataFile.sync();
+        iwriter.indexFile.sync();
+        return openFinal(makeTmpLinks(), SSTableReader.OpenReason.EARLY);
     }
 
-    public SSTableReader closeAndOpenReader(long maxDataAge)
+    private SSTableReader openFinal(Descriptor desc, SSTableReader.OpenReason openReason)
     {
-        return finish(FinishType.NORMAL, maxDataAge, this.repairedAt);
-    }
-
-    public SSTableReader finish(FinishType finishType, long maxDataAge, long repairedAt)
-    {
-        assert finishType != FinishType.CLOSE;
-        Pair<Descriptor, StatsMetadata> p;
-
-        p = close(finishType, repairedAt < 0 ? this.repairedAt : repairedAt);
-        Descriptor desc = p.left;
-        StatsMetadata metadata = p.right;
-
-        if (finishType == FinishType.EARLY)
-            desc = makeTmpLinks();
+        if (maxDataAge < 0)
+            maxDataAge = System.currentTimeMillis();
 
+        StatsMetadata stats = statsMetadata();
         // finalize in-memory state for the reader
-        SegmentedFile ifile = iwriter.builder.complete(desc.filenameFor(Component.PRIMARY_INDEX), finishType.isFinal);
-        SegmentedFile dfile = dbuilder.complete(desc.filenameFor(Component.DATA), finishType.isFinal);
+        SegmentedFile ifile = iwriter.builder.complete(desc.filenameFor(Component.PRIMARY_INDEX));
+        SegmentedFile dfile = dbuilder.complete(desc.filenameFor(Component.DATA));
         SSTableReader sstable = SSTableReader.internalOpen(desc.asType(Descriptor.Type.FINAL),
                                                            components,
                                                            this.metadata,
@@ -385,81 +335,93 @@ public class BigTableWriter extends SSTableWriter
                                                            iwriter.summary.build(partitioner),
                                                            iwriter.bf.sharedCopy(),
                                                            maxDataAge,
-                                                           metadata,
-                                                           finishType.openReason);
+                                                           stats,
+                                                           openReason);
         sstable.first = getMinimalKey(first);
         sstable.last = getMinimalKey(last);
-
-        if (finishType.isFinal)
-        {
-            iwriter.bf.close();
-            iwriter.summary.close();
-            // try to save the summaries to disk
-            sstable.saveSummary(iwriter.builder, dbuilder);
-            iwriter.builder.close();
-            iwriter = null;
-            dbuilder.close();
-            dbuilder = null;
-        }
         return sstable;
     }
 
-    // Close the writer and return the descriptor to the new sstable and it's metadata
-    public Pair<Descriptor, StatsMetadata> close()
+    protected SSTableWriter.TransactionalProxy txnProxy()
     {
-        Pair<Descriptor, StatsMetadata> ret = close(FinishType.CLOSE, this.repairedAt);
-        if (dbuilder != null)
-            dbuilder.close();
-        if (iwriter != null)
-            iwriter.builder.close();
-        return ret;
+        return new TransactionalProxy();
     }
 
-    private Pair<Descriptor, StatsMetadata> close(FinishType type, long repairedAt)
+    class TransactionalProxy extends SSTableWriter.TransactionalProxy
     {
-        switch (type)
+        // finalise our state on disk, including renaming
+        protected void doPrepare()
         {
-            case EARLY: case CLOSE: case NORMAL:
-            iwriter.close();
-            dataFile.close();
-            if (type == FinishType.CLOSE)
-                iwriter.bf.close();
-        }
+            Map<MetadataType, MetadataComponent> metadataComponents = finalizeMetadata();
 
-        // write sstable statistics
-        Map<MetadataType, MetadataComponent> metadataComponents;
-        metadataComponents = metadataCollector
-                             .finalizeMetadata(partitioner.getClass().getCanonicalName(),
-                                               metadata.getBloomFilterFpChance(),repairedAt);
+            iwriter.prepareToCommit();
 
-        // remove the 'tmp' marker from all components
-        Descriptor descriptor = this.descriptor;
-        if (type.isFinal)
-        {
-            dataFile.writeFullChecksum(descriptor);
+            // write sstable statistics
+            dataFile.setDescriptor(descriptor).prepareToCommit();
             writeMetadata(descriptor, metadataComponents);
+
             // save the table of components
             SSTable.appendTOC(descriptor, components);
-            descriptor = rename(descriptor, components);
+
+            // rename to final
+            rename(descriptor, components);
+
+            if (openResult)
+                finalReader = openFinal(descriptor.asType(Descriptor.Type.FINAL), SSTableReader.OpenReason.NORMAL);
         }
 
-        return Pair.create(descriptor, (StatsMetadata) metadataComponents.get(MetadataType.STATS));
+        protected Throwable doCommit(Throwable accumulate)
+        {
+            accumulate = dataFile.commit(accumulate);
+            accumulate = iwriter.commit(accumulate);
+            return accumulate;
+        }
+
+        protected Throwable doCleanup(Throwable accumulate)
+        {
+            accumulate = dbuilder.close(accumulate);
+            return accumulate;
+        }
+
+        protected Throwable doAbort(Throwable accumulate)
+        {
+            accumulate = iwriter.abort(accumulate);
+            accumulate = dataFile.abort(accumulate);
+
+            accumulate = delete(descriptor, accumulate);
+            if (!openResult)
+                accumulate = delete(descriptor.asType(Descriptor.Type.FINAL), accumulate);
+            return accumulate;
+        }
+
+        private Throwable delete(Descriptor desc, Throwable accumulate)
+        {
+            try
+            {
+                Set<Component> components = SSTable.discoverComponentsFor(desc);
+                if (!components.isEmpty())
+                    SSTable.delete(desc, components);
+            }
+            catch (Throwable t)
+            {
+                logger.error(String.format("Failed deleting temp components for %s", descriptor), t);
+                accumulate = merge(accumulate, t);
+            }
+            return accumulate;
+        }
     }
 
     private static void writeMetadata(Descriptor desc, Map<MetadataType, MetadataComponent> components)
     {
-        SequentialWriter out = SequentialWriter.open(new File(desc.filenameFor(Component.STATS)));
-        try
+        File file = new File(desc.filenameFor(Component.STATS));
+        try (SequentialWriter out = SequentialWriter.open(file);)
         {
             desc.getMetadataSerializer().serialize(components, out.stream);
+            out.setDescriptor(desc).finish();
         }
         catch (IOException e)
         {
-            throw new FSWriteError(e, out.getPath());
-        }
-        finally
-        {
-            out.close();
+            throw new FSWriteError(e, file.getPath());
         }
     }
 
@@ -476,7 +438,7 @@ public class BigTableWriter extends SSTableWriter
     /**
      * Encapsulates writing the index and filter for an SSTable. The state of this object is not valid until it has been closed.
      */
-    class IndexWriter
+    class IndexWriter extends AbstractTransactional implements Transactional
     {
         private final SequentialWriter indexFile;
         public final SegmentedFile.Builder builder;
@@ -535,18 +497,10 @@ public class BigTableWriter extends SSTableWriter
             builder.addPotentialBoundary(indexStart);
         }
 
-        public void abort()
-        {
-            summary.close();
-            indexFile.abort();
-            bf.close();
-            builder.close();
-        }
-
         /**
          * Closes the index and bloomfilter, making the public state of this writer valid for consumption.
          */
-        public void close()
+        void flushBf()
         {
             if (components.contains(Component.FILTER))
             {
@@ -566,11 +520,6 @@ public class BigTableWriter extends SSTableWriter
                     throw new FSWriteError(e, path);
                 }
             }
-
-            // index
-            long position = indexFile.getFilePointer();
-            indexFile.close(); // calls force
-            FileUtils.truncate(indexFile.getPath(), position);
         }
 
         public void mark()
@@ -585,5 +534,40 @@ public class BigTableWriter extends SSTableWriter
             // we assume that if that worked then we won't be trying to reset.
             indexFile.resetAndTruncate(mark);
         }
+
+        protected void doPrepare()
+        {
+            flushBf();
+
+            // truncate index file
+            long position = iwriter.indexFile.getFilePointer();
+            iwriter.indexFile.setDescriptor(descriptor).prepareToCommit();
+            FileUtils.truncate(iwriter.indexFile.getPath(), position);
+
+            // save summary
+            summary.prepareToCommit();
+            try (IndexSummary summary = iwriter.summary.build(partitioner))
+            {
+                SSTableReader.saveSummary(descriptor, first, last, iwriter.builder, dbuilder, summary);
+            }
+        }
+
+        protected Throwable doCommit(Throwable accumulate)
+        {
+            return indexFile.commit(accumulate);
+        }
+
+        protected Throwable doAbort(Throwable accumulate)
+        {
+            return indexFile.abort(accumulate);
+        }
+
+        protected Throwable doCleanup(Throwable accumulate)
+        {
+            accumulate = summary.close(accumulate);
+            accumulate = bf.close(accumulate);
+            accumulate = builder.close(accumulate);
+            return accumulate;
+        }
     }
 }
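
A note for readers following along: the old closeAndOpenReader()/abort() pairing is replaced by the Transactional lifecycle, and callers such as StreamReceiveTask further down now use finish(). A minimal usage sketch, assuming simplified create()/append() signatures and a hypothetical Row type (the real methods take more arguments):

    // Hypothetical sketch of the new writer lifecycle; not code from this patch.
    // close() aborts anything not yet committed, so a failure inside the block
    // rolls back the temp files automatically.
    try (SSTableWriter writer = SSTableWriter.create(desc, keyCount, repairedAt))
    {
        for (Row row : rows)
            writer.append(row.key, row.cf);     // IN_PROGRESS state

        // finish(true) = prepareToCommit() + commit(), with openResult set so
        // doPrepare() also opens the final reader (see TransactionalProxy above)
        SSTableReader reader = writer.finish(true);
    }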

http://git-wip-us.apache.org/repos/asf/cassandra/blob/8704006b/src/java/org/apache/cassandra/io/util/BufferedPoolingSegmentedFile.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/BufferedPoolingSegmentedFile.java b/src/java/org/apache/cassandra/io/util/BufferedPoolingSegmentedFile.java
index e8b719e..b623e54 100644
--- a/src/java/org/apache/cassandra/io/util/BufferedPoolingSegmentedFile.java
+++ b/src/java/org/apache/cassandra/io/util/BufferedPoolingSegmentedFile.java
@@ -41,9 +41,8 @@ public class BufferedPoolingSegmentedFile extends PoolingSegmentedFile
             // only one segment in a standard-io file
         }
 
-        public SegmentedFile complete(ChannelProxy channel, long overrideLength, boolean isFinal)
+        public SegmentedFile complete(ChannelProxy channel, long overrideLength)
         {
-            assert !isFinal || overrideLength <= 0;
             long length = overrideLength > 0 ? overrideLength : channel.size();
             return new BufferedPoolingSegmentedFile(channel, length);
         }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/8704006b/src/java/org/apache/cassandra/io/util/BufferedSegmentedFile.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/BufferedSegmentedFile.java b/src/java/org/apache/cassandra/io/util/BufferedSegmentedFile.java
index b4d966a..2c59def 100644
--- a/src/java/org/apache/cassandra/io/util/BufferedSegmentedFile.java
+++ b/src/java/org/apache/cassandra/io/util/BufferedSegmentedFile.java
@@ -48,9 +48,8 @@ public class BufferedSegmentedFile extends SegmentedFile
             // only one segment in a standard-io file
         }
 
-        public SegmentedFile complete(ChannelProxy channel, long overrideLength, boolean isFinal)
+        public SegmentedFile complete(ChannelProxy channel, long overrideLength)
         {
-            assert !isFinal || overrideLength <= 0;
             long length = overrideLength > 0 ? overrideLength : channel.size();
             return new BufferedSegmentedFile(channel, length);
         }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/8704006b/src/java/org/apache/cassandra/io/util/ChecksummedSequentialWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/ChecksummedSequentialWriter.java b/src/java/org/apache/cassandra/io/util/ChecksummedSequentialWriter.java
index d28a14d..ec68c2d 100644
--- a/src/java/org/apache/cassandra/io/util/ChecksummedSequentialWriter.java
+++ b/src/java/org/apache/cassandra/io/util/ChecksummedSequentialWriter.java
@@ -20,8 +20,6 @@ package org.apache.cassandra.io.util;
 import java.io.File;
 import java.nio.ByteBuffer;
 
-import org.apache.cassandra.io.sstable.Descriptor;
-
 public class ChecksummedSequentialWriter extends SequentialWriter
 {
     private final SequentialWriter crcWriter;
@@ -44,20 +42,36 @@ public class ChecksummedSequentialWriter extends SequentialWriter
         crcMetadata.appendDirect(toAppend, false);
     }
 
-    public void writeFullChecksum(Descriptor descriptor)
+    protected class TransactionalProxy extends SequentialWriter.TransactionalProxy
     {
-        crcMetadata.writeFullChecksum(descriptor);
-    }
+        @Override
+        protected Throwable doCommit(Throwable accumulate)
+        {
+            return crcWriter.commit(accumulate);
+        }
 
-    public void close()
-    {
-        super.close();
-        crcWriter.close();
+        @Override
+        protected Throwable doAbort(Throwable accumulate)
+        {
+            return super.doAbort(crcWriter.abort(accumulate));
+        }
+
+        @Override
+        protected void doPrepare()
+        {
+            syncInternal();
+            if (descriptor != null)
+                crcMetadata.writeFullChecksum(descriptor);
+            crcWriter.setDescriptor(descriptor).prepareToCommit();
+            // we must cleanup our file handles during prepareCommit for Windows compatibility as we cannot rename an open file;
+            // TODO: once we stop file renaming, remove this for clarity
+            releaseFileHandle();
+        }
     }
 
-    public void abort()
+    @Override
+    protected SequentialWriter.TransactionalProxy txnProxy()
     {
-        super.abort();
-        crcWriter.abort();
+        return new TransactionalProxy();
     }
 }
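
Worth calling out here: a writer that wraps another Transactional (the crcWriter) must thread the single Throwable accumulator through the child in every phase, and nothing in doCommit()/doAbort() may throw. The same shape for an arbitrary parent/child pair, sketched with illustrative names:

    // Sketch only; "child" stands for any wrapped Transactional.
    protected Throwable doCommit(Throwable accumulate)
    {
        return child.commit(accumulate);               // child failures merge in
    }

    protected Throwable doAbort(Throwable accumulate)
    {
        return super.doAbort(child.abort(accumulate)); // child first, then self
    }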

http://git-wip-us.apache.org/repos/asf/cassandra/blob/8704006b/src/java/org/apache/cassandra/io/util/CompressedPoolingSegmentedFile.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/CompressedPoolingSegmentedFile.java b/src/java/org/apache/cassandra/io/util/CompressedPoolingSegmentedFile.java
index cb30131..fdc4f61 100644
--- a/src/java/org/apache/cassandra/io/util/CompressedPoolingSegmentedFile.java
+++ b/src/java/org/apache/cassandra/io/util/CompressedPoolingSegmentedFile.java
@@ -96,9 +96,9 @@ public class CompressedPoolingSegmentedFile extends PoolingSegmentedFile impleme
             // only one segment in a standard-io file
         }
 
-        public SegmentedFile complete(ChannelProxy channel, long overrideLength, boolean isFinal)
+        public SegmentedFile complete(ChannelProxy channel, long overrideLength)
         {
-            return new CompressedPoolingSegmentedFile(channel, metadata(channel.filePath(), overrideLength, isFinal));
+            return new CompressedPoolingSegmentedFile(channel, metadata(channel.filePath(), overrideLength));
         }
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/8704006b/src/java/org/apache/cassandra/io/util/CompressedSegmentedFile.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/CompressedSegmentedFile.java b/src/java/org/apache/cassandra/io/util/CompressedSegmentedFile.java
index caf4c22..ceff7ba 100644
--- a/src/java/org/apache/cassandra/io/util/CompressedSegmentedFile.java
+++ b/src/java/org/apache/cassandra/io/util/CompressedSegmentedFile.java
@@ -136,18 +136,17 @@ public class CompressedSegmentedFile extends SegmentedFile implements ICompresse
             // only one segment in a standard-io file
         }
 
-        protected CompressionMetadata metadata(String path, long overrideLength, boolean isFinal)
+        protected CompressionMetadata metadata(String path, long overrideLength)
         {
             if (writer == null)
                 return CompressionMetadata.create(path);
 
-            return writer.open(overrideLength, isFinal);
+            return writer.open(overrideLength);
         }
 
-        public SegmentedFile complete(ChannelProxy channel, long overrideLength, boolean isFinal)
+        public SegmentedFile complete(ChannelProxy channel, long overrideLength)
         {
-            assert !isFinal || overrideLength <= 0;
-            return new CompressedSegmentedFile(channel, metadata(channel.filePath(), overrideLength, isFinal));
+            return new CompressedSegmentedFile(channel, metadata(channel.filePath(), overrideLength));
         }
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/8704006b/src/java/org/apache/cassandra/io/util/FileUtils.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/FileUtils.java b/src/java/org/apache/cassandra/io/util/FileUtils.java
index 8007039..2566952 100644
--- a/src/java/org/apache/cassandra/io/util/FileUtils.java
+++ b/src/java/org/apache/cassandra/io/util/FileUtils.java
@@ -40,6 +40,9 @@ import org.apache.cassandra.io.sstable.CorruptSSTableException;
 import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.utils.JVMStabilityInspector;
 
+import static org.apache.cassandra.utils.Throwables.maybeFail;
+import static org.apache.cassandra.utils.Throwables.merge;
+
 public class FileUtils
 {
     private static final Logger logger = LoggerFactory.getLogger(FileUtils.class);
@@ -107,24 +110,42 @@ public class FileUtils
         return createTempFile(prefix, suffix, new File(System.getProperty("java.io.tmpdir")));
     }
 
-    public static void deleteWithConfirm(String file)
+    public static Throwable deleteWithConfirm(String filePath, boolean expect, Throwable accumulate)
     {
-        deleteWithConfirm(new File(file));
+        return deleteWithConfirm(new File(filePath), expect, accumulate);
     }
 
-    public static void deleteWithConfirm(File file)
+    public static Throwable deleteWithConfirm(File file, boolean expect, Throwable accumulate)
     {
-        assert file.exists() : "attempted to delete non-existing file " + file.getName();
-        if (logger.isDebugEnabled())
-            logger.debug("Deleting {}", file.getName());
+        boolean exists = file.exists();
+        assert exists || !expect : "attempted to delete non-existing file " + file.getName();
         try
         {
-            Files.delete(file.toPath());
+            if (exists)
+                Files.delete(file.toPath());
         }
-        catch (IOException e)
+        catch (Throwable t)
         {
-            throw new FSWriteError(e, file);
+            try
+            {
+                throw new FSWriteError(t, file);
+            }
+            catch (Throwable t2)
+            {
+                accumulate = merge(accumulate, t2);
+            }
         }
+        return accumulate;
+    }
+
+    public static void deleteWithConfirm(String file)
+    {
+        deleteWithConfirm(new File(file));
+    }
+
+    public static void deleteWithConfirm(File file)
+    {
+        maybeFail(deleteWithConfirm(file, true, null));
     }
 
     public static void renameWithOutConfirm(String from, String to)
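
The reworked deleteWithConfirm() keeps the old throwing entry points but adds an accumulator form for abort/cleanup paths, where throwing is forbidden. A small usage sketch with hypothetical file names:

    // Sketch: best-effort deletion of several temp files during an abort.
    // Nothing throws; each failure is merged into a single Throwable chain.
    Throwable accumulate = null;
    accumulate = FileUtils.deleteWithConfirm("tmp-Data.db", false, accumulate);
    accumulate = FileUtils.deleteWithConfirm("tmp-Index.db", false, accumulate);
    return accumulate;   // caller maybeFail()s or merges further up the graph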

http://git-wip-us.apache.org/repos/asf/cassandra/blob/8704006b/src/java/org/apache/cassandra/io/util/MmappedSegmentedFile.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/MmappedSegmentedFile.java b/src/java/org/apache/cassandra/io/util/MmappedSegmentedFile.java
index 57295fe..91908c9 100644
--- a/src/java/org/apache/cassandra/io/util/MmappedSegmentedFile.java
+++ b/src/java/org/apache/cassandra/io/util/MmappedSegmentedFile.java
@@ -183,9 +183,8 @@ public class MmappedSegmentedFile extends SegmentedFile
             }
         }
 
-        public SegmentedFile complete(ChannelProxy channel, long overrideLength, boolean isFinal)
+        public SegmentedFile complete(ChannelProxy channel, long overrideLength)
         {
-            assert !isFinal || overrideLength <= 0;
             long length = overrideLength > 0 ? overrideLength : channel.size();
             // create the segments
             return new MmappedSegmentedFile(channel, length, createSegments(channel, length));

http://git-wip-us.apache.org/repos/asf/cassandra/blob/8704006b/src/java/org/apache/cassandra/io/util/SafeMemory.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/SafeMemory.java b/src/java/org/apache/cassandra/io/util/SafeMemory.java
index f96afcc..ad11472 100644
--- a/src/java/org/apache/cassandra/io/util/SafeMemory.java
+++ b/src/java/org/apache/cassandra/io/util/SafeMemory.java
@@ -62,6 +62,11 @@ public class SafeMemory extends Memory implements SharedCloseable
         peer = 0;
     }
 
+    public Throwable close(Throwable accumulate)
+    {
+        return ref.ensureReleased(accumulate);
+    }
+
     public SafeMemory copy(long newSize)
     {
         SafeMemory copy = new SafeMemory(newSize);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/8704006b/src/java/org/apache/cassandra/io/util/SafeMemoryWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/SafeMemoryWriter.java b/src/java/org/apache/cassandra/io/util/SafeMemoryWriter.java
index 1fc374f..1096b5f 100644
--- a/src/java/org/apache/cassandra/io/util/SafeMemoryWriter.java
+++ b/src/java/org/apache/cassandra/io/util/SafeMemoryWriter.java
@@ -71,6 +71,11 @@ public class SafeMemoryWriter extends DataOutputBuffer
         memory.close();
     }
 
+    public Throwable close(Throwable accumulate)
+    {
+        return memory.close(accumulate);
+    }
+
     public long length()
     {
         return tailOffset(memory) +  buffer.position();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/8704006b/src/java/org/apache/cassandra/io/util/SegmentedFile.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/SegmentedFile.java b/src/java/org/apache/cassandra/io/util/SegmentedFile.java
index cb4d132..edbd742 100644
--- a/src/java/org/apache/cassandra/io/util/SegmentedFile.java
+++ b/src/java/org/apache/cassandra/io/util/SegmentedFile.java
@@ -37,6 +37,8 @@ import org.apache.cassandra.utils.Pair;
 import org.apache.cassandra.utils.concurrent.RefCounted;
 import org.apache.cassandra.utils.concurrent.SharedCloseableImpl;
 
+import static org.apache.cassandra.utils.Throwables.maybeFail;
+
 /**
  * Abstracts a read-only file that has been split into segments, each of which can be represented by an independent
  * FileDataInput. Allows for iteration over the FileDataInputs, or random access to the FileDataInput for a given
@@ -169,21 +171,16 @@ public abstract class SegmentedFile extends SharedCloseableImpl
          * Called after all potential boundaries have been added to apply this Builder to a concrete file on disk.
          * @param channel The channel to the file on disk.
          */
-        protected abstract SegmentedFile complete(ChannelProxy channel, long overrideLength, boolean isFinal);
+        protected abstract SegmentedFile complete(ChannelProxy channel, long overrideLength);
 
         public SegmentedFile complete(String path)
         {
-            return complete(getChannel(path), -1, true);
-        }
-
-        public SegmentedFile complete(String path, boolean isFinal)
-        {
-            return complete(getChannel(path), -1, isFinal);
+            return complete(getChannel(path), -1);
         }
 
         public SegmentedFile complete(String path, long overrideLength)
         {
-            return complete(getChannel(path), overrideLength, false);
+            return complete(getChannel(path), overrideLength);
         }
 
         public void serializeBounds(DataOutput out) throws IOException
@@ -197,10 +194,16 @@ public abstract class SegmentedFile extends SharedCloseableImpl
                 throw new IOException("Cannot deserialize SSTable Summary component because the DiskAccessMode was changed!");
         }
 
-        public void close()
+        public Throwable close(Throwable accumulate)
         {
             if (channel != null)
-                channel.close();
+                return channel.close(accumulate);
+            return accumulate;
+        }
+
+        public void close()
+        {
+            maybeFail(close(null));
         }
 
         private ChannelProxy getChannel(String path)
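
Builder now exposes both shapes of close: the accumulator form for transactional teardown, and the familiar throwing form implemented on top of it via maybeFail(). Sketched generically ("resource" is illustrative):

    // The dual-close idiom this patch standardises on.
    public Throwable close(Throwable accumulate)
    {
        if (resource != null)
            return resource.close(accumulate);   // merge failures, never throw
        return accumulate;
    }

    public void close()
    {
        maybeFail(close(null));                  // throwing wrapper for plain callers
    }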

http://git-wip-us.apache.org/repos/asf/cassandra/blob/8704006b/src/java/org/apache/cassandra/io/util/SequentialWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/util/SequentialWriter.java b/src/java/org/apache/cassandra/io/util/SequentialWriter.java
index c4fef07..d63be31 100644
--- a/src/java/org/apache/cassandra/io/util/SequentialWriter.java
+++ b/src/java/org/apache/cassandra/io/util/SequentialWriter.java
@@ -24,8 +24,6 @@ import java.nio.channels.FileChannel;
 import java.nio.channels.WritableByteChannel;
 import java.nio.file.StandardOpenOption;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.io.FSReadError;
 import org.apache.cassandra.io.FSWriteError;
@@ -34,15 +32,16 @@ import org.apache.cassandra.io.compress.CompressionParameters;
 import org.apache.cassandra.io.sstable.Descriptor;
 import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
 import org.apache.cassandra.utils.CLibrary;
+import org.apache.cassandra.utils.concurrent.Transactional;
+
+import static org.apache.cassandra.utils.Throwables.merge;
 
 /**
  * Adds buffering, mark, and fsyncing to OutputStream.  We always fsync on close; we may also
  * fsync incrementally if Config.trickle_fsync is enabled.
  */
-public class SequentialWriter extends OutputStream implements WritableByteChannel
+public class SequentialWriter extends OutputStream implements WritableByteChannel, Transactional
 {
-    private static final Logger logger = LoggerFactory.getLogger(SequentialWriter.class);
-
     // isDirty - true if this.buffer contains any un-synced bytes
     protected boolean isDirty = false, syncNeeded = false;
 
@@ -71,6 +70,55 @@ public class SequentialWriter extends OutputStream implements WritableByteChanne
 
     protected Runnable runPostFlush;
 
+    private final TransactionalProxy txnProxy = txnProxy();
+    protected Descriptor descriptor;
+
+    // due to lack of multiple-inheritance, we proxy our transactional implementation
+    protected class TransactionalProxy extends AbstractTransactional
+    {
+        @Override
+        protected Throwable doCleanup(Throwable accumulate)
+        {
+            if (directoryFD >= 0)
+            {
+                try { CLibrary.tryCloseFD(directoryFD); }
+                catch (Throwable t) { accumulate = merge(accumulate, t); }
+                directoryFD = -1;
+            }
+
+            // close is idempotent
+            try { channel.close(); }
+            catch (Throwable t) { accumulate = merge(accumulate, t); }
+
+            if (buffer != null)
+            {
+                try { FileUtils.clean(buffer); }
+                catch (Throwable t) { accumulate = merge(accumulate, t); }
+                buffer = null;
+            }
+
+            return accumulate;
+        }
+
+        protected void doPrepare()
+        {
+            syncInternal();
+            // we must cleanup our file handles during prepareCommit for Windows compatibility as we cannot rename an open file;
+            // TODO: once we stop file renaming, remove this for clarity
+            releaseFileHandle();
+        }
+
+        protected Throwable doCommit(Throwable accumulate)
+        {
+            return accumulate;
+        }
+
+        protected Throwable doAbort(Throwable accumulate)
+        {
+            return FileUtils.deleteWithConfirm(filePath, false, accumulate);
+        }
+    }
+
     public SequentialWriter(File file, int bufferSize, boolean offheap)
     {
         try
@@ -383,49 +431,53 @@ public class SequentialWriter extends OutputStream implements WritableByteChanne
         return channel.isOpen();
     }
 
-    @Override
-    public void close()
+    public SequentialWriter setDescriptor(Descriptor descriptor)
     {
-        if (buffer == null)
-            return; // already closed
-
-        syncInternal();
+        this.descriptor = descriptor;
+        return this;
+    }
 
-        buffer = null;
+    public final void prepareToCommit()
+    {
+        txnProxy.prepareToCommit();
+    }
 
-        cleanup(true);
+    public final Throwable commit(Throwable accumulate)
+    {
+        return txnProxy.commit(accumulate);
     }
 
-    public void abort()
+    public final Throwable abort(Throwable accumulate)
     {
-        cleanup(false);
+        return txnProxy.abort(accumulate);
     }
 
-    private void cleanup(boolean throwExceptions)
+    @Override
+    public final void close()
     {
-        if (directoryFD >= 0)
-        {
-            try { CLibrary.tryCloseFD(directoryFD); }
-            catch (Throwable t) { handle(t, throwExceptions); }
-            directoryFD = -1;
-        }
+        txnProxy.close();
+    }
 
-        // close is idempotent
-        try { channel.close(); }
-        catch (Throwable t) { handle(t, throwExceptions); }
+    public final void finish()
+    {
+        txnProxy.finish();
     }
 
-    private void handle(Throwable t, boolean throwExceptions)
+    protected TransactionalProxy txnProxy()
     {
-        if (!throwExceptions)
-            logger.warn("Suppressing exception thrown while aborting writer", t);
-        else
-            throw new FSWriteError(t, getPath());
+        return new TransactionalProxy();
     }
 
-    // hack to make life easier for subclasses
-    public void writeFullChecksum(Descriptor descriptor)
+    public void releaseFileHandle()
     {
+        try
+        {
+            channel.close();
+        }
+        catch (IOException e)
+        {
+            throw new FSWriteError(e, filePath);
+        }
     }
 
     /**
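
Since SequentialWriter already extends OutputStream it cannot also extend AbstractTransactional, so the state machine lives in an inner proxy and the outer class forwards to it. A minimal sketch of that workaround, assuming the Transactional interface mirrors AbstractTransactional's public methods (the interface body is not shown in this hunk):

    // Hypothetical class demonstrating the inner-proxy pattern.
    public class SomeWriter extends OutputStream implements Transactional
    {
        private final AbstractTransactional txn = new AbstractTransactional()
        {
            protected void doPrepare() { /* flush pending state */ }
            protected Throwable doCommit(Throwable acc) { return acc; }
            protected Throwable doAbort(Throwable acc) { return acc; }
            protected Throwable doCleanup(Throwable acc) { return acc; }
        };

        public void prepareToCommit() { txn.prepareToCommit(); }
        public Throwable commit(Throwable acc) { return txn.commit(acc); }
        public Throwable abort(Throwable acc) { return txn.abort(acc); }
        public void finish() { txn.finish(); }
        public void close() { txn.close(); }

        public void write(int b) { /* buffer the byte */ }
    }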

http://git-wip-us.apache.org/repos/asf/cassandra/blob/8704006b/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java b/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
index 1049d43..d4d49b3 100644
--- a/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
+++ b/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java
@@ -122,7 +122,7 @@ public class StreamReceiveTask extends StreamTask
             lockfile.create(task.sstables);
             List<SSTableReader> readers = new ArrayList<>();
             for (SSTableWriter writer : task.sstables)
-                readers.add(writer.closeAndOpenReader());
+                readers.add(writer.finish(true));
             lockfile.delete();
             task.sstables.clear();
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/8704006b/src/java/org/apache/cassandra/tools/SSTableImport.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/SSTableImport.java b/src/java/org/apache/cassandra/tools/SSTableImport.java
index d3a2683..7b187ac 100644
--- a/src/java/org/apache/cassandra/tools/SSTableImport.java
+++ b/src/java/org/apache/cassandra/tools/SSTableImport.java
@@ -71,7 +71,7 @@ public class SSTableImport
     private final boolean isSorted;
 
     private static final JsonFactory factory = new MappingJsonFactory().configure(
-            JsonParser.Feature.INTERN_FIELD_NAMES, false);
+                                                                                 JsonParser.Feature.INTERN_FIELD_NAMES, false);
 
     static
     {
@@ -143,10 +143,10 @@ public class SSTableImport
                 else
                 {
                     assert meta.isCQL3Table() || name.hasRemaining() : "Cell name should not be empty";
-                    value = stringAsType((String) fields.get(1), 
-                            meta.getValueValidator(name.hasRemaining() 
-                                    ? comparator.cellFromByteBuffer(name)
-                                    : meta.comparator.rowMarker(Composites.EMPTY)));
+                    value = stringAsType((String) fields.get(1),
+                                         meta.getValueValidator(name.hasRemaining()
+                                                                ? comparator.cellFromByteBuffer(name)
+                                                                : meta.comparator.rowMarker(Composites.EMPTY)));
                 }
             }
         }
@@ -219,10 +219,10 @@ public class SSTableImport
                 cfamily.addAtom(new RangeTombstone(start, end, col.timestamp, col.localExpirationTime));
                 continue;
             }
-            
+
             assert cfm.isCQL3Table() || col.getName().hasRemaining() : "Cell name should not be empty";
-            CellName cname = col.getName().hasRemaining() ? cfm.comparator.cellFromByteBuffer(col.getName()) 
-                    : cfm.comparator.rowMarker(Composites.EMPTY);
+            CellName cname = col.getName().hasRemaining() ? cfm.comparator.cellFromByteBuffer(col.getName())
+                                                          : cfm.comparator.rowMarker(Composites.EMPTY);
 
             if (col.isExpiring())
             {
@@ -345,13 +345,13 @@ public class SSTableImport
                 break;
         }
 
-        writer.closeAndOpenReader();
+        writer.finish(true);
 
         return importedKeys;
     }
 
     private int importSorted(String jsonFile, ColumnFamily columnFamily, String ssTablePath,
-            IPartitioner partitioner) throws IOException
+                             IPartitioner partitioner) throws IOException
     {
         int importedKeys = 0; // already imported keys count
         long start = System.nanoTime();
@@ -377,55 +377,56 @@ public class SSTableImport
         System.out.printf("Importing %s keys...%n", keyCountToImport);
 
         parser = getParser(jsonFile); // renewing parser
-        SSTableWriter writer = SSTableWriter.create(Descriptor.fromFilename(ssTablePath), keyCountToImport, ActiveRepairService.UNREPAIRED_SSTABLE);
+        try (SSTableWriter writer = SSTableWriter.create(Descriptor.fromFilename(ssTablePath), keyCountToImport, ActiveRepairService.UNREPAIRED_SSTABLE);)
+        {
+            int lineNumber = 1;
+            DecoratedKey prevStoredKey = null;
 
-        int lineNumber = 1;
-        DecoratedKey prevStoredKey = null;
+            parser.nextToken(); // START_ARRAY
+            while (parser.nextToken() != null)
+            {
+                String key = parser.getCurrentName();
+                Map<?, ?> row = parser.readValueAs(new TypeReference<Map<?, ?>>(){});
+                DecoratedKey currentKey = partitioner.decorateKey(getKeyValidator(columnFamily).fromString((String) row.get("key")));
 
-        parser.nextToken(); // START_ARRAY
-        while (parser.nextToken() != null)
-        {
-            String key = parser.getCurrentName();
-            Map<?, ?> row = parser.readValueAs(new TypeReference<Map<?, ?>>(){});
-            DecoratedKey currentKey = partitioner.decorateKey(getKeyValidator(columnFamily).fromString((String) row.get("key")));
+                if (row.containsKey("metadata"))
+                    parseMeta((Map<?, ?>) row.get("metadata"), columnFamily, null);
 
-            if (row.containsKey("metadata"))
-                parseMeta((Map<?, ?>) row.get("metadata"), columnFamily, null);
+                addColumnsToCF((List<?>) row.get("cells"), columnFamily);
 
-            addColumnsToCF((List<?>) row.get("cells"), columnFamily);
+                if (prevStoredKey != null && prevStoredKey.compareTo(currentKey) != -1)
+                {
+                    System.err
+                    .printf("Line %d: Key %s is greater than previous, collection is not sorted properly. Aborting import. You might need to delete SSTables manually.%n",
+                            lineNumber, key);
+                    return -1;
+                }
 
-            if (prevStoredKey != null && prevStoredKey.compareTo(currentKey) != -1)
-            {
-                System.err
-                        .printf("Line %d: Key %s is greater than previous, collection is not sorted properly. Aborting import. You might need to delete SSTables manually.%n",
-                                lineNumber, key);
-                return -1;
-            }
+                // saving decorated key
+                writer.append(currentKey, columnFamily);
+                columnFamily.clear();
 
-            // saving decorated key
-            writer.append(currentKey, columnFamily);
-            columnFamily.clear();
+                prevStoredKey = currentKey;
+                importedKeys++;
+                lineNumber++;
 
-            prevStoredKey = currentKey;
-            importedKeys++;
-            lineNumber++;
+                long current = System.nanoTime();
 
-            long current = System.nanoTime();
+                if (TimeUnit.NANOSECONDS.toSeconds(current - start) >= 5) // 5 secs.
+                {
+                    System.out.printf("Currently imported %d keys.%n", importedKeys);
+                    start = current;
+                }
+
+                if (keyCountToImport == importedKeys)
+                    break;
 
-            if (TimeUnit.NANOSECONDS.toSeconds(current - start) >= 5) // 5 secs.
-            {
-                System.out.printf("Currently imported %d keys.%n", importedKeys);
-                start = current;
             }
 
-            if (keyCountToImport == importedKeys)
-                break;
+            writer.finish(true);
 
+            return importedKeys;
         }
-
-        writer.closeAndOpenReader();
-
-        return importedKeys;
     }
 
     /**
@@ -511,7 +512,7 @@ public class SSTableImport
 
         try
         {
-           new SSTableImport(keyCountToImport, isSorted).importJson(json, keyspace, cfamily, ssTable);
+            new SSTableImport(keyCountToImport, isSorted).importJson(json, keyspace, cfamily, ssTable);
         }
         catch (Exception e)
         {
@@ -527,7 +528,7 @@ public class SSTableImport
     private static void printProgramUsage()
     {
         System.out.printf("Usage: %s -s -K <keyspace> -c <column_family> -n <num_keys> <json> <sstable>%n%n",
-                            SSTableImport.class.getName());
+                          SSTableImport.class.getName());
 
         System.out.println("Options:");
         for (Object o :  options.getOptions())

http://git-wip-us.apache.org/repos/asf/cassandra/blob/8704006b/src/java/org/apache/cassandra/utils/AlwaysPresentFilter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/AlwaysPresentFilter.java b/src/java/org/apache/cassandra/utils/AlwaysPresentFilter.java
index a7f6fce..44d8f24 100644
--- a/src/java/org/apache/cassandra/utils/AlwaysPresentFilter.java
+++ b/src/java/org/apache/cassandra/utils/AlwaysPresentFilter.java
@@ -35,6 +35,11 @@ public class AlwaysPresentFilter implements IFilter
         return this;
     }
 
+    public Throwable close(Throwable accumulate)
+    {
+        return accumulate;
+    }
+
     public long serializedSize() { return 0; }
 
     @Override

http://git-wip-us.apache.org/repos/asf/cassandra/blob/8704006b/src/java/org/apache/cassandra/utils/Throwables.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/Throwables.java b/src/java/org/apache/cassandra/utils/Throwables.java
index 552ca87..0a2bd28 100644
--- a/src/java/org/apache/cassandra/utils/Throwables.java
+++ b/src/java/org/apache/cassandra/utils/Throwables.java
@@ -29,4 +29,9 @@ public class Throwables
         return existingFail;
     }
 
+    public static void maybeFail(Throwable fail)
+    {
+        if (fail != null)
+            com.google.common.base.Throwables.propagate(fail);
+    }
 }
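
maybeFail() completes the accumulator idiom: merge() chains failures while teardown continues, and maybeFail() rethrows at the end if anything was caught. Sketch:

    // Sketch of the merge/maybeFail idiom.
    Throwable accumulate = null;
    for (AutoCloseable resource : resources)
    {
        try { resource.close(); }
        catch (Throwable t) { accumulate = merge(accumulate, t); }
    }
    maybeFail(accumulate);   // no-op when everything closed cleanly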

http://git-wip-us.apache.org/repos/asf/cassandra/blob/8704006b/src/java/org/apache/cassandra/utils/concurrent/Ref.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/concurrent/Ref.java b/src/java/org/apache/cassandra/utils/concurrent/Ref.java
index 25f8510..ebabd79 100644
--- a/src/java/org/apache/cassandra/utils/concurrent/Ref.java
+++ b/src/java/org/apache/cassandra/utils/concurrent/Ref.java
@@ -11,12 +11,14 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
 
-import com.google.common.base.Throwables;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.concurrent.NamedThreadFactory;
 
+import static org.apache.cassandra.utils.Throwables.maybeFail;
+import static org.apache.cassandra.utils.Throwables.merge;
+
 /**
  * An object that needs ref counting does the two following:
  *   - defines a Tidy object that will cleanup once it's gone,
@@ -77,14 +79,19 @@ public final class Ref<T> implements RefCounted<T>, AutoCloseable
         state.release(false);
     }
 
+    public Throwable ensureReleased(Throwable accumulate)
+    {
+        return state.ensureReleased(accumulate);
+    }
+
     public void ensureReleased()
     {
-        state.ensureReleased();
+        maybeFail(state.ensureReleased(null));
     }
 
     public void close()
     {
-        state.ensureReleased();
+        ensureReleased();
     }
 
     public T get()
@@ -150,14 +157,15 @@ public final class Ref<T> implements RefCounted<T>, AutoCloseable
             assert released == 0;
         }
 
-        void ensureReleased()
+        Throwable ensureReleased(Throwable accumulate)
         {
             if (releasedUpdater.getAndSet(this, 1) == 0)
             {
-                globalState.release(this);
+                accumulate = globalState.release(this, accumulate);
                 if (DEBUG_ENABLED)
                     debug.deallocate();
             }
+            return accumulate;
         }
 
         void release(boolean leak)
@@ -174,7 +182,7 @@ public final class Ref<T> implements RefCounted<T>, AutoCloseable
                 }
                 return;
             }
-            globalState.release(this);
+            Throwable fail = globalState.release(this, null);
             if (leak)
             {
                 String id = this.toString();
@@ -186,6 +194,8 @@ public final class Ref<T> implements RefCounted<T>, AutoCloseable
             {
                 debug.deallocate();
             }
+            if (fail != null)
+                logger.error("Error when closing {}", globalState, fail);
         }
     }
 
@@ -264,7 +274,7 @@ public final class Ref<T> implements RefCounted<T>, AutoCloseable
         }
 
         // release a single reference, and cleanup if no more are extant
-        void release(Ref.State ref)
+        Throwable release(Ref.State ref, Throwable accumulate)
         {
             locallyExtant.remove(ref);
             if (-1 == counts.decrementAndGet())
@@ -276,10 +286,10 @@ public final class Ref<T> implements RefCounted<T>, AutoCloseable
                 }
                 catch (Throwable t)
                 {
-                    logger.error("Error when closing {}", this, t);
-                    Throwables.propagate(t);
+                    accumulate = merge(accumulate, t);
                 }
             }
+            return accumulate;
         }
 
         int count()
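
Ref gains an accumulator-aware ensureReleased() so reference-counted resources can take part in non-throwing cleanup. Sketch of both call styles:

    // Sketch: releasing a Ref inside an accumulator-based teardown.
    Throwable fail = null;
    fail = ref.ensureReleased(fail);   // idempotent; merges any Tidy failure
    maybeFail(fail);                   // or hand fail up the transactional chain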

http://git-wip-us.apache.org/repos/asf/cassandra/blob/8704006b/src/java/org/apache/cassandra/utils/concurrent/Refs.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/concurrent/Refs.java b/src/java/org/apache/cassandra/utils/concurrent/Refs.java
index 1c6486e..dd65971 100644
--- a/src/java/org/apache/cassandra/utils/concurrent/Refs.java
+++ b/src/java/org/apache/cassandra/utils/concurrent/Refs.java
@@ -9,6 +9,7 @@ import com.google.common.base.Throwables;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Iterators;
 
+import static org.apache.cassandra.utils.Throwables.maybeFail;
 import static org.apache.cassandra.utils.Throwables.merge;
 
 /**
@@ -204,7 +205,10 @@ public final class Refs<T extends RefCounted<T>> extends AbstractCollection<T> i
 
     public static void release(Iterable<? extends Ref<?>> refs)
     {
-        Throwable fail = null;
+        maybeFail(release(refs, null));
+    }
+    public static Throwable release(Iterable<? extends Ref<?>> refs, Throwable accumulate)
+    {
         for (Ref ref : refs)
         {
             try
@@ -213,11 +217,10 @@ public final class Refs<T extends RefCounted<T>> extends AbstractCollection<T> i
             }
             catch (Throwable t)
             {
-                fail = merge(fail, t);
+                accumulate = merge(accumulate, t);
             }
         }
-        if (fail != null)
-            throw Throwables.propagate(fail);
+        return accumulate;
     }
 
     public static <T extends SelfRefCounted<T>> Iterable<Ref<T>> selfRefs(Iterable<T> refs)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/8704006b/src/java/org/apache/cassandra/utils/concurrent/SharedCloseable.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/concurrent/SharedCloseable.java b/src/java/org/apache/cassandra/utils/concurrent/SharedCloseable.java
index 1e5a026..a3a1863 100644
--- a/src/java/org/apache/cassandra/utils/concurrent/SharedCloseable.java
+++ b/src/java/org/apache/cassandra/utils/concurrent/SharedCloseable.java
@@ -31,5 +31,6 @@ public interface SharedCloseable extends AutoCloseable
      * Throws an exception if the shared resource has already been closed.
      */
     public SharedCloseable sharedCopy();
+    public Throwable close(Throwable accumulate);
 
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/8704006b/src/java/org/apache/cassandra/utils/concurrent/SharedCloseableImpl.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/concurrent/SharedCloseableImpl.java b/src/java/org/apache/cassandra/utils/concurrent/SharedCloseableImpl.java
index 0d3a843..d85fd54 100644
--- a/src/java/org/apache/cassandra/utils/concurrent/SharedCloseableImpl.java
+++ b/src/java/org/apache/cassandra/utils/concurrent/SharedCloseableImpl.java
@@ -44,4 +44,9 @@ public abstract class SharedCloseableImpl implements SharedCloseable
     {
         ref.ensureReleased();
     }
+
+    public Throwable close(Throwable accumulate)
+    {
+        return ref.ensureReleased(accumulate);
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/8704006b/src/java/org/apache/cassandra/utils/concurrent/Transactional.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/concurrent/Transactional.java b/src/java/org/apache/cassandra/utils/concurrent/Transactional.java
new file mode 100644
index 0000000..bcf5095
--- /dev/null
+++ b/src/java/org/apache/cassandra/utils/concurrent/Transactional.java
@@ -0,0 +1,198 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*    http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+package org.apache.cassandra.utils.concurrent;
+
+import java.util.Set;
+
+import com.google.common.collect.ImmutableSet;
+
+import static org.apache.cassandra.utils.Throwables.maybeFail;
+import static org.apache.cassandra.utils.Throwables.merge;
+
+/**
+ * An abstraction for Transactional behaviour. An object implementing this interface has a lifetime
+ * of the following pattern:
+ *
+ * Throwable failure = null;
+ * try (Transactional t1, t2 = ...)
+ * {
+ *     // do work with t1 and t2
+ *     t1.prepareToCommit();
+ *     t2.prepareToCommit();
+ *     failure = t1.commit(failure);
+ *     failure = t2.commit(failure);
+ * }
+ * logger.error(failure);
+ *
+ * If something goes wrong before commit() is called on any transaction, then on exiting the try block
+ * the auto close method should invoke cleanup() and then abort() to reset any state.
+ * If everything completes normally, then on exiting the try block the auto close method will invoke cleanup
+ * to release any temporary state/resources
+ *
+ * No exceptions should be thrown during commit; if they are, it is not at all clear what the correct behaviour
+ * of the system should be, and so simply logging the exception is likely best (since it may have been an issue
+ * during cleanup, say), and rollback cannot now occur. As such all exceptions and assertions that may be thrown
+ * should be checked and ruled out during commit preparation.
+ */
+public interface Transactional extends AutoCloseable
+{
+
+    /**
+     * A simple abstract implementation of Transactional behaviour.
+     * In general this should be used as the base class for any transactional implementations.
+     *
+     * If the implementation wraps any internal Transactional objects, it must proxy every
+     * commit() and abort() call onto each internal object to ensure correct behaviour
+     */
+    public static abstract class AbstractTransactional implements Transactional
+    {
+        public static enum State
+        {
+            IN_PROGRESS,
+            READY_TO_COMMIT,
+            COMMITTED,
+            ABORTED;
+        }
+
+        private State state = State.IN_PROGRESS;
+
+        // The methods that actually perform the necessary behaviours. The public methods provided by
+        // this class protect them against improper use. Empty default implementations could be provided,
+        // but we consider it safer to force implementers to consider each of them explicitly.
+
+        protected abstract Throwable doCommit(Throwable accumulate);
+        protected abstract Throwable doAbort(Throwable accumulate);
+
+        // this only needs to perform cleanup of state unique to this instance; any internal
+        // Transactional objects will perform cleanup in the commit() or abort() calls
+        protected abstract Throwable doCleanup(Throwable accumulate);
+
+        /**
+         * Do any preparatory work prior to commit. Any exception that can arise while finalizing
+         * the behaviour should be thrown here, so that commit() itself cannot fail.
+         */
+        protected abstract void doPrepare();
+
+        /**
+         * commit any effects of this transaction object graph, then cleanup; delegates first to doCommit, then to doCleanup
+         */
+        public final Throwable commit(Throwable accumulate)
+        {
+            if (state != State.READY_TO_COMMIT)
+                throw new IllegalStateException("Commit attempted before prepared to commit");
+            accumulate = doCommit(accumulate);
+            accumulate = doCleanup(accumulate);
+            state = State.COMMITTED;
+            return accumulate;
+        }
+
+        /**
+         * rollback any effects of this transaction object graph; delegates first to doCleanup, then to doAbort
+         */
+        public final Throwable abort(Throwable accumulate)
+        {
+            if (state == State.ABORTED)
+                return accumulate;
+            if (state == State.COMMITTED)
+            {
+                try
+                {
+                    throw new IllegalStateException("Attempted to abort a committed operation");
+                }
+                catch (Throwable t)
+                {
+                    accumulate = merge(accumulate, t);
+                }
+                return accumulate;
+            }
+            state = State.ABORTED;
+            // we clean up first so that, e.g., file handles can be released prior to deletion
+            accumulate = doCleanup(accumulate);
+            accumulate = doAbort(accumulate);
+            return accumulate;
+        }
+
+        // if we are committed or aborted, then we are done; otherwise abort
+        public final void close()
+        {
+            switch (state)
+            {
+                case COMMITTED:
+                case ABORTED:
+                    break;
+                default:
+                    abort();
+            }
+        }
+
+        /**
+         * The first phase of commit: delegates to doPrepare(), with valid state transition enforcement.
+         * This call should be propagated onto any child objects participating in the transaction
+         */
+        public final void prepareToCommit()
+        {
+            if (state != State.IN_PROGRESS)
+                throw new IllegalStateException("Cannot prepare to commit unless IN_PROGRESS; state is " + state);
+
+            doPrepare();
+            state = State.READY_TO_COMMIT;
+        }
+
+        /**
+         * Convenience method that performs both prepareToCommit() and commit() in one operation;
+         * only of use to the outermost transactional object of an object graph
+         */
+        public Object finish()
+        {
+            prepareToCommit();
+            commit();
+            return this;
+        }
+
+        // convenience method wrapping abort, and throwing any exception encountered;
+        // only of use to (and to be used by) the outermost object in a transactional graph
+        public final void abort()
+        {
+            maybeFail(abort(null));
+        }
+
+        // convenience method wrapping commit, and throwing any exception encountered;
+        // only of use to (and to be used by) the outermost object in a transactional graph
+        public final void commit()
+        {
+            maybeFail(commit(null));
+        }
+
+        public final State state()
+        {
+            return state;
+        }
+    }
+
+    // commit should generally never throw an exception, and preferably never generate one,
+    // but if it does generate one it should accumulate it in the parameter and return the result.
+    // If a commit implementation has a real correctness-affecting exception that cannot be moved to
+    // prepareToCommit, it MUST be executed before any other commit methods in the object graph.
+    public Throwable commit(Throwable accumulate);
+
+    // release any resources, then roll back all state changes (unless commit() has already been invoked)
+    public Throwable abort(Throwable accumulate);
+
+    public void prepareToCommit();
+}
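
To make the intended lifecycle concrete, here is a minimal sketch of an
AbstractTransactional subclass (illustrative only, not part of this commit: the
AtomicFileWrite name and the write-then-rename behaviour are invented for the
example). Everything that can fail is pushed into doPrepare(), doCommit() is
reduced to a single rename, and the handle release lives in doCleanup(), which
abort() runs before doAbort() so the stream is closed before the temp file is
deleted:

    import java.io.File;
    import java.io.FileWriter;
    import java.io.IOException;

    import org.apache.cassandra.utils.concurrent.Transactional;

    import static org.apache.cassandra.utils.Throwables.merge;

    public class AtomicFileWrite extends Transactional.AbstractTransactional
    {
        private final File tmp, target;
        private final FileWriter out;

        public AtomicFileWrite(File tmp, File target) throws IOException
        {
            this.tmp = tmp;
            this.target = target;
            this.out = new FileWriter(tmp);
        }

        public void write(String s) throws IOException
        {
            out.write(s);
        }

        // anything that can fail belongs here, before the point of no return
        protected void doPrepare()
        {
            try
            {
                out.close(); // flush, and release the handle so the rename can succeed
            }
            catch (IOException e)
            {
                throw new RuntimeException(e);
            }
        }

        // the point of no return: publish the result
        protected Throwable doCommit(Throwable accumulate)
        {
            if (!tmp.renameTo(target))
                accumulate = merge(accumulate, new IOException("failed to rename " + tmp));
            return accumulate;
        }

        // roll back: remove the partial output (the stream is already closed,
        // because abort() invokes doCleanup() before doAbort())
        protected Throwable doAbort(Throwable accumulate)
        {
            tmp.delete();
            return accumulate;
        }

        // release resources common to both outcomes
        protected Throwable doCleanup(Throwable accumulate)
        {
            try
            {
                out.close(); // closing an already-closed Writer has no effect
            }
            catch (IOException e)
            {
                accumulate = merge(accumulate, e);
            }
            return accumulate;
        }
    }

Used via try-with-resources, an exception anywhere before finish() leaves the
object IN_PROGRESS, so close() aborts and the temp file is removed:

    // inside a method declaring throws IOException
    try (AtomicFileWrite txn = new AtomicFileWrite(new File("out.tmp"), new File("out")))
    {
        txn.write("hello");
        txn.finish(); // prepareToCommit() + commit(); close() is then a no-op
    }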

http://git-wip-us.apache.org/repos/asf/cassandra/blob/8704006b/test/unit/org/apache/cassandra/db/ScrubTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/ScrubTest.java b/test/unit/org/apache/cassandra/db/ScrubTest.java
index 128d1b0..09121f4 100644
--- a/test/unit/org/apache/cassandra/db/ScrubTest.java
+++ b/test/unit/org/apache/cassandra/db/ScrubTest.java
@@ -336,7 +336,7 @@ public class ScrubTest
         writer.append(Util.dk("c"), cf);
         writer.append(Util.dk("y"), cf);
         writer.append(Util.dk("d"), cf);
-        writer.closeAndOpenReader();
+        writer.finish();
         */
 
         String root = System.getProperty("corrupt-sstable-root");

http://git-wip-us.apache.org/repos/asf/cassandra/blob/8704006b/test/unit/org/apache/cassandra/db/compaction/AntiCompactionTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/compaction/AntiCompactionTest.java b/test/unit/org/apache/cassandra/db/compaction/AntiCompactionTest.java
index 231b3f3..1dc72ae 100644
--- a/test/unit/org/apache/cassandra/db/compaction/AntiCompactionTest.java
+++ b/test/unit/org/apache/cassandra/db/compaction/AntiCompactionTest.java
@@ -156,11 +156,12 @@ public class AntiCompactionTest
         File dir = cfs.directories.getDirectoryForNewSSTables();
         String filename = cfs.getTempSSTablePath(dir);
 
-        SSTableWriter writer = SSTableWriter.create(filename, 0, 0);
-
-        for (int i = 0; i < count * 5; i++)
-            writer.append(StorageService.getPartitioner().decorateKey(ByteBufferUtil.bytes(i)), cf);
-        return writer.closeAndOpenReader();
+        try (SSTableWriter writer = SSTableWriter.create(filename, 0, 0))
+        {
+            for (int i = 0; i < count * 5; i++)
+                writer.append(StorageService.getPartitioner().decorateKey(ByteBufferUtil.bytes(i)), cf);
+            return writer.finish(true);
+        }
     }
 
     public void generateSStable(ColumnFamilyStore store, String Suffix)
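
This shape recurs throughout the test changes below: the writer moves into a
try-with-resources block and closeAndOpenReader() becomes finish(true). The
difference shows on the failure path: if append() throws, the writer is still
IN_PROGRESS when close() runs, so the Transactional close() path aborts it and
deletes its temporary components instead of leaking them. Schematically (key
and cf stand in for whatever the test appends):

    try (SSTableWriter writer = SSTableWriter.create(filename, 0, 0))
    {
        writer.append(key, cf);      // a throw here leaves the writer IN_PROGRESS...
        return writer.finish(true);  // ...so close() would abort() and delete temp files
    }
    // on success, finish(true) has committed and returned the opened SSTableReader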

http://git-wip-us.apache.org/repos/asf/cassandra/blob/8704006b/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java b/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java
index 42ea0c7..18418e8 100644
--- a/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java
+++ b/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java
@@ -240,9 +240,9 @@ public class CompactionsTest
         long newSize1 = it.next().uncompressedLength();
         long newSize2 = it.next().uncompressedLength();
         assertEquals("candidate sstable should not be tombstone-compacted because its key range overlap with other sstable",
-                      originalSize1, newSize1);
+                     originalSize1, newSize1);
         assertEquals("candidate sstable should not be tombstone-compacted because its key range overlap with other sstable",
-                      originalSize2, newSize2);
+                     originalSize2, newSize2);
 
         // now let's enable the magic property
         store.metadata.compactionStrategyOptions.put("unchecked_tombstone_compaction", "true");
@@ -401,21 +401,24 @@ public class CompactionsTest
         cf.addColumn(Util.column("a", "a", 3));
         cf.deletionInfo().add(new RangeTombstone(Util.cellname("0"), Util.cellname("b"), 2, (int) (System.currentTimeMillis()/1000)),cfmeta.comparator);
 
-        SSTableWriter writer = SSTableWriter.create(Descriptor.fromFilename(cfs.getTempSSTablePath(dir.getDirectoryForNewSSTables())), 0, 0, 0);
-
+        try (SSTableWriter writer = SSTableWriter.create(Descriptor.fromFilename(cfs.getTempSSTablePath(dir.getDirectoryForNewSSTables())), 0, 0, 0))
+        {
+            writer.append(Util.dk("0"), cf);
+            writer.append(Util.dk("1"), cf);
+            writer.append(Util.dk("3"), cf);
 
-        writer.append(Util.dk("0"), cf);
-        writer.append(Util.dk("1"), cf);
-        writer.append(Util.dk("3"), cf);
+            cfs.addSSTable(writer.finish(true));
+        }
 
-        cfs.addSSTable(writer.closeAndOpenReader());
-        writer = SSTableWriter.create(Descriptor.fromFilename(cfs.getTempSSTablePath(dir.getDirectoryForNewSSTables())), 0, 0, 0);
+        try (SSTableWriter writer = SSTableWriter.create(Descriptor.fromFilename(cfs.getTempSSTablePath(dir.getDirectoryForNewSSTables())), 0, 0, 0))
+        {
+            writer.append(Util.dk("0"), cf);
+            writer.append(Util.dk("1"), cf);
+            writer.append(Util.dk("2"), cf);
+            writer.append(Util.dk("3"), cf);
+            cfs.addSSTable(writer.finish(true));
+        }
 
-        writer.append(Util.dk("0"), cf);
-        writer.append(Util.dk("1"), cf);
-        writer.append(Util.dk("2"), cf);
-        writer.append(Util.dk("3"), cf);
-        cfs.addSSTable(writer.closeAndOpenReader());
 
         Collection<SSTableReader> toCompact = cfs.getSSTables();
         assert toCompact.size() == 2;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/8704006b/test/unit/org/apache/cassandra/io/RandomAccessReaderTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/RandomAccessReaderTest.java b/test/unit/org/apache/cassandra/io/RandomAccessReaderTest.java
index 678b926..fe04096 100644
--- a/test/unit/org/apache/cassandra/io/RandomAccessReaderTest.java
+++ b/test/unit/org/apache/cassandra/io/RandomAccessReaderTest.java
@@ -30,7 +30,7 @@ public class RandomAccessReaderTest
 
         SequentialWriter writer = new SequentialWriter(f, CompressionParameters.DEFAULT_CHUNK_LENGTH, false);
         writer.write(expected.getBytes());
-        writer.close();
+        writer.finish();
 
         assert f.exists();
 
@@ -58,7 +58,7 @@ public class RandomAccessReaderTest
 
         SequentialWriter writer = new SequentialWriter(f, CompressionParameters.DEFAULT_CHUNK_LENGTH, false);
         writer.write(expected.getBytes());
-        writer.close();
+        writer.finish();
 
         assert f.exists();
 
@@ -87,7 +87,7 @@ public class RandomAccessReaderTest
         SequentialWriter writer = new SequentialWriter(f, CompressionParameters.DEFAULT_CHUNK_LENGTH, false);
         for (int i = 0; i < numIterations; i++)
             writer.write(expected.getBytes());
-        writer.close();
+        writer.finish();
 
         assert f.exists();
 
@@ -166,7 +166,7 @@ public class RandomAccessReaderTest
         SequentialWriter writer = new SequentialWriter(f, CompressionParameters.DEFAULT_CHUNK_LENGTH, false);
         for (int i = 0; i < expected.length; i++)
             writer.write(expected[i].getBytes());
-        writer.close();
+        writer.finish();
 
         assert f.exists();
 

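The close() to finish() substitutions in this file follow from SequentialWriter
joining the Transactional hierarchy: a bare close() now takes the abort path
when the writer has not been finished, so a test that expects the written bytes
to survive must complete the transaction explicitly. Roughly (assuming
Transactional.close() semantics, which these substitutions imply):

    SequentialWriter writer = new SequentialWriter(f, CompressionParameters.DEFAULT_CHUNK_LENGTH, false);
    writer.write(expected.getBytes());
    writer.finish(); // prepare + commit + close; a bare close() would tear the write down
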
http://git-wip-us.apache.org/repos/asf/cassandra/blob/8704006b/test/unit/org/apache/cassandra/io/compress/CompressedRandomAccessReaderTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/compress/CompressedRandomAccessReaderTest.java b/test/unit/org/apache/cassandra/io/compress/CompressedRandomAccessReaderTest.java
index f4d3e87..cfc4bb8 100644
--- a/test/unit/org/apache/cassandra/io/compress/CompressedRandomAccessReaderTest.java
+++ b/test/unit/org/apache/cassandra/io/compress/CompressedRandomAccessReaderTest.java
@@ -81,7 +81,7 @@ public class CompressedRandomAccessReaderTest
 
             for (int i = 0; i < 20; i++)
                 writer.write("x".getBytes());
-            writer.close();
+            writer.finish();
 
             CompressedRandomAccessReader reader = CompressedRandomAccessReader.open(channel, new CompressionMetadata(filename + ".metadata", f.length()));
             String res = reader.readLine();
@@ -124,7 +124,7 @@ public class CompressedRandomAccessReaderTest
 
             writer.resetAndTruncate(mark);
             writer.write("brown fox jumps over the lazy dog".getBytes());
-            writer.close();
+            writer.finish();
 
             assert f.exists();
             RandomAccessReader reader = compressed
@@ -161,10 +161,11 @@ public class CompressedRandomAccessReaderTest
         metadata.deleteOnExit();
 
         MetadataCollector sstableMetadataCollector = new MetadataCollector(new SimpleDenseCellNameType(BytesType.instance)).replayPosition(null);
-        SequentialWriter writer = new CompressedSequentialWriter(file, metadata.getPath(), new CompressionParameters(SnappyCompressor.instance), sstableMetadataCollector);
-
-        writer.write(CONTENT.getBytes());
-        writer.close();
+        try (SequentialWriter writer = new CompressedSequentialWriter(file, metadata.getPath(), new CompressionParameters(SnappyCompressor.instance), sstableMetadataCollector))
+        {
+            writer.write(CONTENT.getBytes());
+            writer.finish();
+        }
 
         ChannelProxy channel = new ChannelProxy(file);
 
@@ -175,8 +176,6 @@ public class CompressedRandomAccessReaderTest
         RandomAccessReader reader = CompressedRandomAccessReader.open(channel, meta);
         // read and verify compressed data
         assertEquals(CONTENT, reader.readLine());
-        // close reader
-        reader.close();
 
         Random random = new Random();
         RandomAccessFile checksumModifier = null;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/8704006b/test/unit/org/apache/cassandra/io/compress/CompressedSequentialWriterTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/compress/CompressedSequentialWriterTest.java b/test/unit/org/apache/cassandra/io/compress/CompressedSequentialWriterTest.java
index 46da343..184319f 100644
--- a/test/unit/org/apache/cassandra/io/compress/CompressedSequentialWriterTest.java
+++ b/test/unit/org/apache/cassandra/io/compress/CompressedSequentialWriterTest.java
@@ -17,23 +17,31 @@
  */
 package org.apache.cassandra.io.compress;
 
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
 import java.io.File;
 import java.io.IOException;
 import java.nio.ByteBuffer;
-import java.util.Arrays;
-import java.util.Random;
+import java.util.*;
 
+import static org.apache.commons.io.FileUtils.readFileToByteArray;
 import static org.junit.Assert.assertEquals;
+
+import org.junit.After;
 import org.junit.Test;
 
+import junit.framework.Assert;
+import org.apache.cassandra.db.composites.CellNames;
 import org.apache.cassandra.db.composites.SimpleDenseCellNameType;
 import org.apache.cassandra.db.marshal.BytesType;
+import org.apache.cassandra.db.marshal.UTF8Type;
 import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
 import org.apache.cassandra.io.util.ChannelProxy;
 import org.apache.cassandra.io.util.FileMark;
 import org.apache.cassandra.io.util.RandomAccessReader;
+import org.apache.cassandra.io.util.SequentialWriterTest;
 
-public class CompressedSequentialWriterTest
+public class CompressedSequentialWriterTest extends SequentialWriterTest
 {
     private ICompressor compressor;
 
@@ -78,30 +86,31 @@ public class CompressedSequentialWriterTest
         try
         {
             MetadataCollector sstableMetadataCollector = new MetadataCollector(new SimpleDenseCellNameType(BytesType.instance)).replayPosition(null);
-            CompressedSequentialWriter writer = new CompressedSequentialWriter(f, filename + ".metadata", new CompressionParameters(compressor), sstableMetadataCollector);
-
             byte[] dataPre = new byte[bytesToTest];
             byte[] rawPost = new byte[bytesToTest];
-            Random r = new Random();
-
-            // Test both write with byte[] and ByteBuffer
-            r.nextBytes(dataPre);
-            r.nextBytes(rawPost);
-            ByteBuffer dataPost = makeBB(bytesToTest);
-            dataPost.put(rawPost);
-            dataPost.flip();
-
-            writer.write(dataPre);
-            FileMark mark = writer.mark();
-
-            // Write enough garbage to transition chunk
-            for (int i = 0; i < CompressionParameters.DEFAULT_CHUNK_LENGTH; i++)
+            try (CompressedSequentialWriter writer = new CompressedSequentialWriter(f, filename + ".metadata", new CompressionParameters(compressor), sstableMetadataCollector))
             {
-                writer.write((byte)i);
+                Random r = new Random();
+
+                // Test both write with byte[] and ByteBuffer
+                r.nextBytes(dataPre);
+                r.nextBytes(rawPost);
+                ByteBuffer dataPost = makeBB(bytesToTest);
+                dataPost.put(rawPost);
+                dataPost.flip();
+
+                writer.write(dataPre);
+                FileMark mark = writer.mark();
+
+                // Write enough garbage to transition chunk
+                for (int i = 0; i < CompressionParameters.DEFAULT_CHUNK_LENGTH; i++)
+                {
+                    writer.write((byte)i);
+                }
+                writer.resetAndTruncate(mark);
+                writer.write(dataPost);
+                writer.finish();
             }
-            writer.resetAndTruncate(mark);
-            writer.write(dataPost);
-            writer.close();
 
             assert f.exists();
             RandomAccessReader reader = CompressedRandomAccessReader.open(channel, new CompressionMetadata(filename + ".metadata", f.length()));
@@ -137,4 +146,85 @@ public class CompressedSequentialWriterTest
                 ? ByteBuffer.allocateDirect(size)
                 : ByteBuffer.allocate(size);
     }
+
+    private final List<TestableCSW> writers = new ArrayList<>();
+
+    @After
+    public void cleanup()
+    {
+        for (TestableCSW sw : writers)
+            sw.cleanup();
+        writers.clear();
+    }
+
+    protected TestableTransaction newTest() throws IOException
+    {
+        TestableCSW sw = new TestableCSW();
+        writers.add(sw);
+        return sw;
+    }
+
+    private static class TestableCSW extends TestableSW
+    {
+        final File offsetsFile;
+
+        private TestableCSW() throws IOException
+        {
+            this(tempFile("compressedsequentialwriter"),
+                 tempFile("compressedsequentialwriter.offsets"));
+        }
+
+        private TestableCSW(File file, File offsetsFile) throws IOException
+        {
+            this(file, offsetsFile, new CompressedSequentialWriter(file, offsetsFile.getPath(), new CompressionParameters(LZ4Compressor.instance, BUFFER_SIZE, new HashMap<String, String>()), new MetadataCollector(CellNames.fromAbstractType(UTF8Type.instance, false))));
+        }
+
+        private TestableCSW(File file, File offsetsFile, CompressedSequentialWriter sw) throws IOException
+        {
+            super(file, sw);
+            this.offsetsFile = offsetsFile;
+        }
+
+        protected void assertInProgress() throws Exception
+        {
+            Assert.assertTrue(file.exists());
+            Assert.assertFalse(offsetsFile.exists());
+            byte[] compressed = readFileToByteArray(file);
+            byte[] uncompressed = new byte[partialContents.length];
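+            // the chunk on disk is trailed by a 4-byte checksum, hence length - 4 here and in assertPrepared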
+            LZ4Compressor.instance.uncompress(compressed, 0, compressed.length - 4, uncompressed, 0);
+            Assert.assertTrue(Arrays.equals(partialContents, uncompressed));
+        }
+
+        protected void assertPrepared() throws Exception
+        {
+            Assert.assertTrue(file.exists());
+            Assert.assertTrue(offsetsFile.exists());
+            DataInputStream offsets = new DataInputStream(new ByteArrayInputStream(readFileToByteArray(offsetsFile)));
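+            // CompressionInfo layout: compressor class name, option count, chunk length,
+            // total uncompressed data length, chunk count, then one offset per chunk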
+            Assert.assertTrue(offsets.readUTF().endsWith("LZ4Compressor"));
+            Assert.assertEquals(0, offsets.readInt());
+            Assert.assertEquals(BUFFER_SIZE, offsets.readInt());
+            Assert.assertEquals(fullContents.length, offsets.readLong());
+            Assert.assertEquals(2, offsets.readInt());
+            Assert.assertEquals(0, offsets.readLong());
+            int offset = (int) offsets.readLong();
+            byte[] compressed = readFileToByteArray(file);
+            byte[] uncompressed = new byte[fullContents.length];
+            LZ4Compressor.instance.uncompress(compressed, 0, offset - 4, uncompressed, 0);
+            LZ4Compressor.instance.uncompress(compressed, offset, compressed.length - (4 + offset), uncompressed, partialContents.length);
+            Assert.assertTrue(Arrays.equals(fullContents, uncompressed));
+        }
+
+        protected void assertAborted() throws Exception
+        {
+            super.assertAborted();
+            Assert.assertFalse(offsetsFile.exists());
+        }
+
+        void cleanup()
+        {
+            file.delete();
+            offsetsFile.delete();
+        }
+    }
+
 }
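
Taken together, TestableCSW's assertions spell out CompressedSequentialWriter's
on-disk contract at each lifecycle stage:

    // IN_PROGRESS:     data file exists with the compressed partial contents; no .offsets file yet
    // after prepare:   .offsets file written, and both chunks of the full contents decompress intact
    // after abort:     data file gone (inherited check) and the .offsets file gone too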

http://git-wip-us.apache.org/repos/asf/cassandra/blob/8704006b/test/unit/org/apache/cassandra/io/sstable/BigTableWriterTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/sstable/BigTableWriterTest.java b/test/unit/org/apache/cassandra/io/sstable/BigTableWriterTest.java
new file mode 100644
index 0000000..dfb55a1
--- /dev/null
+++ b/test/unit/org/apache/cassandra/io/sstable/BigTableWriterTest.java
@@ -0,0 +1,130 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*    http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+package org.apache.cassandra.io.sstable;
+
+import java.io.File;
+import java.io.IOException;
+
+import org.junit.After;
+import org.junit.BeforeClass;
+
+import junit.framework.Assert;
+import org.apache.cassandra.SchemaLoader;
+import org.apache.cassandra.Util;
+import org.apache.cassandra.config.KSMetaData;
+import org.apache.cassandra.db.ArrayBackedSortedColumns;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.io.sstable.format.SSTableWriter;
+import org.apache.cassandra.locator.SimpleStrategy;
+import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.concurrent.AbstractTransactionalTest;
+
+public class BigTableWriterTest extends AbstractTransactionalTest
+{
+    public static final String KEYSPACE1 = "BigTableWriterTest";
+    public static final String CF_STANDARD = "Standard1";
+
+    private static ColumnFamilyStore cfs;
+
+    @BeforeClass
+    public static void defineSchema() throws Exception
+    {
+        SchemaLoader.prepareServer();
+        SchemaLoader.createKeyspace(KEYSPACE1,
+                                    SimpleStrategy.class,
+                                    KSMetaData.optsWithRF(1),
+                                    SchemaLoader.standardCFMD(KEYSPACE1, CF_STANDARD));
+        cfs = Keyspace.open(KEYSPACE1).getColumnFamilyStore(CF_STANDARD);
+    }
+
+    protected TestableTransaction newTest() throws IOException
+    {
+        return new TestableBTW();
+    }
+
+    private static class TestableBTW extends TestableTransaction
+    {
+        final File file;
+        final Descriptor descriptor;
+        final SSTableWriter writer;
+
+        private TestableBTW() throws IOException
+        {
+            this(cfs.getTempSSTablePath(cfs.directories.getDirectoryForNewSSTables()));
+        }
+
+        private TestableBTW(String file) throws IOException
+        {
+            this(file, SSTableWriter.create(file, 0, 0));
+        }
+
+        private TestableBTW(String file, SSTableWriter sw) throws IOException
+        {
+            super(sw);
+            this.file = new File(file);
+            this.descriptor = Descriptor.fromFilename(file);
+            this.writer = sw;
+            ArrayBackedSortedColumns cf = ArrayBackedSortedColumns.factory.create(cfs.metadata);
+            for (int i = 0; i < 10; i++)
+                cf.addColumn(Util.cellname(i), SSTableRewriterTest.random(0, 1000), 1);
+            for (int i = 0; i < 100; i++)
+                writer.append(StorageService.getPartitioner().decorateKey(ByteBufferUtil.bytes(i)), cf);
+        }
+
+        protected void assertInProgress() throws Exception
+        {
+            assertExists(Descriptor.Type.TEMP, Component.DATA, Component.PRIMARY_INDEX);
+            assertNotExists(Descriptor.Type.TEMP, Component.FILTER, Component.SUMMARY);
+            assertNotExists(Descriptor.Type.FINAL, Component.DATA, Component.PRIMARY_INDEX, Component.FILTER, Component.SUMMARY);
+            Assert.assertTrue(file.length() > 0);
+        }
+
+        protected void assertPrepared() throws Exception
+        {
+            assertNotExists(Descriptor.Type.TEMP, Component.DATA, Component.PRIMARY_INDEX, Component.FILTER, Component.SUMMARY);
+            assertExists(Descriptor.Type.FINAL, Component.DATA, Component.PRIMARY_INDEX, Component.FILTER, Component.SUMMARY);
+        }
+
+        protected void assertAborted() throws Exception
+        {
+            assertNotExists(Descriptor.Type.TEMP, Component.DATA, Component.PRIMARY_INDEX, Component.FILTER, Component.SUMMARY);
+            assertNotExists(Descriptor.Type.FINAL, Component.DATA, Component.PRIMARY_INDEX, Component.FILTER, Component.SUMMARY);
+            Assert.assertFalse(file.exists());
+        }
+
+        protected void assertCommitted() throws Exception
+        {
+            assertPrepared();
+        }
+
+        private void assertExists(Descriptor.Type type, Component ... components)
+        {
+            for (Component component : components)
+                Assert.assertTrue(new File(descriptor.asType(type).filenameFor(component)).exists());
+        }
+        private void assertNotExists(Descriptor.Type type, Component ... components)
+        {
+            for (Component component : components)
+                Assert.assertFalse(type.toString() + " " + component.toString(), new File(descriptor.asType(type).filenameFor(component)).exists());
+        }
+    }
+
+}
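
TestableBTW encodes the equivalent contract for BigTableWriter in terms of
sstable components and the temp-to-final transition:

    // IN_PROGRESS:     TEMP Data and PrimaryIndex on disk (data file non-empty); no Filter or
    //                  Summary, and no FINAL components
    // after prepare:   no TEMP components remain; FINAL Data, PrimaryIndex, Filter and Summary exist
    // after commit:    same as prepared
    // after abort:     no components at all, TEMP or FINAL, and the data file is gone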