You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by be...@apache.org on 2015/07/24 18:32:11 UTC

[2/5] cassandra git commit: Introduce safer durable sstable membership management (and simplify cleanup of compaction leftovers)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b09e60f7/test/long/org/apache/cassandra/io/sstable/CQLSSTableWriterLongTest.java
----------------------------------------------------------------------
diff --git a/test/long/org/apache/cassandra/io/sstable/CQLSSTableWriterLongTest.java b/test/long/org/apache/cassandra/io/sstable/CQLSSTableWriterLongTest.java
index fcec40d..b49055d 100644
--- a/test/long/org/apache/cassandra/io/sstable/CQLSSTableWriterLongTest.java
+++ b/test/long/org/apache/cassandra/io/sstable/CQLSSTableWriterLongTest.java
@@ -30,8 +30,6 @@ import org.junit.BeforeClass;
 import org.junit.Test;
 import org.apache.cassandra.SchemaLoader;
 import org.apache.cassandra.config.Config;
-import org.apache.cassandra.config.Schema;
-import org.apache.cassandra.db.Keyspace;
 import org.apache.cassandra.service.StorageService;
 
 public class CQLSSTableWriterLongTest
@@ -39,8 +37,7 @@ public class CQLSSTableWriterLongTest
     @BeforeClass
     public static void setup() throws Exception
     {
-        SchemaLoader.cleanupAndLeaveDirs();
-        Keyspace.setInitialized();
+        SchemaLoader.prepareServer();
         StorageService.instance.initServer();
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b09e60f7/test/unit/org/apache/cassandra/MockSchema.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/MockSchema.java b/test/unit/org/apache/cassandra/MockSchema.java
index 104e88f..d9c7e8b 100644
--- a/test/unit/org/apache/cassandra/MockSchema.java
+++ b/test/unit/org/apache/cassandra/MockSchema.java
@@ -60,7 +60,7 @@ public class MockSchema
     private static final AtomicInteger id = new AtomicInteger();
     public static final Keyspace ks = Keyspace.mockKS(KeyspaceMetadata.create("mockks", KeyspaceParams.simpleTransient(1)));
 
-    private static final IndexSummary indexSummary;
+    public static final IndexSummary indexSummary;
     private static final SegmentedFile segmentedFile = new BufferedSegmentedFile(new ChannelProxy(temp("mocksegmentedfile")), 0);
 
     public static Memtable memtable(ColumnFamilyStore cfs)
@@ -88,8 +88,7 @@ public class MockSchema
         Descriptor descriptor = new Descriptor(cfs.directories.getDirectoryForNewSSTables(),
                                                cfs.keyspace.getName(),
                                                cfs.getColumnFamilyName(),
-                                               generation,
-                                               Descriptor.Type.FINAL);
+                                               generation);
         Set<Component> components = ImmutableSet.of(Component.DATA, Component.PRIMARY_INDEX, Component.FILTER, Component.TOC);
         for (Component component : components)
         {
@@ -132,8 +131,13 @@ public class MockSchema
 
     public static ColumnFamilyStore newCFS()
     {
+        return newCFS(ks.getName());
+    }
+
+    public static ColumnFamilyStore newCFS(String ksname)
+    {
         String cfname = "mockcf" + (id.incrementAndGet());
-        CFMetaData metadata = newCFMetaData(ks.getName(), cfname);
+        CFMetaData metadata = newCFMetaData(ksname, cfname);
         return new ColumnFamilyStore(ks, cfname, Murmur3Partitioner.instance, 0, metadata, new Directories(metadata), false, false);
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b09e60f7/test/unit/org/apache/cassandra/Util.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/Util.java b/test/unit/org/apache/cassandra/Util.java
index 654b8c6..c828de9 100644
--- a/test/unit/org/apache/cassandra/Util.java
+++ b/test/unit/org/apache/cassandra/Util.java
@@ -53,12 +53,9 @@ import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.gms.ApplicationState;
 import org.apache.cassandra.gms.Gossiper;
 import org.apache.cassandra.gms.VersionedValue;
-import org.apache.cassandra.io.sstable.Component;
 import org.apache.cassandra.io.sstable.Descriptor;
-import org.apache.cassandra.io.sstable.IndexSummary;
 import org.apache.cassandra.io.sstable.format.SSTableReader;
 import org.apache.cassandra.service.StorageService;
-import org.apache.cassandra.utils.AlwaysPresentFilter;
 import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.CounterId;
 import org.apache.cassandra.utils.FBUtilities;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b09e60f7/test/unit/org/apache/cassandra/cql3/CQLTester.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/cql3/CQLTester.java b/test/unit/org/apache/cassandra/cql3/CQLTester.java
index 06b5ceb..ddc41c7 100644
--- a/test/unit/org/apache/cassandra/cql3/CQLTester.java
+++ b/test/unit/org/apache/cassandra/cql3/CQLTester.java
@@ -282,7 +282,7 @@ public abstract class CQLTester
                         schemaChange(String.format("DROP TYPE IF EXISTS %s.%s", KEYSPACE, typesToDrop.get(i)));
 
                     // Dropping doesn't delete the sstables. It's not a huge deal but it's cleaner to cleanup after us
-                    // Thas said, we shouldn't delete blindly before the SSTableDeletingTask for the table we drop
+                    // Thas said, we shouldn't delete blindly before the TransactionLogs.SSTableTidier for the table we drop
                     // have run or they will be unhappy. Since those taks are scheduled on StorageService.tasks and that's
                     // mono-threaded, just push a task on the queue to find when it's empty. No perfect but good enough.
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b09e60f7/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java b/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java
index 9da4876..2a15bdf 100644
--- a/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java
+++ b/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java
@@ -49,7 +49,7 @@ import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.Pair;
 import org.apache.cassandra.utils.WrappedRunnable;
-
+import static junit.framework.Assert.assertNotNull;
 @RunWith(OrderedJUnit4ClassRunner.class)
 public class ColumnFamilyStoreTest
 {
@@ -349,8 +349,8 @@ public class ColumnFamilyStoreTest
 
         for (int version = 1; version <= 2; ++version)
         {
-            Descriptor existing = new Descriptor(cfs.directories.getDirectoryForNewSSTables(), KEYSPACE2, CF_STANDARD1, version, Descriptor.Type.FINAL);
-            Descriptor desc = new Descriptor(Directories.getBackupsDirectory(existing), KEYSPACE2, CF_STANDARD1, version, Descriptor.Type.FINAL);
+            Descriptor existing = new Descriptor(cfs.directories.getDirectoryForNewSSTables(), KEYSPACE2, CF_STANDARD1, version);
+            Descriptor desc = new Descriptor(Directories.getBackupsDirectory(existing), KEYSPACE2, CF_STANDARD1, version);
             for (Component c : new Component[]{ Component.DATA, Component.PRIMARY_INDEX, Component.FILTER, Component.STATS })
                 assertTrue("Cannot find backed-up file:" + desc.filenameFor(c), new File(desc.filenameFor(c)).exists());
         }
@@ -389,126 +389,10 @@ public class ColumnFamilyStoreTest
 //        assertEquals(ByteBufferUtil.bytes("B"), cfSliced.getColumn(CellNames.compositeDense(superColName, ByteBufferUtil.bytes("b"))).value());
 //    }
 
-    // TODO: fix once SSTableSimpleWriter's back in
-//    @Test
-//    public void testRemoveUnfinishedCompactionLeftovers() throws Throwable
-//    {
-//        String ks = KEYSPACE1;
-//        String cf = CF_STANDARD3; // should be empty
-//
-//        final CFMetaData cfmeta = Schema.instance.getCFMetaData(ks, cf);
-//        Directories dir = new Directories(cfmeta);
-//        ByteBuffer key = bytes("key");
-//
-//        // 1st sstable
-//        SSTableSimpleWriter writer = new SSTableSimpleWriter(dir.getDirectoryForNewSSTables(), cfmeta, StorageService.getPartitioner());
-//        writer.newRow(key);
-//        writer.addColumn(bytes("col"), bytes("val"), 1);
-//        writer.close();
-//
-//        Map<Descriptor, Set<Component>> sstables = dir.sstableLister().list();
-//        assertEquals(1, sstables.size());
-//
-//        Map.Entry<Descriptor, Set<Component>> sstableToOpen = sstables.entrySet().iterator().next();
-//        final SSTableReader sstable1 = SSTableReader.open(sstableToOpen.getKey());
-//
-//        // simulate incomplete compaction
-//        writer = new SSTableSimpleWriter(dir.getDirectoryForNewSSTables(),
-//                                         cfmeta, StorageService.getPartitioner())
-//        {
-//            protected SSTableWriter getWriter()
-//            {
-//                MetadataCollector collector = new MetadataCollector(cfmeta.comparator);
-//                collector.addAncestor(sstable1.descriptor.generation); // add ancestor from previously written sstable
-//                return SSTableWriter.create(createDescriptor(directory, metadata.ksName, metadata.cfName, DatabaseDescriptor.getSSTableFormat()),
-//                        0L,
-//                        ActiveRepairService.UNREPAIRED_SSTABLE,
-//                        metadata,
-//                        DatabaseDescriptor.getPartitioner(),
-//                        collector);
-//            }
-//        };
-//        writer.newRow(key);
-//        writer.addColumn(bytes("col"), bytes("val"), 1);
-//        writer.close();
-//
-//        // should have 2 sstables now
-//        sstables = dir.sstableLister().list();
-//        assertEquals(2, sstables.size());
-//
-//        SSTableReader sstable2 = SSTableReader.open(sstable1.descriptor);
-//        UUID compactionTaskID = SystemKeyspace.startCompaction(
-//                Keyspace.open(ks).getColumnFamilyStore(cf),
-//                Collections.singleton(sstable2));
-//
-//        Map<Integer, UUID> unfinishedCompaction = new HashMap<>();
-//        unfinishedCompaction.put(sstable1.descriptor.generation, compactionTaskID);
-//        ColumnFamilyStore.removeUnfinishedCompactionLeftovers(cfmeta, unfinishedCompaction);
-//
-//        // 2nd sstable should be removed (only 1st sstable exists in set of size 1)
-//        sstables = dir.sstableLister().list();
-//        assertEquals(1, sstables.size());
-//        assertTrue(sstables.containsKey(sstable1.descriptor));
-//
-//        Map<Pair<String, String>, Map<Integer, UUID>> unfinished = SystemKeyspace.getUnfinishedCompactions();
-//        assertTrue(unfinished.isEmpty());
-//        sstable1.selfRef().release();
-//        sstable2.selfRef().release();
-//    }
 
     // TODO: Fix once SSTableSimpleWriter's back in
     // @see <a href="https://issues.apache.org/jira/browse/CASSANDRA-6086">CASSANDRA-6086</a>
-//    @Test
-//    public void testFailedToRemoveUnfinishedCompactionLeftovers() throws Throwable
-//    {
-//        final String ks = KEYSPACE1;
-//        final String cf = CF_STANDARD4; // should be empty
-//
-//        final CFMetaData cfmeta = Schema.instance.getCFMetaData(ks, cf);
-//        Directories dir = new Directories(cfmeta);
-//        ByteBuffer key = bytes("key");
-//
-//        // Write SSTable generation 3 that has ancestors 1 and 2
-//        final Set<Integer> ancestors = Sets.newHashSet(1, 2);
-//        SSTableSimpleWriter writer = new SSTableSimpleWriter(dir.getDirectoryForNewSSTables(),
-//                                                cfmeta, StorageService.getPartitioner())
-//        {
-//            protected SSTableWriter getWriter()
-//            {
-//                MetadataCollector collector = new MetadataCollector(cfmeta.comparator);
-//                for (int ancestor : ancestors)
-//                    collector.addAncestor(ancestor);
-//                String file = new Descriptor(directory, ks, cf, 3, Descriptor.Type.TEMP).filenameFor(Component.DATA);
-//                return SSTableWriter.create(Descriptor.fromFilename(file),
-//                        0L,
-//                        ActiveRepairService.UNREPAIRED_SSTABLE,
-//                        metadata,
-//                        StorageService.getPartitioner(),
-//                        collector);
-//            }
-//        };
-//        writer.newRow(key);
-//        writer.addColumn(bytes("col"), bytes("val"), 1);
-//        writer.close();
-//
-//        Map<Descriptor, Set<Component>> sstables = dir.sstableLister().list();
-//        assert sstables.size() == 1;
-//
-//        Map.Entry<Descriptor, Set<Component>> sstableToOpen = sstables.entrySet().iterator().next();
-//        final SSTableReader sstable1 = SSTableReader.open(sstableToOpen.getKey());
-//
-//        // simulate we don't have generation in compaction_history
-//        Map<Integer, UUID> unfinishedCompactions = new HashMap<>();
-//        UUID compactionTaskID = UUID.randomUUID();
-//        for (Integer ancestor : ancestors)
-//            unfinishedCompactions.put(ancestor, compactionTaskID);
-//        ColumnFamilyStore.removeUnfinishedCompactionLeftovers(cfmeta, unfinishedCompactions);
-//
-//        // SSTable should not be deleted
-//        sstables = dir.sstableLister().list();
-//        assert sstables.size() == 1;
-//        assert sstables.containsKey(sstable1.descriptor);
-//    }
+
 
     // TODO: Fix once SSTableSimpleWriter's back in
 //    @Test
@@ -625,4 +509,30 @@ public class ColumnFamilyStoreTest
         }
         assertEquals(count, found);
     }
+
+    @Test
+    public void testScrubDataDirectories() throws Throwable
+    {
+        ColumnFamilyStore cfs = Keyspace.open(KEYSPACE1).getColumnFamilyStore(CF_STANDARD1);
+
+        ColumnFamilyStore.scrubDataDirectories(cfs.metadata);
+
+        new RowUpdateBuilder(cfs.metadata, 2, "key").clustering("name").add("val", "2").build().applyUnsafe();
+        cfs.forceBlockingFlush();
+
+        // Nuke the metadata and reload that sstable
+        Collection<SSTableReader> ssTables = cfs.getSSTables();
+        assertEquals(1, ssTables.size());
+        SSTableReader ssTable = ssTables.iterator().next();
+
+        String dataFileName = ssTable.descriptor.filenameFor(Component.DATA);
+        String tmpDataFileName = ssTable.descriptor.tmpFilenameFor(Component.DATA);
+        new File(dataFileName).renameTo(new File(tmpDataFileName));
+
+        ColumnFamilyStore.scrubDataDirectories(cfs.metadata);
+
+        List<File> ssTableFiles = new Directories(cfs.metadata).sstableLister().listFiles();
+        assertNotNull(ssTableFiles);
+        assertEquals(0, ssTableFiles.size());
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b09e60f7/test/unit/org/apache/cassandra/db/DirectoriesTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/DirectoriesTest.java b/test/unit/org/apache/cassandra/db/DirectoriesTest.java
index 6f3ccc9..14db2d1 100644
--- a/test/unit/org/apache/cassandra/db/DirectoriesTest.java
+++ b/test/unit/org/apache/cassandra/db/DirectoriesTest.java
@@ -95,22 +95,22 @@ public class DirectoriesTest
             File dir = cfDir(cfm);
             dir.mkdirs();
 
-            createFakeSSTable(dir, cfm.cfName, 1, false, fs);
-            createFakeSSTable(dir, cfm.cfName, 2, true, fs);
+            createFakeSSTable(dir, cfm.cfName, 1, fs);
+            createFakeSSTable(dir, cfm.cfName, 2, fs);
 
             File backupDir = new File(dir, Directories.BACKUPS_SUBDIR);
             backupDir.mkdir();
-            createFakeSSTable(backupDir, cfm.cfName, 1, false, fs);
+            createFakeSSTable(backupDir, cfm.cfName, 1, fs);
 
             File snapshotDir = new File(dir, Directories.SNAPSHOT_SUBDIR + File.separator + "42");
             snapshotDir.mkdirs();
-            createFakeSSTable(snapshotDir, cfm.cfName, 1, false, fs);
+            createFakeSSTable(snapshotDir, cfm.cfName, 1, fs);
         }
     }
 
-    private static void createFakeSSTable(File dir, String cf, int gen, boolean temp, List<File> addTo) throws IOException
+    private static void createFakeSSTable(File dir, String cf, int gen, List<File> addTo) throws IOException
     {
-        Descriptor desc = new Descriptor(dir, KS, cf, gen, temp ? Descriptor.Type.TEMP : Descriptor.Type.FINAL);
+        Descriptor desc = new Descriptor(dir, KS, cf, gen);
         for (Component c : new Component[]{ Component.DATA, Component.PRIMARY_INDEX, Component.FILTER })
         {
             File f = new File(desc.filenameFor(c));
@@ -145,7 +145,7 @@ public class DirectoriesTest
             Directories directories = new Directories(cfm);
             assertEquals(cfDir(cfm), directories.getDirectoryForNewSSTables());
 
-            Descriptor desc = new Descriptor(cfDir(cfm), KS, cfm.cfName, 1, Descriptor.Type.FINAL);
+            Descriptor desc = new Descriptor(cfDir(cfm), KS, cfm.cfName, 1);
             File snapshotDir = new File(cfDir(cfm),  File.separator + Directories.SNAPSHOT_SUBDIR + File.separator + "42");
             assertEquals(snapshotDir, Directories.getSnapshotDirectory(desc, "42"));
 
@@ -173,8 +173,8 @@ public class DirectoriesTest
         {
             assertEquals(cfDir(INDEX_CFM), dir);
         }
-        Descriptor parentDesc = new Descriptor(parentDirectories.getDirectoryForNewSSTables(), KS, PARENT_CFM.cfName, 0, Descriptor.Type.FINAL);
-        Descriptor indexDesc = new Descriptor(indexDirectories.getDirectoryForNewSSTables(), KS, INDEX_CFM.cfName, 0, Descriptor.Type.FINAL);
+        Descriptor parentDesc = new Descriptor(parentDirectories.getDirectoryForNewSSTables(), KS, PARENT_CFM.cfName, 0);
+        Descriptor indexDesc = new Descriptor(indexDirectories.getDirectoryForNewSSTables(), KS, INDEX_CFM.cfName, 0);
 
         // snapshot dir should be created under its parent's
         File parentSnapshotDirectory = Directories.getSnapshotDirectory(parentDesc, "test");
@@ -191,9 +191,9 @@ public class DirectoriesTest
                      indexDirectories.snapshotCreationTime("test"));
 
         // check true snapshot size
-        Descriptor parentSnapshot = new Descriptor(parentSnapshotDirectory, KS, PARENT_CFM.cfName, 0, Descriptor.Type.FINAL);
+        Descriptor parentSnapshot = new Descriptor(parentSnapshotDirectory, KS, PARENT_CFM.cfName, 0);
         createFile(parentSnapshot.filenameFor(Component.DATA), 30);
-        Descriptor indexSnapshot = new Descriptor(indexSnapshotDirectory, KS, INDEX_CFM.cfName, 0, Descriptor.Type.FINAL);
+        Descriptor indexSnapshot = new Descriptor(indexSnapshotDirectory, KS, INDEX_CFM.cfName, 0);
         createFile(indexSnapshot.filenameFor(Component.DATA), 40);
 
         assertEquals(30, parentDirectories.trueSnapshotsSize());
@@ -311,7 +311,7 @@ public class DirectoriesTest
             final String n = Long.toString(System.nanoTime());
             Callable<File> directoryGetter = new Callable<File>() {
                 public File call() throws Exception {
-                    Descriptor desc = new Descriptor(cfDir(cfm), KS, cfm.cfName, 1, Descriptor.Type.FINAL);
+                    Descriptor desc = new Descriptor(cfDir(cfm), KS, cfm.cfName, 1);
                     return Directories.getSnapshotDirectory(desc, n);
                 }
             };

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b09e60f7/test/unit/org/apache/cassandra/db/ScrubTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/ScrubTest.java b/test/unit/org/apache/cassandra/db/ScrubTest.java
index 47bfa0c..181b4e0 100644
--- a/test/unit/org/apache/cassandra/db/ScrubTest.java
+++ b/test/unit/org/apache/cassandra/db/ScrubTest.java
@@ -65,6 +65,7 @@ import org.apache.cassandra.OrderedJUnit4ClassRunner;
 import org.apache.cassandra.SchemaLoader;
 import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.OutputHandler;
 
 import static org.junit.Assert.*;
 import static org.junit.Assume.assumeTrue;
@@ -315,7 +316,7 @@ public class ScrubTest
          * Code used to generate an outOfOrder sstable. The test for out-of-order key in BigTableWriter must also be commented out.
          * The test also assumes an ordered partitioner.
         List<String> keys = Arrays.asList("t", "a", "b", "z", "c", "y", "d");
-        String filename = cfs.getTempSSTablePath(new File(System.getProperty("corrupt-sstable-root")));
+        SSTableWriter writer = new SSTableWriter(cfs.getTempSSTablePath(new File(System.getProperty("corrupt-sstable-root"))),
         SSTableWriter writer = SSTableWriter.create(cfs.metadata,
                                                     Descriptor.fromFilename(filename),
                                                     keys.size(),
@@ -341,7 +342,7 @@ public class ScrubTest
 
         File rootDir = new File(root);
         assert rootDir.isDirectory();
-        Descriptor desc = new Descriptor("la", rootDir, KEYSPACE, columnFamily, 1, Descriptor.Type.FINAL, SSTableFormat.Type.BIG);
+        Descriptor desc = new Descriptor("la", rootDir, KEYSPACE, columnFamily, 1, SSTableFormat.Type.BIG);
         CFMetaData metadata = Schema.instance.getCFMetaData(desc.ksname, desc.cfname);
 
         try
@@ -366,7 +367,7 @@ public class ScrubTest
             sstable.last = sstable.first;
 
         try (LifecycleTransaction txn = LifecycleTransaction.offline(OperationType.SCRUB, sstable);
-             Scrubber scrubber = new Scrubber(cfs, txn, false, true, true);)
+             Scrubber scrubber = new Scrubber(cfs, txn, false, new OutputHandler.LogOutput(), true, true, true))
         {
             scrubber.scrub();
         }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b09e60f7/test/unit/org/apache/cassandra/db/SystemKeyspaceTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/SystemKeyspaceTest.java b/test/unit/org/apache/cassandra/db/SystemKeyspaceTest.java
index 9ea7f93..d58985c 100644
--- a/test/unit/org/apache/cassandra/db/SystemKeyspaceTest.java
+++ b/test/unit/org/apache/cassandra/db/SystemKeyspaceTest.java
@@ -17,11 +17,15 @@
  */
 package org.apache.cassandra.db;
 
+import java.io.File;
 import java.io.IOException;
 import java.net.InetAddress;
 import java.net.UnknownHostException;
+import java.nio.file.Path;
+import java.nio.file.Paths;
 import java.util.*;
 
+import org.apache.commons.io.FileUtils;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
@@ -30,6 +34,7 @@ import org.apache.cassandra.cql3.QueryProcessor;
 import org.apache.cassandra.cql3.UntypedResultSet;
 import org.apache.cassandra.dht.ByteOrderedPartitioner.BytesToken;
 import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.io.sstable.Descriptor;
 import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.CassandraVersion;
@@ -39,6 +44,8 @@ import static org.junit.Assert.assertTrue;
 
 public class SystemKeyspaceTest
 {
+    public static final String MIGRATION_SSTABLES_ROOT = "migration-sstable-root";
+
     @BeforeClass
     public static void prepSnapshotTracker()
     {
@@ -145,6 +152,47 @@ public class SystemKeyspaceTest
         Keyspace.clearSnapshot(null, SystemKeyspace.NAME);
     }
 
+    @Test
+    public void testMigrateDataDirs() throws IOException
+    {
+        Path migrationSSTableRoot = Paths.get(System.getProperty(MIGRATION_SSTABLES_ROOT), "2.2");
+        Path dataDir = Paths.get(DatabaseDescriptor.getAllDataFileLocations()[0]);
+
+        FileUtils.copyDirectory(migrationSSTableRoot.toFile(), dataDir.toFile());
+
+        assertEquals(5, numLegacyFiles()); // see test data
+
+        SystemKeyspace.migrateDataDirs();
+
+        assertEquals(0, numLegacyFiles());
+    }
+
+    private static int numLegacyFiles()
+    {
+        int ret = 0;
+        Iterable<String> dirs = Arrays.asList(DatabaseDescriptor.getAllDataFileLocations());
+        for (String dataDir : dirs)
+        {
+            File dir = new File(dataDir);
+            for (File ksdir : dir.listFiles((d, n) -> d.isDirectory()))
+            {
+                for (File cfdir : ksdir.listFiles((d, n) -> d.isDirectory()))
+                {
+                    if (Descriptor.isLegacyFile(cfdir.getName()))
+                    {
+                        ret++;
+                    }
+                    else
+                    {
+                        File[] legacyFiles = cfdir.listFiles((d, n) -> Descriptor.isLegacyFile(n));
+                        ret += legacyFiles.length;
+                    }
+                }
+            }
+        }
+        return ret;
+    }
+
     private String getOlderVersionString()
     {
         String version = FBUtilities.getReleaseVersionString();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b09e60f7/test/unit/org/apache/cassandra/db/compaction/AntiCompactionTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/compaction/AntiCompactionTest.java b/test/unit/org/apache/cassandra/db/compaction/AntiCompactionTest.java
index b18e67b..9de33db 100644
--- a/test/unit/org/apache/cassandra/db/compaction/AntiCompactionTest.java
+++ b/test/unit/org/apache/cassandra/db/compaction/AntiCompactionTest.java
@@ -156,9 +156,9 @@ public class AntiCompactionTest
     private SSTableReader writeFile(ColumnFamilyStore cfs, int count)
     {
         File dir = cfs.directories.getDirectoryForNewSSTables();
-        String filename = cfs.getTempSSTablePath(dir);
+        String filename = cfs.getSSTablePath(dir);
 
-        try (SSTableWriter writer = SSTableWriter.create(filename, 0, 0, new SerializationHeader(cfm, cfm.partitionColumns(), EncodingStats.NO_STATS)))
+        try (SSTableTxnWriter writer = SSTableTxnWriter.create(filename, 0, 0, new SerializationHeader(cfm, cfm.partitionColumns(), EncodingStats.NO_STATS)))
         {
             for (int i = 0; i < count; i++)
             {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b09e60f7/test/unit/org/apache/cassandra/db/compaction/CompactionAwareWriterTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/compaction/CompactionAwareWriterTest.java b/test/unit/org/apache/cassandra/db/compaction/CompactionAwareWriterTest.java
index 0bcd603..30f14fc 100644
--- a/test/unit/org/apache/cassandra/db/compaction/CompactionAwareWriterTest.java
+++ b/test/unit/org/apache/cassandra/db/compaction/CompactionAwareWriterTest.java
@@ -78,7 +78,7 @@ public class CompactionAwareWriterTest extends CQLTester
         populate(rowCount);
         LifecycleTransaction txn = cfs.getTracker().tryModify(cfs.getSSTables(), OperationType.COMPACTION);
         long beforeSize = txn.originals().iterator().next().onDiskLength();
-        CompactionAwareWriter writer = new DefaultCompactionWriter(cfs, txn, txn.originals(), false, OperationType.COMPACTION);
+        CompactionAwareWriter writer = new DefaultCompactionWriter(cfs, txn, txn.originals(), false);
         int rows = compact(cfs, txn, writer);
         assertEquals(1, cfs.getSSTables().size());
         assertEquals(rowCount, rows);
@@ -97,7 +97,7 @@ public class CompactionAwareWriterTest extends CQLTester
         LifecycleTransaction txn = cfs.getTracker().tryModify(cfs.getSSTables(), OperationType.COMPACTION);
         long beforeSize = txn.originals().iterator().next().onDiskLength();
         int sstableSize = (int)beforeSize/10;
-        CompactionAwareWriter writer = new MaxSSTableSizeWriter(cfs, txn, txn.originals(), sstableSize, 0, false, OperationType.COMPACTION);
+        CompactionAwareWriter writer = new MaxSSTableSizeWriter(cfs, txn, txn.originals(), sstableSize, 0, false);
         int rows = compact(cfs, txn, writer);
         assertEquals(10, cfs.getSSTables().size());
         assertEquals(rowCount, rows);
@@ -114,7 +114,7 @@ public class CompactionAwareWriterTest extends CQLTester
         populate(rowCount);
         LifecycleTransaction txn = cfs.getTracker().tryModify(cfs.getSSTables(), OperationType.COMPACTION);
         long beforeSize = txn.originals().iterator().next().onDiskLength();
-        CompactionAwareWriter writer = new SplittingSizeTieredCompactionWriter(cfs, txn, txn.originals(), OperationType.COMPACTION, 0);
+        CompactionAwareWriter writer = new SplittingSizeTieredCompactionWriter(cfs, txn, txn.originals(), 0);
         int rows = compact(cfs, txn, writer);
         long expectedSize = beforeSize / 2;
         List<SSTableReader> sortedSSTables = new ArrayList<>(cfs.getSSTables());
@@ -150,7 +150,7 @@ public class CompactionAwareWriterTest extends CQLTester
         LifecycleTransaction txn = cfs.getTracker().tryModify(cfs.getSSTables(), OperationType.COMPACTION);
         long beforeSize = txn.originals().iterator().next().onDiskLength();
         int sstableSize = (int)beforeSize/targetSSTableCount;
-        CompactionAwareWriter writer = new MajorLeveledCompactionWriter(cfs, txn, txn.originals(), sstableSize, false, OperationType.COMPACTION);
+        CompactionAwareWriter writer = new MajorLeveledCompactionWriter(cfs, txn, txn.originals(), sstableSize, false);
         int rows = compact(cfs, txn, writer);
         assertEquals(targetSSTableCount, cfs.getSSTables().size());
         int [] levelCounts = new int[5];

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b09e60f7/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java b/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java
index 2fa8488..7beb405 100644
--- a/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java
+++ b/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java
@@ -21,6 +21,11 @@ package org.apache.cassandra.db.compaction;
 import java.util.*;
 import java.util.concurrent.TimeUnit;
 
+import org.junit.BeforeClass;
+import org.junit.Ignore;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+
 import org.apache.cassandra.OrderedJUnit4ClassRunner;
 import org.apache.cassandra.SchemaLoader;
 import org.apache.cassandra.Util;
@@ -34,11 +39,6 @@ import org.apache.cassandra.schema.KeyspaceParams;
 import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.FBUtilities;
 
-import org.junit.BeforeClass;
-import org.junit.Ignore;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-
 import static org.junit.Assert.*;
 
 @RunWith(OrderedJUnit4ClassRunner.class)
@@ -394,21 +394,25 @@ public class CompactionsTest
         cf.addColumn(Util.column("a", "a", 3));
         cf.deletionInfo().add(new RangeTombstone(Util.cellname("0"), Util.cellname("b"), 2, (int) (System.currentTimeMillis()/1000)),cfmeta.comparator);
 
-        SSTableWriter writer = SSTableWriter.create(Descriptor.fromFilename(cfs.getTempSSTablePath(dir.getDirectoryForNewSSTables())), 0, 0, 0);
-
-
-        writer.append(Util.dk("0"), cf);
-        writer.append(Util.dk("1"), cf);
-        writer.append(Util.dk("3"), cf);
+        Descriptor desc = Descriptor.fromFilename(cfs.getSSTablePath(dir.getDirectoryForNewSSTables()));
+        try(SSTableTxnWriter writer = SSTableTxnWriter.create(desc, 0, 0, 0))
+        {
+            writer.append(Util.dk("0"), cf);
+            writer.append(Util.dk("1"), cf);
+            writer.append(Util.dk("3"), cf);
 
-        cfs.addSSTable(writer.closeAndOpenReader());
-        writer = SSTableWriter.create(Descriptor.fromFilename(cfs.getTempSSTablePath(dir.getDirectoryForNewSSTables())), 0, 0, 0);
+            cfs.addSSTable(writer.closeAndOpenReader());
+        }
 
-        writer.append(Util.dk("0"), cf);
-        writer.append(Util.dk("1"), cf);
-        writer.append(Util.dk("2"), cf);
-        writer.append(Util.dk("3"), cf);
-        cfs.addSSTable(writer.closeAndOpenReader());
+        desc = Descriptor.fromFilename(cfs.getSSTablePath(dir.getDirectoryForNewSSTables()));
+        try (SSTableTxnWriter writer = SSTableTxnWriter.create(desc, 0, 0, 0))
+        {
+            writer.append(Util.dk("0"), cf);
+            writer.append(Util.dk("1"), cf);
+            writer.append(Util.dk("2"), cf);
+            writer.append(Util.dk("3"), cf);
+            cfs.addSSTable(writer.closeAndOpenReader());
+        }
 
         Collection<SSTableReader> toCompact = cfs.getSSTables();
         assert toCompact.size() == 2;
@@ -439,35 +443,6 @@ public class CompactionsTest
         assertEquals(keys, k);
     }
 
-    @Test
-    public void testCompactionLog() throws Exception
-    {
-        SystemKeyspace.discardCompactionsInProgress();
-
-        String cf = "Standard4";
-        ColumnFamilyStore cfs = Keyspace.open(KEYSPACE1).getColumnFamilyStore(cf);
-        SchemaLoader.insertData(KEYSPACE1, cf, 0, 1);
-        cfs.forceBlockingFlush();
-
-        Collection<SSTableReader> sstables = cfs.getSSTables();
-        assertFalse(sstables.isEmpty());
-        Set<Integer> generations = Sets.newHashSet(Iterables.transform(sstables, new Function<SSTableReader, Integer>()
-        {
-            public Integer apply(SSTableReader sstable)
-            {
-                return sstable.descriptor.generation;
-            }
-        }));
-        UUID taskId = SystemKeyspace.startCompaction(cfs, sstables);
-        Map<Pair<String, String>, Map<Integer, UUID>> compactionLogs = SystemKeyspace.getUnfinishedCompactions();
-        Set<Integer> unfinishedCompactions = compactionLogs.get(Pair.create(KEYSPACE1, cf)).keySet();
-        assertTrue(unfinishedCompactions.containsAll(generations));
-
-        SystemKeyspace.finishCompaction(taskId);
-        compactionLogs = SystemKeyspace.getUnfinishedCompactions();
-        assertFalse(compactionLogs.containsKey(Pair.create(KEYSPACE1, cf)));
-    }
-
     private void testDontPurgeAccidentaly(String k, String cfname) throws InterruptedException
     {
         // This test catches the regression of CASSANDRA-2786

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b09e60f7/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java b/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java
index ab84555..a869a7f 100644
--- a/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java
+++ b/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java
@@ -190,6 +190,9 @@ public class LeveledCompactionStrategyTest
         // L0 is the lowest priority, so when that's done, we know everything is done
         while (strategy.getSSTableCountPerLevel()[0] > 1)
             Thread.sleep(100);
+
+        // in AbstractCompationStrategy.replaceSSTables() first we remove and then we add sstables so wait a little bit longer
+        Thread.sleep(10);
     }
 
     @Test
@@ -211,7 +214,7 @@ public class LeveledCompactionStrategyTest
         }
 
         waitForLeveling(cfs);
-        LeveledCompactionStrategy strategy = (LeveledCompactionStrategy) ( cfs.getCompactionStrategyManager()).getStrategies().get(1);
+        LeveledCompactionStrategy strategy = (LeveledCompactionStrategy) (cfs.getCompactionStrategyManager()).getStrategies().get(1);
         assert strategy.getLevelSize(1) > 0;
 
         // get LeveledScanner for level 1 sstables

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b09e60f7/test/unit/org/apache/cassandra/db/lifecycle/HelpersTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/lifecycle/HelpersTest.java b/test/unit/org/apache/cassandra/db/lifecycle/HelpersTest.java
index 18bce10..e9c903e 100644
--- a/test/unit/org/apache/cassandra/db/lifecycle/HelpersTest.java
+++ b/test/unit/org/apache/cassandra/db/lifecycle/HelpersTest.java
@@ -18,6 +18,8 @@
 */
 package org.apache.cassandra.db.lifecycle;
 
+import java.util.ArrayList;
+import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
@@ -32,11 +34,16 @@ import junit.framework.Assert;
 import org.apache.cassandra.MockSchema;
 import org.apache.cassandra.Util;
 import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.compaction.OperationType;
 import org.apache.cassandra.io.sstable.Descriptor;
 import org.apache.cassandra.io.sstable.format.SSTableReader;
 import org.apache.cassandra.io.sstable.format.big.BigTableReader;
+import org.apache.cassandra.utils.Pair;
 import org.apache.cassandra.utils.concurrent.Refs;
 
+import static junit.framework.Assert.assertEquals;
+import static junit.framework.Assert.assertNotNull;
+
 public class HelpersTest
 {
 
@@ -150,19 +157,29 @@ public class HelpersTest
         for (SSTableReader reader : readers)
             Assert.assertTrue(reader.isReplaced());
         accumulate = Helpers.setReplaced(readers, null);
-        Assert.assertNotNull(accumulate);
+        assertNotNull(accumulate);
     }
 
     @Test
     public void testMarkObsolete()
     {
         ColumnFamilyStore cfs = MockSchema.newCFS();
+        TransactionLogs txnLogs = new TransactionLogs(OperationType.UNKNOWN, cfs.metadata);
         Iterable<SSTableReader> readers = Lists.newArrayList(MockSchema.sstable(1, cfs), MockSchema.sstable(2, cfs));
-        Throwable accumulate = Helpers.markObsolete(null, readers, null);
+
+        List<TransactionLogs.Obsoletion> obsoletions = new ArrayList<>();
+        Assert.assertNull(Helpers.prepareForObsoletion(readers, txnLogs, obsoletions, null));
+        assertNotNull(obsoletions);
+        assertEquals(2, obsoletions.size());
+
+        Throwable accumulate = Helpers.markObsolete(obsoletions, null);
         Assert.assertNull(accumulate);
         for (SSTableReader reader : readers)
             Assert.assertTrue(reader.isMarkedCompacted());
-        accumulate = Helpers.markObsolete(null, readers, null);
-        Assert.assertNotNull(accumulate);
+
+        accumulate = Helpers.markObsolete(obsoletions, null);
+        assertNotNull(accumulate);
+
+        txnLogs.finish();
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b09e60f7/test/unit/org/apache/cassandra/db/lifecycle/LifecycleTransactionTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/lifecycle/LifecycleTransactionTest.java b/test/unit/org/apache/cassandra/db/lifecycle/LifecycleTransactionTest.java
index d6af447..a876891 100644
--- a/test/unit/org/apache/cassandra/db/lifecycle/LifecycleTransactionTest.java
+++ b/test/unit/org/apache/cassandra/db/lifecycle/LifecycleTransactionTest.java
@@ -33,7 +33,6 @@ import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.db.compaction.OperationType;
 import org.apache.cassandra.db.lifecycle.LifecycleTransaction.ReaderState;
 import org.apache.cassandra.db.lifecycle.LifecycleTransaction.ReaderState.Action;
-import org.apache.cassandra.io.sstable.SSTableDeletingTask;
 import org.apache.cassandra.io.sstable.format.SSTableReader;
 import org.apache.cassandra.utils.Pair;
 import org.apache.cassandra.utils.concurrent.AbstractTransactionalTest;
@@ -249,7 +248,7 @@ public class LifecycleTransactionTest extends AbstractTransactionalTest
 
     protected TestableTransaction newTest()
     {
-        SSTableDeletingTask.waitForDeletions();
+        TransactionLogs.waitForDeletions();
         SSTableReader.resetTidying();
         return new TxnTest();
     }
@@ -399,6 +398,12 @@ public class LifecycleTransactionTest extends AbstractTransactionalTest
             for (SSTableReader reader : concat(loggedObsolete, stagedObsolete))
                 Assert.assertTrue(reader.selfRef().globalCount() == 0);
         }
+
+        @Override
+        protected boolean commitCanThrow()
+        {
+            return true;
+        }
     }
 
     private static SSTableReader[] readersArray(int lb, int ub, ColumnFamilyStore cfs)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b09e60f7/test/unit/org/apache/cassandra/db/lifecycle/RealTransactionsTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/lifecycle/RealTransactionsTest.java b/test/unit/org/apache/cassandra/db/lifecycle/RealTransactionsTest.java
new file mode 100644
index 0000000..5291baa
--- /dev/null
+++ b/test/unit/org/apache/cassandra/db/lifecycle/RealTransactionsTest.java
@@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db.lifecycle;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.lang3.StringUtils;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import junit.framework.Assert;
+import org.apache.cassandra.MockSchema;
+import org.apache.cassandra.SchemaLoader;
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.config.Schema;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.Directories;
+import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.db.SerializationHeader;
+import org.apache.cassandra.db.compaction.AbstractCompactionStrategy;
+import org.apache.cassandra.db.compaction.CompactionController;
+import org.apache.cassandra.db.compaction.CompactionIterator;
+import org.apache.cassandra.db.compaction.OperationType;
+import org.apache.cassandra.io.sstable.CQLSSTableWriter;
+import org.apache.cassandra.io.sstable.Descriptor;
+import org.apache.cassandra.io.sstable.SSTableRewriter;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.io.sstable.format.SSTableWriter;
+import org.apache.cassandra.schema.KeyspaceParams;
+import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.utils.FBUtilities;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Tests to simulate real transactions such as compactions and flushing
+ * using SSTableRewriter, ColumnFamilyStore, LifecycleTransaction, TransactionLogs, etc
+ */
+public class RealTransactionsTest extends SchemaLoader
+{
+    private static final String KEYSPACE = "TransactionLogsTest";
+    private static final String REWRITE_FINISHED_CF = "RewriteFinished";
+    private static final String REWRITE_ABORTED_CF = "RewriteAborted";
+    private static final String FLUSH_CF = "Flush";
+
+    @BeforeClass
+    public static void setUp()
+    {
+        MockSchema.cleanup();
+
+        SchemaLoader.prepareServer();
+        SchemaLoader.createKeyspace(KEYSPACE,
+                                    KeyspaceParams.simple(1),
+                                    SchemaLoader.standardCFMD(KEYSPACE, REWRITE_FINISHED_CF),
+                                    SchemaLoader.standardCFMD(KEYSPACE, REWRITE_ABORTED_CF),
+                                    SchemaLoader.standardCFMD(KEYSPACE, FLUSH_CF));
+    }
+
+    @Test
+    public void testRewriteFinished() throws IOException
+    {
+        Keyspace keyspace = Keyspace.open(KEYSPACE);
+        ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(REWRITE_FINISHED_CF);
+
+        SSTableReader oldSSTable = getSSTable(cfs, 1);
+        LifecycleTransaction txn = cfs.getTracker().tryModify(oldSSTable, OperationType.COMPACTION);
+        SSTableReader newSSTable = replaceSSTable(cfs, txn, false);
+        TransactionLogs.waitForDeletions();
+
+        assertFiles(txn.logs().getDataFolder(), new HashSet<>(newSSTable.getAllFilePaths()));
+        assertFiles(txn.logs().getLogsFolder(), Collections.<String>emptySet());
+    }
+
+    @Test
+    public void testRewriteAborted() throws IOException
+    {
+        Keyspace keyspace = Keyspace.open(KEYSPACE);
+        ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(REWRITE_ABORTED_CF);
+
+        SSTableReader oldSSTable = getSSTable(cfs, 1);
+        LifecycleTransaction txn = cfs.getTracker().tryModify(oldSSTable, OperationType.COMPACTION);
+
+        replaceSSTable(cfs, txn, true);
+        TransactionLogs.waitForDeletions();
+
+        assertFiles(txn.logs().getDataFolder(), new HashSet<>(oldSSTable.getAllFilePaths()));
+        assertFiles(txn.logs().getLogsFolder(), Collections.<String>emptySet());
+    }
+
+    @Test
+    public void testFlush() throws IOException
+    {
+        Keyspace keyspace = Keyspace.open(KEYSPACE);
+        ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(FLUSH_CF);
+
+        SSTableReader ssTableReader = getSSTable(cfs, 100);
+
+        String dataFolder = cfs.getSSTables().iterator().next().descriptor.directory.getPath();
+        String transactionLogsFolder = StringUtils.join(dataFolder, File.separator, Directories.TRANSACTIONS_SUBDIR);
+
+        assertTrue(new File(transactionLogsFolder).exists());
+        assertFiles(transactionLogsFolder, Collections.<String>emptySet());
+
+        assertFiles(dataFolder, new HashSet<>(ssTableReader.getAllFilePaths()));
+    }
+
+    private SSTableReader getSSTable(ColumnFamilyStore cfs, int numPartitions) throws IOException
+    {
+        createSSTable(cfs, numPartitions);
+
+        Set<SSTableReader> sstables = new HashSet<>(cfs.getSSTables());
+        assertEquals(1, sstables.size());
+        return sstables.iterator().next();
+    }
+
+    private void createSSTable(ColumnFamilyStore cfs, int numPartitions) throws IOException
+    {
+        cfs.truncateBlocking();
+
+        String schema = "CREATE TABLE %s.%s (key ascii, name ascii, val ascii, val1 ascii, PRIMARY KEY (key, name))";
+        String query = "INSERT INTO %s.%s (key, name, val) VALUES (?, ?, ?)";
+
+        try (CQLSSTableWriter writer = CQLSSTableWriter.builder()
+                                                       .withPartitioner(StorageService.getPartitioner())
+                                                       .inDirectory(cfs.directories.getDirectoryForNewSSTables())
+                                                       .forTable(String.format(schema, cfs.keyspace.getName(), cfs.name))
+                                                       .using(String.format(query, cfs.keyspace.getName(), cfs.name))
+                                                       .build())
+        {
+            for (int j = 0; j < numPartitions; j ++)
+                writer.addRow(String.format("key%d", j), "col1", "0");
+        }
+
+        cfs.loadNewSSTables();
+    }
+
+    private SSTableReader replaceSSTable(ColumnFamilyStore cfs, LifecycleTransaction txn, boolean fail)
+    {
+        List<SSTableReader> newsstables = null;
+        int nowInSec = FBUtilities.nowInSeconds();
+        try (CompactionController controller = new CompactionController(cfs, txn.originals(), cfs.gcBefore(FBUtilities.nowInSeconds())))
+        {
+            try (SSTableRewriter rewriter = new SSTableRewriter(cfs, txn, 1000, false);
+                 AbstractCompactionStrategy.ScannerList scanners = cfs.getCompactionStrategyManager().getScanners(txn.originals());
+                 CompactionIterator ci = new CompactionIterator(txn.opType(), scanners.scanners, controller, nowInSec, txn.opId())
+            )
+            {
+                long lastCheckObsoletion = System.nanoTime();
+                File directory = txn.originals().iterator().next().descriptor.directory;
+                Descriptor desc = Descriptor.fromFilename(cfs.getSSTablePath(directory));
+                CFMetaData metadata = Schema.instance.getCFMetaData(desc);
+                rewriter.switchWriter(SSTableWriter.create(metadata,
+                                                           desc,
+                                                           0,
+                                                           0,
+                                                           0,
+                                                           DatabaseDescriptor.getPartitioner(),
+                                                           SerializationHeader.make(cfs.metadata, txn.originals()),
+                                                           txn));
+                while (ci.hasNext())
+                {
+                    rewriter.append(ci.next());
+
+                    if (System.nanoTime() - lastCheckObsoletion > TimeUnit.MINUTES.toNanos(1L))
+                    {
+                        controller.maybeRefreshOverlaps();
+                        lastCheckObsoletion = System.nanoTime();
+                    }
+                }
+
+                if (!fail)
+                    newsstables = rewriter.finish();
+                else
+                    rewriter.abort();
+            }
+        }
+
+        assertTrue(fail || newsstables != null);
+
+        if (newsstables != null)
+        {
+            Assert.assertEquals(1, newsstables.size());
+            return newsstables.iterator().next();
+        }
+
+        return null;
+    }
+
+    private void assertFiles(String dirPath, Set<String> expectedFiles)
+    {
+        File dir = new File(dirPath);
+        for (File file : dir.listFiles())
+        {
+            if (file.isDirectory())
+                continue;
+
+            String filePath = file.getPath();
+            assertTrue(filePath, expectedFiles.contains(filePath));
+            expectedFiles.remove(filePath);
+        }
+
+        assertTrue(expectedFiles.isEmpty());
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b09e60f7/test/unit/org/apache/cassandra/db/lifecycle/TrackerTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/lifecycle/TrackerTest.java b/test/unit/org/apache/cassandra/db/lifecycle/TrackerTest.java
index dbd5287..89924a5 100644
--- a/test/unit/org/apache/cassandra/db/lifecycle/TrackerTest.java
+++ b/test/unit/org/apache/cassandra/db/lifecycle/TrackerTest.java
@@ -40,7 +40,6 @@ import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.db.Memtable;
 import org.apache.cassandra.db.commitlog.ReplayPosition;
 import org.apache.cassandra.db.compaction.OperationType;
-import org.apache.cassandra.io.sstable.SSTableDeletingTask;
 import org.apache.cassandra.io.sstable.format.SSTableReader;
 import org.apache.cassandra.notifications.*;
 import org.apache.cassandra.utils.concurrent.OpOrder;
@@ -152,6 +151,9 @@ public class TrackerTest
 
         Assert.assertEquals(3, tracker.view.get().sstables.size());
 
+        for (SSTableReader reader : readers)
+            Assert.assertTrue(reader.isKeyCacheSetup());
+
         Assert.assertEquals(17 + 121 + 9, cfs.metric.liveDiskSpaceUsed.getCount());
     }
 
@@ -171,6 +173,9 @@ public class TrackerTest
 
         Assert.assertEquals(3, tracker.view.get().sstables.size());
 
+        for (SSTableReader reader : readers)
+            Assert.assertTrue(reader.isKeyCacheSetup());
+
         Assert.assertEquals(17 + 121 + 9, cfs.metric.liveDiskSpaceUsed.getCount());
         Assert.assertEquals(3, listener.senders.size());
         Assert.assertEquals(tracker, listener.senders.get(0));
@@ -182,9 +187,9 @@ public class TrackerTest
     public void testDropSSTables()
     {
         testDropSSTables(false);
-        SSTableDeletingTask.waitForDeletions();
+        TransactionLogs.waitForDeletions();
         testDropSSTables(true);
-        SSTableDeletingTask.waitForDeletions();
+        TransactionLogs.waitForDeletions();
     }
 
     private void testDropSSTables(boolean invalidate)
@@ -200,7 +205,7 @@ public class TrackerTest
 
         try
         {
-            SSTableDeletingTask.pauseDeletions(true);
+            TransactionLogs.pauseDeletions(true);
             try (LifecycleTransaction txn = tracker.tryModify(readers.get(0), OperationType.COMPACTION))
             {
                 if (invalidate)
@@ -222,19 +227,19 @@ public class TrackerTest
                     Assert.assertEquals(0, reader.selfRef().globalCount());
                     Assert.assertTrue(reader.isMarkedCompacted());
                 }
-                Assert.assertNull(tracker.dropSSTables(new Predicate<SSTableReader>() {
-                                                           public boolean apply(SSTableReader reader)
-                                                           {
-                                                               return reader != readers.get(0);
-                                                           }
-                                                       },
-                                                       OperationType.UNKNOWN,
-                                                       null));
+
+                Assert.assertNull(tracker.dropSSTables(reader -> reader != readers.get(0), OperationType.UNKNOWN, null));
+
                 Assert.assertEquals(1, tracker.getView().sstables.size());
                 Assert.assertEquals(3, listener.received.size());
                 Assert.assertEquals(tracker, listener.senders.get(0));
-                Assert.assertEquals(2, ((SSTableListChangedNotification) listener.received.get(0)).removed.size());
-                Assert.assertEquals(0, ((SSTableListChangedNotification) listener.received.get(0)).added.size());
+                Assert.assertTrue(listener.received.get(0) instanceof SSTableDeletingNotification);
+                Assert.assertTrue(listener.received.get(1) instanceof  SSTableDeletingNotification);
+                Assert.assertTrue(listener.received.get(2) instanceof SSTableListChangedNotification);
+                Assert.assertEquals(readers.get(1), ((SSTableDeletingNotification) listener.received.get(0)).deleting);
+                Assert.assertEquals(readers.get(2), ((SSTableDeletingNotification)listener.received.get(1)).deleting);
+                Assert.assertEquals(2, ((SSTableListChangedNotification) listener.received.get(2)).removed.size());
+                Assert.assertEquals(0, ((SSTableListChangedNotification) listener.received.get(2)).added.size());
                 Assert.assertEquals(9, cfs.metric.liveDiskSpaceUsed.getCount());
                 readers.get(0).selfRef().release();
             }
@@ -248,7 +253,7 @@ public class TrackerTest
         }
         finally
         {
-            SSTableDeletingTask.pauseDeletions(false);
+            TransactionLogs.pauseDeletions(false);
         }
     }
 
@@ -299,6 +304,7 @@ public class TrackerTest
         Assert.assertEquals(1, listener.received.size());
         Assert.assertEquals(reader, ((SSTableAddedNotification) listener.received.get(0)).added);
         listener.received.clear();
+        Assert.assertTrue(reader.isKeyCacheSetup());
         Assert.assertEquals(10, cfs.metric.liveDiskSpaceUsed.getCount());
 
         // test invalidated CFS
@@ -314,8 +320,10 @@ public class TrackerTest
         Assert.assertEquals(0, tracker.getView().sstables.size());
         Assert.assertEquals(0, tracker.getView().flushingMemtables.size());
         Assert.assertEquals(0, cfs.metric.liveDiskSpaceUsed.getCount());
+        Assert.assertEquals(3, listener.received.size());
         Assert.assertEquals(reader, ((SSTableAddedNotification) listener.received.get(0)).added);
-        Assert.assertEquals(1, ((SSTableListChangedNotification) listener.received.get(1)).removed.size());
+        Assert.assertTrue(listener.received.get(1) instanceof  SSTableDeletingNotification);
+        Assert.assertEquals(1, ((SSTableListChangedNotification) listener.received.get(2)).removed.size());
         DatabaseDescriptor.setIncrementalBackupsEnabled(backups);
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b09e60f7/test/unit/org/apache/cassandra/db/lifecycle/TransactionLogsTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/lifecycle/TransactionLogsTest.java b/test/unit/org/apache/cassandra/db/lifecycle/TransactionLogsTest.java
new file mode 100644
index 0000000..3150087
--- /dev/null
+++ b/test/unit/org/apache/cassandra/db/lifecycle/TransactionLogsTest.java
@@ -0,0 +1,558 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db.lifecycle;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.nio.file.Files;
+import java.nio.file.StandardCopyOption;
+import java.util.*;
+
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Sets;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import static junit.framework.Assert.assertNotNull;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import junit.framework.Assert;
+import org.apache.cassandra.MockSchema;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.Directories;
+import org.apache.cassandra.db.SerializationHeader;
+import org.apache.cassandra.db.compaction.*;
+import org.apache.cassandra.dht.Murmur3Partitioner;
+import org.apache.cassandra.io.sstable.*;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
+import org.apache.cassandra.io.sstable.metadata.MetadataType;
+import org.apache.cassandra.io.sstable.metadata.StatsMetadata;
+import org.apache.cassandra.io.util.BufferedSegmentedFile;
+import org.apache.cassandra.io.util.ChannelProxy;
+import org.apache.cassandra.io.util.SegmentedFile;
+import org.apache.cassandra.utils.AlwaysPresentFilter;
+import org.apache.cassandra.utils.concurrent.AbstractTransactionalTest;
+import org.apache.cassandra.utils.concurrent.Transactional;
+
+public class TransactionLogsTest extends AbstractTransactionalTest
+{
+    private static final String KEYSPACE = "TransactionLogsTest";
+
+    @BeforeClass
+    public static void setUp()
+    {
+        MockSchema.cleanup();
+    }
+
+    protected AbstractTransactionalTest.TestableTransaction newTest() throws Exception
+    {
+        TransactionLogs.waitForDeletions();
+        SSTableReader.resetTidying();
+        return new TxnTest();
+    }
+
+    private static final class TxnTest extends TestableTransaction
+    {
+        private final static class Transaction extends Transactional.AbstractTransactional implements Transactional
+        {
+            final ColumnFamilyStore cfs;
+            final TransactionLogs txnLogs;
+            final SSTableReader sstableOld;
+            final SSTableReader sstableNew;
+            final TransactionLogs.SSTableTidier tidier;
+
+            public Transaction(ColumnFamilyStore cfs, TransactionLogs txnLogs) throws IOException
+            {
+                this.cfs = cfs;
+                this.txnLogs = txnLogs;
+                this.sstableOld = sstable(cfs, 0, 128);
+                this.sstableNew = sstable(cfs, 1, 128);
+
+                assertNotNull(txnLogs);
+                assertNotNull(txnLogs.getId());
+                Assert.assertEquals(OperationType.COMPACTION, txnLogs.getType());
+
+                txnLogs.trackNew(sstableNew);
+                tidier = txnLogs.obsoleted(sstableOld);
+                assertNotNull(tidier);
+            }
+
+            protected Throwable doCommit(Throwable accumulate)
+            {
+                sstableOld.markObsolete(tidier);
+                sstableOld.selfRef().release();
+                TransactionLogs.waitForDeletions();
+
+                Throwable ret = txnLogs.commit(accumulate);
+
+                sstableNew.selfRef().release();
+                return ret;
+            }
+
+            protected Throwable doAbort(Throwable accumulate)
+            {
+                tidier.abort();
+                TransactionLogs.waitForDeletions();
+
+                Throwable ret = txnLogs.abort(accumulate);
+
+                sstableNew.selfRef().release();
+                sstableOld.selfRef().release();
+                return ret;
+            }
+
+            protected void doPrepare()
+            {
+                txnLogs.prepareToCommit();
+            }
+
+            protected void assertInProgress() throws Exception
+            {
+                assertFiles(txnLogs.getDataFolder(), Sets.newHashSet(Iterables.concat(sstableNew.getAllFilePaths(),
+                                                                                      sstableOld.getAllFilePaths())));
+                assertFiles(txnLogs.getLogsFolder(), Sets.newHashSet(txnLogs.getData().oldLog().file.getPath(),
+                                                                     txnLogs.getData().newLog().file.getPath()));
+                assertEquals(2, TransactionLogs.getLogFiles(cfs.metadata).size());
+            }
+
+            protected void assertPrepared() throws Exception
+            {
+            }
+
+            protected void assertAborted() throws Exception
+            {
+                assertFiles(txnLogs.getDataFolder(), new HashSet<>(sstableOld.getAllFilePaths()));
+                assertFiles(txnLogs.getLogsFolder(), Collections.<String>emptySet());
+                assertEquals(0, TransactionLogs.getLogFiles(cfs.metadata).size());
+            }
+
+            protected void assertCommitted() throws Exception
+            {
+                assertFiles(txnLogs.getDataFolder(), new HashSet<>(sstableNew.getAllFilePaths()));
+                assertFiles(txnLogs.getLogsFolder(), Collections.<String>emptySet());
+                assertEquals(0, TransactionLogs.getLogFiles(cfs.metadata).size());
+            }
+        }
+
+        final Transaction txn;
+
+        private TxnTest() throws IOException
+        {
+            this(MockSchema.newCFS(KEYSPACE));
+        }
+
+        private TxnTest(ColumnFamilyStore cfs) throws IOException
+        {
+            this(cfs, new TransactionLogs(OperationType.COMPACTION, cfs.metadata));
+        }
+
+        private TxnTest(ColumnFamilyStore cfs, TransactionLogs txnLogs) throws IOException
+        {
+            this(new Transaction(cfs, txnLogs));
+        }
+
+        private TxnTest(Transaction txn)
+        {
+            super(txn);
+            this.txn = txn;
+        }
+
+        protected void assertInProgress() throws Exception
+        {
+            txn.assertInProgress();
+        }
+
+        protected void assertPrepared() throws Exception
+        {
+            txn.assertPrepared();
+        }
+
+        protected void assertAborted() throws Exception
+        {
+            txn.assertAborted();
+        }
+
+        protected void assertCommitted() throws Exception
+        {
+            txn.assertCommitted();
+        }
+    }
+
+    @Test
+    public void testUntrack() throws Throwable
+    {
+        ColumnFamilyStore cfs = MockSchema.newCFS(KEYSPACE);
+        SSTableReader sstableNew = sstable(cfs, 1, 128);
+
+        // complete a transaction without keep the new files since they were untracked
+        TransactionLogs transactionLogs = new TransactionLogs(OperationType.COMPACTION, cfs.metadata);
+        assertNotNull(transactionLogs);
+
+        transactionLogs.trackNew(sstableNew);
+        transactionLogs.untrackNew(sstableNew);
+
+        transactionLogs.finish();
+
+        assertFiles(transactionLogs.getDataFolder(), Collections.<String>emptySet());
+        assertFiles(transactionLogs.getLogsFolder(), Collections.<String>emptySet());
+        assertEquals(0, TransactionLogs.getLogFiles(cfs.metadata).size());
+
+        sstableNew.selfRef().release();
+    }
+
+    @Test
+    public void testCommitSameDesc() throws Throwable
+    {
+        ColumnFamilyStore cfs = MockSchema.newCFS(KEYSPACE);
+        SSTableReader sstableOld1 = sstable(cfs, 0, 128);
+        SSTableReader sstableOld2 = sstable(cfs, 0, 256);
+        SSTableReader sstableNew = sstable(cfs, 1, 128);
+
+        TransactionLogs transactionLogs = new TransactionLogs(OperationType.COMPACTION, cfs.metadata);
+        assertNotNull(transactionLogs);
+
+        transactionLogs.trackNew(sstableNew);
+
+        sstableOld1.setReplaced();
+
+        TransactionLogs.SSTableTidier tidier = transactionLogs.obsoleted(sstableOld2);
+        assertNotNull(tidier);
+
+        transactionLogs.finish();
+
+        sstableOld2.markObsolete(tidier);
+
+        sstableOld1.selfRef().release();
+        sstableOld2.selfRef().release();
+
+        TransactionLogs.waitForDeletions();
+
+        assertFiles(transactionLogs.getDataFolder(), new HashSet<>(sstableNew.getAllFilePaths()));
+        assertFiles(transactionLogs.getLogsFolder(), Collections.<String>emptySet());
+        assertEquals(0, TransactionLogs.getLogFiles(cfs.metadata).size());
+
+        sstableNew.selfRef().release();
+    }
+
+    @Test
+    public void testCommitOnlyNew() throws Throwable
+    {
+        ColumnFamilyStore cfs = MockSchema.newCFS(KEYSPACE);
+        SSTableReader sstable = sstable(cfs, 0, 128);
+
+        TransactionLogs transactionLogs = new TransactionLogs(OperationType.COMPACTION, cfs.metadata);
+        assertNotNull(transactionLogs);
+
+        transactionLogs.trackNew(sstable);
+        transactionLogs.finish();
+
+        assertFiles(transactionLogs.getDataFolder(), new HashSet<>(sstable.getAllFilePaths()));
+        assertFiles(transactionLogs.getLogsFolder(), Collections.<String>emptySet());
+        assertEquals(0, TransactionLogs.getLogFiles(cfs.metadata).size());
+
+        sstable.selfRef().release();
+    }
+
+    @Test
+    public void testCommitOnlyOld() throws Throwable
+    {
+        ColumnFamilyStore cfs = MockSchema.newCFS(KEYSPACE);
+        SSTableReader sstable = sstable(cfs, 0, 128);
+
+        TransactionLogs transactionLogs = new TransactionLogs(OperationType.COMPACTION, cfs.metadata);
+        assertNotNull(transactionLogs);
+
+        TransactionLogs.SSTableTidier tidier = transactionLogs.obsoleted(sstable);
+        assertNotNull(tidier);
+
+        transactionLogs.finish();
+        sstable.markObsolete(tidier);
+        sstable.selfRef().release();
+
+        TransactionLogs.waitForDeletions();
+
+        assertFiles(transactionLogs.getDataFolder(), new HashSet<>());
+        assertFiles(transactionLogs.getLogsFolder(), Collections.<String>emptySet());
+        assertEquals(0, TransactionLogs.getLogFiles(cfs.metadata).size());
+    }
+
+    @Test
+    public void testAbortOnlyNew() throws Throwable
+    {
+        ColumnFamilyStore cfs = MockSchema.newCFS(KEYSPACE);
+        SSTableReader sstable = sstable(cfs, 0, 128);
+
+        TransactionLogs transactionLogs = new TransactionLogs(OperationType.COMPACTION, cfs.metadata);
+        assertNotNull(transactionLogs);
+
+        transactionLogs.trackNew(sstable);
+        transactionLogs.abort();
+
+        sstable.selfRef().release();
+
+        assertFiles(transactionLogs.getDataFolder(), new HashSet<>());
+        assertFiles(transactionLogs.getLogsFolder(), Collections.<String>emptySet());
+        assertEquals(0, TransactionLogs.getLogFiles(cfs.metadata).size());
+    }
+
+    @Test
+    public void testAbortOnlyOld() throws Throwable
+    {
+        ColumnFamilyStore cfs = MockSchema.newCFS(KEYSPACE);
+        SSTableReader sstable = sstable(cfs, 0, 128);
+
+        TransactionLogs transactionLogs = new TransactionLogs(OperationType.COMPACTION, cfs.metadata);
+        assertNotNull(transactionLogs);
+
+        TransactionLogs.SSTableTidier tidier = transactionLogs.obsoleted(sstable);
+        assertNotNull(tidier);
+
+        tidier.abort();
+        transactionLogs.abort();
+
+        sstable.selfRef().release();
+
+        assertFiles(transactionLogs.getDataFolder(), new HashSet<>(sstable.getAllFilePaths()));
+        assertFiles(transactionLogs.getLogsFolder(), Collections.<String>emptySet());
+        assertEquals(0, TransactionLogs.getLogFiles(cfs.metadata).size());
+    }
+
+    private File copyToTmpFile(File file) throws IOException
+    {
+        File ret = File.createTempFile(file.getName(), ".tmp");
+        ret.deleteOnExit();
+        Files.copy(file.toPath(), ret.toPath(), StandardCopyOption.REPLACE_EXISTING);
+        return ret;
+    }
+
+    @Test
+    public void testRemoveUnfinishedLeftovers_newLogFound() throws Throwable
+    {
+        ColumnFamilyStore cfs = MockSchema.newCFS(KEYSPACE);
+        SSTableReader sstableOld = sstable(cfs, 0, 128);
+        SSTableReader sstableNew = sstable(cfs, 1, 128);
+
+        // simulate tracking sstables with a failed transaction (new log file NOT deleted)
+        TransactionLogs transactionLogs = new TransactionLogs(OperationType.COMPACTION, cfs.metadata);
+        assertNotNull(transactionLogs);
+
+        transactionLogs.trackNew(sstableNew);
+        TransactionLogs.SSTableTidier tidier = transactionLogs.obsoleted(sstableOld);
+
+        File tmpNewLog = copyToTmpFile(transactionLogs.getData().newLog().file);
+        File tmpOldLog = copyToTmpFile(transactionLogs.getData().oldLog().file);
+
+        Set<File> tmpFiles = new HashSet<>(TransactionLogs.getLogFiles(cfs.metadata));
+        for (String p : sstableNew.getAllFilePaths())
+            tmpFiles.add(new File(p));
+
+        sstableNew.selfRef().release();
+        sstableOld.selfRef().release();
+
+        Assert.assertEquals(tmpFiles, TransactionLogs.getTemporaryFiles(cfs.metadata, sstableNew.descriptor.directory));
+
+        // normally called at startup
+        TransactionLogs.removeUnfinishedLeftovers(cfs.metadata);
+
+        // sstable should not have been removed because the new log was found
+        Directories directories = new Directories(cfs.metadata);
+        Map<Descriptor, Set<Component>> sstables = directories.sstableLister().list();
+        assertEquals(1, sstables.size());
+
+        assertFiles(transactionLogs.getDataFolder(), new HashSet<>(sstableOld.getAllFilePaths()));
+        assertFiles(transactionLogs.getLogsFolder(), Collections.<String>emptySet());
+
+        tidier.run();
+
+        // copy old transaction files contents back or transactionlogs will throw assertions
+        Files.move(tmpNewLog.toPath(), transactionLogs.getData().newLog().file.toPath());
+        Files.move(tmpOldLog.toPath(), transactionLogs.getData().oldLog().file.toPath());
+
+        transactionLogs.close();
+    }
+
+    @Test
+    public void testRemoveUnfinishedLeftovers_oldLogFound() throws Throwable
+    {
+        ColumnFamilyStore cfs = MockSchema.newCFS(KEYSPACE);
+        SSTableReader sstableOld = sstable(cfs, 0, 128);
+        SSTableReader sstableNew = sstable(cfs, 1, 128);
+
+        // simulate tracking sstables with a committed transaction (new log file deleted)
+        TransactionLogs transactionLogs = new TransactionLogs(OperationType.COMPACTION, cfs.metadata);
+        assertNotNull(transactionLogs);
+
+        transactionLogs.trackNew(sstableNew);
+        TransactionLogs.SSTableTidier tidier = transactionLogs.obsoleted(sstableOld);
+
+        File tmpNewLog = copyToTmpFile(transactionLogs.getData().newLog().file);
+        File tmpOldLog = copyToTmpFile(transactionLogs.getData().oldLog().file);
+
+        transactionLogs.getData().newLog().delete(false);
+
+        Set<File> tmpFiles = new HashSet<>(TransactionLogs.getLogFiles(cfs.metadata));
+        for (String p : sstableOld.getAllFilePaths())
+            tmpFiles.add(new File(p));
+
+        sstableNew.selfRef().release();
+        sstableOld.selfRef().release();
+
+        Assert.assertEquals(tmpFiles, TransactionLogs.getTemporaryFiles(cfs.metadata, sstableOld.descriptor.directory));
+
+        // normally called at startup
+        TransactionLogs.removeUnfinishedLeftovers(cfs.metadata);
+
+        // sstable should have been removed because there was no new log.
+        Directories directories = new Directories(cfs.metadata);
+        Map<Descriptor, Set<Component>> sstables = directories.sstableLister().list();
+        assertEquals(1, sstables.size());
+
+        assertFiles(transactionLogs.getDataFolder(), new HashSet<>(sstableNew.getAllFilePaths()));
+        assertFiles(transactionLogs.getLogsFolder(), Collections.<String>emptySet());
+
+        tidier.run();
+
+        // copy old transaction files contents back or transactionlogs will throw assertions
+        Files.move(tmpNewLog.toPath(), transactionLogs.getData().newLog().file.toPath());
+        Files.move(tmpOldLog.toPath(), transactionLogs.getData().oldLog().file.toPath());
+
+        transactionLogs.close();
+    }
+
+    @Test
+    public void testGetTemporaryFiles() throws IOException
+    {
+        ColumnFamilyStore cfs = MockSchema.newCFS(KEYSPACE);
+        SSTableReader sstable1 = sstable(cfs, 0, 128);
+
+        File dataFolder = sstable1.descriptor.directory;
+
+        Set<File> tmpFiles = TransactionLogs.getTemporaryFiles(cfs.metadata, dataFolder);
+        assertNotNull(tmpFiles);
+        assertEquals(0, tmpFiles.size());
+
+        TransactionLogs transactionLogs = new TransactionLogs(OperationType.WRITE, cfs.metadata);
+        Directories directories = new Directories(cfs.metadata);
+
+        File[] beforeSecondSSTable = dataFolder.listFiles(pathname -> !pathname.isDirectory());
+
+        SSTableReader sstable2 = sstable(cfs, 1, 128);
+        transactionLogs.trackNew(sstable2);
+
+        Map<Descriptor, Set<Component>> sstables = directories.sstableLister().list();
+        assertEquals(2, sstables.size());
+
+        File[] afterSecondSSTable = dataFolder.listFiles(pathname -> !pathname.isDirectory());
+        int numNewFiles = afterSecondSSTable.length - beforeSecondSSTable.length;
+        assertTrue(numNewFiles == sstable2.getAllFilePaths().size());
+
+        tmpFiles = TransactionLogs.getTemporaryFiles(cfs.metadata, dataFolder);
+        assertNotNull(tmpFiles);
+        assertEquals(numNewFiles + 2, tmpFiles.size()); //the extra files are the transaction log files
+
+        File ssTable2DataFile = new File(sstable2.descriptor.filenameFor(Component.DATA));
+        File ssTable2IndexFile = new File(sstable2.descriptor.filenameFor(Component.PRIMARY_INDEX));
+
+        assertTrue(tmpFiles.contains(ssTable2DataFile));
+        assertTrue(tmpFiles.contains(ssTable2IndexFile));
+
+        List<File> files = directories.sstableLister().listFiles();
+        List<File> filesNoTmp = directories.sstableLister().skipTemporary(true).listFiles();
+        assertNotNull(files);
+        assertNotNull(filesNoTmp);
+
+        assertTrue(files.contains(ssTable2DataFile));
+        assertTrue(files.contains(ssTable2IndexFile));
+
+        assertFalse(filesNoTmp.contains(ssTable2DataFile));
+        assertFalse(filesNoTmp.contains(ssTable2IndexFile));
+
+        transactionLogs.finish();
+
+        //Now it should be empty since the transaction has finished
+        tmpFiles = TransactionLogs.getTemporaryFiles(cfs.metadata, dataFolder);
+        assertNotNull(tmpFiles);
+        assertEquals(0, tmpFiles.size());
+
+        filesNoTmp = directories.sstableLister().skipTemporary(true).listFiles();
+        assertNotNull(filesNoTmp);
+        assertTrue(filesNoTmp.contains(ssTable2DataFile));
+        assertTrue(filesNoTmp.contains(ssTable2IndexFile));
+
+        sstable1.selfRef().release();
+        sstable2.selfRef().release();
+    }
+
+    public static SSTableReader sstable(ColumnFamilyStore cfs, int generation, int size) throws IOException
+    {
+        Directories dir = new Directories(cfs.metadata);
+        Descriptor descriptor = new Descriptor(dir.getDirectoryForNewSSTables(), cfs.keyspace.getName(), cfs.getColumnFamilyName(), generation);
+        Set<Component> components = ImmutableSet.of(Component.DATA, Component.PRIMARY_INDEX, Component.FILTER, Component.TOC);
+        for (Component component : components)
+        {
+            File file = new File(descriptor.filenameFor(component));
+            file.createNewFile();
+            try (RandomAccessFile raf = new RandomAccessFile(file, "rw"))
+            {
+                raf.setLength(size);
+            }
+        }
+
+        SegmentedFile dFile = new BufferedSegmentedFile(new ChannelProxy(new File(descriptor.filenameFor(Component.DATA))), 0);
+        SegmentedFile iFile = new BufferedSegmentedFile(new ChannelProxy(new File(descriptor.filenameFor(Component.PRIMARY_INDEX))), 0);
+
+        SerializationHeader header = SerializationHeader.make(cfs.metadata, Collections.EMPTY_LIST);
+        StatsMetadata metadata = (StatsMetadata) new MetadataCollector(cfs.metadata.comparator)
+                                                 .finalizeMetadata(Murmur3Partitioner.instance.getClass().getCanonicalName(), 0.01f, -1, header)
+                                                 .get(MetadataType.STATS);
+        SSTableReader reader = SSTableReader.internalOpen(descriptor,
+                                                          components,
+                                                          cfs.metadata,
+                                                          Murmur3Partitioner.instance,
+                                                          dFile,
+                                                          iFile,
+                                                          MockSchema.indexSummary.sharedCopy(),
+                                                          new AlwaysPresentFilter(),
+                                                          1L,
+                                                          metadata,
+                                                          SSTableReader.OpenReason.NORMAL,
+                                                          header);
+        reader.first = reader.last = MockSchema.readerBounds(generation);
+        return reader;
+    }
+
+    private static void assertFiles(String dirPath, Set<String> expectedFiles)
+    {
+        File dir = new File(dirPath);
+        for (File file : dir.listFiles())
+        {
+            if (file.isDirectory())
+                continue;
+
+            String filePath = file.getPath();
+            assertTrue(filePath, expectedFiles.contains(filePath));
+            expectedFiles.remove(filePath);
+        }
+
+        assertTrue(expectedFiles.isEmpty());
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b09e60f7/test/unit/org/apache/cassandra/io/sstable/BigTableWriterTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/sstable/BigTableWriterTest.java b/test/unit/org/apache/cassandra/io/sstable/BigTableWriterTest.java
index 29875d5..08de62f 100644
--- a/test/unit/org/apache/cassandra/io/sstable/BigTableWriterTest.java
+++ b/test/unit/org/apache/cassandra/io/sstable/BigTableWriterTest.java
@@ -61,19 +61,19 @@ public class BigTableWriterTest extends AbstractTransactionalTest
     {
         final File file;
         final Descriptor descriptor;
-        final SSTableWriter writer;
+        final SSTableTxnWriter writer;
 
         private TestableBTW() throws IOException
         {
-            this(cfs.getTempSSTablePath(cfs.directories.getDirectoryForNewSSTables()));
+            this(cfs.getSSTablePath(cfs.directories.getDirectoryForNewSSTables()));
         }
 
         private TestableBTW(String file) throws IOException
         {
-            this(file, SSTableWriter.create(file, 0, 0, new SerializationHeader(cfs.metadata, cfs.metadata.partitionColumns(), EncodingStats.NO_STATS)));
+            this(file, SSTableTxnWriter.create(file, 0, 0, new SerializationHeader(cfs.metadata, cfs.metadata.partitionColumns(), EncodingStats.NO_STATS)));
         }
 
-        private TestableBTW(String file, SSTableWriter sw) throws IOException
+        private TestableBTW(String file, SSTableTxnWriter sw) throws IOException
         {
             super(sw);
             this.file = new File(file);
@@ -91,22 +91,19 @@ public class BigTableWriterTest extends AbstractTransactionalTest
 
         protected void assertInProgress() throws Exception
         {
-            assertExists(Descriptor.Type.TEMP, Component.DATA, Component.PRIMARY_INDEX);
-            assertNotExists(Descriptor.Type.TEMP, Component.FILTER, Component.SUMMARY);
-            assertNotExists(Descriptor.Type.FINAL, Component.DATA, Component.PRIMARY_INDEX, Component.FILTER, Component.SUMMARY);
+            assertExists(Component.DATA, Component.PRIMARY_INDEX);
+            assertNotExists(Component.FILTER, Component.SUMMARY);
             Assert.assertTrue(file.length() > 0);
         }
 
         protected void assertPrepared() throws Exception
         {
-            assertNotExists(Descriptor.Type.TEMP, Component.DATA, Component.PRIMARY_INDEX, Component.FILTER, Component.SUMMARY);
-            assertExists(Descriptor.Type.FINAL, Component.DATA, Component.PRIMARY_INDEX, Component.FILTER, Component.SUMMARY);
+            assertExists(Component.DATA, Component.PRIMARY_INDEX, Component.FILTER, Component.SUMMARY);
         }
 
         protected void assertAborted() throws Exception
         {
-            assertNotExists(Descriptor.Type.TEMP, Component.DATA, Component.PRIMARY_INDEX, Component.FILTER, Component.SUMMARY);
-            assertNotExists(Descriptor.Type.FINAL, Component.DATA, Component.PRIMARY_INDEX, Component.FILTER, Component.SUMMARY);
+            assertNotExists(Component.DATA, Component.PRIMARY_INDEX, Component.FILTER, Component.SUMMARY);
             Assert.assertFalse(file.exists());
         }
 
@@ -115,15 +112,15 @@ public class BigTableWriterTest extends AbstractTransactionalTest
             assertPrepared();
         }
 
-        private void assertExists(Descriptor.Type type, Component ... components)
+        private void assertExists(Component ... components)
         {
             for (Component component : components)
-                Assert.assertTrue(new File(descriptor.asType(type).filenameFor(component)).exists());
+                Assert.assertTrue(new File(descriptor.filenameFor(component)).exists());
         }
-        private void assertNotExists(Descriptor.Type type, Component ... components)
+        private void assertNotExists(Component ... components)
         {
             for (Component component : components)
-                Assert.assertFalse(type.toString() + " " + component.toString(), new File(descriptor.asType(type).filenameFor(component)).exists());
+                Assert.assertFalse(component.toString(), new File(descriptor.filenameFor(component)).exists());
         }
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b09e60f7/test/unit/org/apache/cassandra/io/sstable/CQLSSTableWriterClientTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/sstable/CQLSSTableWriterClientTest.java b/test/unit/org/apache/cassandra/io/sstable/CQLSSTableWriterClientTest.java
index dab88c8..9a558f1 100644
--- a/test/unit/org/apache/cassandra/io/sstable/CQLSSTableWriterClientTest.java
+++ b/test/unit/org/apache/cassandra/io/sstable/CQLSSTableWriterClientTest.java
@@ -28,10 +28,13 @@ import org.junit.Before;
 import org.junit.Test;
 
 import org.apache.cassandra.config.Config;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.Directories;
 import org.apache.cassandra.exceptions.InvalidRequestException;
 import org.apache.cassandra.io.util.FileUtils;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
 
 public class CQLSSTableWriterClientTest
 {
@@ -97,5 +100,11 @@ public class CQLSSTableWriterClientTest
         File[] dataFiles = this.testDirectory.listFiles(filter);
         assertEquals(2, dataFiles.length);
 
+        File transactionsFolder = Directories.getTransactionsDirectory(testDirectory);
+        assertTrue(transactionsFolder.exists());
+
+        File[] opFiles = transactionsFolder.listFiles();
+        assertEquals(0, opFiles.length);
+
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b09e60f7/test/unit/org/apache/cassandra/io/sstable/DescriptorTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/sstable/DescriptorTest.java b/test/unit/org/apache/cassandra/io/sstable/DescriptorTest.java
index ceb0f9c..19eca40 100644
--- a/test/unit/org/apache/cassandra/io/sstable/DescriptorTest.java
+++ b/test/unit/org/apache/cassandra/io/sstable/DescriptorTest.java
@@ -77,24 +77,19 @@ public class DescriptorTest
     private void testFromFilenameFor(File dir)
     {
         // normal
-        checkFromFilename(new Descriptor(dir, ksname, cfname, 1, Descriptor.Type.FINAL), false);
+        checkFromFilename(new Descriptor(dir, ksname, cfname, 1), false);
         // skip component (for streaming lock file)
-        checkFromFilename(new Descriptor(dir, ksname, cfname, 2, Descriptor.Type.FINAL), true);
-        // tmp
-        checkFromFilename(new Descriptor(dir, ksname, cfname, 3, Descriptor.Type.TEMP), false);
+        checkFromFilename(new Descriptor(dir, ksname, cfname, 2), true);
+
         // secondary index
         String idxName = "myidx";
         File idxDir = new File(dir.getAbsolutePath() + File.separator + Directories.SECONDARY_INDEX_NAME_SEPARATOR + idxName);
-        checkFromFilename(new Descriptor(idxDir, ksname, cfname + Directories.SECONDARY_INDEX_NAME_SEPARATOR + idxName, 4, Descriptor.Type.FINAL), false);
-        // secondary index tmp
-        checkFromFilename(new Descriptor(idxDir, ksname, cfname + Directories.SECONDARY_INDEX_NAME_SEPARATOR + idxName, 5, Descriptor.Type.TEMP), false);
+        checkFromFilename(new Descriptor(idxDir, ksname, cfname + Directories.SECONDARY_INDEX_NAME_SEPARATOR + idxName, 4), false);
 
         // legacy version
-        checkFromFilename(new Descriptor("ja", dir, ksname, cfname, 1, Descriptor.Type.FINAL, SSTableFormat.Type.LEGACY), false);
-        // legacy tmp
-        checkFromFilename(new Descriptor("ja", dir, ksname, cfname, 2, Descriptor.Type.TEMP, SSTableFormat.Type.LEGACY), false);
+        checkFromFilename(new Descriptor("ja", dir, ksname, cfname, 1, SSTableFormat.Type.LEGACY), false);
         // legacy secondary index
-        checkFromFilename(new Descriptor("ja", dir, ksname, cfname + Directories.SECONDARY_INDEX_NAME_SEPARATOR + idxName, 3, Descriptor.Type.FINAL, SSTableFormat.Type.LEGACY), false);
+        checkFromFilename(new Descriptor("ja", dir, ksname, cfname + Directories.SECONDARY_INDEX_NAME_SEPARATOR + idxName, 3, SSTableFormat.Type.LEGACY), false);
     }
 
     private void checkFromFilename(Descriptor original, boolean skipComponent)
@@ -109,7 +104,6 @@ public class DescriptorTest
         assertEquals(original.cfname, desc.cfname);
         assertEquals(original.version, desc.version);
         assertEquals(original.generation, desc.generation);
-        assertEquals(original.type, desc.type);
 
         if (skipComponent)
         {