You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by yu...@apache.org on 2016/08/11 19:58:32 UTC

cassandra git commit: Add support for one-way targeted repair (pull-repair)

Repository: cassandra
Updated Branches:
  refs/heads/trunk a3e772b8b -> b29736c27


Add support for one-way targeted repair (pull-repair)

Patch by Geoffrey Yu; Reviewed by Paulo Motta for CASSANDRA-9876


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/b29736c2
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/b29736c2
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/b29736c2

Branch: refs/heads/trunk
Commit: b29736c27acff3a62a1416a7c6cd7f77deb96b84
Parents: a3e772b
Author: Geoffrey Yu <ge...@apple.com>
Authored: Thu Aug 4 13:40:14 2016 -0700
Committer: Yuki Morishita <yu...@apache.org>
Committed: Thu Aug 11 14:58:18 2016 -0500

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../apache/cassandra/repair/LocalSyncTask.java  | 19 +++++---
 .../org/apache/cassandra/repair/RepairJob.java  |  2 +-
 .../apache/cassandra/repair/RepairRunnable.java |  3 +-
 .../apache/cassandra/repair/RepairSession.java  |  4 ++
 .../cassandra/repair/messages/RepairOption.java | 36 ++++++++++++++-
 .../cassandra/service/ActiveRepairService.java  | 10 +++--
 .../cassandra/service/StorageService.java       |  4 +-
 .../apache/cassandra/tools/nodetool/Repair.java |  4 ++
 .../cassandra/repair/LocalSyncTaskTest.java     |  4 +-
 .../cassandra/repair/RepairSessionTest.java     |  2 +-
 .../repair/messages/RepairOptionTest.java       | 46 +++++++++++++++++++-
 12 files changed, 114 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/b29736c2/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index e1b0a44..bba64c6 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 3.10
+ * Add support to one-way targeted repair (CASSANDRA-9876)
  * Remove clientutil jar (CASSANDRA-11635)
  * Fix compaction throughput throttle (CASSANDRA-12366)
  * Delay releasing Memtable memory on flush until PostFlush has finished running (CASSANDRA-12358)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b29736c2/src/java/org/apache/cassandra/repair/LocalSyncTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/LocalSyncTask.java b/src/java/org/apache/cassandra/repair/LocalSyncTask.java
index a92708f..cfc181e 100644
--- a/src/java/org/apache/cassandra/repair/LocalSyncTask.java
+++ b/src/java/org/apache/cassandra/repair/LocalSyncTask.java
@@ -47,10 +47,13 @@ public class LocalSyncTask extends SyncTask implements StreamEventHandler
 
     private final long repairedAt;
 
-    public LocalSyncTask(RepairJobDesc desc, TreeResponse r1, TreeResponse r2, long repairedAt)
+    private final boolean pullRepair;
+
+    public LocalSyncTask(RepairJobDesc desc, TreeResponse r1, TreeResponse r2, long repairedAt, boolean pullRepair)
     {
         super(desc, r1, r2);
         this.repairedAt = repairedAt;
+        this.pullRepair = pullRepair;
     }
 
     /**
@@ -73,13 +76,17 @@ public class LocalSyncTask extends SyncTask implements StreamEventHandler
             isIncremental = prs.isIncremental;
         }
         Tracing.traceRepair(message);
-        new StreamPlan("Repair", repairedAt, 1, false, isIncremental, false).listeners(this)
+        StreamPlan plan = new StreamPlan("Repair", repairedAt, 1, false, isIncremental, false).listeners(this)
                                             .flushBeforeTransfer(true)
                                             // request ranges from the remote node
-                                            .requestRanges(dst, preferred, desc.keyspace, differences, desc.columnFamily)
-                                            // send ranges to the remote node
-                                            .transferRanges(dst, preferred, desc.keyspace, differences, desc.columnFamily)
-                                            .execute();
+                                            .requestRanges(dst, preferred, desc.keyspace, differences, desc.columnFamily);
+        if (!pullRepair)
+        {
+            // send ranges to the remote node if we are not performing a pull repair
+            plan.transferRanges(dst, preferred, desc.keyspace, differences, desc.columnFamily);
+        }
+
+        plan.execute();
     }
 
     public void handleStreamEvent(StreamEvent event)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b29736c2/src/java/org/apache/cassandra/repair/RepairJob.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/RepairJob.java b/src/java/org/apache/cassandra/repair/RepairJob.java
index 454865b..c768db6 100644
--- a/src/java/org/apache/cassandra/repair/RepairJob.java
+++ b/src/java/org/apache/cassandra/repair/RepairJob.java
@@ -118,7 +118,7 @@ public class RepairJob extends AbstractFuture<RepairResult> implements Runnable
                         SyncTask task;
                         if (r1.endpoint.equals(local) || r2.endpoint.equals(local))
                         {
-                            task = new LocalSyncTask(desc, r1, r2, repairedAt);
+                            task = new LocalSyncTask(desc, r1, r2, repairedAt, session.pullRepair);
                         }
                         else
                         {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b29736c2/src/java/org/apache/cassandra/repair/RepairRunnable.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/RepairRunnable.java b/src/java/org/apache/cassandra/repair/RepairRunnable.java
index d099f72..b69d8ce 100644
--- a/src/java/org/apache/cassandra/repair/RepairRunnable.java
+++ b/src/java/org/apache/cassandra/repair/RepairRunnable.java
@@ -103,7 +103,7 @@ public class RepairRunnable extends WrappedRunnable implements ProgressEventNoti
     protected void fireErrorAndComplete(String tag, int progressCount, int totalProgress, String message)
     {
         fireProgressEvent(tag, new ProgressEvent(ProgressEventType.ERROR, progressCount, totalProgress, message));
-        fireProgressEvent(tag, new ProgressEvent(ProgressEventType.COMPLETE, progressCount, totalProgress));
+        fireProgressEvent(tag, new ProgressEvent(ProgressEventType.COMPLETE, progressCount, totalProgress, String.format("Repair command #%d finished with error", cmd)));
     }
 
     protected void runMayThrow() throws Exception
@@ -226,6 +226,7 @@ public class RepairRunnable extends WrappedRunnable implements ProgressEventNoti
                                                               options.getParallelism(),
                                                               p.left,
                                                               repairedAt,
+                                                              options.isPullRepair(),
                                                               executor,
                                                               cfnames);
             if (session == null)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b29736c2/src/java/org/apache/cassandra/repair/RepairSession.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/RepairSession.java b/src/java/org/apache/cassandra/repair/RepairSession.java
index a52b352..cad506d 100644
--- a/src/java/org/apache/cassandra/repair/RepairSession.java
+++ b/src/java/org/apache/cassandra/repair/RepairSession.java
@@ -87,6 +87,7 @@ public class RepairSession extends AbstractFuture<RepairSessionResult> implement
     public final String keyspace;
     private final String[] cfnames;
     public final RepairParallelism parallelismDegree;
+    public final boolean pullRepair;
     /** Range to repair */
     public final Collection<Range<Token>> ranges;
     public final Set<InetAddress> endpoints;
@@ -117,6 +118,7 @@ public class RepairSession extends AbstractFuture<RepairSessionResult> implement
      * @param parallelismDegree specifies the degree of parallelism when calculating the merkle trees
      * @param endpoints the data centers that should be part of the repair; null for all DCs
      * @param repairedAt when the repair occurred (millis)
+     * @param pullRepair true if the repair should be one way (from remote host to this host and only applicable between two hosts--see RepairOption)
      * @param cfnames names of columnfamilies
      */
     public RepairSession(UUID parentRepairSession,
@@ -126,6 +128,7 @@ public class RepairSession extends AbstractFuture<RepairSessionResult> implement
                          RepairParallelism parallelismDegree,
                          Set<InetAddress> endpoints,
                          long repairedAt,
+                         boolean pullRepair,
                          String... cfnames)
     {
         assert cfnames.length > 0 : "Repairing no column families seems pointless, doesn't it";
@@ -139,6 +142,7 @@ public class RepairSession extends AbstractFuture<RepairSessionResult> implement
         this.endpoints = endpoints;
         this.repairedAt = repairedAt;
         this.validationRemaining = new AtomicInteger(cfnames.length);
+        this.pullRepair = pullRepair;
     }
 
     public UUID getId()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b29736c2/src/java/org/apache/cassandra/repair/messages/RepairOption.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/messages/RepairOption.java b/src/java/org/apache/cassandra/repair/messages/RepairOption.java
index 82dd181..1f34973 100644
--- a/src/java/org/apache/cassandra/repair/messages/RepairOption.java
+++ b/src/java/org/apache/cassandra/repair/messages/RepairOption.java
@@ -46,6 +46,7 @@ public class RepairOption
     public static final String HOSTS_KEY = "hosts";
     public static final String TRACE_KEY = "trace";
     public static final String SUB_RANGE_REPAIR_KEY = "sub_range_repair";
+    public static final String PULL_REPAIR_KEY = "pullRepair";
 
     // we don't want to push nodes too much for repair
     public static final int MAX_JOB_THREADS = 4;
@@ -116,6 +117,12 @@ public class RepairOption
      *             Multiple hosts can be given as comma separated values(e.g. cass1,cass2).</td>
      *             <td></td>
      *         </tr>
+     *         <tr>
+     *             <td>pullRepair</td>
+     *             <td>"true" if the repair should only stream data one way from a remote host to this host.
+     *             This is only allowed if exactly 2 hosts are specified along with a token range that they share.</td>
+     *             <td>false</td>
+     *         </tr>
      *     </tbody>
      * </table>
      *
@@ -130,6 +137,7 @@ public class RepairOption
         boolean primaryRange = Boolean.parseBoolean(options.get(PRIMARY_RANGE_KEY));
         boolean incremental = Boolean.parseBoolean(options.get(INCREMENTAL_KEY));
         boolean trace = Boolean.parseBoolean(options.get(TRACE_KEY));
+        boolean pullRepair = Boolean.parseBoolean(options.get(PULL_REPAIR_KEY));
 
         int jobThreads = 1;
         if (options.containsKey(JOB_THREADS_KEY))
@@ -163,7 +171,7 @@ public class RepairOption
             }
         }
 
-        RepairOption option = new RepairOption(parallelism, primaryRange, incremental, trace, jobThreads, ranges, !ranges.isEmpty());
+        RepairOption option = new RepairOption(parallelism, primaryRange, incremental, trace, jobThreads, ranges, !ranges.isEmpty(), pullRepair);
 
         // data centers
         String dataCentersStr = options.get(DATACENTERS_KEY);
@@ -209,10 +217,25 @@ public class RepairOption
         {
             throw new IllegalArgumentException("Too many job threads. Max is " + MAX_JOB_THREADS);
         }
+        if (!dataCenters.isEmpty() && !hosts.isEmpty())
+        {
+            throw new IllegalArgumentException("Cannot combine -dc and -hosts options.");
+        }
         if (primaryRange && (!dataCenters.isEmpty() || !hosts.isEmpty()))
         {
             throw new IllegalArgumentException("You need to run primary range repair on all nodes in the cluster.");
         }
+        if (pullRepair)
+        {
+            if (hosts.size() != 2)
+            {
+                throw new IllegalArgumentException("Pull repair can only be performed between two hosts. Please specify two hosts, one of which must be this host.");
+            }
+            else if (ranges.isEmpty())
+            {
+                throw new IllegalArgumentException("Token ranges must be specified when performing pull repair. Please specify at least one token range which both hosts have in common.");
+            }
+        }
 
         return option;
     }
@@ -223,13 +246,14 @@ public class RepairOption
     private final boolean trace;
     private final int jobThreads;
     private final boolean isSubrangeRepair;
+    private final boolean pullRepair;
 
     private final Collection<String> columnFamilies = new HashSet<>();
     private final Collection<String> dataCenters = new HashSet<>();
     private final Collection<String> hosts = new HashSet<>();
     private final Collection<Range<Token>> ranges = new HashSet<>();
 
-    public RepairOption(RepairParallelism parallelism, boolean primaryRange, boolean incremental, boolean trace, int jobThreads, Collection<Range<Token>> ranges, boolean isSubrangeRepair)
+    public RepairOption(RepairParallelism parallelism, boolean primaryRange, boolean incremental, boolean trace, int jobThreads, Collection<Range<Token>> ranges, boolean isSubrangeRepair, boolean pullRepair)
     {
         if (FBUtilities.isWindows &&
             (DatabaseDescriptor.getDiskAccessMode() != Config.DiskAccessMode.standard || DatabaseDescriptor.getIndexAccessMode() != Config.DiskAccessMode.standard) &&
@@ -247,6 +271,7 @@ public class RepairOption
         this.jobThreads = jobThreads;
         this.ranges.addAll(ranges);
         this.isSubrangeRepair = isSubrangeRepair;
+        this.pullRepair = pullRepair;
     }
 
     public RepairParallelism getParallelism()
@@ -269,6 +294,11 @@ public class RepairOption
         return trace;
     }
 
+    public boolean isPullRepair()
+    {
+        return pullRepair;
+    }
+
     public int getJobThreads()
     {
         return jobThreads;
@@ -316,6 +346,7 @@ public class RepairOption
                        ", dataCenters: " + dataCenters +
                        ", hosts: " + hosts +
                        ", # of ranges: " + ranges.size() +
+                       ", pull repair: " + pullRepair +
                        ')';
     }
 
@@ -332,6 +363,7 @@ public class RepairOption
         options.put(SUB_RANGE_REPAIR_KEY, Boolean.toString(isSubrangeRepair));
         options.put(TRACE_KEY, Boolean.toString(trace));
         options.put(RANGES_KEY, Joiner.on(",").join(ranges));
+        options.put(PULL_REPAIR_KEY, Boolean.toString(pullRepair));
         return options;
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b29736c2/src/java/org/apache/cassandra/service/ActiveRepairService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/ActiveRepairService.java b/src/java/org/apache/cassandra/service/ActiveRepairService.java
index 435c7c8..4699ae1 100644
--- a/src/java/org/apache/cassandra/service/ActiveRepairService.java
+++ b/src/java/org/apache/cassandra/service/ActiveRepairService.java
@@ -132,6 +132,7 @@ public class ActiveRepairService implements IEndpointStateChangeSubscriber, IFai
                                              RepairParallelism parallelismDegree,
                                              Set<InetAddress> endpoints,
                                              long repairedAt,
+                                             boolean pullRepair,
                                              ListeningExecutorService executor,
                                              String... cfnames)
     {
@@ -141,7 +142,7 @@ public class ActiveRepairService implements IEndpointStateChangeSubscriber, IFai
         if (cfnames.length == 0)
             return null;
 
-        final RepairSession session = new RepairSession(parentRepairSession, UUIDGen.getTimeUUID(), range, keyspace, parallelismDegree, endpoints, repairedAt, cfnames);
+        final RepairSession session = new RepairSession(parentRepairSession, UUIDGen.getTimeUUID(), range, keyspace, parallelismDegree, endpoints, repairedAt, pullRepair, cfnames);
 
         sessions.put(session.getId(), session);
         // register listeners
@@ -245,9 +246,10 @@ public class ActiveRepairService implements IEndpointStateChangeSubscriber, IFai
 
             if (specifiedHost.size() <= 1)
             {
-                String msg = "Repair requires at least two endpoints that are neighbours before it can continue, the endpoint used for this repair is %s, " +
-                             "other available neighbours are %s but these neighbours were not part of the supplied list of hosts to use during the repair (%s).";
-                throw new IllegalArgumentException(String.format(msg, specifiedHost, neighbors, hosts));
+                String msg = "Specified hosts %s do not share range %s needed for repair. Either restrict repair ranges " +
+                             "with -st/-et options, or specify one of the neighbors that share this range with " +
+                             "this node: %s.";
+                throw new IllegalArgumentException(String.format(msg, hosts, toRepair, neighbors));
             }
 
             specifiedHost.remove(FBUtilities.getBroadcastAddress());

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b29736c2/src/java/org/apache/cassandra/service/StorageService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java
index e3b4752..2810e2f 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -3172,7 +3172,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
             parallelism = RepairParallelism.PARALLEL;
         }
 
-        RepairOption options = new RepairOption(parallelism, primaryRange, !fullRepair, false, 1, Collections.<Range<Token>>emptyList(), false);
+        RepairOption options = new RepairOption(parallelism, primaryRange, !fullRepair, false, 1, Collections.<Range<Token>>emptyList(), false, false);
         if (dataCenters != null)
         {
             options.getDataCenters().addAll(dataCenters);
@@ -3264,7 +3264,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
                         "The repair will occur but without anti-compaction.");
         Collection<Range<Token>> repairingRange = createRepairRangeFrom(beginToken, endToken);
 
-        RepairOption options = new RepairOption(parallelism, false, !fullRepair, false, 1, repairingRange, true);
+        RepairOption options = new RepairOption(parallelism, false, !fullRepair, false, 1, repairingRange, true, false);
         if (dataCenters != null)
         {
             options.getDataCenters().addAll(dataCenters);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b29736c2/src/java/org/apache/cassandra/tools/nodetool/Repair.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/nodetool/Repair.java b/src/java/org/apache/cassandra/tools/nodetool/Repair.java
index 02bfc5b..5383fa5 100644
--- a/src/java/org/apache/cassandra/tools/nodetool/Repair.java
+++ b/src/java/org/apache/cassandra/tools/nodetool/Repair.java
@@ -81,6 +81,9 @@ public class Repair extends NodeToolCmd
     @Option(title = "trace_repair", name = {"-tr", "--trace"}, description = "Use -tr to trace the repair. Traces are logged to system_traces.events.")
     private boolean trace = false;
 
+    @Option(title = "pull_repair", name = {"-pl", "--pull"}, description = "Use --pull to perform a one way repair where data is only streamed from a remote node to this node.")
+    private boolean pullRepair = false;
+
     @Override
     public void execute(NodeProbe probe)
     {
@@ -108,6 +111,7 @@ public class Repair extends NodeToolCmd
             options.put(RepairOption.JOB_THREADS_KEY, Integer.toString(numJobThreads));
             options.put(RepairOption.TRACE_KEY, Boolean.toString(trace));
             options.put(RepairOption.COLUMNFAMILIES_KEY, StringUtils.join(cfnames, ","));
+            options.put(RepairOption.PULL_REPAIR_KEY, Boolean.toString(pullRepair));
             if (!startToken.isEmpty() || !endToken.isEmpty())
             {
                 options.put(RepairOption.RANGES_KEY, startToken + ":" + endToken);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b29736c2/test/unit/org/apache/cassandra/repair/LocalSyncTaskTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/repair/LocalSyncTaskTest.java b/test/unit/org/apache/cassandra/repair/LocalSyncTaskTest.java
index 6aacae6..0fceaf4 100644
--- a/test/unit/org/apache/cassandra/repair/LocalSyncTaskTest.java
+++ b/test/unit/org/apache/cassandra/repair/LocalSyncTaskTest.java
@@ -76,7 +76,7 @@ public class LocalSyncTaskTest extends SchemaLoader
         // note: we reuse the same endpoint which is bogus in theory but fine here
         TreeResponse r1 = new TreeResponse(ep1, tree1);
         TreeResponse r2 = new TreeResponse(ep2, tree2);
-        LocalSyncTask task = new LocalSyncTask(desc, r1, r2, ActiveRepairService.UNREPAIRED_SSTABLE);
+        LocalSyncTask task = new LocalSyncTask(desc, r1, r2, ActiveRepairService.UNREPAIRED_SSTABLE, false);
         task.run();
 
         assertEquals(0, task.get().numberOfDifferences);
@@ -111,7 +111,7 @@ public class LocalSyncTaskTest extends SchemaLoader
         // note: we reuse the same endpoint which is bogus in theory but fine here
         TreeResponse r1 = new TreeResponse(InetAddress.getByName("127.0.0.1"), tree1);
         TreeResponse r2 = new TreeResponse(InetAddress.getByName("127.0.0.2"), tree2);
-        LocalSyncTask task = new LocalSyncTask(desc, r1, r2, ActiveRepairService.UNREPAIRED_SSTABLE);
+        LocalSyncTask task = new LocalSyncTask(desc, r1, r2, ActiveRepairService.UNREPAIRED_SSTABLE, false);
         task.run();
 
         // ensure that the changed range was recorded

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b29736c2/test/unit/org/apache/cassandra/repair/RepairSessionTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/repair/RepairSessionTest.java b/test/unit/org/apache/cassandra/repair/RepairSessionTest.java
index d40982c..7b31c26 100644
--- a/test/unit/org/apache/cassandra/repair/RepairSessionTest.java
+++ b/test/unit/org/apache/cassandra/repair/RepairSessionTest.java
@@ -54,7 +54,7 @@ public class RepairSessionTest
         IPartitioner p = Murmur3Partitioner.instance;
         Range<Token> repairRange = new Range<>(p.getToken(ByteBufferUtil.bytes(0)), p.getToken(ByteBufferUtil.bytes(100)));
         Set<InetAddress> endpoints = Sets.newHashSet(remote);
-        RepairSession session = new RepairSession(parentSessionId, sessionId, Arrays.asList(repairRange), "Keyspace1", RepairParallelism.SEQUENTIAL, endpoints, ActiveRepairService.UNREPAIRED_SSTABLE, "Standard1");
+        RepairSession session = new RepairSession(parentSessionId, sessionId, Arrays.asList(repairRange), "Keyspace1", RepairParallelism.SEQUENTIAL, endpoints, ActiveRepairService.UNREPAIRED_SSTABLE, false, "Standard1");
 
         // perform convict
         session.convict(remote, Double.MAX_VALUE);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b29736c2/test/unit/org/apache/cassandra/repair/messages/RepairOptionTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/repair/messages/RepairOptionTest.java b/test/unit/org/apache/cassandra/repair/messages/RepairOptionTest.java
index a564cff..665a0b7 100644
--- a/test/unit/org/apache/cassandra/repair/messages/RepairOptionTest.java
+++ b/test/unit/org/apache/cassandra/repair/messages/RepairOptionTest.java
@@ -38,7 +38,10 @@ import org.apache.cassandra.utils.FBUtilities;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.junit.matchers.JUnitMatchers.containsString;
 
 public class RepairOptionTest
 {
@@ -59,7 +62,7 @@ public class RepairOptionTest
         assertFalse(option.isPrimaryRange());
         assertFalse(option.isIncremental());
 
-        // parse everything
+        // parse everything except hosts (hosts cannot be combined with data centers)
         Map<String, String> options = new HashMap<>();
         options.put(RepairOption.PARALLELISM_KEY, "parallel");
         options.put(RepairOption.PRIMARY_RANGE_KEY, "false");
@@ -67,7 +70,6 @@ public class RepairOptionTest
         options.put(RepairOption.RANGES_KEY, "0:10,11:20,21:30");
         options.put(RepairOption.COLUMNFAMILIES_KEY, "cf1,cf2,cf3");
         options.put(RepairOption.DATACENTERS_KEY, "dc1,dc2,dc3");
-        options.put(RepairOption.HOSTS_KEY, "127.0.0.1,127.0.0.2,127.0.0.3");
 
         option = RepairOption.parse(options, partitioner);
         assertTrue(option.getParallelism() == RepairParallelism.PARALLEL);
@@ -92,6 +94,14 @@ public class RepairOptionTest
         expectedDCs.add("dc3");
         assertEquals(expectedDCs, option.getDataCenters());
 
+        // expect an error when parsing with hosts as well
+        options.put(RepairOption.HOSTS_KEY, "127.0.0.1,127.0.0.2,127.0.0.3");
+        assertParseThrowsIllegalArgumentExceptionWithMessage(options, "Cannot combine -dc and -hosts options");
+
+        // remove data centers to proceed with testing parsing hosts
+        options.remove(RepairOption.DATACENTERS_KEY);
+        option = RepairOption.parse(options, partitioner);
+
         Set<String> expectedHosts = new HashSet<>(3);
         expectedHosts.add("127.0.0.1");
         expectedHosts.add("127.0.0.2");
@@ -100,6 +110,25 @@ public class RepairOptionTest
     }
 
     @Test
+    public void testPullRepairParseOptions()
+    {
+        Map<String, String> options = new HashMap<>();
+
+        options.put(RepairOption.PULL_REPAIR_KEY, "true");
+        assertParseThrowsIllegalArgumentExceptionWithMessage(options, "Pull repair can only be performed between two hosts");
+
+        options.put(RepairOption.HOSTS_KEY, "127.0.0.1,127.0.0.2,127.0.0.3");
+        assertParseThrowsIllegalArgumentExceptionWithMessage(options, "Pull repair can only be performed between two hosts");
+
+        options.put(RepairOption.HOSTS_KEY, "127.0.0.1,127.0.0.2");
+        assertParseThrowsIllegalArgumentExceptionWithMessage(options, "Token ranges must be specified when performing pull repair");
+
+        options.put(RepairOption.RANGES_KEY, "0:10");
+        RepairOption option = RepairOption.parse(options, Murmur3Partitioner.instance);
+        assertTrue(option.isPullRepair());
+    }
+
+    @Test
     public void testIncrementalRepairWithSubrangesIsNotGlobal() throws Exception
     {
         RepairOption ro = RepairOption.parse(ImmutableMap.of(RepairOption.INCREMENTAL_KEY, "true", RepairOption.RANGES_KEY, "42:42"),
@@ -109,4 +138,17 @@ public class RepairOptionTest
                 Murmur3Partitioner.instance);
         assertTrue(ro.isGlobal());
     }
+
+    private void assertParseThrowsIllegalArgumentExceptionWithMessage(Map<String, String> optionsToParse, String expectedErrorMessage)
+    {
+        try
+        {
+            RepairOption.parse(optionsToParse, Murmur3Partitioner.instance);
+            fail(String.format("Expected RepairOption.parse() to throw an IllegalArgumentException containing the message '%s'", expectedErrorMessage));
+        }
+        catch (IllegalArgumentException ex)
+        {
+            assertThat(ex.getMessage(), containsString(expectedErrorMessage));
+        }
+    }
 }