You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by bd...@apache.org on 2017/06/30 18:31:32 UTC
cassandra git commit: Run repair with down replicas
Repository: cassandra
Updated Branches:
refs/heads/trunk 176f2a444 -> 45c0f860f
Run repair with down replicas
Patch by Sankalp Kohli & Blake Eggleston; Reviewed by Marcus Eriksson for CASSANDRA-10446
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/45c0f860
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/45c0f860
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/45c0f860
Branch: refs/heads/trunk
Commit: 45c0f860f3c7f8e0a7c80809c4ff47f4acf65557
Parents: 176f2a4
Author: Blake Eggleston <bd...@gmail.com>
Authored: Wed Oct 12 10:14:16 2016 -0700
Committer: Blake Eggleston <bd...@gmail.com>
Committed: Fri Jun 30 11:31:15 2017 -0700
----------------------------------------------------------------------
CHANGES.txt | 2 +
.../apache/cassandra/repair/RepairRunnable.java | 12 +++++-
.../apache/cassandra/repair/RepairSession.java | 39 ++++++++++++++++++--
.../cassandra/repair/RepairSessionResult.java | 15 +++++++-
.../cassandra/repair/messages/RepairOption.java | 25 ++++++++++++-
.../cassandra/service/ActiveRepairService.java | 15 ++++++--
.../apache/cassandra/tools/nodetool/Repair.java | 4 ++
.../cassandra/repair/RepairSessionTest.java | 2 +-
.../consistent/CoordinatorSessionTest.java | 2 +-
.../repair/messages/RepairOptionTest.java | 22 +++++++++++
10 files changed, 125 insertions(+), 13 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/45c0f860/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 866c6fd..6444994 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,6 @@
4.0
+ * Run repair with down replicas (CASSANDRA-10446)
+ * Added started & completed repair metrics (CASSANDRA-13598)
* Added started & completed repair metrics (CASSANDRA-13598)
* Improve secondary index (re)build failure and concurrency handling (CASSANDRA-10130)
* Improve calculation of available disk space for compaction (CASSANDRA-13068)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/45c0f860/src/java/org/apache/cassandra/repair/RepairRunnable.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/RepairRunnable.java b/src/java/org/apache/cassandra/repair/RepairRunnable.java
index eca162e..29347a4 100644
--- a/src/java/org/apache/cassandra/repair/RepairRunnable.java
+++ b/src/java/org/apache/cassandra/repair/RepairRunnable.java
@@ -289,9 +289,18 @@ public class RepairRunnable extends WrappedRunnable implements ProgressEventNoti
// filter out null(=failed) results and get successful ranges
for (RepairSessionResult sessionResult : results)
{
+ logger.debug("Repair result: {}", results);
if (sessionResult != null)
{
- successfulRanges.addAll(sessionResult.ranges);
+ // don't promote sstables for sessions we skipped replicas for
+ if (!sessionResult.skippedReplicas)
+ {
+ successfulRanges.addAll(sessionResult.ranges);
+ }
+ else
+ {
+ logger.debug("Skipping anticompaction for {}", results);
+ }
}
else
{
@@ -424,6 +433,7 @@ public class RepairRunnable extends WrappedRunnable implements ProgressEventNoti
p.left,
isConsistent,
options.isPullRepair(),
+ options.isForcedRepair(),
options.getPreviewKind(),
executor,
cfnames);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/45c0f860/src/java/org/apache/cassandra/repair/RepairSession.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/RepairSession.java b/src/java/org/apache/cassandra/repair/RepairSession.java
index c1b3f41..98ed1a3 100644
--- a/src/java/org/apache/cassandra/repair/RepairSession.java
+++ b/src/java/org/apache/cassandra/repair/RepairSession.java
@@ -36,6 +36,7 @@ import org.apache.cassandra.dht.Token;
import org.apache.cassandra.gms.*;
import org.apache.cassandra.streaming.PreviewKind;
import org.apache.cassandra.streaming.SessionSummary;
+import org.apache.cassandra.service.ActiveRepairService;
import org.apache.cassandra.tracing.Tracing;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.MerkleTrees;
@@ -89,6 +90,10 @@ public class RepairSession extends AbstractFuture<RepairSessionResult> implement
private final String[] cfnames;
public final RepairParallelism parallelismDegree;
public final boolean pullRepair;
+
+ // indicates some replicas were not included in the repair. Only relevant for --force option
+ public final boolean skippedReplicas;
+
/** Range to repair */
public final Collection<Range<Token>> ranges;
public final Set<InetAddress> endpoints;
@@ -116,8 +121,8 @@ public class RepairSession extends AbstractFuture<RepairSessionResult> implement
* @param keyspace name of keyspace
* @param parallelismDegree specifies the degree of parallelism when calculating the merkle trees
* @param endpoints the data centers that should be part of the repair; null for all DCs
- * @param repairedAt when the repair occurred (millis)
* @param pullRepair true if the repair should be one way (from remote host to this host and only applicable between two hosts--see RepairOption)
+ * @param force true if the repair should ignore dead endpoints (instead of failing)
* @param cfnames names of columnfamilies
*/
public RepairSession(UUID parentRepairSession,
@@ -128,6 +133,7 @@ public class RepairSession extends AbstractFuture<RepairSessionResult> implement
Set<InetAddress> endpoints,
boolean isConsistent,
boolean pullRepair,
+ boolean force,
PreviewKind previewKind,
String... cfnames)
{
@@ -139,10 +145,35 @@ public class RepairSession extends AbstractFuture<RepairSessionResult> implement
this.keyspace = keyspace;
this.cfnames = cfnames;
this.ranges = ranges;
+
+ //If force then filter out dead endpoints
+ boolean forceSkippedReplicas = false;
+ if (force)
+ {
+ logger.debug("force flag set, removing dead endpoints");
+ final Set<InetAddress> removeCandidates = new HashSet<>();
+ for (final InetAddress endpoint : endpoints)
+ {
+ if (!FailureDetector.instance.isAlive(endpoint))
+ {
+ logger.info("Removing a dead node from Repair due to -force " + endpoint);
+ removeCandidates.add(endpoint);
+ }
+ }
+ if (!removeCandidates.isEmpty())
+ {
+ // we shouldn't be promoting sstables to repaired if any replicas are excluded from the repair
+ forceSkippedReplicas = true;
+ endpoints = new HashSet<>(endpoints);
+ endpoints.removeAll(removeCandidates);
+ }
+ }
+
this.endpoints = endpoints;
this.isConsistent = isConsistent;
this.previewKind = previewKind;
this.pullRepair = pullRepair;
+ this.skippedReplicas = forceSkippedReplicas;
}
public UUID getId()
@@ -241,7 +272,7 @@ public class RepairSession extends AbstractFuture<RepairSessionResult> implement
{
logger.info("{} {}", previewKind.logPrefix(getId()), message = String.format("No neighbors to repair with on range %s: session completed", ranges));
Tracing.traceRepair(message);
- set(new RepairSessionResult(id, keyspace, ranges, Lists.<RepairResult>newArrayList()));
+ set(new RepairSessionResult(id, keyspace, ranges, Lists.<RepairResult>newArrayList(), skippedReplicas));
if (!previewKind.isPreview())
{
SystemDistributedKeyspace.failRepairs(getId(), keyspace, cfnames, new RuntimeException(message));
@@ -252,7 +283,7 @@ public class RepairSession extends AbstractFuture<RepairSessionResult> implement
// Checking all nodes are live
for (InetAddress endpoint : endpoints)
{
- if (!FailureDetector.instance.isAlive(endpoint))
+ if (!FailureDetector.instance.isAlive(endpoint) && !skippedReplicas)
{
message = String.format("Cannot proceed on repair because a neighbor (%s) is dead: session failed", endpoint);
logger.error("{} {}", previewKind.logPrefix(getId()), message);
@@ -283,7 +314,7 @@ public class RepairSession extends AbstractFuture<RepairSessionResult> implement
// this repair session is completed
logger.info("{} {}", previewKind.logPrefix(getId()), "Session completed successfully");
Tracing.traceRepair("Completed sync of range {}", ranges);
- set(new RepairSessionResult(id, keyspace, ranges, results));
+ set(new RepairSessionResult(id, keyspace, ranges, results, skippedReplicas));
taskExecutor.shutdown();
// mark this session as terminated
http://git-wip-us.apache.org/repos/asf/cassandra/blob/45c0f860/src/java/org/apache/cassandra/repair/RepairSessionResult.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/RepairSessionResult.java b/src/java/org/apache/cassandra/repair/RepairSessionResult.java
index d4fff37..491ab2f 100644
--- a/src/java/org/apache/cassandra/repair/RepairSessionResult.java
+++ b/src/java/org/apache/cassandra/repair/RepairSessionResult.java
@@ -32,12 +32,25 @@ public class RepairSessionResult
public final String keyspace;
public final Collection<Range<Token>> ranges;
public final Collection<RepairResult> repairJobResults;
+ public final boolean skippedReplicas;
- public RepairSessionResult(UUID sessionId, String keyspace, Collection<Range<Token>> ranges, Collection<RepairResult> repairJobResults)
+ public RepairSessionResult(UUID sessionId, String keyspace, Collection<Range<Token>> ranges, Collection<RepairResult> repairJobResults, boolean skippedReplicas)
{
this.sessionId = sessionId;
this.keyspace = keyspace;
this.ranges = ranges;
this.repairJobResults = repairJobResults;
+ this.skippedReplicas = skippedReplicas;
+ }
+
+ public String toString()
+ {
+ return "RepairSessionResult{" +
+ "sessionId=" + sessionId +
+ ", keyspace='" + keyspace + '\'' +
+ ", ranges=" + ranges +
+ ", repairJobResults=" + repairJobResults +
+ ", skippedReplicas=" + skippedReplicas +
+ '}';
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/45c0f860/src/java/org/apache/cassandra/repair/messages/RepairOption.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/messages/RepairOption.java b/src/java/org/apache/cassandra/repair/messages/RepairOption.java
index 6d69cf0..a95ee19 100644
--- a/src/java/org/apache/cassandra/repair/messages/RepairOption.java
+++ b/src/java/org/apache/cassandra/repair/messages/RepairOption.java
@@ -48,6 +48,7 @@ public class RepairOption
public static final String TRACE_KEY = "trace";
public static final String SUB_RANGE_REPAIR_KEY = "sub_range_repair";
public static final String PULL_REPAIR_KEY = "pullRepair";
+ public static final String FORCE_REPAIR_KEY = "forceRepair";
public static final String PREVIEW = "previewKind";
// we don't want to push nodes too much for repair
@@ -125,6 +126,11 @@ public class RepairOption
* This is only allowed if exactly 2 hosts are specified along with a token range that they share.</td>
* <td>false</td>
* </tr>
+ * <tr>
+ * <td>forceRepair</td>
+ * <td>"true" if the repair should continue, even if one of the replicas involved is down.</td>
+ * <td>false</td>
+ * </tr>
* </tbody>
* </table>
*
@@ -140,6 +146,7 @@ public class RepairOption
boolean incremental = Boolean.parseBoolean(options.get(INCREMENTAL_KEY));
PreviewKind previewKind = PreviewKind.valueOf(options.getOrDefault(PREVIEW, PreviewKind.NONE.toString()));
boolean trace = Boolean.parseBoolean(options.get(TRACE_KEY));
+ boolean force = Boolean.parseBoolean(options.get(FORCE_REPAIR_KEY));
boolean pullRepair = Boolean.parseBoolean(options.get(PULL_REPAIR_KEY));
int jobThreads = 1;
@@ -178,7 +185,7 @@ public class RepairOption
}
}
- RepairOption option = new RepairOption(parallelism, primaryRange, incremental, trace, jobThreads, ranges, !ranges.isEmpty(), pullRepair, previewKind);
+ RepairOption option = new RepairOption(parallelism, primaryRange, incremental, trace, jobThreads, ranges, !ranges.isEmpty(), pullRepair, force, previewKind);
// data centers
String dataCentersStr = options.get(DATACENTERS_KEY);
@@ -249,6 +256,11 @@ public class RepairOption
throw new IllegalArgumentException("Incremental repairs cannot be run against a subset of tokens or ranges");
}
+ if (option.isIncremental() && option.isForcedRepair())
+ {
+ throw new IllegalArgumentException("Cannot force incremental repair");
+ }
+
return option;
}
@@ -259,6 +271,7 @@ public class RepairOption
private final int jobThreads;
private final boolean isSubrangeRepair;
private final boolean pullRepair;
+ private final boolean forceRepair;
private final PreviewKind previewKind;
private final Collection<String> columnFamilies = new HashSet<>();
@@ -266,7 +279,7 @@ public class RepairOption
private final Collection<String> hosts = new HashSet<>();
private final Collection<Range<Token>> ranges = new HashSet<>();
- public RepairOption(RepairParallelism parallelism, boolean primaryRange, boolean incremental, boolean trace, int jobThreads, Collection<Range<Token>> ranges, boolean isSubrangeRepair, boolean pullRepair, PreviewKind previewKind)
+ public RepairOption(RepairParallelism parallelism, boolean primaryRange, boolean incremental, boolean trace, int jobThreads, Collection<Range<Token>> ranges, boolean isSubrangeRepair, boolean pullRepair, boolean forceRepair, PreviewKind previewKind)
{
if (FBUtilities.isWindows &&
(DatabaseDescriptor.getDiskAccessMode() != Config.DiskAccessMode.standard || DatabaseDescriptor.getIndexAccessMode() != Config.DiskAccessMode.standard) &&
@@ -285,6 +298,7 @@ public class RepairOption
this.ranges.addAll(ranges);
this.isSubrangeRepair = isSubrangeRepair;
this.pullRepair = pullRepair;
+ this.forceRepair = forceRepair;
this.previewKind = previewKind;
}
@@ -313,6 +327,11 @@ public class RepairOption
return pullRepair;
}
+ public boolean isForcedRepair()
+ {
+ return forceRepair;
+ }
+
public int getJobThreads()
{
return jobThreads;
@@ -376,6 +395,7 @@ public class RepairOption
", previewKind: " + previewKind +
", # of ranges: " + ranges.size() +
", pull repair: " + pullRepair +
+ ", force repair: " + forceRepair +
')';
}
@@ -393,6 +413,7 @@ public class RepairOption
options.put(TRACE_KEY, Boolean.toString(trace));
options.put(RANGES_KEY, Joiner.on(",").join(ranges));
options.put(PULL_REPAIR_KEY, Boolean.toString(pullRepair));
+ options.put(FORCE_REPAIR_KEY, Boolean.toString(forceRepair));
options.put(PREVIEW, previewKind.toString());
return options;
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/45c0f860/src/java/org/apache/cassandra/service/ActiveRepairService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/ActiveRepairService.java b/src/java/org/apache/cassandra/service/ActiveRepairService.java
index a397ca2..d50dc3f 100644
--- a/src/java/org/apache/cassandra/service/ActiveRepairService.java
+++ b/src/java/org/apache/cassandra/service/ActiveRepairService.java
@@ -187,6 +187,7 @@ public class ActiveRepairService implements IEndpointStateChangeSubscriber, IFai
Set<InetAddress> endpoints,
boolean isConsistent,
boolean pullRepair,
+ boolean force,
PreviewKind previewKind,
ListeningExecutorService executor,
String... cfnames)
@@ -197,7 +198,7 @@ public class ActiveRepairService implements IEndpointStateChangeSubscriber, IFai
if (cfnames.length == 0)
return null;
- final RepairSession session = new RepairSession(parentRepairSession, UUIDGen.getTimeUUID(), range, keyspace, parallelismDegree, endpoints, isConsistent, pullRepair, previewKind, cfnames);
+ final RepairSession session = new RepairSession(parentRepairSession, UUIDGen.getTimeUUID(), range, keyspace, parallelismDegree, endpoints, isConsistent, pullRepair, force, previewKind, cfnames);
sessions.put(session.getId(), session);
// register listeners
@@ -389,8 +390,16 @@ public class ActiveRepairService implements IEndpointStateChangeSubscriber, IFai
}
else
{
- // bailout early to avoid potentially waiting for a long time.
- failRepair(parentRepairSession, "Endpoint not alive: " + neighbour);
+ if (options.isForcedRepair())
+ {
+ prepareLatch.countDown();
+ }
+ else
+ {
+ // bailout early to avoid potentially waiting for a long time.
+ failRepair(parentRepairSession, "Endpoint not alive: " + neighbour);
+ }
+
}
}
try
http://git-wip-us.apache.org/repos/asf/cassandra/blob/45c0f860/src/java/org/apache/cassandra/tools/nodetool/Repair.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/nodetool/Repair.java b/src/java/org/apache/cassandra/tools/nodetool/Repair.java
index 317a677..77ad214 100644
--- a/src/java/org/apache/cassandra/tools/nodetool/Repair.java
+++ b/src/java/org/apache/cassandra/tools/nodetool/Repair.java
@@ -74,6 +74,9 @@ public class Repair extends NodeToolCmd
@Option(title = "full", name = {"-full", "--full"}, description = "Use -full to issue a full repair.")
private boolean fullRepair = false;
+ @Option(title = "force", name = {"-force", "--force"}, description = "Use -force to filter out down endpoints")
+ private boolean force = false;
+
@Option(title = "preview", name = {"-prv", "--preview"}, description = "Determine ranges and amount of data to be streamed, but don't actually perform repair")
private boolean preview = false;
@@ -139,6 +142,7 @@ public class Repair extends NodeToolCmd
options.put(RepairOption.TRACE_KEY, Boolean.toString(trace));
options.put(RepairOption.COLUMNFAMILIES_KEY, StringUtils.join(cfnames, ","));
options.put(RepairOption.PULL_REPAIR_KEY, Boolean.toString(pullRepair));
+ options.put(RepairOption.FORCE_REPAIR_KEY, Boolean.toString(force));
options.put(RepairOption.PREVIEW, getPreviewKind().toString());
if (!startToken.isEmpty() || !endToken.isEmpty())
http://git-wip-us.apache.org/repos/asf/cassandra/blob/45c0f860/test/unit/org/apache/cassandra/repair/RepairSessionTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/repair/RepairSessionTest.java b/test/unit/org/apache/cassandra/repair/RepairSessionTest.java
index 5a4e5b1..efae538 100644
--- a/test/unit/org/apache/cassandra/repair/RepairSessionTest.java
+++ b/test/unit/org/apache/cassandra/repair/RepairSessionTest.java
@@ -65,7 +65,7 @@ public class RepairSessionTest
Set<InetAddress> endpoints = Sets.newHashSet(remote);
RepairSession session = new RepairSession(parentSessionId, sessionId, Arrays.asList(repairRange),
"Keyspace1", RepairParallelism.SEQUENTIAL,
- endpoints, false, false,
+ endpoints, false, false, false,
PreviewKind.NONE, "Standard1");
// perform convict
http://git-wip-us.apache.org/repos/asf/cassandra/blob/45c0f860/test/unit/org/apache/cassandra/repair/consistent/CoordinatorSessionTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/repair/consistent/CoordinatorSessionTest.java b/test/unit/org/apache/cassandra/repair/consistent/CoordinatorSessionTest.java
index 4f5b7e6..3c27b5e 100644
--- a/test/unit/org/apache/cassandra/repair/consistent/CoordinatorSessionTest.java
+++ b/test/unit/org/apache/cassandra/repair/consistent/CoordinatorSessionTest.java
@@ -76,7 +76,7 @@ public class CoordinatorSessionTest extends AbstractRepairTest
private static RepairSessionResult createResult(CoordinatorSession coordinator)
{
- return new RepairSessionResult(coordinator.sessionID, "ks", coordinator.ranges, null);
+ return new RepairSessionResult(coordinator.sessionID, "ks", coordinator.ranges, null, false);
}
private static void assertMessageSent(InstrumentedCoordinatorSession coordinator, InetAddress participant, RepairMessage expected)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/45c0f860/test/unit/org/apache/cassandra/repair/messages/RepairOptionTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/repair/messages/RepairOptionTest.java b/test/unit/org/apache/cassandra/repair/messages/RepairOptionTest.java
index 9eb7c86..13d7575 100644
--- a/test/unit/org/apache/cassandra/repair/messages/RepairOptionTest.java
+++ b/test/unit/org/apache/cassandra/repair/messages/RepairOptionTest.java
@@ -23,6 +23,7 @@ import java.util.HashSet;
import java.util.Map;
import java.util.Set;
+import org.junit.Assert;
import org.junit.Test;
import com.google.common.collect.ImmutableMap;
@@ -161,6 +162,27 @@ public class RepairOptionTest
}
+ @Test
+ public void testForceOption() throws Exception
+ {
+ RepairOption option;
+ Map<String, String> options = new HashMap<>();
+
+ // default value
+ option = RepairOption.parse(options, Murmur3Partitioner.instance);
+ Assert.assertFalse(option.isForcedRepair());
+
+ // explicit true
+ options.put(RepairOption.FORCE_REPAIR_KEY, "true");
+ option = RepairOption.parse(options, Murmur3Partitioner.instance);
+ Assert.assertTrue(option.isForcedRepair());
+
+ // explicit false
+ options.put(RepairOption.FORCE_REPAIR_KEY, "false");
+ option = RepairOption.parse(options, Murmur3Partitioner.instance);
+ Assert.assertFalse(option.isForcedRepair());
+ }
+
private void assertParseThrowsIllegalArgumentExceptionWithMessage(Map<String, String> optionsToParse, String expectedErrorMessage)
{
try
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org