You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by yu...@apache.org on 2013/12/30 21:04:10 UTC

[1/3] git commit: Delete unfinished compaction sstables incrementally

Updated Branches:
  refs/heads/cassandra-2.0 c5ca8de4d -> 4ed223407
  refs/heads/trunk 76ee9a155 -> 49efc13cd


Delete unfinished compaction sstables incrementally

patch by thobbs; reviewed by yukim for CASSANDRA-6086


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/4ed22340
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/4ed22340
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/4ed22340

Branch: refs/heads/cassandra-2.0
Commit: 4ed2234078c4d302c256332252a8ddd6ae345484
Parents: c5ca8de
Author: Yuki Morishita <yu...@apache.org>
Authored: Mon Dec 30 13:48:09 2013 -0600
Committer: Yuki Morishita <yu...@apache.org>
Committed: Mon Dec 30 13:48:09 2013 -0600

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../apache/cassandra/db/ColumnFamilyStore.java  | 39 +++++++----
 .../org/apache/cassandra/db/SystemKeyspace.java | 23 ++++--
 .../db/compaction/LeveledManifest.java          | 14 ++++
 .../cassandra/service/CassandraDaemon.java      | 43 +++++-------
 .../cassandra/db/ColumnFamilyStoreTest.java     | 74 ++++++++++++++++++--
 .../db/compaction/CompactionsTest.java          |  4 +-
 7 files changed, 151 insertions(+), 47 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/4ed22340/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 0396006..958369a 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 2.0.5
+* Delete unfinished compaction incrementally (CASSANDRA-6086)
 Merged from 1.2:
  * fsync compression metadata (CASSANDRA-6531)
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4ed22340/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index cbd9d2e..4d7d6f2 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@ -483,27 +483,30 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
      * compactions, we remove the new ones (since those may be incomplete -- under LCS, we may create multiple
      * sstables from any given ancestor).
      */
-    public static void removeUnfinishedCompactionLeftovers(String keyspace, String columnfamily, Set<Integer> unfinishedGenerations)
+    public static void removeUnfinishedCompactionLeftovers(String keyspace, String columnfamily, Map<Integer, UUID> unfinishedCompactions)
     {
         Directories directories = Directories.create(keyspace, columnfamily);
 
-        // sanity-check unfinishedGenerations
-        Set<Integer> allGenerations = new HashSet<Integer>();
+        Set<Integer> allGenerations = new HashSet<>();
         for (Descriptor desc : directories.sstableLister().list().keySet())
             allGenerations.add(desc.generation);
+
+        // sanity-check unfinishedCompactions
+        Set<Integer> unfinishedGenerations = unfinishedCompactions.keySet();
         if (!allGenerations.containsAll(unfinishedGenerations))
         {
-            throw new IllegalStateException("Unfinished compactions reference missing sstables."
-                                            + " This should never happen since compactions are marked finished before we start removing the old sstables.");
+            HashSet<Integer> missingGenerations = new HashSet<>(unfinishedGenerations);
+            missingGenerations.removeAll(allGenerations);
+            logger.debug("Unfinished compactions of {}.{} reference missing sstables of generations {}",
+                         keyspace, columnfamily, missingGenerations);
         }
 
         // remove new sstables from compactions that didn't complete, and compute
         // set of ancestors that shouldn't exist anymore
-        Set<Integer> completedAncestors = new HashSet<Integer>();
+        Set<Integer> completedAncestors = new HashSet<>();
         for (Map.Entry<Descriptor, Set<Component>> sstableFiles : directories.sstableLister().list().entrySet())
         {
             Descriptor desc = sstableFiles.getKey();
-            Set<Component> components = sstableFiles.getValue();
 
             Set<Integer> ancestors;
             try
@@ -515,9 +518,16 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
                 throw new FSReadError(e, desc.filenameFor(Component.STATS));
             }
 
-            if (!ancestors.isEmpty() && unfinishedGenerations.containsAll(ancestors))
+            if (!ancestors.isEmpty()
+                && unfinishedGenerations.containsAll(ancestors)
+                && allGenerations.containsAll(ancestors))
             {
-                SSTable.delete(desc, components);
+                // any of the ancestors would work, so we'll just lookup the compaction task ID with the first one
+                UUID compactionTaskID = unfinishedCompactions.get(ancestors.iterator().next());
+                assert compactionTaskID != null;
+                logger.debug("Going to delete unfinished compaction product {}", desc);
+                SSTable.delete(desc, sstableFiles.getValue());
+                SystemKeyspace.finishCompaction(compactionTaskID);
             }
             else
             {
@@ -529,10 +539,15 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
         for (Map.Entry<Descriptor, Set<Component>> sstableFiles : directories.sstableLister().list().entrySet())
         {
             Descriptor desc = sstableFiles.getKey();
-            Set<Component> components = sstableFiles.getValue();
-
             if (completedAncestors.contains(desc.generation))
-                SSTable.delete(desc, components);
+            {
+                // if any of the ancestors were participating in a compaction, finish that compaction
+                logger.debug("Going to delete leftover compaction ancestor {}", desc);
+                SSTable.delete(desc, sstableFiles.getValue());
+                UUID compactionTaskID = unfinishedCompactions.get(desc.generation);
+                if (compactionTaskID != null)
+                    SystemKeyspace.finishCompaction(unfinishedCompactions.get(desc.generation));
+            }
         }
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4ed22340/src/java/org/apache/cassandra/db/SystemKeyspace.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/SystemKeyspace.java b/src/java/org/apache/cassandra/db/SystemKeyspace.java
index 9093ec0..910f025 100644
--- a/src/java/org/apache/cassandra/db/SystemKeyspace.java
+++ b/src/java/org/apache/cassandra/db/SystemKeyspace.java
@@ -188,6 +188,11 @@ public class SystemKeyspace
         return compactionId;
     }
 
+    /**
+     * Deletes the entry for this compaction from the set of compactions in progress.  The compaction does not need
+     * to complete successfully for this to be called.
+     * @param taskId what was returned from {@code startCompaction}
+     */
     public static void finishCompaction(UUID taskId)
     {
         assert taskId != null;
@@ -198,21 +203,31 @@ public class SystemKeyspace
     }
 
     /**
-     * @return unfinished compactions, grouped by keyspace/columnfamily pair.
+     * Returns a Map whose keys are KS.CF pairs and whose values are maps from sstable generation numbers to the
+     * task ID of the compaction they were participating in.
      */
-    public static SetMultimap<Pair<String, String>, Integer> getUnfinishedCompactions()
+    public static Map<Pair<String, String>, Map<Integer, UUID>> getUnfinishedCompactions()
     {
         String req = "SELECT * FROM system.%s";
         UntypedResultSet resultSet = processInternal(String.format(req, COMPACTION_LOG));
 
-        SetMultimap<Pair<String, String>, Integer> unfinishedCompactions = HashMultimap.create();
+        Map<Pair<String, String>, Map<Integer, UUID>> unfinishedCompactions = new HashMap<>();
         for (UntypedResultSet.Row row : resultSet)
         {
             String keyspace = row.getString("keyspace_name");
             String columnfamily = row.getString("columnfamily_name");
             Set<Integer> inputs = row.getSet("inputs", Int32Type.instance);
+            UUID taskID = row.getUUID("id");
+
+            Pair<String, String> kscf = Pair.create(keyspace, columnfamily);
+            Map<Integer, UUID> generationToTaskID = unfinishedCompactions.get(kscf);
+            if (generationToTaskID == null)
+                generationToTaskID = new HashMap<>(inputs.size());
+
+            for (Integer generation : inputs)
+                generationToTaskID.put(generation, taskID);
 
-            unfinishedCompactions.putAll(Pair.create(keyspace, columnfamily), inputs);
+            unfinishedCompactions.put(kscf, generationToTaskID);
         }
         return unfinishedCompactions;
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4ed22340/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java b/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java
index 92cd887..4347ad5 100644
--- a/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java
+++ b/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java
@@ -33,6 +33,8 @@ import com.google.common.primitives.Ints;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.config.Schema;
 import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.db.RowPosition;
 import org.apache.cassandra.dht.Bounds;
@@ -619,4 +621,16 @@ public class LeveledManifest
             this.maxSSTableBytes = maxSSTableBytes;
         }
     }
+
+    public static void maybeMigrateManifests() throws IOException
+    {
+        for (String keyspaceName : Schema.instance.getKeyspaces())
+        {
+            for (CFMetaData cfm : Schema.instance.getKeyspaceMetaData(keyspaceName).values())
+            {
+                if (LegacyLeveledManifest.manifestNeedsMigration(keyspaceName,cfm.cfName))
+                    LegacyLeveledManifest.migrateManifests(keyspaceName, cfm.cfName);
+            }
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4ed22340/src/java/org/apache/cassandra/service/CassandraDaemon.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/CassandraDaemon.java b/src/java/org/apache/cassandra/service/CassandraDaemon.java
index 0cebc11..d36b0db 100644
--- a/src/java/org/apache/cassandra/service/CassandraDaemon.java
+++ b/src/java/org/apache/cassandra/service/CassandraDaemon.java
@@ -24,6 +24,8 @@ import java.net.InetAddress;
 import java.net.MalformedURLException;
 import java.net.URL;
 import java.util.Arrays;
+import java.util.Map;
+import java.util.UUID;
 import java.util.concurrent.TimeUnit;
 import javax.management.MBeanServer;
 import javax.management.ObjectName;
@@ -32,7 +34,6 @@ import javax.management.StandardMBean;
 import com.addthis.metrics.reporter.config.ReporterConfig;
 
 import com.google.common.collect.Iterables;
-import com.google.common.collect.SetMultimap;
 import org.apache.log4j.PropertyConfigurator;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -46,7 +47,7 @@ import org.apache.cassandra.db.MeteredFlusher;
 import org.apache.cassandra.db.SystemKeyspace;
 import org.apache.cassandra.db.commitlog.CommitLog;
 import org.apache.cassandra.db.compaction.CompactionManager;
-import org.apache.cassandra.db.compaction.LegacyLeveledManifest;
+import org.apache.cassandra.db.compaction.LeveledManifest;
 import org.apache.cassandra.exceptions.ConfigurationException;
 import org.apache.cassandra.io.FSError;
 import org.apache.cassandra.io.util.FileUtils;
@@ -236,35 +237,29 @@ public class CassandraDaemon
         // load keyspace descriptions.
         DatabaseDescriptor.loadSchemas();
 
-        // clean up debris in the rest of the keyspaces
-        for (String keyspaceName : Schema.instance.getKeyspaces())
+        try
         {
-            for (CFMetaData cfm : Schema.instance.getKeyspaceMetaData(keyspaceName).values())
-            {
-                if (LegacyLeveledManifest.manifestNeedsMigration(keyspaceName,cfm.cfName))
-                {
-                    try
-                    {
-                        LegacyLeveledManifest.migrateManifests(keyspaceName, cfm.cfName);
-                    }
-                    catch (IOException e)
-                    {
-                        logger.error("Could not migrate old leveled manifest. Move away the .json file in the data directory", e);
-                        System.exit(100);
-                    }
-                }
-
-                ColumnFamilyStore.scrubDataDirectories(keyspaceName, cfm.cfName);
-            }
+            LeveledManifest.maybeMigrateManifests();
         }
+        catch(IOException e)
+        {
+            logger.error("Could not migrate old leveled manifest. Move away the .json file in the data directory", e);
+            System.exit(100);
+        }
+
         // clean up compaction leftovers
-        SetMultimap<Pair<String, String>, Integer> unfinishedCompactions = SystemKeyspace.getUnfinishedCompactions();
+        Map<Pair<String, String>, Map<Integer, UUID>> unfinishedCompactions = SystemKeyspace.getUnfinishedCompactions();
         for (Pair<String, String> kscf : unfinishedCompactions.keySet())
-        {
             ColumnFamilyStore.removeUnfinishedCompactionLeftovers(kscf.left, kscf.right, unfinishedCompactions.get(kscf));
-        }
         SystemKeyspace.discardCompactionsInProgress();
 
+        // clean up debris in the rest of the keyspaces
+        for (String keyspaceName : Schema.instance.getKeyspaces())
+        {
+            for (CFMetaData cfm : Schema.instance.getKeyspaceMetaData(keyspaceName).values())
+                ColumnFamilyStore.scrubDataDirectories(keyspaceName, cfm.cfName);
+        }
+
         // initialize keyspaces
         for (String keyspaceName : Schema.instance.getKeyspaces())
         {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4ed22340/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java b/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java
index 4e6c87f..65b1708 100644
--- a/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java
+++ b/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java
@@ -52,6 +52,7 @@ import org.apache.cassandra.io.sstable.*;
 import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.thrift.*;
 import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.Pair;
 import org.apache.cassandra.utils.WrappedRunnable;
 
 import static org.junit.Assert.*;
@@ -1581,7 +1582,7 @@ public class ColumnFamilyStoreTest extends SchemaLoader
     }
 
     @Test
-    public void testRemoveUnifinishedCompactionLeftovers() throws Throwable
+    public void testRemoveUnfinishedCompactionLeftovers() throws Throwable
     {
         String ks = "Keyspace1";
         String cf = "Standard3"; // should be empty
@@ -1597,14 +1598,14 @@ public class ColumnFamilyStoreTest extends SchemaLoader
         writer.close();
 
         Map<Descriptor, Set<Component>> sstables = dir.sstableLister().list();
-        assert sstables.size() == 1;
+        assertEquals(1, sstables.size());
 
         Map.Entry<Descriptor, Set<Component>> sstableToOpen = sstables.entrySet().iterator().next();
         final SSTableReader sstable1 = SSTableReader.open(sstableToOpen.getKey());
 
         // simulate incomplete compaction
         writer = new SSTableSimpleWriter(dir.getDirectoryForNewSSTables(),
-                                                             cfmeta, StorageService.getPartitioner())
+                                         cfmeta, StorageService.getPartitioner())
         {
             protected SSTableWriter getWriter()
             {
@@ -1623,12 +1624,75 @@ public class ColumnFamilyStoreTest extends SchemaLoader
 
         // should have 2 sstables now
         sstables = dir.sstableLister().list();
-        assert sstables.size() == 2;
+        assertEquals(2, sstables.size());
+
+        UUID compactionTaskID = SystemKeyspace.startCompaction(
+                Keyspace.open(ks).getColumnFamilyStore(cf),
+                Collections.singleton(SSTableReader.open(sstable1.descriptor)));
 
-        ColumnFamilyStore.removeUnfinishedCompactionLeftovers(ks, cf, Sets.newHashSet(sstable1.descriptor.generation));
+        Map<Integer, UUID> unfinishedCompaction = new HashMap<>();
+        unfinishedCompaction.put(sstable1.descriptor.generation, compactionTaskID);
+        ColumnFamilyStore.removeUnfinishedCompactionLeftovers(ks, cf, unfinishedCompaction);
 
         // 2nd sstable should be removed (only 1st sstable exists in set of size 1)
         sstables = dir.sstableLister().list();
+        assertEquals(1, sstables.size());
+        assertTrue(sstables.containsKey(sstable1.descriptor));
+
+        Map<Pair<String, String>, Map<Integer, UUID>> unfinished = SystemKeyspace.getUnfinishedCompactions();
+        assertTrue(unfinished.isEmpty());
+    }
+
+    /**
+     * @see <a href="https://issues.apache.org/jira/browse/CASSANDRA-6086">CASSANDRA-6086</a>
+     */
+    @Test
+    public void testFailedToRemoveUnfinishedCompactionLeftovers() throws Throwable
+    {
+        final String ks = "Keyspace1";
+        final String cf = "Standard4"; // should be empty
+
+        final CFMetaData cfmeta = Schema.instance.getCFMetaData(ks, cf);
+        Directories dir = Directories.create(ks, cf);
+        ByteBuffer key = bytes("key");
+
+        // Write SSTable generation 3 that has ancestors 1 and 2
+        final Set<Integer> ancestors = Sets.newHashSet(1, 2);
+        SSTableSimpleWriter writer = new SSTableSimpleWriter(dir.getDirectoryForNewSSTables(),
+                                                cfmeta, StorageService.getPartitioner())
+        {
+            protected SSTableWriter getWriter()
+            {
+                SSTableMetadata.Collector collector = SSTableMetadata.createCollector(cfmeta.comparator);
+                for (int ancestor : ancestors)
+                    collector.addAncestor(ancestor);
+                String file = new Descriptor(directory, ks, cf, 3, true).filenameFor(Component.DATA);
+                return new SSTableWriter(file,
+                                         0,
+                                         metadata,
+                                         StorageService.getPartitioner(),
+                                         collector);
+            }
+        };
+        writer.newRow(key);
+        writer.addColumn(bytes("col"), bytes("val"), 1);
+        writer.close();
+
+        Map<Descriptor, Set<Component>> sstables = dir.sstableLister().list();
+        assert sstables.size() == 1;
+
+        Map.Entry<Descriptor, Set<Component>> sstableToOpen = sstables.entrySet().iterator().next();
+        final SSTableReader sstable1 = SSTableReader.open(sstableToOpen.getKey());
+
+        // simulate we don't have generation in compaction_history
+        Map<Integer, UUID> unfinishedCompactions = new HashMap<>();
+        UUID compactionTaskID = UUID.randomUUID();
+        for (Integer ancestor : ancestors)
+            unfinishedCompactions.put(ancestor, compactionTaskID);
+        ColumnFamilyStore.removeUnfinishedCompactionLeftovers(ks, cf, unfinishedCompactions);
+
+        // SSTable should not be deleted
+        sstables = dir.sstableLister().list();
         assert sstables.size() == 1;
         assert sstables.containsKey(sstable1.descriptor);
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4ed22340/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java b/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java
index a338290..7b91bed 100644
--- a/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java
+++ b/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java
@@ -289,8 +289,8 @@ public class CompactionsTest extends SchemaLoader
             }
         }));
         UUID taskId = SystemKeyspace.startCompaction(cfs, sstables);
-        SetMultimap<Pair<String, String>, Integer> compactionLogs = SystemKeyspace.getUnfinishedCompactions();
-        Set<Integer> unfinishedCompactions = compactionLogs.get(Pair.create(KEYSPACE1, cf));
+        Map<Pair<String, String>, Map<Integer, UUID>> compactionLogs = SystemKeyspace.getUnfinishedCompactions();
+        Set<Integer> unfinishedCompactions = compactionLogs.get(Pair.create(KEYSPACE1, cf)).keySet();
         assert unfinishedCompactions.containsAll(generations);
 
         SystemKeyspace.finishCompaction(taskId);


[2/3] git commit: Delete unfinished compaction sstables incrementally

Posted by yu...@apache.org.
Delete unfinished compaction sstables incrementally

patch by thobbs; reviewed by yukim for CASSANDRA-6086


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/4ed22340
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/4ed22340
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/4ed22340

Branch: refs/heads/trunk
Commit: 4ed2234078c4d302c256332252a8ddd6ae345484
Parents: c5ca8de
Author: Yuki Morishita <yu...@apache.org>
Authored: Mon Dec 30 13:48:09 2013 -0600
Committer: Yuki Morishita <yu...@apache.org>
Committed: Mon Dec 30 13:48:09 2013 -0600

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../apache/cassandra/db/ColumnFamilyStore.java  | 39 +++++++----
 .../org/apache/cassandra/db/SystemKeyspace.java | 23 ++++--
 .../db/compaction/LeveledManifest.java          | 14 ++++
 .../cassandra/service/CassandraDaemon.java      | 43 +++++-------
 .../cassandra/db/ColumnFamilyStoreTest.java     | 74 ++++++++++++++++++--
 .../db/compaction/CompactionsTest.java          |  4 +-
 7 files changed, 151 insertions(+), 47 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/4ed22340/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 0396006..958369a 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 2.0.5
+* Delete unfinished compaction incrementally (CASSANDRA-6086)
 Merged from 1.2:
  * fsync compression metadata (CASSANDRA-6531)
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4ed22340/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index cbd9d2e..4d7d6f2 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@ -483,27 +483,30 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
      * compactions, we remove the new ones (since those may be incomplete -- under LCS, we may create multiple
      * sstables from any given ancestor).
      */
-    public static void removeUnfinishedCompactionLeftovers(String keyspace, String columnfamily, Set<Integer> unfinishedGenerations)
+    public static void removeUnfinishedCompactionLeftovers(String keyspace, String columnfamily, Map<Integer, UUID> unfinishedCompactions)
     {
         Directories directories = Directories.create(keyspace, columnfamily);
 
-        // sanity-check unfinishedGenerations
-        Set<Integer> allGenerations = new HashSet<Integer>();
+        Set<Integer> allGenerations = new HashSet<>();
         for (Descriptor desc : directories.sstableLister().list().keySet())
             allGenerations.add(desc.generation);
+
+        // sanity-check unfinishedCompactions
+        Set<Integer> unfinishedGenerations = unfinishedCompactions.keySet();
         if (!allGenerations.containsAll(unfinishedGenerations))
         {
-            throw new IllegalStateException("Unfinished compactions reference missing sstables."
-                                            + " This should never happen since compactions are marked finished before we start removing the old sstables.");
+            HashSet<Integer> missingGenerations = new HashSet<>(unfinishedGenerations);
+            missingGenerations.removeAll(allGenerations);
+            logger.debug("Unfinished compactions of {}.{} reference missing sstables of generations {}",
+                         keyspace, columnfamily, missingGenerations);
         }
 
         // remove new sstables from compactions that didn't complete, and compute
         // set of ancestors that shouldn't exist anymore
-        Set<Integer> completedAncestors = new HashSet<Integer>();
+        Set<Integer> completedAncestors = new HashSet<>();
         for (Map.Entry<Descriptor, Set<Component>> sstableFiles : directories.sstableLister().list().entrySet())
         {
             Descriptor desc = sstableFiles.getKey();
-            Set<Component> components = sstableFiles.getValue();
 
             Set<Integer> ancestors;
             try
@@ -515,9 +518,16 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
                 throw new FSReadError(e, desc.filenameFor(Component.STATS));
             }
 
-            if (!ancestors.isEmpty() && unfinishedGenerations.containsAll(ancestors))
+            if (!ancestors.isEmpty()
+                && unfinishedGenerations.containsAll(ancestors)
+                && allGenerations.containsAll(ancestors))
             {
-                SSTable.delete(desc, components);
+                // any of the ancestors would work, so we'll just lookup the compaction task ID with the first one
+                UUID compactionTaskID = unfinishedCompactions.get(ancestors.iterator().next());
+                assert compactionTaskID != null;
+                logger.debug("Going to delete unfinished compaction product {}", desc);
+                SSTable.delete(desc, sstableFiles.getValue());
+                SystemKeyspace.finishCompaction(compactionTaskID);
             }
             else
             {
@@ -529,10 +539,15 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
         for (Map.Entry<Descriptor, Set<Component>> sstableFiles : directories.sstableLister().list().entrySet())
         {
             Descriptor desc = sstableFiles.getKey();
-            Set<Component> components = sstableFiles.getValue();
-
             if (completedAncestors.contains(desc.generation))
-                SSTable.delete(desc, components);
+            {
+                // if any of the ancestors were participating in a compaction, finish that compaction
+                logger.debug("Going to delete leftover compaction ancestor {}", desc);
+                SSTable.delete(desc, sstableFiles.getValue());
+                UUID compactionTaskID = unfinishedCompactions.get(desc.generation);
+                if (compactionTaskID != null)
+                    SystemKeyspace.finishCompaction(unfinishedCompactions.get(desc.generation));
+            }
         }
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4ed22340/src/java/org/apache/cassandra/db/SystemKeyspace.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/SystemKeyspace.java b/src/java/org/apache/cassandra/db/SystemKeyspace.java
index 9093ec0..910f025 100644
--- a/src/java/org/apache/cassandra/db/SystemKeyspace.java
+++ b/src/java/org/apache/cassandra/db/SystemKeyspace.java
@@ -188,6 +188,11 @@ public class SystemKeyspace
         return compactionId;
     }
 
+    /**
+     * Deletes the entry for this compaction from the set of compactions in progress.  The compaction does not need
+     * to complete successfully for this to be called.
+     * @param taskId what was returned from {@code startCompaction}
+     */
     public static void finishCompaction(UUID taskId)
     {
         assert taskId != null;
@@ -198,21 +203,31 @@ public class SystemKeyspace
     }
 
     /**
-     * @return unfinished compactions, grouped by keyspace/columnfamily pair.
+     * Returns a Map whose keys are KS.CF pairs and whose values are maps from sstable generation numbers to the
+     * task ID of the compaction they were participating in.
      */
-    public static SetMultimap<Pair<String, String>, Integer> getUnfinishedCompactions()
+    public static Map<Pair<String, String>, Map<Integer, UUID>> getUnfinishedCompactions()
     {
         String req = "SELECT * FROM system.%s";
         UntypedResultSet resultSet = processInternal(String.format(req, COMPACTION_LOG));
 
-        SetMultimap<Pair<String, String>, Integer> unfinishedCompactions = HashMultimap.create();
+        Map<Pair<String, String>, Map<Integer, UUID>> unfinishedCompactions = new HashMap<>();
         for (UntypedResultSet.Row row : resultSet)
         {
             String keyspace = row.getString("keyspace_name");
             String columnfamily = row.getString("columnfamily_name");
             Set<Integer> inputs = row.getSet("inputs", Int32Type.instance);
+            UUID taskID = row.getUUID("id");
+
+            Pair<String, String> kscf = Pair.create(keyspace, columnfamily);
+            Map<Integer, UUID> generationToTaskID = unfinishedCompactions.get(kscf);
+            if (generationToTaskID == null)
+                generationToTaskID = new HashMap<>(inputs.size());
+
+            for (Integer generation : inputs)
+                generationToTaskID.put(generation, taskID);
 
-            unfinishedCompactions.putAll(Pair.create(keyspace, columnfamily), inputs);
+            unfinishedCompactions.put(kscf, generationToTaskID);
         }
         return unfinishedCompactions;
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4ed22340/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java b/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java
index 92cd887..4347ad5 100644
--- a/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java
+++ b/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java
@@ -33,6 +33,8 @@ import com.google.common.primitives.Ints;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.config.Schema;
 import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.db.RowPosition;
 import org.apache.cassandra.dht.Bounds;
@@ -619,4 +621,16 @@ public class LeveledManifest
             this.maxSSTableBytes = maxSSTableBytes;
         }
     }
+
+    public static void maybeMigrateManifests() throws IOException
+    {
+        for (String keyspaceName : Schema.instance.getKeyspaces())
+        {
+            for (CFMetaData cfm : Schema.instance.getKeyspaceMetaData(keyspaceName).values())
+            {
+                if (LegacyLeveledManifest.manifestNeedsMigration(keyspaceName,cfm.cfName))
+                    LegacyLeveledManifest.migrateManifests(keyspaceName, cfm.cfName);
+            }
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4ed22340/src/java/org/apache/cassandra/service/CassandraDaemon.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/CassandraDaemon.java b/src/java/org/apache/cassandra/service/CassandraDaemon.java
index 0cebc11..d36b0db 100644
--- a/src/java/org/apache/cassandra/service/CassandraDaemon.java
+++ b/src/java/org/apache/cassandra/service/CassandraDaemon.java
@@ -24,6 +24,8 @@ import java.net.InetAddress;
 import java.net.MalformedURLException;
 import java.net.URL;
 import java.util.Arrays;
+import java.util.Map;
+import java.util.UUID;
 import java.util.concurrent.TimeUnit;
 import javax.management.MBeanServer;
 import javax.management.ObjectName;
@@ -32,7 +34,6 @@ import javax.management.StandardMBean;
 import com.addthis.metrics.reporter.config.ReporterConfig;
 
 import com.google.common.collect.Iterables;
-import com.google.common.collect.SetMultimap;
 import org.apache.log4j.PropertyConfigurator;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -46,7 +47,7 @@ import org.apache.cassandra.db.MeteredFlusher;
 import org.apache.cassandra.db.SystemKeyspace;
 import org.apache.cassandra.db.commitlog.CommitLog;
 import org.apache.cassandra.db.compaction.CompactionManager;
-import org.apache.cassandra.db.compaction.LegacyLeveledManifest;
+import org.apache.cassandra.db.compaction.LeveledManifest;
 import org.apache.cassandra.exceptions.ConfigurationException;
 import org.apache.cassandra.io.FSError;
 import org.apache.cassandra.io.util.FileUtils;
@@ -236,35 +237,29 @@ public class CassandraDaemon
         // load keyspace descriptions.
         DatabaseDescriptor.loadSchemas();
 
-        // clean up debris in the rest of the keyspaces
-        for (String keyspaceName : Schema.instance.getKeyspaces())
+        try
         {
-            for (CFMetaData cfm : Schema.instance.getKeyspaceMetaData(keyspaceName).values())
-            {
-                if (LegacyLeveledManifest.manifestNeedsMigration(keyspaceName,cfm.cfName))
-                {
-                    try
-                    {
-                        LegacyLeveledManifest.migrateManifests(keyspaceName, cfm.cfName);
-                    }
-                    catch (IOException e)
-                    {
-                        logger.error("Could not migrate old leveled manifest. Move away the .json file in the data directory", e);
-                        System.exit(100);
-                    }
-                }
-
-                ColumnFamilyStore.scrubDataDirectories(keyspaceName, cfm.cfName);
-            }
+            LeveledManifest.maybeMigrateManifests();
         }
+        catch(IOException e)
+        {
+            logger.error("Could not migrate old leveled manifest. Move away the .json file in the data directory", e);
+            System.exit(100);
+        }
+
         // clean up compaction leftovers
-        SetMultimap<Pair<String, String>, Integer> unfinishedCompactions = SystemKeyspace.getUnfinishedCompactions();
+        Map<Pair<String, String>, Map<Integer, UUID>> unfinishedCompactions = SystemKeyspace.getUnfinishedCompactions();
         for (Pair<String, String> kscf : unfinishedCompactions.keySet())
-        {
             ColumnFamilyStore.removeUnfinishedCompactionLeftovers(kscf.left, kscf.right, unfinishedCompactions.get(kscf));
-        }
         SystemKeyspace.discardCompactionsInProgress();
 
+        // clean up debris in the rest of the keyspaces
+        for (String keyspaceName : Schema.instance.getKeyspaces())
+        {
+            for (CFMetaData cfm : Schema.instance.getKeyspaceMetaData(keyspaceName).values())
+                ColumnFamilyStore.scrubDataDirectories(keyspaceName, cfm.cfName);
+        }
+
         // initialize keyspaces
         for (String keyspaceName : Schema.instance.getKeyspaces())
         {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4ed22340/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java b/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java
index 4e6c87f..65b1708 100644
--- a/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java
+++ b/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java
@@ -52,6 +52,7 @@ import org.apache.cassandra.io.sstable.*;
 import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.thrift.*;
 import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.Pair;
 import org.apache.cassandra.utils.WrappedRunnable;
 
 import static org.junit.Assert.*;
@@ -1581,7 +1582,7 @@ public class ColumnFamilyStoreTest extends SchemaLoader
     }
 
     @Test
-    public void testRemoveUnifinishedCompactionLeftovers() throws Throwable
+    public void testRemoveUnfinishedCompactionLeftovers() throws Throwable
     {
         String ks = "Keyspace1";
         String cf = "Standard3"; // should be empty
@@ -1597,14 +1598,14 @@ public class ColumnFamilyStoreTest extends SchemaLoader
         writer.close();
 
         Map<Descriptor, Set<Component>> sstables = dir.sstableLister().list();
-        assert sstables.size() == 1;
+        assertEquals(1, sstables.size());
 
         Map.Entry<Descriptor, Set<Component>> sstableToOpen = sstables.entrySet().iterator().next();
         final SSTableReader sstable1 = SSTableReader.open(sstableToOpen.getKey());
 
         // simulate incomplete compaction
         writer = new SSTableSimpleWriter(dir.getDirectoryForNewSSTables(),
-                                                             cfmeta, StorageService.getPartitioner())
+                                         cfmeta, StorageService.getPartitioner())
         {
             protected SSTableWriter getWriter()
             {
@@ -1623,12 +1624,75 @@ public class ColumnFamilyStoreTest extends SchemaLoader
 
         // should have 2 sstables now
         sstables = dir.sstableLister().list();
-        assert sstables.size() == 2;
+        assertEquals(2, sstables.size());
+
+        UUID compactionTaskID = SystemKeyspace.startCompaction(
+                Keyspace.open(ks).getColumnFamilyStore(cf),
+                Collections.singleton(SSTableReader.open(sstable1.descriptor)));
 
-        ColumnFamilyStore.removeUnfinishedCompactionLeftovers(ks, cf, Sets.newHashSet(sstable1.descriptor.generation));
+        Map<Integer, UUID> unfinishedCompaction = new HashMap<>();
+        unfinishedCompaction.put(sstable1.descriptor.generation, compactionTaskID);
+        ColumnFamilyStore.removeUnfinishedCompactionLeftovers(ks, cf, unfinishedCompaction);
 
         // 2nd sstable should be removed (only 1st sstable exists in set of size 1)
         sstables = dir.sstableLister().list();
+        assertEquals(1, sstables.size());
+        assertTrue(sstables.containsKey(sstable1.descriptor));
+
+        Map<Pair<String, String>, Map<Integer, UUID>> unfinished = SystemKeyspace.getUnfinishedCompactions();
+        assertTrue(unfinished.isEmpty());
+    }
+
+    /**
+     * @see <a href="https://issues.apache.org/jira/browse/CASSANDRA-6086">CASSANDRA-6086</a>
+     */
+    @Test
+    public void testFailedToRemoveUnfinishedCompactionLeftovers() throws Throwable
+    {
+        final String ks = "Keyspace1";
+        final String cf = "Standard4"; // should be empty
+
+        final CFMetaData cfmeta = Schema.instance.getCFMetaData(ks, cf);
+        Directories dir = Directories.create(ks, cf);
+        ByteBuffer key = bytes("key");
+
+        // Write SSTable generation 3 that has ancestors 1 and 2
+        final Set<Integer> ancestors = Sets.newHashSet(1, 2);
+        SSTableSimpleWriter writer = new SSTableSimpleWriter(dir.getDirectoryForNewSSTables(),
+                                                cfmeta, StorageService.getPartitioner())
+        {
+            protected SSTableWriter getWriter()
+            {
+                SSTableMetadata.Collector collector = SSTableMetadata.createCollector(cfmeta.comparator);
+                for (int ancestor : ancestors)
+                    collector.addAncestor(ancestor);
+                String file = new Descriptor(directory, ks, cf, 3, true).filenameFor(Component.DATA);
+                return new SSTableWriter(file,
+                                         0,
+                                         metadata,
+                                         StorageService.getPartitioner(),
+                                         collector);
+            }
+        };
+        writer.newRow(key);
+        writer.addColumn(bytes("col"), bytes("val"), 1);
+        writer.close();
+
+        Map<Descriptor, Set<Component>> sstables = dir.sstableLister().list();
+        assert sstables.size() == 1;
+
+        Map.Entry<Descriptor, Set<Component>> sstableToOpen = sstables.entrySet().iterator().next();
+        final SSTableReader sstable1 = SSTableReader.open(sstableToOpen.getKey());
+
+        // simulate we don't have generation in compaction_history
+        Map<Integer, UUID> unfinishedCompactions = new HashMap<>();
+        UUID compactionTaskID = UUID.randomUUID();
+        for (Integer ancestor : ancestors)
+            unfinishedCompactions.put(ancestor, compactionTaskID);
+        ColumnFamilyStore.removeUnfinishedCompactionLeftovers(ks, cf, unfinishedCompactions);
+
+        // SSTable should not be deleted
+        sstables = dir.sstableLister().list();
         assert sstables.size() == 1;
         assert sstables.containsKey(sstable1.descriptor);
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4ed22340/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java b/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java
index a338290..7b91bed 100644
--- a/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java
+++ b/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java
@@ -289,8 +289,8 @@ public class CompactionsTest extends SchemaLoader
             }
         }));
         UUID taskId = SystemKeyspace.startCompaction(cfs, sstables);
-        SetMultimap<Pair<String, String>, Integer> compactionLogs = SystemKeyspace.getUnfinishedCompactions();
-        Set<Integer> unfinishedCompactions = compactionLogs.get(Pair.create(KEYSPACE1, cf));
+        Map<Pair<String, String>, Map<Integer, UUID>> compactionLogs = SystemKeyspace.getUnfinishedCompactions();
+        Set<Integer> unfinishedCompactions = compactionLogs.get(Pair.create(KEYSPACE1, cf)).keySet();
         assert unfinishedCompactions.containsAll(generations);
 
         SystemKeyspace.finishCompaction(taskId);


[3/3] git commit: Merge branch 'cassandra-2.0' into trunk

Posted by yu...@apache.org.
Merge branch 'cassandra-2.0' into trunk

Conflicts:
	src/java/org/apache/cassandra/db/ColumnFamilyStore.java
	src/java/org/apache/cassandra/service/CassandraDaemon.java
	test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/49efc13c
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/49efc13c
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/49efc13c

Branch: refs/heads/trunk
Commit: 49efc13cd530735ad802769e7f5322f3c79085ef
Parents: 76ee9a1 4ed2234
Author: Yuki Morishita <yu...@apache.org>
Authored: Mon Dec 30 14:03:54 2013 -0600
Committer: Yuki Morishita <yu...@apache.org>
Committed: Mon Dec 30 14:03:54 2013 -0600

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../apache/cassandra/db/ColumnFamilyStore.java  | 35 ++++++---
 .../org/apache/cassandra/db/SystemKeyspace.java | 23 ++++--
 .../cassandra/service/CassandraDaemon.java      | 16 ++---
 .../cassandra/db/ColumnFamilyStoreTest.java     | 74 ++++++++++++++++++--
 .../db/compaction/CompactionsTest.java          |  4 +-
 6 files changed, 124 insertions(+), 29 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/49efc13c/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index 0974925,958369a..7af391e
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,27 -1,5 +1,28 @@@
 +2.1
 + * Multithreaded commitlog (CASSANDRA-3578)
 + * allocate fixed index summary memory pool and resample cold index summaries 
 +   to use less memory (CASSANDRA-5519)
 + * Removed multithreaded compaction (CASSANDRA-6142)
 + * Parallelize fetching rows for low-cardinality indexes (CASSANDRA-1337)
 + * change logging from log4j to logback (CASSANDRA-5883)
 + * switch to LZ4 compression for internode communication (CASSANDRA-5887)
 + * Stop using Thrift-generated Index* classes internally (CASSANDRA-5971)
 + * Remove 1.2 network compatibility code (CASSANDRA-5960)
 + * Remove leveled json manifest migration code (CASSANDRA-5996)
 + * Remove CFDefinition (CASSANDRA-6253)
 + * Use AtomicIntegerFieldUpdater in RefCountedMemory (CASSANDRA-6278)
 + * User-defined types for CQL3 (CASSANDRA-5590)
 + * Use of o.a.c.metrics in nodetool (CASSANDRA-5871, 6406)
 + * Batch read from OTC's queue and cleanup (CASSANDRA-1632)
 + * Secondary index support for collections (CASSANDRA-4511)
 + * SSTable metadata(Stats.db) format change (CASSANDRA-6356)
 + * Push composites support in the storage engine (CASSANDRA-5417)
 + * Add snapshot space used to cfstats (CASSANDRA-6231)
 + * Add cardinality estimator for key count estimation (CASSANDRA-5906)
 +
 +
  2.0.5
+ * Delete unfinished compaction incrementally (CASSANDRA-6086)
  Merged from 1.2:
   * fsync compression metadata (CASSANDRA-6531)
  

http://git-wip-us.apache.org/repos/asf/cassandra/blob/49efc13c/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/49efc13c/src/java/org/apache/cassandra/db/SystemKeyspace.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/49efc13c/src/java/org/apache/cassandra/service/CassandraDaemon.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/service/CassandraDaemon.java
index ffff006,d36b0db..02731d8
--- a/src/java/org/apache/cassandra/service/CassandraDaemon.java
+++ b/src/java/org/apache/cassandra/service/CassandraDaemon.java
@@@ -21,7 -21,11 +21,9 @@@ import java.io.File
  import java.io.IOException;
  import java.lang.management.ManagementFactory;
  import java.net.InetAddress;
 -import java.net.MalformedURLException;
 -import java.net.URL;
  import java.util.Arrays;
+ import java.util.Map;
+ import java.util.UUID;
  import java.util.concurrent.TimeUnit;
  import javax.management.MBeanServer;
  import javax.management.ObjectName;
@@@ -30,7 -34,7 +32,6 @@@ import javax.management.StandardMBean
  import com.addthis.metrics.reporter.config.ReporterConfig;
  
  import com.google.common.collect.Iterables;
- import com.google.common.collect.SetMultimap;
 -import org.apache.log4j.PropertyConfigurator;
  import org.slf4j.Logger;
  import org.slf4j.LoggerFactory;
  
@@@ -199,6 -237,22 +200,12 @@@ public class CassandraDaemo
          // load keyspace descriptions.
          DatabaseDescriptor.loadSchemas();
  
 -        try
 -        {
 -            LeveledManifest.maybeMigrateManifests();
 -        }
 -        catch(IOException e)
 -        {
 -            logger.error("Could not migrate old leveled manifest. Move away the .json file in the data directory", e);
 -            System.exit(100);
 -        }
 -
+         // clean up compaction leftovers
+         Map<Pair<String, String>, Map<Integer, UUID>> unfinishedCompactions = SystemKeyspace.getUnfinishedCompactions();
+         for (Pair<String, String> kscf : unfinishedCompactions.keySet())
+             ColumnFamilyStore.removeUnfinishedCompactionLeftovers(kscf.left, kscf.right, unfinishedCompactions.get(kscf));
+         SystemKeyspace.discardCompactionsInProgress();
+ 
          // clean up debris in the rest of the keyspaces
          for (String keyspaceName : Schema.instance.getKeyspaces())
          {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/49efc13c/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java
index 0a081c1,65b1708..0e50cb6
--- a/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java
+++ b/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java
@@@ -53,7 -52,7 +53,8 @@@ import org.apache.cassandra.io.sstable.
  import org.apache.cassandra.service.StorageService;
  import org.apache.cassandra.thrift.*;
  import org.apache.cassandra.utils.ByteBufferUtil;
 +import org.apache.cassandra.utils.FBUtilities;
+ import org.apache.cassandra.utils.Pair;
  import org.apache.cassandra.utils.WrappedRunnable;
  
  import static org.junit.Assert.*;
@@@ -1613,6 -1636,63 +1620,63 @@@ public class ColumnFamilyStoreTest exte
  
          // 2nd sstable should be removed (only 1st sstable exists in set of size 1)
          sstables = dir.sstableLister().list();
+         assertEquals(1, sstables.size());
+         assertTrue(sstables.containsKey(sstable1.descriptor));
+ 
+         Map<Pair<String, String>, Map<Integer, UUID>> unfinished = SystemKeyspace.getUnfinishedCompactions();
+         assertTrue(unfinished.isEmpty());
+     }
+ 
+     /**
+      * @see <a href="https://issues.apache.org/jira/browse/CASSANDRA-6086">CASSANDRA-6086</a>
+      */
+     @Test
+     public void testFailedToRemoveUnfinishedCompactionLeftovers() throws Throwable
+     {
+         final String ks = "Keyspace1";
+         final String cf = "Standard4"; // should be empty
+ 
+         final CFMetaData cfmeta = Schema.instance.getCFMetaData(ks, cf);
+         Directories dir = Directories.create(ks, cf);
+         ByteBuffer key = bytes("key");
+ 
+         // Write SSTable generation 3 that has ancestors 1 and 2
+         final Set<Integer> ancestors = Sets.newHashSet(1, 2);
+         SSTableSimpleWriter writer = new SSTableSimpleWriter(dir.getDirectoryForNewSSTables(),
+                                                 cfmeta, StorageService.getPartitioner())
+         {
+             protected SSTableWriter getWriter()
+             {
 -                SSTableMetadata.Collector collector = SSTableMetadata.createCollector(cfmeta.comparator);
++                MetadataCollector collector = new MetadataCollector(cfmeta.comparator);
+                 for (int ancestor : ancestors)
+                     collector.addAncestor(ancestor);
+                 String file = new Descriptor(directory, ks, cf, 3, true).filenameFor(Component.DATA);
+                 return new SSTableWriter(file,
+                                          0,
+                                          metadata,
+                                          StorageService.getPartitioner(),
+                                          collector);
+             }
+         };
+         writer.newRow(key);
+         writer.addColumn(bytes("col"), bytes("val"), 1);
+         writer.close();
+ 
+         Map<Descriptor, Set<Component>> sstables = dir.sstableLister().list();
+         assert sstables.size() == 1;
+ 
+         Map.Entry<Descriptor, Set<Component>> sstableToOpen = sstables.entrySet().iterator().next();
+         final SSTableReader sstable1 = SSTableReader.open(sstableToOpen.getKey());
+ 
+         // simulate we don't have generation in compaction_history
+         Map<Integer, UUID> unfinishedCompactions = new HashMap<>();
+         UUID compactionTaskID = UUID.randomUUID();
+         for (Integer ancestor : ancestors)
+             unfinishedCompactions.put(ancestor, compactionTaskID);
+         ColumnFamilyStore.removeUnfinishedCompactionLeftovers(ks, cf, unfinishedCompactions);
+ 
+         // SSTable should not be deleted
+         sstables = dir.sstableLister().list();
          assert sstables.size() == 1;
          assert sstables.containsKey(sstable1.descriptor);
      }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/49efc13c/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java
----------------------------------------------------------------------