You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by ma...@apache.org on 2020/04/09 14:47:53 UTC

[cassandra] branch trunk updated: Fail incremental repair if there is an old version sstable involved

This is an automated email from the ASF dual-hosted git repository.

marcuse pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 579c9ab  Fail incremental repair if there is an old version sstable involved
579c9ab is described below

commit 579c9ab311e9efcbc3329599232a8f9992b2694e
Author: Marcus Eriksson <ma...@apache.org>
AuthorDate: Mon Mar 2 11:07:57 2020 +0100

    Fail incremental repair if there is an old version sstable involved
    
    Patch by marcuse; reviewed by Benjamin Lerer for CASSANDRA-15612
---
 CHANGES.txt                                        |  1 +
 .../db/compaction/CompactionStrategyManager.java   | 12 ++++
 .../cassandra/db/repair/PendingAntiCompaction.java | 18 ++++--
 .../cassandra/io/sstable/LegacySSTableTest.java    | 71 ++++++++++++++++++++++
 4 files changed, 98 insertions(+), 4 deletions(-)

diff --git a/CHANGES.txt b/CHANGES.txt
index 46790c9..cda7585 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 4.0-alpha4
+ * Fail incremental repair if an old version sstable is involved (CASSANDRA-15612)
  * Fix overflows on StreamingTombstoneHistogramBuilder produced by large deletion times (CASSANDRA-14773)
  * Mark system_views/system_virtual_schema as system keyspaces in cqlsh (CASSANDRA-15706)
  * Avoid unnecessary collection/iterator allocations during btree construction (CASSANDRA-15390)
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java
index fd4dbeb..546f61b 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java
@@ -27,6 +27,7 @@ import java.util.Collections;
 import java.util.ConcurrentModificationException;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Objects;
 import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
@@ -1201,6 +1202,7 @@ public class CompactionStrategyManager implements INotificationConsumer
             {
                 sstable.descriptor.getMetadataSerializer().mutateRepairMetadata(sstable.descriptor, repairedAt, pendingRepair, isTransient);
                 sstable.reloadSSTableMetadata();
+                verifyMetadata(sstable, repairedAt, pendingRepair, isTransient);
                 changed.add(sstable);
             }
         }
@@ -1218,4 +1220,14 @@ public class CompactionStrategyManager implements INotificationConsumer
             }
         }
     }
+
+    private static void verifyMetadata(SSTableReader sstable, long repairedAt, UUID pendingRepair, boolean isTransient)
+    {
+        if (!Objects.equals(pendingRepair, sstable.getPendingRepair()))
+            throw new IllegalStateException(String.format("Failed setting pending repair to %s on %s (pending repair is %s)", pendingRepair, sstable, sstable.getPendingRepair()));
+        if (repairedAt != sstable.getRepairedAt())
+            throw new IllegalStateException(String.format("Failed setting repairedAt to %d on %s (repairedAt is %d)", repairedAt, sstable, sstable.getRepairedAt()));
+        if (isTransient != sstable.isTransient())
+            throw new IllegalStateException(String.format("Failed setting isTransient to %b on %s (isTransient is %b)", isTransient, sstable, sstable.isTransient()));
+    }
 }
diff --git a/src/java/org/apache/cassandra/db/repair/PendingAntiCompaction.java b/src/java/org/apache/cassandra/db/repair/PendingAntiCompaction.java
index fac164d..e49e76e 100644
--- a/src/java/org/apache/cassandra/db/repair/PendingAntiCompaction.java
+++ b/src/java/org/apache/cassandra/db/repair/PendingAntiCompaction.java
@@ -69,7 +69,7 @@ public class PendingAntiCompaction
     private static final int ACQUIRE_SLEEP_MS = Integer.getInteger("cassandra.acquire_sleep_ms", 1000);
     private static final int ACQUIRE_RETRY_SECONDS = Integer.getInteger("cassandra.acquire_retry_seconds", 60);
 
-    static class AcquireResult
+    public static class AcquireResult
     {
         final ColumnFamilyStore cfs;
         final Refs<SSTableReader> refs;
@@ -82,7 +82,8 @@ public class PendingAntiCompaction
             this.txn = txn;
         }
 
-        void abort()
+        @VisibleForTesting
+        public void abort()
         {
             if (txn != null)
                 txn.abort();
@@ -122,6 +123,14 @@ public class PendingAntiCompaction
             if (metadata.repairedAt != UNREPAIRED_SSTABLE)
                 return false;
 
+            if (!sstable.descriptor.version.hasPendingRepair())
+            {
+                String message = String.format("Prepare phase failed because it encountered legacy sstables that don't " +
+                                               "support pending repair, run upgradesstables before starting incremental " +
+                                               "repairs, repair session (%s)", prsid);
+                throw new SSTableAcquisitionException(message);
+            }
+
             // exclude sstables pending repair, but record session ids for
             // non-finalized sessions for a later error message
             if (metadata.pendingRepair != NO_PENDING_REPAIR)
@@ -149,7 +158,7 @@ public class PendingAntiCompaction
         }
     }
 
-    static class AcquisitionCallable implements Callable<AcquireResult>
+    public static class AcquisitionCallable implements Callable<AcquireResult>
     {
         private final ColumnFamilyStore cfs;
         private final UUID sessionID;
@@ -157,7 +166,8 @@ public class PendingAntiCompaction
         private final int acquireRetrySeconds;
         private final int acquireSleepMillis;
 
-        AcquisitionCallable(ColumnFamilyStore cfs, Collection<Range<Token>> ranges, UUID sessionID, int acquireRetrySeconds, int acquireSleepMillis)
+        @VisibleForTesting
+        public AcquisitionCallable(ColumnFamilyStore cfs, Collection<Range<Token>> ranges, UUID sessionID, int acquireRetrySeconds, int acquireSleepMillis)
         {
             this(cfs, sessionID, acquireRetrySeconds, acquireSleepMillis, new AntiCompactionPredicate(ranges, sessionID));
         }
diff --git a/test/unit/org/apache/cassandra/io/sstable/LegacySSTableTest.java b/test/unit/org/apache/cassandra/io/sstable/LegacySSTableTest.java
index de5ac52..7a18133 100644
--- a/test/unit/org/apache/cassandra/io/sstable/LegacySSTableTest.java
+++ b/test/unit/org/apache/cassandra/io/sstable/LegacySSTableTest.java
@@ -23,6 +23,7 @@ import java.io.FileOutputStream;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.List;
 import java.util.Random;
 import java.util.UUID;
@@ -49,6 +50,7 @@ import org.apache.cassandra.db.Keyspace;
 import org.apache.cassandra.db.compaction.AbstractCompactionTask;
 import org.apache.cassandra.db.compaction.CompactionManager;
 import org.apache.cassandra.db.compaction.Verifier;
+import org.apache.cassandra.db.repair.PendingAntiCompaction;
 import org.apache.cassandra.db.streaming.CassandraOutgoingFile;
 import org.apache.cassandra.db.ReadExecutionController;
 import org.apache.cassandra.db.SinglePartitionReadCommand;
@@ -76,6 +78,7 @@ import org.apache.cassandra.streaming.StreamSession;
 import org.apache.cassandra.streaming.StreamOperation;
 import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.UUIDGen;
 
 import static org.apache.cassandra.service.ActiveRepairService.NO_PENDING_REPAIR;
 import static org.apache.cassandra.service.ActiveRepairService.UNREPAIRED_SSTABLE;
@@ -214,6 +217,55 @@ public class LegacySSTableTest
     }
 
     @Test
+    public void testMutateMetadataCSM() throws Exception
+    {
+        // we need to make sure we write old version metadata in the format for that version
+        for (String legacyVersion : legacyVersions)
+        {
+            // Skip 2.0.1 sstables as it doesn't have repaired information
+            if (legacyVersion.equals("jb"))
+                continue;
+            truncateTables(legacyVersion);
+            loadLegacyTables(legacyVersion);
+
+            for (ColumnFamilyStore cfs : Keyspace.open("legacy_tables").getColumnFamilyStores())
+            {
+                // set pending
+                for (SSTableReader sstable : cfs.getLiveSSTables())
+                {
+                    UUID random = UUID.randomUUID();
+                    try
+                    {
+                        cfs.getCompactionStrategyManager().mutateRepaired(Collections.singleton(sstable), UNREPAIRED_SSTABLE, random, false);
+                        if (!sstable.descriptor.version.hasPendingRepair())
+                            fail("We should fail setting pending repair on unsupported sstables "+sstable);
+                    }
+                    catch (IllegalStateException e)
+                    {
+                        if (sstable.descriptor.version.hasPendingRepair())
+                            fail("We should succeed setting pending repair on "+legacyVersion + " sstables, failed on "+sstable);
+                    }
+                }
+                // set transient
+                for (SSTableReader sstable : cfs.getLiveSSTables())
+                {
+                    try
+                    {
+                        cfs.getCompactionStrategyManager().mutateRepaired(Collections.singleton(sstable), UNREPAIRED_SSTABLE, UUID.randomUUID(), true);
+                        if (!sstable.descriptor.version.hasIsTransient())
+                            fail("We should fail setting pending repair on unsupported sstables "+sstable);
+                    }
+                    catch (IllegalStateException e)
+                    {
+                        if (sstable.descriptor.version.hasIsTransient())
+                            fail("We should succeed setting pending repair on "+legacyVersion + " sstables, failed on "+sstable);
+                    }
+                }
+            }
+        }
+    }
+
+    @Test
     public void testMutateLevel() throws Exception
     {
         // we need to make sure we write old version metadata in the format for that version
@@ -317,6 +369,25 @@ public class LegacySSTableTest
     }
 
     @Test
+    public void testPendingAntiCompactionOldSSTables() throws Exception
+    {
+        for (String legacyVersion : legacyVersions)
+        {
+            ColumnFamilyStore cfs = Keyspace.open("legacy_tables").getColumnFamilyStore(String.format("legacy_%s_simple", legacyVersion));
+            loadLegacyTable("legacy_%s_simple", legacyVersion);
+
+            boolean shouldFail = !cfs.getLiveSSTables().stream().allMatch(sstable -> sstable.descriptor.version.hasPendingRepair());
+            IPartitioner p = Iterables.getFirst(cfs.getLiveSSTables(), null).getPartitioner();
+            Range<Token> r = new Range<>(p.getMinimumToken(), p.getMinimumToken());
+            PendingAntiCompaction.AcquisitionCallable acquisitionCallable = new PendingAntiCompaction.AcquisitionCallable(cfs, Collections.singleton(r), UUIDGen.getTimeUUID(), 0, 0);
+            PendingAntiCompaction.AcquireResult res = acquisitionCallable.call();
+            assertEquals(shouldFail, res == null);
+            if (res != null)
+                res.abort();
+        }
+    }
+
+    @Test
     public void testAutomaticUpgrade() throws Exception
     {
         for (String legacyVersion : legacyVersions)


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org