You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by bd...@apache.org on 2017/06/30 18:31:32 UTC

cassandra git commit: Run repair with down replicas

Repository: cassandra
Updated Branches:
  refs/heads/trunk 176f2a444 -> 45c0f860f


Run repair with down replicas

Patch by Sankalp Kohli & Blake Eggleston; Reviewed by Marcus Eriksson for CASSANDRA-10446


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/45c0f860
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/45c0f860
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/45c0f860

Branch: refs/heads/trunk
Commit: 45c0f860f3c7f8e0a7c80809c4ff47f4acf65557
Parents: 176f2a4
Author: Blake Eggleston <bd...@gmail.com>
Authored: Wed Oct 12 10:14:16 2016 -0700
Committer: Blake Eggleston <bd...@gmail.com>
Committed: Fri Jun 30 11:31:15 2017 -0700

----------------------------------------------------------------------
 CHANGES.txt                                     |  2 +
 .../apache/cassandra/repair/RepairRunnable.java | 12 +++++-
 .../apache/cassandra/repair/RepairSession.java  | 39 ++++++++++++++++++--
 .../cassandra/repair/RepairSessionResult.java   | 15 +++++++-
 .../cassandra/repair/messages/RepairOption.java | 25 ++++++++++++-
 .../cassandra/service/ActiveRepairService.java  | 15 ++++++--
 .../apache/cassandra/tools/nodetool/Repair.java |  4 ++
 .../cassandra/repair/RepairSessionTest.java     |  2 +-
 .../consistent/CoordinatorSessionTest.java      |  2 +-
 .../repair/messages/RepairOptionTest.java       | 22 +++++++++++
 10 files changed, 125 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/45c0f860/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 866c6fd..6444994 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,6 @@
 4.0
+ * Run repair with down replicas (CASSANDRA-10446)
+ * Added started & completed repair metrics (CASSANDRA-13598)
  * Added started & completed repair metrics (CASSANDRA-13598)
  * Improve secondary index (re)build failure and concurrency handling (CASSANDRA-10130)
  * Improve calculation of available disk space for compaction (CASSANDRA-13068)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/45c0f860/src/java/org/apache/cassandra/repair/RepairRunnable.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/RepairRunnable.java b/src/java/org/apache/cassandra/repair/RepairRunnable.java
index eca162e..29347a4 100644
--- a/src/java/org/apache/cassandra/repair/RepairRunnable.java
+++ b/src/java/org/apache/cassandra/repair/RepairRunnable.java
@@ -289,9 +289,18 @@ public class RepairRunnable extends WrappedRunnable implements ProgressEventNoti
                 // filter out null(=failed) results and get successful ranges
                 for (RepairSessionResult sessionResult : results)
                 {
+                    logger.debug("Repair result: {}", results);
                     if (sessionResult != null)
                     {
-                        successfulRanges.addAll(sessionResult.ranges);
+                        // don't promote sstables for sessions we skipped replicas for
+                        if (!sessionResult.skippedReplicas)
+                        {
+                            successfulRanges.addAll(sessionResult.ranges);
+                        }
+                        else
+                        {
+                            logger.debug("Skipping anticompaction for {}", results);
+                        }
                     }
                     else
                     {
@@ -424,6 +433,7 @@ public class RepairRunnable extends WrappedRunnable implements ProgressEventNoti
                                                                                      p.left,
                                                                                      isConsistent,
                                                                                      options.isPullRepair(),
+                                                                                     options.isForcedRepair(),
                                                                                      options.getPreviewKind(),
                                                                                      executor,
                                                                                      cfnames);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/45c0f860/src/java/org/apache/cassandra/repair/RepairSession.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/RepairSession.java b/src/java/org/apache/cassandra/repair/RepairSession.java
index c1b3f41..98ed1a3 100644
--- a/src/java/org/apache/cassandra/repair/RepairSession.java
+++ b/src/java/org/apache/cassandra/repair/RepairSession.java
@@ -36,6 +36,7 @@ import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.gms.*;
 import org.apache.cassandra.streaming.PreviewKind;
 import org.apache.cassandra.streaming.SessionSummary;
+import org.apache.cassandra.service.ActiveRepairService;
 import org.apache.cassandra.tracing.Tracing;
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.MerkleTrees;
@@ -89,6 +90,10 @@ public class RepairSession extends AbstractFuture<RepairSessionResult> implement
     private final String[] cfnames;
     public final RepairParallelism parallelismDegree;
     public final boolean pullRepair;
+
+    // indicates some replicas were not included in the repair. Only relevant for --force option
+    public final boolean skippedReplicas;
+
     /** Range to repair */
     public final Collection<Range<Token>> ranges;
     public final Set<InetAddress> endpoints;
@@ -116,8 +121,8 @@ public class RepairSession extends AbstractFuture<RepairSessionResult> implement
      * @param keyspace name of keyspace
      * @param parallelismDegree specifies the degree of parallelism when calculating the merkle trees
      * @param endpoints the data centers that should be part of the repair; null for all DCs
-     * @param repairedAt when the repair occurred (millis)
      * @param pullRepair true if the repair should be one way (from remote host to this host and only applicable between two hosts--see RepairOption)
+     * @param force true if the repair should ignore dead endpoints (instead of failing)
      * @param cfnames names of columnfamilies
      */
     public RepairSession(UUID parentRepairSession,
@@ -128,6 +133,7 @@ public class RepairSession extends AbstractFuture<RepairSessionResult> implement
                          Set<InetAddress> endpoints,
                          boolean isConsistent,
                          boolean pullRepair,
+                         boolean force,
                          PreviewKind previewKind,
                          String... cfnames)
     {
@@ -139,10 +145,35 @@ public class RepairSession extends AbstractFuture<RepairSessionResult> implement
         this.keyspace = keyspace;
         this.cfnames = cfnames;
         this.ranges = ranges;
+
+        // If force is set, filter out dead endpoints
+        boolean forceSkippedReplicas = false;
+        if (force)
+        {
+            logger.debug("force flag set, removing dead endpoints");
+            final Set<InetAddress> removeCandidates = new HashSet<>();
+            for (final InetAddress endpoint : endpoints)
+            {
+                if (!FailureDetector.instance.isAlive(endpoint))
+                {
+                    logger.info("Removing a dead node from Repair due to -force " + endpoint);
+                    removeCandidates.add(endpoint);
+                }
+            }
+            if (!removeCandidates.isEmpty())
+            {
+                // we shouldn't be promoting sstables to repaired if any replicas are excluded from the repair
+                forceSkippedReplicas = true;
+                endpoints = new HashSet<>(endpoints);
+                endpoints.removeAll(removeCandidates);
+            }
+        }
+
         this.endpoints = endpoints;
         this.isConsistent = isConsistent;
         this.previewKind = previewKind;
         this.pullRepair = pullRepair;
+        this.skippedReplicas = forceSkippedReplicas;
     }
 
     public UUID getId()
@@ -241,7 +272,7 @@ public class RepairSession extends AbstractFuture<RepairSessionResult> implement
         {
             logger.info("{} {}", previewKind.logPrefix(getId()), message = String.format("No neighbors to repair with on range %s: session completed", ranges));
             Tracing.traceRepair(message);
-            set(new RepairSessionResult(id, keyspace, ranges, Lists.<RepairResult>newArrayList()));
+            set(new RepairSessionResult(id, keyspace, ranges, Lists.<RepairResult>newArrayList(), skippedReplicas));
             if (!previewKind.isPreview())
             {
                 SystemDistributedKeyspace.failRepairs(getId(), keyspace, cfnames, new RuntimeException(message));
@@ -252,7 +283,7 @@ public class RepairSession extends AbstractFuture<RepairSessionResult> implement
         // Checking all nodes are live
         for (InetAddress endpoint : endpoints)
         {
-            if (!FailureDetector.instance.isAlive(endpoint))
+            if (!FailureDetector.instance.isAlive(endpoint) && !skippedReplicas)
             {
                 message = String.format("Cannot proceed on repair because a neighbor (%s) is dead: session failed", endpoint);
                 logger.error("{} {}", previewKind.logPrefix(getId()), message);
@@ -283,7 +314,7 @@ public class RepairSession extends AbstractFuture<RepairSessionResult> implement
                 // this repair session is completed
                 logger.info("{} {}", previewKind.logPrefix(getId()), "Session completed successfully");
                 Tracing.traceRepair("Completed sync of range {}", ranges);
-                set(new RepairSessionResult(id, keyspace, ranges, results));
+                set(new RepairSessionResult(id, keyspace, ranges, results, skippedReplicas));
 
                 taskExecutor.shutdown();
                 // mark this session as terminated

http://git-wip-us.apache.org/repos/asf/cassandra/blob/45c0f860/src/java/org/apache/cassandra/repair/RepairSessionResult.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/RepairSessionResult.java b/src/java/org/apache/cassandra/repair/RepairSessionResult.java
index d4fff37..491ab2f 100644
--- a/src/java/org/apache/cassandra/repair/RepairSessionResult.java
+++ b/src/java/org/apache/cassandra/repair/RepairSessionResult.java
@@ -32,12 +32,25 @@ public class RepairSessionResult
     public final String keyspace;
     public final Collection<Range<Token>> ranges;
     public final Collection<RepairResult> repairJobResults;
+    public final boolean skippedReplicas;
 
-    public RepairSessionResult(UUID sessionId, String keyspace, Collection<Range<Token>> ranges, Collection<RepairResult> repairJobResults)
+    public RepairSessionResult(UUID sessionId, String keyspace, Collection<Range<Token>> ranges, Collection<RepairResult> repairJobResults, boolean skippedReplicas)
     {
         this.sessionId = sessionId;
         this.keyspace = keyspace;
         this.ranges = ranges;
         this.repairJobResults = repairJobResults;
+        this.skippedReplicas = skippedReplicas;
+    }
+
+    public String toString()
+    {
+        return "RepairSessionResult{" +
+               "sessionId=" + sessionId +
+               ", keyspace='" + keyspace + '\'' +
+               ", ranges=" + ranges +
+               ", repairJobResults=" + repairJobResults +
+               ", skippedReplicas=" + skippedReplicas +
+               '}';
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/45c0f860/src/java/org/apache/cassandra/repair/messages/RepairOption.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/messages/RepairOption.java b/src/java/org/apache/cassandra/repair/messages/RepairOption.java
index 6d69cf0..a95ee19 100644
--- a/src/java/org/apache/cassandra/repair/messages/RepairOption.java
+++ b/src/java/org/apache/cassandra/repair/messages/RepairOption.java
@@ -48,6 +48,7 @@ public class RepairOption
     public static final String TRACE_KEY = "trace";
     public static final String SUB_RANGE_REPAIR_KEY = "sub_range_repair";
     public static final String PULL_REPAIR_KEY = "pullRepair";
+    public static final String FORCE_REPAIR_KEY = "forceRepair";
     public static final String PREVIEW = "previewKind";
 
     // we don't want to push nodes too much for repair
@@ -125,6 +126,11 @@ public class RepairOption
      *             This is only allowed if exactly 2 hosts are specified along with a token range that they share.</td>
      *             <td>false</td>
      *         </tr>
+     *         <tr>
+     *             <td>forceRepair</td>
+     *             <td>"true" if the repair should continue, even if one of the replicas involved is down.</td>
+     *             <td>false</td>
+     *         </tr>
      *     </tbody>
      * </table>
      *
@@ -140,6 +146,7 @@ public class RepairOption
         boolean incremental = Boolean.parseBoolean(options.get(INCREMENTAL_KEY));
         PreviewKind previewKind = PreviewKind.valueOf(options.getOrDefault(PREVIEW, PreviewKind.NONE.toString()));
         boolean trace = Boolean.parseBoolean(options.get(TRACE_KEY));
+        boolean force = Boolean.parseBoolean(options.get(FORCE_REPAIR_KEY));
         boolean pullRepair = Boolean.parseBoolean(options.get(PULL_REPAIR_KEY));
 
         int jobThreads = 1;
@@ -178,7 +185,7 @@ public class RepairOption
             }
         }
 
-        RepairOption option = new RepairOption(parallelism, primaryRange, incremental, trace, jobThreads, ranges, !ranges.isEmpty(), pullRepair, previewKind);
+        RepairOption option = new RepairOption(parallelism, primaryRange, incremental, trace, jobThreads, ranges, !ranges.isEmpty(), pullRepair, force, previewKind);
 
         // data centers
         String dataCentersStr = options.get(DATACENTERS_KEY);
@@ -249,6 +256,11 @@ public class RepairOption
             throw new IllegalArgumentException("Incremental repairs cannot be run against a subset of tokens or ranges");
         }
 
+        if (option.isIncremental() && option.isForcedRepair())
+        {
+            throw new IllegalArgumentException("Cannot force incremental repair");
+        }
+
         return option;
     }
 
@@ -259,6 +271,7 @@ public class RepairOption
     private final int jobThreads;
     private final boolean isSubrangeRepair;
     private final boolean pullRepair;
+    private final boolean forceRepair;
     private final PreviewKind previewKind;
 
     private final Collection<String> columnFamilies = new HashSet<>();
@@ -266,7 +279,7 @@ public class RepairOption
     private final Collection<String> hosts = new HashSet<>();
     private final Collection<Range<Token>> ranges = new HashSet<>();
 
-    public RepairOption(RepairParallelism parallelism, boolean primaryRange, boolean incremental, boolean trace, int jobThreads, Collection<Range<Token>> ranges, boolean isSubrangeRepair, boolean pullRepair, PreviewKind previewKind)
+    public RepairOption(RepairParallelism parallelism, boolean primaryRange, boolean incremental, boolean trace, int jobThreads, Collection<Range<Token>> ranges, boolean isSubrangeRepair, boolean pullRepair, boolean forceRepair, PreviewKind previewKind)
     {
         if (FBUtilities.isWindows &&
             (DatabaseDescriptor.getDiskAccessMode() != Config.DiskAccessMode.standard || DatabaseDescriptor.getIndexAccessMode() != Config.DiskAccessMode.standard) &&
@@ -285,6 +298,7 @@ public class RepairOption
         this.ranges.addAll(ranges);
         this.isSubrangeRepair = isSubrangeRepair;
         this.pullRepair = pullRepair;
+        this.forceRepair = forceRepair;
         this.previewKind = previewKind;
     }
 
@@ -313,6 +327,11 @@ public class RepairOption
         return pullRepair;
     }
 
+    public boolean isForcedRepair()
+    {
+        return forceRepair;
+    }
+
     public int getJobThreads()
     {
         return jobThreads;
@@ -376,6 +395,7 @@ public class RepairOption
                ", previewKind: " + previewKind +
                ", # of ranges: " + ranges.size() +
                ", pull repair: " + pullRepair +
+               ", force repair: " + forceRepair +
                ')';
     }
 
@@ -393,6 +413,7 @@ public class RepairOption
         options.put(TRACE_KEY, Boolean.toString(trace));
         options.put(RANGES_KEY, Joiner.on(",").join(ranges));
         options.put(PULL_REPAIR_KEY, Boolean.toString(pullRepair));
+        options.put(FORCE_REPAIR_KEY, Boolean.toString(forceRepair));
         options.put(PREVIEW, previewKind.toString());
         return options;
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/45c0f860/src/java/org/apache/cassandra/service/ActiveRepairService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/ActiveRepairService.java b/src/java/org/apache/cassandra/service/ActiveRepairService.java
index a397ca2..d50dc3f 100644
--- a/src/java/org/apache/cassandra/service/ActiveRepairService.java
+++ b/src/java/org/apache/cassandra/service/ActiveRepairService.java
@@ -187,6 +187,7 @@ public class ActiveRepairService implements IEndpointStateChangeSubscriber, IFai
                                              Set<InetAddress> endpoints,
                                              boolean isConsistent,
                                              boolean pullRepair,
+                                             boolean force,
                                              PreviewKind previewKind,
                                              ListeningExecutorService executor,
                                              String... cfnames)
@@ -197,7 +198,7 @@ public class ActiveRepairService implements IEndpointStateChangeSubscriber, IFai
         if (cfnames.length == 0)
             return null;
 
-        final RepairSession session = new RepairSession(parentRepairSession, UUIDGen.getTimeUUID(), range, keyspace, parallelismDegree, endpoints, isConsistent, pullRepair, previewKind, cfnames);
+        final RepairSession session = new RepairSession(parentRepairSession, UUIDGen.getTimeUUID(), range, keyspace, parallelismDegree, endpoints, isConsistent, pullRepair, force, previewKind, cfnames);
 
         sessions.put(session.getId(), session);
         // register listeners
@@ -389,8 +390,16 @@ public class ActiveRepairService implements IEndpointStateChangeSubscriber, IFai
             }
             else
             {
-                // bailout early to avoid potentially waiting for a long time.
-                failRepair(parentRepairSession, "Endpoint not alive: " + neighbour);
+                if (options.isForcedRepair())
+                {
+                    prepareLatch.countDown();
+                }
+                else
+                {
+                    // bailout early to avoid potentially waiting for a long time.
+                    failRepair(parentRepairSession, "Endpoint not alive: " + neighbour);
+                }
+
             }
         }
         try

http://git-wip-us.apache.org/repos/asf/cassandra/blob/45c0f860/src/java/org/apache/cassandra/tools/nodetool/Repair.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/nodetool/Repair.java b/src/java/org/apache/cassandra/tools/nodetool/Repair.java
index 317a677..77ad214 100644
--- a/src/java/org/apache/cassandra/tools/nodetool/Repair.java
+++ b/src/java/org/apache/cassandra/tools/nodetool/Repair.java
@@ -74,6 +74,9 @@ public class Repair extends NodeToolCmd
     @Option(title = "full", name = {"-full", "--full"}, description = "Use -full to issue a full repair.")
     private boolean fullRepair = false;
 
+    @Option(title = "force", name = {"-force", "--force"}, description = "Use -force to filter out down endpoints")
+    private boolean force = false;
+
     @Option(title = "preview", name = {"-prv", "--preview"}, description = "Determine ranges and amount of data to be streamed, but don't actually perform repair")
     private boolean preview = false;
 
@@ -139,6 +142,7 @@ public class Repair extends NodeToolCmd
             options.put(RepairOption.TRACE_KEY, Boolean.toString(trace));
             options.put(RepairOption.COLUMNFAMILIES_KEY, StringUtils.join(cfnames, ","));
             options.put(RepairOption.PULL_REPAIR_KEY, Boolean.toString(pullRepair));
+            options.put(RepairOption.FORCE_REPAIR_KEY, Boolean.toString(force));
             options.put(RepairOption.PREVIEW, getPreviewKind().toString());
 
             if (!startToken.isEmpty() || !endToken.isEmpty())

http://git-wip-us.apache.org/repos/asf/cassandra/blob/45c0f860/test/unit/org/apache/cassandra/repair/RepairSessionTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/repair/RepairSessionTest.java b/test/unit/org/apache/cassandra/repair/RepairSessionTest.java
index 5a4e5b1..efae538 100644
--- a/test/unit/org/apache/cassandra/repair/RepairSessionTest.java
+++ b/test/unit/org/apache/cassandra/repair/RepairSessionTest.java
@@ -65,7 +65,7 @@ public class RepairSessionTest
         Set<InetAddress> endpoints = Sets.newHashSet(remote);
         RepairSession session = new RepairSession(parentSessionId, sessionId, Arrays.asList(repairRange),
                                                   "Keyspace1", RepairParallelism.SEQUENTIAL,
-                                                  endpoints, false, false,
+                                                  endpoints, false, false, false,
                                                   PreviewKind.NONE, "Standard1");
 
         // perform convict

http://git-wip-us.apache.org/repos/asf/cassandra/blob/45c0f860/test/unit/org/apache/cassandra/repair/consistent/CoordinatorSessionTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/repair/consistent/CoordinatorSessionTest.java b/test/unit/org/apache/cassandra/repair/consistent/CoordinatorSessionTest.java
index 4f5b7e6..3c27b5e 100644
--- a/test/unit/org/apache/cassandra/repair/consistent/CoordinatorSessionTest.java
+++ b/test/unit/org/apache/cassandra/repair/consistent/CoordinatorSessionTest.java
@@ -76,7 +76,7 @@ public class CoordinatorSessionTest extends AbstractRepairTest
 
     private static RepairSessionResult createResult(CoordinatorSession coordinator)
     {
-        return new RepairSessionResult(coordinator.sessionID, "ks", coordinator.ranges, null);
+        return new RepairSessionResult(coordinator.sessionID, "ks", coordinator.ranges, null, false);
     }
 
     private static void assertMessageSent(InstrumentedCoordinatorSession coordinator, InetAddress participant, RepairMessage expected)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/45c0f860/test/unit/org/apache/cassandra/repair/messages/RepairOptionTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/repair/messages/RepairOptionTest.java b/test/unit/org/apache/cassandra/repair/messages/RepairOptionTest.java
index 9eb7c86..13d7575 100644
--- a/test/unit/org/apache/cassandra/repair/messages/RepairOptionTest.java
+++ b/test/unit/org/apache/cassandra/repair/messages/RepairOptionTest.java
@@ -23,6 +23,7 @@ import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
 
+import org.junit.Assert;
 import org.junit.Test;
 
 import com.google.common.collect.ImmutableMap;
@@ -161,6 +162,27 @@ public class RepairOptionTest
 
     }
 
+    @Test
+    public void testForceOption() throws Exception
+    {
+        RepairOption option;
+        Map<String, String> options = new HashMap<>();
+
+        // default value
+        option = RepairOption.parse(options, Murmur3Partitioner.instance);
+        Assert.assertFalse(option.isForcedRepair());
+
+        // explicit true
+        options.put(RepairOption.FORCE_REPAIR_KEY, "true");
+        option = RepairOption.parse(options, Murmur3Partitioner.instance);
+        Assert.assertTrue(option.isForcedRepair());
+
+        // explicit false
+        options.put(RepairOption.FORCE_REPAIR_KEY, "false");
+        option = RepairOption.parse(options, Murmur3Partitioner.instance);
+        Assert.assertFalse(option.isForcedRepair());
+    }
+
     private void assertParseThrowsIllegalArgumentExceptionWithMessage(Map<String, String> optionsToParse, String expectedErrorMessage)
     {
         try


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org