You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by be...@apache.org on 2015/07/24 18:32:14 UTC

[5/5] cassandra git commit: Introduce safer durable sstable membership management (and simplify cleanup of compaction leftovers)

Introduce safer durable sstable membership management
(and simplify cleanup of compaction leftovers)

Instead of using temporary files and system tables,
this patch introduces a simple transaction log for sstable
membership edits that can be committed/aborted atomically
and simply replayed on startup.

patch by stefania; reviewed by benedict for CASSANDRA-7066


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/b09e60f7
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/b09e60f7
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/b09e60f7

Branch: refs/heads/trunk
Commit: b09e60f72bb2f37235d9e9190c25db36371b3c18
Parents: e338d2f
Author: Stefania Alborghetti <st...@datastax.com>
Authored: Mon Apr 27 14:38:53 2015 +0800
Committer: Benedict Elliott Smith <be...@apache.org>
Committed: Fri Jul 24 14:41:51 2015 +0100

----------------------------------------------------------------------
 CHANGES.txt                                     |   4 +
 bin/sstablelister                               |  55 ++
 bin/sstablelister.bat                           |  41 +
 build.xml                                       |   4 +-
 .../org/apache/cassandra/config/Config.java     |   2 +-
 .../cassandra/config/DatabaseDescriptor.java    |   2 +-
 .../apache/cassandra/db/ColumnFamilyStore.java  | 149 +---
 .../org/apache/cassandra/db/Directories.java    |  57 +-
 src/java/org/apache/cassandra/db/Memtable.java  |  31 +-
 .../org/apache/cassandra/db/SystemKeyspace.java | 126 +--
 .../db/compaction/CompactionController.java     |   2 +-
 .../db/compaction/CompactionManager.java        |  22 +-
 .../cassandra/db/compaction/CompactionTask.java |  24 +-
 .../db/compaction/LeveledCompactionTask.java    |   8 +-
 .../cassandra/db/compaction/OperationType.java  |  18 +-
 .../db/compaction/SSTableSplitter.java          |   6 +-
 .../cassandra/db/compaction/Scrubber.java       |  18 +-
 .../SizeTieredCompactionStrategy.java           |   6 +-
 .../cassandra/db/compaction/Upgrader.java       |  13 +-
 .../writers/CompactionAwareWriter.java          |  10 +-
 .../writers/DefaultCompactionWriter.java        |  11 +-
 .../writers/MajorLeveledCompactionWriter.java   |  15 +-
 .../writers/MaxSSTableSizeWriter.java           |  25 +-
 .../SplittingSizeTieredCompactionWriter.java    |  39 +-
 .../apache/cassandra/db/lifecycle/Helpers.java  |  58 +-
 .../db/lifecycle/LifecycleTransaction.java      | 150 +++-
 .../apache/cassandra/db/lifecycle/Tracker.java  |  50 +-
 .../cassandra/db/lifecycle/TransactionLogs.java | 786 +++++++++++++++++++
 .../io/sstable/AbstractSSTableSimpleWriter.java |   8 +-
 .../apache/cassandra/io/sstable/Descriptor.java | 107 ++-
 .../io/sstable/IndexSummaryManager.java         |   7 +-
 .../apache/cassandra/io/sstable/SSTable.java    |  17 +-
 .../io/sstable/SSTableDeletingTask.java         | 130 ---
 .../cassandra/io/sstable/SSTableLoader.java     |  22 +-
 .../cassandra/io/sstable/SSTableRewriter.java   |  20 +-
 .../io/sstable/SSTableSimpleUnsortedWriter.java |   7 +-
 .../io/sstable/SSTableSimpleWriter.java         |   5 +-
 .../cassandra/io/sstable/SSTableTxnWriter.java  | 101 +++
 .../io/sstable/format/SSTableReader.java        | 171 ++--
 .../io/sstable/format/SSTableWriter.java        |  50 +-
 .../io/sstable/format/big/BigFormat.java        |  12 +-
 .../io/sstable/format/big/BigTableWriter.java   |  69 +-
 .../io/sstable/metadata/MetadataSerializer.java |   7 +-
 .../org/apache/cassandra/io/util/FileUtils.java |  65 ++
 .../cassandra/io/util/SequentialWriter.java     |  12 +-
 .../cassandra/service/CassandraDaemon.java      |  16 +-
 .../apache/cassandra/service/GCInspector.java   |  10 +-
 .../apache/cassandra/service/StartupChecks.java |   6 +-
 .../cassandra/service/StorageService.java       |   4 +-
 .../cassandra/streaming/StreamLockfile.java     | 128 ---
 .../cassandra/streaming/StreamReader.java       |   4 +-
 .../cassandra/streaming/StreamReceiveTask.java  |  20 +-
 .../cassandra/streaming/StreamSession.java      |   7 +
 .../cassandra/tools/StandaloneLister.java       | 214 +++++
 .../cassandra/tools/StandaloneScrubber.java     |   5 +-
 .../cassandra/tools/StandaloneSplitter.java     |  10 +-
 .../cassandra/tools/StandaloneUpgrader.java     |  19 +-
 .../utils/concurrent/Transactional.java         |  25 +-
 .../la-1-big-CompressionInfo.db                 | Bin 0 -> 43 bytes
 .../la-1-big-Data.db                            | Bin 0 -> 93 bytes
 .../la-1-big-Digest.adler32                     |   1 +
 .../la-1-big-Filter.db                          | Bin 0 -> 16 bytes
 .../la-1-big-Index.db                           | Bin 0 -> 54 bytes
 .../la-1-big-Statistics.db                      | Bin 0 -> 4442 bytes
 .../la-1-big-Summary.db                         | Bin 0 -> 80 bytes
 .../la-1-big-TOC.txt                            |   8 +
 .../tmp-la-2-big-Data.db                        | Bin 0 -> 93 bytes
 .../tmp-la-2-big-Index.db                       | Bin 0 -> 54 bytes
 .../tmplink-la-2-big-Data.db                    | Bin 0 -> 93 bytes
 .../tmplink-la-2-big-Index.db                   | Bin 0 -> 54 bytes
 .../manifest.json                               |   1 +
 .../manifest.json                               |   1 +
 .../manifest.json                               |   1 +
 .../io/sstable/CQLSSTableWriterLongTest.java    |   5 +-
 test/unit/org/apache/cassandra/MockSchema.java  |  12 +-
 test/unit/org/apache/cassandra/Util.java        |   3 -
 .../org/apache/cassandra/cql3/CQLTester.java    |   2 +-
 .../cassandra/db/ColumnFamilyStoreTest.java     | 150 +---
 .../apache/cassandra/db/DirectoriesTest.java    |  24 +-
 .../unit/org/apache/cassandra/db/ScrubTest.java |   7 +-
 .../apache/cassandra/db/SystemKeyspaceTest.java |  48 ++
 .../db/compaction/AntiCompactionTest.java       |   4 +-
 .../compaction/CompactionAwareWriterTest.java   |   8 +-
 .../db/compaction/CompactionsTest.java          |  69 +-
 .../LeveledCompactionStrategyTest.java          |   5 +-
 .../cassandra/db/lifecycle/HelpersTest.java     |  25 +-
 .../db/lifecycle/LifecycleTransactionTest.java  |   9 +-
 .../db/lifecycle/RealTransactionsTest.java      | 228 ++++++
 .../cassandra/db/lifecycle/TrackerTest.java     |  40 +-
 .../db/lifecycle/TransactionLogsTest.java       | 558 +++++++++++++
 .../io/sstable/BigTableWriterTest.java          |  27 +-
 .../io/sstable/CQLSSTableWriterClientTest.java  |   9 +
 .../cassandra/io/sstable/DescriptorTest.java    |  18 +-
 .../cassandra/io/sstable/LegacySSTableTest.java |   2 +-
 .../cassandra/io/sstable/SSTableLoaderTest.java | 123 ++-
 .../io/sstable/SSTableRewriterTest.java         | 173 ++--
 .../cassandra/io/sstable/SSTableUtils.java      |  11 +-
 .../metadata/MetadataSerializerTest.java        |   2 +-
 .../org/apache/cassandra/schema/DefsTest.java   |   4 +-
 .../streaming/StreamTransferTaskTest.java       |   3 +-
 .../concurrent/AbstractTransactionalTest.java   |  31 +-
 101 files changed, 3265 insertions(+), 1357 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/b09e60f7/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 70b26f5..a0dadc3 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,5 +1,6 @@
 3.0
  * Implement proper sandboxing for UDFs (CASSANDRA-9402)
+ * Simplify (and unify) cleanup of compaction leftovers (CASSANDRA-7066)
  * Allow extra schema definitions in cassandra-stress yaml (CASSANDRA-9850)
  * Metrics should use up to date nomenclature (CASSANDRA-9448)
  * Change CREATE/ALTER TABLE syntax for compression (CASSANDRA-8384)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b09e60f7/bin/sstablelister
----------------------------------------------------------------------
diff --git a/bin/sstablelister b/bin/sstablelister
new file mode 100755
index 0000000..a79409d
--- /dev/null
+++ b/bin/sstablelister
@@ -0,0 +1,55 @@
+#!/bin/sh
+
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+if [ "x$CASSANDRA_INCLUDE" = "x" ]; then
+    for include in /usr/share/cassandra/cassandra.in.sh \
+                   /usr/local/share/cassandra/cassandra.in.sh \
+                   /opt/cassandra/cassandra.in.sh \
+                   ~/.cassandra.in.sh \
+                   "`dirname "$0"`/cassandra.in.sh"; do
+        if [ -r "$include" ]; then
+            . "$include"
+            break
+        fi
+    done
+elif [ -r "$CASSANDRA_INCLUDE" ]; then
+    . "$CASSANDRA_INCLUDE"
+fi
+
+# Use JAVA_HOME if set, otherwise look for java in PATH
+if [ -x "$JAVA_HOME/bin/java" ]; then
+    JAVA="$JAVA_HOME/bin/java"
+else
+    JAVA="`which java`"
+fi
+
+if [ -z "$CLASSPATH" ]; then
+    echo "You must set the CLASSPATH var" >&2
+    exit 1
+fi
+
+if [ "x$MAX_HEAP_SIZE" = "x" ]; then
+    MAX_HEAP_SIZE="256M"
+fi
+
+"$JAVA" $JAVA_AGENT -ea -cp "$CLASSPATH" $JVM_OPTS -Xmx$MAX_HEAP_SIZE \
+        -Dcassandra.storagedir="$cassandra_storagedir" \
+        -Dlogback.configurationFile=logback-tools.xml \
+        org.apache.cassandra.tools.StandaloneLister "$@"
+
+# vi:ai sw=4 ts=4 tw=0 et

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b09e60f7/bin/sstablelister.bat
----------------------------------------------------------------------
diff --git a/bin/sstablelister.bat b/bin/sstablelister.bat
new file mode 100644
index 0000000..cb50a08
--- /dev/null
+++ b/bin/sstablelister.bat
@@ -0,0 +1,41 @@
+@REM
+@REM  Licensed to the Apache Software Foundation (ASF) under one or more
+@REM  contributor license agreements.  See the NOTICE file distributed with
+@REM  this work for additional information regarding copyright ownership.
+@REM  The ASF licenses this file to You under the Apache License, Version 2.0
+@REM  (the "License"); you may not use this file except in compliance with
+@REM  the License.  You may obtain a copy of the License at
+@REM
+@REM      http://www.apache.org/licenses/LICENSE-2.0
+@REM
+@REM  Unless required by applicable law or agreed to in writing, software
+@REM  distributed under the License is distributed on an "AS IS" BASIS,
+@REM  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+@REM  See the License for the specific language governing permissions and
+@REM  limitations under the License.
+
+@echo off
+if "%OS%" == "Windows_NT" setlocal
+
+pushd "%~dp0"
+call cassandra.in.bat
+
+if NOT DEFINED CASSANDRA_MAIN set CASSANDRA_MAIN=org.apache.cassandra.tools.StandaloneLister
+if NOT DEFINED JAVA_HOME goto :err
+
+REM ***** JAVA options *****
+set JAVA_OPTS=^
+ -Dlogback.configurationFile=logback-tools.xml
+
+set TOOLS_PARAMS=
+
+"%JAVA_HOME%\bin\java" %JAVA_OPTS% %CASSANDRA_PARAMS% -cp %CASSANDRA_CLASSPATH% "%CASSANDRA_MAIN%" %*
+goto finally
+
+:err
+echo JAVA_HOME environment variable must be set!
+pause
+
+:finally
+
+ENDLOCAL

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b09e60f7/build.xml
----------------------------------------------------------------------
diff --git a/build.xml b/build.xml
index 5a82d64..e6581ea 100644
--- a/build.xml
+++ b/build.xml
@@ -1214,7 +1214,7 @@
         <jvmarg value="-Xss256k"/>
         <jvmarg value="-Dcassandra.memtable_row_overhead_computation_step=100"/>
         <jvmarg value="-Dcassandra.test.use_prepared=${cassandra.test.use_prepared}"/>
-	    <jvmarg value="-Dcassandra.test.offsetseed=@{poffset}"/>
+        <jvmarg value="-Dcassandra.test.offsetseed=@{poffset}"/>
         <jvmarg value="-Dcassandra.test.sstableformatdevelopment=true"/>
         <jvmarg value="-Dcassandra.testtag=@{testtag}"/> 
         <jvmarg value="-Dcassandra.keepBriefBrief=${cassandra.keepBriefBrief}" />
@@ -1882,7 +1882,7 @@
           <option name="MAIN_CLASS_NAME" value="" />
           <option name="METHOD_NAME" value="" />
           <option name="TEST_OBJECT" value="class" />
-          <option name="VM_PARAMETERS" value="-Dcassandra.config=file://$PROJECT_DIR$/test/conf/cassandra.yaml -Dlogback.configurationFile=file://$PROJECT_DIR$/test/conf/logback-test.xml -Dcassandra.logdir=$PROJECT_DIR$/build/test/logs -ea" />
+          <option name="VM_PARAMETERS" value="-Dcassandra.debugrefcount=true -Dcassandra.config=file://$PROJECT_DIR$/test/conf/cassandra.yaml -Dlogback.configurationFile=file://$PROJECT_DIR$/test/conf/logback-test.xml -Dcassandra.logdir=$PROJECT_DIR$/build/test/logs -Dcorrupt-sstable-root=$PROJECT_DIR$/test/data/corrupt-sstables -Dmigration-sstable-root=$PROJECT_DIR$/test/data/migration-sstables -ea" />
           <option name="PARAMETERS" value="" />
           <option name="WORKING_DIRECTORY" value="" />
           <option name="ENV_VARIABLES" />

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b09e60f7/src/java/org/apache/cassandra/config/Config.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/Config.java b/src/java/org/apache/cassandra/config/Config.java
index 2a2062e..fe6752f 100644
--- a/src/java/org/apache/cassandra/config/Config.java
+++ b/src/java/org/apache/cassandra/config/Config.java
@@ -154,7 +154,7 @@ public class Config
     public volatile Integer stream_throughput_outbound_megabits_per_sec = 200;
     public volatile Integer inter_dc_stream_throughput_outbound_megabits_per_sec = 0;
 
-    public String[] data_file_directories;
+    public String[] data_file_directories = new String[0];
 
     public String saved_caches_directory;
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b09e60f7/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
index 01b1633..a25af65 100644
--- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
+++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
@@ -490,7 +490,7 @@ public class DatabaseDescriptor
                 throw new ConfigurationException("saved_caches_directory is missing and -Dcassandra.storagedir is not set", false);
             conf.saved_caches_directory += File.separator + "saved_caches";
         }
-        if (conf.data_file_directories == null)
+        if (conf.data_file_directories == null || conf.data_file_directories.length == 0)
         {
             String defaultDataDir = System.getProperty("cassandra.storagedir", null);
             if (defaultDataDir == null)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b09e60f7/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index d3ad4e6..8d14120 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@ -60,18 +60,14 @@ import org.apache.cassandra.db.partitions.*;
 import org.apache.cassandra.dht.*;
 import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.exceptions.ConfigurationException;
-import org.apache.cassandra.io.FSReadError;
 import org.apache.cassandra.io.compress.CompressionParameters;
 import org.apache.cassandra.io.sstable.Descriptor;
 import org.apache.cassandra.io.sstable.*;
 import org.apache.cassandra.io.sstable.format.*;
-import org.apache.cassandra.io.sstable.metadata.CompactionMetadata;
-import org.apache.cassandra.io.sstable.metadata.MetadataType;
 import org.apache.cassandra.io.util.FileUtils;
 import org.apache.cassandra.metrics.TableMetrics.Sampler;
 import org.apache.cassandra.service.CacheService;
 import org.apache.cassandra.service.StorageService;
-import org.apache.cassandra.streaming.StreamLockfile;
 import org.apache.cassandra.utils.*;
 import org.apache.cassandra.utils.concurrent.*;
 import org.apache.cassandra.utils.TopKSampler.SamplerResult;
@@ -522,45 +518,20 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
     {
         Directories directories = new Directories(metadata);
 
         // clear ephemeral snapshots that were not properly cleared last session (CASSANDRA-7357)
         clearEphemeralSnapshots(directories);
 
-        // remove any left-behind SSTables from failed/stalled streaming
-        FileFilter filter = new FileFilter()
-        {
-            public boolean accept(File pathname)
-            {
-                return pathname.getPath().endsWith(StreamLockfile.FILE_EXT);
-            }
-        };
-        for (File dir : directories.getCFDirectories())
-        {
-            File[] lockfiles = dir.listFiles(filter);
-            // lock files can be null if I/O error happens
-            if (lockfiles == null || lockfiles.length == 0)
-                continue;
-            logger.info("Removing SSTables from failed streaming session. Found {} files to cleanup.", lockfiles.length);
-
-            for (File lockfile : lockfiles)
-            {
-                StreamLockfile streamLockfile = new StreamLockfile(lockfile);
-                streamLockfile.cleanup();
-                streamLockfile.delete();
-            }
-        }
-
-        logger.debug("Removing compacted SSTable files from {} (see http://wiki.apache.org/cassandra/MemtableSSTable)", metadata.cfName);
+        logger.debug("Removing temporary or obsoleted files from unfinished operations for table {}", metadata.cfName);
+        LifecycleTransaction.removeUnfinishedLeftovers(metadata);
 
+        logger.debug("Further extra check for orphan sstable files for {}", metadata.cfName);
         for (Map.Entry<Descriptor,Set<Component>> sstableFiles : directories.sstableLister().list().entrySet())
         {
             Descriptor desc = sstableFiles.getKey();
             Set<Component> components = sstableFiles.getValue();
 
-            if (desc.type.isTemporary)
-            {
-                SSTable.delete(desc, components);
-                continue;
-            }
+            for (File tmpFile : desc.getTemporaryFiles())
+                tmpFile.delete();
 
             File dataFile = new File(desc.filenameFor(Component.DATA));
             if (components.contains(Component.DATA) && dataFile.length() > 0)
@@ -571,7 +542,9 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
             logger.warn("Removing orphans for {}: {}", desc, components);
             for (Component component : components)
             {
-                FileUtils.deleteWithConfirm(desc.filenameFor(component));
+                File file = new File(desc.filenameFor(component));
+                if (file.exists())
+                    FileUtils.deleteWithConfirm(file);
             }
         }
 
@@ -600,91 +573,6 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
         }
     }
 
-    /**
-     * Replacing compacted sstables is atomic as far as observers of Tracker are concerned, but not on the
-     * filesystem: first the new sstables are renamed to "live" status (i.e., the tmp marker is removed), then
-     * their ancestors are removed.
-     *
-     * If an unclean shutdown happens at the right time, we can thus end up with both the new ones and their
-     * ancestors "live" in the system.  This is harmless for normal data, but for counters it can cause overcounts.
-     *
-     * To prevent this, we record sstables being compacted in the system keyspace.  If we find unfinished
-     * compactions, we remove the new ones (since those may be incomplete -- under LCS, we may create multiple
-     * sstables from any given ancestor).
-     */
-    public static void removeUnfinishedCompactionLeftovers(CFMetaData metadata, Map<Integer, UUID> unfinishedCompactions)
-    {
-        Directories directories = new Directories(metadata);
-
-        Set<Integer> allGenerations = new HashSet<>();
-        for (Descriptor desc : directories.sstableLister().list().keySet())
-            allGenerations.add(desc.generation);
-
-        // sanity-check unfinishedCompactions
-        Set<Integer> unfinishedGenerations = unfinishedCompactions.keySet();
-        if (!allGenerations.containsAll(unfinishedGenerations))
-        {
-            HashSet<Integer> missingGenerations = new HashSet<>(unfinishedGenerations);
-            missingGenerations.removeAll(allGenerations);
-            logger.debug("Unfinished compactions of {}.{} reference missing sstables of generations {}",
-                         metadata.ksName, metadata.cfName, missingGenerations);
-        }
-
-        // remove new sstables from compactions that didn't complete, and compute
-        // set of ancestors that shouldn't exist anymore
-        Set<Integer> completedAncestors = new HashSet<>();
-        for (Map.Entry<Descriptor, Set<Component>> sstableFiles : directories.sstableLister().skipTemporary(true).list().entrySet())
-        {
-            Descriptor desc = sstableFiles.getKey();
-
-            Set<Integer> ancestors;
-            try
-            {
-                CompactionMetadata compactionMetadata = (CompactionMetadata) desc.getMetadataSerializer().deserialize(desc, MetadataType.COMPACTION);
-                ancestors = compactionMetadata.ancestors;
-            }
-            catch (IOException e)
-            {
-                throw new FSReadError(e, desc.filenameFor(Component.STATS));
-            }
-            catch (NullPointerException e)
-            {
-                throw new FSReadError(e, "Failed to remove unfinished compaction leftovers (file: " + desc.filenameFor(Component.STATS) + ").  See log for details.");
-            }
-
-            if (!ancestors.isEmpty()
-                && unfinishedGenerations.containsAll(ancestors)
-                && allGenerations.containsAll(ancestors))
-            {
-                // any of the ancestors would work, so we'll just lookup the compaction task ID with the first one
-                UUID compactionTaskID = unfinishedCompactions.get(ancestors.iterator().next());
-                assert compactionTaskID != null;
-                logger.debug("Going to delete unfinished compaction product {}", desc);
-                SSTable.delete(desc, sstableFiles.getValue());
-                SystemKeyspace.finishCompaction(compactionTaskID);
-            }
-            else
-            {
-                completedAncestors.addAll(ancestors);
-            }
-        }
-
-        // remove old sstables from compactions that did complete
-        for (Map.Entry<Descriptor, Set<Component>> sstableFiles : directories.sstableLister().list().entrySet())
-        {
-            Descriptor desc = sstableFiles.getKey();
-            if (completedAncestors.contains(desc.generation))
-            {
-                // if any of the ancestors were participating in a compaction, finish that compaction
-                logger.debug("Going to delete leftover compaction ancestor {}", desc);
-                SSTable.delete(desc, sstableFiles.getValue());
-                UUID compactionTaskID = unfinishedCompactions.get(desc.generation);
-                if (compactionTaskID != null)
-                    SystemKeyspace.finishCompaction(unfinishedCompactions.get(desc.generation));
-            }
-        }
-    }
-
     // must be called after all sstables are loaded since row cache merges all row versions
     public void initRowCache()
     {
@@ -750,8 +638,6 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
 
             if (currentDescriptors.contains(descriptor))
                 continue; // old (initialized) SSTable found, skipping
-            if (descriptor.type.isTemporary) // in the process of being written
-                continue;
 
             if (!descriptor.isCompatible())
                 throw new RuntimeException(String.format("Can't open incompatible SSTable! Current version %s, found file: %s",
@@ -780,7 +666,6 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
                                                descriptor.ksname,
                                                descriptor.cfname,
                                                fileIndexGenerator.incrementAndGet(),
-                                               Descriptor.Type.FINAL,
                                                descriptor.formatType);
             }
             while (new File(newDescriptor.filenameFor(Component.DATA)).exists());
@@ -851,24 +736,23 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
         return name;
     }
 
-    public String getTempSSTablePath(File directory)
+    public String getSSTablePath(File directory)
     {
-        return getTempSSTablePath(directory, DatabaseDescriptor.getSSTableFormat().info.getLatestVersion(), DatabaseDescriptor.getSSTableFormat());
+        return getSSTablePath(directory, DatabaseDescriptor.getSSTableFormat().info.getLatestVersion(), DatabaseDescriptor.getSSTableFormat());
     }
 
-    public String getTempSSTablePath(File directory, SSTableFormat.Type format)
+    public String getSSTablePath(File directory, SSTableFormat.Type format)
     {
-        return getTempSSTablePath(directory, format.info.getLatestVersion(), format);
+        return getSSTablePath(directory, format.info.getLatestVersion(), format);
     }
 
-    private String getTempSSTablePath(File directory, Version version, SSTableFormat.Type format)
+    private String getSSTablePath(File directory, Version version, SSTableFormat.Type format)
     {
         Descriptor desc = new Descriptor(version,
                                          directory,
                                          keyspace.getName(),
                                          name,
                                          fileIndexGenerator.incrementAndGet(),
-                                         Descriptor.Type.TEMP,
                                          format);
         return desc.filenameFor(Component.DATA);
     }
@@ -1883,11 +1767,6 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
         return directories.getSnapshotDetails();
     }
 
-    public boolean hasUnreclaimedSpace()
-    {
-        return metric.liveDiskSpaceUsed.getCount() < metric.totalDiskSpaceUsed.getCount();
-    }
-
     /**
      * @return the cached partition for @param key if it is already present in the cache.
      * Not that this will not readAndCache the parition if it is not present, nor

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b09e60f7/src/java/org/apache/cassandra/db/Directories.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Directories.java b/src/java/org/apache/cassandra/db/Directories.java
index 8b61c68..bede4c4 100644
--- a/src/java/org/apache/cassandra/db/Directories.java
+++ b/src/java/org/apache/cassandra/db/Directories.java
@@ -44,6 +44,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.config.*;
+import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
 import org.apache.cassandra.io.FSError;
 import org.apache.cassandra.io.FSWriteError;
 import org.apache.cassandra.io.util.FileUtils;
@@ -91,6 +92,7 @@ public class Directories
 
     public static final String BACKUPS_SUBDIR = "backups";
     public static final String SNAPSHOT_SUBDIR = "snapshots";
+    public static final String TRANSACTIONS_SUBDIR = "transactions";
     public static final String SECONDARY_INDEX_NAME_SEPARATOR = ".";
 
     public static final DataDirectory[] dataDirectories;
@@ -466,6 +468,35 @@ public class Directories
         }
     }
 
+    public static File getTransactionsDirectory(File folder)
+    {
+        return getOrCreate(folder, TRANSACTIONS_SUBDIR);
+    }
+
+    public List<File> getExistingDirectories(String subFolder)
+    {
+        List<File> ret = new ArrayList<>();
+        for (File dir : dataPaths)
+        {
+            File subDir = getExistingDirectory(dir, subFolder);
+            if (subDir != null)
+                ret.add(subDir);
+
+        }
+        return ret;
+    }
+
+    public static File getExistingDirectory(File folder, String subFolder)
+    {
+        File subDir = new File(folder, join(subFolder));
+        if (subDir.exists())
+        {
+            assert subDir.isDirectory() : subDir + " exists but is not a directory";
+            return subDir;
+        }
+        return null;
+    }
+
     public SSTableLister sstableLister()
     {
         return new SSTableLister();
@@ -521,6 +552,7 @@ public class Directories
     public class SSTableLister
     {
         private boolean skipTemporary;
+        private boolean onlyTemporary;
         private boolean includeBackups;
         private boolean onlyBackups;
         private int nbFiles;
@@ -536,6 +568,14 @@ public class Directories
             return this;
         }
 
+        public SSTableLister onlyTemporary(boolean b)
+        {
+            if (filtered)
+                throw new IllegalStateException("list() has already been called");
+            onlyTemporary = b;
+            return this;
+        }
+
         public SSTableLister includeBackups(boolean b)
         {
             if (filtered)
@@ -593,21 +633,25 @@ public class Directories
 
                 if (snapshotName != null)
                 {
-                    getSnapshotDirectory(location, snapshotName).listFiles(getFilter());
+                    getSnapshotDirectory(location, snapshotName).listFiles(getFilter(location));
                     continue;
                 }
 
                 if (!onlyBackups)
-                    location.listFiles(getFilter());
+                    location.listFiles(getFilter(location));
 
                 if (includeBackups)
-                    getBackupsDirectory(location).listFiles(getFilter());
+                    getBackupsDirectory(location).listFiles(getFilter(location));
             }
             filtered = true;
         }
 
-        private FileFilter getFilter()
+        private FileFilter getFilter(File location)
         {
+            final Set<File> temporaryFiles = skipTemporary || onlyTemporary
+                                             ? LifecycleTransaction.getTemporaryFiles(metadata, location)
+                                             : Collections.<File>emptySet();
+
             return new FileFilter()
             {
                 // This function always return false since accepts adds to the components map
@@ -624,7 +668,10 @@ public class Directories
                     if (!pair.left.ksname.equals(metadata.ksName) || !pair.left.cfname.equals(metadata.cfName))
                         return false;
 
-                    if (skipTemporary && pair.left.type.isTemporary)
+                    if (skipTemporary && temporaryFiles.contains(file))
+                        return false;
+
+                    if (onlyTemporary && !temporaryFiles.contains(file))
                         return false;
 
                     Set<Component> previous = components.get(pair.left);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b09e60f7/src/java/org/apache/cassandra/db/Memtable.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Memtable.java b/src/java/org/apache/cassandra/db/Memtable.java
index 71e03d5..ecaf063 100644
--- a/src/java/org/apache/cassandra/db/Memtable.java
+++ b/src/java/org/apache/cassandra/db/Memtable.java
@@ -28,6 +28,9 @@ import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
 
 import com.google.common.annotations.VisibleForTesting;
+import org.apache.cassandra.db.compaction.OperationType;
+import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
+import org.apache.cassandra.io.sstable.SSTableTxnWriter;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -367,8 +370,7 @@ public class Memtable implements Comparable<Memtable>
             logger.info("Writing {}", Memtable.this.toString());
 
             SSTableReader ssTable;
-            // errors when creating the writer that may leave empty temp files.
-            try (SSTableWriter writer = createFlushWriter(cfs.getTempSSTablePath(sstableDirectory), columnsCollector.get(), statsCollector.get()))
+            try (SSTableTxnWriter writer = createFlushWriter(cfs.getSSTablePath(sstableDirectory), columnsCollector.get(), statsCollector.get()))
             {
                 boolean trackContention = logger.isDebugEnabled();
                 int heavilyContendedRowCount = 0;
@@ -400,10 +402,10 @@ public class Memtable implements Comparable<Memtable>
                 {
                     logger.info(String.format("Completed flushing %s (%s) for commitlog position %s",
                                               writer.getFilename(),
-                                              FBUtilities.prettyPrintMemory(writer.getOnDiskFilePointer()),
+                                              FBUtilities.prettyPrintMemory(writer.getFilePointer()),
                                               context));
 
-                    // temp sstables should contain non-repaired data.
+                    // sstables should contain non-repaired data.
                     ssTable = writer.finish(true);
                 }
                 else
@@ -421,18 +423,23 @@ public class Memtable implements Comparable<Memtable>
             }
         }
 
-        public SSTableWriter createFlushWriter(String filename,
+        public SSTableTxnWriter createFlushWriter(String filename,
                                                PartitionColumns columns,
                                                EncodingStats stats)
         {
+            // we operate "offline" here, as we expose the resulting reader consciously when done
+            // (although we may want to modify this behaviour in future, to encapsulate full flush behaviour in LifecycleTransaction)
+            LifecycleTransaction txn = LifecycleTransaction.offline(OperationType.FLUSH, cfs.metadata);
             MetadataCollector sstableMetadataCollector = new MetadataCollector(cfs.metadata.comparator).replayPosition(context);
-            return SSTableWriter.create(Descriptor.fromFilename(filename),
-                                        (long)partitions.size(),
-                                        ActiveRepairService.UNREPAIRED_SSTABLE,
-                                        cfs.metadata,
-                                        cfs.partitioner,
-                                        sstableMetadataCollector,
-                                        new SerializationHeader(cfs.metadata, columns, stats));
+            return new SSTableTxnWriter(txn,
+                                        SSTableWriter.create(Descriptor.fromFilename(filename),
+                                                             (long)partitions.size(),
+                                                             ActiveRepairService.UNREPAIRED_SSTABLE,
+                                                             cfs.metadata,
+                                                             cfs.partitioner,
+                                                             sstableMetadataCollector,
+                                                             new SerializationHeader(cfs.metadata, columns, stats),
+                                                             txn));
         }
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b09e60f7/src/java/org/apache/cassandra/db/SystemKeyspace.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/SystemKeyspace.java b/src/java/org/apache/cassandra/db/SystemKeyspace.java
index 0957af6..6a4a847 100644
--- a/src/java/org/apache/cassandra/db/SystemKeyspace.java
+++ b/src/java/org/apache/cassandra/db/SystemKeyspace.java
@@ -26,7 +26,6 @@ import java.util.concurrent.TimeUnit;
 import javax.management.openmbean.OpenDataException;
 import javax.management.openmbean.TabularData;
 
-import com.google.common.base.Function;
 import com.google.common.collect.*;
 import com.google.common.io.ByteStreams;
 
@@ -47,9 +46,10 @@ import org.apache.cassandra.dht.IPartitioner;
 import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.exceptions.ConfigurationException;
-import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.io.sstable.Descriptor;
 import org.apache.cassandra.io.util.DataInputBuffer;
 import org.apache.cassandra.io.util.DataOutputBuffer;
+import org.apache.cassandra.io.util.FileUtils;
 import org.apache.cassandra.io.util.NIODataInputStream;
 import org.apache.cassandra.locator.IEndpointSnitch;
 import org.apache.cassandra.metrics.RestorableMeter;
@@ -97,7 +97,6 @@ public final class SystemKeyspace
     public static final String PEERS = "peers";
     public static final String PEER_EVENTS = "peer_events";
     public static final String RANGE_XFERS = "range_xfers";
-    public static final String COMPACTIONS_IN_PROGRESS = "compactions_in_progress";
     public static final String COMPACTION_HISTORY = "compaction_history";
     public static final String SSTABLE_ACTIVITY = "sstable_activity";
     public static final String SIZE_ESTIMATES = "size_estimates";
@@ -216,16 +215,6 @@ public final class SystemKeyspace
                 + "requested_at timestamp,"
                 + "PRIMARY KEY ((token_bytes)))");
 
-    private static final CFMetaData CompactionsInProgress =
-        compile(COMPACTIONS_IN_PROGRESS,
-                "unfinished compactions",
-                "CREATE TABLE %s ("
-                + "id uuid,"
-                + "columnfamily_name text,"
-                + "inputs set<int>,"
-                + "keyspace_name text,"
-                + "PRIMARY KEY ((id)))");
-
     private static final CFMetaData CompactionHistory =
         compile(COMPACTION_HISTORY,
                 "week-long compaction history",
@@ -408,7 +397,6 @@ public final class SystemKeyspace
                          Peers,
                          PeerEvents,
                          RangeXfers,
-                         CompactionsInProgress,
                          CompactionHistory,
                          SSTableActivity,
                          SizeEstimates,
@@ -485,81 +473,6 @@ public final class SystemKeyspace
                             FBUtilities.getLocalAddress());
     }
 
-    /**
-     * Write compaction log, except columfamilies under system keyspace.
-     *
-     * @param cfs cfs to compact
-     * @param toCompact sstables to compact
-     * @return compaction task id or null if cfs is under system keyspace
-     */
-    public static UUID startCompaction(ColumnFamilyStore cfs, Iterable<SSTableReader> toCompact)
-    {
-        if (Schema.isSystemKeyspace(cfs.keyspace.getName()))
-            return null;
-
-        UUID compactionId = UUIDGen.getTimeUUID();
-        Iterable<Integer> generations = Iterables.transform(toCompact, new Function<SSTableReader, Integer>()
-        {
-            public Integer apply(SSTableReader sstable)
-            {
-                return sstable.descriptor.generation;
-            }
-        });
-        String req = "INSERT INTO system.%s (id, keyspace_name, columnfamily_name, inputs) VALUES (?, ?, ?, ?)";
-        executeInternal(String.format(req, COMPACTIONS_IN_PROGRESS), compactionId, cfs.keyspace.getName(), cfs.name, Sets.newHashSet(generations));
-        forceBlockingFlush(COMPACTIONS_IN_PROGRESS);
-        return compactionId;
-    }
-
-    /**
-     * Deletes the entry for this compaction from the set of compactions in progress.  The compaction does not need
-     * to complete successfully for this to be called.
-     * @param taskId what was returned from {@code startCompaction}
-     */
-    public static void finishCompaction(UUID taskId)
-    {
-        assert taskId != null;
-
-        executeInternal(String.format("DELETE FROM system.%s WHERE id = ?", COMPACTIONS_IN_PROGRESS), taskId);
-        forceBlockingFlush(COMPACTIONS_IN_PROGRESS);
-    }
-
-    /**
-     * Returns a Map whose keys are KS.CF pairs and whose values are maps from sstable generation numbers to the
-     * task ID of the compaction they were participating in.
-     */
-    public static Map<Pair<String, String>, Map<Integer, UUID>> getUnfinishedCompactions()
-    {
-        String req = "SELECT * FROM system.%s";
-        UntypedResultSet resultSet = executeInternal(String.format(req, COMPACTIONS_IN_PROGRESS));
-
-        Map<Pair<String, String>, Map<Integer, UUID>> unfinishedCompactions = new HashMap<>();
-        for (UntypedResultSet.Row row : resultSet)
-        {
-            String keyspace = row.getString("keyspace_name");
-            String columnfamily = row.getString("columnfamily_name");
-            Set<Integer> inputs = row.getSet("inputs", Int32Type.instance);
-            UUID taskID = row.getUUID("id");
-
-            Pair<String, String> kscf = Pair.create(keyspace, columnfamily);
-            Map<Integer, UUID> generationToTaskID = unfinishedCompactions.get(kscf);
-            if (generationToTaskID == null)
-                generationToTaskID = new HashMap<>(inputs.size());
-
-            for (Integer generation : inputs)
-                generationToTaskID.put(generation, taskID);
-
-            unfinishedCompactions.put(kscf, generationToTaskID);
-        }
-        return unfinishedCompactions;
-    }
-
-    public static void discardCompactionsInProgress()
-    {
-        ColumnFamilyStore compactionLog = Keyspace.open(NAME).getColumnFamilyStore(COMPACTIONS_IN_PROGRESS);
-        compactionLog.truncateBlocking();
-    }
-
     public static void updateCompactionHistory(String ksname,
                                                String cfname,
                                                long compactedAt,
@@ -1227,7 +1140,7 @@ public final class SystemKeyspace
      *
      * @throws IOException
      */
-    public static void snapshotOnVersionChange() throws IOException
+    public static boolean snapshotOnVersionChange() throws IOException
     {
         String previous = getPreviousVersionString();
         String next = FBUtilities.getReleaseVersionString();
@@ -1242,7 +1155,10 @@ public final class SystemKeyspace
                                                                                     next));
             Keyspace systemKs = Keyspace.open(SystemKeyspace.NAME);
             systemKs.snapshot(snapshotName, null);
+            return true;
         }
+
+        return false;
     }
 
     /**
@@ -1282,6 +1198,36 @@ public final class SystemKeyspace
         return result.one().getString("release_version");
     }
 
+    /**
+     * Check data directories for old files that can be removed when migrating from 2.2 to 3.0,
+     * these checks can be removed in 4.0, see CASSANDRA-7066
+     */
+    public static void migrateDataDirs()
+    {
+        Iterable<String> dirs = Arrays.asList(DatabaseDescriptor.getAllDataFileLocations());
+        for (String dataDir : dirs)
+        {
+            logger.debug("Checking directory {} for old files", dataDir);
+            File dir = new File(dataDir);
+            assert dir.exists() : dir + " should have been created by startup checks";
+
+            for (File ksdir : dir.listFiles((d, n) -> d.isDirectory()))
+            {
+                for (File cfdir : ksdir.listFiles((d, n) -> d.isDirectory()))
+                {
+                    if (Descriptor.isLegacyFile(cfdir.getName()))
+                    {
+                        FileUtils.deleteRecursive(cfdir);
+                    }
+                    else
+                    {
+                        FileUtils.delete(cfdir.listFiles((d, n) -> Descriptor.isLegacyFile(n)));
+                    }
+                }
+            }
+        }
+    }
+
     private static ByteBuffer rangeToBytes(Range<Token> range)
     {
         try (DataOutputBuffer out = new DataOutputBuffer())

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b09e60f7/src/java/org/apache/cassandra/db/compaction/CompactionController.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionController.java b/src/java/org/apache/cassandra/db/compaction/CompactionController.java
index 303de15..df3bc4e 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionController.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionController.java
@@ -63,7 +63,7 @@ public class CompactionController implements AutoCloseable
         refreshOverlaps();
     }
 
-    void maybeRefreshOverlaps()
+    public void maybeRefreshOverlaps()
     {
         for (SSTableReader reader : overlappingSSTables)
         {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b09e60f7/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
index 6cf2e18..616c310 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
@@ -823,7 +823,7 @@ public class CompactionManager implements CompactionManagerMBean
              CompactionController controller = new CompactionController(cfs, txn.originals(), getDefaultGcBefore(cfs, nowInSec));
              CompactionIterator ci = new CompactionIterator(OperationType.CLEANUP, Collections.singletonList(scanner), controller, nowInSec, UUIDGen.getTimeUUID(), metrics))
         {
-            writer.switchWriter(createWriter(cfs, compactionFileLocation, expectedBloomFilterSize, sstable.getSSTableMetadata().repairedAt, sstable));
+            writer.switchWriter(createWriter(cfs, compactionFileLocation, expectedBloomFilterSize, sstable.getSSTableMetadata().repairedAt, sstable, txn));
 
             while (ci.hasNext())
             {
@@ -948,24 +948,27 @@ public class CompactionManager implements CompactionManagerMBean
                                              File compactionFileLocation,
                                              int expectedBloomFilterSize,
                                              long repairedAt,
-                                             SSTableReader sstable)
+                                             SSTableReader sstable,
+                                             LifecycleTransaction txn)
     {
         FileUtils.createDirectory(compactionFileLocation);
 
         return SSTableWriter.create(cfs.metadata,
-                                    Descriptor.fromFilename(cfs.getTempSSTablePath(compactionFileLocation)),
+                                    Descriptor.fromFilename(cfs.getSSTablePath(compactionFileLocation)),
                                     expectedBloomFilterSize,
                                     repairedAt,
                                     sstable.getSSTableLevel(),
                                     cfs.partitioner,
-                                    sstable.header);
+                                    sstable.header,
+                                    txn);
     }
 
     public static SSTableWriter createWriterForAntiCompaction(ColumnFamilyStore cfs,
                                                               File compactionFileLocation,
                                                               int expectedBloomFilterSize,
                                                               long repairedAt,
-                                                              Collection<SSTableReader> sstables)
+                                                              Collection<SSTableReader> sstables,
+                                                              LifecycleTransaction txn)
     {
         FileUtils.createDirectory(compactionFileLocation);
         int minLevel = Integer.MAX_VALUE;
@@ -983,13 +986,14 @@ public class CompactionManager implements CompactionManagerMBean
                 break;
             }
         }
-        return SSTableWriter.create(Descriptor.fromFilename(cfs.getTempSSTablePath(compactionFileLocation)),
+        return SSTableWriter.create(Descriptor.fromFilename(cfs.getSSTablePath(compactionFileLocation)),
                                     (long) expectedBloomFilterSize,
                                     repairedAt,
                                     cfs.metadata,
                                     cfs.partitioner,
                                     new MetadataCollector(sstables, cfs.metadata.comparator, minLevel),
-                                    SerializationHeader.make(cfs.metadata, sstables));
+                                    SerializationHeader.make(cfs.metadata, sstables),
+                                    txn);
     }
 
 
@@ -1198,8 +1202,8 @@ public class CompactionManager implements CompactionManagerMBean
         {
             int expectedBloomFilterSize = Math.max(cfs.metadata.getMinIndexInterval(), (int)(SSTableReader.getApproximateKeyCount(sstableAsSet)));
 
-            repairedSSTableWriter.switchWriter(CompactionManager.createWriterForAntiCompaction(cfs, destination, expectedBloomFilterSize, repairedAt, sstableAsSet));
-            unRepairedSSTableWriter.switchWriter(CompactionManager.createWriterForAntiCompaction(cfs, destination, expectedBloomFilterSize, ActiveRepairService.UNREPAIRED_SSTABLE, sstableAsSet));
+            repairedSSTableWriter.switchWriter(CompactionManager.createWriterForAntiCompaction(cfs, destination, expectedBloomFilterSize, repairedAt, sstableAsSet, anticompactionGroup));
+            unRepairedSSTableWriter.switchWriter(CompactionManager.createWriterForAntiCompaction(cfs, destination, expectedBloomFilterSize, ActiveRepairService.UNREPAIRED_SSTABLE, sstableAsSet, anticompactionGroup));
 
             while (ci.hasNext())
             {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b09e60f7/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionTask.java b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
index 6335834..7897a1a 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
@@ -17,7 +17,6 @@
  */
 package org.apache.cassandra.db.compaction;
 
-import java.util.Arrays;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.List;
@@ -46,7 +45,6 @@ import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
 import org.apache.cassandra.service.ActiveRepairService;
 import org.apache.cassandra.utils.CloseableIterator;
 import org.apache.cassandra.utils.FBUtilities;
-import org.apache.cassandra.utils.UUIDGen;
 import org.apache.cassandra.utils.concurrent.Refs;
 
 public class CompactionTask extends AbstractCompactionTask
@@ -128,7 +126,7 @@ public class CompactionTask extends AbstractCompactionTask
             }
         });
 
-        UUID taskId = SystemKeyspace.startCompaction(cfs, transaction.originals());
+        UUID taskId = transaction.opId();
 
         // new sstables from flush can be added during a compaction, but only the compaction can remove them,
         // so in our single-threaded compaction world this is a valid way of determining if we're compacting
@@ -139,8 +137,8 @@ public class CompactionTask extends AbstractCompactionTask
             ssTableLoggerMsg.append(String.format("%s:level=%d, ", sstr.getFilename(), sstr.getSSTableLevel()));
         }
         ssTableLoggerMsg.append("]");
-        String taskIdLoggerMsg = taskId == null ? UUIDGen.getTimeUUID().toString() : taskId.toString();
-        logger.info("Compacting ({}) {}", taskIdLoggerMsg, ssTableLoggerMsg);
+
+        logger.info("Compacting ({}) {}", taskId, ssTableLoggerMsg);
 
         long start = System.nanoTime();
         long totalKeysWritten = 0;
@@ -186,16 +184,11 @@ public class CompactionTask extends AbstractCompactionTask
                         }
                     }
 
-                    // don't replace old sstables yet, as we need to mark the compaction finished in the system table
+                    // point of no return
                     newSStables = writer.finish();
                 }
                 finally
                 {
-                    // point of no return -- the new sstables are live on disk; next we'll start deleting the old ones
-                    // (in replaceCompactedSSTables)
-                    if (taskId != null)
-                        SystemKeyspace.finishCompaction(taskId);
-
                     if (collector != null)
                         collector.finishCompaction(ci);
 
@@ -217,7 +210,7 @@ public class CompactionTask extends AbstractCompactionTask
             long totalSourceRows = 0;
             String mergeSummary = updateCompactionHistory(cfs.keyspace.getName(), cfs.getColumnFamilyName(), mergedRowCounts, startsize, endsize);
             logger.info(String.format("Compacted (%s) %d sstables to [%s] to level=%d.  %,d bytes to %,d (~%d%% of original) in %,dms = %fMB/s.  %,d total partitions merged to %,d.  Partition merge counts were {%s}",
-                                      taskIdLoggerMsg, transaction.originals().size(), newSSTableNames.toString(), getLevel(), startsize, endsize, (int) (ratio * 100), dTime, mbps, totalSourceRows, totalKeysWritten, mergeSummary));
+                                      taskId, transaction.originals().size(), newSSTableNames.toString(), getLevel(), startsize, endsize, (int) (ratio * 100), dTime, mbps, totalSourceRows, totalKeysWritten, mergeSummary));
             logger.debug(String.format("CF Total Bytes Compacted: %,d", CompactionTask.addToTotalBytesCompacted(endsize)));
             logger.debug("Actual #keys: {}, Estimated #keys:{}, Err%: {}", totalKeysWritten, estimatedKeys, ((double)(totalKeysWritten - estimatedKeys)/totalKeysWritten));
 
@@ -227,10 +220,11 @@ public class CompactionTask extends AbstractCompactionTask
     }
 
     @Override
-    public CompactionAwareWriter getCompactionAwareWriter(ColumnFamilyStore cfs, LifecycleTransaction transaction, Set<SSTableReader> nonExpiredSSTables)
+    public CompactionAwareWriter getCompactionAwareWriter(ColumnFamilyStore cfs,
+                                                          LifecycleTransaction transaction,
+                                                          Set<SSTableReader> nonExpiredSSTables)
     {
-        return new DefaultCompactionWriter(cfs, transaction, nonExpiredSSTables, offline, compactionType);
-
+        return new DefaultCompactionWriter(cfs, transaction, nonExpiredSSTables, offline);
     }
 
     public static String updateCompactionHistory(String keyspaceName, String columnFamilyName, long[] mergedRowCounts, long startSize, long endSize)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b09e60f7/src/java/org/apache/cassandra/db/compaction/LeveledCompactionTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/LeveledCompactionTask.java b/src/java/org/apache/cassandra/db/compaction/LeveledCompactionTask.java
index 1c3b686..d3d56ac 100644
--- a/src/java/org/apache/cassandra/db/compaction/LeveledCompactionTask.java
+++ b/src/java/org/apache/cassandra/db/compaction/LeveledCompactionTask.java
@@ -41,11 +41,13 @@ public class LeveledCompactionTask extends CompactionTask
     }
 
     @Override
-    public CompactionAwareWriter getCompactionAwareWriter(ColumnFamilyStore cfs, LifecycleTransaction txn, Set<SSTableReader> nonExpiredSSTables)
+    public CompactionAwareWriter getCompactionAwareWriter(ColumnFamilyStore cfs,
+                                                          LifecycleTransaction txn,
+                                                          Set<SSTableReader> nonExpiredSSTables)
     {
         if (majorCompaction)
-            return new MajorLeveledCompactionWriter(cfs, txn, nonExpiredSSTables, maxSSTableBytes, false, compactionType);
-        return new MaxSSTableSizeWriter(cfs, txn, nonExpiredSSTables, maxSSTableBytes, getLevel(), false, compactionType);
+            return new MajorLeveledCompactionWriter(cfs, txn, nonExpiredSSTables, maxSSTableBytes, false);
+        return new MaxSSTableSizeWriter(cfs, txn, nonExpiredSSTables, maxSSTableBytes, getLevel(), false);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b09e60f7/src/java/org/apache/cassandra/db/compaction/OperationType.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/OperationType.java b/src/java/org/apache/cassandra/db/compaction/OperationType.java
index a14f13f..5b6ce05 100644
--- a/src/java/org/apache/cassandra/db/compaction/OperationType.java
+++ b/src/java/org/apache/cassandra/db/compaction/OperationType.java
@@ -32,13 +32,27 @@ public enum OperationType
     TOMBSTONE_COMPACTION("Tombstone Compaction"),
     UNKNOWN("Unknown compaction type"),
     ANTICOMPACTION("Anticompaction after repair"),
-    VERIFY("Verify");
+    VERIFY("Verify"),
+    FLUSH("Flush"),
+    STREAM("Stream"),
+    WRITE("Write");
 
-    private final String type;
+    public final String type;
+    public final String fileName;
 
     OperationType(String type)
     {
         this.type = type;
+        this.fileName = type.toLowerCase().replace(" ", "");
+    }
+
+    public static OperationType fromFileName(String fileName)
+    {
+        for (OperationType opType : OperationType.values())
+            if (opType.fileName.equals(fileName))
+                return opType;
+
+        throw new IllegalArgumentException("Invalid fileName for operation type: " + fileName);
     }
 
     public String toString()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b09e60f7/src/java/org/apache/cassandra/db/compaction/SSTableSplitter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/SSTableSplitter.java b/src/java/org/apache/cassandra/db/compaction/SSTableSplitter.java
index e9a4f05..8f382ea 100644
--- a/src/java/org/apache/cassandra/db/compaction/SSTableSplitter.java
+++ b/src/java/org/apache/cassandra/db/compaction/SSTableSplitter.java
@@ -74,9 +74,11 @@ public class SSTableSplitter {
         }
 
         @Override
-        public CompactionAwareWriter getCompactionAwareWriter(ColumnFamilyStore cfs, LifecycleTransaction txn, Set<SSTableReader> nonExpiredSSTables)
+        public CompactionAwareWriter getCompactionAwareWriter(ColumnFamilyStore cfs,
+                                                              LifecycleTransaction txn,
+                                                              Set<SSTableReader> nonExpiredSSTables)
         {
-            return new MaxSSTableSizeWriter(cfs, txn, nonExpiredSSTables, sstableSizeInMB * 1024L * 1024L, 0, true, compactionType);
+            return new MaxSSTableSizeWriter(cfs, txn, nonExpiredSSTables, sstableSizeInMB * 1024L * 1024L, 0, true);
         }
 
         @Override

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b09e60f7/src/java/org/apache/cassandra/db/compaction/Scrubber.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/Scrubber.java b/src/java/org/apache/cassandra/db/compaction/Scrubber.java
index 94f3af7..c853157 100644
--- a/src/java/org/apache/cassandra/db/compaction/Scrubber.java
+++ b/src/java/org/apache/cassandra/db/compaction/Scrubber.java
@@ -59,6 +59,7 @@ public class Scrubber implements Closeable
     private final RowIndexEntry.IndexSerializer rowIndexEntrySerializer;
 
     private final boolean isOffline;
+    private final boolean keepOriginals;
 
     private SSTableReader newSstable;
     private SSTableReader newInOrderSstable;
@@ -85,11 +86,17 @@ public class Scrubber implements Closeable
 
     public Scrubber(ColumnFamilyStore cfs, LifecycleTransaction transaction, boolean skipCorrupted, boolean isOffline, boolean checkData) throws IOException
     {
-        this(cfs, transaction, skipCorrupted, new OutputHandler.LogOutput(), isOffline, checkData);
+        this(cfs, transaction, skipCorrupted, new OutputHandler.LogOutput(), isOffline, checkData, false);
     }
 
     @SuppressWarnings("resource")
-    public Scrubber(ColumnFamilyStore cfs, LifecycleTransaction transaction, boolean skipCorrupted, OutputHandler outputHandler, boolean isOffline, boolean checkData) throws IOException
+    public Scrubber(ColumnFamilyStore cfs,
+                    LifecycleTransaction transaction,
+                    boolean skipCorrupted,
+                    OutputHandler outputHandler,
+                    boolean isOffline,
+                    boolean checkData,
+                    boolean keepOriginals) throws IOException
     {
         this.cfs = cfs;
         this.transaction = transaction;
@@ -97,6 +104,7 @@ public class Scrubber implements Closeable
         this.outputHandler = outputHandler;
         this.skipCorrupted = skipCorrupted;
         this.isOffline = isOffline;
+        this.keepOriginals = keepOriginals;
         this.rowIndexEntrySerializer = sstable.descriptor.version.getSSTableFormat().getIndexSerializer(sstable.metadata,
                                                                                                         sstable.descriptor.version,
                                                                                                         sstable.header);
@@ -149,7 +157,7 @@ public class Scrubber implements Closeable
     {
         outputHandler.output(String.format("Scrubbing %s (%s bytes)", sstable, dataFile.length()));
         int nowInSec = FBUtilities.nowInSeconds();
-        try (SSTableRewriter writer = new SSTableRewriter(cfs, transaction, sstable.maxDataAge, isOffline))
+        try (SSTableRewriter writer = new SSTableRewriter(cfs, transaction, sstable.maxDataAge, isOffline).keepOriginals(isOffline))
         {
             nextIndexKey = indexAvailable() ? ByteBufferUtil.readWithShortLength(indexFile) : null;
             if (indexAvailable())
@@ -159,7 +167,7 @@ public class Scrubber implements Closeable
                 assert firstRowPositionFromIndex == 0 : firstRowPositionFromIndex;
             }
 
-            writer.switchWriter(CompactionManager.createWriter(cfs, destination, expectedBloomFilterSize, sstable.getSSTableMetadata().repairedAt, sstable));
+            writer.switchWriter(CompactionManager.createWriter(cfs, destination, expectedBloomFilterSize, sstable.getSSTableMetadata().repairedAt, sstable, transaction));
 
             DecoratedKey prevKey = null;
 
@@ -291,7 +299,7 @@ public class Scrubber implements Closeable
             {
                 // out of order rows, but no bad rows found - we can keep our repairedAt time
                 long repairedAt = badRows > 0 ? ActiveRepairService.UNREPAIRED_SSTABLE : sstable.getSSTableMetadata().repairedAt;
-                try (SSTableWriter inOrderWriter = CompactionManager.createWriter(cfs, destination, expectedBloomFilterSize, repairedAt, sstable);)
+                try (SSTableWriter inOrderWriter = CompactionManager.createWriter(cfs, destination, expectedBloomFilterSize, repairedAt, sstable, transaction))
                 {
                     for (Partition partition : outOfOrder)
                         inOrderWriter.append(partition.unfilteredIterator());

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b09e60f7/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java b/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java
index 74a9757..0ece341 100644
--- a/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java
+++ b/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java
@@ -347,9 +347,11 @@ public class SizeTieredCompactionStrategy extends AbstractCompactionStrategy
         }
 
         @Override
-        public CompactionAwareWriter getCompactionAwareWriter(ColumnFamilyStore cfs, LifecycleTransaction txn, Set<SSTableReader> nonExpiredSSTables)
+        public CompactionAwareWriter getCompactionAwareWriter(ColumnFamilyStore cfs,
+                                                              LifecycleTransaction txn,
+                                                              Set<SSTableReader> nonExpiredSSTables)
         {
-            return new SplittingSizeTieredCompactionWriter(cfs, txn, nonExpiredSSTables, compactionType);
+            return new SplittingSizeTieredCompactionWriter(cfs, txn, nonExpiredSSTables);
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b09e60f7/src/java/org/apache/cassandra/db/compaction/Upgrader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/Upgrader.java b/src/java/org/apache/cassandra/db/compaction/Upgrader.java
index e3764c8..be0dd2a 100644
--- a/src/java/org/apache/cassandra/db/compaction/Upgrader.java
+++ b/src/java/org/apache/cassandra/db/compaction/Upgrader.java
@@ -43,7 +43,6 @@ public class Upgrader
     private final LifecycleTransaction transaction;
     private final File directory;
 
-    private final OperationType compactionType = OperationType.UPGRADE_SSTABLES;
     private final CompactionController controller;
     private final CompactionStrategyManager strategyManager;
     private final long estimatedRows;
@@ -80,23 +79,23 @@ public class Upgrader
                 sstableMetadataCollector.addAncestor(i);
         }
         sstableMetadataCollector.sstableLevel(sstable.getSSTableLevel());
-        return SSTableWriter.create(Descriptor.fromFilename(cfs.getTempSSTablePath(directory)),
+        return SSTableWriter.create(Descriptor.fromFilename(cfs.getSSTablePath(directory)),
                                     estimatedRows,
                                     repairedAt,
                                     cfs.metadata,
                                     cfs.partitioner,
                                     sstableMetadataCollector,
-                                    SerializationHeader.make(cfs.metadata, Sets.newHashSet(sstable)));
+                                    SerializationHeader.make(cfs.metadata, Sets.newHashSet(sstable)),
+                                    transaction);
     }
 
-    public void upgrade()
+    public void upgrade(boolean keepOriginals)
     {
         outputHandler.output("Upgrading " + sstable);
-
         int nowInSec = FBUtilities.nowInSeconds();
-        try (SSTableRewriter writer = new SSTableRewriter(cfs, transaction, CompactionTask.getMaxDataAge(transaction.originals()), true);
+        try (SSTableRewriter writer = new SSTableRewriter(cfs, transaction, CompactionTask.getMaxDataAge(transaction.originals()), true).keepOriginals(keepOriginals);
              AbstractCompactionStrategy.ScannerList scanners = strategyManager.getScanners(transaction.originals());
-             CompactionIterator iter = new CompactionIterator(compactionType, scanners.scanners, controller, nowInSec, UUIDGen.getTimeUUID()))
+             CompactionIterator iter = new CompactionIterator(transaction.opType(), scanners.scanners, controller, nowInSec, UUIDGen.getTimeUUID()))
         {
             writer.switchWriter(createCompactionWriter(sstable.getSSTableMetadata().repairedAt));
             while (iter.hasNext())

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b09e60f7/src/java/org/apache/cassandra/db/compaction/writers/CompactionAwareWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/writers/CompactionAwareWriter.java b/src/java/org/apache/cassandra/db/compaction/writers/CompactionAwareWriter.java
index 610592f..f8c73d3 100644
--- a/src/java/org/apache/cassandra/db/compaction/writers/CompactionAwareWriter.java
+++ b/src/java/org/apache/cassandra/db/compaction/writers/CompactionAwareWriter.java
@@ -42,16 +42,22 @@ public abstract class CompactionAwareWriter extends Transactional.AbstractTransa
     protected final long estimatedTotalKeys;
     protected final long maxAge;
     protected final long minRepairedAt;
+
+    protected final LifecycleTransaction txn;
     protected final SSTableRewriter sstableWriter;
 
-    public CompactionAwareWriter(ColumnFamilyStore cfs, LifecycleTransaction txn, Set<SSTableReader> nonExpiredSSTables, boolean offline)
+    public CompactionAwareWriter(ColumnFamilyStore cfs,
+                                 LifecycleTransaction txn,
+                                 Set<SSTableReader> nonExpiredSSTables,
+                                 boolean offline)
     {
         this.cfs = cfs;
         this.nonExpiredSSTables = nonExpiredSSTables;
         this.estimatedTotalKeys = SSTableReader.getApproximateKeyCount(nonExpiredSSTables);
         this.maxAge = CompactionTask.getMaxDataAge(nonExpiredSSTables);
         this.minRepairedAt = CompactionTask.getMinRepairedAt(nonExpiredSSTables);
-        this.sstableWriter = new SSTableRewriter(cfs, txn, maxAge, offline);
+        this.txn = txn;
+        this.sstableWriter = new SSTableRewriter(cfs, txn, maxAge, offline).keepOriginals(offline);
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b09e60f7/src/java/org/apache/cassandra/db/compaction/writers/DefaultCompactionWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/writers/DefaultCompactionWriter.java b/src/java/org/apache/cassandra/db/compaction/writers/DefaultCompactionWriter.java
index 8fc7bec..cdacddc 100644
--- a/src/java/org/apache/cassandra/db/compaction/writers/DefaultCompactionWriter.java
+++ b/src/java/org/apache/cassandra/db/compaction/writers/DefaultCompactionWriter.java
@@ -27,14 +27,12 @@ import org.slf4j.LoggerFactory;
 import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.db.SerializationHeader;
 import org.apache.cassandra.db.rows.UnfilteredRowIterator;
-import org.apache.cassandra.db.compaction.OperationType;
 import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
 import org.apache.cassandra.io.sstable.Descriptor;
 import org.apache.cassandra.io.sstable.format.SSTableReader;
 import org.apache.cassandra.io.sstable.format.SSTableWriter;
 import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
 
-
 /**
  * The default compaction writer - creates one output file in L0
  */
@@ -43,20 +41,21 @@ public class DefaultCompactionWriter extends CompactionAwareWriter
     protected static final Logger logger = LoggerFactory.getLogger(DefaultCompactionWriter.class);
 
     @SuppressWarnings("resource")
-    public DefaultCompactionWriter(ColumnFamilyStore cfs, LifecycleTransaction txn, Set<SSTableReader> nonExpiredSSTables, boolean offline, OperationType compactionType)
+    public DefaultCompactionWriter(ColumnFamilyStore cfs, LifecycleTransaction txn, Set<SSTableReader> nonExpiredSSTables, boolean offline)
     {
         super(cfs, txn, nonExpiredSSTables, offline);
         logger.debug("Expected bloom filter size : {}", estimatedTotalKeys);
-        long expectedWriteSize = cfs.getExpectedCompactedFileSize(nonExpiredSSTables, compactionType);
+        long expectedWriteSize = cfs.getExpectedCompactedFileSize(nonExpiredSSTables, txn.opType());
         File sstableDirectory = cfs.directories.getLocationForDisk(getWriteDirectory(expectedWriteSize));
         @SuppressWarnings("resource")
-        SSTableWriter writer = SSTableWriter.create(Descriptor.fromFilename(cfs.getTempSSTablePath(sstableDirectory)),
+        SSTableWriter writer = SSTableWriter.create(Descriptor.fromFilename(cfs.getSSTablePath(sstableDirectory)),
                                                     estimatedTotalKeys,
                                                     minRepairedAt,
                                                     cfs.metadata,
                                                     cfs.partitioner,
                                                     new MetadataCollector(txn.originals(), cfs.metadata.comparator, 0),
-                                                    SerializationHeader.make(cfs.metadata, nonExpiredSSTables));
+                                                    SerializationHeader.make(cfs.metadata, nonExpiredSSTables),
+                                                    txn);
         sstableWriter.switchWriter(writer);
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b09e60f7/src/java/org/apache/cassandra/db/compaction/writers/MajorLeveledCompactionWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/writers/MajorLeveledCompactionWriter.java b/src/java/org/apache/cassandra/db/compaction/writers/MajorLeveledCompactionWriter.java
index 5328fa5..ad58967 100644
--- a/src/java/org/apache/cassandra/db/compaction/writers/MajorLeveledCompactionWriter.java
+++ b/src/java/org/apache/cassandra/db/compaction/writers/MajorLeveledCompactionWriter.java
@@ -28,7 +28,6 @@ import org.apache.cassandra.db.RowIndexEntry;
 import org.apache.cassandra.db.SerializationHeader;
 import org.apache.cassandra.db.rows.UnfilteredRowIterator;
 import org.apache.cassandra.db.compaction.LeveledManifest;
-import org.apache.cassandra.db.compaction.OperationType;
 import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
 import org.apache.cassandra.io.sstable.Descriptor;
 import org.apache.cassandra.io.sstable.format.SSTableReader;
@@ -49,12 +48,12 @@ public class MajorLeveledCompactionWriter extends CompactionAwareWriter
     private final boolean skipAncestors;
 
     @SuppressWarnings("resource")
-    public MajorLeveledCompactionWriter(ColumnFamilyStore cfs, LifecycleTransaction txn, Set<SSTableReader> nonExpiredSSTables, long maxSSTableSize, boolean offline, OperationType compactionType)
+    public MajorLeveledCompactionWriter(ColumnFamilyStore cfs, LifecycleTransaction txn, Set<SSTableReader> nonExpiredSSTables, long maxSSTableSize, boolean offline)
     {
         super(cfs, txn, nonExpiredSSTables, offline);
         this.maxSSTableSize = maxSSTableSize;
         this.allSSTables = txn.originals();
-        expectedWriteSize = Math.min(maxSSTableSize, cfs.getExpectedCompactedFileSize(nonExpiredSSTables, compactionType));
+        expectedWriteSize = Math.min(maxSSTableSize, cfs.getExpectedCompactedFileSize(nonExpiredSSTables, txn.opType()));
         long estimatedSSTables = Math.max(1, SSTableReader.getTotalBytes(nonExpiredSSTables) / maxSSTableSize);
         long keysPerSSTable = estimatedTotalKeys / estimatedSSTables;
         File sstableDirectory = cfs.directories.getLocationForDisk(getWriteDirectory(expectedWriteSize));
@@ -64,13 +63,14 @@ public class MajorLeveledCompactionWriter extends CompactionAwareWriter
             logger.warn("Many sstables involved in compaction, skipping storing ancestor information to avoid running out of memory");
 
         @SuppressWarnings("resource")
-        SSTableWriter writer = SSTableWriter.create(Descriptor.fromFilename(cfs.getTempSSTablePath(sstableDirectory)),
+        SSTableWriter writer = SSTableWriter.create(Descriptor.fromFilename(cfs.getSSTablePath(sstableDirectory)),
                                                     keysPerSSTable,
                                                     minRepairedAt,
                                                     cfs.metadata,
                                                     cfs.partitioner,
                                                     new MetadataCollector(allSSTables, cfs.metadata.comparator, currentLevel, skipAncestors),
-                                                    SerializationHeader.make(cfs.metadata, nonExpiredSSTables));
+                                                    SerializationHeader.make(cfs.metadata, nonExpiredSSTables),
+                                                    txn);
         sstableWriter.switchWriter(writer);
     }
 
@@ -92,13 +92,14 @@ public class MajorLeveledCompactionWriter extends CompactionAwareWriter
 
             averageEstimatedKeysPerSSTable = Math.round(((double) averageEstimatedKeysPerSSTable * sstablesWritten + partitionsWritten) / (sstablesWritten + 1));
             File sstableDirectory = cfs.directories.getLocationForDisk(getWriteDirectory(expectedWriteSize));
-            SSTableWriter writer = SSTableWriter.create(Descriptor.fromFilename(cfs.getTempSSTablePath(sstableDirectory)),
+            SSTableWriter writer = SSTableWriter.create(Descriptor.fromFilename(cfs.getSSTablePath(sstableDirectory)),
                                                         averageEstimatedKeysPerSSTable,
                                                         minRepairedAt,
                                                         cfs.metadata,
                                                         cfs.partitioner,
                                                         new MetadataCollector(allSSTables, cfs.metadata.comparator, currentLevel, skipAncestors),
-                                                        SerializationHeader.make(cfs.metadata, nonExpiredSSTables));
+                                                        SerializationHeader.make(cfs.metadata, nonExpiredSSTables),
+                                                        txn);
             sstableWriter.switchWriter(writer);
             partitionsWritten = 0;
             sstablesWritten++;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b09e60f7/src/java/org/apache/cassandra/db/compaction/writers/MaxSSTableSizeWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/writers/MaxSSTableSizeWriter.java b/src/java/org/apache/cassandra/db/compaction/writers/MaxSSTableSizeWriter.java
index 4832fd5..9902357 100644
--- a/src/java/org/apache/cassandra/db/compaction/writers/MaxSSTableSizeWriter.java
+++ b/src/java/org/apache/cassandra/db/compaction/writers/MaxSSTableSizeWriter.java
@@ -24,7 +24,6 @@ import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.db.RowIndexEntry;
 import org.apache.cassandra.db.SerializationHeader;
 import org.apache.cassandra.db.rows.UnfilteredRowIterator;
-import org.apache.cassandra.db.compaction.OperationType;
 import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
 import org.apache.cassandra.io.sstable.Descriptor;
 import org.apache.cassandra.io.sstable.format.SSTableReader;
@@ -41,25 +40,26 @@ public class MaxSSTableSizeWriter extends CompactionAwareWriter
     private final Set<SSTableReader> allSSTables;
 
     @SuppressWarnings("resource")
-    public MaxSSTableSizeWriter(ColumnFamilyStore cfs, LifecycleTransaction txn, Set<SSTableReader> nonExpiredSSTables, long maxSSTableSize, int level, boolean offline, OperationType compactionType)
+    public MaxSSTableSizeWriter(ColumnFamilyStore cfs, LifecycleTransaction txn, Set<SSTableReader> nonExpiredSSTables, long maxSSTableSize, int level, boolean offline)
     {
         super(cfs, txn, nonExpiredSSTables, offline);
         this.allSSTables = txn.originals();
         this.level = level;
         this.maxSSTableSize = maxSSTableSize;
-        long totalSize = cfs.getExpectedCompactedFileSize(nonExpiredSSTables, compactionType);
+        long totalSize = cfs.getExpectedCompactedFileSize(nonExpiredSSTables, txn.opType());
         expectedWriteSize = Math.min(maxSSTableSize, totalSize);
         estimatedTotalKeys = SSTableReader.getApproximateKeyCount(nonExpiredSSTables);
         estimatedSSTables = Math.max(1, estimatedTotalKeys / maxSSTableSize);
         File sstableDirectory = cfs.directories.getLocationForDisk(getWriteDirectory(expectedWriteSize));
         @SuppressWarnings("resource")
-        SSTableWriter writer = SSTableWriter.create(Descriptor.fromFilename(cfs.getTempSSTablePath(sstableDirectory)),
+        SSTableWriter writer = SSTableWriter.create(Descriptor.fromFilename(cfs.getSSTablePath(sstableDirectory)),
                                                     estimatedTotalKeys / estimatedSSTables,
                                                     minRepairedAt,
                                                     cfs.metadata,
                                                     cfs.partitioner,
                                                     new MetadataCollector(allSSTables, cfs.metadata.comparator, level),
-                                                    SerializationHeader.make(cfs.metadata, nonExpiredSSTables));
+                                                    SerializationHeader.make(cfs.metadata, nonExpiredSSTables),
+                                                    txn);
         sstableWriter.switchWriter(writer);
     }
 
@@ -71,13 +71,14 @@ public class MaxSSTableSizeWriter extends CompactionAwareWriter
         {
             File sstableDirectory = cfs.directories.getLocationForDisk(getWriteDirectory(expectedWriteSize));
             @SuppressWarnings("resource")
-            SSTableWriter writer = SSTableWriter.create(Descriptor.fromFilename(cfs.getTempSSTablePath(sstableDirectory)),
-                                                                estimatedTotalKeys / estimatedSSTables,
-                                                                minRepairedAt,
-                                                                cfs.metadata,
-                                                                cfs.partitioner,
-                                                                new MetadataCollector(allSSTables, cfs.metadata.comparator, level),
-                                                                SerializationHeader.make(cfs.metadata, nonExpiredSSTables));
+            SSTableWriter writer = SSTableWriter.create(Descriptor.fromFilename(cfs.getSSTablePath(sstableDirectory)),
+                                                        estimatedTotalKeys / estimatedSSTables,
+                                                        minRepairedAt,
+                                                        cfs.metadata,
+                                                        cfs.partitioner,
+                                                        new MetadataCollector(allSSTables, cfs.metadata.comparator, level),
+                                                        SerializationHeader.make(cfs.metadata, nonExpiredSSTables),
+                                                        txn);
 
             sstableWriter.switchWriter(writer);
         }