Posted to commits@cassandra.apache.org by be...@apache.org on 2015/05/17 14:51:08 UTC

[6/7] cassandra git commit: Introduce Transactional API for internal state changes

Introduce Transactional API for internal state changes

patch by benedict; reviewed by josh for CASSANDRA-8984


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/8704006b
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/8704006b
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/8704006b

Branch: refs/heads/trunk
Commit: 8704006bfa75a78cb904e35662e4c8bafc1f2330
Parents: ff10d63
Author: Benedict Elliott Smith <be...@apache.org>
Authored: Sun May 17 13:50:03 2015 +0100
Committer: Benedict Elliott Smith <be...@apache.org>
Committed: Sun May 17 13:50:03 2015 +0100

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 src/java/org/apache/cassandra/db/Memtable.java  |  14 +-
 .../db/compaction/CompactionManager.java        |  24 +-
 .../cassandra/db/compaction/CompactionTask.java |  31 +--
 .../cassandra/db/compaction/Scrubber.java       |  21 +-
 .../cassandra/db/compaction/Upgrader.java       |  11 +-
 .../writers/CompactionAwareWriter.java          |  42 +++-
 .../writers/DefaultCompactionWriter.java        |  18 +-
 .../writers/MajorLeveledCompactionWriter.java   |  28 +--
 .../writers/MaxSSTableSizeWriter.java           |  16 +-
 .../SplittingSizeTieredCompactionWriter.java    |  17 +-
 .../io/compress/CompressedSequentialWriter.java |  59 ++---
 .../io/compress/CompressionMetadata.java        | 131 +++++-----
 .../cassandra/io/sstable/IndexSummary.java      |  11 +-
 .../io/sstable/IndexSummaryBuilder.java         |  13 +-
 .../apache/cassandra/io/sstable/SSTable.java    |   2 +-
 .../cassandra/io/sstable/SSTableRewriter.java   | 250 +++++++++----------
 .../io/sstable/SSTableSimpleUnsortedWriter.java |  56 +++--
 .../io/sstable/SSTableSimpleWriter.java         |   9 +-
 .../io/sstable/format/SSTableReader.java        |  13 +-
 .../io/sstable/format/SSTableWriter.java        | 137 +++++++---
 .../io/sstable/format/big/BigTableWriter.java   | 248 +++++++++---------
 .../io/util/BufferedPoolingSegmentedFile.java   |   3 +-
 .../io/util/BufferedSegmentedFile.java          |   3 +-
 .../io/util/ChecksummedSequentialWriter.java    |  38 ++-
 .../io/util/CompressedPoolingSegmentedFile.java |   4 +-
 .../io/util/CompressedSegmentedFile.java        |   9 +-
 .../org/apache/cassandra/io/util/FileUtils.java |  39 ++-
 .../cassandra/io/util/MmappedSegmentedFile.java |   3 +-
 .../apache/cassandra/io/util/SafeMemory.java    |   5 +
 .../cassandra/io/util/SafeMemoryWriter.java     |   5 +
 .../apache/cassandra/io/util/SegmentedFile.java |  23 +-
 .../cassandra/io/util/SequentialWriter.java     | 116 ++++++---
 .../cassandra/streaming/StreamReceiveTask.java  |   2 +-
 .../apache/cassandra/tools/SSTableImport.java   |  97 +++----
 .../cassandra/utils/AlwaysPresentFilter.java    |   5 +
 .../org/apache/cassandra/utils/Throwables.java  |   5 +
 .../apache/cassandra/utils/concurrent/Ref.java  |  28 ++-
 .../apache/cassandra/utils/concurrent/Refs.java |  11 +-
 .../utils/concurrent/SharedCloseable.java       |   1 +
 .../utils/concurrent/SharedCloseableImpl.java   |   5 +
 .../utils/concurrent/Transactional.java         | 198 +++++++++++++++
 .../unit/org/apache/cassandra/db/ScrubTest.java |   2 +-
 .../db/compaction/AntiCompactionTest.java       |  11 +-
 .../db/compaction/CompactionsTest.java          |  31 +--
 .../cassandra/io/RandomAccessReaderTest.java    |   8 +-
 .../CompressedRandomAccessReaderTest.java       |  15 +-
 .../CompressedSequentialWriterTest.java         | 136 ++++++++--
 .../io/sstable/BigTableWriterTest.java          | 130 ++++++++++
 .../io/sstable/SSTableRewriterTest.java         | 154 +++++-------
 .../cassandra/io/sstable/SSTableUtils.java      |   2 +-
 .../io/util/BufferedRandomAccessFileTest.java   |  28 +--
 .../util/ChecksummedSequentialWriterTest.java   |  92 +++++++
 .../cassandra/io/util/DataOutputTest.java       |   1 +
 .../cassandra/io/util/SequentialWriterTest.java | 117 +++++++++
 .../compress/CompressedInputStreamTest.java     |   2 +-
 .../cassandra/tools/SSTableExportTest.java      |  16 +-
 .../concurrent/AbstractTransactionalTest.java   | 136 ++++++++++
 58 files changed, 1731 insertions(+), 902 deletions(-)
----------------------------------------------------------------------
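
The heart of this patch is the new utils/concurrent/Transactional.java (198 lines in the diffstat above); the hunks below show only its call sites. As orientation, here is a hedged sketch of the lifecycle contract those call sites imply. The method names (doPrepare, doCommit, doAbort, doCleanup, prepareToCommit, commit, abort, finish) all appear in the diff; the boolean bookkeeping and the maybeFail body are illustrative assumptions, not copied from the commit:

    // Hedged sketch of the Transactional lifecycle, reconstructed from the
    // call sites in this diff; field names and failure handling are assumed.
    public abstract class TransactionalSketch implements AutoCloseable
    {
        private boolean prepared, committed, aborted;

        // sync all state to its final form; the last step permitted to throw
        protected abstract void doPrepare();
        // must not throw: fold any failure into accumulate and return it
        protected abstract Throwable doCommit(Throwable accumulate);
        // must not throw: undo all side effects
        protected abstract Throwable doAbort(Throwable accumulate);
        // release resources needed by both outcomes
        protected abstract Throwable doCleanup(Throwable accumulate);

        public final void prepareToCommit()
        {
            doPrepare();
            prepared = true;
        }

        public final Throwable commit(Throwable accumulate)
        {
            assert prepared;
            committed = true;
            return doCleanup(doCommit(accumulate));
        }

        public final Throwable abort(Throwable accumulate)
        {
            aborted = true;
            return doCleanup(doAbort(accumulate));
        }

        // the payoff: inside try-with-resources, close() aborts anything not
        // explicitly committed, so catch { writer.abort(); } blocks disappear
        public void close()
        {
            if (!committed && !aborted)
                maybeFail(abort(null));
        }

        // two-phase completion; subclasses may override with a covariant
        // return type, as CompactionAwareWriter.finish() does below
        public Object finish()
        {
            prepareToCommit();
            maybeFail(commit(null));
            return this;
        }

        private static void maybeFail(Throwable t)
        {
            // the real code uses org.apache.cassandra.utils.Throwables.maybeFail
            if (t != null)
                throw new RuntimeException(t);
        }
    }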


http://git-wip-us.apache.org/repos/asf/cassandra/blob/8704006b/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index e3b25b5..9f14fba 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 2.2.0-beta1
+ * Introduce Transactional API for internal state changes (CASSANDRA-8984)
  * Add a flag in cassandra.yaml to enable UDFs (CASSANDRA-9404)
  * Better support of null for UDF (CASSANDRA-8374)
  * Use ecj instead of javassist for UDFs (CASSANDRA-8241)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/8704006b/src/java/org/apache/cassandra/db/Memtable.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Memtable.java b/src/java/org/apache/cassandra/db/Memtable.java
index ef47aba..3509b27 100644
--- a/src/java/org/apache/cassandra/db/Memtable.java
+++ b/src/java/org/apache/cassandra/db/Memtable.java
@@ -27,7 +27,6 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
 
-import com.google.common.base.Throwables;
 import org.apache.cassandra.io.sstable.Descriptor;
 import org.apache.cassandra.io.sstable.format.SSTableReader;
 import org.apache.cassandra.io.sstable.format.SSTableWriter;
@@ -341,8 +340,7 @@ public class Memtable
 
             SSTableReader ssTable;
             // errors when creating the writer that may leave empty temp files.
-            SSTableWriter writer = createFlushWriter(cfs.getTempSSTablePath(sstableDirectory));
-            try
+            try (SSTableWriter writer = createFlushWriter(cfs.getTempSSTablePath(sstableDirectory)))
             {
                 boolean trackContention = logger.isDebugEnabled();
                 int heavilyContendedRowCount = 0;
@@ -372,16 +370,13 @@ public class Memtable
 
                 if (writer.getFilePointer() > 0)
                 {
-                    writer.isolateReferences();
-
                     // temp sstables should contain non-repaired data.
-                    ssTable = writer.closeAndOpenReader();
+                    ssTable = writer.finish(true);
                     logger.info(String.format("Completed flushing %s (%d bytes) for commitlog position %s",
                                               ssTable.getFilename(), new File(ssTable.getFilename()).length(), context));
                 }
                 else
                 {
-                    writer.abort();
                     ssTable = null;
                     logger.info("Completed flushing; nothing needed to be retained.  Commitlog position was {}",
                                 context);
@@ -392,11 +387,6 @@ public class Memtable
 
                 return ssTable;
             }
-            catch (Throwable e)
-            {
-                writer.abort();
-                throw Throwables.propagate(e);
-            }
         }
 
         public SSTableWriter createFlushWriter(String filename)
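
The shape of this Memtable change recurs throughout the patch: the writer moves into the try-with-resources header and the explicit catch { writer.abort(); throw ...; } block disappears, because a Transactional writer aborts itself in close() whenever it was not finished first. A toy, self-contained illustration of just that mechanic (hypothetical class, not Cassandra code):

    // Toy abort-on-close writer; every name here is hypothetical.
    class ToyWriter implements AutoCloseable
    {
        private boolean finished;

        void append(String row) { /* buffer the row */ }

        void finish()
        {
            /* sync to disk, then commit */
            finished = true;
        }

        public void close()
        {
            if (!finished)
            { /* abort: delete temp files, release buffers */ }
        }

        static void flush(Iterable<String> rows)
        {
            try (ToyWriter writer = new ToyWriter())
            {
                for (String row : rows)
                    writer.append(row);
                writer.finish(); // success path: the implicit close() is a no-op
            }                    // failure path: close() aborts automatically
        }
    }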

http://git-wip-us.apache.org/repos/asf/cassandra/blob/8704006b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
index 35e288d..fc83cc5 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
@@ -21,7 +21,6 @@ import java.io.File;
 import java.io.IOException;
 import java.lang.management.ManagementFactory;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashSet;
@@ -42,7 +41,6 @@ import javax.management.openmbean.OpenDataException;
 import javax.management.openmbean.TabularData;
 
 import com.google.common.base.Predicate;
-import com.google.common.base.Throwables;
 import com.google.common.collect.*;
 import com.google.common.util.concurrent.*;
 import org.slf4j.Logger;
@@ -787,9 +785,9 @@ public class CompactionManager implements CompactionManagerMBean
 
         metrics.beginCompaction(ci);
         Set<SSTableReader> oldSSTable = Sets.newHashSet(sstable);
-        SSTableRewriter writer = new SSTableRewriter(cfs, oldSSTable, sstable.maxDataAge, false);
         List<SSTableReader> finished;
-        try (CompactionController controller = new CompactionController(cfs, sstableSet, getDefaultGcBefore(cfs)))
+        try (SSTableRewriter writer = new SSTableRewriter(cfs, oldSSTable, sstable.maxDataAge, false);
+             CompactionController controller = new CompactionController(cfs, sstableSet, getDefaultGcBefore(cfs)))
         {
             writer.switchWriter(createWriter(cfs, compactionFileLocation, expectedBloomFilterSize, sstable.getSSTableMetadata().repairedAt, sstable));
 
@@ -813,11 +811,6 @@ public class CompactionManager implements CompactionManagerMBean
             finished = writer.finish();
             cfs.getDataTracker().markCompactedSSTablesReplaced(oldSSTable, finished, OperationType.CLEANUP);
         }
-        catch (Throwable e)
-        {
-            writer.abort();
-            throw Throwables.propagate(e);
-        }
         finally
         {
             scanner.close();
@@ -1178,13 +1171,12 @@ public class CompactionManager implements CompactionManagerMBean
         Set<SSTableReader> sstableAsSet = new HashSet<>(anticompactionGroup);
 
         File destination = cfs.directories.getWriteableLocationAsFile(cfs.getExpectedCompactedFileSize(sstableAsSet, OperationType.ANTICOMPACTION));
-        SSTableRewriter repairedSSTableWriter = new SSTableRewriter(cfs, sstableAsSet, groupMaxDataAge, false);
-        SSTableRewriter unRepairedSSTableWriter = new SSTableRewriter(cfs, sstableAsSet, groupMaxDataAge, false);
-
         long repairedKeyCount = 0;
         long unrepairedKeyCount = 0;
         AbstractCompactionStrategy strategy = cfs.getCompactionStrategy();
-        try (AbstractCompactionStrategy.ScannerList scanners = strategy.getScanners(anticompactionGroup);
+        try (SSTableRewriter repairedSSTableWriter = new SSTableRewriter(cfs, sstableAsSet, groupMaxDataAge, false);
+             SSTableRewriter unRepairedSSTableWriter = new SSTableRewriter(cfs, sstableAsSet, groupMaxDataAge, false);
+             AbstractCompactionStrategy.ScannerList scanners = strategy.getScanners(anticompactionGroup);
              CompactionController controller = new CompactionController(cfs, sstableAsSet, CFMetaData.DEFAULT_GC_GRACE_SECONDS))
         {
             int expectedBloomFilterSize = Math.max(cfs.metadata.getMinIndexInterval(), (int)(SSTableReader.getApproximateKeyCount(anticompactionGroup)));
@@ -1221,8 +1213,8 @@ public class CompactionManager implements CompactionManagerMBean
             // we have the same readers being rewritten by both writers, so we ask the first one NOT to close them
             // so that the second one can do so safely, without leaving us with references < 0 or any other ugliness
             List<SSTableReader> anticompactedSSTables = new ArrayList<>();
-            anticompactedSSTables.addAll(repairedSSTableWriter.finish(repairedAt));
-            anticompactedSSTables.addAll(unRepairedSSTableWriter.finish(ActiveRepairService.UNREPAIRED_SSTABLE));
+            anticompactedSSTables.addAll(repairedSSTableWriter.setRepairedAt(repairedAt).finish());
+            anticompactedSSTables.addAll(unRepairedSSTableWriter.setRepairedAt(ActiveRepairService.UNREPAIRED_SSTABLE).finish());
             cfs.getDataTracker().markCompactedSSTablesReplaced(sstableAsSet, anticompactedSSTables, OperationType.ANTICOMPACTION);
 
             logger.debug("Repaired {} keys out of {} for {}/{} in {}", repairedKeyCount,
@@ -1236,8 +1228,6 @@ public class CompactionManager implements CompactionManagerMBean
         {
             JVMStabilityInspector.inspectThrowable(e);
             logger.error("Error anticompacting " + anticompactionGroup, e);
-            repairedSSTableWriter.abort();
-            unRepairedSSTableWriter.abort();
         }
         return 0;
     }
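
Both rewriters now sit in the same try-with-resources header as the scanners and the controller, which is why the manual abort() calls could be dropped from the catch block: Java closes resources in reverse declaration order, and each unfinished Transactional aborts itself on close. A fragment reusing the hypothetical ToyWriter sketched earlier:

    // two writers in one header; if either finish() throws, both writers
    // still get close()d (in reverse order), aborting whatever is unfinished
    try (ToyWriter repaired = new ToyWriter();
         ToyWriter unrepaired = new ToyWriter())
    {
        repaired.append("k1");
        unrepaired.append("k2");
        repaired.finish();
        unrepaired.finish(); // both finished: both close() calls are no-ops
    }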

http://git-wip-us.apache.org/repos/asf/cassandra/blob/8704006b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionTask.java b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
index f472711..c397d9a 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
@@ -167,20 +167,12 @@ public class CompactionTask extends AbstractCompactionTask
                 if (collector != null)
                     collector.beginCompaction(ci);
                 long lastCheckObsoletion = start;
-                CompactionAwareWriter writer = null;
-                try
+
+                if (!controller.cfs.getCompactionStrategy().isActive)
+                    throw new CompactionInterruptedException(ci.getCompactionInfo());
+
+                try (CompactionAwareWriter writer = getCompactionAwareWriter(cfs, sstables, actuallyCompact))
                 {
-                    if (!controller.cfs.getCompactionStrategy().isActive)
-                       throw new CompactionInterruptedException(ci.getCompactionInfo());
-                    if (!iter.hasNext())
-                    {
-                        // don't mark compacted in the finally block, since if there _is_ nondeleted data,
-                        // we need to sync it (via closeAndOpen) first, so there is no period during which
-                        // a crash could cause data loss.
-                        cfs.markObsolete(sstables, compactionType);
-                        return;
-                    }
-                    writer = getCompactionAwareWriter(cfs, sstables, actuallyCompact);
                     estimatedKeys = writer.estimatedKeys();
                     while (iter.hasNext())
                     {
@@ -201,19 +193,6 @@ public class CompactionTask extends AbstractCompactionTask
                     // don't replace old sstables yet, as we need to mark the compaction finished in the system table
                     newSStables = writer.finish();
                 }
-                catch (Throwable t)
-                {
-                    try
-                    {
-                        if (writer != null)
-                            writer.abort();
-                    }
-                    catch (Throwable t2)
-                    {
-                        t.addSuppressed(t2);
-                    }
-                    throw t;
-                }
                 finally
                 {
                     // point of no return -- the new sstables are live on disk; next we'll start deleting the old ones

http://git-wip-us.apache.org/repos/asf/cassandra/blob/8704006b/src/java/org/apache/cassandra/db/compaction/Scrubber.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/Scrubber.java b/src/java/org/apache/cassandra/db/compaction/Scrubber.java
index 691566e..29472b3 100644
--- a/src/java/org/apache/cassandra/db/compaction/Scrubber.java
+++ b/src/java/org/apache/cassandra/db/compaction/Scrubber.java
@@ -127,8 +127,8 @@ public class Scrubber implements Closeable
     {
         outputHandler.output(String.format("Scrubbing %s (%s bytes)", sstable, dataFile.length()));
         Set<SSTableReader> oldSSTable = Sets.newHashSet(sstable);
-        SSTableRewriter writer = new SSTableRewriter(cfs, oldSSTable, sstable.maxDataAge, isOffline);
-        try
+
+        try (SSTableRewriter writer = new SSTableRewriter(cfs, oldSSTable, sstable.maxDataAge, isOffline);)
         {
             nextIndexKey = ByteBufferUtil.readWithShortLength(indexFile);
             {
@@ -271,26 +271,27 @@ public class Scrubber implements Closeable
             {
                 // out of order rows, but no bad rows found - we can keep our repairedAt time
                 long repairedAt = badRows > 0 ? ActiveRepairService.UNREPAIRED_SSTABLE : sstable.getSSTableMetadata().repairedAt;
-                SSTableWriter inOrderWriter = CompactionManager.createWriter(cfs, destination, expectedBloomFilterSize, repairedAt, sstable);
-                for (Row row : outOfOrderRows)
-                    inOrderWriter.append(row.key, row.cf);
-                newInOrderSstable = inOrderWriter.closeAndOpenReader(sstable.maxDataAge);
+                try (SSTableWriter inOrderWriter = CompactionManager.createWriter(cfs, destination, expectedBloomFilterSize, repairedAt, sstable);)
+                {
+                    for (Row row : outOfOrderRows)
+                        inOrderWriter.append(row.key, row.cf);
+                    newInOrderSstable = inOrderWriter.finish(-1, sstable.maxDataAge, true);
+                }
                 if (!isOffline)
                     cfs.getDataTracker().addSSTables(Collections.singleton(newInOrderSstable));
                 outputHandler.warn(String.format("%d out of order rows found while scrubbing %s; Those have been written (in order) to a new sstable (%s)", outOfOrderRows.size(), sstable, newInOrderSstable));
             }
 
             // finish obsoletes the old sstable
-            List<SSTableReader> finished = writer.finish(badRows > 0 ? ActiveRepairService.UNREPAIRED_SSTABLE : sstable.getSSTableMetadata().repairedAt);
+            List<SSTableReader> finished = writer.setRepairedAt(badRows > 0 ? ActiveRepairService.UNREPAIRED_SSTABLE : sstable.getSSTableMetadata().repairedAt).finish();
             if (!finished.isEmpty())
                 newSstable = finished.get(0);
             if (!isOffline)
                 cfs.getDataTracker().markCompactedSSTablesReplaced(oldSSTable, finished, OperationType.SCRUB);
         }
-        catch (Throwable t)
+        catch (IOException e)
         {
-            writer.abort();
-            throw Throwables.propagate(t);
+            throw Throwables.propagate(e);
         }
         finally
         {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/8704006b/src/java/org/apache/cassandra/db/compaction/Upgrader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/Upgrader.java b/src/java/org/apache/cassandra/db/compaction/Upgrader.java
index 75964e1..30584fd 100644
--- a/src/java/org/apache/cassandra/db/compaction/Upgrader.java
+++ b/src/java/org/apache/cassandra/db/compaction/Upgrader.java
@@ -81,8 +81,9 @@ public class Upgrader
     {
         outputHandler.output("Upgrading " + sstable);
         Set<SSTableReader> toUpgrade = Sets.newHashSet(sstable);
-        SSTableRewriter writer = new SSTableRewriter(cfs, toUpgrade, CompactionTask.getMaxDataAge(toUpgrade), true);
-        try (AbstractCompactionStrategy.ScannerList scanners = strategy.getScanners(toUpgrade))
+
+        try (SSTableRewriter writer = new SSTableRewriter(cfs, toUpgrade, CompactionTask.getMaxDataAge(toUpgrade), true);
+             AbstractCompactionStrategy.ScannerList scanners = strategy.getScanners(toUpgrade))
         {
             Iterator<AbstractCompactedRow> iter = new CompactionIterable(compactionType, scanners.scanners, controller, DatabaseDescriptor.getSSTableFormat()).iterator();
             writer.switchWriter(createCompactionWriter(sstable.getSSTableMetadata().repairedAt));
@@ -94,12 +95,6 @@ public class Upgrader
 
             writer.finish();
             outputHandler.output("Upgrade of " + sstable + " complete.");
-
-        }
-        catch (Throwable t)
-        {
-            writer.abort();
-            throw Throwables.propagate(t);
         }
         finally
         {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/8704006b/src/java/org/apache/cassandra/db/compaction/writers/CompactionAwareWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/writers/CompactionAwareWriter.java b/src/java/org/apache/cassandra/db/compaction/writers/CompactionAwareWriter.java
index 2903ced..fe43186 100644
--- a/src/java/org/apache/cassandra/db/compaction/writers/CompactionAwareWriter.java
+++ b/src/java/org/apache/cassandra/db/compaction/writers/CompactionAwareWriter.java
@@ -25,28 +25,32 @@ import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.db.Directories;
 import org.apache.cassandra.db.compaction.AbstractCompactedRow;
 import org.apache.cassandra.db.compaction.CompactionTask;
+import org.apache.cassandra.io.sstable.SSTableRewriter;
 import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.utils.concurrent.Transactional;
 
 
 /**
  * Class that abstracts away the actual writing of files to make it possible to use CompactionTask for more
  * use cases.
  */
-public abstract class CompactionAwareWriter
+public abstract class CompactionAwareWriter extends Transactional.AbstractTransactional implements Transactional
 {
     protected final ColumnFamilyStore cfs;
     protected final Set<SSTableReader> nonExpiredSSTables;
     protected final long estimatedTotalKeys;
     protected final long maxAge;
     protected final long minRepairedAt;
+    protected final SSTableRewriter sstableWriter;
 
-    public CompactionAwareWriter(ColumnFamilyStore cfs, Set<SSTableReader> nonExpiredSSTables)
+    public CompactionAwareWriter(ColumnFamilyStore cfs, Set<SSTableReader> allSSTables, Set<SSTableReader> nonExpiredSSTables, boolean offline)
     {
         this.cfs = cfs;
         this.nonExpiredSSTables = nonExpiredSSTables;
         this.estimatedTotalKeys = SSTableReader.getApproximateKeyCount(nonExpiredSSTables);
         this.maxAge = CompactionTask.getMaxDataAge(nonExpiredSSTables);
         this.minRepairedAt = CompactionTask.getMinRepairedAt(nonExpiredSSTables);
+        this.sstableWriter = new SSTableRewriter(cfs, allSSTables, maxAge, offline);
     }
 
     /**
@@ -56,16 +60,40 @@ public abstract class CompactionAwareWriter
      */
     public abstract boolean append(AbstractCompactedRow row);
 
-    /**
-     * abort the compaction writer - make sure that all files created are removed etc
-     */
-    public abstract void abort();
+    @Override
+    protected Throwable doAbort(Throwable accumulate)
+    {
+        return sstableWriter.abort(accumulate);
+    }
+
+    @Override
+    protected Throwable doCleanup(Throwable accumulate)
+    {
+        return accumulate;
+    }
+
+    @Override
+    protected Throwable doCommit(Throwable accumulate)
+    {
+        return sstableWriter.commit(accumulate);
+    }
+
+    @Override
+    protected void doPrepare()
+    {
+        sstableWriter.prepareToCommit();
+    }
 
     /**
      * we are done, return the finished sstables so that the caller can mark the old ones as compacted
     * @return all the written sstables
      */
-    public abstract List<SSTableReader> finish();
+    @Override
+    public List<SSTableReader> finish()
+    {
+        super.finish();
+        return sstableWriter.finished();
+    }
 
     /**
      * estimated number of keys we should write
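
The Throwable accumulator threaded through doAbort and doCommit above is the other half of the API, used by every cleanup path in this patch: failure-prone teardown steps never throw directly, they fold exceptions into an accumulator that the caller inspects once at the end (see utils/Throwables, touched in the diffstat). A hedged sketch of the idiom; merge and maybeFail mirror the real helpers, but the bodies here are assumptions, not copies:

    final class AccumulateSketch
    {
        static Throwable merge(Throwable accumulate, Throwable t)
        {
            if (accumulate == null)
                return t;
            accumulate.addSuppressed(t);
            return accumulate;
        }

        static void maybeFail(Throwable t)
        {
            if (t != null)
                throw new RuntimeException(t);
        }

        // teardown steps chain through the accumulator instead of throwing,
        // so one failing step never masks the failures of the others
        static Throwable closeAll(Throwable accumulate, AutoCloseable... resources)
        {
            for (AutoCloseable c : resources)
            {
                try
                {
                    c.close();
                }
                catch (Throwable t)
                {
                    accumulate = merge(accumulate, t);
                }
            }
            return accumulate;
        }
        // usage: maybeFail(closeAll(null, bloomFilter, indexFile, dataFile));
    }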

http://git-wip-us.apache.org/repos/asf/cassandra/blob/8704006b/src/java/org/apache/cassandra/db/compaction/writers/DefaultCompactionWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/writers/DefaultCompactionWriter.java b/src/java/org/apache/cassandra/db/compaction/writers/DefaultCompactionWriter.java
index d51c82d..3589b54 100644
--- a/src/java/org/apache/cassandra/db/compaction/writers/DefaultCompactionWriter.java
+++ b/src/java/org/apache/cassandra/db/compaction/writers/DefaultCompactionWriter.java
@@ -34,6 +34,8 @@ import org.apache.cassandra.io.sstable.format.SSTableReader;
 import org.apache.cassandra.io.sstable.format.SSTableWriter;
 import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
 
+import static org.apache.cassandra.utils.Throwables.maybeFail;
+
 
 /**
  * The default compaction writer - creates one output file in L0
@@ -41,13 +43,11 @@ import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
 public class DefaultCompactionWriter extends CompactionAwareWriter
 {
     protected static final Logger logger = LoggerFactory.getLogger(DefaultCompactionWriter.class);
-    private final SSTableRewriter sstableWriter;
 
     public DefaultCompactionWriter(ColumnFamilyStore cfs, Set<SSTableReader> allSSTables, Set<SSTableReader> nonExpiredSSTables, boolean offline, OperationType compactionType)
     {
-        super(cfs, nonExpiredSSTables);
+        super(cfs, allSSTables, nonExpiredSSTables, offline);
         logger.debug("Expected bloom filter size : {}", estimatedTotalKeys);
-        sstableWriter = new SSTableRewriter(cfs, allSSTables, maxAge, offline);
         long expectedWriteSize = cfs.getExpectedCompactedFileSize(nonExpiredSSTables, compactionType);
         File sstableDirectory = cfs.directories.getLocationForDisk(getWriteDirectory(expectedWriteSize));
         SSTableWriter writer = SSTableWriter.create(Descriptor.fromFilename(cfs.getTempSSTablePath(sstableDirectory)),
@@ -66,18 +66,6 @@ public class DefaultCompactionWriter extends CompactionAwareWriter
     }
 
     @Override
-    public void abort()
-    {
-        sstableWriter.abort();
-    }
-
-    @Override
-    public List<SSTableReader> finish()
-    {
-        return sstableWriter.finish();
-    }
-
-    @Override
     public long estimatedKeys()
     {
         return estimatedTotalKeys;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/8704006b/src/java/org/apache/cassandra/db/compaction/writers/MajorLeveledCompactionWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/writers/MajorLeveledCompactionWriter.java b/src/java/org/apache/cassandra/db/compaction/writers/MajorLeveledCompactionWriter.java
index b2f8fe1..d48140e 100644
--- a/src/java/org/apache/cassandra/db/compaction/writers/MajorLeveledCompactionWriter.java
+++ b/src/java/org/apache/cassandra/db/compaction/writers/MajorLeveledCompactionWriter.java
@@ -40,7 +40,6 @@ import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
 public class MajorLeveledCompactionWriter extends CompactionAwareWriter
 {
     private static final Logger logger = LoggerFactory.getLogger(MajorLeveledCompactionWriter.class);
-    private final SSTableRewriter rewriter;
     private final long maxSSTableSize;
     private final long expectedWriteSize;
     private final Set<SSTableReader> allSSTables;
@@ -53,10 +52,9 @@ public class MajorLeveledCompactionWriter extends CompactionAwareWriter
 
     public MajorLeveledCompactionWriter(ColumnFamilyStore cfs, Set<SSTableReader> allSSTables, Set<SSTableReader> nonExpiredSSTables, long maxSSTableSize, boolean offline, OperationType compactionType)
     {
-        super(cfs, nonExpiredSSTables);
+        super(cfs, allSSTables, nonExpiredSSTables, offline);
         this.maxSSTableSize = maxSSTableSize;
         this.allSSTables = allSSTables;
-        rewriter = new SSTableRewriter(cfs, allSSTables, CompactionTask.getMaxDataAge(nonExpiredSSTables), offline);
         expectedWriteSize = Math.min(maxSSTableSize, cfs.getExpectedCompactedFileSize(nonExpiredSSTables, compactionType));
         long estimatedSSTables = Math.max(1, SSTableReader.getTotalBytes(nonExpiredSSTables) / maxSSTableSize);
         long keysPerSSTable = estimatedTotalKeys / estimatedSSTables;
@@ -72,17 +70,17 @@ public class MajorLeveledCompactionWriter extends CompactionAwareWriter
                                                     cfs.metadata,
                                                     cfs.partitioner,
                                                     new MetadataCollector(allSSTables, cfs.metadata.comparator, currentLevel, skipAncestors));
-        rewriter.switchWriter(writer);
+        sstableWriter.switchWriter(writer);
     }
 
     @Override
     public boolean append(AbstractCompactedRow row)
     {
-        long posBefore = rewriter.currentWriter().getOnDiskFilePointer();
-        RowIndexEntry rie = rewriter.append(row);
-        totalWrittenInLevel += rewriter.currentWriter().getOnDiskFilePointer() - posBefore;
+        long posBefore = sstableWriter.currentWriter().getOnDiskFilePointer();
+        RowIndexEntry rie = sstableWriter.append(row);
+        totalWrittenInLevel += sstableWriter.currentWriter().getOnDiskFilePointer() - posBefore;
         partitionsWritten++;
-        if (rewriter.currentWriter().getOnDiskFilePointer() > maxSSTableSize)
+        if (sstableWriter.currentWriter().getOnDiskFilePointer() > maxSSTableSize)
         {
             if (totalWrittenInLevel > LeveledManifest.maxBytesForLevel(currentLevel, maxSSTableSize))
             {
@@ -98,23 +96,11 @@ public class MajorLeveledCompactionWriter extends CompactionAwareWriter
                                                         cfs.metadata,
                                                         cfs.partitioner,
                                                         new MetadataCollector(allSSTables, cfs.metadata.comparator, currentLevel, skipAncestors));
-            rewriter.switchWriter(writer);
+            sstableWriter.switchWriter(writer);
             partitionsWritten = 0;
             sstablesWritten++;
         }
         return rie != null;
 
     }
-
-    @Override
-    public void abort()
-    {
-        rewriter.abort();
-    }
-
-    @Override
-    public List<SSTableReader> finish()
-    {
-        return rewriter.finish();
-    }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/cassandra/blob/8704006b/src/java/org/apache/cassandra/db/compaction/writers/MaxSSTableSizeWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/writers/MaxSSTableSizeWriter.java b/src/java/org/apache/cassandra/db/compaction/writers/MaxSSTableSizeWriter.java
index 1a99059..ab24bf8 100644
--- a/src/java/org/apache/cassandra/db/compaction/writers/MaxSSTableSizeWriter.java
+++ b/src/java/org/apache/cassandra/db/compaction/writers/MaxSSTableSizeWriter.java
@@ -36,7 +36,6 @@ public class MaxSSTableSizeWriter extends CompactionAwareWriter
 {
     private final long estimatedTotalKeys;
     private final long expectedWriteSize;
-    private final SSTableRewriter sstableWriter;
     private final long maxSSTableSize;
     private final int level;
     private final long estimatedSSTables;
@@ -44,7 +43,7 @@ public class MaxSSTableSizeWriter extends CompactionAwareWriter
 
     public MaxSSTableSizeWriter(ColumnFamilyStore cfs, Set<SSTableReader> allSSTables, Set<SSTableReader> nonExpiredSSTables, long maxSSTableSize, int level, boolean offline, OperationType compactionType)
     {
-        super(cfs, nonExpiredSSTables);
+        super(cfs, allSSTables, nonExpiredSSTables, offline);
         this.allSSTables = allSSTables;
         this.level = level;
         this.maxSSTableSize = maxSSTableSize;
@@ -52,7 +51,6 @@ public class MaxSSTableSizeWriter extends CompactionAwareWriter
         expectedWriteSize = Math.min(maxSSTableSize, totalSize);
         estimatedTotalKeys = SSTableReader.getApproximateKeyCount(nonExpiredSSTables);
         estimatedSSTables = Math.max(1, estimatedTotalKeys / maxSSTableSize);
-        sstableWriter = new SSTableRewriter(cfs, allSSTables, CompactionTask.getMaxDataAge(nonExpiredSSTables), offline);
         File sstableDirectory = cfs.directories.getLocationForDisk(getWriteDirectory(expectedWriteSize));
         SSTableWriter writer = SSTableWriter.create(Descriptor.fromFilename(cfs.getTempSSTablePath(sstableDirectory)),
                                                     estimatedTotalKeys / estimatedSSTables,
@@ -83,18 +81,6 @@ public class MaxSSTableSizeWriter extends CompactionAwareWriter
     }
 
     @Override
-    public void abort()
-    {
-        sstableWriter.abort();
-    }
-
-    @Override
-    public List<SSTableReader> finish()
-    {
-        return sstableWriter.finish();
-    }
-
-    @Override
     public long estimatedKeys()
     {
         return estimatedTotalKeys;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/8704006b/src/java/org/apache/cassandra/db/compaction/writers/SplittingSizeTieredCompactionWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/writers/SplittingSizeTieredCompactionWriter.java b/src/java/org/apache/cassandra/db/compaction/writers/SplittingSizeTieredCompactionWriter.java
index c97270c..2a452c7 100644
--- a/src/java/org/apache/cassandra/db/compaction/writers/SplittingSizeTieredCompactionWriter.java
+++ b/src/java/org/apache/cassandra/db/compaction/writers/SplittingSizeTieredCompactionWriter.java
@@ -48,7 +48,6 @@ public class SplittingSizeTieredCompactionWriter extends CompactionAwareWriter
 
     public static final long DEFAULT_SMALLEST_SSTABLE_BYTES = 50_000_000;
     private final double[] ratios;
-    private final SSTableRewriter sstableWriter;
     private final long totalSize;
     private final Set<SSTableReader> allSSTables;
     private long currentBytesToWrite;
@@ -61,7 +60,7 @@ public class SplittingSizeTieredCompactionWriter extends CompactionAwareWriter
 
     public SplittingSizeTieredCompactionWriter(ColumnFamilyStore cfs, Set<SSTableReader> allSSTables, Set<SSTableReader> nonExpiredSSTables, OperationType compactionType, long smallestSSTable)
     {
-        super(cfs, nonExpiredSSTables);
+        super(cfs, allSSTables, nonExpiredSSTables, false);
         this.allSSTables = allSSTables;
         totalSize = cfs.getExpectedCompactedFileSize(nonExpiredSSTables, compactionType);
         double[] potentialRatios = new double[20];
@@ -83,7 +82,6 @@ public class SplittingSizeTieredCompactionWriter extends CompactionAwareWriter
             }
         }
         ratios = Arrays.copyOfRange(potentialRatios, 0, noPointIndex);
-        sstableWriter = new SSTableRewriter(cfs, allSSTables, CompactionTask.getMaxDataAge(nonExpiredSSTables), false);
         File sstableDirectory = cfs.directories.getLocationForDisk(getWriteDirectory(Math.round(totalSize * ratios[currentRatioIndex])));
         long currentPartitionsToWrite = Math.round(estimatedTotalKeys * ratios[currentRatioIndex]);
         currentBytesToWrite = Math.round(totalSize * ratios[currentRatioIndex]);
@@ -119,17 +117,4 @@ public class SplittingSizeTieredCompactionWriter extends CompactionAwareWriter
         }
         return rie != null;
     }
-
-
-    @Override
-    public void abort()
-    {
-        sstableWriter.abort();
-    }
-
-    @Override
-    public List<SSTableReader> finish()
-    {
-        return sstableWriter.finish();
-    }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/cassandra/blob/8704006b/src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java b/src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java
index eb9dcf8..6218526 100644
--- a/src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java
+++ b/src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java
@@ -28,17 +28,12 @@ import java.util.zip.Adler32;
 import org.apache.cassandra.io.FSReadError;
 import org.apache.cassandra.io.FSWriteError;
 import org.apache.cassandra.io.sstable.CorruptSSTableException;
-import org.apache.cassandra.io.sstable.Descriptor;
 import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
 import org.apache.cassandra.io.util.DataIntegrityMetadata;
 import org.apache.cassandra.io.util.FileMark;
 import org.apache.cassandra.io.util.SequentialWriter;
 import org.apache.cassandra.utils.FBUtilities;
 
-import static org.apache.cassandra.io.compress.CompressionMetadata.Writer.OpenType.FINAL;
-import static org.apache.cassandra.io.compress.CompressionMetadata.Writer.OpenType.SHARED;
-import static org.apache.cassandra.io.compress.CompressionMetadata.Writer.OpenType.SHARED_FINAL;
-
 public class CompressedSequentialWriter extends SequentialWriter
 {
     private final DataIntegrityMetadata.ChecksumWriter crcMetadata;
@@ -97,12 +92,6 @@ public class CompressedSequentialWriter extends SequentialWriter
     }
 
     @Override
-    public void sync()
-    {
-        throw new UnsupportedOperationException();
-    }
-
-    @Override
     public void flush()
     {
         throw new UnsupportedOperationException();
@@ -163,13 +152,11 @@ public class CompressedSequentialWriter extends SequentialWriter
             runPostFlush.run();
     }
 
-    public CompressionMetadata open(long overrideLength, boolean isFinal)
+    public CompressionMetadata open(long overrideLength)
     {
         if (overrideLength <= 0)
-            return metadataWriter.open(uncompressedSize, chunkOffset, isFinal ? FINAL : SHARED_FINAL);
-        // we are early opening the file, make sure we open metadata with the correct size
-        assert !isFinal;
-        return metadataWriter.open(overrideLength, chunkOffset, SHARED);
+            overrideLength = uncompressedSize;
+        return metadataWriter.open(overrideLength, chunkOffset);
     }
 
     @Override
@@ -279,36 +266,36 @@ public class CompressedSequentialWriter extends SequentialWriter
         }
     }
 
-    @Override
-    public void close()
+    protected class TransactionalProxy extends SequentialWriter.TransactionalProxy
     {
-        if (buffer == null)
-            return;
-
-        long finalPosition = current();
-
-        super.close();
-        sstableMetadataCollector.addCompressionRatio(compressedSize, uncompressedSize);
-        try
+        @Override
+        protected Throwable doCommit(Throwable accumulate)
         {
-            metadataWriter.close(finalPosition, chunkCount);
+            return metadataWriter.commit(accumulate);
         }
-        catch (IOException e)
+
+        @Override
+        protected Throwable doAbort(Throwable accumulate)
         {
-            throw new FSWriteError(e, getPath());
+            return super.doAbort(metadataWriter.abort(accumulate));
         }
-    }
 
-    public void abort()
-    {
-        super.abort();
-        metadataWriter.abort();
+        @Override
+        protected void doPrepare()
+        {
+            syncInternal();
+            if (descriptor != null)
+                crcMetadata.writeFullChecksum(descriptor);
+            releaseFileHandle();
+            sstableMetadataCollector.addCompressionRatio(compressedSize, uncompressedSize);
+            metadataWriter.finalizeLength(current(), chunkCount).prepareToCommit();
+        }
     }
 
     @Override
-    public void writeFullChecksum(Descriptor descriptor)
+    protected SequentialWriter.TransactionalProxy txnProxy()
     {
-        crcMetadata.writeFullChecksum(descriptor);
+        return new TransactionalProxy();
     }
 
     /**
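
The TransactionalProxy above also shows how Transactionals compose: the parent's doPrepare finishes its own state and then prepares its child (metadataWriter.finalizeLength(...).prepareToCommit()), doCommit commits the child, and doAbort aborts it, so a single prepare/commit/abort at the top of the object graph drives the whole tree. Schematically, in terms of the hedged TransactionalSketch near the top of this mail (names illustrative):

    class CompositeSketch extends TransactionalSketch
    {
        private final TransactionalSketch child;

        CompositeSketch(TransactionalSketch child)
        {
            this.child = child;
        }

        protected void doPrepare()
        {
            /* flush this object's own state first */
            child.prepareToCommit();
        }

        protected Throwable doCommit(Throwable accumulate)
        {
            return child.commit(accumulate);
        }

        protected Throwable doAbort(Throwable accumulate)
        {
            return child.abort(accumulate);
        }

        protected Throwable doCleanup(Throwable accumulate)
        {
            return accumulate; // the child cleans up in its own commit/abort
        }
    }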

http://git-wip-us.apache.org/repos/asf/cassandra/blob/8704006b/src/java/org/apache/cassandra/io/compress/CompressionMetadata.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/compress/CompressionMetadata.java b/src/java/org/apache/cassandra/io/compress/CompressionMetadata.java
index 928541a..a6c7a8b 100644
--- a/src/java/org/apache/cassandra/io/compress/CompressionMetadata.java
+++ b/src/java/org/apache/cassandra/io/compress/CompressionMetadata.java
@@ -36,9 +36,9 @@ import java.util.SortedSet;
 import java.util.TreeSet;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Throwables;
 import com.google.common.primitives.Longs;
 
-import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.TypeSizes;
 import org.apache.cassandra.exceptions.ConfigurationException;
 import org.apache.cassandra.io.FSReadError;
@@ -47,12 +47,12 @@ import org.apache.cassandra.io.IVersionedSerializer;
 import org.apache.cassandra.io.sstable.Component;
 import org.apache.cassandra.io.sstable.CorruptSSTableException;
 import org.apache.cassandra.io.sstable.Descriptor;
-import org.apache.cassandra.io.sstable.format.Version;
 import org.apache.cassandra.io.util.DataOutputPlus;
 import org.apache.cassandra.io.util.FileUtils;
 import org.apache.cassandra.io.util.Memory;
 import org.apache.cassandra.io.util.SafeMemory;
 import org.apache.cassandra.utils.Pair;
+import org.apache.cassandra.utils.concurrent.Transactional;
 
 /**
  * Holds metadata about compressed file
@@ -265,7 +265,7 @@ public class CompressionMetadata
         chunkOffsets.close();
     }
 
-    public static class Writer
+    public static class Writer extends Transactional.AbstractTransactional implements Transactional
     {
         // path to the file
         private final CompressionParameters parameters;
@@ -274,6 +274,8 @@ public class CompressionMetadata
         private SafeMemory offsets = new SafeMemory(maxCount * 8L);
         private int count = 0;
 
+        // provided by the user via finalizeLength
+        private long dataLength, chunkCount;
 
         private Writer(CompressionParameters parameters, String path)
         {
@@ -321,61 +323,60 @@ public class CompressionMetadata
             }
         }
 
-        static enum OpenType
+        // we've written everything; wire up some final metadata state
+        public Writer finalizeLength(long dataLength, int chunkCount)
         {
-            // i.e. FinishType == EARLY; we will use the RefCountedMemory in possibly multiple instances
-            SHARED,
-            // i.e. FinishType == EARLY, but the sstable has been completely written, so we can
-            // finalise the contents and size of the memory, but must retain a reference to it
-            SHARED_FINAL,
-            // i.e. FinishType == NORMAL or FINISH_EARLY, i.e. we have actually finished writing the table
-            // and will never need to open the metadata again, so we can release any references to it here
-            FINAL
+            this.dataLength = dataLength;
+            this.chunkCount = chunkCount;
+            return this;
         }
 
-        public CompressionMetadata open(long dataLength, long compressedLength, OpenType type)
+        public void doPrepare()
         {
-            SafeMemory offsets;
-            int count = this.count;
-            switch (type)
+            assert chunkCount == count;
+
+            // finalize the size of memory used if it won't now change;
+            // unnecessary if already correct size
+            if (offsets.size() != count * 8L)
+            {
+                SafeMemory tmp = offsets;
+                offsets = offsets.copy(count * 8L);
+                tmp.free();
+            }
+
+            // flush the data to disk
+            DataOutputStream out = null;
+            try
+            {
+                out = new DataOutputStream(new BufferedOutputStream(new FileOutputStream(filePath)));
+                writeHeader(out, dataLength, count);
+                for (int i = 0 ; i < count ; i++)
+                    out.writeLong(offsets.getLong(i * 8L));
+            }
+            catch (IOException e)
             {
-                case FINAL: case SHARED_FINAL:
-                    if (this.offsets.size() != count * 8L)
-                    {
-                        // finalize the size of memory used if it won't now change;
-                        // unnecessary if already correct size
-                        SafeMemory tmp = this.offsets.copy(count * 8L);
-                        this.offsets.free();
-                        this.offsets = tmp;
-                    }
-
-                    if (type == OpenType.SHARED_FINAL)
-                    {
-                        offsets = this.offsets.sharedCopy();
-                    }
-                    else
-                    {
-                        offsets = this.offsets;
-                        // null out our reference to the original shared data to catch accidental reuse
-                        // note that since noone is writing to this Writer while we open it, null:ing out this.offsets is safe
-                        this.offsets = null;
-                    }
-                    break;
-
-                case SHARED:
-                    offsets = this.offsets.sharedCopy();
-                    // we should only be opened on a compression data boundary; truncate our size to this boundary
-                    count = (int) (dataLength / parameters.chunkLength());
-                    if (dataLength % parameters.chunkLength() != 0)
-                        count++;
-                    // grab our actual compressed length from the next offset from our the position we're opened to
-                    if (count < this.count)
-                        compressedLength = offsets.getLong(count * 8L);
-                    break;
-
-                default:
-                    throw new AssertionError();
+                throw Throwables.propagate(e);
             }
+            finally
+            {
+                FileUtils.closeQuietly(out);
+            }
+        }
+
+        public CompressionMetadata open(long dataLength, long compressedLength)
+        {
+            SafeMemory offsets = this.offsets.sharedCopy();
+
+            // calculate how many entries we need, if our dataLength is truncated
+            int count = (int) (dataLength / parameters.chunkLength());
+            if (dataLength % parameters.chunkLength() != 0)
+                count++;
+
+            assert count > 0;
+            // grab our actual compressed length from the next offset after the position we're opened to
+            if (count < this.count)
+                compressedLength = offsets.getLong(count * 8L);
+
             return new CompressionMetadata(filePath, parameters, offsets, count * 8L, dataLength, compressedLength);
         }
 
@@ -402,27 +403,19 @@ public class CompressionMetadata
             count = chunkIndex;
         }
 
-        public void close(long dataLength, int chunks) throws IOException
+        protected Throwable doCleanup(Throwable failed)
         {
-            DataOutputStream out = null;
-            try
-            {
-            	out = new DataOutputStream(new BufferedOutputStream(new FileOutputStream(filePath)));
-	            assert chunks == count;
-	            writeHeader(out, dataLength, chunks);
-                for (int i = 0 ; i < count ; i++)
-                    out.writeLong(offsets.getLong(i * 8L));
-            }
-            finally
-            {
-                FileUtils.closeQuietly(out);
-            }
+            return offsets.close(failed);
         }
 
-        public void abort()
+        protected Throwable doCommit(Throwable accumulate)
         {
-            if (offsets != null)
-                offsets.close();
+            return accumulate;
+        }
+
+        protected Throwable doAbort(Throwable accumulate)
+        {
+            return FileUtils.deleteWithConfirm(filePath, false, accumulate);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/8704006b/src/java/org/apache/cassandra/io/sstable/IndexSummary.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/IndexSummary.java b/src/java/org/apache/cassandra/io/sstable/IndexSummary.java
index fbefe13..59c5eef 100644
--- a/src/java/org/apache/cassandra/io/sstable/IndexSummary.java
+++ b/src/java/org/apache/cassandra/io/sstable/IndexSummary.java
@@ -18,16 +18,20 @@
 package org.apache.cassandra.io.sstable;
 
 import java.io.DataInputStream;
+import java.io.File;
+import java.io.FileOutputStream;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.nio.ByteOrder;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import org.apache.cassandra.db.DecoratedKey;
 import org.apache.cassandra.db.RowPosition;
 import org.apache.cassandra.dht.IPartitioner;
-import org.apache.cassandra.io.util.DataOutputPlus;
-import org.apache.cassandra.io.util.Memory;
-import org.apache.cassandra.io.util.MemoryOutputStream;
+import org.apache.cassandra.io.util.*;
+import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.concurrent.WrappedSharedCloseable;
 import org.apache.cassandra.utils.memory.MemoryUtil;
@@ -46,6 +50,7 @@ import static org.apache.cassandra.io.sstable.Downsampling.BASE_SAMPLING_LEVEL;
  */
 public class IndexSummary extends WrappedSharedCloseable
 {
+    private static final Logger logger = LoggerFactory.getLogger(IndexSummary.class);
     public static final IndexSummarySerializer serializer = new IndexSummarySerializer();
 
     /**

http://git-wip-us.apache.org/repos/asf/cassandra/blob/8704006b/src/java/org/apache/cassandra/io/sstable/IndexSummaryBuilder.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/IndexSummaryBuilder.java b/src/java/org/apache/cassandra/io/sstable/IndexSummaryBuilder.java
index c7c51e5..12e41c8 100644
--- a/src/java/org/apache/cassandra/io/sstable/IndexSummaryBuilder.java
+++ b/src/java/org/apache/cassandra/io/sstable/IndexSummaryBuilder.java
@@ -203,12 +203,16 @@ public class IndexSummaryBuilder implements AutoCloseable
         }
     }
 
-    public IndexSummary build(IPartitioner partitioner)
+    public void prepareToCommit()
     {
         // this method should only be called when we've finished appending records, so we truncate the
         // memory we're using to the exact amount required to represent it before building our summary
         entries.setCapacity(entries.length());
         offsets.setCapacity(offsets.length());
+    }
+
+    public IndexSummary build(IPartitioner partitioner)
+    {
         return build(partitioner, null);
     }
 
@@ -240,6 +244,13 @@ public class IndexSummaryBuilder implements AutoCloseable
         offsets.close();
     }
 
+    public Throwable close(Throwable accumulate)
+    {
+        accumulate = entries.close(accumulate);
+        accumulate = offsets.close(accumulate);
+        return accumulate;
+    }
+
     public static int entriesAtSamplingLevel(int samplingLevel, int maxSummarySize)
     {
         return (int) Math.ceil((samplingLevel * maxSummarySize) / (double) BASE_SAMPLING_LEVEL);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/8704006b/src/java/org/apache/cassandra/io/sstable/SSTable.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTable.java b/src/java/org/apache/cassandra/io/sstable/SSTable.java
index f486b78..bc3486a 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTable.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTable.java
@@ -195,7 +195,7 @@ public abstract class SSTable
         }
     }
 
-    private static Set<Component> discoverComponentsFor(Descriptor desc)
+    public static Set<Component> discoverComponentsFor(Descriptor desc)
     {
         Set<Component.Type> knownTypes = Sets.difference(Component.TYPES, Collections.singleton(Component.Type.CUSTOM));
         Set<Component> components = Sets.newHashSetWithExpectedSize(knownTypes.size());

http://git-wip-us.apache.org/repos/asf/cassandra/blob/8704006b/src/java/org/apache/cassandra/io/sstable/SSTableRewriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableRewriter.java b/src/java/org/apache/cassandra/io/sstable/SSTableRewriter.java
index 8890659..a526ec9 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableRewriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableRewriter.java
@@ -20,7 +20,6 @@ package org.apache.cassandra.io.sstable;
 import java.util.*;
 
 import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Throwables;
 import com.google.common.collect.ImmutableList;
 
 import org.apache.cassandra.config.DatabaseDescriptor;
@@ -33,6 +32,8 @@ import org.apache.cassandra.io.sstable.format.SSTableReader;
 import org.apache.cassandra.io.sstable.format.SSTableWriter;
 import org.apache.cassandra.utils.CLibrary;
 import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.concurrent.Refs;
+import org.apache.cassandra.utils.concurrent.Transactional;
 
 import static org.apache.cassandra.utils.Throwables.merge;
 
@@ -51,7 +52,7 @@ import static org.apache.cassandra.utils.Throwables.merge;
  * but leave any hard-links in place for the readers we opened to cleanup when they're finished as we would had we finished
  * successfully.
  */
-public class SSTableRewriter
+public class SSTableRewriter extends Transactional.AbstractTransactional implements Transactional
 {
     private static long preemptiveOpenInterval;
     static
@@ -77,7 +78,9 @@ public class SSTableRewriter
     private final ColumnFamilyStore cfs;
 
     private final long maxAge;
-    private final List<SSTableReader> finished = new ArrayList<>();
+    private long repairedAt = -1;
+    // the set of final readers we will expose on commit
+    private final List<SSTableReader> preparedForCommit = new ArrayList<>();
     private final Set<SSTableReader> rewriting; // the readers we are rewriting (updated as they are replaced)
     private final Map<Descriptor, DecoratedKey> originalStarts = new HashMap<>(); // the start key for each reader we are rewriting
     private final Map<Descriptor, Integer> fileDescriptors = new HashMap<>(); // the file descriptors for each reader descriptor we are rewriting
@@ -85,21 +88,18 @@ public class SSTableRewriter
     private SSTableReader currentlyOpenedEarly; // the reader for the most recent (re)opening of the target file
     private long currentlyOpenedEarlyAt; // the position (in MB) in the target file we last (re)opened at
 
-    private final List<SSTableReader> finishedReaders = new ArrayList<>();
-    private final Queue<Finished> finishedEarly = new ArrayDeque<>();
-    // as writers are closed from finishedEarly, their last readers are moved
-    // into discard, so that abort can cleanup after us safely
-    private final List<SSTableReader> discard = new ArrayList<>();
-    private final boolean isOffline; // true for operations that are performed without Cassandra running (prevents updates of DataTracker)
+    private final List<Finished> finishedWriters = new ArrayList<>();
+    // as writers are closed from finishedWriters, their last readers are moved into discard, so that abort can cleanup
+    // after us safely; we use a set so we can add in both prepareToCommit and abort
+    private final Set<SSTableReader> discard = new HashSet<>();
+    // true for operations that are performed without Cassandra running (prevents updates of DataTracker)
+    private final boolean isOffline;
 
     private SSTableWriter writer;
     private Map<DecoratedKey, RowIndexEntry> cachedKeys = new HashMap<>();
-    private State state = State.WORKING;
 
-    private static enum State
-    {
-        WORKING, FINISHED, ABORTED
-    }
+    // for testing (TODO: remove when we have byteman setup)
+    private boolean throwEarly, throwLate;
 
     public SSTableRewriter(ColumnFamilyStore cfs, Set<SSTableReader> rewriting, long maxAge, boolean isOffline)
     {
@@ -178,7 +178,7 @@ public class SSTableRewriter
             }
             else
             {
-                SSTableReader reader = writer.openEarly(maxAge);
+                SSTableReader reader = writer.setMaxDataAge(maxAge).openEarly();
                 if (reader != null)
                 {
                     replaceEarlyOpenedFile(currentlyOpenedEarly, reader);
@@ -190,29 +190,19 @@ public class SSTableRewriter
         }
     }
 
-    public void abort()
+    protected Throwable doAbort(Throwable accumulate)
     {
-        switch (state)
-        {
-            case ABORTED:
-                return;
-            case FINISHED:
-                throw new IllegalStateException("Cannot abort - changes have already been committed");
-        }
-        state = State.ABORTED;
-
-        Throwable fail = null;
         try
         {
             moveStarts(null, null, true);
         }
         catch (Throwable t)
         {
-            fail = merge(fail, t);
+            accumulate = merge(accumulate, t);
         }
 
-        // remove already completed SSTables
-        for (SSTableReader sstable : finished)
+        // cleanup any sstables we prepared for commit
+        for (SSTableReader sstable : preparedForCommit)
         {
             try
             {
@@ -221,50 +211,41 @@ public class SSTableRewriter
             }
             catch (Throwable t)
             {
-                fail = merge(fail, t);
+                accumulate = merge(accumulate, t);
             }
         }
 
+        // abort the writers, and add the early opened readers to our discard pile
+
         if (writer != null)
-            finishedEarly.add(new Finished(writer, currentlyOpenedEarly));
+            finishedWriters.add(new Finished(writer, currentlyOpenedEarly));
 
-        // abort the writers
-        for (Finished finished : finishedEarly)
+        for (Finished finished : finishedWriters)
         {
-            try
-            {
-                finished.writer.abort();
-            }
-            catch (Throwable t)
-            {
-                fail = merge(fail, t);
-            }
-            try
-            {
-                if (finished.reader != null)
-                {
-                    // if we've already been opened, add ourselves to the discard pile
-                    discard.add(finished.reader);
-                    finished.reader.markObsolete();
-                }
-            }
-            catch (Throwable t)
-            {
-                fail = merge(fail, t);
-            }
-        }
+            accumulate = finished.writer.abort(accumulate);
 
-        try
-        {
-            replaceWithFinishedReaders(Collections.<SSTableReader>emptyList());
-        }
-        catch (Throwable t)
-        {
-            fail = merge(fail, t);
+            // if we've already been opened, add ourselves to the discard pile
+            if (finished.reader != null)
+                discard.add(finished.reader);
         }
 
-        if (fail != null)
-            throw Throwables.propagate(fail);
+        accumulate = replaceWithFinishedReaders(Collections.<SSTableReader>emptyList(), accumulate);
+        return accumulate;
+    }
+
+    protected Throwable doCommit(Throwable accumulate)
+    {
+        for (Finished f : finishedWriters)
+            accumulate = f.writer.commit(accumulate);
+        accumulate = replaceWithFinishedReaders(preparedForCommit, accumulate);
+
+        return accumulate;
+    }
+
+    protected Throwable doCleanup(Throwable accumulate)
+    {
+        // we have no state of our own to clean up; Transactional objects clean up their own state in abort or commit
+        return accumulate;
     }
 
     /**
@@ -369,41 +350,38 @@ public class SSTableRewriter
 
     public void switchWriter(SSTableWriter newWriter)
     {
-        if (writer == null)
+        if (writer == null || writer.getFilePointer() == 0)
         {
+            if (writer != null)
+                writer.abort();
             writer = newWriter;
             return;
         }
 
-        if (writer.getFilePointer() != 0)
-        {
-            // If early re-open is disabled, simply finalize the writer and store it
-            if (preemptiveOpenInterval == Long.MAX_VALUE)
-            {
-                SSTableReader reader = writer.finish(SSTableWriter.FinishType.NORMAL, maxAge, -1);
-                finishedReaders.add(reader);
-            }
-            else
-            {
-                // we leave it as a tmp file, but we open it and add it to the dataTracker
-                SSTableReader reader = writer.finish(SSTableWriter.FinishType.EARLY, maxAge, -1);
-                replaceEarlyOpenedFile(currentlyOpenedEarly, reader);
-                moveStarts(reader, reader.last, false);
-                finishedEarly.add(new Finished(writer, reader));
-            }
-        }
-        else
+        SSTableReader reader = null;
+        if (preemptiveOpenInterval != Long.MAX_VALUE)
         {
-            writer.abort();
+            // we leave it as a tmp file, but we open it and add it to the dataTracker
+            reader = writer.setMaxDataAge(maxAge).openFinalEarly();
+            replaceEarlyOpenedFile(currentlyOpenedEarly, reader);
+            moveStarts(reader, reader.last, false);
         }
+        finishedWriters.add(new Finished(writer, reader));
+
         currentlyOpenedEarly = null;
         currentlyOpenedEarlyAt = 0;
         writer = newWriter;
     }
 
-    public List<SSTableReader> finish()
+    /**
+     * @param repairedAt the repair time to stamp on the results: -1 to keep the time supplied when the
+     *                   SSTableWriter was created (and passed to rewriter.switchWriter(..)), or an actual
+     *                   time to override it.
+     */
+    public SSTableRewriter setRepairedAt(long repairedAt)
     {
-        return finish(-1);
+        this.repairedAt = repairedAt;
+        return this;
     }
 
     /**
@@ -417,94 +395,92 @@ public class SSTableRewriter
      * gymnastics (ie, call DataTracker#markCompactedSSTablesReplaced(..))
      *
      *
-     * @param repairedAt the repair time, -1 if we should use the time we supplied when we created
-     *                   the SSTableWriter (and called rewriter.switchWriter(..)), actual time if we want to override the
-     *                   repair time.
      */
-    public List<SSTableReader> finish(long repairedAt)
+    public List<SSTableReader> finish()
     {
-        return finishAndMaybeThrow(repairedAt, false, false);
+        super.finish();
+        return finished();
     }
 
-    @VisibleForTesting
-    void finishAndThrow(boolean throwEarly)
+    public List<SSTableReader> finished()
     {
-        finishAndMaybeThrow(-1, throwEarly, !throwEarly);
+        assert state() == State.COMMITTED || state() == State.READY_TO_COMMIT;
+        return preparedForCommit;
     }
 
-    private List<SSTableReader> finishAndMaybeThrow(long repairedAt, boolean throwEarly, boolean throwLate)
+    protected void doPrepare()
     {
-        switch (state)
-        {
-            case FINISHED: case ABORTED:
-                throw new IllegalStateException("Cannot finish - changes have already been " + state.toString().toLowerCase());
-        }
-
-        List<SSTableReader> newReaders = new ArrayList<>();
         switchWriter(null);
 
         if (throwEarly)
             throw new RuntimeException("exception thrown early in finish, for testing");
 
         // No early open to finalize and replace
-        if (preemptiveOpenInterval == Long.MAX_VALUE)
-        {
-            replaceWithFinishedReaders(finishedReaders);
-            if (throwLate)
-                throw new RuntimeException("exception thrown after all sstables finished, for testing");
-            return finishedReaders;
-        }
-
-        while (!finishedEarly.isEmpty())
+        for (Finished f : finishedWriters)
         {
-            Finished f = finishedEarly.peek();
-            if (f.writer.getFilePointer() > 0)
-            {
-                if (f.reader != null)
-                    discard.add(f.reader);
+            if (f.reader != null)
+                discard.add(f.reader);
 
-                SSTableReader newReader = f.writer.finish(SSTableWriter.FinishType.FINISH_EARLY, maxAge, repairedAt);
+            f.writer.setRepairedAt(repairedAt).setMaxDataAge(maxAge).setOpenResult(true).prepareToCommit();
+            SSTableReader newReader = f.writer.finished();
 
-                if (f.reader != null)
-                    f.reader.setReplacedBy(newReader);
+            if (f.reader != null)
+                f.reader.setReplacedBy(newReader);
 
-                finished.add(newReader);
-                newReaders.add(newReader);
-            }
-            else
-            {
-                f.writer.abort();
-                assert f.reader == null;
-            }
-            finishedEarly.poll();
+            preparedForCommit.add(newReader);
         }
 
         if (throwLate)
             throw new RuntimeException("exception thrown after all sstables finished, for testing");
+    }
 
-        replaceWithFinishedReaders(newReaders);
-        state = State.FINISHED;
-        return finished;
+    @VisibleForTesting
+    void throwDuringPrepare(boolean throwEarly)
+    {
+        this.throwEarly = throwEarly;
+        this.throwLate = !throwEarly;
     }
 
     // cleanup all our temporary readers and swap in our new ones
-    private void replaceWithFinishedReaders(List<SSTableReader> finished)
+    private Throwable replaceWithFinishedReaders(List<SSTableReader> finished, Throwable accumulate)
     {
         if (isOffline)
         {
             for (SSTableReader reader : discard)
             {
-                if (reader.getCurrentReplacement() == reader)
-                    reader.markObsolete();
-                reader.selfRef().release();
+                try
+                {
+                    if (reader.getCurrentReplacement() == reader)
+                        reader.markObsolete();
+                }
+                catch (Throwable t)
+                {
+                    accumulate = merge(accumulate, t);
+                }
             }
+            accumulate = Refs.release(Refs.selfRefs(discard), accumulate);
         }
         else
         {
-            dataTracker.replaceEarlyOpenedFiles(discard, finished);
-            dataTracker.unmarkCompacting(discard);
+            try
+            {
+                dataTracker.replaceEarlyOpenedFiles(discard, finished);
+            }
+            catch (Throwable t)
+            {
+                accumulate = merge(accumulate, t);
+            }
+            try
+            {
+                dataTracker.unmarkCompacting(discard);
+            }
+            catch (Throwable t)
+            {
+                accumulate = merge(accumulate, t);
+            }
         }
         discard.clear();
+        return accumulate;
     }
 
     private static final class Finished

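The hunks above reduce SSTableRewriter's hand-rolled state machine to the four hooks
AbstractTransactional expects. A minimal sketch of that shape, assuming only what the
patch shows (the hook signatures and the Throwables.merge idiom); the class name and
the method bodies are illustrative, not part of the patch:

    import static org.apache.cassandra.utils.Throwables.merge;

    import org.apache.cassandra.utils.concurrent.Transactional;

    class ExampleRewriteTask extends Transactional.AbstractTransactional implements Transactional
    {
        // all fallible work happens here, before anything is committed
        protected void doPrepare()
        {
            // e.g. flush buffers and move files into their final locations
        }

        // the commit point: must not throw; failures are merged into 'accumulate'
        protected Throwable doCommit(Throwable accumulate)
        {
            try { /* publish results, mark replaced resources obsolete */ }
            catch (Throwable t) { accumulate = merge(accumulate, t); }
            return accumulate;
        }

        // undo any effects of doPrepare, again accumulating rather than throwing
        protected Throwable doAbort(Throwable accumulate)
        {
            try { /* delete temporary files, release references */ }
            catch (Throwable t) { accumulate = merge(accumulate, t); }
            return accumulate;
        }

        // cleanup shared by the commit and abort paths
        protected Throwable doCleanup(Throwable accumulate)
        {
            return accumulate;
        }
    }

Callers then drive the object the way SSTableRewriter.finish() now does: finish() runs
prepareToCommit() followed by commit(), while abort() unwinds everything on failure.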
http://git-wip-us.apache.org/repos/asf/cassandra/blob/8704006b/src/java/org/apache/cassandra/io/sstable/SSTableSimpleUnsortedWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableSimpleUnsortedWriter.java b/src/java/org/apache/cassandra/io/sstable/SSTableSimpleUnsortedWriter.java
index 5998044..d6ab940 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableSimpleUnsortedWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableSimpleUnsortedWriter.java
@@ -157,6 +157,7 @@ public class SSTableSimpleUnsortedWriter extends AbstractSSTableSimpleWriter
         {
             throw new RuntimeException(e);
         }
+        checkForWriterException();
     }
 
     // This is overridden by CQLSSTableWriter to hold off replacing column family until the next iteration through
@@ -215,39 +216,40 @@ public class SSTableSimpleUnsortedWriter extends AbstractSSTableSimpleWriter
 
         public void run()
         {
-            SSTableWriter writer = null;
-
-            while (true)
             {
-                try
+                while (true)
                 {
-                    Buffer b = writeQueue.take();
-                    if (b == SENTINEL)
-                        return;
-
-                    writer = getWriter();
-                    boolean first = true;
-                    for (Map.Entry<DecoratedKey, ColumnFamily> entry : b.entrySet())
+                    try
                     {
-                        if (entry.getValue().getColumnCount() > 0)
-                            writer.append(entry.getKey(), entry.getValue());
-                        else if (!first)
-                            throw new AssertionError("Empty partition");
-                        first = false;
+                        Buffer b = writeQueue.take();
+                        if (b == SENTINEL)
+                            return;
+
+                        try (SSTableWriter writer = getWriter())
+                        {
+                            boolean first = true;
+                            for (Map.Entry<DecoratedKey, ColumnFamily> entry : b.entrySet())
+                            {
+                                if (entry.getValue().getColumnCount() > 0)
+                                    writer.append(entry.getKey(), entry.getValue());
+                                else if (!first)
+                                    throw new AssertionError("Empty partition");
+                                first = false;
+                            }
+
+                            writer.finish(false);
+                        }
+                    }
+                    catch (Throwable e)
+                    {
+                        JVMStabilityInspector.inspectThrowable(e);
+                        // Keep only the first exception
+                        if (exception == null)
+                            exception = e;
                     }
-                    writer.close();
-                }
-                catch (Throwable e)
-                {
-                    JVMStabilityInspector.inspectThrowable(e);
-                    if (writer != null)
-                        writer.abort();
-                    // Keep only the first exception
-                    if (exception == null)
-                      exception = e;
                 }
-            }
 
+            }
         }
     }
 }

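The rewritten loop relies on SSTableWriter now being Transactional and AutoCloseable:
finish(false) prepares and commits, while an exception before that point leaves the
writer uncommitted, so the implicit close() from try-with-resources can tidy it up
(under the Transactional contract this patch introduces, close() is expected to abort
an uncommitted transaction). A condensed sketch of the idiom, with the row-appending
body elided and getWriter()/exception as in the patch:

    try (SSTableWriter writer = getWriter())
    {
        // append each partition from the buffer ...
        writer.finish(false); // prepare + commit; 'false' means don't open a reader over the result
    }
    catch (Throwable e)
    {
        JVMStabilityInspector.inspectThrowable(e);
        // keep only the first exception, as the patch does
        if (exception == null)
            exception = e;
    }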
http://git-wip-us.apache.org/repos/asf/cassandra/blob/8704006b/src/java/org/apache/cassandra/io/sstable/SSTableSimpleWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableSimpleWriter.java b/src/java/org/apache/cassandra/io/sstable/SSTableSimpleWriter.java
index 3417d68..f206969 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableSimpleWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableSimpleWriter.java
@@ -19,6 +19,8 @@ package org.apache.cassandra.io.sstable;
 
 import java.io.File;
 
+import com.google.common.base.Throwables;
+
 import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.marshal.AbstractType;
@@ -72,12 +74,11 @@ public class SSTableSimpleWriter extends AbstractSSTableSimpleWriter
         {
             if (currentKey != null)
                 writeRow(currentKey, columnFamily);
-            writer.close();
+            writer.finish(false);
         }
-        catch (FSError e)
+        catch (Throwable t)
         {
-            writer.abort();
-            throw e;
+            throw Throwables.propagate(writer.abort(t));
         }
     }
 

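The new close() above also illustrates the accumulate idiom: abort(Throwable) attempts
cleanup and hands back the original failure with any cleanup failures merged in, so the
first exception is never masked. A hedged sketch of the same shape, where doWork is a
hypothetical stand-in for the row-writing logic:

    try
    {
        doWork(writer);          // hypothetical: append rows, etc.
        writer.finish(false);    // prepare + commit
    }
    catch (Throwable t)
    {
        // abort(t) attempts cleanup and merges any new failures into t,
        // so the rethrown exception carries the whole chain
        throw com.google.common.base.Throwables.propagate(writer.abort(t));
    }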
http://git-wip-us.apache.org/repos/asf/cassandra/blob/8704006b/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java b/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java
index 4411ca7..23c27b0 100644
--- a/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java
+++ b/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java
@@ -864,12 +864,21 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS
      * @param ibuilder
      * @param dbuilder
      */
     public void saveSummary(SegmentedFile.Builder ibuilder, SegmentedFile.Builder dbuilder)
     {
-        saveSummary(ibuilder, dbuilder, indexSummary);
+        saveSummary(this.descriptor, this.first, this.last, ibuilder, dbuilder, indexSummary);
     }
 
-    private void saveSummary(SegmentedFile.Builder ibuilder, SegmentedFile.Builder dbuilder, IndexSummary summary)
+    private void saveSummary(SegmentedFile.Builder ibuilder, SegmentedFile.Builder dbuilder, IndexSummary newSummary)
+    {
+        saveSummary(this.descriptor, this.first, this.last, ibuilder, dbuilder, newSummary);
+    }
+
+    /**
+     * Save index summary to Summary.db file.
+     */
+    public static void saveSummary(Descriptor descriptor, DecoratedKey first, DecoratedKey last,
+                                   SegmentedFile.Builder ibuilder, SegmentedFile.Builder dbuilder, IndexSummary summary)
     {
         File summariesFile = new File(descriptor.filenameFor(Component.SUMMARY));
         if (summariesFile.exists())

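With saveSummary now static, a writer can persist the Summary.db component from its own
state before any SSTableReader exists. A small sketch of a call site; the helper and its
parameters are hypothetical stand-ins for fields a writer would already hold:

    // hypothetical helper: persist an index summary via the new static overload
    static void persistSummary(Descriptor desc, DecoratedKey first, DecoratedKey last,
                               SegmentedFile.Builder ibuilder, SegmentedFile.Builder dbuilder,
                               IndexSummary summary)
    {
        SSTableReader.saveSummary(desc, first, last, ibuilder, dbuilder, summary);
    }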
http://git-wip-us.apache.org/repos/asf/cassandra/blob/8704006b/src/java/org/apache/cassandra/io/sstable/format/SSTableWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/format/SSTableWriter.java b/src/java/org/apache/cassandra/io/sstable/format/SSTableWriter.java
index baacb5a..f99292e 100644
--- a/src/java/org/apache/cassandra/io/sstable/format/SSTableWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/format/SSTableWriter.java
@@ -31,16 +31,17 @@ import org.apache.cassandra.io.sstable.Component;
 import org.apache.cassandra.io.sstable.Descriptor;
 import org.apache.cassandra.io.sstable.SSTable;
 import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
+import org.apache.cassandra.io.sstable.metadata.MetadataComponent;
+import org.apache.cassandra.io.sstable.metadata.MetadataType;
 import org.apache.cassandra.io.sstable.metadata.StatsMetadata;
 import org.apache.cassandra.io.util.FileUtils;
-import org.apache.cassandra.utils.Pair;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.apache.cassandra.utils.concurrent.Transactional;
 
 import java.io.DataInput;
 import java.io.IOException;
 import java.util.Arrays;
 import java.util.HashSet;
+import java.util.Map;
 import java.util.Set;
 
 /**
@@ -49,30 +50,24 @@ import java.util.Set;
  * TableWriter.create() is the primary way to create a writer for a particular format.
  * The format information is part of the Descriptor.
  */
-public abstract class SSTableWriter extends SSTable
+public abstract class SSTableWriter extends SSTable implements Transactional
 {
-    private static final Logger logger = LoggerFactory.getLogger(SSTableWriter.class);
-
-    public static enum FinishType
-    {
-        CLOSE(null, true),
-        NORMAL(SSTableReader.OpenReason.NORMAL, true),
-        EARLY(SSTableReader.OpenReason.EARLY, false), // no renaming
-        FINISH_EARLY(SSTableReader.OpenReason.NORMAL, true); // tidy up an EARLY finish
-        public final SSTableReader.OpenReason openReason;
-
-        public final boolean isFinal;
-        FinishType(SSTableReader.OpenReason openReason, boolean isFinal)
-        {
-            this.openReason = openReason;
-            this.isFinal = isFinal;
-        }
-    }
-
-    protected final long repairedAt;
+    protected long repairedAt;
+    protected long maxDataAge = -1;
     protected final long keyCount;
     protected final MetadataCollector metadataCollector;
     protected final RowIndexEntry.IndexSerializer rowIndexEntrySerializer;
+    protected final TransactionalProxy txnProxy = txnProxy();
+
+    protected abstract TransactionalProxy txnProxy();
+
+    // due to lack of multiple inheritance, we use an inner class to proxy our Transactional implementation details
+    protected abstract class TransactionalProxy extends AbstractTransactional
+    {
+        // should be set during doPrepare()
+        protected SSTableReader finalReader;
+        protected boolean openResult;
+    }
 
     protected SSTableWriter(Descriptor descriptor, long keyCount, long repairedAt, CFMetaData metadata, IPartitioner partitioner, MetadataCollector metadataCollector)
     {
@@ -164,28 +159,98 @@ public abstract class SSTableWriter extends SSTable
 
     public abstract long getOnDiskFilePointer();
 
-    public abstract void isolateReferences();
-
     public abstract void resetAndTruncate();
 
-    public SSTableReader closeAndOpenReader()
+    public SSTableWriter setRepairedAt(long repairedAt)
+    {
+        if (repairedAt > 0)
+            this.repairedAt = repairedAt;
+        return this;
+    }
+
+    public SSTableWriter setMaxDataAge(long maxDataAge)
+    {
+        this.maxDataAge = maxDataAge;
+        return this;
+    }
+
+    public SSTableWriter setOpenResult(boolean openResult)
+    {
+        txnProxy.openResult = openResult;
+        return this;
+    }
+
+    /**
+     * Open the resultant SSTableReader before it has been fully written
+     */
+    public abstract SSTableReader openEarly();
+
+    /**
+     * Open the resultant SSTableReader once it has been fully written, but before all of the
+     * tables being written together as one atomic operation are ready
+     */
+    public abstract SSTableReader openFinalEarly();
+
+    public SSTableReader finish(long repairedAt, long maxDataAge, boolean openResult)
     {
-        return closeAndOpenReader(System.currentTimeMillis());
+        if (repairedAt > 0)
+            this.repairedAt = repairedAt;
+        this.maxDataAge = maxDataAge;
+        return finish(openResult);
     }
 
-    public SSTableReader closeAndOpenReader(long maxDataAge)
+    public SSTableReader finish(boolean openResult)
     {
-        return finish(FinishType.NORMAL, maxDataAge, repairedAt);
+        txnProxy.openResult = openResult;
+        txnProxy.finish();
+        return finished();
     }
 
-    public abstract SSTableReader finish(FinishType finishType, long maxDataAge, long repairedAt);
+    /**
+     * Open the resultant SSTableReader once it has been fully written, and all related state
+     * is ready to be finalised, including any other sstables being written as part of the same operation
+     */
+    public SSTableReader finished()
+    {
+        return txnProxy.finalReader;
+    }
+
+    // finalise our state on disk, including renaming
+    public final void prepareToCommit()
+    {
+        txnProxy.prepareToCommit();
+    }
+
+    public final Throwable commit(Throwable accumulate)
+    {
+        return txnProxy.commit(accumulate);
+    }
+
+    public final Throwable abort(Throwable accumulate)
+    {
+        return txnProxy.abort(accumulate);
+    }
 
-    public abstract SSTableReader openEarly(long maxDataAge);
+    public final void close()
+    {
+        txnProxy.close();
+    }
 
-    // Close the writer and return the descriptor to the new sstable and it's metadata
-    public abstract Pair<Descriptor, StatsMetadata> close();
+    public final void abort()
+    {
+        txnProxy.abort();
+    }
 
+    protected Map<MetadataType, MetadataComponent> finalizeMetadata()
+    {
+        return metadataCollector.finalizeMetadata(partitioner.getClass().getCanonicalName(),
+                                                  metadata.getBloomFilterFpChance(), repairedAt);
+    }
 
+    protected StatsMetadata statsMetadata()
+    {
+        return (StatsMetadata) finalizeMetadata().get(MetadataType.STATS);
+    }
 
     public static Descriptor rename(Descriptor tmpdesc, Set<Component> components)
     {
@@ -209,12 +274,6 @@ public abstract class SSTableWriter extends SSTable
     }
 
 
-    /**
-     * After failure, attempt to close the index writer and data file before deleting all temp components for the sstable
-     */
-    public abstract void abort();
-
-
     public static abstract class Factory
     {
         public abstract SSTableWriter open(Descriptor descriptor, long keyCount, long repairedAt, CFMetaData metadata, IPartitioner partitioner, MetadataCollector metadataCollector);
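Taken together, the new SSTableWriter surface offers a one-shot path and a two-phase
path. A hedged sketch of both, assuming writer, repairedAt and maxAge are in scope:

    // one-shot: prepare, commit, and open the resulting reader
    SSTableReader reader = writer.setRepairedAt(repairedAt)
                                 .setMaxDataAge(maxAge)
                                 .finish(true);

    // two-phase, as SSTableRewriter.doPrepare() uses it: prepare each writer now,
    // commit the enclosing transaction later, then collect the readers
    writer.setRepairedAt(repairedAt).setMaxDataAge(maxAge).setOpenResult(true).prepareToCommit();
    SSTableReader newReader = writer.finished();

The two-phase form is what lets a whole set of writers commit atomically: every writer
reaches READY_TO_COMMIT before the enclosing Transactional commits any of them.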