You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by ma...@apache.org on 2020/03/02 08:29:59 UTC
[cassandra] branch trunk updated: Include finalized pending sstables in preview repair
This is an automated email from the ASF dual-hosted git repository.
marcuse pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/trunk by this push:
new 219d209 Include finalized pending sstables in preview repair
219d209 is described below
commit 219d209651759cf702519a100c4f43595f7be8d7
Author: Marcus Eriksson <ma...@apache.org>
AuthorDate: Wed Feb 5 12:51:47 2020 +0100
Include finalized pending sstables in preview repair
Patch by marcuse; reviewed by Blake Eggleston and David Capwell for CASSANDRA-15553
---
CHANGES.txt | 1 +
.../db/compaction/PendingRepairManager.java | 2 +-
.../db/repair/CassandraValidationIterator.java | 24 +-
.../db/streaming/CassandraStreamManager.java | 17 +-
.../org/apache/cassandra/repair/RepairSession.java | 43 +++-
.../cassandra/repair/consistent/LocalSessions.java | 23 +-
.../cassandra/service/ActiveRepairService.java | 5 +-
.../apache/cassandra/streaming/PreviewKind.java | 49 +++-
.../cassandra/distributed/impl/Instance.java | 2 +
.../distributed/test/PreviewRepairTest.java | 281 +++++++++++++++++++++
10 files changed, 396 insertions(+), 51 deletions(-)
diff --git a/CHANGES.txt b/CHANGES.txt
index 19906d3..9cd6040 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
4.0-alpha4
+ * Include finalized pending sstables in preview repair (CASSANDRA-15553)
* Reverted to the original behavior of CLUSTERING ORDER on CREATE TABLE (CASSANDRA-15271)
* Correct inaccurate logging message (CASSANDRA-15549)
* Add documentation of dynamo (CASSANDRA-15486)
diff --git a/src/java/org/apache/cassandra/db/compaction/PendingRepairManager.java b/src/java/org/apache/cassandra/db/compaction/PendingRepairManager.java
index b2d70f7..78d4483 100644
--- a/src/java/org/apache/cassandra/db/compaction/PendingRepairManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/PendingRepairManager.java
@@ -449,7 +449,7 @@ class PendingRepairManager
}
else
{
- logger.debug("Setting repairedAt to {} on {} for {}", repairedAt, transaction.originals(), sessionID);
+ logger.info("Moving {} from pending to repaired with repaired at = {} and session id = {}", transaction.originals(), repairedAt, sessionID);
cfs.getCompactionStrategyManager().mutateRepaired(transaction.originals(), repairedAt, ActiveRepairService.NO_PENDING_REPAIR, false);
}
completed = true;
diff --git a/src/java/org/apache/cassandra/db/repair/CassandraValidationIterator.java b/src/java/org/apache/cassandra/db/repair/CassandraValidationIterator.java
index d653f6c..4eea678 100644
--- a/src/java/org/apache/cassandra/db/repair/CassandraValidationIterator.java
+++ b/src/java/org/apache/cassandra/db/repair/CassandraValidationIterator.java
@@ -30,7 +30,6 @@ import java.util.function.LongPredicate;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
-import com.google.common.base.Predicate;
import com.google.common.collect.Maps;
import org.slf4j.Logger;
@@ -39,10 +38,8 @@ import org.slf4j.LoggerFactory;
import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.DecoratedKey;
import org.apache.cassandra.db.compaction.AbstractCompactionStrategy;
-import org.apache.cassandra.db.compaction.ActiveCompactions;
import org.apache.cassandra.db.compaction.ActiveCompactionsTracker;
import org.apache.cassandra.db.compaction.CompactionController;
-import org.apache.cassandra.db.compaction.CompactionInterruptedException;
import org.apache.cassandra.db.compaction.CompactionIterator;
import org.apache.cassandra.db.compaction.CompactionManager;
import org.apache.cassandra.db.compaction.OperationType;
@@ -54,14 +51,10 @@ import org.apache.cassandra.dht.Range;
import org.apache.cassandra.dht.Token;
import org.apache.cassandra.io.sstable.ISSTableScanner;
import org.apache.cassandra.io.sstable.format.SSTableReader;
-import org.apache.cassandra.metrics.CompactionMetrics;
import org.apache.cassandra.repair.ValidationPartitionIterator;
-import org.apache.cassandra.repair.Validator;
import org.apache.cassandra.schema.TableMetadata;
import org.apache.cassandra.service.ActiveRepairService;
import org.apache.cassandra.service.StorageService;
-import org.apache.cassandra.streaming.PreviewKind;
-import org.apache.cassandra.utils.Pair;
import org.apache.cassandra.utils.UUIDGen;
import org.apache.cassandra.utils.concurrent.Refs;
@@ -115,21 +108,6 @@ public class CassandraValidationIterator extends ValidationPartitionIterator
}
}
- private static Predicate<SSTableReader> getPreviewPredicate(PreviewKind previewKind)
- {
- switch (previewKind)
- {
- case ALL:
- return (s) -> true;
- case REPAIRED:
- return (s) -> s.isRepaired();
- case UNREPAIRED:
- return (s) -> !s.isRepaired();
- default:
- throw new RuntimeException("Can't get preview predicate for preview kind " + previewKind);
- }
- }
-
@VisibleForTesting
static synchronized Refs<SSTableReader> getSSTablesToValidate(ColumnFamilyStore cfs, Collection<Range<Token>> ranges, UUID parentId, boolean isIncremental)
{
@@ -147,7 +125,7 @@ public class CassandraValidationIterator extends ValidationPartitionIterator
com.google.common.base.Predicate<SSTableReader> predicate;
if (prs.isPreview())
{
- predicate = getPreviewPredicate(prs.previewKind);
+ predicate = prs.previewKind.predicate();
}
else if (isIncremental)
diff --git a/src/java/org/apache/cassandra/db/streaming/CassandraStreamManager.java b/src/java/org/apache/cassandra/db/streaming/CassandraStreamManager.java
index b88a5d6..a84fd27 100644
--- a/src/java/org/apache/cassandra/db/streaming/CassandraStreamManager.java
+++ b/src/java/org/apache/cassandra/db/streaming/CassandraStreamManager.java
@@ -81,21 +81,6 @@ public class CassandraStreamManager implements TableStreamManager
return new CassandraStreamReceiver(cfs, session, totalStreams);
}
- private static Predicate<SSTableReader> getPreviewPredicate(PreviewKind kind)
- {
- switch (kind)
- {
- case ALL:
- return Predicates.alwaysTrue();
- case UNREPAIRED:
- return Predicates.not(SSTableReader::isRepaired);
- case REPAIRED:
- return SSTableReader::isRepaired;
- default:
- throw new IllegalArgumentException("Unsupported kind: " + kind);
- }
- }
-
@Override
public Collection<OutgoingStream> createOutgoingStreams(StreamSession session, RangesAtEndpoint replicas, UUID pendingRepair, PreviewKind previewKind)
{
@@ -111,7 +96,7 @@ public class CassandraStreamManager implements TableStreamManager
Predicate<SSTableReader> predicate;
if (previewKind.isPreview())
{
- predicate = getPreviewPredicate(previewKind);
+ predicate = previewKind.predicate();
}
else if (pendingRepair == ActiveRepairService.NO_PENDING_REPAIR)
{
diff --git a/src/java/org/apache/cassandra/repair/RepairSession.java b/src/java/org/apache/cassandra/repair/RepairSession.java
index 3483e59..95a6e57 100644
--- a/src/java/org/apache/cassandra/repair/RepairSession.java
+++ b/src/java/org/apache/cassandra/repair/RepairSession.java
@@ -31,10 +31,16 @@ import org.slf4j.LoggerFactory;
import org.apache.cassandra.concurrent.DebuggableThreadPoolExecutor;
import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.Keyspace;
import org.apache.cassandra.dht.Range;
import org.apache.cassandra.dht.Token;
import org.apache.cassandra.gms.*;
import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.repair.consistent.ConsistentSession;
+import org.apache.cassandra.repair.consistent.LocalSession;
+import org.apache.cassandra.repair.consistent.LocalSessions;
+import org.apache.cassandra.schema.TableId;
import org.apache.cassandra.streaming.PreviewKind;
import org.apache.cassandra.streaming.SessionSummary;
import org.apache.cassandra.tracing.Tracing;
@@ -78,7 +84,8 @@ import org.apache.cassandra.utils.Pair;
* all of them in parallel otherwise.
*/
public class RepairSession extends AbstractFuture<RepairSessionResult> implements IEndpointStateChangeSubscriber,
- IFailureDetectionEventListener
+ IFailureDetectionEventListener,
+ LocalSessions.Listener
{
private static Logger logger = LoggerFactory.getLogger(RepairSession.class);
@@ -401,4 +408,38 @@ public class RepairSession extends AbstractFuture<RepairSessionResult> implement
// If a node failed, we stop everything (though there could still be some activity in the background)
forceShutdown(exception);
}
+
+ public void onIRStateChange(LocalSession session)
+ {
+ // we should only be registered as listeners for PreviewKind.REPAIRED, but double check here
+ if (previewKind == PreviewKind.REPAIRED &&
+ session.getState() == ConsistentSession.State.FINALIZED &&
+ includesTables(session.tableIds))
+ {
+ for (Range<Token> range : session.ranges)
+ {
+ if (range.intersects(ranges()))
+ {
+ logger.error("{} An intersecting incremental repair with session id = {} finished, preview repair might not be accurate", previewKind.logPrefix(getId()), session.sessionID);
+ forceShutdown(new Exception("An incremental repair with session id "+session.sessionID+" finished during this preview repair runtime"));
+ return;
+ }
+ }
+ }
+ }
+
+    /**
+     * Returns true if the id of at least one table repaired by this session (looked up by name in {@code keyspace})
+     * is contained in {@code tableIds}.
+     */
+    private boolean includesTables(Set<TableId> tableIds)
+    {
+        Keyspace ks = Keyspace.open(keyspace);
+        if (ks == null)
+            return false;
+
+        for (String table : cfnames)
+        {
+            if (tableIds.contains(ks.getColumnFamilyStore(table).metadata.id))
+                return true;
+        }
+        return false;
+    }
}
diff --git a/src/java/org/apache/cassandra/repair/consistent/LocalSessions.java b/src/java/org/apache/cassandra/repair/consistent/LocalSessions.java
index 6475794..fa224d1 100644
--- a/src/java/org/apache/cassandra/repair/consistent/LocalSessions.java
+++ b/src/java/org/apache/cassandra/repair/consistent/LocalSessions.java
@@ -30,6 +30,7 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
+import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
@@ -106,6 +107,7 @@ import static org.apache.cassandra.repair.consistent.ConsistentSession.State.*;
public class LocalSessions
{
private static final Logger logger = LoggerFactory.getLogger(LocalSessions.class);
+ private static final Set<Listener> listeners = new CopyOnWriteArraySet<>();
/**
* Amount of time a session can go without any activity before we start checking the status of other
@@ -441,7 +443,7 @@ public class LocalSessions
return new LocalSession(builder);
}
- protected LocalSession getSession(UUID sessionID)
+ public LocalSession getSession(UUID sessionID)
{
return sessions.get(sessionID);
}
@@ -520,6 +522,8 @@ public class LocalSessions
{
sessionCompleted(session);
}
+ for (Listener listener : listeners)
+ listener.onIRStateChange(session);
}
}
@@ -777,7 +781,7 @@ public class LocalSessions
LocalSession session = getSession(sessionID);
if (session == null)
{
- logger.warn("Received status response message for unknown session {}", sessionID);
+ logger.warn("Received status request message for unknown session {}", sessionID);
sendMessage(from, Message.out(STATUS_RSP, new StatusResponse(sessionID, FAILED)));
}
else
@@ -868,4 +872,19 @@ public class LocalSessions
throw new IllegalStateException("Cannot get final repaired at value for in progress session: " + session);
}
}
+
+    /**
+     * Adds {@code listener} to the process-wide set of listeners invoked when a local incremental repair session's
+     * state is updated. The backing collection is a set, so registering the same listener twice has no extra effect.
+     */
+    public static void registerListener(Listener listener)
+    {
+        LocalSessions.listeners.add(listener);
+    }
+
+    /**
+     * Removes {@code listener} from the process-wide listener set; does nothing if it was never registered.
+     */
+    public static void unregisterListener(Listener listener)
+    {
+        LocalSessions.listeners.remove(listener);
+    }
+
+    /**
+     * Observer of local incremental repair session state updates.
+     */
+    @FunctionalInterface
+    public interface Listener
+    {
+        /**
+         * Called from LocalSessions' state-update path for {@code session}. The call is made inline without any
+         * try/catch around it, so an exception thrown here propagates to the code updating the session state.
+         */
+        void onIRStateChange(LocalSession session);
+    }
}
diff --git a/src/java/org/apache/cassandra/service/ActiveRepairService.java b/src/java/org/apache/cassandra/service/ActiveRepairService.java
index 6f4c474..7499c36 100644
--- a/src/java/org/apache/cassandra/service/ActiveRepairService.java
+++ b/src/java/org/apache/cassandra/service/ActiveRepairService.java
@@ -70,7 +70,6 @@ import org.apache.cassandra.repair.consistent.CoordinatorSessions;
import org.apache.cassandra.repair.consistent.LocalSessions;
import org.apache.cassandra.repair.messages.*;
import org.apache.cassandra.schema.TableId;
-import org.apache.cassandra.utils.CassandraVersion;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.MBeanWrapper;
import org.apache.cassandra.utils.Pair;
@@ -230,6 +229,9 @@ public class ActiveRepairService implements IEndpointStateChangeSubscriber, IFai
// register listeners
registerOnFdAndGossip(session);
+ if (session.previewKind == PreviewKind.REPAIRED)
+ LocalSessions.registerListener(session);
+
// remove session at completion
session.addListener(new Runnable()
{
@@ -239,6 +241,7 @@ public class ActiveRepairService implements IEndpointStateChangeSubscriber, IFai
public void run()
{
sessions.remove(session.getId());
+ LocalSessions.unregisterListener(session);
}
}, MoreExecutors.directExecutor());
session.start(executor);
diff --git a/src/java/org/apache/cassandra/streaming/PreviewKind.java b/src/java/org/apache/cassandra/streaming/PreviewKind.java
index 51c5746..b5467de 100644
--- a/src/java/org/apache/cassandra/streaming/PreviewKind.java
+++ b/src/java/org/apache/cassandra/streaming/PreviewKind.java
@@ -18,22 +18,34 @@
package org.apache.cassandra.streaming;
-
import java.util.UUID;
+import com.google.common.base.Predicate;
+import com.google.common.base.Predicates;
+
+import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.io.sstable.metadata.StatsMetadata;
+import org.apache.cassandra.repair.consistent.ConsistentSession;
+import org.apache.cassandra.repair.consistent.LocalSession;
+import org.apache.cassandra.service.ActiveRepairService;
+
public enum PreviewKind
{
- NONE(0),
- ALL(1),
- UNREPAIRED(2),
- REPAIRED(3);
+ NONE(0, (sstable) -> {
+ throw new RuntimeException("Can't get preview predicate for preview kind NONE");
+ }),
+ ALL(1, Predicates.alwaysTrue()),
+ UNREPAIRED(2, sstable -> !sstable.isRepaired()),
+ REPAIRED(3, new PreviewRepairedSSTablePredicate());
private final int serializationVal;
+ private final Predicate<SSTableReader> predicate;
- PreviewKind(int serializationVal)
+ PreviewKind(int serializationVal, Predicate<SSTableReader> predicate)
{
assert ordinal() == serializationVal;
this.serializationVal = serializationVal;
+ this.predicate = predicate;
}
public int getSerializationVal()
@@ -46,7 +58,6 @@ public enum PreviewKind
return values()[serializationVal];
}
-
public boolean isPreview()
{
return this != NONE;
@@ -62,4 +73,28 @@ public enum PreviewKind
return '[' + logPrefix() + " #" + sessionId.toString() + ']';
}
+    /**
+     * @return the filter deciding which sstables take part in a preview of this kind; for {@code NONE} the
+     *         returned predicate throws when applied
+     */
+    public Predicate<SSTableReader> predicate()
+    {
+        return this.predicate;
+    }
+
+    /**
+     * Selects the sstables a {@link PreviewKind#REPAIRED} preview should validate: sstables marked repaired, plus
+     * sstables still pending for an incremental repair session that is already FINALIZED locally.
+     *
+     * Pending sstables whose session is unknown locally are excluded. Pending sstables whose session FAILED are judged
+     * by their repairedAt value. Any other (non-finalized, non-failed) session state fails the preview with an
+     * IllegalStateException.
+     */
+    private static final class PreviewRepairedSSTablePredicate implements Predicate<SSTableReader>
+    {
+        @Override
+        public boolean apply(SSTableReader sstable)
+        {
+            // grab the metadata before checking pendingRepair since this can be nulled out at any time
+            StatsMetadata sstableMetadata = sstable.getSSTableMetadata();
+            if (sstableMetadata.pendingRepair != null)
+            {
+                LocalSession session = ActiveRepairService.instance.consistent.local.getSession(sstableMetadata.pendingRepair);
+                if (session == null)
+                    return false;
+
+                // read the state exactly once: re-reading it between the two checks below could observe a session
+                // that became FINALIZED in between and wrongly fail the preview
+                ConsistentSession.State state = session.getState();
+                if (state == ConsistentSession.State.FINALIZED)
+                    return true;
+                if (state != ConsistentSession.State.FAILED)
+                    throw new IllegalStateException(String.format("SSTable %s is marked pending for non-finalized incremental repair session %s, failing preview repair", sstable, sstableMetadata.pendingRepair));
+            }
+            return sstableMetadata.repairedAt != ActiveRepairService.UNREPAIRED_SSTABLE;
+        }
+    }
}
diff --git a/test/distributed/org/apache/cassandra/distributed/impl/Instance.java b/test/distributed/org/apache/cassandra/distributed/impl/Instance.java
index b8ef25d..1beb708 100644
--- a/test/distributed/org/apache/cassandra/distributed/impl/Instance.java
+++ b/test/distributed/org/apache/cassandra/distributed/impl/Instance.java
@@ -401,6 +401,8 @@ public class Instance extends IsolatedExecutor implements IInvokableInstance
if (!FBUtilities.getBroadcastAddressAndPort().equals(broadcastAddressAndPort()))
throw new IllegalStateException();
+
+ ActiveRepairService.instance.start();
}
catch (Throwable t)
{
diff --git a/test/distributed/org/apache/cassandra/distributed/test/PreviewRepairTest.java b/test/distributed/org/apache/cassandra/distributed/test/PreviewRepairTest.java
new file mode 100644
index 0000000..ed29a30
--- /dev/null
+++ b/test/distributed/org/apache/cassandra/distributed/test/PreviewRepairTest.java
@@ -0,0 +1,281 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import com.google.common.collect.ImmutableList;
+import org.junit.Test;
+
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.ConsistencyLevel;
+import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.db.compaction.CompactionManager;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.distributed.Cluster;
+import org.apache.cassandra.distributed.api.ICoordinator;
+import org.apache.cassandra.distributed.api.IIsolatedExecutor;
+import org.apache.cassandra.distributed.api.IMessage;
+import org.apache.cassandra.distributed.api.IMessageFilters;
+import org.apache.cassandra.net.Verb;
+import org.apache.cassandra.repair.RepairParallelism;
+import org.apache.cassandra.repair.messages.RepairOption;
+import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.streaming.PreviewKind;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.Pair;
+import org.apache.cassandra.utils.concurrent.SimpleCondition;
+import org.apache.cassandra.utils.progress.ProgressEventType;
+
+import static org.apache.cassandra.distributed.api.Feature.GOSSIP;
+import static org.apache.cassandra.distributed.api.Feature.NETWORK;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+public class PreviewRepairTest extends DistributedTestBase
+{
+ /**
+ * makes sure that the repaired sstables are not matching on the two
+ * nodes by disabling autocompaction on node2 and then running an
+ * incremental repair
+ */
+ @Test
+ public void testWithMismatchingPending() throws Throwable
+ {
+ try(Cluster cluster = init(Cluster.build(2).withConfig(config -> config.with(GOSSIP).with(NETWORK)).start()))
+ {
+ cluster.schemaChange("create table " + KEYSPACE + ".tbl (id int primary key, t int)");
+ insert(cluster.coordinator(1), 0, 100);
+ cluster.forEach((node) -> node.flush(KEYSPACE));
+ cluster.get(1).callOnInstance(repair(options(false)));
+ insert(cluster.coordinator(1), 100, 100);
+ cluster.forEach((node) -> node.flush(KEYSPACE));
+
+ // make sure that all sstables have moved to repaired by triggering a compaction
+ // also disables autocompaction on the nodes
+ cluster.forEach((node) -> node.runOnInstance(() -> {
+ ColumnFamilyStore cfs = Keyspace.open(KEYSPACE).getColumnFamilyStore("tbl");
+ FBUtilities.waitOnFutures(CompactionManager.instance.submitBackground(cfs));
+ cfs.disableAutoCompaction();
+ }));
+ cluster.get(1).callOnInstance(repair(options(false)));
+ // now re-enable autocompaction on node1, this moves the sstables for the new repair to repaired
+ cluster.get(1).runOnInstance(() -> {
+ ColumnFamilyStore cfs = Keyspace.open(KEYSPACE).getColumnFamilyStore("tbl");
+ cfs.enableAutoCompaction();
+ FBUtilities.waitOnFutures(CompactionManager.instance.submitBackground(cfs));
+ });
+ Pair<Boolean, Boolean> rs = cluster.get(1).callOnInstance(repair(options(true)));
+ assertTrue(rs.left); // preview repair should succeed
+ assertFalse(rs.right); // and we should see no mismatches
+ }
+ }
+
+ /**
+ * another case where the repaired datasets could mismatch is if an incremental repair finishes just as the preview
+ * repair is starting up.
+ *
+ * This tests this case:
+ * 1. we start a preview repair
+ * 2. pause the validation requests from node1 -> node2
+ * 3. node1 starts its validation
+ * 4. run an incremental repair which completes fine
+ * 5. node2 resumes its validation
+ *
+ * Now we will include sstables from the second incremental repair on node2 but not on node1
+ * This should fail since we fail any preview repair which is ongoing when an incremental repair finishes (step 4 above)
+ */
+ @Test
+ public void testFinishingIncRepairDuringPreview() throws IOException, InterruptedException, ExecutionException
+ {
+ ExecutorService es = Executors.newSingleThreadExecutor();
+ try(Cluster cluster = init(Cluster.build(2).withConfig(config -> config.with(GOSSIP).with(NETWORK)).start()))
+ {
+ cluster.schemaChange("create table " + KEYSPACE + ".tbl (id int primary key, t int)");
+
+ insert(cluster.coordinator(1), 0, 100);
+ cluster.forEach((node) -> node.flush(KEYSPACE));
+ cluster.get(1).callOnInstance(repair(options(false)));
+
+ insert(cluster.coordinator(1), 100, 100);
+ cluster.forEach((node) -> node.flush(KEYSPACE));
+
+ SimpleCondition continuePreviewRepair = new SimpleCondition();
+ DelayMessageFilter filter = new DelayMessageFilter(continuePreviewRepair);
+ // this pauses the validation request sent from node1 to node2 until we have run a full inc repair below
+ cluster.filters().verbs(Verb.VALIDATION_REQ.id).from(1).to(2).messagesMatching(filter).drop();
+
+ Future<Pair<Boolean, Boolean>> rsFuture = es.submit(() -> cluster.get(1).callOnInstance(repair(options(true))));
+ Thread.sleep(1000);
+ // this needs to finish before the preview repair is unpaused on node2
+ cluster.get(1).callOnInstance(repair(options(false)));
+ continuePreviewRepair.signalAll();
+ Pair<Boolean, Boolean> rs = rsFuture.get();
+ assertFalse(rs.left); // preview repair should have failed
+ assertFalse(rs.right); // and no mismatches should have been reported
+ }
+ finally
+ {
+ es.shutdown();
+ }
+ }
+
+ /**
+ * Same as testFinishingIncRepairDuringPreview but the previewed range does not intersect the incremental repair
+ * so both preview and incremental repair should finish fine (without any mismatches)
+ */
+ @Test
+ public void testFinishingNonIntersectingIncRepairDuringPreview() throws IOException, InterruptedException, ExecutionException
+ {
+ ExecutorService es = Executors.newSingleThreadExecutor();
+ try(Cluster cluster = init(Cluster.build(2).withConfig(config -> config.with(GOSSIP).with(NETWORK)).start()))
+ {
+ cluster.schemaChange("create table " + KEYSPACE + ".tbl (id int primary key, t int)");
+
+ insert(cluster.coordinator(1), 0, 100);
+ cluster.forEach((node) -> node.flush(KEYSPACE));
+ assertTrue(cluster.get(1).callOnInstance(repair(options(false))).left);
+
+ insert(cluster.coordinator(1), 100, 100);
+ cluster.forEach((node) -> node.flush(KEYSPACE));
+
+ // pause preview repair validation messages on node2 until node1 has finished
+ SimpleCondition continuePreviewRepair = new SimpleCondition();
+ DelayMessageFilter filter = new DelayMessageFilter(continuePreviewRepair);
+ cluster.filters().verbs(Verb.VALIDATION_REQ.id).from(1).to(2).messagesMatching(filter).drop();
+
+ // get local ranges to repair two separate ranges:
+ List<String> localRanges = cluster.get(1).callOnInstance(() -> {
+ List<String> res = new ArrayList<>();
+ for (Range<Token> r : StorageService.instance.getLocalReplicas(KEYSPACE).ranges())
+ res.add(r.left.getTokenValue()+ ":"+ r.right.getTokenValue());
+ return res;
+ });
+
+ assertEquals(2, localRanges.size());
+ Future<Pair<Boolean, Boolean>> repairStatusFuture = es.submit(() -> cluster.get(1).callOnInstance(repair(options(true, localRanges.get(0)))));
+ Thread.sleep(1000); // wait for node1 to start validation compaction
+ // this needs to finish before the preview repair is unpaused on node2
+ assertTrue(cluster.get(1).callOnInstance(repair(options(false, localRanges.get(1)))).left);
+
+ continuePreviewRepair.signalAll();
+ Pair<Boolean, Boolean> rs = repairStatusFuture.get();
+ assertTrue(rs.left); // repair should succeed
+ assertFalse(rs.right); // and no mismatches
+ }
+ finally
+ {
+ es.shutdown();
+ }
+ }
+
+ private static class DelayMessageFilter implements IMessageFilters.Matcher
+ {
+ private final SimpleCondition condition;
+ private final AtomicBoolean waitForRepair = new AtomicBoolean(true);
+
+ public DelayMessageFilter(SimpleCondition condition)
+ {
+ this.condition = condition;
+ }
+ public boolean matches(int from, int to, IMessage message)
+ {
+ try
+ {
+ // only the first validation req should be delayed:
+ if (waitForRepair.compareAndSet(true, false))
+ condition.await();
+ }
+ catch (Exception e)
+ {
+ throw new RuntimeException(e);
+ }
+ return false; // don't drop the message
+ }
+ }
+
+ private static void insert(ICoordinator coordinator, int start, int count)
+ {
+ for (int i = start; i < start + count; i++)
+ coordinator.execute("insert into " + KEYSPACE + ".tbl (id, t) values (?, ?)", ConsistencyLevel.ALL, i, i);
+ }
+
+ /**
+ * returns a pair with [repair success, was inconsistent]
+ */
+ private static IIsolatedExecutor.SerializableCallable<Pair<Boolean, Boolean>> repair(Map<String, String> options)
+ {
+ return () -> {
+ SimpleCondition await = new SimpleCondition();
+ AtomicBoolean success = new AtomicBoolean(true);
+ AtomicBoolean wasInconsistent = new AtomicBoolean(false);
+ StorageService.instance.repair(KEYSPACE, options, ImmutableList.of((tag, event) -> {
+ if (event.getType() == ProgressEventType.ERROR)
+ {
+ success.set(false);
+ await.signalAll();
+ }
+ else if (event.getType() == ProgressEventType.NOTIFICATION && event.getMessage().contains("Repaired data is inconsistent"))
+ {
+ wasInconsistent.set(true);
+ }
+ else if (event.getType() == ProgressEventType.COMPLETE)
+ await.signalAll();
+ }));
+ try
+ {
+ await.await(1, TimeUnit.MINUTES);
+ }
+ catch (InterruptedException e)
+ {
+ throw new RuntimeException(e);
+ }
+ return Pair.create(success.get(), wasInconsistent.get());
+ };
+ }
+
+ private static Map<String, String> options(boolean preview)
+ {
+ Map<String, String> config = new HashMap<>();
+ config.put(RepairOption.INCREMENTAL_KEY, "true");
+ config.put(RepairOption.PARALLELISM_KEY, RepairParallelism.PARALLEL.toString());
+ if (preview)
+ config.put(RepairOption.PREVIEW, PreviewKind.REPAIRED.toString());
+ return config;
+ }
+
+ private static Map<String, String> options(boolean preview, String range)
+ {
+ Map<String, String> options = options(preview);
+ options.put(RepairOption.RANGES_KEY, range);
+ return options;
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org