You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by ma...@apache.org on 2021/01/18 08:33:01 UTC
[cassandra] branch trunk updated: Too defensive check when picking sstables for preview repair
This is an automated email from the ASF dual-hosted git repository.
marcuse pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/trunk by this push:
new 0e059c4 Too defensive check when picking sstables for preview repair
0e059c4 is described below
commit 0e059c42f298686d6a219b4bc0b7973652f4183d
Author: Marcus Eriksson <ma...@apache.org>
AuthorDate: Wed Nov 18 15:18:33 2020 +0100
Too defensive check when picking sstables for preview repair
Patch by marcuse; reviewed by Adam Holmberg and Ekaterina Dimitrova for CASSANDRA-16284
---
CHANGES.txt | 1 +
.../db/repair/CassandraValidationIterator.java | 4 +-
.../distributed/test/PreviewRepairTest.java | 59 +++++++++++++++++++++-
3 files changed, 60 insertions(+), 4 deletions(-)
diff --git a/CHANGES.txt b/CHANGES.txt
index 47b47d8..426de4c 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
4.0-beta5
+ * Too defensive check when picking sstables for preview repair (CASSANDRA-16284)
* Ensure pre-negotiation native protocol responses have correct stream id (CASSANDRA-16376)
* Fix check for -Xlog in cassandra-env.sh (CASSANDRA-16279)
* SSLFactory should initialize SSLContext before setting protocols (CASSANDRA-16362)
diff --git a/src/java/org/apache/cassandra/db/repair/CassandraValidationIterator.java b/src/java/org/apache/cassandra/db/repair/CassandraValidationIterator.java
index 9bddd86..6f2256f 100644
--- a/src/java/org/apache/cassandra/db/repair/CassandraValidationIterator.java
+++ b/src/java/org/apache/cassandra/db/repair/CassandraValidationIterator.java
@@ -139,11 +139,11 @@ public class CassandraValidationIterator extends ValidationPartitionIterator
predicate = (s) -> !prs.isIncremental || !s.isRepaired();
}
- try (ColumnFamilyStore.RefViewFragment sstableCandidates = cfs.selectAndReference(View.select(SSTableSet.CANONICAL, predicate)))
+ try (ColumnFamilyStore.RefViewFragment sstableCandidates = cfs.selectAndReference(View.selectFunction(SSTableSet.CANONICAL)))
{
for (SSTableReader sstable : sstableCandidates.sstables)
{
- if (new Bounds<>(sstable.first.getToken(), sstable.last.getToken()).intersects(ranges))
+ if (new Bounds<>(sstable.first.getToken(), sstable.last.getToken()).intersects(ranges) && predicate.apply(sstable))
{
sstablesToValidate.add(sstable);
}
diff --git a/test/distributed/org/apache/cassandra/distributed/test/PreviewRepairTest.java b/test/distributed/org/apache/cassandra/distributed/test/PreviewRepairTest.java
index bc9eda7..1ad1ba6 100644
--- a/test/distributed/org/apache/cassandra/distributed/test/PreviewRepairTest.java
+++ b/test/distributed/org/apache/cassandra/distributed/test/PreviewRepairTest.java
@@ -50,6 +50,7 @@ import org.apache.cassandra.distributed.api.IInvokableInstance;
import org.apache.cassandra.distributed.api.IIsolatedExecutor;
import org.apache.cassandra.distributed.api.IMessage;
import org.apache.cassandra.distributed.api.IMessageFilters;
+import org.apache.cassandra.distributed.api.NodeToolResult;
import org.apache.cassandra.distributed.impl.Instance;
import org.apache.cassandra.distributed.shared.RepairResult;
import org.apache.cassandra.net.Message;
@@ -143,7 +144,6 @@ public class PreviewRepairTest extends TestBaseImpl
try(Cluster cluster = init(Cluster.build(2).withConfig(config -> config.with(GOSSIP).with(NETWORK)).start()))
{
cluster.schemaChange("create table " + KEYSPACE + ".tbl (id int primary key, t int)");
-
insert(cluster.coordinator(1), 0, 100);
cluster.forEach((node) -> node.flush(KEYSPACE));
cluster.get(1).callOnInstance(repair(options(false, false)));
@@ -281,6 +281,62 @@ public class PreviewRepairTest extends TestBaseImpl
}
}
+ /**
+ * Makes sure we can start a non-intersecting preview repair while there are other pending sstables on disk
+ */
+ @Test
+ public void testStartNonIntersectingPreviewRepair() throws IOException, InterruptedException, ExecutionException
+ {
+ ExecutorService es = Executors.newSingleThreadExecutor();
+ try(Cluster cluster = init(Cluster.build(2).withConfig(config ->
+ config.with(GOSSIP)
+ .with(NETWORK))
+ .start()))
+ {
+ cluster.schemaChange("create table " + KEYSPACE + ".tbl (id int primary key, t int)");
+ insert(cluster.coordinator(1), 0, 100);
+ cluster.forEach((node) -> node.flush(KEYSPACE));
+ cluster.get(1).nodetoolResult("repair", KEYSPACE, "tbl").asserts().success();
+
+ insert(cluster.coordinator(1), 100, 100);
+ cluster.forEach((node) -> node.flush(KEYSPACE));
+
+ // pause inc repair validation messages on node2 until node1 has finished
+ SimpleCondition incRepairStarted = new SimpleCondition();
+ SimpleCondition continueIncRepair = new SimpleCondition();
+
+ DelayFirstRepairTypeMessageFilter filter = DelayFirstRepairTypeMessageFilter.validationRequest(incRepairStarted, continueIncRepair);
+ cluster.filters().outbound().verbs(Verb.VALIDATION_REQ.id).from(1).to(2).messagesMatching(filter).drop();
+
+ // get local ranges to repair two separate ranges:
+ List<String> localRanges = cluster.get(1).callOnInstance(() -> {
+ List<String> res = new ArrayList<>();
+ for (Range<Token> r : StorageService.instance.getLocalReplicas(KEYSPACE).ranges())
+ res.add(r.left.getTokenValue()+ ":"+ r.right.getTokenValue());
+ return res;
+ });
+
+ assertEquals(2, localRanges.size());
+ String [] previewedRange = localRanges.get(0).split(":");
+ String [] repairedRange = localRanges.get(1).split(":");
+ Future<NodeToolResult> repairStatusFuture = es.submit(() -> cluster.get(1).nodetoolResult("repair", "-st", repairedRange[0], "-et", repairedRange[1], KEYSPACE, "tbl"));
+ incRepairStarted.await(); // wait for node1 to start validation compaction
+ // now we have pending sstables in range "repairedRange", make sure we can preview "previewedRange"
+ cluster.get(1).nodetoolResult("repair", "-vd", "-st", previewedRange[0], "-et", previewedRange[1], KEYSPACE, "tbl")
+ .asserts()
+ .success()
+ .notificationContains("Repaired data is in sync");
+
+ continueIncRepair.signalAll();
+
+ repairStatusFuture.get().asserts().success();
+ }
+ finally
+ {
+ es.shutdown();
+ }
+ }
+
@Test
public void snapshotTest() throws IOException, InterruptedException
{
@@ -292,7 +348,6 @@ public class PreviewRepairTest extends TestBaseImpl
{
cluster.schemaChange("create table " + KEYSPACE + ".tbl (id int primary key, t int)");
cluster.schemaChange("create table " + KEYSPACE + ".tbl2 (id int primary key, t int)");
- Thread.sleep(1000);
// populate 2 tables
insert(cluster.coordinator(1), 0, 100, "tbl");
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org