You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by ma...@apache.org on 2021/04/30 05:35:34 UTC

[cassandra] branch cassandra-3.11 updated: Make sure sstables with moved starts are removed correctly in LeveledGenerations (backported from 4.0)

This is an automated email from the ASF dual-hosted git repository.

marcuse pushed a commit to branch cassandra-3.11
in repository https://gitbox.apache.org/repos/asf/cassandra.git


The following commit(s) were added to refs/heads/cassandra-3.11 by this push:
     new 4b018b5  Make sure sstables with moved starts are removed correctly in LeveledGenerations (backported from 4.0)
4b018b5 is described below

commit 4b018b5bf4117d2377a67a495a40a7d7e49b30f5
Author: Marcus Eriksson <ma...@apache.org>
AuthorDate: Tue Apr 6 08:44:35 2021 +0200

    Make sure sstables with moved starts are removed correctly in LeveledGenerations (backported from 4.0)
    
    Patch by marcuse; reviewed by Benedict Elliott Smith for CASSANDRA-16552
---
 CHANGES.txt                                        |   1 +
 .../db/compaction/LeveledGenerations.java          |  31 +++++-
 .../db/compaction/CompactionsCQLTest.java          | 120 +++++++++++++++++++++
 3 files changed, 147 insertions(+), 5 deletions(-)

diff --git a/CHANGES.txt b/CHANGES.txt
index bb0309f..972cf48 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -3,6 +3,7 @@
  * Ignore stale acks received in the shadow round (CASSANDRA-16588)
  * Add autocomplete and error messages for provide_overlapping_tombstones (CASSANDRA-16350)
  * Add StorageServiceMBean.getKeyspaceReplicationInfo(keyspaceName) (CASSANDRA-16447)
+ * Make sure sstables with moved starts are removed correctly in LeveledGenerations (CASSANDRA-16552)
  * Upgrade jackson-databind to 2.9.10.8 (CASSANDRA-16462)
 Merged from 3.0:
  * Fix materialized view builders inserting truncated data (CASSANDRA-16567)
diff --git a/src/java/org/apache/cassandra/db/compaction/LeveledGenerations.java b/src/java/org/apache/cassandra/db/compaction/LeveledGenerations.java
index f7087f0..21fce80 100644
--- a/src/java/org/apache/cassandra/db/compaction/LeveledGenerations.java
+++ b/src/java/org/apache/cassandra/db/compaction/LeveledGenerations.java
@@ -22,8 +22,10 @@ import java.io.IOException;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Comparator;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
+import java.util.Map;
 import java.util.Set;
 import java.util.TreeSet;
 import java.util.concurrent.TimeUnit;
@@ -54,6 +56,19 @@ class LeveledGenerations
     // dependent on maxSSTableSize.)
     static final int MAX_LEVEL_COUNT = (int) Math.log10(1000 * 1000 * 1000);
 
+    /**
+     * This map is used to track the original NORMAL instances of sstables.
+     *
+     * When aborting a compaction we can get notified that a MOVED_START sstable is replaced with a NORMAL instance
+     * of the same sstable, but since we use sorted sets (on the first token) in L1+ we won't find it and won't remove it.
+     * Then, when we add the NORMAL instance, we have to replace the *instance* of the sstable to be able to later mark
+     * it compacting again.
+     *
+     * In this map we rely on the fact that hashCode and equals do not care about the first token, so when we
+     * do allSSTables.get(instance_with_moved_starts) we will get the NORMAL sstable back, which we can then remove
+     * from the TreeSet.
+     */
+    private final Map<SSTableReader, SSTableReader> allSSTables = new HashMap<>();
     private final Set<SSTableReader> l0 = new HashSet<>();
     private static long lastOverlapCheck = System.nanoTime();
     // note that since l0 is broken out, levels[0] represents L1:
@@ -92,7 +107,7 @@ class LeveledGenerations
      * If adding an sstable would cause an overlap in the level (if level > 1) we send it to L0. This can happen
      * for example when moving sstables from unrepaired to repaired.
      *
-     * If the sstable is already in the manifest we skip it.
+     * If the sstable is already in the manifest we replace the instance.
      *
      * If the sstable exists in the manifest but has the wrong level, it is removed from the wrong level and added to the correct one
      *
@@ -112,15 +127,16 @@ class LeveledGenerations
                     logger.error("SSTable {} on the wrong level in the manifest - {} instead of {} as recorded in the sstable metadata, removing from level {}", sstable, existingLevel, sstable.getSSTableLevel(), existingLevel);
                     if (strictLCSChecksTest)
                         throw new AssertionError("SSTable not in matching level in manifest: "+sstable + ": "+existingLevel+" != " + sstable.getSSTableLevel());
-                    get(existingLevel).remove(sstable);
                 }
                 else
                 {
-                    logger.info("Manifest already contains {} in level {} - skipping", sstable, existingLevel);
-                    continue;
+                    logger.info("Manifest already contains {} in level {} - replacing instance", sstable, existingLevel);
                 }
+                get(existingLevel).remove(sstable);
+                allSSTables.remove(sstable);
             }
 
+            allSSTables.put(sstable, sstable);
             if (sstable.getSSTableLevel() == 0)
             {
                 l0.add(sstable);
@@ -197,7 +213,12 @@ class LeveledGenerations
         {
             int level = sstable.getSSTableLevel();
             minLevel = Math.min(minLevel, level);
-            get(level).remove(sstable);
+            SSTableReader versionInManifest = allSSTables.get(sstable);
+            if (versionInManifest != null)
+            {
+                get(level).remove(versionInManifest);
+                allSSTables.remove(versionInManifest);
+            }
         }
         return minLevel;
     }
diff --git a/test/unit/org/apache/cassandra/db/compaction/CompactionsCQLTest.java b/test/unit/org/apache/cassandra/db/compaction/CompactionsCQLTest.java
index fe15085..aa29343 100644
--- a/test/unit/org/apache/cassandra/db/compaction/CompactionsCQLTest.java
+++ b/test/unit/org/apache/cassandra/db/compaction/CompactionsCQLTest.java
@@ -17,18 +17,28 @@
  */
 package org.apache.cassandra.db.compaction;
 
+import java.nio.ByteBuffer;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Random;
+import java.util.Set;
 
 import org.junit.Test;
 
 import org.apache.cassandra.cql3.CQLTester;
 import org.apache.cassandra.cql3.UntypedResultSet;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.Directories;
+import org.apache.cassandra.db.compaction.writers.CompactionAwareWriter;
+import org.apache.cassandra.db.compaction.writers.MaxSSTableSizeWriter;
+import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
 import org.apache.cassandra.schema.CompactionParams;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
@@ -251,6 +261,116 @@ public class CompactionsCQLTest extends CQLTester
          assertEquals(CompactionParams.TombstoneOption.ROW, getCurrentColumnFamilyStore().getCompactionStrategyManager().getCompactionParams().tombstoneOption());
      }
 
+    @Test
+    public void testAbortNotifications() throws Throwable
+    {
+        createTable("create table %s (id int primary key, x blob) with compaction = {'class':'LeveledCompactionStrategy', 'sstable_size_in_mb':1}");
+        Random r = new Random();
+        byte [] b = new byte[100 * 1024];
+        for (int i = 0; i < 1000; i++)
+        {
+            r.nextBytes(b);
+            execute("insert into %s (id, x) values (?, ?)", i, ByteBuffer.wrap(b));
+        }
+        getCurrentColumnFamilyStore().forceBlockingFlush();
+        getCurrentColumnFamilyStore().disableAutoCompaction();
+        for (int i = 0; i < 1000; i++)
+        {
+            r.nextBytes(b);
+            execute("insert into %s (id, x) values (?, ?)", i, ByteBuffer.wrap(b));
+        }
+        getCurrentColumnFamilyStore().forceBlockingFlush();
+
+        LeveledCompactionStrategy lcs = (LeveledCompactionStrategy) getCurrentColumnFamilyStore().getCompactionStrategyManager().getStrategies().get(1).get(0);
+        LeveledCompactionTask lcsTask;
+        while (true)
+        {
+            lcsTask = (LeveledCompactionTask) lcs.getNextBackgroundTask(0);
+            if (lcsTask != null)
+            {
+                lcsTask.execute(null);
+                break;
+            }
+            Thread.sleep(1000);
+        }
+        // now all sstables are non-overlapping in L1 - we need them to be in L2:
+        for (SSTableReader sstable : getCurrentColumnFamilyStore().getLiveSSTables())
+        {
+            lcs.removeSSTable(sstable);
+            sstable.descriptor.getMetadataSerializer().mutateLevel(sstable.descriptor, 2);
+            sstable.reloadSSTableMetadata();
+            lcs.addSSTable(sstable);
+        }
+
+        for (int i = 0; i < 1000; i++)
+        {
+            r.nextBytes(b);
+            execute("insert into %s (id, x) values (?, ?)", i, ByteBuffer.wrap(b));
+        }
+        getCurrentColumnFamilyStore().forceBlockingFlush();
+        // now we have a bunch of sstables in L2 and one in L0 - bump the L0 one to L1:
+        for (SSTableReader sstable : getCurrentColumnFamilyStore().getLiveSSTables())
+        {
+            if (sstable.getSSTableLevel() == 0)
+            {
+                lcs.removeSSTable(sstable);
+                sstable.descriptor.getMetadataSerializer().mutateLevel(sstable.descriptor, 1);
+                sstable.reloadSSTableMetadata();
+                lcs.addSSTable(sstable);
+            }
+        }
+        // at this point we have a single sstable in L1, and a bunch of sstables in L2 - a background compaction should
+        // trigger an L1 -> L2 compaction which we abort after creating 5 sstables - this notifies LCS that MOVED_START
+        // sstables have been removed.
+        try
+        {
+            AbstractCompactionTask task = new NotifyingCompactionTask((LeveledCompactionTask) lcs.getNextBackgroundTask(0));
+            task.execute(null);
+            fail("task should throw exception");
+        }
+        catch (Exception ignored)
+        {
+            // ignored
+        }
+
+        lcsTask = (LeveledCompactionTask) lcs.getNextBackgroundTask(0);
+        try
+        {
+            assertNotNull(lcsTask);
+        }
+        finally
+        {
+            if (lcsTask != null)
+                lcsTask.transaction.abort();
+        }
+    }
+
+    private static class NotifyingCompactionTask extends LeveledCompactionTask
+    {
+        public NotifyingCompactionTask(LeveledCompactionTask task)
+        {
+            super(task.cfs, task.transaction, task.getLevel(), task.gcBefore, task.getLevel(), false);
+        }
+
+        @Override
+        public CompactionAwareWriter getCompactionAwareWriter(ColumnFamilyStore cfs,
+                                                              Directories directories,
+                                                              LifecycleTransaction txn,
+                                                              Set<SSTableReader> nonExpiredSSTables)
+        {
+            return new MaxSSTableSizeWriter(cfs, directories, txn, nonExpiredSSTables, 1 << 20, 1)
+            {
+                int switchCount = 0;
+                public void switchCompactionLocation(Directories.DataDirectory directory)
+                {
+                    switchCount++;
+                    if (switchCount > 5)
+                        throw new RuntimeException("Throw after a few sstables have had their starts moved");
+                    super.switchCompactionLocation(directory);
+                }
+            };
+        }
+    }
 
     public boolean verifyStrategies(CompactionStrategyManager manager, Class<? extends AbstractCompactionStrategy> expected)
     {

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org