You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by yu...@apache.org on 2013/12/30 21:04:11 UTC
[2/3] git commit: Delete unfinished compaction sstables incrementally
Delete unfinished compaction sstables incrementally
patch by thobbs; reviewed by yukim for CASSANDRA-6086
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/4ed22340
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/4ed22340
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/4ed22340
Branch: refs/heads/trunk
Commit: 4ed2234078c4d302c256332252a8ddd6ae345484
Parents: c5ca8de
Author: Yuki Morishita <yu...@apache.org>
Authored: Mon Dec 30 13:48:09 2013 -0600
Committer: Yuki Morishita <yu...@apache.org>
Committed: Mon Dec 30 13:48:09 2013 -0600
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../apache/cassandra/db/ColumnFamilyStore.java | 39 +++++++----
.../org/apache/cassandra/db/SystemKeyspace.java | 23 ++++--
.../db/compaction/LeveledManifest.java | 14 ++++
.../cassandra/service/CassandraDaemon.java | 43 +++++-------
.../cassandra/db/ColumnFamilyStoreTest.java | 74 ++++++++++++++++++--
.../db/compaction/CompactionsTest.java | 4 +-
7 files changed, 151 insertions(+), 47 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4ed22340/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 0396006..958369a 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
2.0.5
+* Delete unfinished compaction incrementally (CASSANDRA-6086)
Merged from 1.2:
* fsync compression metadata (CASSANDRA-6531)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4ed22340/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index cbd9d2e..4d7d6f2 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@ -483,27 +483,30 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
* compactions, we remove the new ones (since those may be incomplete -- under LCS, we may create multiple
* sstables from any given ancestor).
*/
- public static void removeUnfinishedCompactionLeftovers(String keyspace, String columnfamily, Set<Integer> unfinishedGenerations)
+ public static void removeUnfinishedCompactionLeftovers(String keyspace, String columnfamily, Map<Integer, UUID> unfinishedCompactions)
{
Directories directories = Directories.create(keyspace, columnfamily);
- // sanity-check unfinishedGenerations
- Set<Integer> allGenerations = new HashSet<Integer>();
+ Set<Integer> allGenerations = new HashSet<>();
for (Descriptor desc : directories.sstableLister().list().keySet())
allGenerations.add(desc.generation);
+
+ // sanity-check unfinishedCompactions
+ Set<Integer> unfinishedGenerations = unfinishedCompactions.keySet();
if (!allGenerations.containsAll(unfinishedGenerations))
{
- throw new IllegalStateException("Unfinished compactions reference missing sstables."
- + " This should never happen since compactions are marked finished before we start removing the old sstables.");
+ HashSet<Integer> missingGenerations = new HashSet<>(unfinishedGenerations);
+ missingGenerations.removeAll(allGenerations);
+ logger.debug("Unfinished compactions of {}.{} reference missing sstables of generations {}",
+ keyspace, columnfamily, missingGenerations);
}
// remove new sstables from compactions that didn't complete, and compute
// set of ancestors that shouldn't exist anymore
- Set<Integer> completedAncestors = new HashSet<Integer>();
+ Set<Integer> completedAncestors = new HashSet<>();
for (Map.Entry<Descriptor, Set<Component>> sstableFiles : directories.sstableLister().list().entrySet())
{
Descriptor desc = sstableFiles.getKey();
- Set<Component> components = sstableFiles.getValue();
Set<Integer> ancestors;
try
@@ -515,9 +518,16 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
throw new FSReadError(e, desc.filenameFor(Component.STATS));
}
- if (!ancestors.isEmpty() && unfinishedGenerations.containsAll(ancestors))
+ if (!ancestors.isEmpty()
+ && unfinishedGenerations.containsAll(ancestors)
+ && allGenerations.containsAll(ancestors))
{
- SSTable.delete(desc, components);
+ // any of the ancestors would work, so we'll just lookup the compaction task ID with the first one
+ UUID compactionTaskID = unfinishedCompactions.get(ancestors.iterator().next());
+ assert compactionTaskID != null;
+ logger.debug("Going to delete unfinished compaction product {}", desc);
+ SSTable.delete(desc, sstableFiles.getValue());
+ SystemKeyspace.finishCompaction(compactionTaskID);
}
else
{
@@ -529,10 +539,15 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
for (Map.Entry<Descriptor, Set<Component>> sstableFiles : directories.sstableLister().list().entrySet())
{
Descriptor desc = sstableFiles.getKey();
- Set<Component> components = sstableFiles.getValue();
-
if (completedAncestors.contains(desc.generation))
- SSTable.delete(desc, components);
+ {
+ // if any of the ancestors were participating in a compaction, finish that compaction
+ logger.debug("Going to delete leftover compaction ancestor {}", desc);
+ SSTable.delete(desc, sstableFiles.getValue());
+ UUID compactionTaskID = unfinishedCompactions.get(desc.generation);
+ if (compactionTaskID != null)
+ SystemKeyspace.finishCompaction(unfinishedCompactions.get(desc.generation));
+ }
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4ed22340/src/java/org/apache/cassandra/db/SystemKeyspace.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/SystemKeyspace.java b/src/java/org/apache/cassandra/db/SystemKeyspace.java
index 9093ec0..910f025 100644
--- a/src/java/org/apache/cassandra/db/SystemKeyspace.java
+++ b/src/java/org/apache/cassandra/db/SystemKeyspace.java
@@ -188,6 +188,11 @@ public class SystemKeyspace
return compactionId;
}
+ /**
+ * Deletes the entry for this compaction from the set of compactions in progress. The compaction does not need
+ * to complete successfully for this to be called.
+ * @param taskId what was returned from {@code startCompaction}
+ */
public static void finishCompaction(UUID taskId)
{
assert taskId != null;
@@ -198,21 +203,31 @@ public class SystemKeyspace
}
/**
- * @return unfinished compactions, grouped by keyspace/columnfamily pair.
+ * Returns a Map whose keys are KS.CF pairs and whose values are maps from sstable generation numbers to the
+ * task ID of the compaction they were participating in.
*/
- public static SetMultimap<Pair<String, String>, Integer> getUnfinishedCompactions()
+ public static Map<Pair<String, String>, Map<Integer, UUID>> getUnfinishedCompactions()
{
String req = "SELECT * FROM system.%s";
UntypedResultSet resultSet = processInternal(String.format(req, COMPACTION_LOG));
- SetMultimap<Pair<String, String>, Integer> unfinishedCompactions = HashMultimap.create();
+ Map<Pair<String, String>, Map<Integer, UUID>> unfinishedCompactions = new HashMap<>();
for (UntypedResultSet.Row row : resultSet)
{
String keyspace = row.getString("keyspace_name");
String columnfamily = row.getString("columnfamily_name");
Set<Integer> inputs = row.getSet("inputs", Int32Type.instance);
+ UUID taskID = row.getUUID("id");
+
+ Pair<String, String> kscf = Pair.create(keyspace, columnfamily);
+ Map<Integer, UUID> generationToTaskID = unfinishedCompactions.get(kscf);
+ if (generationToTaskID == null)
+ generationToTaskID = new HashMap<>(inputs.size());
+
+ for (Integer generation : inputs)
+ generationToTaskID.put(generation, taskID);
- unfinishedCompactions.putAll(Pair.create(keyspace, columnfamily), inputs);
+ unfinishedCompactions.put(kscf, generationToTaskID);
}
return unfinishedCompactions;
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4ed22340/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java b/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java
index 92cd887..4347ad5 100644
--- a/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java
+++ b/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java
@@ -33,6 +33,8 @@ import com.google.common.primitives.Ints;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.config.Schema;
import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.RowPosition;
import org.apache.cassandra.dht.Bounds;
@@ -619,4 +621,16 @@ public class LeveledManifest
this.maxSSTableBytes = maxSSTableBytes;
}
}
+
+ public static void maybeMigrateManifests() throws IOException
+ {
+ for (String keyspaceName : Schema.instance.getKeyspaces())
+ {
+ for (CFMetaData cfm : Schema.instance.getKeyspaceMetaData(keyspaceName).values())
+ {
+ if (LegacyLeveledManifest.manifestNeedsMigration(keyspaceName,cfm.cfName))
+ LegacyLeveledManifest.migrateManifests(keyspaceName, cfm.cfName);
+ }
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4ed22340/src/java/org/apache/cassandra/service/CassandraDaemon.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/CassandraDaemon.java b/src/java/org/apache/cassandra/service/CassandraDaemon.java
index 0cebc11..d36b0db 100644
--- a/src/java/org/apache/cassandra/service/CassandraDaemon.java
+++ b/src/java/org/apache/cassandra/service/CassandraDaemon.java
@@ -24,6 +24,8 @@ import java.net.InetAddress;
import java.net.MalformedURLException;
import java.net.URL;
import java.util.Arrays;
+import java.util.Map;
+import java.util.UUID;
import java.util.concurrent.TimeUnit;
import javax.management.MBeanServer;
import javax.management.ObjectName;
@@ -32,7 +34,6 @@ import javax.management.StandardMBean;
import com.addthis.metrics.reporter.config.ReporterConfig;
import com.google.common.collect.Iterables;
-import com.google.common.collect.SetMultimap;
import org.apache.log4j.PropertyConfigurator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -46,7 +47,7 @@ import org.apache.cassandra.db.MeteredFlusher;
import org.apache.cassandra.db.SystemKeyspace;
import org.apache.cassandra.db.commitlog.CommitLog;
import org.apache.cassandra.db.compaction.CompactionManager;
-import org.apache.cassandra.db.compaction.LegacyLeveledManifest;
+import org.apache.cassandra.db.compaction.LeveledManifest;
import org.apache.cassandra.exceptions.ConfigurationException;
import org.apache.cassandra.io.FSError;
import org.apache.cassandra.io.util.FileUtils;
@@ -236,35 +237,29 @@ public class CassandraDaemon
// load keyspace descriptions.
DatabaseDescriptor.loadSchemas();
- // clean up debris in the rest of the keyspaces
- for (String keyspaceName : Schema.instance.getKeyspaces())
+ try
{
- for (CFMetaData cfm : Schema.instance.getKeyspaceMetaData(keyspaceName).values())
- {
- if (LegacyLeveledManifest.manifestNeedsMigration(keyspaceName,cfm.cfName))
- {
- try
- {
- LegacyLeveledManifest.migrateManifests(keyspaceName, cfm.cfName);
- }
- catch (IOException e)
- {
- logger.error("Could not migrate old leveled manifest. Move away the .json file in the data directory", e);
- System.exit(100);
- }
- }
-
- ColumnFamilyStore.scrubDataDirectories(keyspaceName, cfm.cfName);
- }
+ LeveledManifest.maybeMigrateManifests();
}
+ catch(IOException e)
+ {
+ logger.error("Could not migrate old leveled manifest. Move away the .json file in the data directory", e);
+ System.exit(100);
+ }
+
// clean up compaction leftovers
- SetMultimap<Pair<String, String>, Integer> unfinishedCompactions = SystemKeyspace.getUnfinishedCompactions();
+ Map<Pair<String, String>, Map<Integer, UUID>> unfinishedCompactions = SystemKeyspace.getUnfinishedCompactions();
for (Pair<String, String> kscf : unfinishedCompactions.keySet())
- {
ColumnFamilyStore.removeUnfinishedCompactionLeftovers(kscf.left, kscf.right, unfinishedCompactions.get(kscf));
- }
SystemKeyspace.discardCompactionsInProgress();
+ // clean up debris in the rest of the keyspaces
+ for (String keyspaceName : Schema.instance.getKeyspaces())
+ {
+ for (CFMetaData cfm : Schema.instance.getKeyspaceMetaData(keyspaceName).values())
+ ColumnFamilyStore.scrubDataDirectories(keyspaceName, cfm.cfName);
+ }
+
// initialize keyspaces
for (String keyspaceName : Schema.instance.getKeyspaces())
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4ed22340/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java b/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java
index 4e6c87f..65b1708 100644
--- a/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java
+++ b/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java
@@ -52,6 +52,7 @@ import org.apache.cassandra.io.sstable.*;
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.thrift.*;
import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.Pair;
import org.apache.cassandra.utils.WrappedRunnable;
import static org.junit.Assert.*;
@@ -1581,7 +1582,7 @@ public class ColumnFamilyStoreTest extends SchemaLoader
}
@Test
- public void testRemoveUnifinishedCompactionLeftovers() throws Throwable
+ public void testRemoveUnfinishedCompactionLeftovers() throws Throwable
{
String ks = "Keyspace1";
String cf = "Standard3"; // should be empty
@@ -1597,14 +1598,14 @@ public class ColumnFamilyStoreTest extends SchemaLoader
writer.close();
Map<Descriptor, Set<Component>> sstables = dir.sstableLister().list();
- assert sstables.size() == 1;
+ assertEquals(1, sstables.size());
Map.Entry<Descriptor, Set<Component>> sstableToOpen = sstables.entrySet().iterator().next();
final SSTableReader sstable1 = SSTableReader.open(sstableToOpen.getKey());
// simulate incomplete compaction
writer = new SSTableSimpleWriter(dir.getDirectoryForNewSSTables(),
- cfmeta, StorageService.getPartitioner())
+ cfmeta, StorageService.getPartitioner())
{
protected SSTableWriter getWriter()
{
@@ -1623,12 +1624,75 @@ public class ColumnFamilyStoreTest extends SchemaLoader
// should have 2 sstables now
sstables = dir.sstableLister().list();
- assert sstables.size() == 2;
+ assertEquals(2, sstables.size());
+
+ UUID compactionTaskID = SystemKeyspace.startCompaction(
+ Keyspace.open(ks).getColumnFamilyStore(cf),
+ Collections.singleton(SSTableReader.open(sstable1.descriptor)));
- ColumnFamilyStore.removeUnfinishedCompactionLeftovers(ks, cf, Sets.newHashSet(sstable1.descriptor.generation));
+ Map<Integer, UUID> unfinishedCompaction = new HashMap<>();
+ unfinishedCompaction.put(sstable1.descriptor.generation, compactionTaskID);
+ ColumnFamilyStore.removeUnfinishedCompactionLeftovers(ks, cf, unfinishedCompaction);
// 2nd sstable should be removed (only 1st sstable exists in set of size 1)
sstables = dir.sstableLister().list();
+ assertEquals(1, sstables.size());
+ assertTrue(sstables.containsKey(sstable1.descriptor));
+
+ Map<Pair<String, String>, Map<Integer, UUID>> unfinished = SystemKeyspace.getUnfinishedCompactions();
+ assertTrue(unfinished.isEmpty());
+ }
+
+ /**
+ * @see <a href="https://issues.apache.org/jira/browse/CASSANDRA-6086">CASSANDRA-6086</a>
+ */
+ @Test
+ public void testFailedToRemoveUnfinishedCompactionLeftovers() throws Throwable
+ {
+ final String ks = "Keyspace1";
+ final String cf = "Standard4"; // should be empty
+
+ final CFMetaData cfmeta = Schema.instance.getCFMetaData(ks, cf);
+ Directories dir = Directories.create(ks, cf);
+ ByteBuffer key = bytes("key");
+
+ // Write SSTable generation 3 that has ancestors 1 and 2
+ final Set<Integer> ancestors = Sets.newHashSet(1, 2);
+ SSTableSimpleWriter writer = new SSTableSimpleWriter(dir.getDirectoryForNewSSTables(),
+ cfmeta, StorageService.getPartitioner())
+ {
+ protected SSTableWriter getWriter()
+ {
+ SSTableMetadata.Collector collector = SSTableMetadata.createCollector(cfmeta.comparator);
+ for (int ancestor : ancestors)
+ collector.addAncestor(ancestor);
+ String file = new Descriptor(directory, ks, cf, 3, true).filenameFor(Component.DATA);
+ return new SSTableWriter(file,
+ 0,
+ metadata,
+ StorageService.getPartitioner(),
+ collector);
+ }
+ };
+ writer.newRow(key);
+ writer.addColumn(bytes("col"), bytes("val"), 1);
+ writer.close();
+
+ Map<Descriptor, Set<Component>> sstables = dir.sstableLister().list();
+ assert sstables.size() == 1;
+
+ Map.Entry<Descriptor, Set<Component>> sstableToOpen = sstables.entrySet().iterator().next();
+ final SSTableReader sstable1 = SSTableReader.open(sstableToOpen.getKey());
+
+ // simulate we don't have generation in compaction_history
+ Map<Integer, UUID> unfinishedCompactions = new HashMap<>();
+ UUID compactionTaskID = UUID.randomUUID();
+ for (Integer ancestor : ancestors)
+ unfinishedCompactions.put(ancestor, compactionTaskID);
+ ColumnFamilyStore.removeUnfinishedCompactionLeftovers(ks, cf, unfinishedCompactions);
+
+ // SSTable should not be deleted
+ sstables = dir.sstableLister().list();
assert sstables.size() == 1;
assert sstables.containsKey(sstable1.descriptor);
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4ed22340/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java b/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java
index a338290..7b91bed 100644
--- a/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java
+++ b/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java
@@ -289,8 +289,8 @@ public class CompactionsTest extends SchemaLoader
}
}));
UUID taskId = SystemKeyspace.startCompaction(cfs, sstables);
- SetMultimap<Pair<String, String>, Integer> compactionLogs = SystemKeyspace.getUnfinishedCompactions();
- Set<Integer> unfinishedCompactions = compactionLogs.get(Pair.create(KEYSPACE1, cf));
+ Map<Pair<String, String>, Map<Integer, UUID>> compactionLogs = SystemKeyspace.getUnfinishedCompactions();
+ Set<Integer> unfinishedCompactions = compactionLogs.get(Pair.create(KEYSPACE1, cf)).keySet();
assert unfinishedCompactions.containsAll(generations);
SystemKeyspace.finishCompaction(taskId);