You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by bd...@apache.org on 2020/10/09 20:51:39 UTC
[cassandra] branch trunk updated: Consolidate node liveness check for forced repair
This is an automated email from the ASF dual-hosted git repository.
bdeggleston pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/trunk by this push:
new 1728da3 Consolidate node liveness check for forced repair
1728da3 is described below
commit 1728da30e4e7858d30178ef74350af3e690adf0c
Author: yifan-c <yc...@gmail.com>
AuthorDate: Tue Sep 8 18:23:30 2020 -0700
Consolidate node liveness check for forced repair
Patch by Yifan Cai; Reviewed by Blake Eggleston for CASSANDRA-16113
---
CHANGES.txt | 1 +
.../org/apache/cassandra/repair/CommonRange.java | 21 ++--
.../apache/cassandra/repair/RepairRunnable.java | 121 +++++++++++----------
.../org/apache/cassandra/repair/RepairSession.java | 38 +------
.../cassandra/service/ActiveRepairService.java | 3 +-
.../distributed/test/DistributedRepairUtils.java | 33 ++++--
.../cassandra/distributed/test/RepairTest.java | 92 +++++++++++-----
...nnableTest.java => NeighborsAndRangesTest.java} | 25 +++--
.../org/apache/cassandra/repair/RepairJobTest.java | 7 +-
.../apache/cassandra/repair/RepairSessionTest.java | 2 +-
10 files changed, 185 insertions(+), 158 deletions(-)
diff --git a/CHANGES.txt b/CHANGES.txt
index 289d4e8..412b336 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
4.0-beta3
+ * Consolidate node liveness check for forced repair (CASSANDRA-16113)
* Use unsigned short in ValueAccessor.sliceWithShortLength (CASSANDRA-16147)
* Abort repairs when getting a truncation request (CASSANDRA-15854)
* Remove bad assert when getting active compactions for an sstable (CASSANDRA-15457)
diff --git a/src/java/org/apache/cassandra/repair/CommonRange.java b/src/java/org/apache/cassandra/repair/CommonRange.java
index dab77c5..5eb8061 100644
--- a/src/java/org/apache/cassandra/repair/CommonRange.java
+++ b/src/java/org/apache/cassandra/repair/CommonRange.java
@@ -21,6 +21,7 @@ package org.apache.cassandra.repair;
import java.util.ArrayList;
import java.util.Collection;
+import java.util.Objects;
import java.util.Set;
import com.google.common.base.Preconditions;
@@ -38,9 +39,15 @@ public class CommonRange
public final ImmutableSet<InetAddressAndPort> endpoints;
public final ImmutableSet<InetAddressAndPort> transEndpoints;
public final Collection<Range<Token>> ranges;
+ public final boolean hasSkippedReplicas;
public CommonRange(Set<InetAddressAndPort> endpoints, Set<InetAddressAndPort> transEndpoints, Collection<Range<Token>> ranges)
{
+ this(endpoints, transEndpoints, ranges, false);
+ }
+
+ public CommonRange(Set<InetAddressAndPort> endpoints, Set<InetAddressAndPort> transEndpoints, Collection<Range<Token>> ranges, boolean hasSkippedReplicas)
+ {
Preconditions.checkArgument(endpoints != null && !endpoints.isEmpty(), "Endpoints can not be empty");
Preconditions.checkArgument(transEndpoints != null, "Transient endpoints can not be null");
Preconditions.checkArgument(endpoints.containsAll(transEndpoints), "transEndpoints must be a subset of endpoints");
@@ -49,6 +56,7 @@ public class CommonRange
this.endpoints = ImmutableSet.copyOf(endpoints);
this.transEndpoints = ImmutableSet.copyOf(transEndpoints);
this.ranges = new ArrayList<>(ranges);
+ this.hasSkippedReplicas = hasSkippedReplicas;
}
public boolean matchesEndpoints(Set<InetAddressAndPort> endpoints, Set<InetAddressAndPort> transEndpoints)
@@ -64,17 +72,15 @@ public class CommonRange
CommonRange that = (CommonRange) o;
- if (!endpoints.equals(that.endpoints)) return false;
- if (!transEndpoints.equals(that.transEndpoints)) return false;
- return ranges.equals(that.ranges);
+ return Objects.equals(endpoints, that.endpoints)
+ && Objects.equals(transEndpoints, that.transEndpoints)
+ && Objects.equals(ranges, that.ranges)
+ && hasSkippedReplicas == that.hasSkippedReplicas;
}
public int hashCode()
{
- int result = endpoints.hashCode();
- result = 31 * result + transEndpoints.hashCode();
- result = 31 * result + ranges.hashCode();
- return result;
+ return Objects.hash(endpoints, transEndpoints, ranges, hasSkippedReplicas);
}
public String toString()
@@ -83,6 +89,7 @@ public class CommonRange
"endpoints=" + endpoints +
", transEndpoints=" + transEndpoints +
", ranges=" + ranges +
+ ", hasSkippedReplicas=" + hasSkippedReplicas +
'}';
}
}
diff --git a/src/java/org/apache/cassandra/repair/RepairRunnable.java b/src/java/org/apache/cassandra/repair/RepairRunnable.java
index f6aa6d1..5d8e945 100644
--- a/src/java/org/apache/cassandra/repair/RepairRunnable.java
+++ b/src/java/org/apache/cassandra/repair/RepairRunnable.java
@@ -20,6 +20,7 @@ package org.apache.cassandra.repair;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
@@ -32,7 +33,6 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
-import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
@@ -260,7 +260,7 @@ public class RepairRunnable implements Runnable, ProgressEventNotifier
maybeStoreParentRepairStart(cfnames);
- prepare(columnFamilies, neighborsAndRanges.allNeighbors, neighborsAndRanges.force);
+ prepare(columnFamilies, neighborsAndRanges.participants, neighborsAndRanges.shouldExcludeDeadParticipants);
repair(cfnames, neighborsAndRanges);
}
@@ -345,15 +345,15 @@ public class RepairRunnable implements Runnable, ProgressEventNotifier
progressCounter.incrementAndGet();
- boolean force = options.isForcedRepair();
+ boolean shouldExcludeDeadParticipants = options.isForcedRepair();
- if (force && options.isIncremental())
+ if (shouldExcludeDeadParticipants)
{
Set<InetAddressAndPort> actualNeighbors = Sets.newHashSet(Iterables.filter(allNeighbors, FailureDetector.instance::isAlive));
- force = !allNeighbors.equals(actualNeighbors);
+ shouldExcludeDeadParticipants = !allNeighbors.equals(actualNeighbors);
allNeighbors = actualNeighbors;
}
- return new NeighborsAndRanges(force, allNeighbors, commonRanges);
+ return new NeighborsAndRanges(shouldExcludeDeadParticipants, allNeighbors, commonRanges);
}
private void maybeStoreParentRepairStart(String[] cfnames)
@@ -393,16 +393,15 @@ public class RepairRunnable implements Runnable, ProgressEventNotifier
{
if (options.isPreview())
{
- previewRepair(parentSession, creationTimeMillis, neighborsAndRanges.commonRanges, cfnames);
+ previewRepair(parentSession, creationTimeMillis, neighborsAndRanges.filterCommonRanges(keyspace, cfnames), cfnames);
}
else if (options.isIncremental())
{
- incrementalRepair(parentSession, creationTimeMillis, neighborsAndRanges.force, traceState,
- neighborsAndRanges.allNeighbors, neighborsAndRanges.commonRanges, cfnames);
+ incrementalRepair(parentSession, creationTimeMillis, traceState, neighborsAndRanges, cfnames);
}
else
{
- normalRepair(parentSession, creationTimeMillis, traceState, neighborsAndRanges.commonRanges, cfnames);
+ normalRepair(parentSession, creationTimeMillis, traceState, neighborsAndRanges.filterCommonRanges(keyspace, cfnames), cfnames);
}
}
@@ -428,10 +427,10 @@ public class RepairRunnable implements Runnable, ProgressEventNotifier
@SuppressWarnings("unchecked")
public ListenableFuture apply(List<RepairSessionResult> results)
{
+ logger.debug("Repair result: {}", results);
// filter out null(=failed) results and get successful ranges
for (RepairSessionResult sessionResult : results)
{
- logger.debug("Repair result: {}", results);
if (sessionResult != null)
{
// don't record successful repair if we had to skip ranges
@@ -451,54 +450,21 @@ public class RepairRunnable implements Runnable, ProgressEventNotifier
Futures.addCallback(repairResult, new RepairCompleteCallback(parentSession, successfulRanges, startTime, traceState, hasFailure, executor), MoreExecutors.directExecutor());
}
- /**
- * removes dead nodes from common ranges, and exludes ranges left without any participants
- */
- @VisibleForTesting
- static List<CommonRange> filterCommonRanges(List<CommonRange> commonRanges, Set<InetAddressAndPort> liveEndpoints, boolean force)
- {
- if (!force)
- {
- return commonRanges;
- }
- else
- {
- List<CommonRange> filtered = new ArrayList<>(commonRanges.size());
-
- for (CommonRange commonRange : commonRanges)
- {
- Set<InetAddressAndPort> endpoints = ImmutableSet.copyOf(Iterables.filter(commonRange.endpoints, liveEndpoints::contains));
- Set<InetAddressAndPort> transEndpoints = ImmutableSet.copyOf(Iterables.filter(commonRange.transEndpoints, liveEndpoints::contains));
- Preconditions.checkState(endpoints.containsAll(transEndpoints), "transEndpoints must be a subset of endpoints");
-
- // this node is implicitly a participant in this repair, so a single endpoint is ok here
- if (!endpoints.isEmpty())
- {
- filtered.add(new CommonRange(endpoints, transEndpoints, commonRange.ranges));
- }
- }
- Preconditions.checkState(!filtered.isEmpty(), "Not enough live endpoints for a repair");
- return filtered;
- }
- }
-
private void incrementalRepair(UUID parentSession,
long startTime,
- boolean forceRepair,
TraceState traceState,
- Set<InetAddressAndPort> allNeighbors,
- List<CommonRange> commonRanges,
+ NeighborsAndRanges neighborsAndRanges,
String... cfnames)
{
// the local node also needs to be included in the set of participants, since coordinator sessions aren't persisted
Set<InetAddressAndPort> allParticipants = ImmutableSet.<InetAddressAndPort>builder()
- .addAll(allNeighbors)
+ .addAll(neighborsAndRanges.participants)
.add(FBUtilities.getBroadcastAddressAndPort())
.build();
+ // Not necessary to include self for filtering. The common ranges only contain neighbor node endpoints.
+ List<CommonRange> allRanges = neighborsAndRanges.filterCommonRanges(keyspace, cfnames);
- List<CommonRange> allRanges = filterCommonRanges(commonRanges, allParticipants, forceRepair);
-
- CoordinatorSession coordinatorSession = ActiveRepairService.instance.consistent.coordinated.registerSession(parentSession, allParticipants, forceRepair);
+ CoordinatorSession coordinatorSession = ActiveRepairService.instance.consistent.coordinated.registerSession(parentSession, allParticipants, neighborsAndRanges.shouldExcludeDeadParticipants);
ListeningExecutorService executor = createExecutor();
AtomicBoolean hasFailure = new AtomicBoolean(false);
ListenableFuture repairResult = coordinatorSession.execute(() -> submitRepairSessions(parentSession, true, executor, allRanges, cfnames),
@@ -640,9 +606,6 @@ public class RepairRunnable implements Runnable, ProgressEventNotifier
{
List<ListenableFuture<RepairSessionResult>> futures = new ArrayList<>(options.getRanges().size());
- // we do endpoint filtering at the start of an incremental repair,
- // so repair sessions shouldn't also be checking liveness
- boolean force = options.isForcedRepair() && !isIncremental;
for (CommonRange commonRange : commonRanges)
{
logger.info("Starting RepairSession for {}", commonRange);
@@ -652,7 +615,6 @@ public class RepairRunnable implements Runnable, ProgressEventNotifier
options.getParallelism(),
isIncremental,
options.isPullRepair(),
- force,
options.getPreviewKind(),
options.optimiseStreams(),
executor,
@@ -851,17 +813,58 @@ public class RepairRunnable implements Runnable, ProgressEventNotifier
}
}
- private static final class NeighborsAndRanges
+ static final class NeighborsAndRanges
{
- private final boolean force;
- private final Set<InetAddressAndPort> allNeighbors;
+ private final boolean shouldExcludeDeadParticipants;
+ private final Set<InetAddressAndPort> participants;
private final List<CommonRange> commonRanges;
- private NeighborsAndRanges(boolean force, Set<InetAddressAndPort> allNeighbors, List<CommonRange> commonRanges)
+ NeighborsAndRanges(boolean shouldExcludeDeadParticipants, Set<InetAddressAndPort> participants, List<CommonRange> commonRanges)
{
- this.force = force;
- this.allNeighbors = allNeighbors;
+ this.shouldExcludeDeadParticipants = shouldExcludeDeadParticipants;
+ this.participants = participants;
this.commonRanges = commonRanges;
}
+
+ /**
+ * When in the force mode, removes dead nodes (those not contained within `participants`) from common ranges,
+ * and excludes ranges left without any participants.
+ * When not in the force mode, returns the common ranges unchanged.
+ */
+ List<CommonRange> filterCommonRanges(String keyspace, String[] tableNames)
+ {
+ if (!shouldExcludeDeadParticipants)
+ {
+ return commonRanges;
+ }
+ else
+ {
+ logger.debug("force flag set, removing dead endpoints if possible");
+
+ List<CommonRange> filtered = new ArrayList<>(commonRanges.size());
+
+ for (CommonRange commonRange : commonRanges)
+ {
+ Set<InetAddressAndPort> endpoints = ImmutableSet.copyOf(Iterables.filter(commonRange.endpoints, participants::contains));
+ Set<InetAddressAndPort> transEndpoints = ImmutableSet.copyOf(Iterables.filter(commonRange.transEndpoints, participants::contains));
+ Preconditions.checkState(endpoints.containsAll(transEndpoints), "transEndpoints must be a subset of endpoints");
+
+ // this node is implicitly a participant in this repair, so a single endpoint is ok here
+ if (!endpoints.isEmpty())
+ {
+ Set<InetAddressAndPort> skippedReplicas = Sets.difference(commonRange.endpoints, endpoints);
+ skippedReplicas.forEach(endpoint -> logger.info("Removing a dead node {} from repair for ranges {} due to -force", endpoint, commonRange.ranges));
+ filtered.add(new CommonRange(endpoints, transEndpoints, commonRange.ranges, !skippedReplicas.isEmpty()));
+ }
+ else
+ {
+ logger.warn("Skipping forced repair for ranges {} of tables {} in keyspace {}, as no neighbor nodes are live.",
+ commonRange.ranges, Arrays.asList(tableNames), keyspace);
+ }
+ }
+ Preconditions.checkState(!filtered.isEmpty(), "Not enough live endpoints for a repair");
+ return filtered;
+ }
+ }
}
}
diff --git a/src/java/org/apache/cassandra/repair/RepairSession.java b/src/java/org/apache/cassandra/repair/RepairSession.java
index 2468857..e13c90c 100644
--- a/src/java/org/apache/cassandra/repair/RepairSession.java
+++ b/src/java/org/apache/cassandra/repair/RepairSession.java
@@ -97,9 +97,6 @@ public class RepairSession extends AbstractFuture<RepairSessionResult> implement
public final RepairParallelism parallelismDegree;
public final boolean pullRepair;
- // indicates some replicas were not included in the repair. Only relevant for --force option
- public final boolean skippedReplicas;
-
/** Range to repair */
public final CommonRange commonRange;
public final boolean isIncremental;
@@ -126,7 +123,6 @@ public class RepairSession extends AbstractFuture<RepairSessionResult> implement
* @param keyspace name of keyspace
* @param parallelismDegree specifies the degree of parallelism when calculating the merkle trees
* @param pullRepair true if the repair should be one way (from remote host to this host and only applicable between two hosts--see RepairOption)
- * @param force true if the repair should ignore dead endpoints (instead of failing)
* @param cfnames names of columnfamilies
*/
public RepairSession(UUID parentRepairSession,
@@ -136,7 +132,6 @@ public class RepairSession extends AbstractFuture<RepairSessionResult> implement
RepairParallelism parallelismDegree,
boolean isIncremental,
boolean pullRepair,
- boolean force,
PreviewKind previewKind,
boolean optimiseStreams,
String... cfnames)
@@ -148,37 +143,10 @@ public class RepairSession extends AbstractFuture<RepairSessionResult> implement
this.parallelismDegree = parallelismDegree;
this.keyspace = keyspace;
this.cfnames = cfnames;
-
- //If force then filter out dead endpoints
- boolean forceSkippedReplicas = false;
- if (force)
- {
- logger.debug("force flag set, removing dead endpoints");
- final Set<InetAddressAndPort> removeCandidates = new HashSet<>();
- for (final InetAddressAndPort endpoint : commonRange.endpoints)
- {
- if (!FailureDetector.instance.isAlive(endpoint))
- {
- logger.info("Removing a dead node from Repair due to -force {}", endpoint);
- removeCandidates.add(endpoint);
- }
- }
- if (!removeCandidates.isEmpty())
- {
- // we shouldn't be recording a successful repair if
- // any replicas are excluded from the repair
- forceSkippedReplicas = true;
- Set<InetAddressAndPort> filteredEndpoints = new HashSet<>(commonRange.endpoints);
- filteredEndpoints.removeAll(removeCandidates);
- commonRange = new CommonRange(filteredEndpoints, commonRange.transEndpoints, commonRange.ranges);
- }
- }
-
this.commonRange = commonRange;
this.isIncremental = isIncremental;
this.previewKind = previewKind;
this.pullRepair = pullRepair;
- this.skippedReplicas = forceSkippedReplicas;
this.optimiseStreams = optimiseStreams;
this.taskExecutor = MoreExecutors.listeningDecorator(createExecutor());
}
@@ -297,7 +265,7 @@ public class RepairSession extends AbstractFuture<RepairSessionResult> implement
{
logger.info("{} {}", previewKind.logPrefix(getId()), message = String.format("No neighbors to repair with on range %s: session completed", commonRange));
Tracing.traceRepair(message);
- set(new RepairSessionResult(id, keyspace, commonRange.ranges, Lists.<RepairResult>newArrayList(), skippedReplicas));
+ set(new RepairSessionResult(id, keyspace, commonRange.ranges, Lists.<RepairResult>newArrayList(), commonRange.hasSkippedReplicas));
if (!previewKind.isPreview())
{
SystemDistributedKeyspace.failRepairs(getId(), keyspace, cfnames, new RuntimeException(message));
@@ -308,7 +276,7 @@ public class RepairSession extends AbstractFuture<RepairSessionResult> implement
// Checking all nodes are live
for (InetAddressAndPort endpoint : commonRange.endpoints)
{
- if (!FailureDetector.instance.isAlive(endpoint) && !skippedReplicas)
+ if (!FailureDetector.instance.isAlive(endpoint) && !commonRange.hasSkippedReplicas)
{
message = String.format("Cannot proceed on repair because a neighbor (%s) is dead: session failed", endpoint);
logger.error("{} {}", previewKind.logPrefix(getId()), message);
@@ -339,7 +307,7 @@ public class RepairSession extends AbstractFuture<RepairSessionResult> implement
// this repair session is completed
logger.info("{} {}", previewKind.logPrefix(getId()), "Session completed successfully");
Tracing.traceRepair("Completed sync of range {}", commonRange);
- set(new RepairSessionResult(id, keyspace, commonRange.ranges, results, skippedReplicas));
+ set(new RepairSessionResult(id, keyspace, commonRange.ranges, results, commonRange.hasSkippedReplicas));
taskExecutor.shutdown();
// mark this session as terminated
diff --git a/src/java/org/apache/cassandra/service/ActiveRepairService.java b/src/java/org/apache/cassandra/service/ActiveRepairService.java
index f7b0686..2cdc794 100644
--- a/src/java/org/apache/cassandra/service/ActiveRepairService.java
+++ b/src/java/org/apache/cassandra/service/ActiveRepairService.java
@@ -315,7 +315,6 @@ public class ActiveRepairService implements IEndpointStateChangeSubscriber, IFai
RepairParallelism parallelismDegree,
boolean isIncremental,
boolean pullRepair,
- boolean force,
PreviewKind previewKind,
boolean optimiseStreams,
ListeningExecutorService executor,
@@ -328,7 +327,7 @@ public class ActiveRepairService implements IEndpointStateChangeSubscriber, IFai
return null;
final RepairSession session = new RepairSession(parentRepairSession, UUIDGen.getTimeUUID(), range, keyspace,
- parallelismDegree, isIncremental, pullRepair, force,
+ parallelismDegree, isIncremental, pullRepair,
previewKind, optimiseStreams, cfnames);
sessions.put(session.getId(), session);
diff --git a/test/distributed/org/apache/cassandra/distributed/test/DistributedRepairUtils.java b/test/distributed/org/apache/cassandra/distributed/test/DistributedRepairUtils.java
index 023d02c..fce67b5 100644
--- a/test/distributed/org/apache/cassandra/distributed/test/DistributedRepairUtils.java
+++ b/test/distributed/org/apache/cassandra/distributed/test/DistributedRepairUtils.java
@@ -27,6 +27,7 @@ import org.apache.commons.lang3.ArrayUtils;
import org.junit.Assert;
import org.apache.cassandra.distributed.api.ConsistencyLevel;
+import org.apache.cassandra.distributed.api.ICluster;
import org.apache.cassandra.distributed.api.IInvokableInstance;
import org.apache.cassandra.distributed.api.NodeToolResult;
import org.apache.cassandra.distributed.api.QueryResult;
@@ -45,11 +46,11 @@ public final class DistributedRepairUtils
}
- public static NodeToolResult repair(AbstractCluster<?> cluster, RepairType repairType, boolean withNotifications, String... args) {
+ public static NodeToolResult repair(ICluster<IInvokableInstance> cluster, RepairType repairType, boolean withNotifications, String... args) {
return repair(cluster, DEFAULT_COORDINATOR, repairType, withNotifications, args);
}
- public static NodeToolResult repair(AbstractCluster<?> cluster, int node, RepairType repairType, boolean withNotifications, String... args) {
+ public static NodeToolResult repair(ICluster<IInvokableInstance> cluster, int node, RepairType repairType, boolean withNotifications, String... args) {
args = repairType.append(args);
args = ArrayUtils.addAll(new String[] { "repair" }, args);
return cluster.get(node).nodetoolResult(withNotifications, args);
@@ -65,12 +66,12 @@ public final class DistributedRepairUtils
return cluster.get(node).callOnInstance(() -> StorageMetrics.repairExceptions.getCount());
}
- public static QueryResult queryParentRepairHistory(AbstractCluster<?> cluster, String ks, String table)
+ public static QueryResult queryParentRepairHistory(ICluster<IInvokableInstance> cluster, String ks, String table)
{
return queryParentRepairHistory(cluster, DEFAULT_COORDINATOR, ks, table);
}
- public static QueryResult queryParentRepairHistory(AbstractCluster<?> cluster, int coordinator, String ks, String table)
+ public static QueryResult queryParentRepairHistory(ICluster<IInvokableInstance> cluster, int coordinator, String ks, String table)
{
// This is kinda brittle since the caller never gets the ID and can't ask for the ID; it needs to infer the id
// this logic makes the assumption the ks/table pairs are unique (should be or else create should fail) so any
@@ -84,35 +85,41 @@ public final class DistributedRepairUtils
return rs;
}
- public static void assertParentRepairNotExist(AbstractCluster<?> cluster, String ks, String table)
+ public static void assertParentRepairNotExist(ICluster<IInvokableInstance> cluster, String ks, String table)
{
assertParentRepairNotExist(cluster, DEFAULT_COORDINATOR, ks, table);
}
- public static void assertParentRepairNotExist(AbstractCluster<?> cluster, int coordinator, String ks, String table)
+ public static void assertParentRepairNotExist(ICluster<IInvokableInstance> cluster, int coordinator, String ks, String table)
{
QueryResult rs = queryParentRepairHistory(cluster, coordinator, ks, table);
Assert.assertFalse("No repairs should be found but at least one found", rs.hasNext());
}
- public static void assertParentRepairNotExist(AbstractCluster<?> cluster, String ks)
+ public static void assertParentRepairNotExist(ICluster<IInvokableInstance> cluster, String ks)
{
assertParentRepairNotExist(cluster, DEFAULT_COORDINATOR, ks);
}
- public static void assertParentRepairNotExist(AbstractCluster<?> cluster, int coordinator, String ks)
+ public static void assertParentRepairNotExist(ICluster<IInvokableInstance> cluster, int coordinator, String ks)
{
QueryResult rs = queryParentRepairHistory(cluster, coordinator, ks, null);
Assert.assertFalse("No repairs should be found but at least one found", rs.hasNext());
}
- public static void assertParentRepairSuccess(AbstractCluster<?> cluster, String ks, String table)
+ public static void assertParentRepairSuccess(ICluster<IInvokableInstance> cluster, String ks, String table)
{
assertParentRepairSuccess(cluster, DEFAULT_COORDINATOR, ks, table);
}
- public static void assertParentRepairSuccess(AbstractCluster<?> cluster, int coordinator, String ks, String table)
+ public static void assertParentRepairSuccess(ICluster<IInvokableInstance> cluster, int coordinator, String ks, String table)
{
+ assertParentRepairSuccess(cluster, coordinator, ks, table, row -> {});
+ }
+
+ public static void assertParentRepairSuccess(ICluster<IInvokableInstance> cluster, int coordinator, String ks, String table, Consumer<Row> moreSuccessCriteria)
+ {
+ Assert.assertNotNull("Invalid null value for moreSuccessCriteria", moreSuccessCriteria);
QueryResult rs = queryParentRepairHistory(cluster, coordinator, ks, table);
validateExistingParentRepair(rs, row -> {
// check completed
@@ -121,15 +128,17 @@ public final class DistributedRepairUtils
// check not failed (aka success)
Assert.assertNull("Exception found", row.getString("exception_stacktrace"));
Assert.assertNull("Exception found", row.getString("exception_message"));
+
+ moreSuccessCriteria.accept(row);
});
}
- public static void assertParentRepairFailedWithMessageContains(AbstractCluster<?> cluster, String ks, String table, String message)
+ public static void assertParentRepairFailedWithMessageContains(ICluster<IInvokableInstance> cluster, String ks, String table, String message)
{
assertParentRepairFailedWithMessageContains(cluster, DEFAULT_COORDINATOR, ks, table, message);
}
- public static void assertParentRepairFailedWithMessageContains(AbstractCluster<?> cluster, int coordinator, String ks, String table, String message)
+ public static void assertParentRepairFailedWithMessageContains(ICluster<IInvokableInstance> cluster, int coordinator, String ks, String table, String message)
{
QueryResult rs = queryParentRepairHistory(cluster, coordinator, ks, table);
validateExistingParentRepair(rs, row -> {
diff --git a/test/distributed/org/apache/cassandra/distributed/test/RepairTest.java b/test/distributed/org/apache/cassandra/distributed/test/RepairTest.java
index f37a3d8..b127a74 100644
--- a/test/distributed/org/apache/cassandra/distributed/test/RepairTest.java
+++ b/test/distributed/org/apache/cassandra/distributed/test/RepairTest.java
@@ -21,11 +21,14 @@ package org.apache.cassandra.distributed.test;
import java.io.IOException;
import java.util.Arrays;
import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import org.junit.AfterClass;
+import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
@@ -38,27 +41,26 @@ import org.apache.cassandra.utils.concurrent.SimpleCondition;
import org.apache.cassandra.utils.progress.ProgressEventType;
import static java.util.concurrent.TimeUnit.MINUTES;
-import static org.apache.cassandra.distributed.test.ExecUtil.rethrow;
import static org.apache.cassandra.distributed.api.Feature.GOSSIP;
import static org.apache.cassandra.distributed.api.Feature.NETWORK;
-import static org.apache.cassandra.distributed.shared.AssertUtils.*;
+import static org.apache.cassandra.distributed.shared.AssertUtils.assertRows;
+import static org.apache.cassandra.distributed.test.ExecUtil.rethrow;
public class RepairTest extends TestBaseImpl
{
- private static final String insert = withKeyspace("INSERT INTO %s.test (k, c1, c2) VALUES (?, 'value1', 'value2');");
- private static final String query = withKeyspace("SELECT k, c1, c2 FROM %s.test WHERE k = ?;");
-
private static ICluster<IInvokableInstance> cluster;
- private static void insert(ICluster<IInvokableInstance> cluster, int start, int end, int ... nodes)
+ private static void insert(ICluster<IInvokableInstance> cluster, String keyspace, int start, int end, int ... nodes)
{
+ String insert = String.format("INSERT INTO %s.test (k, c1, c2) VALUES (?, 'value1', 'value2');", keyspace);
for (int i = start ; i < end ; ++i)
for (int node : nodes)
cluster.get(node).executeInternal(insert, Integer.toString(i));
}
- private static void verify(ICluster<IInvokableInstance> cluster, int start, int end, int ... nodes)
+ private static void verify(ICluster<IInvokableInstance> cluster, String keyspace, int start, int end, int ... nodes)
{
+ String query = String.format("SELECT k, c1, c2 FROM %s.test WHERE k = ?;", keyspace);
for (int i = start ; i < end ; ++i)
{
for (int node = 1 ; node <= cluster.size() ; ++node)
@@ -72,10 +74,10 @@ public class RepairTest extends TestBaseImpl
}
}
- private static void flush(ICluster<IInvokableInstance> cluster, int ... nodes)
+ private static void flush(ICluster<IInvokableInstance> cluster, String keyspace, int ... nodes)
{
for (int node : nodes)
- cluster.get(node).runOnInstance(rethrow(() -> StorageService.instance.forceKeyspaceFlush(KEYSPACE)));
+ cluster.get(node).runOnInstance(rethrow(() -> StorageService.instance.forceKeyspaceFlush(keyspace)));
}
private static ICluster create(Consumer<IInstanceConfig> configModifier) throws IOException
@@ -90,11 +92,11 @@ public class RepairTest extends TestBaseImpl
return init(Cluster.build().withNodes(3).withConfig(configModifier).start());
}
- private void repair(ICluster<IInvokableInstance> cluster, Map<String, String> options)
+ static void repair(ICluster<IInvokableInstance> cluster, String keyspace, Map<String, String> options)
{
cluster.get(1).runOnInstance(rethrow(() -> {
SimpleCondition await = new SimpleCondition();
- StorageService.instance.repair(KEYSPACE, options, ImmutableList.of((tag, event) -> {
+ StorageService.instance.repair(keyspace, options, ImmutableList.of((tag, event) -> {
if (event.getType() == ProgressEventType.COMPLETE)
await.signalAll();
})).right.get();
@@ -102,22 +104,22 @@ public class RepairTest extends TestBaseImpl
}));
}
- void populate(ICluster<IInvokableInstance> cluster, String compression) throws Exception
+ static void populate(ICluster<IInvokableInstance> cluster, String keyspace, String compression) throws Exception
{
try
{
- cluster.schemaChange(withKeyspace("DROP TABLE IF EXISTS %s.test;"));
- cluster.schemaChange(withKeyspace("CREATE TABLE %s.test (k text, c1 text, c2 text, PRIMARY KEY (k)) WITH compression = " + compression));
-
- insert(cluster, 0, 1000, 1, 2, 3);
- flush(cluster, 1);
- insert(cluster, 1000, 1001, 1, 2);
- insert(cluster, 1001, 2001, 1, 2, 3);
- flush(cluster, 1, 2, 3);
-
- verify(cluster, 0, 1000, 1, 2, 3);
- verify(cluster, 1000, 1001, 1, 2);
- verify(cluster, 1001, 2001, 1, 2, 3);
+ cluster.schemaChange(String.format("DROP TABLE IF EXISTS %s.test;", keyspace));
+ cluster.schemaChange(String.format("CREATE TABLE %s.test (k text, c1 text, c2 text, PRIMARY KEY (k)) WITH compression = %s", keyspace, compression));
+
+ insert(cluster, keyspace, 0, 1000, 1, 2, 3);
+ flush(cluster, keyspace, 1);
+ insert(cluster, keyspace, 1000, 1001, 1, 2);
+ insert(cluster, keyspace, 1001, 2001, 1, 2, 3);
+ flush(cluster, keyspace, 1, 2, 3);
+
+ verify(cluster, keyspace, 0, 1000, 1, 2, 3);
+ verify(cluster, keyspace, 1000, 1001, 1, 2);
+ verify(cluster, keyspace, 1001, 2001, 1, 2, 3);
}
catch (Throwable t)
{
@@ -129,9 +131,16 @@ public class RepairTest extends TestBaseImpl
void repair(ICluster<IInvokableInstance> cluster, boolean sequential, String compression) throws Exception
{
- populate(cluster, compression);
- repair(cluster, ImmutableMap.of("parallelism", sequential ? "sequential" : "parallel"));
- verify(cluster, 0, 2001, 1, 2, 3);
+ populate(cluster, KEYSPACE, compression);
+ repair(cluster, KEYSPACE, ImmutableMap.of("parallelism", sequential ? "sequential" : "parallel"));
+ verify(cluster, KEYSPACE, 0, 2001, 1, 2, 3);
+ }
+
+ void shutDownNodesAndForceRepair(ICluster<IInvokableInstance> cluster, String keyspace, int downNode) throws Exception
+ {
+ populate(cluster, keyspace, "{'enabled': false}");
+ cluster.get(downNode).shutdown().get(5, TimeUnit.SECONDS);
+ repair(cluster, keyspace, ImmutableMap.of("forceRepair", "true"));
}
@BeforeClass
@@ -182,4 +191,33 @@ public class RepairTest extends TestBaseImpl
{
repair(cluster, false, "{'enabled': false}");
}
+
+ @Test
+ public void testForcedNormalRepairWithOneNodeDown() throws Exception
+ {
+ // The test uses its own keyspace with rf == 2
+ String forceRepairKeyspace = "test_force_repair_keyspace";
+ int rf = 2;
+ cluster.schemaChange("CREATE KEYSPACE " + forceRepairKeyspace + " WITH replication = {'class': 'SimpleStrategy', 'replication_factor': " + rf + "};");
+
+ try
+ {
+ shutDownNodesAndForceRepair(cluster, forceRepairKeyspace, 3); // shutdown node 3 after inserting
+ DistributedRepairUtils.assertParentRepairSuccess(cluster, 1, forceRepairKeyspace, "test", row -> {
+ Set<String> successfulRanges = row.getSet("successful_ranges");
+ Set<String> requestedRanges = row.getSet("requested_ranges");
+ Assert.assertNotNull("Found no successful ranges", successfulRanges);
+ Assert.assertNotNull("Found no requested ranges", requestedRanges);
+ Assert.assertEquals("Requested ranges count should equals to replication factor", rf, requestedRanges.size());
+ Assert.assertTrue("Given clusterSize = 3, RF = 2 and 1 node down in the replica set, it should yield only 1 successful repaired range.",
+ successfulRanges.size() == 1 && !successfulRanges.contains("")); // the successful ranges set should not only contain empty string
+ });
+ }
+ finally
+ {
+ // bring the node 3 back up
+ if (cluster.get(3).isShutdown())
+ cluster.get(3).startup(cluster);
+ }
+ }
}
diff --git a/test/unit/org/apache/cassandra/repair/RepairRunnableTest.java b/test/unit/org/apache/cassandra/repair/NeighborsAndRangesTest.java
similarity index 67%
rename from test/unit/org/apache/cassandra/repair/RepairRunnableTest.java
rename to test/unit/org/apache/cassandra/repair/NeighborsAndRangesTest.java
index 418d7de..9dcd7dc 100644
--- a/test/unit/org/apache/cassandra/repair/RepairRunnableTest.java
+++ b/test/unit/org/apache/cassandra/repair/NeighborsAndRangesTest.java
@@ -29,35 +29,38 @@ import org.junit.Test;
import org.apache.cassandra.locator.InetAddressAndPort;
-import static org.apache.cassandra.repair.RepairRunnable.filterCommonRanges;
+import static org.apache.cassandra.repair.RepairRunnable.NeighborsAndRanges;
-public class RepairRunnableTest extends AbstractRepairTest
+public class NeighborsAndRangesTest extends AbstractRepairTest
{
/**
* For non-forced repairs, common ranges should be passed through as-is
*/
@Test
- public void filterCommonIncrementalRangesNotForced() throws Exception
+ public void filterCommonIncrementalRangesNotForced()
{
CommonRange cr = new CommonRange(PARTICIPANTS, Collections.emptySet(), ALL_RANGES);
-
+ NeighborsAndRanges nr = new NeighborsAndRanges(false, PARTICIPANTS, Collections.singletonList(cr));
List<CommonRange> expected = Lists.newArrayList(cr);
- List<CommonRange> actual = filterCommonRanges(expected, Collections.emptySet(), false);
+ List<CommonRange> actual = nr.filterCommonRanges(null, null);
Assert.assertEquals(expected, actual);
}
@Test
- public void forceFilterCommonIncrementalRanges() throws Exception
+ public void forceFilterCommonIncrementalRanges()
{
- CommonRange cr1 = new CommonRange(Sets.newHashSet(PARTICIPANT1, PARTICIPANT2), Collections.emptySet(), Sets.newHashSet(RANGE1, RANGE2));
+ CommonRange cr1 = new CommonRange(Sets.newHashSet(PARTICIPANT1, PARTICIPANT2), Collections.emptySet(), Sets.newHashSet(RANGE1));
CommonRange cr2 = new CommonRange(Sets.newHashSet(PARTICIPANT1, PARTICIPANT2, PARTICIPANT3), Collections.emptySet(), Sets.newHashSet(RANGE3));
+ CommonRange cr3 = new CommonRange(Sets.newHashSet(PARTICIPANT2, PARTICIPANT3), Collections.emptySet(), Sets.newHashSet(RANGE2));
Set<InetAddressAndPort> liveEndpoints = Sets.newHashSet(PARTICIPANT2, PARTICIPANT3); // PARTICIPANT1 is excluded
+ List<CommonRange> initial = Lists.newArrayList(cr1, cr2, cr3);
+ List<CommonRange> expected = Lists.newArrayList(new CommonRange(Sets.newHashSet(PARTICIPANT2), Collections.emptySet(), Sets.newHashSet(RANGE1), true),
+ new CommonRange(Sets.newHashSet(PARTICIPANT2, PARTICIPANT3), Collections.emptySet(), Sets.newHashSet(RANGE3), true),
+ new CommonRange(Sets.newHashSet(PARTICIPANT2, PARTICIPANT3), Collections.emptySet(), Sets.newHashSet(RANGE2), false));
- List<CommonRange> initial = Lists.newArrayList(cr1, cr2);
- List<CommonRange> expected = Lists.newArrayList(new CommonRange(Sets.newHashSet(PARTICIPANT2), Collections.emptySet(), Sets.newHashSet(RANGE1, RANGE2)),
- new CommonRange(Sets.newHashSet(PARTICIPANT2, PARTICIPANT3), Collections.emptySet(), Sets.newHashSet(RANGE3)));
- List<CommonRange> actual = filterCommonRanges(initial, liveEndpoints, true);
+ NeighborsAndRanges nr = new NeighborsAndRanges(true, liveEndpoints, initial);
+ List<CommonRange> actual = nr.filterCommonRanges(null, null);
Assert.assertEquals(expected, actual);
}
diff --git a/test/unit/org/apache/cassandra/repair/RepairJobTest.java b/test/unit/org/apache/cassandra/repair/RepairJobTest.java
index d3af58f..9887d38 100644
--- a/test/unit/org/apache/cassandra/repair/RepairJobTest.java
+++ b/test/unit/org/apache/cassandra/repair/RepairJobTest.java
@@ -107,9 +107,9 @@ public class RepairJobTest
public MeasureableRepairSession(UUID parentRepairSession, UUID id, CommonRange commonRange, String keyspace,
RepairParallelism parallelismDegree, boolean isIncremental, boolean pullRepair,
- boolean force, PreviewKind previewKind, boolean optimiseStreams, String... cfnames)
+ PreviewKind previewKind, boolean optimiseStreams, String... cfnames)
{
- super(parentRepairSession, id, commonRange, keyspace, parallelismDegree, isIncremental, pullRepair, force, previewKind, optimiseStreams, cfnames);
+ super(parentRepairSession, id, commonRange, keyspace, parallelismDegree, isIncremental, pullRepair, previewKind, optimiseStreams, cfnames);
}
protected DebuggableThreadPoolExecutor createExecutor()
@@ -168,8 +168,7 @@ public class RepairJobTest
this.session = new MeasureableRepairSession(parentRepairSession, UUIDGen.getTimeUUID(),
new CommonRange(neighbors, Collections.emptySet(), FULL_RANGE),
- KEYSPACE, RepairParallelism.SEQUENTIAL,
- false, false, false,
+ KEYSPACE, RepairParallelism.SEQUENTIAL, false, false,
PreviewKind.NONE, false, CF);
this.job = new RepairJob(session, CF);
diff --git a/test/unit/org/apache/cassandra/repair/RepairSessionTest.java b/test/unit/org/apache/cassandra/repair/RepairSessionTest.java
index e77d657..2ad5831 100644
--- a/test/unit/org/apache/cassandra/repair/RepairSessionTest.java
+++ b/test/unit/org/apache/cassandra/repair/RepairSessionTest.java
@@ -66,7 +66,7 @@ public class RepairSessionTest
RepairSession session = new RepairSession(parentSessionId, sessionId,
new CommonRange(endpoints, Collections.emptySet(), Arrays.asList(repairRange)),
"Keyspace1", RepairParallelism.SEQUENTIAL,
- false, false, false,
+ false, false,
PreviewKind.NONE, false, "Standard1");
// perform convict
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org