You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by yu...@apache.org on 2016/08/11 19:58:32 UTC
cassandra git commit: Add support to one way targeted repair (pull-repair)
Repository: cassandra
Updated Branches:
refs/heads/trunk a3e772b8b -> b29736c27
Add support to one way targeted repair (pull-repair)
Patch by Geoffrey Yu; Reviewed by Paulo Motta for CASSANDRA-9876
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/b29736c2
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/b29736c2
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/b29736c2
Branch: refs/heads/trunk
Commit: b29736c27acff3a62a1416a7c6cd7f77deb96b84
Parents: a3e772b
Author: Geoffrey Yu <ge...@apple.com>
Authored: Thu Aug 4 13:40:14 2016 -0700
Committer: Yuki Morishita <yu...@apache.org>
Committed: Thu Aug 11 14:58:18 2016 -0500
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../apache/cassandra/repair/LocalSyncTask.java | 19 +++++---
.../org/apache/cassandra/repair/RepairJob.java | 2 +-
.../apache/cassandra/repair/RepairRunnable.java | 3 +-
.../apache/cassandra/repair/RepairSession.java | 4 ++
.../cassandra/repair/messages/RepairOption.java | 36 ++++++++++++++-
.../cassandra/service/ActiveRepairService.java | 10 +++--
.../cassandra/service/StorageService.java | 4 +-
.../apache/cassandra/tools/nodetool/Repair.java | 4 ++
.../cassandra/repair/LocalSyncTaskTest.java | 4 +-
.../cassandra/repair/RepairSessionTest.java | 2 +-
.../repair/messages/RepairOptionTest.java | 46 +++++++++++++++++++-
12 files changed, 114 insertions(+), 21 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/b29736c2/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index e1b0a44..bba64c6 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
3.10
+ * Add support to one-way targeted repair (CASSANDRA-9876)
* Remove clientutil jar (CASSANDRA-11635)
* Fix compaction throughput throttle (CASSANDRA-12366)
* Delay releasing Memtable memory on flush until PostFlush has finished running (CASSANDRA-12358)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/b29736c2/src/java/org/apache/cassandra/repair/LocalSyncTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/LocalSyncTask.java b/src/java/org/apache/cassandra/repair/LocalSyncTask.java
index a92708f..cfc181e 100644
--- a/src/java/org/apache/cassandra/repair/LocalSyncTask.java
+++ b/src/java/org/apache/cassandra/repair/LocalSyncTask.java
@@ -47,10 +47,13 @@ public class LocalSyncTask extends SyncTask implements StreamEventHandler
private final long repairedAt;
- public LocalSyncTask(RepairJobDesc desc, TreeResponse r1, TreeResponse r2, long repairedAt)
+ private final boolean pullRepair;
+
+ public LocalSyncTask(RepairJobDesc desc, TreeResponse r1, TreeResponse r2, long repairedAt, boolean pullRepair)
{
super(desc, r1, r2);
this.repairedAt = repairedAt;
+ this.pullRepair = pullRepair;
}
/**
@@ -73,13 +76,17 @@ public class LocalSyncTask extends SyncTask implements StreamEventHandler
isIncremental = prs.isIncremental;
}
Tracing.traceRepair(message);
- new StreamPlan("Repair", repairedAt, 1, false, isIncremental, false).listeners(this)
+ StreamPlan plan = new StreamPlan("Repair", repairedAt, 1, false, isIncremental, false).listeners(this)
.flushBeforeTransfer(true)
// request ranges from the remote node
- .requestRanges(dst, preferred, desc.keyspace, differences, desc.columnFamily)
- // send ranges to the remote node
- .transferRanges(dst, preferred, desc.keyspace, differences, desc.columnFamily)
- .execute();
+ .requestRanges(dst, preferred, desc.keyspace, differences, desc.columnFamily);
+ if (!pullRepair)
+ {
+ // send ranges to the remote node if we are not performing a pull repair
+ plan.transferRanges(dst, preferred, desc.keyspace, differences, desc.columnFamily);
+ }
+
+ plan.execute();
}
public void handleStreamEvent(StreamEvent event)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/b29736c2/src/java/org/apache/cassandra/repair/RepairJob.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/RepairJob.java b/src/java/org/apache/cassandra/repair/RepairJob.java
index 454865b..c768db6 100644
--- a/src/java/org/apache/cassandra/repair/RepairJob.java
+++ b/src/java/org/apache/cassandra/repair/RepairJob.java
@@ -118,7 +118,7 @@ public class RepairJob extends AbstractFuture<RepairResult> implements Runnable
SyncTask task;
if (r1.endpoint.equals(local) || r2.endpoint.equals(local))
{
- task = new LocalSyncTask(desc, r1, r2, repairedAt);
+ task = new LocalSyncTask(desc, r1, r2, repairedAt, session.pullRepair);
}
else
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/b29736c2/src/java/org/apache/cassandra/repair/RepairRunnable.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/RepairRunnable.java b/src/java/org/apache/cassandra/repair/RepairRunnable.java
index d099f72..b69d8ce 100644
--- a/src/java/org/apache/cassandra/repair/RepairRunnable.java
+++ b/src/java/org/apache/cassandra/repair/RepairRunnable.java
@@ -103,7 +103,7 @@ public class RepairRunnable extends WrappedRunnable implements ProgressEventNoti
protected void fireErrorAndComplete(String tag, int progressCount, int totalProgress, String message)
{
fireProgressEvent(tag, new ProgressEvent(ProgressEventType.ERROR, progressCount, totalProgress, message));
- fireProgressEvent(tag, new ProgressEvent(ProgressEventType.COMPLETE, progressCount, totalProgress));
+ fireProgressEvent(tag, new ProgressEvent(ProgressEventType.COMPLETE, progressCount, totalProgress, String.format("Repair command #%d finished with error", cmd)));
}
protected void runMayThrow() throws Exception
@@ -226,6 +226,7 @@ public class RepairRunnable extends WrappedRunnable implements ProgressEventNoti
options.getParallelism(),
p.left,
repairedAt,
+ options.isPullRepair(),
executor,
cfnames);
if (session == null)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/b29736c2/src/java/org/apache/cassandra/repair/RepairSession.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/RepairSession.java b/src/java/org/apache/cassandra/repair/RepairSession.java
index a52b352..cad506d 100644
--- a/src/java/org/apache/cassandra/repair/RepairSession.java
+++ b/src/java/org/apache/cassandra/repair/RepairSession.java
@@ -87,6 +87,7 @@ public class RepairSession extends AbstractFuture<RepairSessionResult> implement
public final String keyspace;
private final String[] cfnames;
public final RepairParallelism parallelismDegree;
+ public final boolean pullRepair;
/** Range to repair */
public final Collection<Range<Token>> ranges;
public final Set<InetAddress> endpoints;
@@ -117,6 +118,7 @@ public class RepairSession extends AbstractFuture<RepairSessionResult> implement
* @param parallelismDegree specifies the degree of parallelism when calculating the merkle trees
* @param endpoints the data centers that should be part of the repair; null for all DCs
* @param repairedAt when the repair occurred (millis)
+ * @param pullRepair true if the repair should be one way (from remote host to this host and only applicable between two hosts--see RepairOption)
* @param cfnames names of columnfamilies
*/
public RepairSession(UUID parentRepairSession,
@@ -126,6 +128,7 @@ public class RepairSession extends AbstractFuture<RepairSessionResult> implement
RepairParallelism parallelismDegree,
Set<InetAddress> endpoints,
long repairedAt,
+ boolean pullRepair,
String... cfnames)
{
assert cfnames.length > 0 : "Repairing no column families seems pointless, doesn't it";
@@ -139,6 +142,7 @@ public class RepairSession extends AbstractFuture<RepairSessionResult> implement
this.endpoints = endpoints;
this.repairedAt = repairedAt;
this.validationRemaining = new AtomicInteger(cfnames.length);
+ this.pullRepair = pullRepair;
}
public UUID getId()
http://git-wip-us.apache.org/repos/asf/cassandra/blob/b29736c2/src/java/org/apache/cassandra/repair/messages/RepairOption.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/messages/RepairOption.java b/src/java/org/apache/cassandra/repair/messages/RepairOption.java
index 82dd181..1f34973 100644
--- a/src/java/org/apache/cassandra/repair/messages/RepairOption.java
+++ b/src/java/org/apache/cassandra/repair/messages/RepairOption.java
@@ -46,6 +46,7 @@ public class RepairOption
public static final String HOSTS_KEY = "hosts";
public static final String TRACE_KEY = "trace";
public static final String SUB_RANGE_REPAIR_KEY = "sub_range_repair";
+ public static final String PULL_REPAIR_KEY = "pullRepair";
// we don't want to push nodes too much for repair
public static final int MAX_JOB_THREADS = 4;
@@ -116,6 +117,12 @@ public class RepairOption
* Multiple hosts can be given as comma separated values(e.g. cass1,cass2).</td>
* <td></td>
* </tr>
+ * <tr>
+ * <td>pullRepair</td>
+ * <td>"true" if the repair should only stream data one way from a remote host to this host.
+ * This is only allowed if exactly 2 hosts are specified along with a token range that they share.</td>
+ * <td>false</td>
+ * </tr>
* </tbody>
* </table>
*
@@ -130,6 +137,7 @@ public class RepairOption
boolean primaryRange = Boolean.parseBoolean(options.get(PRIMARY_RANGE_KEY));
boolean incremental = Boolean.parseBoolean(options.get(INCREMENTAL_KEY));
boolean trace = Boolean.parseBoolean(options.get(TRACE_KEY));
+ boolean pullRepair = Boolean.parseBoolean(options.get(PULL_REPAIR_KEY));
int jobThreads = 1;
if (options.containsKey(JOB_THREADS_KEY))
@@ -163,7 +171,7 @@ public class RepairOption
}
}
- RepairOption option = new RepairOption(parallelism, primaryRange, incremental, trace, jobThreads, ranges, !ranges.isEmpty());
+ RepairOption option = new RepairOption(parallelism, primaryRange, incremental, trace, jobThreads, ranges, !ranges.isEmpty(), pullRepair);
// data centers
String dataCentersStr = options.get(DATACENTERS_KEY);
@@ -209,10 +217,25 @@ public class RepairOption
{
throw new IllegalArgumentException("Too many job threads. Max is " + MAX_JOB_THREADS);
}
+ if (!dataCenters.isEmpty() && !hosts.isEmpty())
+ {
+ throw new IllegalArgumentException("Cannot combine -dc and -hosts options.");
+ }
if (primaryRange && (!dataCenters.isEmpty() || !hosts.isEmpty()))
{
throw new IllegalArgumentException("You need to run primary range repair on all nodes in the cluster.");
}
+ if (pullRepair)
+ {
+ if (hosts.size() != 2)
+ {
+ throw new IllegalArgumentException("Pull repair can only be performed between two hosts. Please specify two hosts, one of which must be this host.");
+ }
+ else if (ranges.isEmpty())
+ {
+ throw new IllegalArgumentException("Token ranges must be specified when performing pull repair. Please specify at least one token range which both hosts have in common.");
+ }
+ }
return option;
}
@@ -223,13 +246,14 @@ public class RepairOption
private final boolean trace;
private final int jobThreads;
private final boolean isSubrangeRepair;
+ private final boolean pullRepair;
private final Collection<String> columnFamilies = new HashSet<>();
private final Collection<String> dataCenters = new HashSet<>();
private final Collection<String> hosts = new HashSet<>();
private final Collection<Range<Token>> ranges = new HashSet<>();
- public RepairOption(RepairParallelism parallelism, boolean primaryRange, boolean incremental, boolean trace, int jobThreads, Collection<Range<Token>> ranges, boolean isSubrangeRepair)
+ public RepairOption(RepairParallelism parallelism, boolean primaryRange, boolean incremental, boolean trace, int jobThreads, Collection<Range<Token>> ranges, boolean isSubrangeRepair, boolean pullRepair)
{
if (FBUtilities.isWindows &&
(DatabaseDescriptor.getDiskAccessMode() != Config.DiskAccessMode.standard || DatabaseDescriptor.getIndexAccessMode() != Config.DiskAccessMode.standard) &&
@@ -247,6 +271,7 @@ public class RepairOption
this.jobThreads = jobThreads;
this.ranges.addAll(ranges);
this.isSubrangeRepair = isSubrangeRepair;
+ this.pullRepair = pullRepair;
}
public RepairParallelism getParallelism()
@@ -269,6 +294,11 @@ public class RepairOption
return trace;
}
+ public boolean isPullRepair()
+ {
+ return pullRepair;
+ }
+
public int getJobThreads()
{
return jobThreads;
@@ -316,6 +346,7 @@ public class RepairOption
", dataCenters: " + dataCenters +
", hosts: " + hosts +
", # of ranges: " + ranges.size() +
+ ", pull repair: " + pullRepair +
')';
}
@@ -332,6 +363,7 @@ public class RepairOption
options.put(SUB_RANGE_REPAIR_KEY, Boolean.toString(isSubrangeRepair));
options.put(TRACE_KEY, Boolean.toString(trace));
options.put(RANGES_KEY, Joiner.on(",").join(ranges));
+ options.put(PULL_REPAIR_KEY, Boolean.toString(pullRepair));
return options;
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/b29736c2/src/java/org/apache/cassandra/service/ActiveRepairService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/ActiveRepairService.java b/src/java/org/apache/cassandra/service/ActiveRepairService.java
index 435c7c8..4699ae1 100644
--- a/src/java/org/apache/cassandra/service/ActiveRepairService.java
+++ b/src/java/org/apache/cassandra/service/ActiveRepairService.java
@@ -132,6 +132,7 @@ public class ActiveRepairService implements IEndpointStateChangeSubscriber, IFai
RepairParallelism parallelismDegree,
Set<InetAddress> endpoints,
long repairedAt,
+ boolean pullRepair,
ListeningExecutorService executor,
String... cfnames)
{
@@ -141,7 +142,7 @@ public class ActiveRepairService implements IEndpointStateChangeSubscriber, IFai
if (cfnames.length == 0)
return null;
- final RepairSession session = new RepairSession(parentRepairSession, UUIDGen.getTimeUUID(), range, keyspace, parallelismDegree, endpoints, repairedAt, cfnames);
+ final RepairSession session = new RepairSession(parentRepairSession, UUIDGen.getTimeUUID(), range, keyspace, parallelismDegree, endpoints, repairedAt, pullRepair, cfnames);
sessions.put(session.getId(), session);
// register listeners
@@ -245,9 +246,10 @@ public class ActiveRepairService implements IEndpointStateChangeSubscriber, IFai
if (specifiedHost.size() <= 1)
{
- String msg = "Repair requires at least two endpoints that are neighbours before it can continue, the endpoint used for this repair is %s, " +
- "other available neighbours are %s but these neighbours were not part of the supplied list of hosts to use during the repair (%s).";
- throw new IllegalArgumentException(String.format(msg, specifiedHost, neighbors, hosts));
+ String msg = "Specified hosts %s do not share range %s needed for repair. Either restrict repair ranges " +
+ "with -st/-et options, or specify one of the neighbors that share this range with " +
+ "this node: %s.";
+ throw new IllegalArgumentException(String.format(msg, hosts, toRepair, neighbors));
}
specifiedHost.remove(FBUtilities.getBroadcastAddress());
http://git-wip-us.apache.org/repos/asf/cassandra/blob/b29736c2/src/java/org/apache/cassandra/service/StorageService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java
index e3b4752..2810e2f 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -3172,7 +3172,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
parallelism = RepairParallelism.PARALLEL;
}
- RepairOption options = new RepairOption(parallelism, primaryRange, !fullRepair, false, 1, Collections.<Range<Token>>emptyList(), false);
+ RepairOption options = new RepairOption(parallelism, primaryRange, !fullRepair, false, 1, Collections.<Range<Token>>emptyList(), false, false);
if (dataCenters != null)
{
options.getDataCenters().addAll(dataCenters);
@@ -3264,7 +3264,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
"The repair will occur but without anti-compaction.");
Collection<Range<Token>> repairingRange = createRepairRangeFrom(beginToken, endToken);
- RepairOption options = new RepairOption(parallelism, false, !fullRepair, false, 1, repairingRange, true);
+ RepairOption options = new RepairOption(parallelism, false, !fullRepair, false, 1, repairingRange, true, false);
if (dataCenters != null)
{
options.getDataCenters().addAll(dataCenters);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/b29736c2/src/java/org/apache/cassandra/tools/nodetool/Repair.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/nodetool/Repair.java b/src/java/org/apache/cassandra/tools/nodetool/Repair.java
index 02bfc5b..5383fa5 100644
--- a/src/java/org/apache/cassandra/tools/nodetool/Repair.java
+++ b/src/java/org/apache/cassandra/tools/nodetool/Repair.java
@@ -81,6 +81,9 @@ public class Repair extends NodeToolCmd
@Option(title = "trace_repair", name = {"-tr", "--trace"}, description = "Use -tr to trace the repair. Traces are logged to system_traces.events.")
private boolean trace = false;
+ @Option(title = "pull_repair", name = {"-pl", "--pull"}, description = "Use --pull to perform a one way repair where data is only streamed from a remote node to this node.")
+ private boolean pullRepair = false;
+
@Override
public void execute(NodeProbe probe)
{
@@ -108,6 +111,7 @@ public class Repair extends NodeToolCmd
options.put(RepairOption.JOB_THREADS_KEY, Integer.toString(numJobThreads));
options.put(RepairOption.TRACE_KEY, Boolean.toString(trace));
options.put(RepairOption.COLUMNFAMILIES_KEY, StringUtils.join(cfnames, ","));
+ options.put(RepairOption.PULL_REPAIR_KEY, Boolean.toString(pullRepair));
if (!startToken.isEmpty() || !endToken.isEmpty())
{
options.put(RepairOption.RANGES_KEY, startToken + ":" + endToken);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/b29736c2/test/unit/org/apache/cassandra/repair/LocalSyncTaskTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/repair/LocalSyncTaskTest.java b/test/unit/org/apache/cassandra/repair/LocalSyncTaskTest.java
index 6aacae6..0fceaf4 100644
--- a/test/unit/org/apache/cassandra/repair/LocalSyncTaskTest.java
+++ b/test/unit/org/apache/cassandra/repair/LocalSyncTaskTest.java
@@ -76,7 +76,7 @@ public class LocalSyncTaskTest extends SchemaLoader
// note: we reuse the same endpoint which is bogus in theory but fine here
TreeResponse r1 = new TreeResponse(ep1, tree1);
TreeResponse r2 = new TreeResponse(ep2, tree2);
- LocalSyncTask task = new LocalSyncTask(desc, r1, r2, ActiveRepairService.UNREPAIRED_SSTABLE);
+ LocalSyncTask task = new LocalSyncTask(desc, r1, r2, ActiveRepairService.UNREPAIRED_SSTABLE, false);
task.run();
assertEquals(0, task.get().numberOfDifferences);
@@ -111,7 +111,7 @@ public class LocalSyncTaskTest extends SchemaLoader
// note: we reuse the same endpoint which is bogus in theory but fine here
TreeResponse r1 = new TreeResponse(InetAddress.getByName("127.0.0.1"), tree1);
TreeResponse r2 = new TreeResponse(InetAddress.getByName("127.0.0.2"), tree2);
- LocalSyncTask task = new LocalSyncTask(desc, r1, r2, ActiveRepairService.UNREPAIRED_SSTABLE);
+ LocalSyncTask task = new LocalSyncTask(desc, r1, r2, ActiveRepairService.UNREPAIRED_SSTABLE, false);
task.run();
// ensure that the changed range was recorded
http://git-wip-us.apache.org/repos/asf/cassandra/blob/b29736c2/test/unit/org/apache/cassandra/repair/RepairSessionTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/repair/RepairSessionTest.java b/test/unit/org/apache/cassandra/repair/RepairSessionTest.java
index d40982c..7b31c26 100644
--- a/test/unit/org/apache/cassandra/repair/RepairSessionTest.java
+++ b/test/unit/org/apache/cassandra/repair/RepairSessionTest.java
@@ -54,7 +54,7 @@ public class RepairSessionTest
IPartitioner p = Murmur3Partitioner.instance;
Range<Token> repairRange = new Range<>(p.getToken(ByteBufferUtil.bytes(0)), p.getToken(ByteBufferUtil.bytes(100)));
Set<InetAddress> endpoints = Sets.newHashSet(remote);
- RepairSession session = new RepairSession(parentSessionId, sessionId, Arrays.asList(repairRange), "Keyspace1", RepairParallelism.SEQUENTIAL, endpoints, ActiveRepairService.UNREPAIRED_SSTABLE, "Standard1");
+ RepairSession session = new RepairSession(parentSessionId, sessionId, Arrays.asList(repairRange), "Keyspace1", RepairParallelism.SEQUENTIAL, endpoints, ActiveRepairService.UNREPAIRED_SSTABLE, false, "Standard1");
// perform convict
session.convict(remote, Double.MAX_VALUE);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/b29736c2/test/unit/org/apache/cassandra/repair/messages/RepairOptionTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/repair/messages/RepairOptionTest.java b/test/unit/org/apache/cassandra/repair/messages/RepairOptionTest.java
index a564cff..665a0b7 100644
--- a/test/unit/org/apache/cassandra/repair/messages/RepairOptionTest.java
+++ b/test/unit/org/apache/cassandra/repair/messages/RepairOptionTest.java
@@ -38,7 +38,10 @@ import org.apache.cassandra.utils.FBUtilities;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.junit.matchers.JUnitMatchers.containsString;
public class RepairOptionTest
{
@@ -59,7 +62,7 @@ public class RepairOptionTest
assertFalse(option.isPrimaryRange());
assertFalse(option.isIncremental());
- // parse everything
+ // parse everything except hosts (hosts cannot be combined with data centers)
Map<String, String> options = new HashMap<>();
options.put(RepairOption.PARALLELISM_KEY, "parallel");
options.put(RepairOption.PRIMARY_RANGE_KEY, "false");
@@ -67,7 +70,6 @@ public class RepairOptionTest
options.put(RepairOption.RANGES_KEY, "0:10,11:20,21:30");
options.put(RepairOption.COLUMNFAMILIES_KEY, "cf1,cf2,cf3");
options.put(RepairOption.DATACENTERS_KEY, "dc1,dc2,dc3");
- options.put(RepairOption.HOSTS_KEY, "127.0.0.1,127.0.0.2,127.0.0.3");
option = RepairOption.parse(options, partitioner);
assertTrue(option.getParallelism() == RepairParallelism.PARALLEL);
@@ -92,6 +94,14 @@ public class RepairOptionTest
expectedDCs.add("dc3");
assertEquals(expectedDCs, option.getDataCenters());
+ // expect an error when parsing with hosts as well
+ options.put(RepairOption.HOSTS_KEY, "127.0.0.1,127.0.0.2,127.0.0.3");
+ assertParseThrowsIllegalArgumentExceptionWithMessage(options, "Cannot combine -dc and -hosts options");
+
+ // remove data centers to proceed with testing parsing hosts
+ options.remove(RepairOption.DATACENTERS_KEY);
+ option = RepairOption.parse(options, partitioner);
+
Set<String> expectedHosts = new HashSet<>(3);
expectedHosts.add("127.0.0.1");
expectedHosts.add("127.0.0.2");
@@ -100,6 +110,25 @@ public class RepairOptionTest
}
@Test
+ public void testPullRepairParseOptions()
+ {
+ Map<String, String> options = new HashMap<>();
+
+ options.put(RepairOption.PULL_REPAIR_KEY, "true");
+ assertParseThrowsIllegalArgumentExceptionWithMessage(options, "Pull repair can only be performed between two hosts");
+
+ options.put(RepairOption.HOSTS_KEY, "127.0.0.1,127.0.0.2,127.0.0.3");
+ assertParseThrowsIllegalArgumentExceptionWithMessage(options, "Pull repair can only be performed between two hosts");
+
+ options.put(RepairOption.HOSTS_KEY, "127.0.0.1,127.0.0.2");
+ assertParseThrowsIllegalArgumentExceptionWithMessage(options, "Token ranges must be specified when performing pull repair");
+
+ options.put(RepairOption.RANGES_KEY, "0:10");
+ RepairOption option = RepairOption.parse(options, Murmur3Partitioner.instance);
+ assertTrue(option.isPullRepair());
+ }
+
+ @Test
public void testIncrementalRepairWithSubrangesIsNotGlobal() throws Exception
{
RepairOption ro = RepairOption.parse(ImmutableMap.of(RepairOption.INCREMENTAL_KEY, "true", RepairOption.RANGES_KEY, "42:42"),
@@ -109,4 +138,17 @@ public class RepairOptionTest
Murmur3Partitioner.instance);
assertTrue(ro.isGlobal());
}
+
+ private void assertParseThrowsIllegalArgumentExceptionWithMessage(Map<String, String> optionsToParse, String expectedErrorMessage)
+ {
+ try
+ {
+ RepairOption.parse(optionsToParse, Murmur3Partitioner.instance);
+ fail(String.format("Expected RepairOption.parse() to throw an IllegalArgumentException containing the message '%s'", expectedErrorMessage));
+ }
+ catch (IllegalArgumentException ex)
+ {
+ assertThat(ex.getMessage(), containsString(expectedErrorMessage));
+ }
+ }
}