You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by ma...@apache.org on 2021/01/18 08:33:01 UTC

[cassandra] branch trunk updated: Too defensive check when picking sstables for preview repair

This is an automated email from the ASF dual-hosted git repository.

marcuse pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 0e059c4  Too defensive check when picking sstables for preview repair
0e059c4 is described below

commit 0e059c42f298686d6a219b4bc0b7973652f4183d
Author: Marcus Eriksson <ma...@apache.org>
AuthorDate: Wed Nov 18 15:18:33 2020 +0100

    Too defensive check when picking sstables for preview repair
    
    Patch by marcuse; reviewed by Adam Holmberg and Ekaterina Dimitrova for CASSANDRA-16284
---
 CHANGES.txt                                        |  1 +
 .../db/repair/CassandraValidationIterator.java     |  4 +-
 .../distributed/test/PreviewRepairTest.java        | 59 +++++++++++++++++++++-
 3 files changed, 60 insertions(+), 4 deletions(-)

diff --git a/CHANGES.txt b/CHANGES.txt
index 47b47d8..426de4c 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 4.0-beta5
+ * Too defensive check when picking sstables for preview repair (CASSANDRA-16284)
  * Ensure pre-negotiation native protocol responses have correct stream id (CASSANDRA-16376)
  * Fix check for -Xlog in cassandra-env.sh (CASSANDRA-16279)
  * SSLFactory should initialize SSLContext before setting protocols (CASSANDRA-16362)
diff --git a/src/java/org/apache/cassandra/db/repair/CassandraValidationIterator.java b/src/java/org/apache/cassandra/db/repair/CassandraValidationIterator.java
index 9bddd86..6f2256f 100644
--- a/src/java/org/apache/cassandra/db/repair/CassandraValidationIterator.java
+++ b/src/java/org/apache/cassandra/db/repair/CassandraValidationIterator.java
@@ -139,11 +139,11 @@ public class CassandraValidationIterator extends ValidationPartitionIterator
             predicate = (s) -> !prs.isIncremental || !s.isRepaired();
         }
 
-        try (ColumnFamilyStore.RefViewFragment sstableCandidates = cfs.selectAndReference(View.select(SSTableSet.CANONICAL, predicate)))
+        try (ColumnFamilyStore.RefViewFragment sstableCandidates = cfs.selectAndReference(View.selectFunction(SSTableSet.CANONICAL)))
         {
             for (SSTableReader sstable : sstableCandidates.sstables)
             {
-                if (new Bounds<>(sstable.first.getToken(), sstable.last.getToken()).intersects(ranges))
+                if (new Bounds<>(sstable.first.getToken(), sstable.last.getToken()).intersects(ranges) && predicate.apply(sstable))
                 {
                     sstablesToValidate.add(sstable);
                 }
diff --git a/test/distributed/org/apache/cassandra/distributed/test/PreviewRepairTest.java b/test/distributed/org/apache/cassandra/distributed/test/PreviewRepairTest.java
index bc9eda7..1ad1ba6 100644
--- a/test/distributed/org/apache/cassandra/distributed/test/PreviewRepairTest.java
+++ b/test/distributed/org/apache/cassandra/distributed/test/PreviewRepairTest.java
@@ -50,6 +50,7 @@ import org.apache.cassandra.distributed.api.IInvokableInstance;
 import org.apache.cassandra.distributed.api.IIsolatedExecutor;
 import org.apache.cassandra.distributed.api.IMessage;
 import org.apache.cassandra.distributed.api.IMessageFilters;
+import org.apache.cassandra.distributed.api.NodeToolResult;
 import org.apache.cassandra.distributed.impl.Instance;
 import org.apache.cassandra.distributed.shared.RepairResult;
 import org.apache.cassandra.net.Message;
@@ -143,7 +144,6 @@ public class PreviewRepairTest extends TestBaseImpl
         try(Cluster cluster = init(Cluster.build(2).withConfig(config -> config.with(GOSSIP).with(NETWORK)).start()))
         {
             cluster.schemaChange("create table " + KEYSPACE + ".tbl (id int primary key, t int)");
-
             insert(cluster.coordinator(1), 0, 100);
             cluster.forEach((node) -> node.flush(KEYSPACE));
             cluster.get(1).callOnInstance(repair(options(false, false)));
@@ -281,6 +281,62 @@ public class PreviewRepairTest extends TestBaseImpl
         }
     }
 
+    /**
+     * Makes sure we can start a non-intersecting preview repair while there are other pending sstables on disk
+     */
+    @Test
+    public void testStartNonIntersectingPreviewRepair() throws IOException, InterruptedException, ExecutionException
+    {
+        ExecutorService es = Executors.newSingleThreadExecutor();
+        try(Cluster cluster = init(Cluster.build(2).withConfig(config ->
+                                                               config.with(GOSSIP)
+                                                                     .with(NETWORK))
+                                          .start()))
+        {
+            cluster.schemaChange("create table " + KEYSPACE + ".tbl (id int primary key, t int)");
+            insert(cluster.coordinator(1), 0, 100);
+            cluster.forEach((node) -> node.flush(KEYSPACE));
+            cluster.get(1).nodetoolResult("repair", KEYSPACE, "tbl").asserts().success();
+
+            insert(cluster.coordinator(1), 100, 100);
+            cluster.forEach((node) -> node.flush(KEYSPACE));
+
+            // pause inc repair validation messages on node2 until node1 has finished
+            SimpleCondition incRepairStarted = new SimpleCondition();
+            SimpleCondition continueIncRepair = new SimpleCondition();
+
+            DelayFirstRepairTypeMessageFilter filter = DelayFirstRepairTypeMessageFilter.validationRequest(incRepairStarted, continueIncRepair);
+            cluster.filters().outbound().verbs(Verb.VALIDATION_REQ.id).from(1).to(2).messagesMatching(filter).drop();
+
+            // get local ranges to repair two separate ranges:
+            List<String> localRanges = cluster.get(1).callOnInstance(() -> {
+                List<String> res = new ArrayList<>();
+                for (Range<Token> r : StorageService.instance.getLocalReplicas(KEYSPACE).ranges())
+                    res.add(r.left.getTokenValue()+ ":"+ r.right.getTokenValue());
+                return res;
+            });
+
+            assertEquals(2, localRanges.size());
+            String [] previewedRange = localRanges.get(0).split(":");
+            String [] repairedRange = localRanges.get(1).split(":");
+            Future<NodeToolResult> repairStatusFuture = es.submit(() -> cluster.get(1).nodetoolResult("repair", "-st", repairedRange[0], "-et", repairedRange[1], KEYSPACE, "tbl"));
+            incRepairStarted.await(); // wait for node1 to start validation compaction
+            // now we have pending sstables in range "repairedRange", make sure we can preview "previewedRange"
+            cluster.get(1).nodetoolResult("repair", "-vd", "-st", previewedRange[0], "-et", previewedRange[1], KEYSPACE, "tbl")
+                          .asserts()
+                          .success()
+                          .notificationContains("Repaired data is in sync");
+
+            continueIncRepair.signalAll();
+
+            repairStatusFuture.get().asserts().success();
+        }
+        finally
+        {
+            es.shutdown();
+        }
+    }
+
     @Test
     public void snapshotTest() throws IOException, InterruptedException
     {
@@ -292,7 +348,6 @@ public class PreviewRepairTest extends TestBaseImpl
         {
             cluster.schemaChange("create table " + KEYSPACE + ".tbl (id int primary key, t int)");
             cluster.schemaChange("create table " + KEYSPACE + ".tbl2 (id int primary key, t int)");
-            Thread.sleep(1000);
 
             // populate 2 tables
             insert(cluster.coordinator(1), 0, 100, "tbl");


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org