You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by bd...@apache.org on 2020/10/09 20:51:39 UTC

[cassandra] branch trunk updated: Consolidate node liveness check for forced repair

This is an automated email from the ASF dual-hosted git repository.

bdeggleston pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 1728da3   Consolidate node liveness check for forced repair
1728da3 is described below

commit 1728da30e4e7858d30178ef74350af3e690adf0c
Author: yifan-c <yc...@gmail.com>
AuthorDate: Tue Sep 8 18:23:30 2020 -0700

     Consolidate node liveness check for forced repair
    
     Patch by Yifan Cai; Reviewed by Blake Eggleston for CASSANDRA-16113
---
 CHANGES.txt                                        |   1 +
 .../org/apache/cassandra/repair/CommonRange.java   |  21 ++--
 .../apache/cassandra/repair/RepairRunnable.java    | 121 +++++++++++----------
 .../org/apache/cassandra/repair/RepairSession.java |  38 +------
 .../cassandra/service/ActiveRepairService.java     |   3 +-
 .../distributed/test/DistributedRepairUtils.java   |  33 ++++--
 .../cassandra/distributed/test/RepairTest.java     |  92 +++++++++++-----
 ...nnableTest.java => NeighborsAndRangesTest.java} |  25 +++--
 .../org/apache/cassandra/repair/RepairJobTest.java |   7 +-
 .../apache/cassandra/repair/RepairSessionTest.java |   2 +-
 10 files changed, 185 insertions(+), 158 deletions(-)

diff --git a/CHANGES.txt b/CHANGES.txt
index 289d4e8..412b336 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 4.0-beta3
+ * Consolidate node liveness check for forced repair (CASSANDRA-16113)
  * Use unsigned short in ValueAccessor.sliceWithShortLength (CASSANDRA-16147)
  * Abort repairs when getting a truncation request (CASSANDRA-15854)
  * Remove bad assert when getting active compactions for an sstable (CASSANDRA-15457)
diff --git a/src/java/org/apache/cassandra/repair/CommonRange.java b/src/java/org/apache/cassandra/repair/CommonRange.java
index dab77c5..5eb8061 100644
--- a/src/java/org/apache/cassandra/repair/CommonRange.java
+++ b/src/java/org/apache/cassandra/repair/CommonRange.java
@@ -21,6 +21,7 @@ package org.apache.cassandra.repair;
 
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Objects;
 import java.util.Set;
 
 import com.google.common.base.Preconditions;
@@ -38,9 +39,15 @@ public class CommonRange
     public final ImmutableSet<InetAddressAndPort> endpoints;
     public final ImmutableSet<InetAddressAndPort> transEndpoints;
     public final Collection<Range<Token>> ranges;
+    public final boolean hasSkippedReplicas;
 
     public CommonRange(Set<InetAddressAndPort> endpoints, Set<InetAddressAndPort> transEndpoints, Collection<Range<Token>> ranges)
     {
+        this(endpoints, transEndpoints, ranges, false);
+    }
+
+    public CommonRange(Set<InetAddressAndPort> endpoints, Set<InetAddressAndPort> transEndpoints, Collection<Range<Token>> ranges, boolean hasSkippedReplicas)
+    {
         Preconditions.checkArgument(endpoints != null && !endpoints.isEmpty(), "Endpoints can not be empty");
         Preconditions.checkArgument(transEndpoints != null, "Transient endpoints can not be null");
         Preconditions.checkArgument(endpoints.containsAll(transEndpoints), "transEndpoints must be a subset of endpoints");
@@ -49,6 +56,7 @@ public class CommonRange
         this.endpoints = ImmutableSet.copyOf(endpoints);
         this.transEndpoints = ImmutableSet.copyOf(transEndpoints);
         this.ranges = new ArrayList<>(ranges);
+        this.hasSkippedReplicas = hasSkippedReplicas;
     }
 
     public boolean matchesEndpoints(Set<InetAddressAndPort> endpoints, Set<InetAddressAndPort> transEndpoints)
@@ -64,17 +72,15 @@ public class CommonRange
 
         CommonRange that = (CommonRange) o;
 
-        if (!endpoints.equals(that.endpoints)) return false;
-        if (!transEndpoints.equals(that.transEndpoints)) return false;
-        return ranges.equals(that.ranges);
+        return Objects.equals(endpoints, that.endpoints)
+               && Objects.equals(transEndpoints, that.transEndpoints)
+               && Objects.equals(ranges, that.ranges)
+               && hasSkippedReplicas == that.hasSkippedReplicas;
     }
 
     public int hashCode()
     {
-        int result = endpoints.hashCode();
-        result = 31 * result + transEndpoints.hashCode();
-        result = 31 * result + ranges.hashCode();
-        return result;
+        return Objects.hash(endpoints, transEndpoints, ranges, hasSkippedReplicas);
     }
 
     public String toString()
@@ -83,6 +89,7 @@ public class CommonRange
                "endpoints=" + endpoints +
                ", transEndpoints=" + transEndpoints +
                ", ranges=" + ranges +
+               ", hasSkippedReplicas=" + hasSkippedReplicas +
                '}';
     }
 }
diff --git a/src/java/org/apache/cassandra/repair/RepairRunnable.java b/src/java/org/apache/cassandra/repair/RepairRunnable.java
index f6aa6d1..5d8e945 100644
--- a/src/java/org/apache/cassandra/repair/RepairRunnable.java
+++ b/src/java/org/apache/cassandra/repair/RepairRunnable.java
@@ -20,6 +20,7 @@ package org.apache.cassandra.repair;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashSet;
@@ -32,7 +33,6 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 
-import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
@@ -260,7 +260,7 @@ public class RepairRunnable implements Runnable, ProgressEventNotifier
 
         maybeStoreParentRepairStart(cfnames);
 
-        prepare(columnFamilies, neighborsAndRanges.allNeighbors, neighborsAndRanges.force);
+        prepare(columnFamilies, neighborsAndRanges.participants, neighborsAndRanges.shouldExcludeDeadParticipants);
 
         repair(cfnames, neighborsAndRanges);
     }
@@ -345,15 +345,15 @@ public class RepairRunnable implements Runnable, ProgressEventNotifier
 
         progressCounter.incrementAndGet();
 
-        boolean force = options.isForcedRepair();
+        boolean shouldExcludeDeadParticipants = options.isForcedRepair();
 
-        if (force && options.isIncremental())
+        if (shouldExcludeDeadParticipants)
         {
             Set<InetAddressAndPort> actualNeighbors = Sets.newHashSet(Iterables.filter(allNeighbors, FailureDetector.instance::isAlive));
-            force = !allNeighbors.equals(actualNeighbors);
+            shouldExcludeDeadParticipants = !allNeighbors.equals(actualNeighbors);
             allNeighbors = actualNeighbors;
         }
-        return new NeighborsAndRanges(force, allNeighbors, commonRanges);
+        return new NeighborsAndRanges(shouldExcludeDeadParticipants, allNeighbors, commonRanges);
     }
 
     private void maybeStoreParentRepairStart(String[] cfnames)
@@ -393,16 +393,15 @@ public class RepairRunnable implements Runnable, ProgressEventNotifier
     {
         if (options.isPreview())
         {
-            previewRepair(parentSession, creationTimeMillis, neighborsAndRanges.commonRanges, cfnames);
+            previewRepair(parentSession, creationTimeMillis, neighborsAndRanges.filterCommonRanges(keyspace, cfnames), cfnames);
         }
         else if (options.isIncremental())
         {
-            incrementalRepair(parentSession, creationTimeMillis, neighborsAndRanges.force, traceState,
-                              neighborsAndRanges.allNeighbors, neighborsAndRanges.commonRanges, cfnames);
+            incrementalRepair(parentSession, creationTimeMillis, traceState, neighborsAndRanges, cfnames);
         }
         else
         {
-            normalRepair(parentSession, creationTimeMillis, traceState, neighborsAndRanges.commonRanges, cfnames);
+            normalRepair(parentSession, creationTimeMillis, traceState, neighborsAndRanges.filterCommonRanges(keyspace, cfnames), cfnames);
         }
     }
 
@@ -428,10 +427,10 @@ public class RepairRunnable implements Runnable, ProgressEventNotifier
             @SuppressWarnings("unchecked")
             public ListenableFuture apply(List<RepairSessionResult> results)
             {
+                logger.debug("Repair result: {}", results);
                 // filter out null(=failed) results and get successful ranges
                 for (RepairSessionResult sessionResult : results)
                 {
-                    logger.debug("Repair result: {}", results);
                     if (sessionResult != null)
                     {
                         // don't record successful repair if we had to skip ranges
@@ -451,54 +450,21 @@ public class RepairRunnable implements Runnable, ProgressEventNotifier
         Futures.addCallback(repairResult, new RepairCompleteCallback(parentSession, successfulRanges, startTime, traceState, hasFailure, executor), MoreExecutors.directExecutor());
     }
 
-    /**
-     * removes dead nodes from common ranges, and exludes ranges left without any participants
-     */
-    @VisibleForTesting
-    static List<CommonRange> filterCommonRanges(List<CommonRange> commonRanges, Set<InetAddressAndPort> liveEndpoints, boolean force)
-    {
-        if (!force)
-        {
-            return commonRanges;
-        }
-        else
-        {
-            List<CommonRange> filtered = new ArrayList<>(commonRanges.size());
-
-            for (CommonRange commonRange : commonRanges)
-            {
-                Set<InetAddressAndPort> endpoints = ImmutableSet.copyOf(Iterables.filter(commonRange.endpoints, liveEndpoints::contains));
-                Set<InetAddressAndPort> transEndpoints = ImmutableSet.copyOf(Iterables.filter(commonRange.transEndpoints, liveEndpoints::contains));
-                Preconditions.checkState(endpoints.containsAll(transEndpoints), "transEndpoints must be a subset of endpoints");
-
-                // this node is implicitly a participant in this repair, so a single endpoint is ok here
-                if (!endpoints.isEmpty())
-                {
-                    filtered.add(new CommonRange(endpoints, transEndpoints, commonRange.ranges));
-                }
-            }
-            Preconditions.checkState(!filtered.isEmpty(), "Not enough live endpoints for a repair");
-            return filtered;
-        }
-    }
-
     private void incrementalRepair(UUID parentSession,
                                    long startTime,
-                                   boolean forceRepair,
                                    TraceState traceState,
-                                   Set<InetAddressAndPort> allNeighbors,
-                                   List<CommonRange> commonRanges,
+                                   NeighborsAndRanges neighborsAndRanges,
                                    String... cfnames)
     {
         // the local node also needs to be included in the set of participants, since coordinator sessions aren't persisted
         Set<InetAddressAndPort> allParticipants = ImmutableSet.<InetAddressAndPort>builder()
-                                                  .addAll(allNeighbors)
+                                                  .addAll(neighborsAndRanges.participants)
                                                   .add(FBUtilities.getBroadcastAddressAndPort())
                                                   .build();
+        // Not necessary to include self for filtering. The common ranges only contain neighbor node endpoints.
+        List<CommonRange> allRanges = neighborsAndRanges.filterCommonRanges(keyspace, cfnames);
 
-        List<CommonRange> allRanges = filterCommonRanges(commonRanges, allParticipants, forceRepair);
-
-        CoordinatorSession coordinatorSession = ActiveRepairService.instance.consistent.coordinated.registerSession(parentSession, allParticipants, forceRepair);
+        CoordinatorSession coordinatorSession = ActiveRepairService.instance.consistent.coordinated.registerSession(parentSession, allParticipants, neighborsAndRanges.shouldExcludeDeadParticipants);
         ListeningExecutorService executor = createExecutor();
         AtomicBoolean hasFailure = new AtomicBoolean(false);
         ListenableFuture repairResult = coordinatorSession.execute(() -> submitRepairSessions(parentSession, true, executor, allRanges, cfnames),
@@ -640,9 +606,6 @@ public class RepairRunnable implements Runnable, ProgressEventNotifier
     {
         List<ListenableFuture<RepairSessionResult>> futures = new ArrayList<>(options.getRanges().size());
 
-        // we do endpoint filtering at the start of an incremental repair,
-        // so repair sessions shouldn't also be checking liveness
-        boolean force = options.isForcedRepair() && !isIncremental;
         for (CommonRange commonRange : commonRanges)
         {
             logger.info("Starting RepairSession for {}", commonRange);
@@ -652,7 +615,6 @@ public class RepairRunnable implements Runnable, ProgressEventNotifier
                                                                                      options.getParallelism(),
                                                                                      isIncremental,
                                                                                      options.isPullRepair(),
-                                                                                     force,
                                                                                      options.getPreviewKind(),
                                                                                      options.optimiseStreams(),
                                                                                      executor,
@@ -851,17 +813,58 @@ public class RepairRunnable implements Runnable, ProgressEventNotifier
         }
     }
 
-    private static final class NeighborsAndRanges
+    static final class NeighborsAndRanges
     {
-        private final boolean force;
-        private final Set<InetAddressAndPort> allNeighbors;
+        private final boolean shouldExcludeDeadParticipants;
+        private final Set<InetAddressAndPort> participants;
         private final List<CommonRange> commonRanges;
 
-        private NeighborsAndRanges(boolean force, Set<InetAddressAndPort> allNeighbors, List<CommonRange> commonRanges)
+        NeighborsAndRanges(boolean shouldExcludeDeadParticipants, Set<InetAddressAndPort> participants, List<CommonRange> commonRanges)
         {
-            this.force = force;
-            this.allNeighbors = allNeighbors;
+            this.shouldExcludeDeadParticipants = shouldExcludeDeadParticipants;
+            this.participants = participants;
             this.commonRanges = commonRanges;
         }
+
+        /**
+         * When in the force mode, removes dead nodes (endpoints not contained within {@code participants}) from common ranges,
+         * and excludes ranges left without any participants.
+         * When not in the force mode, no-op.
+         */
+        List<CommonRange> filterCommonRanges(String keyspace, String[] tableNames)
+        {
+            if (!shouldExcludeDeadParticipants)
+            {
+                return commonRanges;
+            }
+            else
+            {
+                logger.debug("force flag set, removing dead endpoints if possible");
+
+                List<CommonRange> filtered = new ArrayList<>(commonRanges.size());
+
+                for (CommonRange commonRange : commonRanges)
+                {
+                    Set<InetAddressAndPort> endpoints = ImmutableSet.copyOf(Iterables.filter(commonRange.endpoints, participants::contains));
+                    Set<InetAddressAndPort> transEndpoints = ImmutableSet.copyOf(Iterables.filter(commonRange.transEndpoints, participants::contains));
+                    Preconditions.checkState(endpoints.containsAll(transEndpoints), "transEndpoints must be a subset of endpoints");
+
+                    // this node is implicitly a participant in this repair, so a single endpoint is ok here
+                    if (!endpoints.isEmpty())
+                    {
+                        Set<InetAddressAndPort> skippedReplicas = Sets.difference(commonRange.endpoints, endpoints);
+                        skippedReplicas.forEach(endpoint -> logger.info("Removing a dead node {} from repair for ranges {} due to -force", endpoint, commonRange.ranges));
+                        filtered.add(new CommonRange(endpoints, transEndpoints, commonRange.ranges, !skippedReplicas.isEmpty()));
+                    }
+                    else
+                    {
+                        logger.warn("Skipping forced repair for ranges {} of tables {} in keyspace {}, as no neighbor nodes are live.",
+                                    commonRange.ranges, Arrays.asList(tableNames), keyspace);
+                    }
+                }
+                Preconditions.checkState(!filtered.isEmpty(), "Not enough live endpoints for a repair");
+                return filtered;
+            }
+        }
     }
 }
diff --git a/src/java/org/apache/cassandra/repair/RepairSession.java b/src/java/org/apache/cassandra/repair/RepairSession.java
index 2468857..e13c90c 100644
--- a/src/java/org/apache/cassandra/repair/RepairSession.java
+++ b/src/java/org/apache/cassandra/repair/RepairSession.java
@@ -97,9 +97,6 @@ public class RepairSession extends AbstractFuture<RepairSessionResult> implement
     public final RepairParallelism parallelismDegree;
     public final boolean pullRepair;
 
-    // indicates some replicas were not included in the repair. Only relevant for --force option
-    public final boolean skippedReplicas;
-
     /** Range to repair */
     public final CommonRange commonRange;
     public final boolean isIncremental;
@@ -126,7 +123,6 @@ public class RepairSession extends AbstractFuture<RepairSessionResult> implement
      * @param keyspace name of keyspace
      * @param parallelismDegree specifies the degree of parallelism when calculating the merkle trees
      * @param pullRepair true if the repair should be one way (from remote host to this host and only applicable between two hosts--see RepairOption)
-     * @param force true if the repair should ignore dead endpoints (instead of failing)
      * @param cfnames names of columnfamilies
      */
     public RepairSession(UUID parentRepairSession,
@@ -136,7 +132,6 @@ public class RepairSession extends AbstractFuture<RepairSessionResult> implement
                          RepairParallelism parallelismDegree,
                          boolean isIncremental,
                          boolean pullRepair,
-                         boolean force,
                          PreviewKind previewKind,
                          boolean optimiseStreams,
                          String... cfnames)
@@ -148,37 +143,10 @@ public class RepairSession extends AbstractFuture<RepairSessionResult> implement
         this.parallelismDegree = parallelismDegree;
         this.keyspace = keyspace;
         this.cfnames = cfnames;
-
-        //If force then filter out dead endpoints
-        boolean forceSkippedReplicas = false;
-        if (force)
-        {
-            logger.debug("force flag set, removing dead endpoints");
-            final Set<InetAddressAndPort> removeCandidates = new HashSet<>();
-            for (final InetAddressAndPort endpoint : commonRange.endpoints)
-            {
-                if (!FailureDetector.instance.isAlive(endpoint))
-                {
-                    logger.info("Removing a dead node from Repair due to -force {}", endpoint);
-                    removeCandidates.add(endpoint);
-                }
-            }
-            if (!removeCandidates.isEmpty())
-            {
-                // we shouldn't be recording a successful repair if
-                // any replicas are excluded from the repair
-                forceSkippedReplicas = true;
-                Set<InetAddressAndPort> filteredEndpoints = new HashSet<>(commonRange.endpoints);
-                filteredEndpoints.removeAll(removeCandidates);
-                commonRange = new CommonRange(filteredEndpoints, commonRange.transEndpoints, commonRange.ranges);
-            }
-        }
-
         this.commonRange = commonRange;
         this.isIncremental = isIncremental;
         this.previewKind = previewKind;
         this.pullRepair = pullRepair;
-        this.skippedReplicas = forceSkippedReplicas;
         this.optimiseStreams = optimiseStreams;
         this.taskExecutor = MoreExecutors.listeningDecorator(createExecutor());
     }
@@ -297,7 +265,7 @@ public class RepairSession extends AbstractFuture<RepairSessionResult> implement
         {
             logger.info("{} {}", previewKind.logPrefix(getId()), message = String.format("No neighbors to repair with on range %s: session completed", commonRange));
             Tracing.traceRepair(message);
-            set(new RepairSessionResult(id, keyspace, commonRange.ranges, Lists.<RepairResult>newArrayList(), skippedReplicas));
+            set(new RepairSessionResult(id, keyspace, commonRange.ranges, Lists.<RepairResult>newArrayList(), commonRange.hasSkippedReplicas));
             if (!previewKind.isPreview())
             {
                 SystemDistributedKeyspace.failRepairs(getId(), keyspace, cfnames, new RuntimeException(message));
@@ -308,7 +276,7 @@ public class RepairSession extends AbstractFuture<RepairSessionResult> implement
         // Checking all nodes are live
         for (InetAddressAndPort endpoint : commonRange.endpoints)
         {
-            if (!FailureDetector.instance.isAlive(endpoint) && !skippedReplicas)
+            if (!FailureDetector.instance.isAlive(endpoint) && !commonRange.hasSkippedReplicas)
             {
                 message = String.format("Cannot proceed on repair because a neighbor (%s) is dead: session failed", endpoint);
                 logger.error("{} {}", previewKind.logPrefix(getId()), message);
@@ -339,7 +307,7 @@ public class RepairSession extends AbstractFuture<RepairSessionResult> implement
                 // this repair session is completed
                 logger.info("{} {}", previewKind.logPrefix(getId()), "Session completed successfully");
                 Tracing.traceRepair("Completed sync of range {}", commonRange);
-                set(new RepairSessionResult(id, keyspace, commonRange.ranges, results, skippedReplicas));
+                set(new RepairSessionResult(id, keyspace, commonRange.ranges, results, commonRange.hasSkippedReplicas));
 
                 taskExecutor.shutdown();
                 // mark this session as terminated
diff --git a/src/java/org/apache/cassandra/service/ActiveRepairService.java b/src/java/org/apache/cassandra/service/ActiveRepairService.java
index f7b0686..2cdc794 100644
--- a/src/java/org/apache/cassandra/service/ActiveRepairService.java
+++ b/src/java/org/apache/cassandra/service/ActiveRepairService.java
@@ -315,7 +315,6 @@ public class ActiveRepairService implements IEndpointStateChangeSubscriber, IFai
                                              RepairParallelism parallelismDegree,
                                              boolean isIncremental,
                                              boolean pullRepair,
-                                             boolean force,
                                              PreviewKind previewKind,
                                              boolean optimiseStreams,
                                              ListeningExecutorService executor,
@@ -328,7 +327,7 @@ public class ActiveRepairService implements IEndpointStateChangeSubscriber, IFai
             return null;
 
         final RepairSession session = new RepairSession(parentRepairSession, UUIDGen.getTimeUUID(), range, keyspace,
-                                                        parallelismDegree, isIncremental, pullRepair, force,
+                                                        parallelismDegree, isIncremental, pullRepair,
                                                         previewKind, optimiseStreams, cfnames);
 
         sessions.put(session.getId(), session);
diff --git a/test/distributed/org/apache/cassandra/distributed/test/DistributedRepairUtils.java b/test/distributed/org/apache/cassandra/distributed/test/DistributedRepairUtils.java
index 023d02c..fce67b5 100644
--- a/test/distributed/org/apache/cassandra/distributed/test/DistributedRepairUtils.java
+++ b/test/distributed/org/apache/cassandra/distributed/test/DistributedRepairUtils.java
@@ -27,6 +27,7 @@ import org.apache.commons.lang3.ArrayUtils;
 import org.junit.Assert;
 
 import org.apache.cassandra.distributed.api.ConsistencyLevel;
+import org.apache.cassandra.distributed.api.ICluster;
 import org.apache.cassandra.distributed.api.IInvokableInstance;
 import org.apache.cassandra.distributed.api.NodeToolResult;
 import org.apache.cassandra.distributed.api.QueryResult;
@@ -45,11 +46,11 @@ public final class DistributedRepairUtils
 
     }
 
-    public static NodeToolResult repair(AbstractCluster<?> cluster, RepairType repairType, boolean withNotifications, String... args) {
+    public static NodeToolResult repair(ICluster<IInvokableInstance> cluster, RepairType repairType, boolean withNotifications, String... args) {
         return repair(cluster, DEFAULT_COORDINATOR, repairType, withNotifications, args);
     }
 
-    public static NodeToolResult repair(AbstractCluster<?> cluster, int node, RepairType repairType, boolean withNotifications, String... args) {
+    public static NodeToolResult repair(ICluster<IInvokableInstance> cluster, int node, RepairType repairType, boolean withNotifications, String... args) {
         args = repairType.append(args);
         args = ArrayUtils.addAll(new String[] { "repair" }, args);
         return cluster.get(node).nodetoolResult(withNotifications, args);
@@ -65,12 +66,12 @@ public final class DistributedRepairUtils
         return cluster.get(node).callOnInstance(() -> StorageMetrics.repairExceptions.getCount());
     }
 
-    public static QueryResult queryParentRepairHistory(AbstractCluster<?> cluster, String ks, String table)
+    public static QueryResult queryParentRepairHistory(ICluster<IInvokableInstance> cluster, String ks, String table)
     {
         return queryParentRepairHistory(cluster, DEFAULT_COORDINATOR, ks, table);
     }
 
-    public static QueryResult queryParentRepairHistory(AbstractCluster<?> cluster, int coordinator, String ks, String table)
+    public static QueryResult queryParentRepairHistory(ICluster<IInvokableInstance> cluster, int coordinator, String ks, String table)
     {
         // This is kinda brittle since the caller never gets the ID and can't ask for the ID; it needs to infer the id
         // this logic makes the assumption the ks/table pairs are unique (should be or else create should fail) so any
@@ -84,35 +85,41 @@ public final class DistributedRepairUtils
         return rs;
     }
 
-    public static void assertParentRepairNotExist(AbstractCluster<?> cluster, String ks, String table)
+    public static void assertParentRepairNotExist(ICluster<IInvokableInstance> cluster, String ks, String table)
     {
         assertParentRepairNotExist(cluster, DEFAULT_COORDINATOR, ks, table);
     }
 
-    public static void assertParentRepairNotExist(AbstractCluster<?> cluster, int coordinator, String ks, String table)
+    public static void assertParentRepairNotExist(ICluster<IInvokableInstance> cluster, int coordinator, String ks, String table)
     {
         QueryResult rs = queryParentRepairHistory(cluster, coordinator, ks, table);
         Assert.assertFalse("No repairs should be found but at least one found", rs.hasNext());
     }
 
-    public static void assertParentRepairNotExist(AbstractCluster<?> cluster, String ks)
+    public static void assertParentRepairNotExist(ICluster<IInvokableInstance> cluster, String ks)
     {
         assertParentRepairNotExist(cluster, DEFAULT_COORDINATOR, ks);
     }
 
-    public static void assertParentRepairNotExist(AbstractCluster<?> cluster, int coordinator, String ks)
+    public static void assertParentRepairNotExist(ICluster<IInvokableInstance> cluster, int coordinator, String ks)
     {
         QueryResult rs = queryParentRepairHistory(cluster, coordinator, ks, null);
         Assert.assertFalse("No repairs should be found but at least one found", rs.hasNext());
     }
 
-    public static void assertParentRepairSuccess(AbstractCluster<?> cluster, String ks, String table)
+    public static void assertParentRepairSuccess(ICluster<IInvokableInstance> cluster, String ks, String table)
     {
         assertParentRepairSuccess(cluster, DEFAULT_COORDINATOR, ks, table);
     }
 
-    public static void assertParentRepairSuccess(AbstractCluster<?> cluster, int coordinator, String ks, String table)
+    public static void assertParentRepairSuccess(ICluster<IInvokableInstance> cluster, int coordinator, String ks, String table)
     {
+        assertParentRepairSuccess(cluster, coordinator, ks, table, row -> {});
+    }
+
+    public static void assertParentRepairSuccess(ICluster<IInvokableInstance> cluster, int coordinator, String ks, String table, Consumer<Row> moreSuccessCriteria)
+    {
+        Assert.assertNotNull("Invalid null value for moreSuccessCriteria", moreSuccessCriteria);
         QueryResult rs = queryParentRepairHistory(cluster, coordinator, ks, table);
         validateExistingParentRepair(rs, row -> {
             // check completed
@@ -121,15 +128,17 @@ public final class DistributedRepairUtils
             // check not failed (aka success)
             Assert.assertNull("Exception found", row.getString("exception_stacktrace"));
             Assert.assertNull("Exception found", row.getString("exception_message"));
+
+            moreSuccessCriteria.accept(row);
         });
     }
 
-    public static void assertParentRepairFailedWithMessageContains(AbstractCluster<?> cluster, String ks, String table, String message)
+    public static void assertParentRepairFailedWithMessageContains(ICluster<IInvokableInstance> cluster, String ks, String table, String message)
     {
         assertParentRepairFailedWithMessageContains(cluster, DEFAULT_COORDINATOR, ks, table, message);
     }
 
-    public static void assertParentRepairFailedWithMessageContains(AbstractCluster<?> cluster, int coordinator, String ks, String table, String message)
+    public static void assertParentRepairFailedWithMessageContains(ICluster<IInvokableInstance> cluster, int coordinator, String ks, String table, String message)
     {
         QueryResult rs = queryParentRepairHistory(cluster, coordinator, ks, table);
         validateExistingParentRepair(rs, row -> {
diff --git a/test/distributed/org/apache/cassandra/distributed/test/RepairTest.java b/test/distributed/org/apache/cassandra/distributed/test/RepairTest.java
index f37a3d8..b127a74 100644
--- a/test/distributed/org/apache/cassandra/distributed/test/RepairTest.java
+++ b/test/distributed/org/apache/cassandra/distributed/test/RepairTest.java
@@ -21,11 +21,14 @@ package org.apache.cassandra.distributed.test;
 import java.io.IOException;
 import java.util.Arrays;
 import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
 import java.util.function.Consumer;
 
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import org.junit.AfterClass;
+import org.junit.Assert;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
@@ -38,27 +41,26 @@ import org.apache.cassandra.utils.concurrent.SimpleCondition;
 import org.apache.cassandra.utils.progress.ProgressEventType;
 
 import static java.util.concurrent.TimeUnit.MINUTES;
-import static org.apache.cassandra.distributed.test.ExecUtil.rethrow;
 import static org.apache.cassandra.distributed.api.Feature.GOSSIP;
 import static org.apache.cassandra.distributed.api.Feature.NETWORK;
-import static org.apache.cassandra.distributed.shared.AssertUtils.*;
+import static org.apache.cassandra.distributed.shared.AssertUtils.assertRows;
+import static org.apache.cassandra.distributed.test.ExecUtil.rethrow;
 
 public class RepairTest extends TestBaseImpl
 {
-    private static final String insert = withKeyspace("INSERT INTO %s.test (k, c1, c2) VALUES (?, 'value1', 'value2');");
-    private static final String query = withKeyspace("SELECT k, c1, c2 FROM %s.test WHERE k = ?;");
-
     private static ICluster<IInvokableInstance> cluster;
 
-    private static void insert(ICluster<IInvokableInstance> cluster, int start, int end, int ... nodes)
+    private static void insert(ICluster<IInvokableInstance> cluster, String keyspace, int start, int end, int ... nodes)
     {
+        String insert = String.format("INSERT INTO %s.test (k, c1, c2) VALUES (?, 'value1', 'value2');", keyspace);
         for (int i = start ; i < end ; ++i)
             for (int node : nodes)
                 cluster.get(node).executeInternal(insert, Integer.toString(i));
     }
 
-    private static void verify(ICluster<IInvokableInstance> cluster, int start, int end, int ... nodes)
+    private static void verify(ICluster<IInvokableInstance> cluster, String keyspace, int start, int end, int ... nodes)
     {
+        String query = String.format("SELECT k, c1, c2 FROM %s.test WHERE k = ?;", keyspace);
         for (int i = start ; i < end ; ++i)
         {
             for (int node = 1 ; node <= cluster.size() ; ++node)
@@ -72,10 +74,10 @@ public class RepairTest extends TestBaseImpl
         }
     }
 
-    private static void flush(ICluster<IInvokableInstance> cluster, int ... nodes)
+    private static void flush(ICluster<IInvokableInstance> cluster, String keyspace, int ... nodes)
     {
         for (int node : nodes)
-            cluster.get(node).runOnInstance(rethrow(() -> StorageService.instance.forceKeyspaceFlush(KEYSPACE)));
+            cluster.get(node).runOnInstance(rethrow(() -> StorageService.instance.forceKeyspaceFlush(keyspace)));
     }
 
     private static ICluster create(Consumer<IInstanceConfig> configModifier) throws IOException
@@ -90,11 +92,11 @@ public class RepairTest extends TestBaseImpl
         return init(Cluster.build().withNodes(3).withConfig(configModifier).start());
     }
 
-    private void repair(ICluster<IInvokableInstance> cluster, Map<String, String> options)
+    static void repair(ICluster<IInvokableInstance> cluster, String keyspace, Map<String, String> options)
     {
         cluster.get(1).runOnInstance(rethrow(() -> {
             SimpleCondition await = new SimpleCondition();
-            StorageService.instance.repair(KEYSPACE, options, ImmutableList.of((tag, event) -> {
+            StorageService.instance.repair(keyspace, options, ImmutableList.of((tag, event) -> {
                 if (event.getType() == ProgressEventType.COMPLETE)
                     await.signalAll();
             })).right.get();
@@ -102,22 +104,22 @@ public class RepairTest extends TestBaseImpl
         }));
     }
 
-    void populate(ICluster<IInvokableInstance> cluster, String compression) throws Exception
+    static void populate(ICluster<IInvokableInstance> cluster, String keyspace, String compression) throws Exception
     {
         try
         {
-            cluster.schemaChange(withKeyspace("DROP TABLE IF EXISTS %s.test;"));
-            cluster.schemaChange(withKeyspace("CREATE TABLE %s.test (k text, c1 text, c2 text, PRIMARY KEY (k)) WITH compression = " + compression));
-
-            insert(cluster,    0, 1000, 1, 2, 3);
-            flush(cluster, 1);
-            insert(cluster, 1000, 1001, 1, 2);
-            insert(cluster, 1001, 2001, 1, 2, 3);
-            flush(cluster, 1, 2, 3);
-
-            verify(cluster,    0, 1000, 1, 2, 3);
-            verify(cluster, 1000, 1001, 1, 2);
-            verify(cluster, 1001, 2001, 1, 2, 3);
+            cluster.schemaChange(String.format("DROP TABLE IF EXISTS %s.test;", keyspace));
+            cluster.schemaChange(String.format("CREATE TABLE %s.test (k text, c1 text, c2 text, PRIMARY KEY (k)) WITH compression = %s", keyspace, compression));
+
+            insert(cluster, keyspace, 0, 1000, 1, 2, 3);
+            flush(cluster, keyspace, 1);
+            insert(cluster, keyspace, 1000, 1001, 1, 2);
+            insert(cluster, keyspace, 1001, 2001, 1, 2, 3);
+            flush(cluster, keyspace, 1, 2, 3);
+
+            verify(cluster, keyspace, 0, 1000, 1, 2, 3);
+            verify(cluster, keyspace, 1000, 1001, 1, 2);
+            verify(cluster, keyspace, 1001, 2001, 1, 2, 3);
         }
         catch (Throwable t)
         {
@@ -129,9 +131,16 @@ public class RepairTest extends TestBaseImpl
 
     void repair(ICluster<IInvokableInstance> cluster, boolean sequential, String compression) throws Exception
     {
-        populate(cluster, compression);
-        repair(cluster, ImmutableMap.of("parallelism", sequential ? "sequential" : "parallel"));
-        verify(cluster, 0, 2001, 1, 2, 3);
+        populate(cluster, KEYSPACE, compression);
+        repair(cluster, KEYSPACE, ImmutableMap.of("parallelism", sequential ? "sequential" : "parallel"));
+        verify(cluster, KEYSPACE, 0, 2001, 1, 2, 3);
+    }
+
+    void shutDownNodesAndForceRepair(ICluster<IInvokableInstance> cluster, String keyspace, int downNode) throws Exception
+    {
+        populate(cluster, keyspace, "{'enabled': false}");
+        cluster.get(downNode).shutdown().get(5, TimeUnit.SECONDS);
+        repair(cluster, keyspace, ImmutableMap.of("forceRepair", "true"));
     }
 
     @BeforeClass
@@ -182,4 +191,33 @@ public class RepairTest extends TestBaseImpl
     {
         repair(cluster, false, "{'enabled': false}");
     }
+
+    @Test
+    public void testForcedNormalRepairWithOneNodeDown() throws Exception
+    {
+        // The test uses its own keyspace with rf == 2
+        String forceRepairKeyspace = "test_force_repair_keyspace";
+        int rf = 2;
+        cluster.schemaChange("CREATE KEYSPACE " + forceRepairKeyspace + " WITH replication = {'class': 'SimpleStrategy', 'replication_factor': " + rf + "};");
+
+        try
+        {
+            shutDownNodesAndForceRepair(cluster, forceRepairKeyspace, 3); // shutdown node 3 after inserting
+            DistributedRepairUtils.assertParentRepairSuccess(cluster, 1, forceRepairKeyspace, "test", row -> {
+                Set<String> successfulRanges = row.getSet("successful_ranges");
+                Set<String> requestedRanges = row.getSet("requested_ranges");
+                Assert.assertNotNull("Found no successful ranges", successfulRanges);
+                Assert.assertNotNull("Found no requested ranges", requestedRanges);
+                Assert.assertEquals("Requested ranges count should equals to replication factor", rf, requestedRanges.size());
+                Assert.assertTrue("Given clusterSize = 3, RF = 2 and 1 node down in the replica set, it should yield only 1 successful repaired range.",
+                                  successfulRanges.size() == 1 && !successfulRanges.contains("")); // the successful ranges set should not only contain empty string
+            });
+        }
+        finally
+        {
+            // bring the node 3 back up
+            if (cluster.get(3).isShutdown())
+                cluster.get(3).startup(cluster);
+        }
+    }
 }
diff --git a/test/unit/org/apache/cassandra/repair/RepairRunnableTest.java b/test/unit/org/apache/cassandra/repair/NeighborsAndRangesTest.java
similarity index 67%
rename from test/unit/org/apache/cassandra/repair/RepairRunnableTest.java
rename to test/unit/org/apache/cassandra/repair/NeighborsAndRangesTest.java
index 418d7de..9dcd7dc 100644
--- a/test/unit/org/apache/cassandra/repair/RepairRunnableTest.java
+++ b/test/unit/org/apache/cassandra/repair/NeighborsAndRangesTest.java
@@ -29,35 +29,38 @@ import org.junit.Test;
 
 import org.apache.cassandra.locator.InetAddressAndPort;
 
-import static org.apache.cassandra.repair.RepairRunnable.filterCommonRanges;
+import static org.apache.cassandra.repair.RepairRunnable.NeighborsAndRanges;
 
-public class RepairRunnableTest extends AbstractRepairTest
+public class NeighborsAndRangesTest extends AbstractRepairTest
 {
     /**
      * For non-forced repairs, common ranges should be passed through as-is
      */
     @Test
-    public void filterCommonIncrementalRangesNotForced() throws Exception
+    public void filterCommonIncrementalRangesNotForced()
     {
         CommonRange cr = new CommonRange(PARTICIPANTS, Collections.emptySet(), ALL_RANGES);
-
+        NeighborsAndRanges nr = new NeighborsAndRanges(false, PARTICIPANTS, Collections.singletonList(cr));
         List<CommonRange> expected = Lists.newArrayList(cr);
-        List<CommonRange> actual = filterCommonRanges(expected, Collections.emptySet(), false);
+        List<CommonRange> actual = nr.filterCommonRanges(null, null);
 
         Assert.assertEquals(expected, actual);
     }
 
     @Test
-    public void forceFilterCommonIncrementalRanges() throws Exception
+    public void forceFilterCommonIncrementalRanges()
     {
-        CommonRange cr1 = new CommonRange(Sets.newHashSet(PARTICIPANT1, PARTICIPANT2), Collections.emptySet(), Sets.newHashSet(RANGE1, RANGE2));
+        CommonRange cr1 = new CommonRange(Sets.newHashSet(PARTICIPANT1, PARTICIPANT2), Collections.emptySet(), Sets.newHashSet(RANGE1));
         CommonRange cr2 = new CommonRange(Sets.newHashSet(PARTICIPANT1, PARTICIPANT2, PARTICIPANT3), Collections.emptySet(), Sets.newHashSet(RANGE3));
+        CommonRange cr3 = new CommonRange(Sets.newHashSet(PARTICIPANT2, PARTICIPANT3), Collections.emptySet(), Sets.newHashSet(RANGE2));
         Set<InetAddressAndPort> liveEndpoints = Sets.newHashSet(PARTICIPANT2, PARTICIPANT3); // PARTICIPANT1 is excluded
+        List<CommonRange> initial = Lists.newArrayList(cr1, cr2, cr3);
+        List<CommonRange> expected = Lists.newArrayList(new CommonRange(Sets.newHashSet(PARTICIPANT2), Collections.emptySet(), Sets.newHashSet(RANGE1), true),
+                                                        new CommonRange(Sets.newHashSet(PARTICIPANT2, PARTICIPANT3), Collections.emptySet(), Sets.newHashSet(RANGE3), true),
+                                                        new CommonRange(Sets.newHashSet(PARTICIPANT2, PARTICIPANT3), Collections.emptySet(), Sets.newHashSet(RANGE2), false));
 
-        List<CommonRange> initial = Lists.newArrayList(cr1, cr2);
-        List<CommonRange> expected = Lists.newArrayList(new CommonRange(Sets.newHashSet(PARTICIPANT2), Collections.emptySet(), Sets.newHashSet(RANGE1, RANGE2)),
-                                                        new CommonRange(Sets.newHashSet(PARTICIPANT2, PARTICIPANT3), Collections.emptySet(), Sets.newHashSet(RANGE3)));
-        List<CommonRange> actual = filterCommonRanges(initial, liveEndpoints, true);
+        NeighborsAndRanges nr = new NeighborsAndRanges(true, liveEndpoints, initial);
+        List<CommonRange> actual = nr.filterCommonRanges(null, null);
 
         Assert.assertEquals(expected, actual);
     }
diff --git a/test/unit/org/apache/cassandra/repair/RepairJobTest.java b/test/unit/org/apache/cassandra/repair/RepairJobTest.java
index d3af58f..9887d38 100644
--- a/test/unit/org/apache/cassandra/repair/RepairJobTest.java
+++ b/test/unit/org/apache/cassandra/repair/RepairJobTest.java
@@ -107,9 +107,9 @@ public class RepairJobTest
 
         public MeasureableRepairSession(UUID parentRepairSession, UUID id, CommonRange commonRange, String keyspace,
                                         RepairParallelism parallelismDegree, boolean isIncremental, boolean pullRepair,
-                                        boolean force, PreviewKind previewKind, boolean optimiseStreams, String... cfnames)
+                                        PreviewKind previewKind, boolean optimiseStreams, String... cfnames)
         {
-            super(parentRepairSession, id, commonRange, keyspace, parallelismDegree, isIncremental, pullRepair, force, previewKind, optimiseStreams, cfnames);
+            super(parentRepairSession, id, commonRange, keyspace, parallelismDegree, isIncremental, pullRepair, previewKind, optimiseStreams, cfnames);
         }
 
         protected DebuggableThreadPoolExecutor createExecutor()
@@ -168,8 +168,7 @@ public class RepairJobTest
 
         this.session = new MeasureableRepairSession(parentRepairSession, UUIDGen.getTimeUUID(),
                                                     new CommonRange(neighbors, Collections.emptySet(), FULL_RANGE),
-                                                    KEYSPACE, RepairParallelism.SEQUENTIAL,
-                                                    false, false, false,
+                                                    KEYSPACE, RepairParallelism.SEQUENTIAL, false, false,
                                                     PreviewKind.NONE, false, CF);
 
         this.job = new RepairJob(session, CF);
diff --git a/test/unit/org/apache/cassandra/repair/RepairSessionTest.java b/test/unit/org/apache/cassandra/repair/RepairSessionTest.java
index e77d657..2ad5831 100644
--- a/test/unit/org/apache/cassandra/repair/RepairSessionTest.java
+++ b/test/unit/org/apache/cassandra/repair/RepairSessionTest.java
@@ -66,7 +66,7 @@ public class RepairSessionTest
         RepairSession session = new RepairSession(parentSessionId, sessionId,
                                                   new CommonRange(endpoints, Collections.emptySet(), Arrays.asList(repairRange)),
                                                   "Keyspace1", RepairParallelism.SEQUENTIAL,
-                                                  false, false, false,
+                                                  false, false,
                                                   PreviewKind.NONE, false, "Standard1");
 
         // perform convict


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org