You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by dc...@apache.org on 2020/10/20 23:04:39 UTC

[cassandra] branch trunk updated: Log Warning Rather than Verbose Trace when Preview Repair Validation Conflicts with Incremental Repair

This is an automated email from the ASF dual-hosted git repository.

dcapwell pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 2edc5bb  Log Warning Rather than Verbose Trace when Preview Repair Validation Conflicts with Incremental Repair
2edc5bb is described below

commit 2edc5bb441eb7e3ccd549333012ef00fd1d5c428
Author: Caleb Rackliffe <ca...@gmail.com>
AuthorDate: Tue Oct 20 14:58:57 2020 -0700

    Log Warning Rather than Verbose Trace when Preview Repair Validation Conflicts with Incremental Repair
    
    patch by Caleb Rackliffe; reviewed by David Capwell, Marcus Eriksson for CASSANDRA-16209
---
 ...pairConflictWithIncrementalRepairException.java |  27 ++++
 .../apache/cassandra/repair/ValidationManager.java |   5 +
 .../apache/cassandra/streaming/PreviewKind.java    |   3 +-
 .../utils/concurrent/SimpleCondition.java          |   7 +-
 .../distributed/test/PreviewRepairTest.java        | 169 +++++++++++++++++----
 5 files changed, 180 insertions(+), 31 deletions(-)

diff --git a/src/java/org/apache/cassandra/repair/PreviewRepairConflictWithIncrementalRepairException.java b/src/java/org/apache/cassandra/repair/PreviewRepairConflictWithIncrementalRepairException.java
new file mode 100644
index 0000000..09be1a4
--- /dev/null
+++ b/src/java/org/apache/cassandra/repair/PreviewRepairConflictWithIncrementalRepairException.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.repair;
+
+public class PreviewRepairConflictWithIncrementalRepairException extends IllegalStateException
+{
+    public PreviewRepairConflictWithIncrementalRepairException(String s)
+    {
+        super(s);
+    }
+}
diff --git a/src/java/org/apache/cassandra/repair/ValidationManager.java b/src/java/org/apache/cassandra/repair/ValidationManager.java
index 4bbffbf..eb6ec96 100644
--- a/src/java/org/apache/cassandra/repair/ValidationManager.java
+++ b/src/java/org/apache/cassandra/repair/ValidationManager.java
@@ -161,6 +161,11 @@ public class ValidationManager
                 {
                     doValidation(cfs, validator);
                 }
+                catch (PreviewRepairConflictWithIncrementalRepairException e)
+                {
+                    validator.fail();
+                    logger.warn(e.getMessage());
+                }
                 catch (Throwable e)
                 {
                     // we need to inform the remote end of our failure, otherwise it will hang on repair forever
diff --git a/src/java/org/apache/cassandra/streaming/PreviewKind.java b/src/java/org/apache/cassandra/streaming/PreviewKind.java
index b5467de..3d0bff9 100644
--- a/src/java/org/apache/cassandra/streaming/PreviewKind.java
+++ b/src/java/org/apache/cassandra/streaming/PreviewKind.java
@@ -25,6 +25,7 @@ import com.google.common.base.Predicates;
 
 import org.apache.cassandra.io.sstable.format.SSTableReader;
 import org.apache.cassandra.io.sstable.metadata.StatsMetadata;
+import org.apache.cassandra.repair.PreviewRepairConflictWithIncrementalRepairException;
 import org.apache.cassandra.repair.consistent.ConsistentSession;
 import org.apache.cassandra.repair.consistent.LocalSession;
 import org.apache.cassandra.service.ActiveRepairService;
@@ -92,7 +93,7 @@ public enum PreviewKind
                 else if (session.getState() == ConsistentSession.State.FINALIZED)
                     return true;
                 else if (session.getState() != ConsistentSession.State.FAILED)
-                    throw new IllegalStateException(String.format("SSTable %s is marked pending for non-finalized incremental repair session %s, failing preview repair", sstable, sstableMetadata.pendingRepair));
+                    throw new PreviewRepairConflictWithIncrementalRepairException(String.format("SSTable %s is marked pending for non-finalized incremental repair session %s, failing preview repair", sstable, sstableMetadata.pendingRepair));
             }
             return sstableMetadata.repairedAt != ActiveRepairService.UNREPAIRED_SSTABLE;
         }
diff --git a/src/java/org/apache/cassandra/utils/concurrent/SimpleCondition.java b/src/java/org/apache/cassandra/utils/concurrent/SimpleCondition.java
index 0ff9018..61ec640 100644
--- a/src/java/org/apache/cassandra/utils/concurrent/SimpleCondition.java
+++ b/src/java/org/apache/cassandra/utils/concurrent/SimpleCondition.java
@@ -32,6 +32,7 @@ public class SimpleCondition implements Condition
     private volatile WaitQueue waiting;
     private volatile boolean signaled = false;
 
+    @Override
     public void await() throws InterruptedException
     {
         if (isSignaled())
@@ -90,12 +91,14 @@ public class SimpleCondition implements Condition
         throw new UnsupportedOperationException();
     }
 
-    public long awaitNanos(long nanosTimeout) throws InterruptedException
+    @Override
+    public long awaitNanos(long nanosTimeout)
     {
         throw new UnsupportedOperationException();
     }
 
-    public boolean awaitUntil(Date deadline) throws InterruptedException
+    @Override
+    public boolean awaitUntil(Date deadline)
     {
         throw new UnsupportedOperationException();
     }
diff --git a/test/distributed/org/apache/cassandra/distributed/test/PreviewRepairTest.java b/test/distributed/org/apache/cassandra/distributed/test/PreviewRepairTest.java
index 18e6d43..87837f2 100644
--- a/test/distributed/org/apache/cassandra/distributed/test/PreviewRepairTest.java
+++ b/test/distributed/org/apache/cassandra/distributed/test/PreviewRepairTest.java
@@ -34,8 +34,10 @@ import java.util.concurrent.atomic.AtomicInteger;
 
 import com.google.common.collect.ImmutableList;
 import com.google.common.util.concurrent.Uninterruptibles;
+import org.junit.BeforeClass;
 import org.junit.Test;
 
+import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.db.Keyspace;
 import org.apache.cassandra.db.compaction.CompactionManager;
@@ -48,11 +50,16 @@ import org.apache.cassandra.distributed.api.IInvokableInstance;
 import org.apache.cassandra.distributed.api.IIsolatedExecutor;
 import org.apache.cassandra.distributed.api.IMessage;
 import org.apache.cassandra.distributed.api.IMessageFilters;
+import org.apache.cassandra.distributed.impl.Instance;
 import org.apache.cassandra.distributed.shared.RepairResult;
+import org.apache.cassandra.net.Message;
 import org.apache.cassandra.net.Verb;
 import org.apache.cassandra.io.sstable.format.SSTableReader;
 import org.apache.cassandra.repair.RepairParallelism;
+import org.apache.cassandra.repair.messages.FinalizePropose;
+import org.apache.cassandra.repair.messages.RepairMessage;
 import org.apache.cassandra.repair.messages.RepairOption;
+import org.apache.cassandra.repair.messages.ValidationRequest;
 import org.apache.cassandra.service.ActiveRepairService;
 import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.streaming.PreviewKind;
@@ -68,6 +75,12 @@ import static org.junit.Assert.assertTrue;
 
 public class PreviewRepairTest extends TestBaseImpl
 {
+    @BeforeClass
+    public static void setup()
+    {
+        DatabaseDescriptor.daemonInitialization();
+    }
+    
     /**
      * makes sure that the repaired sstables are not matching on the two
      * nodes by disabling autocompaction on node2 and then running an
@@ -81,7 +94,7 @@ public class PreviewRepairTest extends TestBaseImpl
             cluster.schemaChange("create table " + KEYSPACE + ".tbl (id int primary key, t int)");
             insert(cluster.coordinator(1), 0, 100);
             cluster.forEach((node) -> node.flush(KEYSPACE));
-            cluster.get(1).callOnInstance(repair(options(false)));
+            cluster.get(1).callOnInstance(repair(options(false, false)));
             insert(cluster.coordinator(1), 100, 100);
             cluster.forEach((node) -> node.flush(KEYSPACE));
 
@@ -92,7 +105,7 @@ public class PreviewRepairTest extends TestBaseImpl
                 FBUtilities.waitOnFutures(CompactionManager.instance.submitBackground(cfs));
                 cfs.disableAutoCompaction();
             }));
-            cluster.get(1).callOnInstance(repair(options(false)));
+            cluster.get(1).callOnInstance(repair(options(false, false)));
             // now re-enable autocompaction on node1, this moves the sstables for the new repair to repaired
             cluster.get(1).runOnInstance(() -> {
                 ColumnFamilyStore cfs = Keyspace.open(KEYSPACE).getColumnFamilyStore("tbl");
@@ -103,7 +116,7 @@ public class PreviewRepairTest extends TestBaseImpl
             //IR and Preview repair can't run concurrently. In case the test is flaky, please check CASSANDRA-15685
             Thread.sleep(1000);
 
-            RepairResult rs = cluster.get(1).callOnInstance(repair(options(true)));
+            RepairResult rs = cluster.get(1).callOnInstance(repair(options(true, false)));
             assertTrue(rs.success); // preview repair should succeed
             assertFalse(rs.wasInconsistent); // and we should see no mismatches
         }
@@ -133,20 +146,21 @@ public class PreviewRepairTest extends TestBaseImpl
 
             insert(cluster.coordinator(1), 0, 100);
             cluster.forEach((node) -> node.flush(KEYSPACE));
-            cluster.get(1).callOnInstance(repair(options(false)));
+            cluster.get(1).callOnInstance(repair(options(false, false)));
 
             insert(cluster.coordinator(1), 100, 100);
             cluster.forEach((node) -> node.flush(KEYSPACE));
-
+            
+            SimpleCondition previewRepairStarted = new SimpleCondition();
             SimpleCondition continuePreviewRepair = new SimpleCondition();
-            DelayMessageFilter filter = new DelayMessageFilter(continuePreviewRepair);
+            DelayFirstRepairTypeMessageFilter filter = DelayFirstRepairTypeMessageFilter.validationRequest(previewRepairStarted, continuePreviewRepair);
             // this pauses the validation request sent from node1 to node2 until we have run a full inc repair below
             cluster.filters().outbound().verbs(Verb.VALIDATION_REQ.id).from(1).to(2).messagesMatching(filter).drop();
 
-            Future<RepairResult> rsFuture = es.submit(() -> cluster.get(1).callOnInstance(repair(options(true))));
-            Thread.sleep(1000);
+            Future<RepairResult> rsFuture = es.submit(() -> cluster.get(1).callOnInstance(repair(options(true, false))));
+            previewRepairStarted.await();
             // this needs to finish before the preview repair is unpaused on node2
-            cluster.get(1).callOnInstance(repair(options(false)));
+            cluster.get(1).callOnInstance(repair(options(false, false)));
             continuePreviewRepair.signalAll();
             RepairResult rs = rsFuture.get();
             assertFalse(rs.success); // preview repair should have failed
@@ -159,6 +173,66 @@ public class PreviewRepairTest extends TestBaseImpl
     }
 
     /**
+     * Tests that an IR is running, but not completed, before validation compaction starts
+     */
+    @Test
+    public void testConcurrentIncRepairDuringPreview() throws IOException, InterruptedException, ExecutionException
+    {
+        try (Cluster cluster = init(Cluster.build(2).withConfig(config ->
+                                                                config.set("disable_incremental_repair", false)
+                                                                      .with(GOSSIP)
+                                                                      .with(NETWORK)).start()))
+        {
+            cluster.schemaChange("create table " + KEYSPACE + ".tbl (id int primary key, t int)");
+            insert(cluster.coordinator(1), 0, 100);
+            cluster.forEach((node) -> node.flush(KEYSPACE));
+            cluster.get(1).callOnInstance(repair(options(false, false)));
+
+            insert(cluster.coordinator(1), 100, 100);
+            cluster.forEach((node) -> node.flush(KEYSPACE));
+
+            SimpleCondition previewRepairStarted = new SimpleCondition();
+            SimpleCondition continuePreviewRepair = new SimpleCondition();
+            // this pauses the validation request sent from node1 to node2 until the inc repair below has run
+            cluster.filters()
+                   .outbound()
+                   .verbs(Verb.VALIDATION_REQ.id)
+                   .from(1).to(2)
+                   .messagesMatching(DelayFirstRepairTypeMessageFilter.validationRequest(previewRepairStarted, continuePreviewRepair))
+                   .drop();
+
+            SimpleCondition irRepairStarted = new SimpleCondition();
+            SimpleCondition continueIrRepair = new SimpleCondition();
+            // this blocks the IR from committing, so we can reenable the preview
+            cluster.filters()
+                   .outbound()
+                   .verbs(Verb.FINALIZE_PROPOSE_MSG.id)
+                   .from(1).to(2)
+                   .messagesMatching(DelayFirstRepairTypeMessageFilter.finalizePropose(irRepairStarted, continueIrRepair))
+                   .drop();
+
+            Future<RepairResult> previewResult = cluster.get(1).asyncCallsOnInstance(repair(options(true, false))).call();
+            previewRepairStarted.await();
+
+            // trigger IR and wait till it's ready to commit
+            Future<RepairResult> irResult = cluster.get(1).asyncCallsOnInstance(repair(options(false, false))).call();
+            irRepairStarted.await();
+
+            // unblock preview repair and wait for it to complete
+            continuePreviewRepair.signalAll();
+
+            RepairResult rs = previewResult.get();
+            assertFalse(rs.success); // preview repair should have failed
+            assertFalse(rs.wasInconsistent); // and no mismatches should have been reported
+
+            continueIrRepair.signalAll();
+            RepairResult ir = irResult.get();
+            assertTrue(ir.success);
+            assertFalse(ir.wasInconsistent); // not preview, so we don't care about preview notification
+        }
+    }
+
+    /**
      * Same as testFinishingIncRepairDuringPreview but the previewed range does not intersect the incremental repair
      * so both preview and incremental repair should finish fine (without any mismatches)
      */
@@ -172,14 +246,15 @@ public class PreviewRepairTest extends TestBaseImpl
 
             insert(cluster.coordinator(1), 0, 100);
             cluster.forEach((node) -> node.flush(KEYSPACE));
-            assertTrue(cluster.get(1).callOnInstance(repair(options(false))).success);
+            assertTrue(cluster.get(1).callOnInstance(repair(options(false, false))).success);
 
             insert(cluster.coordinator(1), 100, 100);
             cluster.forEach((node) -> node.flush(KEYSPACE));
 
             // pause preview repair validation messages on node2 until node1 has finished
+            SimpleCondition previewRepairStarted = new SimpleCondition();
             SimpleCondition continuePreviewRepair = new SimpleCondition();
-            DelayMessageFilter filter = new DelayMessageFilter(continuePreviewRepair);
+            DelayFirstRepairTypeMessageFilter filter = DelayFirstRepairTypeMessageFilter.validationRequest(previewRepairStarted, continuePreviewRepair);
             cluster.filters().outbound().verbs(Verb.VALIDATION_REQ.id).from(1).to(2).messagesMatching(filter).drop();
 
             // get local ranges to repair two separate ranges:
@@ -191,10 +266,10 @@ public class PreviewRepairTest extends TestBaseImpl
             });
 
             assertEquals(2, localRanges.size());
-            Future<RepairResult> repairStatusFuture = es.submit(() -> cluster.get(1).callOnInstance(repair(options(true, localRanges.get(0)))));
-            Thread.sleep(1000); // wait for node1 to start validation compaction
+            Future<RepairResult> repairStatusFuture = es.submit(() -> cluster.get(1).callOnInstance(repair(options(true, false, localRanges.get(0)))));
+            previewRepairStarted.await(); // wait for node1 to start validation compaction
             // this needs to finish before the preview repair is unpaused on node2
-            assertTrue(cluster.get(1).callOnInstance(repair(options(false, localRanges.get(1)))).success);
+            assertTrue(cluster.get(1).callOnInstance(repair(options(false, false, localRanges.get(1)))).success);
 
             continuePreviewRepair.signalAll();
             RepairResult rs = repairStatusFuture.get();
@@ -226,7 +301,7 @@ public class PreviewRepairTest extends TestBaseImpl
             cluster.forEach((n) -> n.flush(KEYSPACE));
 
             // make sure everything is marked repaired
-            cluster.get(1).callOnInstance(repair(options(false)));
+            cluster.get(1).callOnInstance(repair(options(false, false)));
             waitMarkedRepaired(cluster);
             // make node2 mismatch
             unmarkRepaired(cluster.get(2), "tbl");
@@ -238,7 +313,7 @@ public class PreviewRepairTest extends TestBaseImpl
                 snapshotMessageCounter.incrementAndGet();
                 return false;
             }).drop();
-            cluster.get(1).callOnInstance(repair(options(true)));
+            cluster.get(1).callOnInstance(repair(options(true, true)));
             verifySnapshots(cluster, "tbl", false);
             // tbl2 should not have a mismatch, so the snapshots should be empty here
             verifySnapshots(cluster, "tbl2", true);
@@ -246,7 +321,7 @@ public class PreviewRepairTest extends TestBaseImpl
 
             // and make sure that we don't try to snapshot again
             snapshotMessageCounter.set(0);
-            cluster.get(3).callOnInstance(repair(options(true)));
+            cluster.get(3).callOnInstance(repair(options(true, true)));
             assertEquals(0, snapshotMessageCounter.get());
         }
     }
@@ -298,22 +373,32 @@ public class PreviewRepairTest extends TestBaseImpl
         }));
     }
 
-    static class DelayMessageFilter implements IMessageFilters.Matcher
+    static abstract class DelayFirstRepairMessageFilter implements IMessageFilters.Matcher
     {
-        private final SimpleCondition condition;
+        private final SimpleCondition pause;
+        private final SimpleCondition resume;
         private final AtomicBoolean waitForRepair = new AtomicBoolean(true);
 
-        public DelayMessageFilter(SimpleCondition condition)
+        protected DelayFirstRepairMessageFilter(SimpleCondition pause, SimpleCondition resume)
         {
-            this.condition = condition;
+            this.pause = pause;
+            this.resume = resume;
         }
-        public boolean matches(int from, int to, IMessage message)
+
+        protected abstract boolean matchesMessage(RepairMessage message);
+
+        public final boolean matches(int from, int to, IMessage message)
         {
             try
             {
-                // only the first validation req should be delayed:
-                if (waitForRepair.compareAndSet(true, false))
-                    condition.await();
+                Message<?> msg = Instance.deserializeMessage(message);
+                RepairMessage repairMessage = (RepairMessage) msg.payload;
+                // only the first message should be delayed:
+                if (matchesMessage(repairMessage) && waitForRepair.compareAndSet(true, false))
+                {
+                    pause.signalAll();
+                    resume.await();
+                }
             }
             catch (Exception e)
             {
@@ -323,6 +408,32 @@ public class PreviewRepairTest extends TestBaseImpl
         }
     }
 
+    static class DelayFirstRepairTypeMessageFilter extends DelayFirstRepairMessageFilter
+    {
+        private final Class<? extends RepairMessage> type;
+
+        public DelayFirstRepairTypeMessageFilter(SimpleCondition pause, SimpleCondition resume, Class<? extends RepairMessage> type)
+        {
+            super(pause, resume);
+            this.type = type;
+        }
+
+        public static DelayFirstRepairTypeMessageFilter validationRequest(SimpleCondition pause, SimpleCondition resume)
+        {
+            return new DelayFirstRepairTypeMessageFilter(pause, resume, ValidationRequest.class);
+        }
+
+        public static DelayFirstRepairTypeMessageFilter finalizePropose(SimpleCondition pause, SimpleCondition resume)
+        {
+            return new DelayFirstRepairTypeMessageFilter(pause, resume, FinalizePropose.class);
+        }
+
+        protected boolean matchesMessage(RepairMessage repairMessage)
+        {
+            return repairMessage.getClass() == type;
+        }
+    }
+
     static void insert(ICoordinator coordinator, int start, int count)
     {
         insert(coordinator, start, count, "tbl");
@@ -368,19 +479,21 @@ public class PreviewRepairTest extends TestBaseImpl
         };
     }
 
-    private static Map<String, String> options(boolean preview)
+    private static Map<String, String> options(boolean preview, boolean full)
     {
         Map<String, String> config = new HashMap<>();
         config.put(RepairOption.INCREMENTAL_KEY, "true");
         config.put(RepairOption.PARALLELISM_KEY, RepairParallelism.PARALLEL.toString());
         if (preview)
             config.put(RepairOption.PREVIEW, PreviewKind.REPAIRED.toString());
+        if (full)
+            config.put(RepairOption.INCREMENTAL_KEY, "false");
         return config;
     }
 
-    private static Map<String, String> options(boolean preview, String range)
+    private static Map<String, String> options(boolean preview, boolean full, String range)
     {
-        Map<String, String> options = options(preview);
+        Map<String, String> options = options(preview, full);
         options.put(RepairOption.RANGES_KEY, range);
         return options;
     }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org