You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by ma...@apache.org on 2021/04/30 05:35:34 UTC
[cassandra] branch cassandra-3.11 updated: Make sure sstables with moved starts are removed correctly in LeveledGenerations (backported from 4.0)
This is an automated email from the ASF dual-hosted git repository.
marcuse pushed a commit to branch cassandra-3.11
in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/cassandra-3.11 by this push:
new 4b018b5 Make sure sstables with moved starts are removed correctly in LeveledGenerations (backported from 4.0)
4b018b5 is described below
commit 4b018b5bf4117d2377a67a495a40a7d7e49b30f5
Author: Marcus Eriksson <ma...@apache.org>
AuthorDate: Tue Apr 6 08:44:35 2021 +0200
Make sure sstables with moved starts are removed correctly in LeveledGenerations (backported from 4.0)
Patch by marcuse; reviewed by Benedict Elliott Smith for CASSANDRA-16552
---
CHANGES.txt | 1 +
.../db/compaction/LeveledGenerations.java | 31 +++++-
.../db/compaction/CompactionsCQLTest.java | 120 +++++++++++++++++++++
3 files changed, 147 insertions(+), 5 deletions(-)
diff --git a/CHANGES.txt b/CHANGES.txt
index bb0309f..972cf48 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -3,6 +3,7 @@
* Ignore stale acks received in the shadow round (CASSANDRA-16588)
* Add autocomplete and error messages for provide_overlapping_tombstones (CASSANDRA-16350)
* Add StorageServiceMBean.getKeyspaceReplicationInfo(keyspaceName) (CASSANDRA-16447)
+ * Make sure sstables with moved starts are removed correctly in LeveledGenerations (CASSANDRA-16552)
* Upgrade jackson-databind to 2.9.10.8 (CASSANDRA-16462)
Merged from 3.0:
* Fix materialized view builders inserting truncated data (CASSANDRA-16567)
diff --git a/src/java/org/apache/cassandra/db/compaction/LeveledGenerations.java b/src/java/org/apache/cassandra/db/compaction/LeveledGenerations.java
index f7087f0..21fce80 100644
--- a/src/java/org/apache/cassandra/db/compaction/LeveledGenerations.java
+++ b/src/java/org/apache/cassandra/db/compaction/LeveledGenerations.java
@@ -22,8 +22,10 @@ import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
+import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
+import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.TimeUnit;
@@ -54,6 +56,19 @@ class LeveledGenerations
// dependent on maxSSTableSize.)
static final int MAX_LEVEL_COUNT = (int) Math.log10(1000 * 1000 * 1000);
+ /**
+ * This map is used to track the original NORMAL instances of sstables
+ *
+ * When aborting a compaction we can get notified that a MOVED_STARTS sstable is replaced with a NORMAL instance
+ * of the same sstable but since we use sorted sets (on the first token) in L1+ we won't find it and won't remove it.
+ * Then, when we add the NORMAL instance we have to replace the *instance* of the sstable to be able to later mark
+ * it compacting again.
+ *
+ * In this map we rely on the fact that hashCode and equals do not care about first token, so when we
+ * do allSSTables.get(instance_with_moved_starts) we will get the NORMAL sstable back, which we can then remove
+ * from the TreeSet.
+ */
+ private final Map<SSTableReader, SSTableReader> allSSTables = new HashMap<>();
private final Set<SSTableReader> l0 = new HashSet<>();
private static long lastOverlapCheck = System.nanoTime();
// note that since l0 is broken out, levels[0] represents L1:
@@ -92,7 +107,7 @@ class LeveledGenerations
* If adding an sstable would cause an overlap in the level (if level > 1) we send it to L0. This can happen
* for example when moving sstables from unrepaired to repaired.
*
- * If the sstable is already in the manifest we skip it.
+ * If the sstable is already in the manifest we replace the instance.
*
* If the sstable exists in the manifest but has the wrong level, it is removed from the wrong level and added to the correct one
*
@@ -112,15 +127,16 @@ class LeveledGenerations
logger.error("SSTable {} on the wrong level in the manifest - {} instead of {} as recorded in the sstable metadata, removing from level {}", sstable, existingLevel, sstable.getSSTableLevel(), existingLevel);
if (strictLCSChecksTest)
throw new AssertionError("SSTable not in matching level in manifest: "+sstable + ": "+existingLevel+" != " + sstable.getSSTableLevel());
- get(existingLevel).remove(sstable);
}
else
{
- logger.info("Manifest already contains {} in level {} - skipping", sstable, existingLevel);
- continue;
+ logger.info("Manifest already contains {} in level {} - replacing instance", sstable, existingLevel);
}
+ get(existingLevel).remove(sstable);
+ allSSTables.remove(sstable);
}
+ allSSTables.put(sstable, sstable);
if (sstable.getSSTableLevel() == 0)
{
l0.add(sstable);
@@ -197,7 +213,12 @@ class LeveledGenerations
{
int level = sstable.getSSTableLevel();
minLevel = Math.min(minLevel, level);
- get(level).remove(sstable);
+ SSTableReader versionInManifest = allSSTables.get(sstable);
+ if (versionInManifest != null)
+ {
+ get(level).remove(versionInManifest);
+ allSSTables.remove(versionInManifest);
+ }
}
return minLevel;
}
diff --git a/test/unit/org/apache/cassandra/db/compaction/CompactionsCQLTest.java b/test/unit/org/apache/cassandra/db/compaction/CompactionsCQLTest.java
index fe15085..aa29343 100644
--- a/test/unit/org/apache/cassandra/db/compaction/CompactionsCQLTest.java
+++ b/test/unit/org/apache/cassandra/db/compaction/CompactionsCQLTest.java
@@ -17,18 +17,28 @@
*/
package org.apache.cassandra.db.compaction;
+import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Random;
+import java.util.Set;
import org.junit.Test;
import org.apache.cassandra.cql3.CQLTester;
import org.apache.cassandra.cql3.UntypedResultSet;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.Directories;
+import org.apache.cassandra.db.compaction.writers.CompactionAwareWriter;
+import org.apache.cassandra.db.compaction.writers.MaxSSTableSizeWriter;
+import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
import org.apache.cassandra.schema.CompactionParams;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
@@ -251,6 +261,116 @@ public class CompactionsCQLTest extends CQLTester
assertEquals(CompactionParams.TombstoneOption.ROW, getCurrentColumnFamilyStore().getCompactionStrategyManager().getCompactionParams().tombstoneOption());
}
+ @Test
+ public void testAbortNotifications() throws Throwable
+ {
+ createTable("create table %s (id int primary key, x blob) with compaction = {'class':'LeveledCompactionStrategy', 'sstable_size_in_mb':1}");
+ Random r = new Random();
+ byte [] b = new byte[100 * 1024];
+ for (int i = 0; i < 1000; i++)
+ {
+ r.nextBytes(b);
+ execute("insert into %s (id, x) values (?, ?)", i, ByteBuffer.wrap(b));
+ }
+ getCurrentColumnFamilyStore().forceBlockingFlush();
+ getCurrentColumnFamilyStore().disableAutoCompaction();
+ for (int i = 0; i < 1000; i++)
+ {
+ r.nextBytes(b);
+ execute("insert into %s (id, x) values (?, ?)", i, ByteBuffer.wrap(b));
+ }
+ getCurrentColumnFamilyStore().forceBlockingFlush();
+
+ LeveledCompactionStrategy lcs = (LeveledCompactionStrategy) getCurrentColumnFamilyStore().getCompactionStrategyManager().getStrategies().get(1).get(0);
+ LeveledCompactionTask lcsTask;
+ while (true)
+ {
+ lcsTask = (LeveledCompactionTask) lcs.getNextBackgroundTask(0);
+ if (lcsTask != null)
+ {
+ lcsTask.execute(null);
+ break;
+ }
+ Thread.sleep(1000);
+ }
+ // now all sstables are non-overlapping in L1 - we need them to be in L2:
+ for (SSTableReader sstable : getCurrentColumnFamilyStore().getLiveSSTables())
+ {
+ lcs.removeSSTable(sstable);
+ sstable.descriptor.getMetadataSerializer().mutateLevel(sstable.descriptor, 2);
+ sstable.reloadSSTableMetadata();
+ lcs.addSSTable(sstable);
+ }
+
+ for (int i = 0; i < 1000; i++)
+ {
+ r.nextBytes(b);
+ execute("insert into %s (id, x) values (?, ?)", i, ByteBuffer.wrap(b));
+ }
+ getCurrentColumnFamilyStore().forceBlockingFlush();
+ // now we have a bunch of sstables in L2 and one in L0 - bump the L0 one to L1:
+ for (SSTableReader sstable : getCurrentColumnFamilyStore().getLiveSSTables())
+ {
+ if (sstable.getSSTableLevel() == 0)
+ {
+ lcs.removeSSTable(sstable);
+ sstable.descriptor.getMetadataSerializer().mutateLevel(sstable.descriptor, 1);
+ sstable.reloadSSTableMetadata();
+ lcs.addSSTable(sstable);
+ }
+ }
+ // at this point we have a single sstable in L1, and a bunch of sstables in L2 - a background compaction should
+ // trigger an L1 -> L2 compaction which we abort after creating 5 sstables - this notifies LCS that MOVED_START
+ // sstables have been removed.
+ try
+ {
+ AbstractCompactionTask task = new NotifyingCompactionTask((LeveledCompactionTask) lcs.getNextBackgroundTask(0));
+ task.execute(null);
+ fail("task should throw exception");
+ }
+ catch (Exception ignored)
+ {
+ // ignored
+ }
+
+ lcsTask = (LeveledCompactionTask) lcs.getNextBackgroundTask(0);
+ try
+ {
+ assertNotNull(lcsTask);
+ }
+ finally
+ {
+ if (lcsTask != null)
+ lcsTask.transaction.abort();
+ }
+ }
+
+ private static class NotifyingCompactionTask extends LeveledCompactionTask
+ {
+ public NotifyingCompactionTask(LeveledCompactionTask task)
+ {
+ super(task.cfs, task.transaction, task.getLevel(), task.gcBefore, task.getLevel(), false);
+ }
+
+ @Override
+ public CompactionAwareWriter getCompactionAwareWriter(ColumnFamilyStore cfs,
+ Directories directories,
+ LifecycleTransaction txn,
+ Set<SSTableReader> nonExpiredSSTables)
+ {
+ return new MaxSSTableSizeWriter(cfs, directories, txn, nonExpiredSSTables, 1 << 20, 1)
+ {
+ int switchCount = 0;
+ public void switchCompactionLocation(Directories.DataDirectory directory)
+ {
+ switchCount++;
+ if (switchCount > 5)
+ throw new RuntimeException("Throw after a few sstables have had their starts moved");
+ super.switchCompactionLocation(directory);
+ }
+ };
+ }
+ }
public boolean verifyStrategies(CompactionStrategyManager manager, Class<? extends AbstractCompactionStrategy> expected)
{
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org