You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by ma...@apache.org on 2020/04/09 14:47:53 UTC
[cassandra] branch trunk updated: Fail incremental repair if there
is an old version sstable involved
This is an automated email from the ASF dual-hosted git repository.
marcuse pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/trunk by this push:
new 579c9ab Fail incremental repair if there is an old version sstable involved
579c9ab is described below
commit 579c9ab311e9efcbc3329599232a8f9992b2694e
Author: Marcus Eriksson <ma...@apache.org>
AuthorDate: Mon Mar 2 11:07:57 2020 +0100
Fail incremental repair if there is an old version sstable involved
Patch by marcuse; reviewed by Benjamin Lerer for CASSANDRA-15612
---
CHANGES.txt | 1 +
.../db/compaction/CompactionStrategyManager.java | 12 ++++
.../cassandra/db/repair/PendingAntiCompaction.java | 18 ++++--
.../cassandra/io/sstable/LegacySSTableTest.java | 71 ++++++++++++++++++++++
4 files changed, 98 insertions(+), 4 deletions(-)
diff --git a/CHANGES.txt b/CHANGES.txt
index 46790c9..cda7585 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
4.0-alpha4
+ * Fail incremental repair if an old version sstable is involved (CASSANDRA-15612)
* Fix overflows on StreamingTombstoneHistogramBuilder produced by large deletion times (CASSANDRA-14773)
* Mark system_views/system_virtual_schema as system keyspaces in cqlsh (CASSANDRA-15706)
* Avoid unnecessary collection/iterator allocations during btree construction (CASSANDRA-15390)
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java
index fd4dbeb..546f61b 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java
@@ -27,6 +27,7 @@ import java.util.Collections;
import java.util.ConcurrentModificationException;
import java.util.HashSet;
import java.util.List;
+import java.util.Objects;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.locks.ReentrantReadWriteLock;
@@ -1201,6 +1202,7 @@ public class CompactionStrategyManager implements INotificationConsumer
{
sstable.descriptor.getMetadataSerializer().mutateRepairMetadata(sstable.descriptor, repairedAt, pendingRepair, isTransient);
sstable.reloadSSTableMetadata();
+ verifyMetadata(sstable, repairedAt, pendingRepair, isTransient);
changed.add(sstable);
}
}
@@ -1218,4 +1220,14 @@ public class CompactionStrategyManager implements INotificationConsumer
}
}
}
+
+ private static void verifyMetadata(SSTableReader sstable, long repairedAt, UUID pendingRepair, boolean isTransient)
+ { // Compares each requested repair field with what the sstable now reports; throws IllegalStateException on the first mismatch
+ if (!Objects.equals(pendingRepair, sstable.getPendingRepair())) // Objects.equals is null-safe, so a null pendingRepair matches a null value on the sstable
+ throw new IllegalStateException(String.format("Failed setting pending repair to %s on %s (pending repair is %s)", pendingRepair, sstable, sstable.getPendingRepair()));
+ if (repairedAt != sstable.getRepairedAt()) // only reached when pending repair matched
+ throw new IllegalStateException(String.format("Failed setting repairedAt to %d on %s (repairedAt is %d)", repairedAt, sstable, sstable.getRepairedAt()));
+ if (isTransient != sstable.isTransient()) // only reached when both earlier fields matched
+ throw new IllegalStateException(String.format("Failed setting isTransient to %b on %s (isTransient is %b)", isTransient, sstable, sstable.isTransient()));
+ }
}
diff --git a/src/java/org/apache/cassandra/db/repair/PendingAntiCompaction.java b/src/java/org/apache/cassandra/db/repair/PendingAntiCompaction.java
index fac164d..e49e76e 100644
--- a/src/java/org/apache/cassandra/db/repair/PendingAntiCompaction.java
+++ b/src/java/org/apache/cassandra/db/repair/PendingAntiCompaction.java
@@ -69,7 +69,7 @@ public class PendingAntiCompaction
private static final int ACQUIRE_SLEEP_MS = Integer.getInteger("cassandra.acquire_sleep_ms", 1000);
private static final int ACQUIRE_RETRY_SECONDS = Integer.getInteger("cassandra.acquire_retry_seconds", 60);
- static class AcquireResult
+ public static class AcquireResult
{
final ColumnFamilyStore cfs;
final Refs<SSTableReader> refs;
@@ -82,7 +82,8 @@ public class PendingAntiCompaction
this.txn = txn;
}
- void abort()
+ @VisibleForTesting
+ public void abort()
{
if (txn != null)
txn.abort();
@@ -122,6 +123,14 @@ public class PendingAntiCompaction
if (metadata.repairedAt != UNREPAIRED_SSTABLE)
return false;
+ if (!sstable.descriptor.version.hasPendingRepair())
+ {
+ String message = String.format("Prepare phase failed because it encountered legacy sstables that don't " +
+ "support pending repair, run upgradesstables before starting incremental " +
+ "repairs, repair session (%s)", prsid);
+ throw new SSTableAcquisitionException(message);
+ }
+
// exclude sstables pending repair, but record session ids for
// non-finalized sessions for a later error message
if (metadata.pendingRepair != NO_PENDING_REPAIR)
@@ -149,7 +158,7 @@ public class PendingAntiCompaction
}
}
- static class AcquisitionCallable implements Callable<AcquireResult>
+ public static class AcquisitionCallable implements Callable<AcquireResult>
{
private final ColumnFamilyStore cfs;
private final UUID sessionID;
@@ -157,7 +166,8 @@ public class PendingAntiCompaction
private final int acquireRetrySeconds;
private final int acquireSleepMillis;
- AcquisitionCallable(ColumnFamilyStore cfs, Collection<Range<Token>> ranges, UUID sessionID, int acquireRetrySeconds, int acquireSleepMillis)
+ @VisibleForTesting
+ public AcquisitionCallable(ColumnFamilyStore cfs, Collection<Range<Token>> ranges, UUID sessionID, int acquireRetrySeconds, int acquireSleepMillis)
{
this(cfs, sessionID, acquireRetrySeconds, acquireSleepMillis, new AntiCompactionPredicate(ranges, sessionID));
}
diff --git a/test/unit/org/apache/cassandra/io/sstable/LegacySSTableTest.java b/test/unit/org/apache/cassandra/io/sstable/LegacySSTableTest.java
index de5ac52..7a18133 100644
--- a/test/unit/org/apache/cassandra/io/sstable/LegacySSTableTest.java
+++ b/test/unit/org/apache/cassandra/io/sstable/LegacySSTableTest.java
@@ -23,6 +23,7 @@ import java.io.FileOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collections;
import java.util.List;
import java.util.Random;
import java.util.UUID;
@@ -49,6 +50,7 @@ import org.apache.cassandra.db.Keyspace;
import org.apache.cassandra.db.compaction.AbstractCompactionTask;
import org.apache.cassandra.db.compaction.CompactionManager;
import org.apache.cassandra.db.compaction.Verifier;
+import org.apache.cassandra.db.repair.PendingAntiCompaction;
import org.apache.cassandra.db.streaming.CassandraOutgoingFile;
import org.apache.cassandra.db.ReadExecutionController;
import org.apache.cassandra.db.SinglePartitionReadCommand;
@@ -76,6 +78,7 @@ import org.apache.cassandra.streaming.StreamSession;
import org.apache.cassandra.streaming.StreamOperation;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.UUIDGen;
import static org.apache.cassandra.service.ActiveRepairService.NO_PENDING_REPAIR;
import static org.apache.cassandra.service.ActiveRepairService.UNREPAIRED_SSTABLE;
@@ -214,6 +217,55 @@ public class LegacySSTableTest
}
@Test
+ public void testMutateMetadataCSM() throws Exception
+ { // Checks that mutateRepaired throws IllegalStateException exactly when the sstable version lacks the repair field being set
+ // we need to make sure we write old version metadata in the format for that version
+ for (String legacyVersion : legacyVersions)
+ {
+ // Skip 2.0.1 sstables as it doesn't have repaired information
+ if (legacyVersion.equals("jb"))
+ continue;
+ truncateTables(legacyVersion);
+ loadLegacyTables(legacyVersion);
+
+ for (ColumnFamilyStore cfs : Keyspace.open("legacy_tables").getColumnFamilyStores())
+ {
+ // set pending: must succeed only on sstable versions that support pending repair
+ for (SSTableReader sstable : cfs.getLiveSSTables())
+ {
+ UUID random = UUID.randomUUID();
+ try
+ {
+ cfs.getCompactionStrategyManager().mutateRepaired(Collections.singleton(sstable), UNREPAIRED_SSTABLE, random, false);
+ if (!sstable.descriptor.version.hasPendingRepair()) // no exception was thrown although the version lacks the field
+ fail("We should fail setting pending repair on unsupported sstables "+sstable);
+ }
+ catch (IllegalStateException e)
+ {
+ if (sstable.descriptor.version.hasPendingRepair()) // the exception is only expected for unsupported versions
+ fail("We should succeed setting pending repair on "+legacyVersion + " sstables, failed on "+sstable);
+ }
+ }
+ // set transient: must succeed only on sstable versions that support isTransient
+ for (SSTableReader sstable : cfs.getLiveSSTables())
+ {
+ try
+ {
+ cfs.getCompactionStrategyManager().mutateRepaired(Collections.singleton(sstable), UNREPAIRED_SSTABLE, UUID.randomUUID(), true);
+ if (!sstable.descriptor.version.hasIsTransient()) // no exception was thrown although the version lacks the field
+ fail("We should fail setting isTransient on unsupported sstables "+sstable);
+ }
+ catch (IllegalStateException e)
+ {
+ if (sstable.descriptor.version.hasIsTransient()) // the exception is only expected for unsupported versions
+ fail("We should succeed setting isTransient on "+legacyVersion + " sstables, failed on "+sstable);
+ }
+ }
+ }
+ }
+ }
+
+ @Test
public void testMutateLevel() throws Exception
{
// we need to make sure we write old version metadata in the format for that version
@@ -317,6 +369,25 @@ public class LegacySSTableTest
}
@Test
+ public void testPendingAntiCompactionOldSSTables() throws Exception
+ { // For each legacy version, loads its "simple" table and checks whether acquiring sstables for anti-compaction yields a result
+ for (String legacyVersion : legacyVersions)
+ {
+ ColumnFamilyStore cfs = Keyspace.open("legacy_tables").getColumnFamilyStore(String.format("legacy_%s_simple", legacyVersion));
+ loadLegacyTable("legacy_%s_simple", legacyVersion);
+
+ boolean shouldFail = !cfs.getLiveSSTables().stream().allMatch(sstable -> sstable.descriptor.version.hasPendingRepair()); // failure is expected when any live sstable's version lacks pending-repair support
+ IPartitioner p = Iterables.getFirst(cfs.getLiveSSTables(), null).getPartitioner(); // NOTE(review): would NPE if no sstables were loaded - assumes loadLegacyTable always adds at least one
+ Range<Token> r = new Range<>(p.getMinimumToken(), p.getMinimumToken()); // presumably (min, min] covers the whole ring - confirm against Range semantics
+ PendingAntiCompaction.AcquisitionCallable acquisitionCallable = new PendingAntiCompaction.AcquisitionCallable(cfs, Collections.singleton(r), UUIDGen.getTimeUUID(), 0, 0); // the two zeros are acquireRetrySeconds and acquireSleepMillis
+ PendingAntiCompaction.AcquireResult res = acquisitionCallable.call();
+ assertEquals(shouldFail, res == null); // this test treats a null result as a failed acquisition
+ if (res != null)
+ res.abort(); // aborts the result's transaction, if it has one
+ }
+ }
+
+ @Test
public void testAutomaticUpgrade() throws Exception
{
for (String legacyVersion : legacyVersions)
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org