You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by sa...@apache.org on 2020/02/04 14:07:14 UTC

[cassandra] 02/02: Restore monotonic read consistency guarantees

This is an automated email from the ASF dual-hosted git repository.

samt pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git

commit 0f22dab1a015cb84d9857f940de5a256bfbee083
Author: Benedict Elliott Smith <be...@apache.org>
AuthorDate: Thu Jul 18 14:46:06 2019 +0100

    Restore monotonic read consistency guarantees
    
    patch by Benedict; reviewed by Sam Tunnicliffe for CASSANDRA-14740
---
 CHANGES.txt                                        |   1 +
 .../org/apache/cassandra/db/ConsistencyLevel.java  |   2 +-
 .../cassandra/locator/ReplicaCollection.java       |   5 +
 .../org/apache/cassandra/locator/ReplicaPlan.java  |  13 ++-
 .../org/apache/cassandra/locator/ReplicaPlans.java |  69 +++++++++++-
 .../cassandra/service/reads/DigestResolver.java    |   8 +-
 .../cassandra/service/reads/ResponseResolver.java  |   2 +-
 .../reads/repair/BlockingPartitionRepair.java      |  38 ++++---
 .../service/reads/repair/BlockingReadRepair.java   |  24 ++--
 .../service/reads/repair/NoopReadRepair.java       |   2 +-
 .../service/reads/repair/ReadOnlyReadRepair.java   |   2 +-
 .../cassandra/service/reads/repair/ReadRepair.java |   2 +-
 .../reads/repair/RowIteratorMergeListener.java     | 123 ++++++++++++++------
 .../distributed/impl/AbstractCluster.java          |  20 ++++
 .../distributed/test/DistributedTestBase.java      |   7 +-
 .../cassandra/distributed/test/ReadRepairTest.java | 125 +++++++++++++++++++++
 ...WritePathTest.java => SimpleReadWriteTest.java} |   4 +-
 .../service/reads/AbstractReadResponseTest.java    |  19 +++-
 .../cassandra/service/reads/DataResolverTest.java  |  55 ++++++---
 .../reads/repair/AbstractReadRepairTest.java       |  36 ++++++
 .../reads/repair/BlockingReadRepairTest.java       |  34 +++---
 .../repair/DiagEventsBlockingReadRepairTest.java   |  21 ++--
 .../reads/repair/ReadOnlyReadRepairTest.java       |   7 +-
 .../service/reads/repair/ReadRepairTest.java       |  30 +++--
 .../service/reads/repair/TestableReadRepair.java   |   2 +-
 25 files changed, 495 insertions(+), 156 deletions(-)

diff --git a/CHANGES.txt b/CHANGES.txt
index 75fae01..6efa148 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 4.0-alpha3
+ * Restore monotonic read consistency guarantees for blocking read repair (CASSANDRA-14740)
  * Separate exceptions for CAS write timeout exceptions caused by contention and unkown result (CASSANDRA-15350)
  * Fix in-jvm dtest java 11 compatibility (CASSANDRA-15463)
  * Remove joda time dependency (CASSANDRA-15257)
diff --git a/src/java/org/apache/cassandra/db/ConsistencyLevel.java b/src/java/org/apache/cassandra/db/ConsistencyLevel.java
index 4973915..e685618 100644
--- a/src/java/org/apache/cassandra/db/ConsistencyLevel.java
+++ b/src/java/org/apache/cassandra/db/ConsistencyLevel.java
@@ -101,7 +101,7 @@ public enum ConsistencyLevel
     public static ObjectIntOpenHashMap<String> eachQuorumForRead(Keyspace keyspace)
     {
         NetworkTopologyStrategy strategy = (NetworkTopologyStrategy) keyspace.getReplicationStrategy();
-        ObjectIntOpenHashMap<String> perDc = new ObjectIntOpenHashMap<>(strategy.getDatacenters().size());
+        ObjectIntOpenHashMap<String> perDc = new ObjectIntOpenHashMap<>(((strategy.getDatacenters().size() + 1) * 4) / 3);
         for (String dc : strategy.getDatacenters())
             perDc.put(dc, ConsistencyLevel.localQuorumFor(keyspace, dc));
         return perDc;
diff --git a/src/java/org/apache/cassandra/locator/ReplicaCollection.java b/src/java/org/apache/cassandra/locator/ReplicaCollection.java
index d870316..ec671d5 100644
--- a/src/java/org/apache/cassandra/locator/ReplicaCollection.java
+++ b/src/java/org/apache/cassandra/locator/ReplicaCollection.java
@@ -125,6 +125,11 @@ public interface ReplicaCollection<C extends ReplicaCollection<C>> extends Itera
         public C build();
 
         /**
+         * @return an immutable clone of the current contents; assumes this Builder will be modified again
+         */
+        public C snapshot();
+
+        /**
          * Passed to add() and addAll() as ignoreConflicts parameter. The meaning of conflict varies by collection type
          * (for Endpoints, it is a duplicate InetAddressAndPort; for RangesAtEndpoint it is a duplicate Range).
          */
diff --git a/src/java/org/apache/cassandra/locator/ReplicaPlan.java b/src/java/org/apache/cassandra/locator/ReplicaPlan.java
index 861c912..16af58a 100644
--- a/src/java/org/apache/cassandra/locator/ReplicaPlan.java
+++ b/src/java/org/apache/cassandra/locator/ReplicaPlan.java
@@ -52,6 +52,8 @@ public abstract class ReplicaPlan<E extends Endpoints<E>>
     public abstract int blockFor();
 
     public E contacts() { return contacts; }
+
+    // TODO: should this semantically return true if we contain the endpoint, not the exact replica?
     public boolean contacts(Replica replica) { return contacts.contains(replica); }
     public Keyspace keyspace() { return keyspace; }
     public ConsistencyLevel consistencyLevel() { return consistencyLevel; }
@@ -72,17 +74,12 @@ public abstract class ReplicaPlan<E extends Endpoints<E>>
 
         public E candidates() { return candidates; }
 
-        public E uncontactedCandidates()
-        {
-            return candidates().filter(r -> !contacts(r));
-        }
-
         public Replica firstUncontactedCandidate(Predicate<Replica> extraPredicate)
         {
             return Iterables.tryFind(candidates(), r -> extraPredicate.test(r) && !contacts(r)).orNull();
         }
 
-        public Replica getReplicaFor(InetAddressAndPort endpoint)
+        public Replica lookup(InetAddressAndPort endpoint)
         {
             return candidates().byEndpoint().get(endpoint);
         }
@@ -151,6 +148,10 @@ public abstract class ReplicaPlan<E extends Endpoints<E>>
         public E liveUncontacted() { return live().filter(r -> !contacts(r)); }
         /** Test liveness, consistent with the upfront analysis done for this operation (i.e. test membership of live()) */
         public boolean isAlive(Replica replica) { return live.endpoints().contains(replica.endpoint()); }
+        public Replica lookup(InetAddressAndPort endpoint)
+        {
+            return liveAndDown().byEndpoint().get(endpoint);
+        }
 
         public String toString()
         {
diff --git a/src/java/org/apache/cassandra/locator/ReplicaPlans.java b/src/java/org/apache/cassandra/locator/ReplicaPlans.java
index a6fe53f..236706a 100644
--- a/src/java/org/apache/cassandra/locator/ReplicaPlans.java
+++ b/src/java/org/apache/cassandra/locator/ReplicaPlans.java
@@ -320,6 +320,11 @@ public class ReplicaPlans
         return result;
     }
 
+    public static ReplicaPlan.ForTokenWrite forReadRepair(Token token, ReplicaPlan.ForRead<?> readPlan) throws UnavailableException
+    {
+        return forWrite(readPlan.keyspace, readPlan.consistencyLevel, token, writeReadRepair(readPlan));
+    }
+
     public static ReplicaPlan.ForTokenWrite forWrite(Keyspace keyspace, ConsistencyLevel consistencyLevel, Token token, Selector selector) throws UnavailableException
     {
         return forWrite(keyspace, consistencyLevel, ReplicaLayout.forTokenWriteLiveAndDown(keyspace, token), selector);
@@ -345,7 +350,7 @@ public class ReplicaPlans
 
     public static ReplicaPlan.ForTokenWrite forWrite(Keyspace keyspace, ConsistencyLevel consistencyLevel, ReplicaLayout.ForTokenWrite liveAndDown, ReplicaLayout.ForTokenWrite live, Selector selector) throws UnavailableException
     {
-        EndpointsForToken contacts = selector.select(keyspace, liveAndDown, live);
+        EndpointsForToken contacts = selector.select(keyspace, consistencyLevel, liveAndDown, live);
         assureSufficientLiveReplicasForWrite(keyspace, consistencyLevel, live.all(), liveAndDown.pending());
         return new ReplicaPlan.ForTokenWrite(keyspace, consistencyLevel, liveAndDown.pending(), liveAndDown.all(), live.all(), contacts);
     }
@@ -353,7 +358,7 @@ public class ReplicaPlans
     public interface Selector
     {
         <E extends Endpoints<E>, L extends ReplicaLayout.ForWrite<E>>
-        E select(Keyspace keyspace, L liveAndDown, L live);
+        E select(Keyspace keyspace, ConsistencyLevel consistencyLevel, L liveAndDown, L live);
     }
 
     /**
@@ -366,7 +371,7 @@ public class ReplicaPlans
     {
         @Override
         public <E extends Endpoints<E>, L extends ReplicaLayout.ForWrite<E>>
-        E select(Keyspace keyspace, L liveAndDown, L live)
+        E select(Keyspace keyspace, ConsistencyLevel consistencyLevel, L liveAndDown, L live)
         {
             return liveAndDown.all();
         }
@@ -385,7 +390,7 @@ public class ReplicaPlans
     {
         @Override
         public <E extends Endpoints<E>, L extends ReplicaLayout.ForWrite<E>>
-        E select(Keyspace keyspace, L liveAndDown, L live)
+        E select(Keyspace keyspace, ConsistencyLevel consistencyLevel, L liveAndDown, L live)
         {
             if (!any(liveAndDown.all(), Replica::isTransient))
                 return liveAndDown.all();
@@ -417,6 +422,62 @@ public class ReplicaPlans
     };
 
     /**
+     * TODO: Transient Replication C-14404/C-14665
+     * TODO: We employ this even when there is no monotonicity to guarantee,
+     *          e.g. in case of CL.TWO, CL.ONE with speculation, etc.
+     *
+     * Construct a read-repair write plan to provide monotonicity guarantees on any data we return as part of a read.
+     *
+     * Since this is not a regular write, but only a guarantee that future reads will see this data, we select only
+     * the minimal number of nodes to meet the consistency level, and prefer nodes we contacted on read to minimise
+     * data transfer.
+     */
+    public static Selector writeReadRepair(ReplicaPlan.ForRead<?> readPlan)
+    {
+        return new Selector()
+        {
+            @Override
+            public <E extends Endpoints<E>, L extends ReplicaLayout.ForWrite<E>>
+            E select(Keyspace keyspace, ConsistencyLevel consistencyLevel, L liveAndDown, L live)
+            {
+                assert !any(liveAndDown.all(), Replica::isTransient);
+
+                ReplicaCollection.Builder<E> contacts = live.all().newBuilder(live.all().size());
+                // add all live nodes we might write to that we have already contacted on read
+                contacts.addAll(filter(live.all(), r -> readPlan.contacts().endpoints().contains(r.endpoint())));
+
+                // finally, add sufficient nodes to achieve our consistency level
+                if (consistencyLevel != EACH_QUORUM)
+                {
+                    int add = consistencyLevel.blockForWrite(keyspace, liveAndDown.pending()) - contacts.size();
+                    if (add > 0)
+                    {
+                        for (Replica replica : filter(live.all(), r -> !contacts.contains(r)))
+                        {
+                            contacts.add(replica);
+                            if (--add == 0)
+                                break;
+                        }
+                    }
+                }
+                else
+                {
+                    ObjectIntOpenHashMap<String> requiredPerDc = eachQuorumForWrite(keyspace, liveAndDown.pending());
+                    addToCountPerDc(requiredPerDc, contacts.snapshot(), -1);
+                    IEndpointSnitch snitch = DatabaseDescriptor.getEndpointSnitch();
+                    for (Replica replica : filter(live.all(), r -> !contacts.contains(r)))
+                    {
+                        String dc = snitch.getDatacenter(replica);
+                        if (requiredPerDc.addTo(dc, -1) >= 0)
+                            contacts.add(replica);
+                    }
+                }
+                return contacts.build();
+            }
+        };
+    }
+
+    /**
      * Construct the plan for a paxos round - NOT the write or read consistency level for either the write or comparison,
      * but for the paxos linearisation agreement.
      *
diff --git a/src/java/org/apache/cassandra/service/reads/DigestResolver.java b/src/java/org/apache/cassandra/service/reads/DigestResolver.java
index cf7ec31..dbb761b 100644
--- a/src/java/org/apache/cassandra/service/reads/DigestResolver.java
+++ b/src/java/org/apache/cassandra/service/reads/DigestResolver.java
@@ -54,7 +54,7 @@ public class DigestResolver<E extends Endpoints<E>, P extends ReplicaPlan.ForRea
     public void preprocess(Message<ReadResponse> message)
     {
         super.preprocess(message);
-        Replica replica = replicaPlan().getReplicaFor(message.from());
+        Replica replica = replicaPlan().lookup(message.from());
         if (dataResponse == null && !message.payload.isDigestResponse() && replica.isFull())
             dataResponse = message;
     }
@@ -69,7 +69,7 @@ public class DigestResolver<E extends Endpoints<E>, P extends ReplicaPlan.ForRea
     {
         return any(responses,
                 msg -> !msg.payload.isDigestResponse()
-                        && replicaPlan().getReplicaFor(msg.from()).isTransient());
+                        && replicaPlan().lookup(msg.from()).isTransient());
     }
 
     public PartitionIterator getData()
@@ -93,7 +93,7 @@ public class DigestResolver<E extends Endpoints<E>, P extends ReplicaPlan.ForRea
             // Reconcile with transient replicas
             for (Message<ReadResponse> response : responses)
             {
-                Replica replica = replicaPlan().getReplicaFor(response.from());
+                Replica replica = replicaPlan().lookup(response.from());
                 if (replica.isTransient())
                     dataResolver.preprocess(response);
             }
@@ -115,7 +115,7 @@ public class DigestResolver<E extends Endpoints<E>, P extends ReplicaPlan.ForRea
         // TODO: should also not calculate if only one full node
         for (Message<ReadResponse> message : snapshot)
         {
-            if (replicaPlan().getReplicaFor(message.from()).isTransient())
+            if (replicaPlan().lookup(message.from()).isTransient())
                 continue;
 
             ByteBuffer newDigest = message.payload.digest(command);
diff --git a/src/java/org/apache/cassandra/service/reads/ResponseResolver.java b/src/java/org/apache/cassandra/service/reads/ResponseResolver.java
index 8e15c1a..6ae19ac 100644
--- a/src/java/org/apache/cassandra/service/reads/ResponseResolver.java
+++ b/src/java/org/apache/cassandra/service/reads/ResponseResolver.java
@@ -55,7 +55,7 @@ public abstract class ResponseResolver<E extends Endpoints<E>, P extends Replica
 
     public void preprocess(Message<ReadResponse> message)
     {
-        if (replicaPlan().getReplicaFor(message.from()).isTransient() &&
+        if (replicaPlan().lookup(message.from()).isTransient() &&
             message.payload.isDigestResponse())
             throw new IllegalArgumentException("Digest response received from transient replica");
 
diff --git a/src/java/org/apache/cassandra/service/reads/repair/BlockingPartitionRepair.java b/src/java/org/apache/cassandra/service/reads/repair/BlockingPartitionRepair.java
index 01fd7f0..edcf14d 100644
--- a/src/java/org/apache/cassandra/service/reads/repair/BlockingPartitionRepair.java
+++ b/src/java/org/apache/cassandra/service/reads/repair/BlockingPartitionRepair.java
@@ -39,6 +39,7 @@ import org.apache.cassandra.db.Keyspace;
 import org.apache.cassandra.db.Mutation;
 import org.apache.cassandra.db.partitions.PartitionUpdate;
 import org.apache.cassandra.locator.Endpoints;
+import org.apache.cassandra.locator.EndpointsForToken;
 import org.apache.cassandra.locator.InetAddressAndPort;
 import org.apache.cassandra.locator.Replica;
 import org.apache.cassandra.locator.ReplicaPlan;
@@ -53,33 +54,33 @@ import org.apache.cassandra.tracing.Tracing;
 
 import static org.apache.cassandra.net.Verb.*;
 
-public class BlockingPartitionRepair<E extends Endpoints<E>, P extends ReplicaPlan.ForRead<E>>
+public class BlockingPartitionRepair
         extends AbstractFuture<Object> implements RequestCallback<Object>
 {
     private final DecoratedKey key;
-    private final P replicaPlan;
+    private final ReplicaPlan.ForTokenWrite writePlan;
     private final Map<Replica, Mutation> pendingRepairs;
     private final CountDownLatch latch;
     private final Predicate<InetAddressAndPort> shouldBlockOn;
 
     private volatile long mutationsSentTime;
 
-    public BlockingPartitionRepair(DecoratedKey key, Map<Replica, Mutation> repairs, int maxBlockFor, P replicaPlan)
+    public BlockingPartitionRepair(DecoratedKey key, Map<Replica, Mutation> repairs, ReplicaPlan.ForTokenWrite writePlan)
     {
-        this(key, repairs, maxBlockFor, replicaPlan,
-                replicaPlan.consistencyLevel().isDatacenterLocal() ? InOurDcTester.endpoints() : Predicates.alwaysTrue());
+        this(key, repairs, writePlan,
+             writePlan.consistencyLevel().isDatacenterLocal() ? InOurDcTester.endpoints() : Predicates.alwaysTrue());
     }
-    public BlockingPartitionRepair(DecoratedKey key, Map<Replica, Mutation> repairs, int maxBlockFor, P replicaPlan, Predicate<InetAddressAndPort> shouldBlockOn)
+    public BlockingPartitionRepair(DecoratedKey key, Map<Replica, Mutation> repairs, ReplicaPlan.ForTokenWrite writePlan, Predicate<InetAddressAndPort> shouldBlockOn)
     {
         this.key = key;
         this.pendingRepairs = new ConcurrentHashMap<>(repairs);
-        this.replicaPlan = replicaPlan;
+        this.writePlan = writePlan;
         this.shouldBlockOn = shouldBlockOn;
 
+        int blockFor = writePlan.blockFor();
         // here we remove empty repair mutations from the block for total, since
         // we're not sending them mutations
-        int blockFor = maxBlockFor;
-        for (Replica participant: replicaPlan.contacts())
+        for (Replica participant : writePlan.contacts())
         {
             // remote dcs can sometimes get involved in dc-local reads. We want to repair
             // them if they do, but they shouldn't interfere with blocking the client read.
@@ -95,10 +96,15 @@ public class BlockingPartitionRepair<E extends Endpoints<E>, P extends ReplicaPl
         latch = new CountDownLatch(Math.max(blockFor, 0));
     }
 
+    int blockFor()
+    {
+        return writePlan.blockFor();
+    }
+
     @VisibleForTesting
-    long waitingOn()
+    int waitingOn()
     {
-        return latch.getCount();
+        return (int) latch.getCount();
     }
 
     @VisibleForTesting
@@ -106,7 +112,7 @@ public class BlockingPartitionRepair<E extends Endpoints<E>, P extends ReplicaPl
     {
         if (shouldBlockOn.test(from))
         {
-            pendingRepairs.remove(replicaPlan.getReplicaFor(from));
+            pendingRepairs.remove(writePlan.lookup(from));
             latch.countDown();
         }
     }
@@ -199,7 +205,7 @@ public class BlockingPartitionRepair<E extends Endpoints<E>, P extends ReplicaPl
         if (awaitRepairsUntil(timeout + timeoutUnit.convert(mutationsSentTime, TimeUnit.NANOSECONDS), timeoutUnit))
             return;
 
-        E newCandidates = replicaPlan.uncontactedCandidates();
+        EndpointsForToken newCandidates = writePlan.liveUncontacted();
         if (newCandidates.isEmpty())
             return;
 
@@ -221,7 +227,7 @@ public class BlockingPartitionRepair<E extends Endpoints<E>, P extends ReplicaPl
 
             if (mutation == null)
             {
-                mutation = BlockingReadRepairs.createRepairMutation(update, replicaPlan.consistencyLevel(), replica.endpoint(), true);
+                mutation = BlockingReadRepairs.createRepairMutation(update, writePlan.consistencyLevel(), replica.endpoint(), true);
                 versionedMutations[versionIdx] = mutation;
             }
 
@@ -240,7 +246,7 @@ public class BlockingPartitionRepair<E extends Endpoints<E>, P extends ReplicaPl
 
     Keyspace getKeyspace()
     {
-        return replicaPlan.keyspace();
+        return writePlan.keyspace();
     }
 
     DecoratedKey getKey()
@@ -250,6 +256,6 @@ public class BlockingPartitionRepair<E extends Endpoints<E>, P extends ReplicaPl
 
     ConsistencyLevel getConsistency()
     {
-        return replicaPlan.consistencyLevel();
+        return writePlan.consistencyLevel();
     }
 }
diff --git a/src/java/org/apache/cassandra/service/reads/repair/BlockingReadRepair.java b/src/java/org/apache/cassandra/service/reads/repair/BlockingReadRepair.java
index 764765e..fdc8b50 100644
--- a/src/java/org/apache/cassandra/service/reads/repair/BlockingReadRepair.java
+++ b/src/java/org/apache/cassandra/service/reads/repair/BlockingReadRepair.java
@@ -39,7 +39,6 @@ import org.apache.cassandra.locator.ReplicaPlan;
 import org.apache.cassandra.metrics.ReadRepairMetrics;
 import org.apache.cassandra.tracing.Tracing;
 
-import static java.util.concurrent.TimeUnit.MILLISECONDS;
 import static java.util.concurrent.TimeUnit.NANOSECONDS;
 
 /**
@@ -53,12 +52,10 @@ public class BlockingReadRepair<E extends Endpoints<E>, P extends ReplicaPlan.Fo
     private static final Logger logger = LoggerFactory.getLogger(BlockingReadRepair.class);
 
     protected final Queue<BlockingPartitionRepair> repairs = new ConcurrentLinkedQueue<>();
-    private final int blockFor;
 
     BlockingReadRepair(ReadCommand command, ReplicaPlan.Shared<E, P> replicaPlan, long queryStartNanoTime)
     {
         super(command, replicaPlan, queryStartNanoTime);
-        this.blockFor = replicaPlan().consistencyLevel().blockFor(cfs.keyspace);
     }
 
     public UnfilteredPartitionIterators.MergeListener getMergeListener(P replicaPlan)
@@ -84,31 +81,34 @@ public class BlockingReadRepair<E extends Endpoints<E>, P extends ReplicaPlan.Fo
     @Override
     public void awaitWrites()
     {
-        boolean timedOut = false;
-        for (BlockingPartitionRepair repair: repairs)
+        BlockingPartitionRepair timedOut = null;
+        for (BlockingPartitionRepair repair : repairs)
         {
             if (!repair.awaitRepairsUntil(DatabaseDescriptor.getReadRpcTimeout(NANOSECONDS) + queryStartNanoTime, NANOSECONDS))
             {
-                timedOut = true;
+                timedOut = repair;
+                break;
             }
         }
-        if (timedOut)
+        if (timedOut != null)
         {
-            // We got all responses, but timed out while repairing
-            int blockFor = replicaPlan().blockFor();
+            // We got all responses, but timed out while repairing;
+            // pick one of the repairs to throw, as this is better than completely manufacturing the error message
+            int blockFor = timedOut.blockFor();
+            int received = Math.min(blockFor - timedOut.waitingOn(), blockFor - 1);
             if (Tracing.isTracing())
                 Tracing.trace("Timed out while read-repairing after receiving all {} data and digest responses", blockFor);
             else
                 logger.debug("Timeout while read-repairing after receiving all {} data and digest responses", blockFor);
 
-            throw new ReadTimeoutException(replicaPlan().consistencyLevel(), blockFor - 1, blockFor, true);
+            throw new ReadTimeoutException(replicaPlan().consistencyLevel(), received, blockFor, true);
         }
     }
 
     @Override
-    public void repairPartition(DecoratedKey partitionKey, Map<Replica, Mutation> mutations, P replicaPlan)
+    public void repairPartition(DecoratedKey partitionKey, Map<Replica, Mutation> mutations, ReplicaPlan.ForTokenWrite writePlan)
     {
-        BlockingPartitionRepair<E, P> blockingRepair = new BlockingPartitionRepair<>(partitionKey, mutations, blockFor, replicaPlan);
+        BlockingPartitionRepair blockingRepair = new BlockingPartitionRepair(partitionKey, mutations, writePlan);
         blockingRepair.sendInitialRepairs();
         repairs.add(blockingRepair);
     }
diff --git a/src/java/org/apache/cassandra/service/reads/repair/NoopReadRepair.java b/src/java/org/apache/cassandra/service/reads/repair/NoopReadRepair.java
index 6aa6ece..2f82c22 100644
--- a/src/java/org/apache/cassandra/service/reads/repair/NoopReadRepair.java
+++ b/src/java/org/apache/cassandra/service/reads/repair/NoopReadRepair.java
@@ -76,7 +76,7 @@ public class NoopReadRepair<E extends Endpoints<E>, P extends ReplicaPlan.ForRea
     }
 
     @Override
-    public void repairPartition(DecoratedKey partitionKey, Map<Replica, Mutation> mutations, P replicaPlan)
+    public void repairPartition(DecoratedKey partitionKey, Map<Replica, Mutation> mutations, ReplicaPlan.ForTokenWrite writePlan)
     {
 
     }
diff --git a/src/java/org/apache/cassandra/service/reads/repair/ReadOnlyReadRepair.java b/src/java/org/apache/cassandra/service/reads/repair/ReadOnlyReadRepair.java
index 64bfec2..d9293fb 100644
--- a/src/java/org/apache/cassandra/service/reads/repair/ReadOnlyReadRepair.java
+++ b/src/java/org/apache/cassandra/service/reads/repair/ReadOnlyReadRepair.java
@@ -61,7 +61,7 @@ public class ReadOnlyReadRepair<E extends Endpoints<E>, P extends ReplicaPlan.Fo
     }
 
     @Override
-    public void repairPartition(DecoratedKey partitionKey, Map<Replica, Mutation> mutations, P replicaPlan)
+    public void repairPartition(DecoratedKey partitionKey, Map<Replica, Mutation> mutations, ReplicaPlan.ForTokenWrite writePlan)
     {
         throw new UnsupportedOperationException("ReadOnlyReadRepair shouldn't be trying to repair partitions");
     }
diff --git a/src/java/org/apache/cassandra/service/reads/repair/ReadRepair.java b/src/java/org/apache/cassandra/service/reads/repair/ReadRepair.java
index 9441945..4747651 100644
--- a/src/java/org/apache/cassandra/service/reads/repair/ReadRepair.java
+++ b/src/java/org/apache/cassandra/service/reads/repair/ReadRepair.java
@@ -93,5 +93,5 @@ public interface ReadRepair<E extends Endpoints<E>, P extends ReplicaPlan.ForRea
      * Repairs a partition _after_ receiving data responses. This method receives replica list, since
      * we will block repair only on the replicas that have responded.
      */
-    void repairPartition(DecoratedKey partitionKey, Map<Replica, Mutation> mutations, P replicaPlan);
+    void repairPartition(DecoratedKey partitionKey, Map<Replica, Mutation> mutations, ReplicaPlan.ForTokenWrite writePlan);
 }
diff --git a/src/java/org/apache/cassandra/service/reads/repair/RowIteratorMergeListener.java b/src/java/org/apache/cassandra/service/reads/repair/RowIteratorMergeListener.java
index 60e0d41..fc4c351 100644
--- a/src/java/org/apache/cassandra/service/reads/repair/RowIteratorMergeListener.java
+++ b/src/java/org/apache/cassandra/service/reads/repair/RowIteratorMergeListener.java
@@ -19,14 +19,17 @@
 package org.apache.cassandra.service.reads.repair;
 
 import java.util.Arrays;
+import java.util.BitSet;
 import java.util.Map;
+import java.util.function.Consumer;
 
-import com.google.common.base.Preconditions;
+import com.google.common.collect.Iterables;
 import com.google.common.collect.Maps;
 
+import com.carrotsearch.hppc.ObjectIntOpenHashMap;
+import net.nicoulaj.compilecommand.annotations.Inline;
 import org.apache.cassandra.db.Clustering;
 import org.apache.cassandra.db.ClusteringBound;
-import org.apache.cassandra.db.ConsistencyLevel;
 import org.apache.cassandra.db.DecoratedKey;
 import org.apache.cassandra.db.DeletionTime;
 import org.apache.cassandra.db.LivenessInfo;
@@ -45,8 +48,10 @@ import org.apache.cassandra.db.rows.RowDiffListener;
 import org.apache.cassandra.db.rows.Rows;
 import org.apache.cassandra.db.rows.UnfilteredRowIterators;
 import org.apache.cassandra.locator.Endpoints;
+import org.apache.cassandra.locator.InetAddressAndPort;
 import org.apache.cassandra.locator.Replica;
 import org.apache.cassandra.locator.ReplicaPlan;
+import org.apache.cassandra.locator.ReplicaPlans;
 import org.apache.cassandra.schema.ColumnMetadata;
 
 public class RowIteratorMergeListener<E extends Endpoints<E>>
@@ -57,10 +62,14 @@ public class RowIteratorMergeListener<E extends Endpoints<E>>
     private final boolean isReversed;
     private final ReadCommand command;
 
+    private final BitSet writeBackTo;
+    private final boolean buildFullDiff;
+    /** the repairs we will send to each source, suffixed by a complete repair of all differences, if {@link #buildFullDiff} */
     private final PartitionUpdate.Builder[] repairs;
     private final Row.Builder[] currentRows;
     private final RowDiffListener diffListener;
-    private final ReplicaPlan.ForRead<E> replicaPlan;
+    private final ReplicaPlan.ForRead<E> readPlan;
+    private final ReplicaPlan.ForTokenWrite writePlan;
 
     // The partition level deletion for the merge row.
     private DeletionTime partitionLevelDeletion;
@@ -73,17 +82,36 @@ public class RowIteratorMergeListener<E extends Endpoints<E>>
 
     private final ReadRepair readRepair;
 
-    public RowIteratorMergeListener(DecoratedKey partitionKey, RegularAndStaticColumns columns, boolean isReversed, ReplicaPlan.ForRead<E> replicaPlan, ReadCommand command, ReadRepair readRepair)
+    public RowIteratorMergeListener(DecoratedKey partitionKey, RegularAndStaticColumns columns, boolean isReversed, ReplicaPlan.ForRead<E> readPlan, ReadCommand command, ReadRepair readRepair)
     {
         this.partitionKey = partitionKey;
         this.columns = columns;
         this.isReversed = isReversed;
-        this.replicaPlan = replicaPlan;
-        int size = replicaPlan.contacts().size();
-        repairs = new PartitionUpdate.Builder[size];
-        currentRows = new Row.Builder[size];
-        sourceDeletionTime = new DeletionTime[size];
-        markerToRepair = new ClusteringBound[size];
+        this.readPlan = readPlan;
+        this.writePlan = ReplicaPlans.forReadRepair(partitionKey.getToken(), readPlan);
+
+        int size = readPlan.contacts().size();
+        this.writeBackTo = new BitSet(size);
+        {
+            int i = 0;
+            for (Replica replica : readPlan.contacts())
+            {
+                if (writePlan.contacts().endpoints().contains(replica.endpoint()))
+                    writeBackTo.set(i);
+                ++i;
+            }
+        }
+        // If we are contacting any nodes we didn't read from, we are likely handling a range movement.
+        // In this case we need to send all differences to these nodes, as we do not (with present design) know which
+        // node they bootstrapped from, and so which data we need to duplicate.
+        // In reality, there will be situations where we are simply sending the same number of writes to different nodes
+        // and in this case we could probably avoid building a full difference, and only ensure each write makes it to
+        // some other node, but it is probably not worth special casing this scenario.
+        this.buildFullDiff = Iterables.any(writePlan.contacts().endpoints(), e -> !readPlan.contacts().endpoints().contains(e));
+        this.repairs = new PartitionUpdate.Builder[size + (buildFullDiff ? 1 : 0)];
+        this.currentRows = new Row.Builder[size];
+        this.sourceDeletionTime = new DeletionTime[size];
+        this.markerToRepair = new ClusteringBound[size];
         this.command = command;
         this.readRepair = readRepair;
 
@@ -128,13 +156,6 @@ public class RowIteratorMergeListener<E extends Endpoints<E>>
         };
     }
 
-    private PartitionUpdate.Builder update(int i)
-    {
-        if (repairs[i] == null)
-            repairs[i] = new PartitionUpdate.Builder(command.metadata(), partitionKey, columns, 1);
-        return repairs[i];
-    }
-
     /**
      * The partition level deletion with with which source {@code i} is currently repaired, or
      * {@code DeletionTime.LIVE} if the source is not repaired on the partition level deletion (meaning it was
@@ -156,13 +177,30 @@ public class RowIteratorMergeListener<E extends Endpoints<E>>
         return currentRows[i];
     }
 
+    /**
+     * Applies {@code f} to every repair builder that should receive a difference observed for read source {@code i}.
+     * <p>
+     * The per-source builder {@code repairs[i]} is only touched when bit {@code i} of {@code writeBackTo} is set.
+     * When {@code buildFullDiff} is set, the same change is also applied to the extra builder kept in the last
+     * slot of {@code repairs}. Builders are created lazily on first use, so {@code f} may run zero, one or two times.
+     *
+     * @param i index of the read source the difference was computed against
+     * @param f the change to apply to each selected builder
+     */
+    @Inline
+    private void applyToPartition(int i, Consumer<PartitionUpdate.Builder> f)
+    {
+        if (writeBackTo.get(i))
+        {
+            if (repairs[i] == null)
+                repairs[i] = new PartitionUpdate.Builder(command.metadata(), partitionKey, columns, 1);
+            f.accept(repairs[i]);
+        }
+        if (buildFullDiff)
+        {
+            // the full diff accumulates the differences of all sources in the last slot of repairs
+            if (repairs[repairs.length - 1] == null)
+                repairs[repairs.length - 1] = new PartitionUpdate.Builder(command.metadata(), partitionKey, columns, 1);
+            f.accept(repairs[repairs.length - 1]);
+        }
+    }
+
     public void onMergedPartitionLevelDeletion(DeletionTime mergedDeletion, DeletionTime[] versions)
     {
         this.partitionLevelDeletion = mergedDeletion;
         for (int i = 0; i < versions.length; i++)
         {
+            // the merged deletion supersedes what source i returned, so record it as a repair for that source
             if (mergedDeletion.supersedes(versions[i]))
-                update(i).addPartitionDeletion(mergedDeletion);
+                applyToPartition(i, p -> p.addPartitionDeletion(mergedDeletion));
         }
     }
 
@@ -178,7 +216,10 @@ public class RowIteratorMergeListener<E extends Endpoints<E>>
         for (int i = 0; i < currentRows.length; i++)
         {
             if (currentRows[i] != null)
-                update(i).add(currentRows[i].build());
+            {
+                Row row = currentRows[i].build();
+                applyToPartition(i, p -> p.add(row));
+            }
         }
         Arrays.fill(currentRows, null);
     }
@@ -302,34 +343,44 @@ public class RowIteratorMergeListener<E extends Endpoints<E>>
     private void closeOpenMarker(int i, ClusteringBound close)
     {
         ClusteringBound open = markerToRepair[i];
-        update(i).add(new RangeTombstone(Slice.make(isReversed ? close : open, isReversed ? open : close), currentDeletion()));
+        // build the tombstone once (bounds swapped for reversed reads); applyToPartition may hand it to more than one builder
+        RangeTombstone rt = new RangeTombstone(Slice.make(isReversed ? close : open, isReversed ? open : close), currentDeletion());
+        applyToPartition(i, p -> p.add(rt));
         markerToRepair[i] = null;
     }
 
     public void close()
     {
-        Map<Replica, Mutation> mutations = null;
-        Endpoints<?> sources = replicaPlan.contacts();
-        for (int i = 0; i < repairs.length; i++)
-        {
-            if (repairs[i] == null)
-                continue;
+        // nothing to send if no repair builder was ever created
+        boolean hasRepairs = false;
+        for (int i = 0 ; !hasRepairs && i < repairs.length ; ++i)
+            hasRepairs = repairs[i] != null;
+        if (!hasRepairs)
+            return;
+
+        // the last slot of repairs holds the full diff, used below for write targets that were not read from
+        PartitionUpdate fullDiffRepair = null;
+        if (buildFullDiff && repairs[repairs.length - 1] != null)
+            fullDiffRepair = repairs[repairs.length - 1].build();
 
-            Replica source = sources.get(i);
+        Map<Replica, Mutation> mutations = Maps.newHashMapWithExpectedSize(writePlan.contacts().size());
+        // read-source indexes are stored offset by one, so that subtracting one from a missing entry yields -1 below
+        // NOTE(review): relies on the map returning 0 for an absent key - confirm against ObjectIntOpenHashMap
+        ObjectIntOpenHashMap<InetAddressAndPort> sourceIds = new ObjectIntOpenHashMap<>(((repairs.length + 1) * 4) / 3);
+        for (int i = 0 ; i < readPlan.contacts().size() ; ++i)
+            sourceIds.put(readPlan.contacts().get(i).endpoint(), 1 + i);
 
-            Mutation mutation = BlockingReadRepairs.createRepairMutation(repairs[i].build(), replicaPlan.consistencyLevel(), source.endpoint(), false);
+        for (Replica replica : writePlan.contacts())
+        {
+            PartitionUpdate update = null;
+            int i = -1 + sourceIds.get(replica.endpoint());
+            if (i < 0)
+                update = fullDiffRepair;
+            else if (repairs[i] != null)
+                update = repairs[i].build();
+
+            // NOTE(review): update can still be null here; the null check below suggests createRepairMutation tolerates that - confirm
+            Mutation mutation = BlockingReadRepairs.createRepairMutation(update, readPlan.consistencyLevel(), replica.endpoint(), false);
             if (mutation == null)
                 continue;
 
-            if (mutations == null)
-                mutations = Maps.newHashMapWithExpectedSize(sources.size());
-
-            mutations.put(source, mutation);
+            mutations.put(replica, mutation);
         }
 
-        if (mutations != null)
-        {
-            readRepair.repairPartition(partitionKey, mutations, replicaPlan);
-        }
+        // NOTE(review): unlike the removed code, this is reached even when mutations ended up empty
+        readRepair.repairPartition(partitionKey, mutations, writePlan);
     }
 }
\ No newline at end of file
diff --git a/test/distributed/org/apache/cassandra/distributed/impl/AbstractCluster.java b/test/distributed/org/apache/cassandra/distributed/impl/AbstractCluster.java
index 68cc6a2..d487c14 100644
--- a/test/distributed/org/apache/cassandra/distributed/impl/AbstractCluster.java
+++ b/test/distributed/org/apache/cassandra/distributed/impl/AbstractCluster.java
@@ -47,6 +47,8 @@ import org.apache.cassandra.db.ConsistencyLevel;
 import org.apache.cassandra.db.Keyspace;
 import org.apache.cassandra.distributed.api.Feature;
 import org.apache.cassandra.distributed.api.ICluster;
+import org.apache.cassandra.dht.IPartitioner;
+import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.distributed.api.ICoordinator;
 import org.apache.cassandra.distributed.api.IInstance;
 import org.apache.cassandra.distributed.api.IInstanceConfig;
@@ -766,5 +768,23 @@ public abstract class AbstractCluster<I extends IInstance> implements ICluster,
         }
     }
 
+    /**
+     * Returns one token per instance, in the order produced by {@code stream()}. Each token is parsed from the
+     * instance's {@code initial_token} setting by the token factory of the partitioner class named in its
+     * {@code partitioner} setting.
+     *
+     * @return the parsed tokens
+     * @throws RuntimeException wrapping anything thrown while instantiating the partitioner or parsing the token
+     */
+    public List<Token> tokens()
+    {
+        return stream()
+               .map(i ->
+                    {
+                        try
+                        {
+                            // NOTE(review): Class.newInstance() is deprecated since Java 9; getDeclaredConstructor().newInstance() is the replacement
+                            IPartitioner partitioner = ((IPartitioner)Class.forName(i.config().getString("partitioner")).newInstance());
+                            return partitioner.getTokenFactory().fromString(i.config().getString("initial_token"));
+                        }
+                        catch (Throwable t)
+                        {
+                            throw new RuntimeException(t);
+                        }
+                    })
+               .collect(Collectors.toList());
+    }
+
 }
 
diff --git a/test/distributed/org/apache/cassandra/distributed/test/DistributedTestBase.java b/test/distributed/org/apache/cassandra/distributed/test/DistributedTestBase.java
index 64dee64..65224ea 100644
--- a/test/distributed/org/apache/cassandra/distributed/test/DistributedTestBase.java
+++ b/test/distributed/org/apache/cassandra/distributed/test/DistributedTestBase.java
@@ -78,7 +78,12 @@ public class DistributedTestBase
 
     protected static <C extends AbstractCluster<?>> C init(C cluster)
     {
-        cluster.schemaChange("CREATE KEYSPACE " + KEYSPACE + " WITH replication = {'class': 'SimpleStrategy', 'replication_factor': " + cluster.size() + "};");
+        // default the replication factor to 3, or to the cluster size for smaller clusters
+        return init(cluster, Math.min(3, cluster.size()));
+    }
+
+    /**
+     * Creates the test keyspace {@code KEYSPACE} with SimpleStrategy and the given replication factor.
+     *
+     * @param cluster the cluster to run the schema change on
+     * @param replicationFactor value used for {@code replication_factor}
+     * @return the same cluster instance that was passed in
+     */
+    protected static <C extends AbstractCluster<?>> C init(C cluster, int replicationFactor)
+    {
+        cluster.schemaChange("CREATE KEYSPACE " + KEYSPACE + " WITH replication = {'class': 'SimpleStrategy', 'replication_factor': " + replicationFactor + "};");
         return cluster;
     }
 
diff --git a/test/distributed/org/apache/cassandra/distributed/test/ReadRepairTest.java b/test/distributed/org/apache/cassandra/distributed/test/ReadRepairTest.java
new file mode 100644
index 0000000..a8c5fa1
--- /dev/null
+++ b/test/distributed/org/apache/cassandra/distributed/test/ReadRepairTest.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.test;
+
+import java.util.List;
+
+import org.junit.Test;
+
+import org.apache.cassandra.db.ConsistencyLevel;
+import org.apache.cassandra.db.marshal.Int32Type;
+import org.apache.cassandra.dht.Murmur3Partitioner;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.distributed.Cluster;
+import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.service.PendingRangeCalculatorService;
+import org.apache.cassandra.service.StorageService;
+
+import static org.apache.cassandra.net.Verb.READ_REPAIR_REQ;
+import static org.apache.cassandra.net.Verb.READ_REQ;
+
+/**
+ * In-JVM distributed tests for blocking read repair: the basic repair path, a repair whose message is dropped,
+ * and a repair issued while a node is marked as leaving.
+ */
+public class ReadRepairTest extends DistributedTestBase
+{
+
+    /**
+     * Writes a row locally on nodes 1 and 2 only, reads it through the coordinator at ALL, and checks that
+     * node 3 holds the row afterwards.
+     */
+    @Test
+    public void readRepairTest() throws Throwable
+    {
+        try (Cluster cluster = init(Cluster.create(3)))
+        {
+            cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v int, PRIMARY KEY (pk, ck)) WITH read_repair='blocking'");
+
+            cluster.get(1).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, 1, 1)");
+            cluster.get(2).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, 1, 1)");
+
+            // node 3 starts without the row
+            assertRows(cluster.get(3).executeInternal("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1"));
+
+            assertRows(cluster.coordinator(1).execute("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1",
+                                                      ConsistencyLevel.ALL),
+                       row(1, 1, 1));
+
+            // Verify that data got repaired to the third node
+            assertRows(cluster.get(3).executeInternal("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1"),
+                       row(1, 1, 1));
+        }
+    }
+
+    /**
+     * Same setup as {@link #readRepairTest()}, but READ_REPAIR_REQ messages to node 3 are dropped and the read
+     * runs at QUORUM; the read still returns the row while node 3 remains without it.
+     */
+    @Test
+    public void failingReadRepairTest() throws Throwable
+    {
+        try (Cluster cluster = init(Cluster.create(3)))
+        {
+            cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v int, PRIMARY KEY (pk, ck)) WITH read_repair='blocking'");
+
+            for (int i = 1 ; i <= 2 ; ++i)
+                cluster.get(i).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, 1, 1)");
+
+            assertRows(cluster.get(3).executeInternal("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1"));
+
+            cluster.verbs(READ_REPAIR_REQ).to(3).drop();
+            assertRows(cluster.coordinator(1).execute("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1",
+                                                      ConsistencyLevel.QUORUM),
+                       row(1, 1, 1));
+
+            // Data was not repaired
+            assertRows(cluster.get(3).executeInternal("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1"));
+        }
+    }
+
+    /**
+     * Uses a 4 node cluster with replication factor 3. A row is written only on node 4, node 2 is then marked
+     * as leaving in node 4's token metadata, and a read coordinated by node 4 is expected to leave the row
+     * readable on node 1.
+     */
+    @Test
+    public void movingTokenReadRepairTest() throws Throwable
+    {
+        try (Cluster cluster = init(Cluster.create(4), 3))
+        {
+            List<Token> tokens = cluster.tokens();
+
+            cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v int, PRIMARY KEY (pk, ck)) WITH read_repair='blocking'");
+
+            // find the first non-negative int key whose token lies strictly between tokens.get(0) and tokens.get(1)
+            // (presumably the tokens of nodes 1 and 2 - confirm the ordering of cluster.tokens())
+            int i = 0;
+            while (true)
+            {
+                Token t = Murmur3Partitioner.instance.getToken(Int32Type.instance.decompose(i));
+                if (t.compareTo(tokens.get(2 - 1)) < 0 && t.compareTo(tokens.get(1 - 1)) > 0)
+                    break;
+                ++i;
+            }
+
+            // write only to #4
+            cluster.get(4).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (?, 1, 1)", i);
+            // mark #2 as leaving in #4
+            cluster.get(4).acceptsOnInstance((InetAddressAndPort endpoint) -> {
+                StorageService.instance.getTokenMetadata().addLeavingEndpoint(endpoint);
+                PendingRangeCalculatorService.instance.update();
+                PendingRangeCalculatorService.instance.blockUntilFinished();
+            }).accept(cluster.get(2).broadcastAddressAndPort());
+
+            // prevent #4 from reading or writing to #3, so our QUORUM must contain #2 and #4
+            // since #1 is taking over the range, this means any read-repair must make it to #1 as well
+            // NOTE(review): these filters are keyed by Verb.ordinal(), whereas failingReadRepairTest passes the Verb
+            // itself to cluster.verbs(...) - confirm that ordinal() is the value the filter matches on
+            cluster.filters().verbs(READ_REQ.ordinal()).from(4).to(3).drop();
+            cluster.filters().verbs(READ_REPAIR_REQ.ordinal()).from(4).to(3).drop();
+            // NOTE(review): the comment above speaks of QUORUM but the query runs at ConsistencyLevel.ALL - confirm which is intended
+            assertRows(cluster.coordinator(4).execute("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = ?",
+                                                      ConsistencyLevel.ALL, i),
+                       row(i, 1, 1));
+
+            // verify that #1 receives the write
+            assertRows(cluster.get(1).executeInternal("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = ?", i),
+                       row(i, 1, 1));
+        }
+    }
+
+}
diff --git a/test/distributed/org/apache/cassandra/distributed/test/DistributedReadWritePathTest.java b/test/distributed/org/apache/cassandra/distributed/test/SimpleReadWriteTest.java
similarity index 99%
rename from test/distributed/org/apache/cassandra/distributed/test/DistributedReadWritePathTest.java
rename to test/distributed/org/apache/cassandra/distributed/test/SimpleReadWriteTest.java
index 679a90f..4292598 100644
--- a/test/distributed/org/apache/cassandra/distributed/test/DistributedReadWritePathTest.java
+++ b/test/distributed/org/apache/cassandra/distributed/test/SimpleReadWriteTest.java
@@ -18,8 +18,6 @@
 
 package org.apache.cassandra.distributed.test;
 
-import java.util.concurrent.TimeUnit;
-
 import org.junit.Assert;
 import org.junit.BeforeClass;
 import org.junit.Test;
@@ -38,7 +36,7 @@ import static org.junit.Assert.assertEquals;
 import static org.apache.cassandra.net.Verb.READ_REPAIR_REQ;
 import static org.apache.cassandra.net.OutboundConnections.LARGE_MESSAGE_THRESHOLD;
 
-public class DistributedReadWritePathTest extends DistributedTestBase
+public class SimpleReadWriteTest extends DistributedTestBase
 {
     @BeforeClass
     public static void before()
diff --git a/test/unit/org/apache/cassandra/service/reads/AbstractReadResponseTest.java b/test/unit/org/apache/cassandra/service/reads/AbstractReadResponseTest.java
index 545731b..884baa1 100644
--- a/test/unit/org/apache/cassandra/service/reads/AbstractReadResponseTest.java
+++ b/test/unit/org/apache/cassandra/service/reads/AbstractReadResponseTest.java
@@ -82,14 +82,18 @@ import static org.apache.cassandra.net.Verb.READ_REQ;
 public abstract class AbstractReadResponseTest
 {
     public static final String KEYSPACE1 = "DataResolverTest";
+    public static final String KEYSPACE3 = "DataResolverTest3";
     public static final String CF_STANDARD = "Standard1";
     public static final String CF_COLLECTION = "Collection1";
 
     public static Keyspace ks;
+    public static Keyspace ks3;
     public static ColumnFamilyStore cfs;
     public static ColumnFamilyStore cfs2;
+    public static ColumnFamilyStore cfs3;
     public static TableMetadata cfm;
     public static TableMetadata cfm2;
+    public static TableMetadata cfm3;
     public static ColumnMetadata m;
 
     public static DecoratedKey dk;
@@ -128,19 +132,32 @@ public abstract class AbstractReadResponseTest
                      .addRegularColumn("one", AsciiType.instance)
                      .addRegularColumn("two", AsciiType.instance);
 
+        TableMetadata.Builder builder3 =
+        TableMetadata.builder(KEYSPACE3, CF_STANDARD)
+                     .addPartitionKeyColumn("key", BytesType.instance)
+                     .addClusteringColumn("col1", AsciiType.instance)
+                     .addRegularColumn("c1", AsciiType.instance)
+                     .addRegularColumn("c2", AsciiType.instance)
+                     .addRegularColumn("one", AsciiType.instance)
+                     .addRegularColumn("two", AsciiType.instance);
+
         TableMetadata.Builder builder2 =
         TableMetadata.builder(KEYSPACE1, CF_COLLECTION)
                      .addPartitionKeyColumn("k", ByteType.instance)
                      .addRegularColumn("m", MapType.getInstance(IntegerType.instance, IntegerType.instance, true));
 
         SchemaLoader.prepareServer();
-        SchemaLoader.createKeyspace(KEYSPACE1, KeyspaceParams.simple(1), builder1, builder2);
+        SchemaLoader.createKeyspace(KEYSPACE1, KeyspaceParams.simple(2), builder1, builder2);
+        SchemaLoader.createKeyspace(KEYSPACE3, KeyspaceParams.simple(4), builder3);
 
         ks = Keyspace.open(KEYSPACE1);
         cfs = ks.getColumnFamilyStore(CF_STANDARD);
         cfm = cfs.metadata();
         cfs2 = ks.getColumnFamilyStore(CF_COLLECTION);
         cfm2 = cfs2.metadata();
+        ks3 = Keyspace.open(KEYSPACE3);
+        cfs3 = ks3.getColumnFamilyStore(CF_STANDARD);
+        cfm3 = cfs3.metadata();
         m = cfm2.getColumn(new ColumnIdentifier("m", false));
     }
 
diff --git a/test/unit/org/apache/cassandra/service/reads/DataResolverTest.java b/test/unit/org/apache/cassandra/service/reads/DataResolverTest.java
index 17c6e41..50ed09d 100644
--- a/test/unit/org/apache/cassandra/service/reads/DataResolverTest.java
+++ b/test/unit/org/apache/cassandra/service/reads/DataResolverTest.java
@@ -21,9 +21,17 @@ import java.net.UnknownHostException;
 import java.nio.ByteBuffer;
 import java.util.Collections;
 import java.util.Iterator;
+import java.util.UUID;
 
 import com.google.common.collect.Iterators;
 import com.google.common.collect.Sets;
+
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.dht.ByteOrderedPartitioner;
+import org.apache.cassandra.dht.Murmur3Partitioner;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.gms.Gossiper;
 import org.apache.cassandra.locator.ReplicaPlan;
 import org.junit.Assert;
 import org.junit.Before;
@@ -59,6 +67,7 @@ import org.apache.cassandra.locator.InetAddressAndPort;
 import org.apache.cassandra.locator.Replica;
 import org.apache.cassandra.locator.ReplicaUtils;
 import org.apache.cassandra.net.*;
+import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.service.reads.repair.ReadRepair;
 import org.apache.cassandra.service.reads.repair.RepairedDataTracker;
 import org.apache.cassandra.service.reads.repair.RepairedDataVerifier;
@@ -76,28 +85,42 @@ import static org.junit.Assert.assertTrue;
 
 public class DataResolverTest extends AbstractReadResponseTest
 {
-    public static final String KEYSPACE1 = "DataResolverTest";
-    public static final String CF_STANDARD = "Standard1";
-
     private ReadCommand command;
     private TestableReadRepair readRepair;
+    private Keyspace ks;
+    private ColumnFamilyStore cfs;
 
-    @Before
-    public void setup()
+    private EndpointsForRange makeReplicas(int num)
     {
+        StorageService.instance.getTokenMetadata().clearUnsafe();
+
+        switch (num)
+        {
+            case 2:
+                ks = AbstractReadResponseTest.ks;
+                cfs = AbstractReadResponseTest.cfs;
+                break;
+            case 4:
+                ks = AbstractReadResponseTest.ks3;
+                cfs = AbstractReadResponseTest.cfs3;
+                break;
+            default:
+                throw new IllegalStateException("This test needs refactoring to cleanly support different replication factors");
+        }
+
         command = Util.cmd(cfs, dk).withNowInSeconds(nowInSec).build();
         command.trackRepairedStatus();
         readRepair = new TestableReadRepair(command);
-    }
-
-    private static EndpointsForRange makeReplicas(int num)
-    {
+        Token token = Murmur3Partitioner.instance.getMinimumToken();
         EndpointsForRange.Builder replicas = EndpointsForRange.builder(ReplicaUtils.FULL_RANGE, num);
         for (int i = 0; i < num; i++)
         {
             try
             {
-                replicas.add(ReplicaUtils.full(InetAddressAndPort.getByAddress(new byte[]{ 127, 0, 0, (byte) (i + 1) })));
+                InetAddressAndPort endpoint = InetAddressAndPort.getByAddress(new byte[]{ 127, 0, 0, (byte) (i + 1) });
+                replicas.add(ReplicaUtils.full(endpoint));
+                StorageService.instance.getTokenMetadata().updateNormalToken(token = token.increaseSlightly(), endpoint);
+                Gossiper.instance.initializeNodeUnsafe(endpoint, UUID.randomUUID(), 1);
             }
             catch (UnknownHostException e)
             {
@@ -231,30 +254,30 @@ public class DataResolverTest extends AbstractReadResponseTest
 
         RangeTombstone tombstone1 = tombstone("1", "11", 1, nowInSec);
         RangeTombstone tombstone2 = tombstone("3", "31", 1, nowInSec);
-        PartitionUpdate update = new RowUpdateBuilder(cfm, nowInSec, 1L, dk).addRangeTombstone(tombstone1)
+        PartitionUpdate update = new RowUpdateBuilder(cfm3, nowInSec, 1L, dk).addRangeTombstone(tombstone1)
                                                                             .addRangeTombstone(tombstone2)
                                                                             .buildUpdate();
 
         InetAddressAndPort peer1 = replicas.get(0).endpoint();
-        UnfilteredPartitionIterator iter1 = iter(new RowUpdateBuilder(cfm, nowInSec, 1L, dk).addRangeTombstone(tombstone1)
+        UnfilteredPartitionIterator iter1 = iter(new RowUpdateBuilder(cfm3, nowInSec, 1L, dk).addRangeTombstone(tombstone1)
                                                                                             .addRangeTombstone(tombstone2)
                                                                                             .buildUpdate());
         resolver.preprocess(response(command, peer1, iter1));
         // not covered by any range tombstone
         InetAddressAndPort peer2 = replicas.get(1).endpoint();
-        UnfilteredPartitionIterator iter2 = iter(new RowUpdateBuilder(cfm, nowInSec, 0L, dk).clustering("0")
+        UnfilteredPartitionIterator iter2 = iter(new RowUpdateBuilder(cfm3, nowInSec, 0L, dk).clustering("0")
                                                                                             .add("c1", "v0")
                                                                                             .buildUpdate());
         resolver.preprocess(response(command, peer2, iter2));
         // covered by a range tombstone
         InetAddressAndPort peer3 = replicas.get(2).endpoint();
-        UnfilteredPartitionIterator iter3 = iter(new RowUpdateBuilder(cfm, nowInSec, 0L, dk).clustering("10")
+        UnfilteredPartitionIterator iter3 = iter(new RowUpdateBuilder(cfm3, nowInSec, 0L, dk).clustering("10")
                                                                                             .add("c2", "v1")
                                                                                             .buildUpdate());
         resolver.preprocess(response(command, peer3, iter3));
         // range covered by rt, but newer
         InetAddressAndPort peer4 = replicas.get(3).endpoint();
-        UnfilteredPartitionIterator iter4 = iter(new RowUpdateBuilder(cfm, nowInSec, 2L, dk).clustering("3")
+        UnfilteredPartitionIterator iter4 = iter(new RowUpdateBuilder(cfm3, nowInSec, 2L, dk).clustering("3")
                                                                                             .add("one", "A")
                                                                                             .buildUpdate());
         resolver.preprocess(response(command, peer4, iter4));
@@ -1294,7 +1317,7 @@ public class DataResolverTest extends AbstractReadResponseTest
     private void assertRepairMetadata(Mutation mutation)
     {
         PartitionUpdate update = mutation.getPartitionUpdates().iterator().next();
-        assertEquals(update.metadata().keyspace, cfm.keyspace);
+        // compare against the keyspace selected by makeReplicas(), which differs between the 2 and 4 replica cases
+        assertEquals(update.metadata().keyspace, ks.getName());
         assertEquals(update.metadata().name, cfm.name);
     }
 
diff --git a/test/unit/org/apache/cassandra/service/reads/repair/AbstractReadRepairTest.java b/test/unit/org/apache/cassandra/service/reads/repair/AbstractReadRepairTest.java
index 3d39732..ed4ef3f 100644
--- a/test/unit/org/apache/cassandra/service/reads/repair/AbstractReadRepairTest.java
+++ b/test/unit/org/apache/cassandra/service/reads/repair/AbstractReadRepairTest.java
@@ -1,14 +1,24 @@
 package org.apache.cassandra.service.reads.repair;
 
+import java.nio.ByteBuffer;
 import java.util.List;
 import java.util.Set;
+import java.util.UUID;
 import java.util.concurrent.TimeUnit;
 import java.util.function.Consumer;
 
+import com.google.common.base.Predicates;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
 import com.google.common.primitives.Ints;
+
+import org.apache.cassandra.dht.ByteOrderedPartitioner;
+import org.apache.cassandra.dht.Murmur3Partitioner;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.gms.Gossiper;
+import org.apache.cassandra.locator.EndpointsForToken;
 import org.apache.cassandra.locator.ReplicaPlan;
 import org.junit.Assert;
 import org.junit.Before;
@@ -41,6 +51,7 @@ import org.apache.cassandra.db.rows.RowIterator;
 import org.apache.cassandra.locator.EndpointsForRange;
 import org.apache.cassandra.locator.InetAddressAndPort;
 import org.apache.cassandra.locator.Replica;
+import org.apache.cassandra.locator.ReplicaPlans;
 import org.apache.cassandra.locator.ReplicaUtils;
 import org.apache.cassandra.net.Message;
 import org.apache.cassandra.schema.KeyspaceMetadata;
@@ -48,6 +59,7 @@ import org.apache.cassandra.schema.KeyspaceParams;
 import org.apache.cassandra.schema.MigrationManager;
 import org.apache.cassandra.schema.TableMetadata;
 import org.apache.cassandra.schema.Tables;
+import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.utils.ByteBufferUtil;
 
 import static org.apache.cassandra.locator.Replica.fullReplica;
@@ -216,6 +228,14 @@ public abstract  class AbstractReadRepairTest
 
         replicaPlan = replicaPlan(ConsistencyLevel.QUORUM, replicas);
 
+        StorageService.instance.getTokenMetadata().clearUnsafe();
+        StorageService.instance.getTokenMetadata().updateNormalToken(ByteOrderedPartitioner.instance.getToken(ByteBuffer.wrap(new byte[] { 0 })), replica1.endpoint());
+        StorageService.instance.getTokenMetadata().updateNormalToken(ByteOrderedPartitioner.instance.getToken(ByteBuffer.wrap(new byte[] { 1 })), replica2.endpoint());
+        StorageService.instance.getTokenMetadata().updateNormalToken(ByteOrderedPartitioner.instance.getToken(ByteBuffer.wrap(new byte[] { 2 })), replica3.endpoint());
+        Gossiper.instance.initializeNodeUnsafe(replica1.endpoint(), UUID.randomUUID(), 1);
+        Gossiper.instance.initializeNodeUnsafe(replica2.endpoint(), UUID.randomUUID(), 1);
+        Gossiper.instance.initializeNodeUnsafe(replica3.endpoint(), UUID.randomUUID(), 1);
+
         // default test values
         key  = dk(5);
         cell1 = cell("v", "val1", now);
@@ -247,6 +267,22 @@ public abstract  class AbstractReadRepairTest
         return replicaPlan(ks, consistencyLevel, replicas, replicas);
     }
 
+    /**
+     * Builds a repair write plan for {@code readPlan}, using the read plan's candidates as the live-and-down replicas.
+     */
+    static ReplicaPlan.ForTokenWrite repairPlan(ReplicaPlan.ForRangeRead readPlan)
+    {
+        return repairPlan(readPlan, readPlan.candidates());
+    }
+
+    /**
+     * Builds a repair write plan from a read plan created via {@code replicaPlan(liveAndDown, targets)}.
+     *
+     * @param liveAndDown replicas passed both to the read plan and, as live-and-down replicas, to the write plan
+     * @param targets replicas passed to the read plan as its targets
+     */
+    static ReplicaPlan.ForTokenWrite repairPlan(EndpointsForRange liveAndDown, EndpointsForRange targets)
+    {
+        return repairPlan(replicaPlan(liveAndDown, targets), liveAndDown);
+    }
+
+    /**
+     * Builds the write plan used to send read repairs for {@code readPlan}.
+     * <p>
+     * The plan is created with {@code ConsistencyLevel.TWO}, no pending endpoints, an always-true predicate and the
+     * selector returned by {@code ReplicaPlans.writeReadRepair(readPlan)}. The token is taken from the left bound
+     * of the read plan's range.
+     *
+     * @param readPlan the read plan whose repairs are to be written
+     * @param liveAndDown replicas narrowed to the token and handed to {@code ReplicaPlans.forWrite}
+     */
+    static ReplicaPlan.ForTokenWrite repairPlan(ReplicaPlan.ForRangeRead readPlan, EndpointsForRange liveAndDown)
+    {
+        Token token = readPlan.range().left.getToken();
+        EndpointsForToken pending = EndpointsForToken.empty(token);
+        return ReplicaPlans.forWrite(ks, ConsistencyLevel.TWO, liveAndDown.forToken(token), pending, Predicates.alwaysTrue(), ReplicaPlans.writeReadRepair(readPlan));
+    }
     static ReplicaPlan.ForRangeRead replicaPlan(EndpointsForRange replicas, EndpointsForRange targets)
     {
         return replicaPlan(ks, ConsistencyLevel.QUORUM, replicas, targets);
diff --git a/test/unit/org/apache/cassandra/service/reads/repair/BlockingReadRepairTest.java b/test/unit/org/apache/cassandra/service/reads/repair/BlockingReadRepairTest.java
index 3cc1a63..e4b3a71 100644
--- a/test/unit/org/apache/cassandra/service/reads/repair/BlockingReadRepairTest.java
+++ b/test/unit/org/apache/cassandra/service/reads/repair/BlockingReadRepairTest.java
@@ -44,13 +44,12 @@ import org.apache.cassandra.service.reads.ReadCallback;
 
 public class BlockingReadRepairTest extends AbstractReadRepairTest
 {
-    private static class InstrumentedReadRepairHandler<E extends Endpoints<E>, P extends ReplicaPlan.ForRead<E>>
-            extends BlockingPartitionRepair<E, P>
+    private static class InstrumentedReadRepairHandler
+            extends BlockingPartitionRepair
     {
-        public InstrumentedReadRepairHandler(Map<Replica, Mutation> repairs, int maxBlockFor, P replicaPlan)
+        public InstrumentedReadRepairHandler(Map<Replica, Mutation> repairs, ReplicaPlan.ForTokenWrite writePlan)
         {
-            super(Util.dk("not a real usable value"), repairs, maxBlockFor, replicaPlan,
-                    e -> targets.contains(e));
+            super(Util.dk("not a real usable value"), repairs, writePlan, e -> targets.contains(e));
         }
 
         Map<InetAddressAndPort, Mutation> mutationsSent = new HashMap<>();
@@ -67,16 +66,15 @@ public class BlockingReadRepairTest extends AbstractReadRepairTest
         configureClass(ReadRepairStrategy.BLOCKING);
     }
 
-    private static <E extends Endpoints<E>, P extends ReplicaPlan.ForRead<E>>
-    InstrumentedReadRepairHandler<E, P> createRepairHandler(Map<Replica, Mutation> repairs, int maxBlockFor, P replicaPlan)
+    /**
+     * Creates an instrumented repair handler for the given repair mutations and write plan.
+     */
+    private static InstrumentedReadRepairHandler createRepairHandler(Map<Replica, Mutation> repairs, ReplicaPlan.ForTokenWrite writePlan)
     {
-        return new InstrumentedReadRepairHandler<>(repairs, maxBlockFor, replicaPlan);
+        return new InstrumentedReadRepairHandler(repairs, writePlan);
     }
 
-    private static InstrumentedReadRepairHandler createRepairHandler(Map<Replica, Mutation> repairs, int maxBlockFor)
+    /**
+     * Creates an instrumented repair handler whose write plan is derived from exactly the replicas that have a
+     * repair mutation in {@code repairs}.
+     */
+    private static InstrumentedReadRepairHandler createRepairHandler(Map<Replica, Mutation> repairs)
     {
         EndpointsForRange replicas = EndpointsForRange.copyOf(Lists.newArrayList(repairs.keySet()));
-        return createRepairHandler(repairs, maxBlockFor, replicaPlan(replicas, replicas));
+        return createRepairHandler(repairs, repairPlan(replicas, replicas));
     }
 
     private static class InstrumentedBlockingReadRepair<E extends Endpoints<E>, P extends ReplicaPlan.ForRead<E>>
@@ -140,8 +138,8 @@ public class BlockingReadRepairTest extends AbstractReadRepairTest
         repairs.put(replica1, repair1);
         repairs.put(replica2, repair2);
 
-        ReplicaPlan.ForRangeRead replicaPlan = replicaPlan(replicas, EndpointsForRange.copyOf(Lists.newArrayList(repairs.keySet())));
-        InstrumentedReadRepairHandler<?, ?> handler = createRepairHandler(repairs, 2, replicaPlan);
+        ReplicaPlan.ForTokenWrite writePlan = repairPlan(replicas, EndpointsForRange.copyOf(Lists.newArrayList(repairs.keySet())));
+        InstrumentedReadRepairHandler handler = createRepairHandler(repairs, writePlan);
 
         Assert.assertTrue(handler.mutationsSent.isEmpty());
 
@@ -176,7 +174,7 @@ public class BlockingReadRepairTest extends AbstractReadRepairTest
         repairs.put(replica1, mutation(cell2));
         repairs.put(replica2, mutation(cell1));
 
-        InstrumentedReadRepairHandler handler = createRepairHandler(repairs, 2);
+        InstrumentedReadRepairHandler handler = createRepairHandler(repairs);
         handler.sendInitialRepairs();
         handler.ack(target1);
         handler.ack(target2);
@@ -197,7 +195,7 @@ public class BlockingReadRepairTest extends AbstractReadRepairTest
         repairs.put(replica1, mutation(cell2));
         repairs.put(replica2, mutation(cell1));
 
-        InstrumentedReadRepairHandler handler = createRepairHandler(repairs, 2);
+        InstrumentedReadRepairHandler handler = createRepairHandler(repairs);
         handler.sendInitialRepairs();
 
         // we've already sent mutations to all candidates, so we shouldn't send any more
@@ -219,7 +217,7 @@ public class BlockingReadRepairTest extends AbstractReadRepairTest
         repairs.put(replica1, repair1);
 
         // check that the correct initial mutations are sent out
-        InstrumentedReadRepairHandler handler = createRepairHandler(repairs, 2, replicaPlan(replicas, EndpointsForRange.of(replica1, replica2)));
+        InstrumentedReadRepairHandler handler = createRepairHandler(repairs, repairPlan(replicas, EndpointsForRange.of(replica1, replica2)));
         handler.sendInitialRepairs();
         Assert.assertEquals(1, handler.mutationsSent.size());
         Assert.assertTrue(handler.mutationsSent.containsKey(target1));
@@ -240,7 +238,7 @@ public class BlockingReadRepairTest extends AbstractReadRepairTest
         repairs.put(replica3, mutation(cell3));
         Assert.assertEquals(3, repairs.size());
 
-        InstrumentedReadRepairHandler handler = createRepairHandler(repairs, 2);
+        InstrumentedReadRepairHandler handler = createRepairHandler(repairs);
         handler.sendInitialRepairs();
 
         Assert.assertFalse(getCurrentRepairStatus(handler));
@@ -266,8 +264,8 @@ public class BlockingReadRepairTest extends AbstractReadRepairTest
         repairs.put(remote1, mutation(cell1));
 
         EndpointsForRange participants = EndpointsForRange.of(replica1, replica2, remote1, remote2);
-        ReplicaPlan.ForRangeRead replicaPlan = replicaPlan(ks, ConsistencyLevel.LOCAL_QUORUM, participants);
-        InstrumentedReadRepairHandler handler = createRepairHandler(repairs, 2, replicaPlan);
+        ReplicaPlan.ForTokenWrite writePlan = repairPlan(replicaPlan(ks, ConsistencyLevel.LOCAL_QUORUM, participants));
+        InstrumentedReadRepairHandler handler = createRepairHandler(repairs, writePlan);
         handler.sendInitialRepairs();
         Assert.assertEquals(2, handler.mutationsSent.size());
         Assert.assertTrue(handler.mutationsSent.containsKey(replica1.endpoint()));
diff --git a/test/unit/org/apache/cassandra/service/reads/repair/DiagEventsBlockingReadRepairTest.java b/test/unit/org/apache/cassandra/service/reads/repair/DiagEventsBlockingReadRepairTest.java
index c15d7f4..7806a3f 100644
--- a/test/unit/org/apache/cassandra/service/reads/repair/DiagEventsBlockingReadRepairTest.java
+++ b/test/unit/org/apache/cassandra/service/reads/repair/DiagEventsBlockingReadRepairTest.java
@@ -28,6 +28,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.function.Predicate;
 
 import com.google.common.collect.Lists;
+
 import org.apache.cassandra.locator.ReplicaPlan;
 import org.junit.After;
 import org.junit.Assert;
@@ -81,8 +82,8 @@ public class DiagEventsBlockingReadRepairTest extends AbstractReadRepairTest
         repairs.put(replica2, repair2);
 
 
-        ReplicaPlan.ForRangeRead replicaPlan = replicaPlan(replicas, EndpointsForRange.copyOf(Lists.newArrayList(repairs.keySet())));
-        DiagnosticPartitionReadRepairHandler handler = createRepairHandler(repairs, 2, replicaPlan);
+        ReplicaPlan.ForTokenWrite writePlan = repairPlan(replicas, EndpointsForRange.copyOf(Lists.newArrayList(repairs.keySet())));
+        DiagnosticPartitionReadRepairHandler handler = createRepairHandler(repairs, writePlan);
 
         Assert.assertTrue(handler.updatesByEp.isEmpty());
 
@@ -117,15 +118,9 @@ public class DiagEventsBlockingReadRepairTest extends AbstractReadRepairTest
         return new DiagnosticBlockingRepairHandler(command, replicaPlan, queryStartNanoTime);
     }
 
-    private static DiagnosticPartitionReadRepairHandler createRepairHandler(Map<Replica, Mutation> repairs, int maxBlockFor, ReplicaPlan.ForRead<?> replicaPlan)
-    {
-        return new DiagnosticPartitionReadRepairHandler<>(key, repairs, maxBlockFor, replicaPlan);
-    }
-
-    private static DiagnosticPartitionReadRepairHandler createRepairHandler(Map<Replica, Mutation> repairs, int maxBlockFor)
+    private static DiagnosticPartitionReadRepairHandler createRepairHandler(Map<Replica, Mutation> repairs, ReplicaPlan.ForTokenWrite writePlan)
     {
-        EndpointsForRange replicas = EndpointsForRange.copyOf(Lists.newArrayList(repairs.keySet()));
-        return createRepairHandler(repairs, maxBlockFor, replicaPlan(replicas, replicas));
+        return new DiagnosticPartitionReadRepairHandler(key, repairs, writePlan);
     }
 
     private static class DiagnosticBlockingRepairHandler extends BlockingReadRepair implements InstrumentedReadRepair
@@ -170,7 +165,7 @@ public class DiagEventsBlockingReadRepairTest extends AbstractReadRepairTest
     }
 
     private static class DiagnosticPartitionReadRepairHandler<E extends Endpoints<E>, P extends ReplicaPlan.ForRead<E>>
-            extends BlockingPartitionRepair<E, P>
+            extends BlockingPartitionRepair
     {
         private final Map<InetAddressAndPort, String> updatesByEp = new HashMap<>();
 
@@ -180,9 +175,9 @@ public class DiagEventsBlockingReadRepairTest extends AbstractReadRepairTest
             return e -> candidates.contains(e);
         }
 
-        DiagnosticPartitionReadRepairHandler(DecoratedKey key, Map<Replica, Mutation> repairs, int maxBlockFor, P replicaPlan)
+        DiagnosticPartitionReadRepairHandler(DecoratedKey key, Map<Replica, Mutation> repairs, ReplicaPlan.ForTokenWrite writePlan)
         {
-            super(key, repairs, maxBlockFor, replicaPlan, isLocal());
+            super(key, repairs, writePlan, isLocal());
             DiagnosticEventService.instance().subscribe(PartitionRepairEvent.class, this::onRepairEvent);
         }
 
diff --git a/test/unit/org/apache/cassandra/service/reads/repair/ReadOnlyReadRepairTest.java b/test/unit/org/apache/cassandra/service/reads/repair/ReadOnlyReadRepairTest.java
index cc7fdf1..c0af493 100644
--- a/test/unit/org/apache/cassandra/service/reads/repair/ReadOnlyReadRepairTest.java
+++ b/test/unit/org/apache/cassandra/service/reads/repair/ReadOnlyReadRepairTest.java
@@ -92,8 +92,9 @@ public class ReadOnlyReadRepairTest extends AbstractReadRepairTest
     @Test(expected = UnsupportedOperationException.class)
     public void repairPartitionFailure()
     {
-        ReplicaPlan.SharedForRangeRead replicaPlan = ReplicaPlan.shared(replicaPlan(replicas, replicas));
-        InstrumentedReadRepair repair = createInstrumentedReadRepair(replicaPlan);
-        repair.repairPartition(null, Collections.emptyMap(), replicaPlan.get());
+        ReplicaPlan.SharedForRangeRead readPlan = ReplicaPlan.shared(replicaPlan(replicas, replicas));
+        ReplicaPlan.ForTokenWrite writePlan = repairPlan(replicas, replicas);
+        InstrumentedReadRepair repair = createInstrumentedReadRepair(readPlan);
+        repair.repairPartition(null, Collections.emptyMap(), writePlan);
     }
 }
diff --git a/test/unit/org/apache/cassandra/service/reads/repair/ReadRepairTest.java b/test/unit/org/apache/cassandra/service/reads/repair/ReadRepairTest.java
index 5ae9dd8..7458e9b 100644
--- a/test/unit/org/apache/cassandra/service/reads/repair/ReadRepairTest.java
+++ b/test/unit/org/apache/cassandra/service/reads/repair/ReadRepairTest.java
@@ -49,8 +49,6 @@ import org.apache.cassandra.db.rows.Cell;
 import org.apache.cassandra.db.rows.Row;
 import org.apache.cassandra.locator.InetAddressAndPort;
 import org.apache.cassandra.locator.Replica;
-import org.apache.cassandra.locator.ReplicaLayout;
-import org.apache.cassandra.locator.ReplicaUtils;
 import org.apache.cassandra.net.Message;
 import org.apache.cassandra.schema.KeyspaceMetadata;
 import org.apache.cassandra.schema.KeyspaceParams;
@@ -72,12 +70,11 @@ public class ReadRepairTest
     static EndpointsForRange targets;
 
     private static class InstrumentedReadRepairHandler<E extends Endpoints<E>, P extends ReplicaPlan.ForRead<E>>
-            extends BlockingPartitionRepair<E, P>
+            extends BlockingPartitionRepair
     {
-        public InstrumentedReadRepairHandler(Map<Replica, Mutation> repairs, int maxBlockFor, P replicaPlan)
+        public InstrumentedReadRepairHandler(Map<Replica, Mutation> repairs, ReplicaPlan.ForTokenWrite writePlan)
         {
-            super(Util.dk("not a valid key"), repairs, maxBlockFor, replicaPlan,
-                    e -> replicaPlan.consistencyLevel().isDatacenterLocal() && targets.endpoints().contains(e));
+            super(Util.dk("not a valid key"), repairs, writePlan, e -> targets.endpoints().contains(e));
         }
 
         Map<InetAddressAndPort, Mutation> mutationsSent = new HashMap<>();
@@ -161,10 +158,11 @@ public class ReadRepairTest
         return new Mutation(PartitionUpdate.singleRowUpdate(cfm, key, builder.build()));
     }
 
-    private static InstrumentedReadRepairHandler createRepairHandler(Map<Replica, Mutation> repairs, int maxBlockFor, EndpointsForRange all, EndpointsForRange targets)
+    private static InstrumentedReadRepairHandler createRepairHandler(Map<Replica, Mutation> repairs, EndpointsForRange all, EndpointsForRange targets)
     {
-        ReplicaPlan.ForRangeRead replicaPlan = AbstractReadRepairTest.replicaPlan(ks, ConsistencyLevel.LOCAL_QUORUM, all, targets);
-        return new InstrumentedReadRepairHandler<>(repairs, maxBlockFor, replicaPlan);
+        ReplicaPlan.ForRangeRead readPlan = AbstractReadRepairTest.replicaPlan(ks, ConsistencyLevel.LOCAL_QUORUM, all, targets);
+        ReplicaPlan.ForTokenWrite writePlan = AbstractReadRepairTest.repairPlan(readPlan);
+        return new InstrumentedReadRepairHandler(repairs, writePlan);
     }
 
     @Test
@@ -198,8 +196,7 @@ public class ReadRepairTest
         repairs.put(target1, repair1);
         repairs.put(target2, repair2);
 
-        InstrumentedReadRepairHandler<?, ?> handler = createRepairHandler(repairs, 2,
-                                                                    targets, EndpointsForRange.of(target1, target2));
+        InstrumentedReadRepairHandler<?, ?> handler = createRepairHandler(repairs, targets, EndpointsForRange.of(target1, target2));
 
         Assert.assertTrue(handler.mutationsSent.isEmpty());
 
@@ -234,7 +231,7 @@ public class ReadRepairTest
         repairs.put(target2, mutation(cell1));
 
         EndpointsForRange replicas = EndpointsForRange.of(target1, target2);
-        InstrumentedReadRepairHandler handler = createRepairHandler(repairs, 2, replicas, targets);
+        InstrumentedReadRepairHandler handler = createRepairHandler(repairs, replicas, targets);
         handler.sendInitialRepairs();
         handler.ack(target1.endpoint());
         handler.ack(target2.endpoint());
@@ -255,7 +252,7 @@ public class ReadRepairTest
         repairs.put(target1, mutation(cell2));
         repairs.put(target2, mutation(cell1));
 
-        InstrumentedReadRepairHandler handler = createRepairHandler(repairs, 2, EndpointsForRange.of(target1, target2),
+        InstrumentedReadRepairHandler handler = createRepairHandler(repairs, EndpointsForRange.of(target1, target2),
                                                                     EndpointsForRange.of(target1, target2));
         handler.sendInitialRepairs();
 
@@ -278,7 +275,7 @@ public class ReadRepairTest
         repairs.put(target1, repair1);
 
         // check that the correct initial mutations are sent out
-        InstrumentedReadRepairHandler handler = createRepairHandler(repairs, 2, targets, EndpointsForRange.of(target1, target2));
+        InstrumentedReadRepairHandler handler = createRepairHandler(repairs, targets, EndpointsForRange.of(target1, target2));
         handler.sendInitialRepairs();
         Assert.assertEquals(1, handler.mutationsSent.size());
         Assert.assertTrue(handler.mutationsSent.containsKey(target1.endpoint()));
@@ -301,7 +298,7 @@ public class ReadRepairTest
         Assert.assertEquals(3, repairs.size());
 
         EndpointsForRange replicas = EndpointsForRange.of(target1, target2, target3);
-        InstrumentedReadRepairHandler handler = createRepairHandler(repairs, 2, replicas, replicas);
+        InstrumentedReadRepairHandler handler = createRepairHandler(repairs, replicas, replicas);
         handler.sendInitialRepairs();
 
         Assert.assertFalse(getCurrentRepairStatus(handler));
@@ -311,7 +308,6 @@ public class ReadRepairTest
         // here we should stop blocking, even though we've sent 3 repairs
         handler.ack(target2.endpoint());
         Assert.assertTrue(getCurrentRepairStatus(handler));
-
     }
 
     /**
@@ -330,7 +326,7 @@ public class ReadRepairTest
         EndpointsForRange participants = EndpointsForRange.of(target1, target2, remote1, remote2);
         EndpointsForRange targets = EndpointsForRange.of(target1, target2);
 
-        InstrumentedReadRepairHandler handler = createRepairHandler(repairs, 2, participants, targets);
+        InstrumentedReadRepairHandler handler = createRepairHandler(repairs, participants, targets);
         handler.sendInitialRepairs();
         Assert.assertEquals(2, handler.mutationsSent.size());
         Assert.assertTrue(handler.mutationsSent.containsKey(target1.endpoint()));
diff --git a/test/unit/org/apache/cassandra/service/reads/repair/TestableReadRepair.java b/test/unit/org/apache/cassandra/service/reads/repair/TestableReadRepair.java
index 53964cb..84276d5 100644
--- a/test/unit/org/apache/cassandra/service/reads/repair/TestableReadRepair.java
+++ b/test/unit/org/apache/cassandra/service/reads/repair/TestableReadRepair.java
@@ -113,7 +113,7 @@ public class TestableReadRepair<E extends Endpoints<E>, P extends ReplicaPlan.Fo
     }
 
     @Override
-    public void repairPartition(DecoratedKey partitionKey, Map<Replica, Mutation> mutations, P replicaPlan)
+    public void repairPartition(DecoratedKey partitionKey, Map<Replica, Mutation> mutations, ReplicaPlan.ForTokenWrite writePlan)
     {
         for (Map.Entry<Replica, Mutation> entry: mutations.entrySet())
             sent.put(entry.getKey().endpoint(), entry.getValue());


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org