You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by yu...@apache.org on 2013/12/30 21:04:10 UTC
[1/3] git commit: Delete unfinished compaction sstables incrementally
Updated Branches:
refs/heads/cassandra-2.0 c5ca8de4d -> 4ed223407
refs/heads/trunk 76ee9a155 -> 49efc13cd
Delete unfinished compaction sstables incrementally
patch by thobbs; reviewed by yukim for CASSANDRA-6086
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/4ed22340
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/4ed22340
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/4ed22340
Branch: refs/heads/cassandra-2.0
Commit: 4ed2234078c4d302c256332252a8ddd6ae345484
Parents: c5ca8de
Author: Yuki Morishita <yu...@apache.org>
Authored: Mon Dec 30 13:48:09 2013 -0600
Committer: Yuki Morishita <yu...@apache.org>
Committed: Mon Dec 30 13:48:09 2013 -0600
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../apache/cassandra/db/ColumnFamilyStore.java | 39 +++++++----
.../org/apache/cassandra/db/SystemKeyspace.java | 23 ++++--
.../db/compaction/LeveledManifest.java | 14 ++++
.../cassandra/service/CassandraDaemon.java | 43 +++++-------
.../cassandra/db/ColumnFamilyStoreTest.java | 74 ++++++++++++++++++--
.../db/compaction/CompactionsTest.java | 4 +-
7 files changed, 151 insertions(+), 47 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4ed22340/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 0396006..958369a 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
2.0.5
+* Delete unfinished compaction incrementally (CASSANDRA-6086)
Merged from 1.2:
* fsync compression metadata (CASSANDRA-6531)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4ed22340/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index cbd9d2e..4d7d6f2 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@ -483,27 +483,30 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
* compactions, we remove the new ones (since those may be incomplete -- under LCS, we may create multiple
* sstables from any given ancestor).
*/
- public static void removeUnfinishedCompactionLeftovers(String keyspace, String columnfamily, Set<Integer> unfinishedGenerations)
+ public static void removeUnfinishedCompactionLeftovers(String keyspace, String columnfamily, Map<Integer, UUID> unfinishedCompactions)
{
Directories directories = Directories.create(keyspace, columnfamily);
- // sanity-check unfinishedGenerations
- Set<Integer> allGenerations = new HashSet<Integer>();
+ Set<Integer> allGenerations = new HashSet<>();
for (Descriptor desc : directories.sstableLister().list().keySet())
allGenerations.add(desc.generation);
+
+ // sanity-check unfinishedCompactions
+ Set<Integer> unfinishedGenerations = unfinishedCompactions.keySet();
if (!allGenerations.containsAll(unfinishedGenerations))
{
- throw new IllegalStateException("Unfinished compactions reference missing sstables."
- + " This should never happen since compactions are marked finished before we start removing the old sstables.");
+ HashSet<Integer> missingGenerations = new HashSet<>(unfinishedGenerations);
+ missingGenerations.removeAll(allGenerations);
+ logger.debug("Unfinished compactions of {}.{} reference missing sstables of generations {}",
+ keyspace, columnfamily, missingGenerations);
}
// remove new sstables from compactions that didn't complete, and compute
// set of ancestors that shouldn't exist anymore
- Set<Integer> completedAncestors = new HashSet<Integer>();
+ Set<Integer> completedAncestors = new HashSet<>();
for (Map.Entry<Descriptor, Set<Component>> sstableFiles : directories.sstableLister().list().entrySet())
{
Descriptor desc = sstableFiles.getKey();
- Set<Component> components = sstableFiles.getValue();
Set<Integer> ancestors;
try
@@ -515,9 +518,16 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
throw new FSReadError(e, desc.filenameFor(Component.STATS));
}
- if (!ancestors.isEmpty() && unfinishedGenerations.containsAll(ancestors))
+ if (!ancestors.isEmpty()
+ && unfinishedGenerations.containsAll(ancestors)
+ && allGenerations.containsAll(ancestors))
{
- SSTable.delete(desc, components);
+ // any of the ancestors would work, so we'll just lookup the compaction task ID with the first one
+ UUID compactionTaskID = unfinishedCompactions.get(ancestors.iterator().next());
+ assert compactionTaskID != null;
+ logger.debug("Going to delete unfinished compaction product {}", desc);
+ SSTable.delete(desc, sstableFiles.getValue());
+ SystemKeyspace.finishCompaction(compactionTaskID);
}
else
{
@@ -529,10 +539,15 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
for (Map.Entry<Descriptor, Set<Component>> sstableFiles : directories.sstableLister().list().entrySet())
{
Descriptor desc = sstableFiles.getKey();
- Set<Component> components = sstableFiles.getValue();
-
if (completedAncestors.contains(desc.generation))
- SSTable.delete(desc, components);
+ {
+ // if any of the ancestors were participating in a compaction, finish that compaction
+ logger.debug("Going to delete leftover compaction ancestor {}", desc);
+ SSTable.delete(desc, sstableFiles.getValue());
+ UUID compactionTaskID = unfinishedCompactions.get(desc.generation);
+ if (compactionTaskID != null)
+ SystemKeyspace.finishCompaction(unfinishedCompactions.get(desc.generation));
+ }
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4ed22340/src/java/org/apache/cassandra/db/SystemKeyspace.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/SystemKeyspace.java b/src/java/org/apache/cassandra/db/SystemKeyspace.java
index 9093ec0..910f025 100644
--- a/src/java/org/apache/cassandra/db/SystemKeyspace.java
+++ b/src/java/org/apache/cassandra/db/SystemKeyspace.java
@@ -188,6 +188,11 @@ public class SystemKeyspace
return compactionId;
}
+ /**
+ * Deletes the entry for this compaction from the set of compactions in progress. The compaction does not need
+ * to complete successfully for this to be called.
+ * @param taskId what was returned from {@code startCompaction}
+ */
public static void finishCompaction(UUID taskId)
{
assert taskId != null;
@@ -198,21 +203,31 @@ public class SystemKeyspace
}
/**
- * @return unfinished compactions, grouped by keyspace/columnfamily pair.
+ * Returns a Map whose keys are KS.CF pairs and whose values are maps from sstable generation numbers to the
+ * task ID of the compaction they were participating in.
*/
- public static SetMultimap<Pair<String, String>, Integer> getUnfinishedCompactions()
+ public static Map<Pair<String, String>, Map<Integer, UUID>> getUnfinishedCompactions()
{
String req = "SELECT * FROM system.%s";
UntypedResultSet resultSet = processInternal(String.format(req, COMPACTION_LOG));
- SetMultimap<Pair<String, String>, Integer> unfinishedCompactions = HashMultimap.create();
+ Map<Pair<String, String>, Map<Integer, UUID>> unfinishedCompactions = new HashMap<>();
for (UntypedResultSet.Row row : resultSet)
{
String keyspace = row.getString("keyspace_name");
String columnfamily = row.getString("columnfamily_name");
Set<Integer> inputs = row.getSet("inputs", Int32Type.instance);
+ UUID taskID = row.getUUID("id");
+
+ Pair<String, String> kscf = Pair.create(keyspace, columnfamily);
+ Map<Integer, UUID> generationToTaskID = unfinishedCompactions.get(kscf);
+ if (generationToTaskID == null)
+ generationToTaskID = new HashMap<>(inputs.size());
+
+ for (Integer generation : inputs)
+ generationToTaskID.put(generation, taskID);
- unfinishedCompactions.putAll(Pair.create(keyspace, columnfamily), inputs);
+ unfinishedCompactions.put(kscf, generationToTaskID);
}
return unfinishedCompactions;
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4ed22340/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java b/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java
index 92cd887..4347ad5 100644
--- a/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java
+++ b/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java
@@ -33,6 +33,8 @@ import com.google.common.primitives.Ints;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.config.Schema;
import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.RowPosition;
import org.apache.cassandra.dht.Bounds;
@@ -619,4 +621,16 @@ public class LeveledManifest
this.maxSSTableBytes = maxSSTableBytes;
}
}
+
+ public static void maybeMigrateManifests() throws IOException
+ {
+ for (String keyspaceName : Schema.instance.getKeyspaces())
+ {
+ for (CFMetaData cfm : Schema.instance.getKeyspaceMetaData(keyspaceName).values())
+ {
+ if (LegacyLeveledManifest.manifestNeedsMigration(keyspaceName,cfm.cfName))
+ LegacyLeveledManifest.migrateManifests(keyspaceName, cfm.cfName);
+ }
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4ed22340/src/java/org/apache/cassandra/service/CassandraDaemon.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/CassandraDaemon.java b/src/java/org/apache/cassandra/service/CassandraDaemon.java
index 0cebc11..d36b0db 100644
--- a/src/java/org/apache/cassandra/service/CassandraDaemon.java
+++ b/src/java/org/apache/cassandra/service/CassandraDaemon.java
@@ -24,6 +24,8 @@ import java.net.InetAddress;
import java.net.MalformedURLException;
import java.net.URL;
import java.util.Arrays;
+import java.util.Map;
+import java.util.UUID;
import java.util.concurrent.TimeUnit;
import javax.management.MBeanServer;
import javax.management.ObjectName;
@@ -32,7 +34,6 @@ import javax.management.StandardMBean;
import com.addthis.metrics.reporter.config.ReporterConfig;
import com.google.common.collect.Iterables;
-import com.google.common.collect.SetMultimap;
import org.apache.log4j.PropertyConfigurator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -46,7 +47,7 @@ import org.apache.cassandra.db.MeteredFlusher;
import org.apache.cassandra.db.SystemKeyspace;
import org.apache.cassandra.db.commitlog.CommitLog;
import org.apache.cassandra.db.compaction.CompactionManager;
-import org.apache.cassandra.db.compaction.LegacyLeveledManifest;
+import org.apache.cassandra.db.compaction.LeveledManifest;
import org.apache.cassandra.exceptions.ConfigurationException;
import org.apache.cassandra.io.FSError;
import org.apache.cassandra.io.util.FileUtils;
@@ -236,35 +237,29 @@ public class CassandraDaemon
// load keyspace descriptions.
DatabaseDescriptor.loadSchemas();
- // clean up debris in the rest of the keyspaces
- for (String keyspaceName : Schema.instance.getKeyspaces())
+ try
{
- for (CFMetaData cfm : Schema.instance.getKeyspaceMetaData(keyspaceName).values())
- {
- if (LegacyLeveledManifest.manifestNeedsMigration(keyspaceName,cfm.cfName))
- {
- try
- {
- LegacyLeveledManifest.migrateManifests(keyspaceName, cfm.cfName);
- }
- catch (IOException e)
- {
- logger.error("Could not migrate old leveled manifest. Move away the .json file in the data directory", e);
- System.exit(100);
- }
- }
-
- ColumnFamilyStore.scrubDataDirectories(keyspaceName, cfm.cfName);
- }
+ LeveledManifest.maybeMigrateManifests();
}
+ catch(IOException e)
+ {
+ logger.error("Could not migrate old leveled manifest. Move away the .json file in the data directory", e);
+ System.exit(100);
+ }
+
// clean up compaction leftovers
- SetMultimap<Pair<String, String>, Integer> unfinishedCompactions = SystemKeyspace.getUnfinishedCompactions();
+ Map<Pair<String, String>, Map<Integer, UUID>> unfinishedCompactions = SystemKeyspace.getUnfinishedCompactions();
for (Pair<String, String> kscf : unfinishedCompactions.keySet())
- {
ColumnFamilyStore.removeUnfinishedCompactionLeftovers(kscf.left, kscf.right, unfinishedCompactions.get(kscf));
- }
SystemKeyspace.discardCompactionsInProgress();
+ // clean up debris in the rest of the keyspaces
+ for (String keyspaceName : Schema.instance.getKeyspaces())
+ {
+ for (CFMetaData cfm : Schema.instance.getKeyspaceMetaData(keyspaceName).values())
+ ColumnFamilyStore.scrubDataDirectories(keyspaceName, cfm.cfName);
+ }
+
// initialize keyspaces
for (String keyspaceName : Schema.instance.getKeyspaces())
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4ed22340/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java b/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java
index 4e6c87f..65b1708 100644
--- a/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java
+++ b/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java
@@ -52,6 +52,7 @@ import org.apache.cassandra.io.sstable.*;
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.thrift.*;
import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.Pair;
import org.apache.cassandra.utils.WrappedRunnable;
import static org.junit.Assert.*;
@@ -1581,7 +1582,7 @@ public class ColumnFamilyStoreTest extends SchemaLoader
}
@Test
- public void testRemoveUnifinishedCompactionLeftovers() throws Throwable
+ public void testRemoveUnfinishedCompactionLeftovers() throws Throwable
{
String ks = "Keyspace1";
String cf = "Standard3"; // should be empty
@@ -1597,14 +1598,14 @@ public class ColumnFamilyStoreTest extends SchemaLoader
writer.close();
Map<Descriptor, Set<Component>> sstables = dir.sstableLister().list();
- assert sstables.size() == 1;
+ assertEquals(1, sstables.size());
Map.Entry<Descriptor, Set<Component>> sstableToOpen = sstables.entrySet().iterator().next();
final SSTableReader sstable1 = SSTableReader.open(sstableToOpen.getKey());
// simulate incomplete compaction
writer = new SSTableSimpleWriter(dir.getDirectoryForNewSSTables(),
- cfmeta, StorageService.getPartitioner())
+ cfmeta, StorageService.getPartitioner())
{
protected SSTableWriter getWriter()
{
@@ -1623,12 +1624,75 @@ public class ColumnFamilyStoreTest extends SchemaLoader
// should have 2 sstables now
sstables = dir.sstableLister().list();
- assert sstables.size() == 2;
+ assertEquals(2, sstables.size());
+
+ UUID compactionTaskID = SystemKeyspace.startCompaction(
+ Keyspace.open(ks).getColumnFamilyStore(cf),
+ Collections.singleton(SSTableReader.open(sstable1.descriptor)));
- ColumnFamilyStore.removeUnfinishedCompactionLeftovers(ks, cf, Sets.newHashSet(sstable1.descriptor.generation));
+ Map<Integer, UUID> unfinishedCompaction = new HashMap<>();
+ unfinishedCompaction.put(sstable1.descriptor.generation, compactionTaskID);
+ ColumnFamilyStore.removeUnfinishedCompactionLeftovers(ks, cf, unfinishedCompaction);
// 2nd sstable should be removed (only 1st sstable exists in set of size 1)
sstables = dir.sstableLister().list();
+ assertEquals(1, sstables.size());
+ assertTrue(sstables.containsKey(sstable1.descriptor));
+
+ Map<Pair<String, String>, Map<Integer, UUID>> unfinished = SystemKeyspace.getUnfinishedCompactions();
+ assertTrue(unfinished.isEmpty());
+ }
+
+ /**
+ * @see <a href="https://issues.apache.org/jira/browse/CASSANDRA-6086">CASSANDRA-6086</a>
+ */
+ @Test
+ public void testFailedToRemoveUnfinishedCompactionLeftovers() throws Throwable
+ {
+ final String ks = "Keyspace1";
+ final String cf = "Standard4"; // should be empty
+
+ final CFMetaData cfmeta = Schema.instance.getCFMetaData(ks, cf);
+ Directories dir = Directories.create(ks, cf);
+ ByteBuffer key = bytes("key");
+
+ // Write SSTable generation 3 that has ancestors 1 and 2
+ final Set<Integer> ancestors = Sets.newHashSet(1, 2);
+ SSTableSimpleWriter writer = new SSTableSimpleWriter(dir.getDirectoryForNewSSTables(),
+ cfmeta, StorageService.getPartitioner())
+ {
+ protected SSTableWriter getWriter()
+ {
+ SSTableMetadata.Collector collector = SSTableMetadata.createCollector(cfmeta.comparator);
+ for (int ancestor : ancestors)
+ collector.addAncestor(ancestor);
+ String file = new Descriptor(directory, ks, cf, 3, true).filenameFor(Component.DATA);
+ return new SSTableWriter(file,
+ 0,
+ metadata,
+ StorageService.getPartitioner(),
+ collector);
+ }
+ };
+ writer.newRow(key);
+ writer.addColumn(bytes("col"), bytes("val"), 1);
+ writer.close();
+
+ Map<Descriptor, Set<Component>> sstables = dir.sstableLister().list();
+ assert sstables.size() == 1;
+
+ Map.Entry<Descriptor, Set<Component>> sstableToOpen = sstables.entrySet().iterator().next();
+ final SSTableReader sstable1 = SSTableReader.open(sstableToOpen.getKey());
+
+ // simulate a compaction log that references ancestor generations (1 and 2) whose sstables are not on disk
+ Map<Integer, UUID> unfinishedCompactions = new HashMap<>();
+ UUID compactionTaskID = UUID.randomUUID();
+ for (Integer ancestor : ancestors)
+ unfinishedCompactions.put(ancestor, compactionTaskID);
+ ColumnFamilyStore.removeUnfinishedCompactionLeftovers(ks, cf, unfinishedCompactions);
+
+ // SSTable should not be deleted
+ sstables = dir.sstableLister().list();
assert sstables.size() == 1;
assert sstables.containsKey(sstable1.descriptor);
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4ed22340/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java b/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java
index a338290..7b91bed 100644
--- a/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java
+++ b/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java
@@ -289,8 +289,8 @@ public class CompactionsTest extends SchemaLoader
}
}));
UUID taskId = SystemKeyspace.startCompaction(cfs, sstables);
- SetMultimap<Pair<String, String>, Integer> compactionLogs = SystemKeyspace.getUnfinishedCompactions();
- Set<Integer> unfinishedCompactions = compactionLogs.get(Pair.create(KEYSPACE1, cf));
+ Map<Pair<String, String>, Map<Integer, UUID>> compactionLogs = SystemKeyspace.getUnfinishedCompactions();
+ Set<Integer> unfinishedCompactions = compactionLogs.get(Pair.create(KEYSPACE1, cf)).keySet();
assert unfinishedCompactions.containsAll(generations);
SystemKeyspace.finishCompaction(taskId);
[2/3] git commit: Delete unfinished compaction sstables incrementally
Posted by yu...@apache.org.
Delete unfinished compaction sstables incrementally
patch by thobbs; reviewed by yukim for CASSANDRA-6086
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/4ed22340
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/4ed22340
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/4ed22340
Branch: refs/heads/trunk
Commit: 4ed2234078c4d302c256332252a8ddd6ae345484
Parents: c5ca8de
Author: Yuki Morishita <yu...@apache.org>
Authored: Mon Dec 30 13:48:09 2013 -0600
Committer: Yuki Morishita <yu...@apache.org>
Committed: Mon Dec 30 13:48:09 2013 -0600
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../apache/cassandra/db/ColumnFamilyStore.java | 39 +++++++----
.../org/apache/cassandra/db/SystemKeyspace.java | 23 ++++--
.../db/compaction/LeveledManifest.java | 14 ++++
.../cassandra/service/CassandraDaemon.java | 43 +++++-------
.../cassandra/db/ColumnFamilyStoreTest.java | 74 ++++++++++++++++++--
.../db/compaction/CompactionsTest.java | 4 +-
7 files changed, 151 insertions(+), 47 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4ed22340/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 0396006..958369a 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
2.0.5
+* Delete unfinished compaction incrementally (CASSANDRA-6086)
Merged from 1.2:
* fsync compression metadata (CASSANDRA-6531)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4ed22340/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index cbd9d2e..4d7d6f2 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@ -483,27 +483,30 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
* compactions, we remove the new ones (since those may be incomplete -- under LCS, we may create multiple
* sstables from any given ancestor).
*/
- public static void removeUnfinishedCompactionLeftovers(String keyspace, String columnfamily, Set<Integer> unfinishedGenerations)
+ public static void removeUnfinishedCompactionLeftovers(String keyspace, String columnfamily, Map<Integer, UUID> unfinishedCompactions)
{
Directories directories = Directories.create(keyspace, columnfamily);
- // sanity-check unfinishedGenerations
- Set<Integer> allGenerations = new HashSet<Integer>();
+ Set<Integer> allGenerations = new HashSet<>();
for (Descriptor desc : directories.sstableLister().list().keySet())
allGenerations.add(desc.generation);
+
+ // sanity-check unfinishedCompactions
+ Set<Integer> unfinishedGenerations = unfinishedCompactions.keySet();
if (!allGenerations.containsAll(unfinishedGenerations))
{
- throw new IllegalStateException("Unfinished compactions reference missing sstables."
- + " This should never happen since compactions are marked finished before we start removing the old sstables.");
+ HashSet<Integer> missingGenerations = new HashSet<>(unfinishedGenerations);
+ missingGenerations.removeAll(allGenerations);
+ logger.debug("Unfinished compactions of {}.{} reference missing sstables of generations {}",
+ keyspace, columnfamily, missingGenerations);
}
// remove new sstables from compactions that didn't complete, and compute
// set of ancestors that shouldn't exist anymore
- Set<Integer> completedAncestors = new HashSet<Integer>();
+ Set<Integer> completedAncestors = new HashSet<>();
for (Map.Entry<Descriptor, Set<Component>> sstableFiles : directories.sstableLister().list().entrySet())
{
Descriptor desc = sstableFiles.getKey();
- Set<Component> components = sstableFiles.getValue();
Set<Integer> ancestors;
try
@@ -515,9 +518,16 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
throw new FSReadError(e, desc.filenameFor(Component.STATS));
}
- if (!ancestors.isEmpty() && unfinishedGenerations.containsAll(ancestors))
+ if (!ancestors.isEmpty()
+ && unfinishedGenerations.containsAll(ancestors)
+ && allGenerations.containsAll(ancestors))
{
- SSTable.delete(desc, components);
+ // any of the ancestors would work, so we'll just lookup the compaction task ID with the first one
+ UUID compactionTaskID = unfinishedCompactions.get(ancestors.iterator().next());
+ assert compactionTaskID != null;
+ logger.debug("Going to delete unfinished compaction product {}", desc);
+ SSTable.delete(desc, sstableFiles.getValue());
+ SystemKeyspace.finishCompaction(compactionTaskID);
}
else
{
@@ -529,10 +539,15 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
for (Map.Entry<Descriptor, Set<Component>> sstableFiles : directories.sstableLister().list().entrySet())
{
Descriptor desc = sstableFiles.getKey();
- Set<Component> components = sstableFiles.getValue();
-
if (completedAncestors.contains(desc.generation))
- SSTable.delete(desc, components);
+ {
+ // if any of the ancestors were participating in a compaction, finish that compaction
+ logger.debug("Going to delete leftover compaction ancestor {}", desc);
+ SSTable.delete(desc, sstableFiles.getValue());
+ UUID compactionTaskID = unfinishedCompactions.get(desc.generation);
+ if (compactionTaskID != null)
+ SystemKeyspace.finishCompaction(unfinishedCompactions.get(desc.generation));
+ }
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4ed22340/src/java/org/apache/cassandra/db/SystemKeyspace.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/SystemKeyspace.java b/src/java/org/apache/cassandra/db/SystemKeyspace.java
index 9093ec0..910f025 100644
--- a/src/java/org/apache/cassandra/db/SystemKeyspace.java
+++ b/src/java/org/apache/cassandra/db/SystemKeyspace.java
@@ -188,6 +188,11 @@ public class SystemKeyspace
return compactionId;
}
+ /**
+ * Deletes the entry for this compaction from the set of compactions in progress. The compaction does not need
+ * to complete successfully for this to be called.
+ * @param taskId what was returned from {@code startCompaction}
+ */
public static void finishCompaction(UUID taskId)
{
assert taskId != null;
@@ -198,21 +203,31 @@ public class SystemKeyspace
}
/**
- * @return unfinished compactions, grouped by keyspace/columnfamily pair.
+ * Returns a Map whose keys are KS.CF pairs and whose values are maps from sstable generation numbers to the
+ * task ID of the compaction they were participating in.
*/
- public static SetMultimap<Pair<String, String>, Integer> getUnfinishedCompactions()
+ public static Map<Pair<String, String>, Map<Integer, UUID>> getUnfinishedCompactions()
{
String req = "SELECT * FROM system.%s";
UntypedResultSet resultSet = processInternal(String.format(req, COMPACTION_LOG));
- SetMultimap<Pair<String, String>, Integer> unfinishedCompactions = HashMultimap.create();
+ Map<Pair<String, String>, Map<Integer, UUID>> unfinishedCompactions = new HashMap<>();
for (UntypedResultSet.Row row : resultSet)
{
String keyspace = row.getString("keyspace_name");
String columnfamily = row.getString("columnfamily_name");
Set<Integer> inputs = row.getSet("inputs", Int32Type.instance);
+ UUID taskID = row.getUUID("id");
+
+ Pair<String, String> kscf = Pair.create(keyspace, columnfamily);
+ Map<Integer, UUID> generationToTaskID = unfinishedCompactions.get(kscf);
+ if (generationToTaskID == null)
+ generationToTaskID = new HashMap<>(inputs.size());
+
+ for (Integer generation : inputs)
+ generationToTaskID.put(generation, taskID);
- unfinishedCompactions.putAll(Pair.create(keyspace, columnfamily), inputs);
+ unfinishedCompactions.put(kscf, generationToTaskID);
}
return unfinishedCompactions;
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4ed22340/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java b/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java
index 92cd887..4347ad5 100644
--- a/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java
+++ b/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java
@@ -33,6 +33,8 @@ import com.google.common.primitives.Ints;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.config.Schema;
import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.RowPosition;
import org.apache.cassandra.dht.Bounds;
@@ -619,4 +621,16 @@ public class LeveledManifest
this.maxSSTableBytes = maxSSTableBytes;
}
}
+
+ public static void maybeMigrateManifests() throws IOException
+ {
+ for (String keyspaceName : Schema.instance.getKeyspaces())
+ {
+ for (CFMetaData cfm : Schema.instance.getKeyspaceMetaData(keyspaceName).values())
+ {
+ if (LegacyLeveledManifest.manifestNeedsMigration(keyspaceName,cfm.cfName))
+ LegacyLeveledManifest.migrateManifests(keyspaceName, cfm.cfName);
+ }
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4ed22340/src/java/org/apache/cassandra/service/CassandraDaemon.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/CassandraDaemon.java b/src/java/org/apache/cassandra/service/CassandraDaemon.java
index 0cebc11..d36b0db 100644
--- a/src/java/org/apache/cassandra/service/CassandraDaemon.java
+++ b/src/java/org/apache/cassandra/service/CassandraDaemon.java
@@ -24,6 +24,8 @@ import java.net.InetAddress;
import java.net.MalformedURLException;
import java.net.URL;
import java.util.Arrays;
+import java.util.Map;
+import java.util.UUID;
import java.util.concurrent.TimeUnit;
import javax.management.MBeanServer;
import javax.management.ObjectName;
@@ -32,7 +34,6 @@ import javax.management.StandardMBean;
import com.addthis.metrics.reporter.config.ReporterConfig;
import com.google.common.collect.Iterables;
-import com.google.common.collect.SetMultimap;
import org.apache.log4j.PropertyConfigurator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -46,7 +47,7 @@ import org.apache.cassandra.db.MeteredFlusher;
import org.apache.cassandra.db.SystemKeyspace;
import org.apache.cassandra.db.commitlog.CommitLog;
import org.apache.cassandra.db.compaction.CompactionManager;
-import org.apache.cassandra.db.compaction.LegacyLeveledManifest;
+import org.apache.cassandra.db.compaction.LeveledManifest;
import org.apache.cassandra.exceptions.ConfigurationException;
import org.apache.cassandra.io.FSError;
import org.apache.cassandra.io.util.FileUtils;
@@ -236,35 +237,29 @@ public class CassandraDaemon
// load keyspace descriptions.
DatabaseDescriptor.loadSchemas();
- // clean up debris in the rest of the keyspaces
- for (String keyspaceName : Schema.instance.getKeyspaces())
+ try
{
- for (CFMetaData cfm : Schema.instance.getKeyspaceMetaData(keyspaceName).values())
- {
- if (LegacyLeveledManifest.manifestNeedsMigration(keyspaceName,cfm.cfName))
- {
- try
- {
- LegacyLeveledManifest.migrateManifests(keyspaceName, cfm.cfName);
- }
- catch (IOException e)
- {
- logger.error("Could not migrate old leveled manifest. Move away the .json file in the data directory", e);
- System.exit(100);
- }
- }
-
- ColumnFamilyStore.scrubDataDirectories(keyspaceName, cfm.cfName);
- }
+ LeveledManifest.maybeMigrateManifests();
}
+ catch(IOException e)
+ {
+ logger.error("Could not migrate old leveled manifest. Move away the .json file in the data directory", e);
+ System.exit(100);
+ }
+
// clean up compaction leftovers
- SetMultimap<Pair<String, String>, Integer> unfinishedCompactions = SystemKeyspace.getUnfinishedCompactions();
+ Map<Pair<String, String>, Map<Integer, UUID>> unfinishedCompactions = SystemKeyspace.getUnfinishedCompactions();
for (Pair<String, String> kscf : unfinishedCompactions.keySet())
- {
ColumnFamilyStore.removeUnfinishedCompactionLeftovers(kscf.left, kscf.right, unfinishedCompactions.get(kscf));
- }
SystemKeyspace.discardCompactionsInProgress();
+ // clean up debris in the rest of the keyspaces
+ for (String keyspaceName : Schema.instance.getKeyspaces())
+ {
+ for (CFMetaData cfm : Schema.instance.getKeyspaceMetaData(keyspaceName).values())
+ ColumnFamilyStore.scrubDataDirectories(keyspaceName, cfm.cfName);
+ }
+
// initialize keyspaces
for (String keyspaceName : Schema.instance.getKeyspaces())
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4ed22340/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java b/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java
index 4e6c87f..65b1708 100644
--- a/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java
+++ b/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java
@@ -52,6 +52,7 @@ import org.apache.cassandra.io.sstable.*;
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.thrift.*;
import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.Pair;
import org.apache.cassandra.utils.WrappedRunnable;
import static org.junit.Assert.*;
@@ -1581,7 +1582,7 @@ public class ColumnFamilyStoreTest extends SchemaLoader
}
@Test
- public void testRemoveUnifinishedCompactionLeftovers() throws Throwable
+ public void testRemoveUnfinishedCompactionLeftovers() throws Throwable
{
String ks = "Keyspace1";
String cf = "Standard3"; // should be empty
@@ -1597,14 +1598,14 @@ public class ColumnFamilyStoreTest extends SchemaLoader
writer.close();
Map<Descriptor, Set<Component>> sstables = dir.sstableLister().list();
- assert sstables.size() == 1;
+ assertEquals(1, sstables.size());
Map.Entry<Descriptor, Set<Component>> sstableToOpen = sstables.entrySet().iterator().next();
final SSTableReader sstable1 = SSTableReader.open(sstableToOpen.getKey());
// simulate incomplete compaction
writer = new SSTableSimpleWriter(dir.getDirectoryForNewSSTables(),
- cfmeta, StorageService.getPartitioner())
+ cfmeta, StorageService.getPartitioner())
{
protected SSTableWriter getWriter()
{
@@ -1623,12 +1624,75 @@ public class ColumnFamilyStoreTest extends SchemaLoader
// should have 2 sstables now
sstables = dir.sstableLister().list();
- assert sstables.size() == 2;
+ assertEquals(2, sstables.size());
+
+ UUID compactionTaskID = SystemKeyspace.startCompaction(
+ Keyspace.open(ks).getColumnFamilyStore(cf),
+ Collections.singleton(SSTableReader.open(sstable1.descriptor)));
- ColumnFamilyStore.removeUnfinishedCompactionLeftovers(ks, cf, Sets.newHashSet(sstable1.descriptor.generation));
+ Map<Integer, UUID> unfinishedCompaction = new HashMap<>();
+ unfinishedCompaction.put(sstable1.descriptor.generation, compactionTaskID);
+ ColumnFamilyStore.removeUnfinishedCompactionLeftovers(ks, cf, unfinishedCompaction);
// 2nd sstable should be removed (only 1st sstable exists in set of size 1)
sstables = dir.sstableLister().list();
+ assertEquals(1, sstables.size());
+ assertTrue(sstables.containsKey(sstable1.descriptor));
+
+ Map<Pair<String, String>, Map<Integer, UUID>> unfinished = SystemKeyspace.getUnfinishedCompactions();
+ assertTrue(unfinished.isEmpty());
+ }
+
+ /**
+ * @see <a href="https://issues.apache.org/jira/browse/CASSANDRA-6086">CASSANDRA-6086</a>
+ */
+ @Test
+ public void testFailedToRemoveUnfinishedCompactionLeftovers() throws Throwable
+ {
+ final String ks = "Keyspace1";
+ final String cf = "Standard4"; // should be empty
+
+ final CFMetaData cfmeta = Schema.instance.getCFMetaData(ks, cf);
+ Directories dir = Directories.create(ks, cf);
+ ByteBuffer key = bytes("key");
+
+ // Write SSTable generation 3 that has ancestors 1 and 2
+ final Set<Integer> ancestors = Sets.newHashSet(1, 2);
+ SSTableSimpleWriter writer = new SSTableSimpleWriter(dir.getDirectoryForNewSSTables(),
+ cfmeta, StorageService.getPartitioner())
+ {
+ protected SSTableWriter getWriter()
+ {
+ SSTableMetadata.Collector collector = SSTableMetadata.createCollector(cfmeta.comparator);
+ for (int ancestor : ancestors)
+ collector.addAncestor(ancestor);
+ String file = new Descriptor(directory, ks, cf, 3, true).filenameFor(Component.DATA);
+ return new SSTableWriter(file,
+ 0,
+ metadata,
+ StorageService.getPartitioner(),
+ collector);
+ }
+ };
+ writer.newRow(key);
+ writer.addColumn(bytes("col"), bytes("val"), 1);
+ writer.close();
+
+ Map<Descriptor, Set<Component>> sstables = dir.sstableLister().list();
+ assert sstables.size() == 1;
+
+ Map.Entry<Descriptor, Set<Component>> sstableToOpen = sstables.entrySet().iterator().next();
+ final SSTableReader sstable1 = SSTableReader.open(sstableToOpen.getKey());
+
+ // simulate we don't have generation in compaction_history
+ Map<Integer, UUID> unfinishedCompactions = new HashMap<>();
+ UUID compactionTaskID = UUID.randomUUID();
+ for (Integer ancestor : ancestors)
+ unfinishedCompactions.put(ancestor, compactionTaskID);
+ ColumnFamilyStore.removeUnfinishedCompactionLeftovers(ks, cf, unfinishedCompactions);
+
+ // SSTable should not be deleted
+ sstables = dir.sstableLister().list();
assert sstables.size() == 1;
assert sstables.containsKey(sstable1.descriptor);
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/4ed22340/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java b/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java
index a338290..7b91bed 100644
--- a/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java
+++ b/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java
@@ -289,8 +289,8 @@ public class CompactionsTest extends SchemaLoader
}
}));
UUID taskId = SystemKeyspace.startCompaction(cfs, sstables);
- SetMultimap<Pair<String, String>, Integer> compactionLogs = SystemKeyspace.getUnfinishedCompactions();
- Set<Integer> unfinishedCompactions = compactionLogs.get(Pair.create(KEYSPACE1, cf));
+ Map<Pair<String, String>, Map<Integer, UUID>> compactionLogs = SystemKeyspace.getUnfinishedCompactions();
+ Set<Integer> unfinishedCompactions = compactionLogs.get(Pair.create(KEYSPACE1, cf)).keySet();
assert unfinishedCompactions.containsAll(generations);
SystemKeyspace.finishCompaction(taskId);
[3/3] git commit: Merge branch 'cassandra-2.0' into trunk
Posted by yu...@apache.org.
Merge branch 'cassandra-2.0' into trunk
Conflicts:
src/java/org/apache/cassandra/db/ColumnFamilyStore.java
src/java/org/apache/cassandra/service/CassandraDaemon.java
test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/49efc13c
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/49efc13c
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/49efc13c
Branch: refs/heads/trunk
Commit: 49efc13cd530735ad802769e7f5322f3c79085ef
Parents: 76ee9a1 4ed2234
Author: Yuki Morishita <yu...@apache.org>
Authored: Mon Dec 30 14:03:54 2013 -0600
Committer: Yuki Morishita <yu...@apache.org>
Committed: Mon Dec 30 14:03:54 2013 -0600
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../apache/cassandra/db/ColumnFamilyStore.java | 35 ++++++---
.../org/apache/cassandra/db/SystemKeyspace.java | 23 ++++--
.../cassandra/service/CassandraDaemon.java | 16 ++---
.../cassandra/db/ColumnFamilyStoreTest.java | 74 ++++++++++++++++++--
.../db/compaction/CompactionsTest.java | 4 +-
6 files changed, 124 insertions(+), 29 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/49efc13c/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index 0974925,958369a..7af391e
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,27 -1,5 +1,28 @@@
+2.1
+ * Multithreaded commitlog (CASSANDRA-3578)
+ * allocate fixed index summary memory pool and resample cold index summaries
+ to use less memory (CASSANDRA-5519)
+ * Removed multithreaded compaction (CASSANDRA-6142)
+ * Parallelize fetching rows for low-cardinality indexes (CASSANDRA-1337)
+ * change logging from log4j to logback (CASSANDRA-5883)
+ * switch to LZ4 compression for internode communication (CASSANDRA-5887)
+ * Stop using Thrift-generated Index* classes internally (CASSANDRA-5971)
+ * Remove 1.2 network compatibility code (CASSANDRA-5960)
+ * Remove leveled json manifest migration code (CASSANDRA-5996)
+ * Remove CFDefinition (CASSANDRA-6253)
+ * Use AtomicIntegerFieldUpdater in RefCountedMemory (CASSANDRA-6278)
+ * User-defined types for CQL3 (CASSANDRA-5590)
+ * Use of o.a.c.metrics in nodetool (CASSANDRA-5871, 6406)
+ * Batch read from OTC's queue and cleanup (CASSANDRA-1632)
+ * Secondary index support for collections (CASSANDRA-4511)
+ * SSTable metadata(Stats.db) format change (CASSANDRA-6356)
+ * Push composites support in the storage engine (CASSANDRA-5417)
+ * Add snapshot space used to cfstats (CASSANDRA-6231)
+ * Add cardinality estimator for key count estimation (CASSANDRA-5906)
+
+
2.0.5
+ * Delete unfinished compaction incrementally (CASSANDRA-6086)
Merged from 1.2:
* fsync compression metadata (CASSANDRA-6531)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/49efc13c/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/49efc13c/src/java/org/apache/cassandra/db/SystemKeyspace.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/49efc13c/src/java/org/apache/cassandra/service/CassandraDaemon.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/service/CassandraDaemon.java
index ffff006,d36b0db..02731d8
--- a/src/java/org/apache/cassandra/service/CassandraDaemon.java
+++ b/src/java/org/apache/cassandra/service/CassandraDaemon.java
@@@ -21,7 -21,11 +21,9 @@@ import java.io.File
import java.io.IOException;
import java.lang.management.ManagementFactory;
import java.net.InetAddress;
-import java.net.MalformedURLException;
-import java.net.URL;
import java.util.Arrays;
+ import java.util.Map;
+ import java.util.UUID;
import java.util.concurrent.TimeUnit;
import javax.management.MBeanServer;
import javax.management.ObjectName;
@@@ -30,7 -34,7 +32,6 @@@ import javax.management.StandardMBean
import com.addthis.metrics.reporter.config.ReporterConfig;
import com.google.common.collect.Iterables;
- import com.google.common.collect.SetMultimap;
-import org.apache.log4j.PropertyConfigurator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@@ -199,6 -237,22 +200,12 @@@ public class CassandraDaemo
// load keyspace descriptions.
DatabaseDescriptor.loadSchemas();
- try
- {
- LeveledManifest.maybeMigrateManifests();
- }
- catch(IOException e)
- {
- logger.error("Could not migrate old leveled manifest. Move away the .json file in the data directory", e);
- System.exit(100);
- }
-
+ // clean up compaction leftovers
+ Map<Pair<String, String>, Map<Integer, UUID>> unfinishedCompactions = SystemKeyspace.getUnfinishedCompactions();
+ for (Pair<String, String> kscf : unfinishedCompactions.keySet())
+ ColumnFamilyStore.removeUnfinishedCompactionLeftovers(kscf.left, kscf.right, unfinishedCompactions.get(kscf));
+ SystemKeyspace.discardCompactionsInProgress();
+
// clean up debris in the rest of the keyspaces
for (String keyspaceName : Schema.instance.getKeyspaces())
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/49efc13c/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java
index 0a081c1,65b1708..0e50cb6
--- a/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java
+++ b/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java
@@@ -53,7 -52,7 +53,8 @@@ import org.apache.cassandra.io.sstable.
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.thrift.*;
import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.FBUtilities;
+ import org.apache.cassandra.utils.Pair;
import org.apache.cassandra.utils.WrappedRunnable;
import static org.junit.Assert.*;
@@@ -1613,6 -1636,63 +1620,63 @@@ public class ColumnFamilyStoreTest exte
// 2nd sstable should be removed (only 1st sstable exists in set of size 1)
sstables = dir.sstableLister().list();
+ assertEquals(1, sstables.size());
+ assertTrue(sstables.containsKey(sstable1.descriptor));
+
+ Map<Pair<String, String>, Map<Integer, UUID>> unfinished = SystemKeyspace.getUnfinishedCompactions();
+ assertTrue(unfinished.isEmpty());
+ }
+
+ /**
+ * @see <a href="https://issues.apache.org/jira/browse/CASSANDRA-6086">CASSANDRA-6086</a>
+ */
+ @Test
+ public void testFailedToRemoveUnfinishedCompactionLeftovers() throws Throwable
+ {
+ final String ks = "Keyspace1";
+ final String cf = "Standard4"; // should be empty
+
+ final CFMetaData cfmeta = Schema.instance.getCFMetaData(ks, cf);
+ Directories dir = Directories.create(ks, cf);
+ ByteBuffer key = bytes("key");
+
+ // Write SSTable generation 3 that has ancestors 1 and 2
+ final Set<Integer> ancestors = Sets.newHashSet(1, 2);
+ SSTableSimpleWriter writer = new SSTableSimpleWriter(dir.getDirectoryForNewSSTables(),
+ cfmeta, StorageService.getPartitioner())
+ {
+ protected SSTableWriter getWriter()
+ {
- SSTableMetadata.Collector collector = SSTableMetadata.createCollector(cfmeta.comparator);
++ MetadataCollector collector = new MetadataCollector(cfmeta.comparator);
+ for (int ancestor : ancestors)
+ collector.addAncestor(ancestor);
+ String file = new Descriptor(directory, ks, cf, 3, true).filenameFor(Component.DATA);
+ return new SSTableWriter(file,
+ 0,
+ metadata,
+ StorageService.getPartitioner(),
+ collector);
+ }
+ };
+ writer.newRow(key);
+ writer.addColumn(bytes("col"), bytes("val"), 1);
+ writer.close();
+
+ Map<Descriptor, Set<Component>> sstables = dir.sstableLister().list();
+ assert sstables.size() == 1;
+
+ Map.Entry<Descriptor, Set<Component>> sstableToOpen = sstables.entrySet().iterator().next();
+ final SSTableReader sstable1 = SSTableReader.open(sstableToOpen.getKey());
+
+ // simulate we don't have generation in compaction_history
+ Map<Integer, UUID> unfinishedCompactions = new HashMap<>();
+ UUID compactionTaskID = UUID.randomUUID();
+ for (Integer ancestor : ancestors)
+ unfinishedCompactions.put(ancestor, compactionTaskID);
+ ColumnFamilyStore.removeUnfinishedCompactionLeftovers(ks, cf, unfinishedCompactions);
+
+ // SSTable should not be deleted
+ sstables = dir.sstableLister().list();
assert sstables.size() == 1;
assert sstables.containsKey(sstable1.descriptor);
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/49efc13c/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java
----------------------------------------------------------------------