You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by dc...@apache.org on 2020/10/20 23:04:39 UTC
[cassandra] branch trunk updated: Log Warning Rather than Verbose
Trace when Preview Repair Validation Conflicts with Incremental Repair
This is an automated email from the ASF dual-hosted git repository.
dcapwell pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/trunk by this push:
new 2edc5bb Log Warning Rather than Verbose Trace when Preview Repair Validation Conflicts with Incremental Repair
2edc5bb is described below
commit 2edc5bb441eb7e3ccd549333012ef00fd1d5c428
Author: Caleb Rackliffe <ca...@gmail.com>
AuthorDate: Tue Oct 20 14:58:57 2020 -0700
Log Warning Rather than Verbose Trace when Preview Repair Validation Conflicts with Incremental Repair
patch by Caleb Rackliffe; reviewed by David Capwell, Marcus Eriksson for CASSANDRA-16209
---
...pairConflictWithIncrementalRepairException.java | 27 ++++
.../apache/cassandra/repair/ValidationManager.java | 5 +
.../apache/cassandra/streaming/PreviewKind.java | 3 +-
.../utils/concurrent/SimpleCondition.java | 7 +-
.../distributed/test/PreviewRepairTest.java | 169 +++++++++++++++++----
5 files changed, 180 insertions(+), 31 deletions(-)
diff --git a/src/java/org/apache/cassandra/repair/PreviewRepairConflictWithIncrementalRepairException.java b/src/java/org/apache/cassandra/repair/PreviewRepairConflictWithIncrementalRepairException.java
new file mode 100644
index 0000000..09be1a4
--- /dev/null
+++ b/src/java/org/apache/cassandra/repair/PreviewRepairConflictWithIncrementalRepairException.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.repair;
+
+/**
+ * Thrown while a preview repair decides whether an SSTable is repaired, if that SSTable is marked
+ * pending for an incremental repair session that is neither finalized nor failed (see {@code PreviewKind}).
+ * <p>
+ * A dedicated type lets {@code ValidationManager} fail the validator and log just the message at
+ * WARN level, rather than the verbose trace logged for other validation failures.
+ */
+public class PreviewRepairConflictWithIncrementalRepairException extends IllegalStateException
+{
+ private static final long serialVersionUID = 1L;
+
+ /**
+ * Creates the exception with the given detail message.
+ *
+ * @param message describes the conflicting SSTable and incremental repair session
+ */
+ public PreviewRepairConflictWithIncrementalRepairException(String message)
+ {
+ super(message);
+ }
+}
diff --git a/src/java/org/apache/cassandra/repair/ValidationManager.java b/src/java/org/apache/cassandra/repair/ValidationManager.java
index 4bbffbf..eb6ec96 100644
--- a/src/java/org/apache/cassandra/repair/ValidationManager.java
+++ b/src/java/org/apache/cassandra/repair/ValidationManager.java
@@ -161,6 +161,11 @@ public class ValidationManager
{
doValidation(cfs, validator);
}
+ catch (PreviewRepairConflictWithIncrementalRepairException e)
+ {
+ validator.fail();
+ logger.warn(e.getMessage());
+ }
catch (Throwable e)
{
// we need to inform the remote end of our failure, otherwise it will hang on repair forever
diff --git a/src/java/org/apache/cassandra/streaming/PreviewKind.java b/src/java/org/apache/cassandra/streaming/PreviewKind.java
index b5467de..3d0bff9 100644
--- a/src/java/org/apache/cassandra/streaming/PreviewKind.java
+++ b/src/java/org/apache/cassandra/streaming/PreviewKind.java
@@ -25,6 +25,7 @@ import com.google.common.base.Predicates;
import org.apache.cassandra.io.sstable.format.SSTableReader;
import org.apache.cassandra.io.sstable.metadata.StatsMetadata;
+import org.apache.cassandra.repair.PreviewRepairConflictWithIncrementalRepairException;
import org.apache.cassandra.repair.consistent.ConsistentSession;
import org.apache.cassandra.repair.consistent.LocalSession;
import org.apache.cassandra.service.ActiveRepairService;
@@ -92,7 +93,7 @@ public enum PreviewKind
else if (session.getState() == ConsistentSession.State.FINALIZED)
return true;
else if (session.getState() != ConsistentSession.State.FAILED)
- throw new IllegalStateException(String.format("SSTable %s is marked pending for non-finalized incremental repair session %s, failing preview repair", sstable, sstableMetadata.pendingRepair));
+ throw new PreviewRepairConflictWithIncrementalRepairException(String.format("SSTable %s is marked pending for non-finalized incremental repair session %s, failing preview repair", sstable, sstableMetadata.pendingRepair));
}
return sstableMetadata.repairedAt != ActiveRepairService.UNREPAIRED_SSTABLE;
}
diff --git a/src/java/org/apache/cassandra/utils/concurrent/SimpleCondition.java b/src/java/org/apache/cassandra/utils/concurrent/SimpleCondition.java
index 0ff9018..61ec640 100644
--- a/src/java/org/apache/cassandra/utils/concurrent/SimpleCondition.java
+++ b/src/java/org/apache/cassandra/utils/concurrent/SimpleCondition.java
@@ -32,6 +32,7 @@ public class SimpleCondition implements Condition
private volatile WaitQueue waiting;
private volatile boolean signaled = false;
+ @Override
public void await() throws InterruptedException
{
if (isSignaled())
@@ -90,12 +91,14 @@ public class SimpleCondition implements Condition
throw new UnsupportedOperationException();
}
- public long awaitNanos(long nanosTimeout) throws InterruptedException
+ @Override
+ public long awaitNanos(long nanosTimeout) // unsupported: the body only throws, so it can never be interrupted and declares no InterruptedException
{
throw new UnsupportedOperationException();
}
- public boolean awaitUntil(Date deadline) throws InterruptedException
+ @Override
+ public boolean awaitUntil(Date deadline) // unsupported: the body only throws, so it can never be interrupted and declares no InterruptedException
{
throw new UnsupportedOperationException();
}
diff --git a/test/distributed/org/apache/cassandra/distributed/test/PreviewRepairTest.java b/test/distributed/org/apache/cassandra/distributed/test/PreviewRepairTest.java
index 18e6d43..87837f2 100644
--- a/test/distributed/org/apache/cassandra/distributed/test/PreviewRepairTest.java
+++ b/test/distributed/org/apache/cassandra/distributed/test/PreviewRepairTest.java
@@ -34,8 +34,10 @@ import java.util.concurrent.atomic.AtomicInteger;
import com.google.common.collect.ImmutableList;
import com.google.common.util.concurrent.Uninterruptibles;
+import org.junit.BeforeClass;
import org.junit.Test;
+import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.Keyspace;
import org.apache.cassandra.db.compaction.CompactionManager;
@@ -48,11 +50,16 @@ import org.apache.cassandra.distributed.api.IInvokableInstance;
import org.apache.cassandra.distributed.api.IIsolatedExecutor;
import org.apache.cassandra.distributed.api.IMessage;
import org.apache.cassandra.distributed.api.IMessageFilters;
+import org.apache.cassandra.distributed.impl.Instance;
import org.apache.cassandra.distributed.shared.RepairResult;
+import org.apache.cassandra.net.Message;
import org.apache.cassandra.net.Verb;
import org.apache.cassandra.io.sstable.format.SSTableReader;
import org.apache.cassandra.repair.RepairParallelism;
+import org.apache.cassandra.repair.messages.FinalizePropose;
+import org.apache.cassandra.repair.messages.RepairMessage;
import org.apache.cassandra.repair.messages.RepairOption;
+import org.apache.cassandra.repair.messages.ValidationRequest;
import org.apache.cassandra.service.ActiveRepairService;
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.streaming.PreviewKind;
@@ -68,6 +75,12 @@ import static org.junit.Assert.assertTrue;
public class PreviewRepairTest extends TestBaseImpl
{
+ @BeforeClass
+ public static void setup()
+ {
+ DatabaseDescriptor.daemonInitialization();
+ }
+
/**
* makes sure that the repaired sstables are not matching on the two
* nodes by disabling autocompaction on node2 and then running an
@@ -81,7 +94,7 @@ public class PreviewRepairTest extends TestBaseImpl
cluster.schemaChange("create table " + KEYSPACE + ".tbl (id int primary key, t int)");
insert(cluster.coordinator(1), 0, 100);
cluster.forEach((node) -> node.flush(KEYSPACE));
- cluster.get(1).callOnInstance(repair(options(false)));
+ cluster.get(1).callOnInstance(repair(options(false, false)));
insert(cluster.coordinator(1), 100, 100);
cluster.forEach((node) -> node.flush(KEYSPACE));
@@ -92,7 +105,7 @@ public class PreviewRepairTest extends TestBaseImpl
FBUtilities.waitOnFutures(CompactionManager.instance.submitBackground(cfs));
cfs.disableAutoCompaction();
}));
- cluster.get(1).callOnInstance(repair(options(false)));
+ cluster.get(1).callOnInstance(repair(options(false, false)));
// now re-enable autocompaction on node1, this moves the sstables for the new repair to repaired
cluster.get(1).runOnInstance(() -> {
ColumnFamilyStore cfs = Keyspace.open(KEYSPACE).getColumnFamilyStore("tbl");
@@ -103,7 +116,7 @@ public class PreviewRepairTest extends TestBaseImpl
//IR and Preview repair can't run concurrently. In case the test is flaky, please check CASSANDRA-15685
Thread.sleep(1000);
- RepairResult rs = cluster.get(1).callOnInstance(repair(options(true)));
+ RepairResult rs = cluster.get(1).callOnInstance(repair(options(true, false)));
assertTrue(rs.success); // preview repair should succeed
assertFalse(rs.wasInconsistent); // and we should see no mismatches
}
@@ -133,20 +146,21 @@ public class PreviewRepairTest extends TestBaseImpl
insert(cluster.coordinator(1), 0, 100);
cluster.forEach((node) -> node.flush(KEYSPACE));
- cluster.get(1).callOnInstance(repair(options(false)));
+ cluster.get(1).callOnInstance(repair(options(false, false)));
insert(cluster.coordinator(1), 100, 100);
cluster.forEach((node) -> node.flush(KEYSPACE));
-
+
+ SimpleCondition previewRepairStarted = new SimpleCondition();
SimpleCondition continuePreviewRepair = new SimpleCondition();
- DelayMessageFilter filter = new DelayMessageFilter(continuePreviewRepair);
+ DelayFirstRepairTypeMessageFilter filter = DelayFirstRepairTypeMessageFilter.validationRequest(previewRepairStarted, continuePreviewRepair);
// this pauses the validation request sent from node1 to node2 until we have run a full inc repair below
cluster.filters().outbound().verbs(Verb.VALIDATION_REQ.id).from(1).to(2).messagesMatching(filter).drop();
- Future<RepairResult> rsFuture = es.submit(() -> cluster.get(1).callOnInstance(repair(options(true))));
- Thread.sleep(1000);
+ Future<RepairResult> rsFuture = es.submit(() -> cluster.get(1).callOnInstance(repair(options(true, false))));
+ previewRepairStarted.await();
// this needs to finish before the preview repair is unpaused on node2
- cluster.get(1).callOnInstance(repair(options(false)));
+ cluster.get(1).callOnInstance(repair(options(false, false)));
continuePreviewRepair.signalAll();
RepairResult rs = rsFuture.get();
assertFalse(rs.success); // preview repair should have failed
@@ -159,6 +173,66 @@ public class PreviewRepairTest extends TestBaseImpl
}
/**
+ * Tests that a preview repair fails, without reporting mismatches, when an IR has started but is not yet finalized at the time the preview's validation compaction runs
+ */
+ @Test
+ public void testConcurrentIncRepairDuringPreview() throws IOException, InterruptedException, ExecutionException
+ {
+ try (Cluster cluster = init(Cluster.build(2).withConfig(config ->
+ config.set("disable_incremental_repair", false)
+ .with(GOSSIP)
+ .with(NETWORK)).start()))
+ {
+ cluster.schemaChange("create table " + KEYSPACE + ".tbl (id int primary key, t int)");
+ insert(cluster.coordinator(1), 0, 100);
+ cluster.forEach((node) -> node.flush(KEYSPACE));
+ cluster.get(1).callOnInstance(repair(options(false, false)));
+
+ insert(cluster.coordinator(1), 100, 100);
+ cluster.forEach((node) -> node.flush(KEYSPACE));
+
+ SimpleCondition previewRepairStarted = new SimpleCondition();
+ SimpleCondition continuePreviewRepair = new SimpleCondition();
+ // hold the preview's first validation request from node1 to node2 until the inc repair below has started and is blocked before finalizing
+ cluster.filters()
+ .outbound()
+ .verbs(Verb.VALIDATION_REQ.id)
+ .from(1).to(2)
+ .messagesMatching(DelayFirstRepairTypeMessageFilter.validationRequest(previewRepairStarted, continuePreviewRepair))
+ .drop();
+
+ SimpleCondition irRepairStarted = new SimpleCondition();
+ SimpleCondition continueIrRepair = new SimpleCondition();
+ // hold the IR's first finalize proposal from node1 to node2 so the IR stays uncommitted while we re-enable the preview
+ cluster.filters()
+ .outbound()
+ .verbs(Verb.FINALIZE_PROPOSE_MSG.id)
+ .from(1).to(2)
+ .messagesMatching(DelayFirstRepairTypeMessageFilter.finalizePropose(irRepairStarted, continueIrRepair))
+ .drop();
+
+ Future<RepairResult> previewResult = cluster.get(1).asyncCallsOnInstance(repair(options(true, false))).call();
+ previewRepairStarted.await(); // the preview's validation request to node2 is now being held
+
+ // trigger the IR and wait until it is ready to commit (its finalize proposal is held by the filter above)
+ Future<RepairResult> irResult = cluster.get(1).asyncCallsOnInstance(repair(options(false, false))).call();
+ irRepairStarted.await();
+
+ // unblock preview repair and wait for it to complete
+ continuePreviewRepair.signalAll();
+
+ RepairResult rs = previewResult.get();
+ assertFalse(rs.success); // preview repair should have failed
+ assertFalse(rs.wasInconsistent); // and no mismatches should have been reported
+
+ continueIrRepair.signalAll(); // release the held finalize proposal so the IR can complete
+ RepairResult ir = irResult.get();
+ assertTrue(ir.success);
+ assertFalse(ir.wasInconsistent); // not preview, so we don't care about preview notification
+ }
+ }
+
+ /**
* Same as testFinishingIncRepairDuringPreview but the previewed range does not intersect the incremental repair
* so both preview and incremental repair should finish fine (without any mismatches)
*/
@@ -172,14 +246,15 @@ public class PreviewRepairTest extends TestBaseImpl
insert(cluster.coordinator(1), 0, 100);
cluster.forEach((node) -> node.flush(KEYSPACE));
- assertTrue(cluster.get(1).callOnInstance(repair(options(false))).success);
+ assertTrue(cluster.get(1).callOnInstance(repair(options(false, false))).success);
insert(cluster.coordinator(1), 100, 100);
cluster.forEach((node) -> node.flush(KEYSPACE));
// pause preview repair validation messages on node2 until node1 has finished
+ SimpleCondition previewRepairStarted = new SimpleCondition();
SimpleCondition continuePreviewRepair = new SimpleCondition();
- DelayMessageFilter filter = new DelayMessageFilter(continuePreviewRepair);
+ DelayFirstRepairTypeMessageFilter filter = DelayFirstRepairTypeMessageFilter.validationRequest(previewRepairStarted, continuePreviewRepair);
cluster.filters().outbound().verbs(Verb.VALIDATION_REQ.id).from(1).to(2).messagesMatching(filter).drop();
// get local ranges to repair two separate ranges:
@@ -191,10 +266,10 @@ public class PreviewRepairTest extends TestBaseImpl
});
assertEquals(2, localRanges.size());
- Future<RepairResult> repairStatusFuture = es.submit(() -> cluster.get(1).callOnInstance(repair(options(true, localRanges.get(0)))));
- Thread.sleep(1000); // wait for node1 to start validation compaction
+ Future<RepairResult> repairStatusFuture = es.submit(() -> cluster.get(1).callOnInstance(repair(options(true, false, localRanges.get(0)))));
+ previewRepairStarted.await(); // wait for node1 to start validation compaction
// this needs to finish before the preview repair is unpaused on node2
- assertTrue(cluster.get(1).callOnInstance(repair(options(false, localRanges.get(1)))).success);
+ assertTrue(cluster.get(1).callOnInstance(repair(options(false, false, localRanges.get(1)))).success);
continuePreviewRepair.signalAll();
RepairResult rs = repairStatusFuture.get();
@@ -226,7 +301,7 @@ public class PreviewRepairTest extends TestBaseImpl
cluster.forEach((n) -> n.flush(KEYSPACE));
// make sure everything is marked repaired
- cluster.get(1).callOnInstance(repair(options(false)));
+ cluster.get(1).callOnInstance(repair(options(false, false)));
waitMarkedRepaired(cluster);
// make node2 mismatch
unmarkRepaired(cluster.get(2), "tbl");
@@ -238,7 +313,7 @@ public class PreviewRepairTest extends TestBaseImpl
snapshotMessageCounter.incrementAndGet();
return false;
}).drop();
- cluster.get(1).callOnInstance(repair(options(true)));
+ cluster.get(1).callOnInstance(repair(options(true, true)));
verifySnapshots(cluster, "tbl", false);
// tbl2 should not have a mismatch, so the snapshots should be empty here
verifySnapshots(cluster, "tbl2", true);
@@ -246,7 +321,7 @@ public class PreviewRepairTest extends TestBaseImpl
// and make sure that we don't try to snapshot again
snapshotMessageCounter.set(0);
- cluster.get(3).callOnInstance(repair(options(true)));
+ cluster.get(3).callOnInstance(repair(options(true, true)));
assertEquals(0, snapshotMessageCounter.get());
}
}
@@ -298,22 +373,32 @@ public class PreviewRepairTest extends TestBaseImpl
}));
}
- static class DelayMessageFilter implements IMessageFilters.Matcher
+ static abstract class DelayFirstRepairMessageFilter implements IMessageFilters.Matcher
{
- private final SimpleCondition condition;
+ private final SimpleCondition pause;
+ private final SimpleCondition resume;
private final AtomicBoolean waitForRepair = new AtomicBoolean(true);
- public DelayMessageFilter(SimpleCondition condition)
+ protected DelayFirstRepairMessageFilter(SimpleCondition pause, SimpleCondition resume)
{
- this.condition = condition;
+ this.pause = pause;
+ this.resume = resume;
}
- public boolean matches(int from, int to, IMessage message)
+
+ protected abstract boolean matchesMessage(RepairMessage message);
+
+ public final boolean matches(int from, int to, IMessage message)
{
try
{
- // only the first validation req should be delayed:
- if (waitForRepair.compareAndSet(true, false))
- condition.await();
+ Message<?> msg = Instance.deserializeMessage(message);
+ RepairMessage repairMessage = (RepairMessage) msg.payload;
+ // only the first message should be delayed:
+ if (matchesMessage(repairMessage) && waitForRepair.compareAndSet(true, false))
+ {
+ pause.signalAll();
+ resume.await();
+ }
}
catch (Exception e)
{
@@ -323,6 +408,32 @@ public class PreviewRepairTest extends TestBaseImpl
}
}
+ static class DelayFirstRepairTypeMessageFilter extends DelayFirstRepairMessageFilter
+ {
+ private final Class<? extends RepairMessage> delayedType; // only messages of exactly this class are held
+
+ public DelayFirstRepairTypeMessageFilter(SimpleCondition pausedSignal, SimpleCondition resumeSignal, Class<? extends RepairMessage> delayedType)
+ {
+ super(pausedSignal, resumeSignal);
+ this.delayedType = delayedType;
+ }
+
+ public static DelayFirstRepairTypeMessageFilter validationRequest(SimpleCondition pausedSignal, SimpleCondition resumeSignal)
+ {
+ return new DelayFirstRepairTypeMessageFilter(pausedSignal, resumeSignal, ValidationRequest.class); // holds the first VALIDATION_REQ
+ }
+
+ public static DelayFirstRepairTypeMessageFilter finalizePropose(SimpleCondition pausedSignal, SimpleCondition resumeSignal)
+ {
+ return new DelayFirstRepairTypeMessageFilter(pausedSignal, resumeSignal, FinalizePropose.class); // holds the first FINALIZE_PROPOSE
+ }
+
+ protected boolean matchesMessage(RepairMessage candidate)
+ {
+ return candidate.getClass().equals(delayedType); // exact class identity; subclasses are not matched
+ }
+ }
+
static void insert(ICoordinator coordinator, int start, int count)
{
insert(coordinator, start, count, "tbl");
@@ -368,19 +479,21 @@ public class PreviewRepairTest extends TestBaseImpl
};
}
- private static Map<String, String> options(boolean preview)
+ private static Map<String, String> options(boolean preview, boolean full)
{
Map<String, String> config = new HashMap<>();
config.put(RepairOption.INCREMENTAL_KEY, "true");
config.put(RepairOption.PARALLELISM_KEY, RepairParallelism.PARALLEL.toString());
if (preview)
config.put(RepairOption.PREVIEW, PreviewKind.REPAIRED.toString());
+ if (full)
+ config.put(RepairOption.INCREMENTAL_KEY, "false"); // overrides the incremental default set above, making this a full repair
return config;
}
- private static Map<String, String> options(boolean preview, String range)
+ private static Map<String, String> options(boolean preview, boolean full, String range)
{
- Map<String, String> options = options(preview);
+ Map<String, String> options = options(preview, full); // same base options as the two-argument overload, then restricted to one range
options.put(RepairOption.RANGES_KEY, range);
return options;
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org