You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by ma...@apache.org on 2021/04/07 12:48:33 UTC

[cassandra] branch trunk updated: Make sure sstables with moved starts are removed correctly in LeveledGenerations

This is an automated email from the ASF dual-hosted git repository.

marcuse pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git


The following commit(s) were added to refs/heads/trunk by this push:
     new a1210aa  Make sure sstables with moved starts are removed correctly in LeveledGenerations
a1210aa is described below

commit a1210aa28d111fcd614691ddf83d3af340262203
Author: Marcus Eriksson <ma...@apache.org>
AuthorDate: Tue Apr 6 08:44:35 2021 +0200

    Make sure sstables with moved starts are removed correctly in LeveledGenerations
    
    Patch by marcuse; reviewed by Benedict Elliott Smith for CASSANDRA-16552
---
 CHANGES.txt                                        |   1 +
 .../db/compaction/LeveledGenerations.java          |  42 +++++++-
 .../cassandra/db/compaction/LeveledManifest.java   |   6 +-
 .../db/compaction/CompactionsCQLTest.java          | 115 +++++++++++++++++++++
 4 files changed, 154 insertions(+), 10 deletions(-)

diff --git a/CHANGES.txt b/CHANGES.txt
index d5b7134..9de4320 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 4.0-rc1
+ * Make sure sstables with moved starts are removed correctly in LeveledGenerations (CASSANDRA-16552)
  * Fix race between secondary index building and active compactions tracking (CASSANDRA-16554)
  * Migrate dependency handling from maven-ant-tasks to resolver-ant-tasks, removing lib/ directory from version control (CASSANDRA-16391)
  * Fix 4.0 node sending a repair prepare message to a 3.x node breaking the connection (CASSANDRA-16542)
diff --git a/src/java/org/apache/cassandra/db/compaction/LeveledGenerations.java b/src/java/org/apache/cassandra/db/compaction/LeveledGenerations.java
index bc83275..e4481a2 100644
--- a/src/java/org/apache/cassandra/db/compaction/LeveledGenerations.java
+++ b/src/java/org/apache/cassandra/db/compaction/LeveledGenerations.java
@@ -22,8 +22,10 @@ import java.io.IOException;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Comparator;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
+import java.util.Map;
 import java.util.Set;
 import java.util.TreeSet;
 import java.util.concurrent.TimeUnit;
@@ -54,6 +56,19 @@ class LeveledGenerations
     // dependent on maxSSTableSize.)
     static final int MAX_LEVEL_COUNT = (int) Math.log10(1000 * 1000 * 1000);
 
+    /**
+     * This map is used to track the original NORMAL instances of sstables
+     *
+     * When aborting a compaction we can get notified that a MOVED_START sstable is replaced with a NORMAL instance
+     * of the same sstable, but since we use sorted sets (on the first token) in L1+ we won't find it and won't remove it.
+     * Then, when we add the NORMAL instance we have to replace the *instance* of the sstable to be able to later mark
+     * it compacting again.
+     *
+     * In this map we rely on the fact that hashCode and equals do not care about first token, so when we
+     * do allSSTables.get(instance_with_moved_starts) we will get the NORMAL sstable back, which we can then remove
+     * from the TreeSet.
+     */
+    private final Map<SSTableReader, SSTableReader> allSSTables = new HashMap<>();
     private final Set<SSTableReader> l0 = new HashSet<>();
     private static long lastOverlapCheck = System.nanoTime();
     // note that since l0 is broken out, levels[0] represents L1:
@@ -92,7 +107,7 @@ class LeveledGenerations
      * If adding an sstable would cause an overlap in the level (if level > 1) we send it to L0. This can happen
      * for example when moving sstables from unrepaired to repaired.
      *
-     * If the sstable is already in the manifest we skip it.
+     * If the sstable is already in the manifest we replace the instance.
      *
      * If the sstable exists in the manifest but has the wrong level, it is removed from the wrong level and added to the correct one
      *
@@ -112,15 +127,16 @@ class LeveledGenerations
                     logger.error("SSTable {} on the wrong level in the manifest - {} instead of {} as recorded in the sstable metadata, removing from level {}", sstable, existingLevel, sstable.getSSTableLevel(), existingLevel);
                     if (strictLCSChecksTest)
                         throw new AssertionError("SSTable not in matching level in manifest: "+sstable + ": "+existingLevel+" != " + sstable.getSSTableLevel());
-                    get(existingLevel).remove(sstable);
                 }
                 else
                 {
-                    logger.info("Manifest already contains {} in level {} - skipping", sstable, existingLevel);
-                    continue;
+                    logger.info("Manifest already contains {} in level {} - replacing instance", sstable, existingLevel);
                 }
+                get(existingLevel).remove(sstable);
+                allSSTables.remove(sstable);
             }
 
+            allSSTables.put(sstable, sstable);
             if (sstable.getSSTableLevel() == 0)
             {
                 l0.add(sstable);
@@ -196,7 +212,12 @@ class LeveledGenerations
         {
             int level = sstable.getSSTableLevel();
             minLevel = Math.min(minLevel, level);
-            get(level).remove(sstable);
+            SSTableReader versionInManifest = allSSTables.get(sstable);
+            if (versionInManifest != null)
+            {
+                get(level).remove(versionInManifest);
+                allSSTables.remove(versionInManifest);
+            }
         }
         return minLevel;
     }
@@ -307,4 +328,15 @@ class LeveledGenerations
             }
         }
     }
+
+    void newLevel(SSTableReader sstable, int oldLevel)
+    {
+        SSTableReader versionInManifest = allSSTables.remove(sstable);
+        boolean removed = false;
+        if (versionInManifest != null)
+            removed = get(oldLevel).remove(versionInManifest);
+        if (!removed)
+            logger.warn("Could not remove "+sstable+" from "+oldLevel);
+        addAll(Collections.singleton(sstable));
+    }
 }
diff --git a/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java b/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java
index 4f70f64..5d2bb58 100644
--- a/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java
+++ b/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java
@@ -666,11 +666,7 @@ public class LeveledManifest
 
     synchronized void newLevel(SSTableReader sstable, int oldLevel)
     {
-        boolean removed = generations.get(oldLevel).remove(sstable);
-        // if reload races with the metadataChanged notification the sstable might already be removed
-        if (!removed)
-            logger.warn("Could not remove "+sstable+" from "+oldLevel);
-        generations.addAll(Collections.singleton(sstable));
+        generations.newLevel(sstable, oldLevel);
         lastCompactedSSTables[oldLevel] = sstable;
     }
 
diff --git a/test/unit/org/apache/cassandra/db/compaction/CompactionsCQLTest.java b/test/unit/org/apache/cassandra/db/compaction/CompactionsCQLTest.java
index fa42ab0..a37437f 100644
--- a/test/unit/org/apache/cassandra/db/compaction/CompactionsCQLTest.java
+++ b/test/unit/org/apache/cassandra/db/compaction/CompactionsCQLTest.java
@@ -23,6 +23,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Random;
+import java.util.Set;
 
 import org.apache.commons.lang.StringUtils;
 import org.junit.After;
@@ -35,10 +36,15 @@ import org.apache.cassandra.config.Config;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.db.DeletionTime;
+import org.apache.cassandra.db.Directories;
 import org.apache.cassandra.db.Mutation;
 import org.apache.cassandra.db.RangeTombstone;
+import org.apache.cassandra.db.RowIndexEntry;
 import org.apache.cassandra.db.RowUpdateBuilder;
 import org.apache.cassandra.db.Slice;
+import org.apache.cassandra.db.compaction.writers.CompactionAwareWriter;
+import org.apache.cassandra.db.compaction.writers.MajorLeveledCompactionWriter;
+import org.apache.cassandra.db.compaction.writers.MaxSSTableSizeWriter;
 import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
 import org.apache.cassandra.db.partitions.PartitionUpdate;
 import org.apache.cassandra.db.rows.Cell;
@@ -455,6 +461,115 @@ public class CompactionsCQLTest extends CQLTester
         act.execute(ActiveCompactionsTracker.NOOP);
     }
 
+    @Test
+    public void testAbortNotifications() throws Throwable
+    {
+        createTable("create table %s (id int primary key, x blob) with compaction = {'class':'LeveledCompactionStrategy', 'sstable_size_in_mb':1}");
+        Random r = new Random();
+        byte [] b = new byte[100 * 1024];
+        for (int i = 0; i < 1000; i++)
+        {
+            r.nextBytes(b);
+            execute("insert into %s (id, x) values (?, ?)", i, ByteBuffer.wrap(b));
+        }
+        getCurrentColumnFamilyStore().forceBlockingFlush();
+        getCurrentColumnFamilyStore().disableAutoCompaction();
+        for (int i = 0; i < 1000; i++)
+        {
+            r.nextBytes(b);
+            execute("insert into %s (id, x) values (?, ?)", i, ByteBuffer.wrap(b));
+        }
+        getCurrentColumnFamilyStore().forceBlockingFlush();
+
+        LeveledCompactionStrategy lcs = (LeveledCompactionStrategy) getCurrentColumnFamilyStore().getCompactionStrategyManager().getUnrepairedUnsafe().first();
+        LeveledCompactionTask lcsTask;
+        while (true)
+        {
+            lcsTask = (LeveledCompactionTask) lcs.getNextBackgroundTask(0);
+            if (lcsTask != null)
+            {
+                lcsTask.execute(CompactionManager.instance.active);
+                break;
+            }
+            Thread.sleep(1000);
+        }
+        // now all sstables are non-overlapping in L1 - we need them to be in L2:
+        for (SSTableReader sstable : getCurrentColumnFamilyStore().getLiveSSTables())
+        {
+            lcs.removeSSTable(sstable);
+            sstable.mutateLevelAndReload(2);
+            lcs.addSSTable(sstable);
+        }
+
+        for (int i = 0; i < 1000; i++)
+        {
+            r.nextBytes(b);
+            execute("insert into %s (id, x) values (?, ?)", i, ByteBuffer.wrap(b));
+        }
+        getCurrentColumnFamilyStore().forceBlockingFlush();
+        // now we have a bunch of sstables in L2 and one in L0 - bump the L0 one to L1:
+        for (SSTableReader sstable : getCurrentColumnFamilyStore().getLiveSSTables())
+        {
+            if (sstable.getSSTableLevel() == 0)
+            {
+                lcs.removeSSTable(sstable);
+                sstable.mutateLevelAndReload(1);
+                lcs.addSSTable(sstable);
+            }
+        }
+        // at this point we have a single sstable in L1, and a bunch of sstables in L2 - a background compaction should
+        // trigger an L1 -> L2 compaction which we abort after creating 5 sstables - this notifies LCS that MOVED_START
+        // sstables have been removed.
+        try
+        {
+            AbstractCompactionTask task = new NotifyingCompactionTask((LeveledCompactionTask) lcs.getNextBackgroundTask(0));
+            task.execute(CompactionManager.instance.active);
+            fail("task should throw exception");
+        }
+        catch (Exception ignored)
+        {
+            // ignored
+        }
+
+        lcsTask = (LeveledCompactionTask) lcs.getNextBackgroundTask(0);
+        try
+        {
+            assertNotNull(lcsTask);
+        }
+        finally
+        {
+            if (lcsTask != null)
+                lcsTask.transaction.abort();
+        }
+    }
+
+    private static class NotifyingCompactionTask extends LeveledCompactionTask
+    {
+        public NotifyingCompactionTask(LeveledCompactionTask task)
+        {
+            super(task.cfs, task.transaction, task.getLevel(), task.gcBefore, task.getLevel(), false);
+        }
+
+        @Override
+        public CompactionAwareWriter getCompactionAwareWriter(ColumnFamilyStore cfs,
+                                                              Directories directories,
+                                                              LifecycleTransaction txn,
+                                                              Set<SSTableReader> nonExpiredSSTables)
+        {
+            return new MaxSSTableSizeWriter(cfs, directories, txn, nonExpiredSSTables, 1 << 20, 1)
+            {
+                int switchCount = 0;
+                public void switchCompactionLocation(Directories.DataDirectory directory)
+                {
+                    switchCount++;
+                    if (switchCount > 5)
+                        throw new RuntimeException("Throw after a few sstables have had their starts moved");
+                    super.switchCompactionLocation(directory);
+                }
+            };
+        }
+    }
+
     private void prepareWide() throws Throwable
     {
         createTable("CREATE TABLE %s (id int, id2 int, b text, primary key (id, id2))");

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org