You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by yu...@apache.org on 2013/12/30 21:04:11 UTC

[2/3] git commit: Delete unfinished compaction sstables incrementally

Delete unfinished compaction sstables incrementally

patch by thobbs; reviewed by yukim for CASSANDRA-6086


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/4ed22340
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/4ed22340
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/4ed22340

Branch: refs/heads/trunk
Commit: 4ed2234078c4d302c256332252a8ddd6ae345484
Parents: c5ca8de
Author: Yuki Morishita <yu...@apache.org>
Authored: Mon Dec 30 13:48:09 2013 -0600
Committer: Yuki Morishita <yu...@apache.org>
Committed: Mon Dec 30 13:48:09 2013 -0600

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../apache/cassandra/db/ColumnFamilyStore.java  | 39 +++++++----
 .../org/apache/cassandra/db/SystemKeyspace.java | 23 ++++--
 .../db/compaction/LeveledManifest.java          | 14 ++++
 .../cassandra/service/CassandraDaemon.java      | 43 +++++-------
 .../cassandra/db/ColumnFamilyStoreTest.java     | 74 ++++++++++++++++++--
 .../db/compaction/CompactionsTest.java          |  4 +-
 7 files changed, 151 insertions(+), 47 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/4ed22340/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 0396006..958369a 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 2.0.5
+* Delete unfinished compaction incrementally (CASSANDRA-6086)
 Merged from 1.2:
  * fsync compression metadata (CASSANDRA-6531)
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4ed22340/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index cbd9d2e..4d7d6f2 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@ -483,27 +483,30 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
      * compactions, we remove the new ones (since those may be incomplete -- under LCS, we may create multiple
      * sstables from any given ancestor).
      */
-    public static void removeUnfinishedCompactionLeftovers(String keyspace, String columnfamily, Set<Integer> unfinishedGenerations)
+    public static void removeUnfinishedCompactionLeftovers(String keyspace, String columnfamily, Map<Integer, UUID> unfinishedCompactions)
     {
         Directories directories = Directories.create(keyspace, columnfamily);
 
-        // sanity-check unfinishedGenerations
-        Set<Integer> allGenerations = new HashSet<Integer>();
+        Set<Integer> allGenerations = new HashSet<>();
         for (Descriptor desc : directories.sstableLister().list().keySet())
             allGenerations.add(desc.generation);
+
+        // sanity-check unfinishedCompactions
+        Set<Integer> unfinishedGenerations = unfinishedCompactions.keySet();
         if (!allGenerations.containsAll(unfinishedGenerations))
         {
-            throw new IllegalStateException("Unfinished compactions reference missing sstables."
-                                            + " This should never happen since compactions are marked finished before we start removing the old sstables.");
+            HashSet<Integer> missingGenerations = new HashSet<>(unfinishedGenerations);
+            missingGenerations.removeAll(allGenerations);
+            logger.debug("Unfinished compactions of {}.{} reference missing sstables of generations {}",
+                         keyspace, columnfamily, missingGenerations);
         }
 
         // remove new sstables from compactions that didn't complete, and compute
         // set of ancestors that shouldn't exist anymore
-        Set<Integer> completedAncestors = new HashSet<Integer>();
+        Set<Integer> completedAncestors = new HashSet<>();
         for (Map.Entry<Descriptor, Set<Component>> sstableFiles : directories.sstableLister().list().entrySet())
         {
             Descriptor desc = sstableFiles.getKey();
-            Set<Component> components = sstableFiles.getValue();
 
             Set<Integer> ancestors;
             try
@@ -515,9 +518,16 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
                 throw new FSReadError(e, desc.filenameFor(Component.STATS));
             }
 
-            if (!ancestors.isEmpty() && unfinishedGenerations.containsAll(ancestors))
+            if (!ancestors.isEmpty()
+                && unfinishedGenerations.containsAll(ancestors)
+                && allGenerations.containsAll(ancestors))
             {
-                SSTable.delete(desc, components);
+                // any of the ancestors would work, so we'll just lookup the compaction task ID with the first one
+                UUID compactionTaskID = unfinishedCompactions.get(ancestors.iterator().next());
+                assert compactionTaskID != null;
+                logger.debug("Going to delete unfinished compaction product {}", desc);
+                SSTable.delete(desc, sstableFiles.getValue());
+                SystemKeyspace.finishCompaction(compactionTaskID);
             }
             else
             {
@@ -529,10 +539,15 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
         for (Map.Entry<Descriptor, Set<Component>> sstableFiles : directories.sstableLister().list().entrySet())
         {
             Descriptor desc = sstableFiles.getKey();
-            Set<Component> components = sstableFiles.getValue();
-
             if (completedAncestors.contains(desc.generation))
-                SSTable.delete(desc, components);
+            {
+                // if any of the ancestors were participating in a compaction, finish that compaction
+                logger.debug("Going to delete leftover compaction ancestor {}", desc);
+                SSTable.delete(desc, sstableFiles.getValue());
+                UUID compactionTaskID = unfinishedCompactions.get(desc.generation);
+                if (compactionTaskID != null)
+                    SystemKeyspace.finishCompaction(unfinishedCompactions.get(desc.generation));
+            }
         }
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4ed22340/src/java/org/apache/cassandra/db/SystemKeyspace.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/SystemKeyspace.java b/src/java/org/apache/cassandra/db/SystemKeyspace.java
index 9093ec0..910f025 100644
--- a/src/java/org/apache/cassandra/db/SystemKeyspace.java
+++ b/src/java/org/apache/cassandra/db/SystemKeyspace.java
@@ -188,6 +188,11 @@ public class SystemKeyspace
         return compactionId;
     }
 
+    /**
+     * Deletes the entry for this compaction from the set of compactions in progress.  The compaction does not need
+     * to complete successfully for this to be called.
+     * @param taskId what was returned from {@code startCompaction}
+     */
     public static void finishCompaction(UUID taskId)
     {
         assert taskId != null;
@@ -198,21 +203,31 @@ public class SystemKeyspace
     }
 
     /**
-     * @return unfinished compactions, grouped by keyspace/columnfamily pair.
+     * Returns a Map whose keys are KS.CF pairs and whose values are maps from sstable generation numbers to the
+     * task ID of the compaction they were participating in.
      */
-    public static SetMultimap<Pair<String, String>, Integer> getUnfinishedCompactions()
+    public static Map<Pair<String, String>, Map<Integer, UUID>> getUnfinishedCompactions()
     {
         String req = "SELECT * FROM system.%s";
         UntypedResultSet resultSet = processInternal(String.format(req, COMPACTION_LOG));
 
-        SetMultimap<Pair<String, String>, Integer> unfinishedCompactions = HashMultimap.create();
+        Map<Pair<String, String>, Map<Integer, UUID>> unfinishedCompactions = new HashMap<>();
         for (UntypedResultSet.Row row : resultSet)
         {
             String keyspace = row.getString("keyspace_name");
             String columnfamily = row.getString("columnfamily_name");
             Set<Integer> inputs = row.getSet("inputs", Int32Type.instance);
+            UUID taskID = row.getUUID("id");
+
+            Pair<String, String> kscf = Pair.create(keyspace, columnfamily);
+            Map<Integer, UUID> generationToTaskID = unfinishedCompactions.get(kscf);
+            if (generationToTaskID == null)
+                generationToTaskID = new HashMap<>(inputs.size());
+
+            for (Integer generation : inputs)
+                generationToTaskID.put(generation, taskID);
 
-            unfinishedCompactions.putAll(Pair.create(keyspace, columnfamily), inputs);
+            unfinishedCompactions.put(kscf, generationToTaskID);
         }
         return unfinishedCompactions;
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4ed22340/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java b/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java
index 92cd887..4347ad5 100644
--- a/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java
+++ b/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java
@@ -33,6 +33,8 @@ import com.google.common.primitives.Ints;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.config.Schema;
 import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.db.RowPosition;
 import org.apache.cassandra.dht.Bounds;
@@ -619,4 +621,16 @@ public class LeveledManifest
             this.maxSSTableBytes = maxSSTableBytes;
         }
     }
+
+    public static void maybeMigrateManifests() throws IOException
+    {
+        for (String keyspaceName : Schema.instance.getKeyspaces())
+        {
+            for (CFMetaData cfm : Schema.instance.getKeyspaceMetaData(keyspaceName).values())
+            {
+                if (LegacyLeveledManifest.manifestNeedsMigration(keyspaceName,cfm.cfName))
+                    LegacyLeveledManifest.migrateManifests(keyspaceName, cfm.cfName);
+            }
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4ed22340/src/java/org/apache/cassandra/service/CassandraDaemon.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/CassandraDaemon.java b/src/java/org/apache/cassandra/service/CassandraDaemon.java
index 0cebc11..d36b0db 100644
--- a/src/java/org/apache/cassandra/service/CassandraDaemon.java
+++ b/src/java/org/apache/cassandra/service/CassandraDaemon.java
@@ -24,6 +24,8 @@ import java.net.InetAddress;
 import java.net.MalformedURLException;
 import java.net.URL;
 import java.util.Arrays;
+import java.util.Map;
+import java.util.UUID;
 import java.util.concurrent.TimeUnit;
 import javax.management.MBeanServer;
 import javax.management.ObjectName;
@@ -32,7 +34,6 @@ import javax.management.StandardMBean;
 import com.addthis.metrics.reporter.config.ReporterConfig;
 
 import com.google.common.collect.Iterables;
-import com.google.common.collect.SetMultimap;
 import org.apache.log4j.PropertyConfigurator;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -46,7 +47,7 @@ import org.apache.cassandra.db.MeteredFlusher;
 import org.apache.cassandra.db.SystemKeyspace;
 import org.apache.cassandra.db.commitlog.CommitLog;
 import org.apache.cassandra.db.compaction.CompactionManager;
-import org.apache.cassandra.db.compaction.LegacyLeveledManifest;
+import org.apache.cassandra.db.compaction.LeveledManifest;
 import org.apache.cassandra.exceptions.ConfigurationException;
 import org.apache.cassandra.io.FSError;
 import org.apache.cassandra.io.util.FileUtils;
@@ -236,35 +237,29 @@ public class CassandraDaemon
         // load keyspace descriptions.
         DatabaseDescriptor.loadSchemas();
 
-        // clean up debris in the rest of the keyspaces
-        for (String keyspaceName : Schema.instance.getKeyspaces())
+        try
         {
-            for (CFMetaData cfm : Schema.instance.getKeyspaceMetaData(keyspaceName).values())
-            {
-                if (LegacyLeveledManifest.manifestNeedsMigration(keyspaceName,cfm.cfName))
-                {
-                    try
-                    {
-                        LegacyLeveledManifest.migrateManifests(keyspaceName, cfm.cfName);
-                    }
-                    catch (IOException e)
-                    {
-                        logger.error("Could not migrate old leveled manifest. Move away the .json file in the data directory", e);
-                        System.exit(100);
-                    }
-                }
-
-                ColumnFamilyStore.scrubDataDirectories(keyspaceName, cfm.cfName);
-            }
+            LeveledManifest.maybeMigrateManifests();
         }
+        catch(IOException e)
+        {
+            logger.error("Could not migrate old leveled manifest. Move away the .json file in the data directory", e);
+            System.exit(100);
+        }
+
         // clean up compaction leftovers
-        SetMultimap<Pair<String, String>, Integer> unfinishedCompactions = SystemKeyspace.getUnfinishedCompactions();
+        Map<Pair<String, String>, Map<Integer, UUID>> unfinishedCompactions = SystemKeyspace.getUnfinishedCompactions();
         for (Pair<String, String> kscf : unfinishedCompactions.keySet())
-        {
             ColumnFamilyStore.removeUnfinishedCompactionLeftovers(kscf.left, kscf.right, unfinishedCompactions.get(kscf));
-        }
         SystemKeyspace.discardCompactionsInProgress();
 
+        // clean up debris in the rest of the keyspaces
+        for (String keyspaceName : Schema.instance.getKeyspaces())
+        {
+            for (CFMetaData cfm : Schema.instance.getKeyspaceMetaData(keyspaceName).values())
+                ColumnFamilyStore.scrubDataDirectories(keyspaceName, cfm.cfName);
+        }
+
         // initialize keyspaces
         for (String keyspaceName : Schema.instance.getKeyspaces())
         {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4ed22340/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java b/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java
index 4e6c87f..65b1708 100644
--- a/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java
+++ b/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java
@@ -52,6 +52,7 @@ import org.apache.cassandra.io.sstable.*;
 import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.thrift.*;
 import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.Pair;
 import org.apache.cassandra.utils.WrappedRunnable;
 
 import static org.junit.Assert.*;
@@ -1581,7 +1582,7 @@ public class ColumnFamilyStoreTest extends SchemaLoader
     }
 
     @Test
-    public void testRemoveUnifinishedCompactionLeftovers() throws Throwable
+    public void testRemoveUnfinishedCompactionLeftovers() throws Throwable
     {
         String ks = "Keyspace1";
         String cf = "Standard3"; // should be empty
@@ -1597,14 +1598,14 @@ public class ColumnFamilyStoreTest extends SchemaLoader
         writer.close();
 
         Map<Descriptor, Set<Component>> sstables = dir.sstableLister().list();
-        assert sstables.size() == 1;
+        assertEquals(1, sstables.size());
 
         Map.Entry<Descriptor, Set<Component>> sstableToOpen = sstables.entrySet().iterator().next();
         final SSTableReader sstable1 = SSTableReader.open(sstableToOpen.getKey());
 
         // simulate incomplete compaction
         writer = new SSTableSimpleWriter(dir.getDirectoryForNewSSTables(),
-                                                             cfmeta, StorageService.getPartitioner())
+                                         cfmeta, StorageService.getPartitioner())
         {
             protected SSTableWriter getWriter()
             {
@@ -1623,12 +1624,75 @@ public class ColumnFamilyStoreTest extends SchemaLoader
 
         // should have 2 sstables now
         sstables = dir.sstableLister().list();
-        assert sstables.size() == 2;
+        assertEquals(2, sstables.size());
+
+        UUID compactionTaskID = SystemKeyspace.startCompaction(
+                Keyspace.open(ks).getColumnFamilyStore(cf),
+                Collections.singleton(SSTableReader.open(sstable1.descriptor)));
 
-        ColumnFamilyStore.removeUnfinishedCompactionLeftovers(ks, cf, Sets.newHashSet(sstable1.descriptor.generation));
+        Map<Integer, UUID> unfinishedCompaction = new HashMap<>();
+        unfinishedCompaction.put(sstable1.descriptor.generation, compactionTaskID);
+        ColumnFamilyStore.removeUnfinishedCompactionLeftovers(ks, cf, unfinishedCompaction);
 
         // 2nd sstable should be removed (only 1st sstable exists in set of size 1)
         sstables = dir.sstableLister().list();
+        assertEquals(1, sstables.size());
+        assertTrue(sstables.containsKey(sstable1.descriptor));
+
+        Map<Pair<String, String>, Map<Integer, UUID>> unfinished = SystemKeyspace.getUnfinishedCompactions();
+        assertTrue(unfinished.isEmpty());
+    }
+
+    /**
+     * @see <a href="https://issues.apache.org/jira/browse/CASSANDRA-6086">CASSANDRA-6086</a>
+     */
+    @Test
+    public void testFailedToRemoveUnfinishedCompactionLeftovers() throws Throwable
+    {
+        final String ks = "Keyspace1";
+        final String cf = "Standard4"; // should be empty
+
+        final CFMetaData cfmeta = Schema.instance.getCFMetaData(ks, cf);
+        Directories dir = Directories.create(ks, cf);
+        ByteBuffer key = bytes("key");
+
+        // Write SSTable generation 3 that has ancestors 1 and 2
+        final Set<Integer> ancestors = Sets.newHashSet(1, 2);
+        SSTableSimpleWriter writer = new SSTableSimpleWriter(dir.getDirectoryForNewSSTables(),
+                                                cfmeta, StorageService.getPartitioner())
+        {
+            protected SSTableWriter getWriter()
+            {
+                SSTableMetadata.Collector collector = SSTableMetadata.createCollector(cfmeta.comparator);
+                for (int ancestor : ancestors)
+                    collector.addAncestor(ancestor);
+                String file = new Descriptor(directory, ks, cf, 3, true).filenameFor(Component.DATA);
+                return new SSTableWriter(file,
+                                         0,
+                                         metadata,
+                                         StorageService.getPartitioner(),
+                                         collector);
+            }
+        };
+        writer.newRow(key);
+        writer.addColumn(bytes("col"), bytes("val"), 1);
+        writer.close();
+
+        Map<Descriptor, Set<Component>> sstables = dir.sstableLister().list();
+        assert sstables.size() == 1;
+
+        Map.Entry<Descriptor, Set<Component>> sstableToOpen = sstables.entrySet().iterator().next();
+        final SSTableReader sstable1 = SSTableReader.open(sstableToOpen.getKey());
+
+        // simulate we don't have generation in compaction_history
+        Map<Integer, UUID> unfinishedCompactions = new HashMap<>();
+        UUID compactionTaskID = UUID.randomUUID();
+        for (Integer ancestor : ancestors)
+            unfinishedCompactions.put(ancestor, compactionTaskID);
+        ColumnFamilyStore.removeUnfinishedCompactionLeftovers(ks, cf, unfinishedCompactions);
+
+        // SSTable should not be deleted
+        sstables = dir.sstableLister().list();
         assert sstables.size() == 1;
         assert sstables.containsKey(sstable1.descriptor);
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4ed22340/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java b/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java
index a338290..7b91bed 100644
--- a/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java
+++ b/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java
@@ -289,8 +289,8 @@ public class CompactionsTest extends SchemaLoader
             }
         }));
         UUID taskId = SystemKeyspace.startCompaction(cfs, sstables);
-        SetMultimap<Pair<String, String>, Integer> compactionLogs = SystemKeyspace.getUnfinishedCompactions();
-        Set<Integer> unfinishedCompactions = compactionLogs.get(Pair.create(KEYSPACE1, cf));
+        Map<Pair<String, String>, Map<Integer, UUID>> compactionLogs = SystemKeyspace.getUnfinishedCompactions();
+        Set<Integer> unfinishedCompactions = compactionLogs.get(Pair.create(KEYSPACE1, cf)).keySet();
         assert unfinishedCompactions.containsAll(generations);
 
         SystemKeyspace.finishCompaction(taskId);