You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by ma...@apache.org on 2021/04/07 12:48:33 UTC
[cassandra] branch trunk updated: Make sure sstables with moved starts are removed correctly in LeveledGenerations
This is an automated email from the ASF dual-hosted git repository.
marcuse pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/trunk by this push:
new a1210aa Make sure sstables with moved starts are removed correctly in LeveledGenerations
a1210aa is described below
commit a1210aa28d111fcd614691ddf83d3af340262203
Author: Marcus Eriksson <ma...@apache.org>
AuthorDate: Tue Apr 6 08:44:35 2021 +0200
Make sure sstables with moved starts are removed correctly in LeveledGenerations
Patch by marcuse; reviewed by Benedict Elliott Smith for CASSANDRA-16552
---
CHANGES.txt | 1 +
.../db/compaction/LeveledGenerations.java | 42 +++++++-
.../cassandra/db/compaction/LeveledManifest.java | 6 +-
.../db/compaction/CompactionsCQLTest.java | 115 +++++++++++++++++++++
4 files changed, 154 insertions(+), 10 deletions(-)
diff --git a/CHANGES.txt b/CHANGES.txt
index d5b7134..9de4320 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
4.0-rc1
+ * Make sure sstables with moved starts are removed correctly in LeveledGenerations (CASSANDRA-16552)
* Fix race between secondary index building and active compactions tracking (CASSANDRA-16554)
* Migrate dependency handling from maven-ant-tasks to resolver-ant-tasks, removing lib/ directory from version control (CASSANDRA-16391)
* Fix 4.0 node sending a repair prepare message to a 3.x node breaking the connection (CASSANDRA-16542)
diff --git a/src/java/org/apache/cassandra/db/compaction/LeveledGenerations.java b/src/java/org/apache/cassandra/db/compaction/LeveledGenerations.java
index bc83275..e4481a2 100644
--- a/src/java/org/apache/cassandra/db/compaction/LeveledGenerations.java
+++ b/src/java/org/apache/cassandra/db/compaction/LeveledGenerations.java
@@ -22,8 +22,10 @@ import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
+import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
+import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.TimeUnit;
@@ -54,6 +56,19 @@ class LeveledGenerations
// dependent on maxSSTableSize.)
static final int MAX_LEVEL_COUNT = (int) Math.log10(1000 * 1000 * 1000);
+    /**
+     * This map tracks the original NORMAL instances of the sstables in the manifest.
+     *
+     * When aborting a compaction we can get notified that a MOVED_STARTS sstable has been replaced by a NORMAL
+     * instance of the same sstable. The L1+ levels are sorted sets ordered on first token, and the two instances have
+     * different first tokens, so a plain remove() from those sets would not find the stored instance and would leave
+     * it in place. Likewise, when the NORMAL instance is added we have to replace the stored *instance* of the sstable
+     * so that it can later be marked compacting again.
+     *
+     * This map relies on hashCode and equals ignoring the first token: allSSTables.get(instance_with_moved_starts)
+     * returns the NORMAL sstable, which can then be removed from the TreeSet.
+     */
+ private final Map<SSTableReader, SSTableReader> allSSTables = new HashMap<>();
private final Set<SSTableReader> l0 = new HashSet<>();
private static long lastOverlapCheck = System.nanoTime();
// note that since l0 is broken out, levels[0] represents L1:
@@ -92,7 +107,7 @@ class LeveledGenerations
* If adding an sstable would cause an overlap in the level (if level > 1) we send it to L0. This can happen
* for example when moving sstables from unrepaired to repaired.
*
- * If the sstable is already in the manifest we skip it.
+ * If the sstable is already in the manifest we replace the instance.
*
* If the sstable exists in the manifest but has the wrong level, it is removed from the wrong level and added to the correct one
*
@@ -112,15 +127,16 @@ class LeveledGenerations
logger.error("SSTable {} on the wrong level in the manifest - {} instead of {} as recorded in the sstable metadata, removing from level {}", sstable, existingLevel, sstable.getSSTableLevel(), existingLevel);
if (strictLCSChecksTest)
throw new AssertionError("SSTable not in matching level in manifest: "+sstable + ": "+existingLevel+" != " + sstable.getSSTableLevel());
- get(existingLevel).remove(sstable);
}
else
{
- logger.info("Manifest already contains {} in level {} - skipping", sstable, existingLevel);
- continue;
+ logger.info("Manifest already contains {} in level {} - replacing instance", sstable, existingLevel);
}
+ get(existingLevel).remove(sstable);
+ allSSTables.remove(sstable);
}
+ allSSTables.put(sstable, sstable);
if (sstable.getSSTableLevel() == 0)
{
l0.add(sstable);
@@ -196,7 +212,12 @@ class LeveledGenerations
{
int level = sstable.getSSTableLevel();
minLevel = Math.min(minLevel, level);
- get(level).remove(sstable);
+ SSTableReader versionInManifest = allSSTables.get(sstable);
+ if (versionInManifest != null)
+ {
+ get(level).remove(versionInManifest);
+ allSSTables.remove(versionInManifest);
+ }
}
return minLevel;
}
@@ -307,4 +328,15 @@ class LeveledGenerations
}
}
}
+
+    /**
+     * Moves an sstable whose level metadata changed from {@code oldLevel} to the level it now reports.
+     *
+     * The passed-in instance may not be the one stored in the manifest (the instances can differ in first token),
+     * so the stored instance is looked up and removed via {@code allSSTables} and then removed from
+     * {@code oldLevel}, before the new instance is re-added through {@link #addAll}.
+     */
+    void newLevel(SSTableReader sstable, int oldLevel)
+    {
+        SSTableReader versionInManifest = allSSTables.remove(sstable);
+        boolean removed = versionInManifest != null && get(oldLevel).remove(versionInManifest);
+        // if reload races with the metadataChanged notification the sstable might already be removed
+        if (!removed)
+            logger.warn("Could not remove {} from {}", sstable, oldLevel);
+        addAll(Collections.singleton(sstable));
+    }
}
diff --git a/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java b/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java
index 4f70f64..5d2bb58 100644
--- a/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java
+++ b/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java
@@ -666,11 +666,7 @@ public class LeveledManifest
synchronized void newLevel(SSTableReader sstable, int oldLevel)
{
-        boolean removed = generations.get(oldLevel).remove(sstable);
-        // if reload races with the metadataChanged notification the sstable might already be removed
-        if (!removed)
-            logger.warn("Could not remove "+sstable+" from "+oldLevel);
-        generations.addAll(Collections.singleton(sstable));
+        // Delegate to LeveledGenerations: it looks up the manifest's own instance of this sstable (which may not be
+        // the instance passed in here), removes that instance from oldLevel and then re-adds the sstable.
+        generations.newLevel(sstable, oldLevel);
lastCompactedSSTables[oldLevel] = sstable;
}
diff --git a/test/unit/org/apache/cassandra/db/compaction/CompactionsCQLTest.java b/test/unit/org/apache/cassandra/db/compaction/CompactionsCQLTest.java
index fa42ab0..a37437f 100644
--- a/test/unit/org/apache/cassandra/db/compaction/CompactionsCQLTest.java
+++ b/test/unit/org/apache/cassandra/db/compaction/CompactionsCQLTest.java
@@ -23,6 +23,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
+import java.util.Set;
import org.apache.commons.lang.StringUtils;
import org.junit.After;
@@ -35,10 +36,15 @@ import org.apache.cassandra.config.Config;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.DeletionTime;
+import org.apache.cassandra.db.Directories;
import org.apache.cassandra.db.Mutation;
import org.apache.cassandra.db.RangeTombstone;
+import org.apache.cassandra.db.RowIndexEntry;
import org.apache.cassandra.db.RowUpdateBuilder;
import org.apache.cassandra.db.Slice;
+import org.apache.cassandra.db.compaction.writers.CompactionAwareWriter;
+import org.apache.cassandra.db.compaction.writers.MajorLeveledCompactionWriter;
+import org.apache.cassandra.db.compaction.writers.MaxSSTableSizeWriter;
import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
import org.apache.cassandra.db.partitions.PartitionUpdate;
import org.apache.cassandra.db.rows.Cell;
@@ -455,6 +461,115 @@ public class CompactionsCQLTest extends CQLTester
act.execute(ActiveCompactionsTracker.NOOP);
}
+    @Test
+    public void testAbortNotifications() throws Throwable
+    {
+        createTable("create table %s (id int primary key, x blob) with compaction = {'class':'LeveledCompactionStrategy', 'sstable_size_in_mb':1}");
+        Random r = new Random();
+        byte[] b = new byte[100 * 1024];
+        for (int i = 0; i < 1000; i++)
+        {
+            r.nextBytes(b);
+            execute("insert into %s (id, x) values (?, ?)", i, ByteBuffer.wrap(b));
+        }
+        getCurrentColumnFamilyStore().forceBlockingFlush();
+        getCurrentColumnFamilyStore().disableAutoCompaction();
+        for (int i = 0; i < 1000; i++)
+        {
+            r.nextBytes(b);
+            execute("insert into %s (id, x) values (?, ?)", i, ByteBuffer.wrap(b));
+        }
+        getCurrentColumnFamilyStore().forceBlockingFlush();
+
+        LeveledCompactionStrategy lcs = (LeveledCompactionStrategy) getCurrentColumnFamilyStore().getCompactionStrategyManager().getUnrepairedUnsafe().first();
+        LeveledCompactionTask lcsTask;
+        // poll until LCS hands out a task for the flushed sstables, but never wait forever
+        long deadline = System.currentTimeMillis() + 60_000;
+        while (true)
+        {
+            lcsTask = (LeveledCompactionTask) lcs.getNextBackgroundTask(0);
+            if (lcsTask != null)
+            {
+                lcsTask.execute(CompactionManager.instance.active);
+                break;
+            }
+            if (System.currentTimeMillis() > deadline)
+                fail("timed out waiting for an initial LCS compaction task");
+            Thread.sleep(1000);
+        }
+        // now all sstables are non-overlapping in L1 - we need them to be in L2:
+        for (SSTableReader sstable : getCurrentColumnFamilyStore().getLiveSSTables())
+        {
+            lcs.removeSSTable(sstable);
+            sstable.mutateLevelAndReload(2);
+            lcs.addSSTable(sstable);
+        }
+
+        for (int i = 0; i < 1000; i++)
+        {
+            r.nextBytes(b);
+            execute("insert into %s (id, x) values (?, ?)", i, ByteBuffer.wrap(b));
+        }
+        getCurrentColumnFamilyStore().forceBlockingFlush();
+        // now we have a bunch of sstables in L2 and one in L0 - bump the L0 one to L1:
+        for (SSTableReader sstable : getCurrentColumnFamilyStore().getLiveSSTables())
+        {
+            if (sstable.getSSTableLevel() == 0)
+            {
+                lcs.removeSSTable(sstable);
+                sstable.mutateLevelAndReload(1);
+                lcs.addSSTable(sstable);
+            }
+        }
+        // at this point we have a single sstable in L1, and a bunch of sstables in L2 - a background compaction should
+        // trigger an L1 -> L2 compaction which we abort after creating 5 sstables - this notifies LCS that MOVED_STARTS
+        // sstables have been removed.
+        LeveledCompactionTask l1ToL2Task = (LeveledCompactionTask) lcs.getNextBackgroundTask(0);
+        // without this check a null task would surface as an NPE inside the try below, be swallowed by the catch,
+        // and the test could pass without ever exercising the abort path
+        assertNotNull("expected an L1 -> L2 compaction task", l1ToL2Task);
+        try
+        {
+            AbstractCompactionTask task = new NotifyingCompactionTask(l1ToL2Task);
+            task.execute(CompactionManager.instance.active);
+            fail("task should throw exception");
+        }
+        catch (Exception ignored)
+        {
+            // expected: the NotifyingCompactionTask writer throws after a few sstables have been written
+        }
+
+        // after the abort, LCS must still be able to pick (and mark compacting) sstables for a new task
+        lcsTask = (LeveledCompactionTask) lcs.getNextBackgroundTask(0);
+        try
+        {
+            assertNotNull(lcsTask);
+        }
+        finally
+        {
+            if (lcsTask != null)
+                lcsTask.transaction.abort();
+        }
+    }
+
+    /**
+     * A LeveledCompactionTask built from an existing task whose writer throws once it has switched output location
+     * more than five times, aborting the compaction part-way through after some sstables have had their starts moved.
+     */
+    private static class NotifyingCompactionTask extends LeveledCompactionTask
+    {
+        // output sstable size used by the writer below (1 MiB)
+        private static final long MAX_SSTABLE_BYTES = 1 << 20;
+        // number of writer switches allowed before the injected failure
+        private static final int SWITCHES_BEFORE_FAILURE = 5;
+
+        public NotifyingCompactionTask(LeveledCompactionTask task)
+        {
+            // maxSSTableBytes is a byte size, not a level - pass the same size the writer below uses
+            super(task.cfs, task.transaction, task.getLevel(), task.gcBefore, MAX_SSTABLE_BYTES, false);
+        }
+
+        @Override
+        public CompactionAwareWriter getCompactionAwareWriter(ColumnFamilyStore cfs,
+                                                              Directories directories,
+                                                              LifecycleTransaction txn,
+                                                              Set<SSTableReader> nonExpiredSSTables)
+        {
+            // NOTE(review): output level is hard-coded to 1 rather than getLevel(); confirm this is intended
+            return new MaxSSTableSizeWriter(cfs, directories, txn, nonExpiredSSTables, MAX_SSTABLE_BYTES, 1)
+            {
+                private int switchCount = 0;
+
+                @Override
+                public void switchCompactionLocation(Directories.DataDirectory directory)
+                {
+                    switchCount++;
+                    if (switchCount > SWITCHES_BEFORE_FAILURE)
+                        throw new RuntimeException("Throw after a few sstables have had their starts moved");
+                    super.switchCompactionLocation(directory);
+                }
+            };
+        }
+    }
+
private void prepareWide() throws Throwable
{
createTable("CREATE TABLE %s (id int, id2 int, b text, primary key (id, id2))");
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org