Posted to commits@cassandra.apache.org by sa...@apache.org on 2020/02/04 14:07:12 UTC

[cassandra] branch trunk updated (7c088b9 -> 0f22dab)

This is an automated email from the ASF dual-hosted git repository.

samt pushed a change to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git.


    from 7c088b9  Merge branch 'cassandra-3.11' into trunk
     new 5ec9e08  Add missing CHANGES.txt entry
     new 0f22dab  Restore monotonic read consistency guarantees

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 CHANGES.txt                                        |   2 +
 .../org/apache/cassandra/db/ConsistencyLevel.java  |   2 +-
 .../cassandra/locator/ReplicaCollection.java       |   5 +
 .../org/apache/cassandra/locator/ReplicaPlan.java  |  13 ++-
 .../org/apache/cassandra/locator/ReplicaPlans.java |  69 +++++++++++-
 .../cassandra/service/reads/DigestResolver.java    |   8 +-
 .../cassandra/service/reads/ResponseResolver.java  |   2 +-
 .../reads/repair/BlockingPartitionRepair.java      |  38 ++++---
 .../service/reads/repair/BlockingReadRepair.java   |  24 ++--
 .../service/reads/repair/NoopReadRepair.java       |   2 +-
 .../service/reads/repair/ReadOnlyReadRepair.java   |   2 +-
 .../cassandra/service/reads/repair/ReadRepair.java |   2 +-
 .../reads/repair/RowIteratorMergeListener.java     | 123 ++++++++++++++------
 .../distributed/impl/AbstractCluster.java          |  20 ++++
 .../distributed/test/DistributedTestBase.java      |   7 +-
 .../cassandra/distributed/test/ReadRepairTest.java | 125 +++++++++++++++++++++
 ...WritePathTest.java => SimpleReadWriteTest.java} |   4 +-
 .../service/reads/AbstractReadResponseTest.java    |  19 +++-
 .../cassandra/service/reads/DataResolverTest.java  |  55 ++++++---
 .../reads/repair/AbstractReadRepairTest.java       |  36 ++++++
 .../reads/repair/BlockingReadRepairTest.java       |  34 +++---
 .../repair/DiagEventsBlockingReadRepairTest.java   |  21 ++--
 .../reads/repair/ReadOnlyReadRepairTest.java       |   7 +-
 .../service/reads/repair/ReadRepairTest.java       |  30 +++--
 .../service/reads/repair/TestableReadRepair.java   |   2 +-
 25 files changed, 496 insertions(+), 156 deletions(-)
 create mode 100644 test/distributed/org/apache/cassandra/distributed/test/ReadRepairTest.java
 rename test/distributed/org/apache/cassandra/distributed/test/{DistributedReadWritePathTest.java => SimpleReadWriteTest.java} (99%)




[cassandra] 02/02: Restore monotonic read consistency guarantees

Posted by sa...@apache.org.

samt pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git

commit 0f22dab1a015cb84d9857f940de5a256bfbee083
Author: Benedict Elliott Smith <be...@apache.org>
AuthorDate: Thu Jul 18 14:46:06 2019 +0100

    Restore monotonic read consistency guarantees
    
    patch by Benedict; reviewed by Sam Tunnicliffe for CASSANDRA-14740
---
 CHANGES.txt                                        |   1 +
 .../org/apache/cassandra/db/ConsistencyLevel.java  |   2 +-
 .../cassandra/locator/ReplicaCollection.java       |   5 +
 .../org/apache/cassandra/locator/ReplicaPlan.java  |  13 ++-
 .../org/apache/cassandra/locator/ReplicaPlans.java |  69 +++++++++++-
 .../cassandra/service/reads/DigestResolver.java    |   8 +-
 .../cassandra/service/reads/ResponseResolver.java  |   2 +-
 .../reads/repair/BlockingPartitionRepair.java      |  38 ++++---
 .../service/reads/repair/BlockingReadRepair.java   |  24 ++--
 .../service/reads/repair/NoopReadRepair.java       |   2 +-
 .../service/reads/repair/ReadOnlyReadRepair.java   |   2 +-
 .../cassandra/service/reads/repair/ReadRepair.java |   2 +-
 .../reads/repair/RowIteratorMergeListener.java     | 123 ++++++++++++++------
 .../distributed/impl/AbstractCluster.java          |  20 ++++
 .../distributed/test/DistributedTestBase.java      |   7 +-
 .../cassandra/distributed/test/ReadRepairTest.java | 125 +++++++++++++++++++++
 ...WritePathTest.java => SimpleReadWriteTest.java} |   4 +-
 .../service/reads/AbstractReadResponseTest.java    |  19 +++-
 .../cassandra/service/reads/DataResolverTest.java  |  55 ++++++---
 .../reads/repair/AbstractReadRepairTest.java       |  36 ++++++
 .../reads/repair/BlockingReadRepairTest.java       |  34 +++---
 .../repair/DiagEventsBlockingReadRepairTest.java   |  21 ++--
 .../reads/repair/ReadOnlyReadRepairTest.java       |   7 +-
 .../service/reads/repair/ReadRepairTest.java       |  30 +++--
 .../service/reads/repair/TestableReadRepair.java   |   2 +-
 25 files changed, 495 insertions(+), 156 deletions(-)
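
Illustrative note (not part of the commit): the ReplicaPlans.writeReadRepair selector added in the patch below first keeps every live replica that was already contacted on read, then, for consistency levels other than EACH_QUORUM, tops up with other live replicas until the write blockFor is reached. The following is a minimal sketch of that branch only. The class name, the method name selectContacts, and the use of plain strings as endpoints are hypothetical simplifications, not Cassandra APIs.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

public class ReadRepairContactsSketch
{
    /**
     * Hypothetical, simplified stand-in for the non-EACH_QUORUM branch of
     * writeReadRepair: endpoints are plain strings rather than Replica objects,
     * and blockFor is passed in directly instead of being derived from a
     * consistency level and pending replicas.
     */
    static List<String> selectContacts(List<String> liveReplicas, Set<String> contactedOnRead, int blockFor)
    {
        List<String> contacts = new ArrayList<>();

        // First, keep every live replica that was already contacted on read.
        for (String replica : liveReplicas)
            if (contactedOnRead.contains(replica))
                contacts.add(replica);

        // Then add other live replicas, in order, until blockFor is reached.
        int add = blockFor - contacts.size();
        if (add > 0)
        {
            for (String replica : liveReplicas)
            {
                if (contacts.contains(replica))
                    continue;
                contacts.add(replica);
                if (--add == 0)
                    break;
            }
        }
        return contacts;
    }

    public static void main(String[] args)
    {
        List<String> live = Arrays.asList("n1", "n2", "n3", "n4");
        Set<String> read = new LinkedHashSet<>(Arrays.asList("n2", "n4"));
        // Two replicas were read from; one more live replica is needed to reach blockFor = 3.
        System.out.println(selectContacts(live, read, 3)); // prints [n2, n4, n1]
    }
}
```

If the replicas contacted on read already meet or exceed blockFor, the sketch returns exactly those replicas and adds nothing, which matches the stated intent of preferring nodes contacted on read to minimise data transfer.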

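Illustrative note (not part of the commit): in the patch below, BlockingReadRepair.awaitWrites no longer reports a fixed blockFor - 1 when a repair times out. It derives the received count from the timed-out repair as Math.min(blockFor - waitingOn, blockFor - 1). The sketch below reproduces only that arithmetic. The class and method names are hypothetical, and the reading of the cap (presumably so that a timeout never reports all required acknowledgements as received, even if late acks drained the latch) is an assumption rather than something the commit states.

```java
public class RepairTimeoutAccountingSketch
{
    /**
     * Mirrors the expression used in the patch:
     * acknowledgements received so far, capped at blockFor - 1.
     */
    static int received(int blockFor, int waitingOn)
    {
        return Math.min(blockFor - waitingOn, blockFor - 1);
    }

    public static void main(String[] args)
    {
        // Still waiting on 2 of 3 acknowledgements: 1 received.
        System.out.println(received(3, 2)); // prints 1
        // Latch already at zero when inspected: capped at blockFor - 1.
        System.out.println(received(3, 0)); // prints 2
    }
}
```
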
diff --git a/CHANGES.txt b/CHANGES.txt
index 75fae01..6efa148 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 4.0-alpha3
+ * Restore monotonic read consistency guarantees for blocking read repair (CASSANDRA-14740)
  * Separate exceptions for CAS write timeout exceptions caused by contention and unkown result (CASSANDRA-15350)
  * Fix in-jvm dtest java 11 compatibility (CASSANDRA-15463)
  * Remove joda time dependency (CASSANDRA-15257)
diff --git a/src/java/org/apache/cassandra/db/ConsistencyLevel.java b/src/java/org/apache/cassandra/db/ConsistencyLevel.java
index 4973915..e685618 100644
--- a/src/java/org/apache/cassandra/db/ConsistencyLevel.java
+++ b/src/java/org/apache/cassandra/db/ConsistencyLevel.java
@@ -101,7 +101,7 @@ public enum ConsistencyLevel
     public static ObjectIntOpenHashMap<String> eachQuorumForRead(Keyspace keyspace)
     {
         NetworkTopologyStrategy strategy = (NetworkTopologyStrategy) keyspace.getReplicationStrategy();
-        ObjectIntOpenHashMap<String> perDc = new ObjectIntOpenHashMap<>(strategy.getDatacenters().size());
+        ObjectIntOpenHashMap<String> perDc = new ObjectIntOpenHashMap<>(((strategy.getDatacenters().size() + 1) * 4) / 3);
         for (String dc : strategy.getDatacenters())
             perDc.put(dc, ConsistencyLevel.localQuorumFor(keyspace, dc));
         return perDc;
diff --git a/src/java/org/apache/cassandra/locator/ReplicaCollection.java b/src/java/org/apache/cassandra/locator/ReplicaCollection.java
index d870316..ec671d5 100644
--- a/src/java/org/apache/cassandra/locator/ReplicaCollection.java
+++ b/src/java/org/apache/cassandra/locator/ReplicaCollection.java
@@ -125,6 +125,11 @@ public interface ReplicaCollection<C extends ReplicaCollection<C>> extends Itera
         public C build();
 
         /**
+         * @return an Immutable clone that assumes this Builder will be modified again
+         */
+        public C snapshot();
+
+        /**
          * Passed to add() and addAll() as ignoreConflicts parameter. The meaning of conflict varies by collection type
          * (for Endpoints, it is a duplicate InetAddressAndPort; for RangesAtEndpoint it is a duplicate Range).
          */
diff --git a/src/java/org/apache/cassandra/locator/ReplicaPlan.java b/src/java/org/apache/cassandra/locator/ReplicaPlan.java
index 861c912..16af58a 100644
--- a/src/java/org/apache/cassandra/locator/ReplicaPlan.java
+++ b/src/java/org/apache/cassandra/locator/ReplicaPlan.java
@@ -52,6 +52,8 @@ public abstract class ReplicaPlan<E extends Endpoints<E>>
     public abstract int blockFor();
 
     public E contacts() { return contacts; }
+
+    // TODO: should this semantically return true if we contain the endpoint, not the exact replica?
     public boolean contacts(Replica replica) { return contacts.contains(replica); }
     public Keyspace keyspace() { return keyspace; }
     public ConsistencyLevel consistencyLevel() { return consistencyLevel; }
@@ -72,17 +74,12 @@ public abstract class ReplicaPlan<E extends Endpoints<E>>
 
         public E candidates() { return candidates; }
 
-        public E uncontactedCandidates()
-        {
-            return candidates().filter(r -> !contacts(r));
-        }
-
         public Replica firstUncontactedCandidate(Predicate<Replica> extraPredicate)
         {
             return Iterables.tryFind(candidates(), r -> extraPredicate.test(r) && !contacts(r)).orNull();
         }
 
-        public Replica getReplicaFor(InetAddressAndPort endpoint)
+        public Replica lookup(InetAddressAndPort endpoint)
         {
             return candidates().byEndpoint().get(endpoint);
         }
@@ -151,6 +148,10 @@ public abstract class ReplicaPlan<E extends Endpoints<E>>
         public E liveUncontacted() { return live().filter(r -> !contacts(r)); }
         /** Test liveness, consistent with the upfront analysis done for this operation (i.e. test membership of live()) */
         public boolean isAlive(Replica replica) { return live.endpoints().contains(replica.endpoint()); }
+        public Replica lookup(InetAddressAndPort endpoint)
+        {
+            return liveAndDown().byEndpoint().get(endpoint);
+        }
 
         public String toString()
         {
diff --git a/src/java/org/apache/cassandra/locator/ReplicaPlans.java b/src/java/org/apache/cassandra/locator/ReplicaPlans.java
index a6fe53f..236706a 100644
--- a/src/java/org/apache/cassandra/locator/ReplicaPlans.java
+++ b/src/java/org/apache/cassandra/locator/ReplicaPlans.java
@@ -320,6 +320,11 @@ public class ReplicaPlans
         return result;
     }
 
+    public static ReplicaPlan.ForTokenWrite forReadRepair(Token token, ReplicaPlan.ForRead<?> readPlan) throws UnavailableException
+    {
+        return forWrite(readPlan.keyspace, readPlan.consistencyLevel, token, writeReadRepair(readPlan));
+    }
+
     public static ReplicaPlan.ForTokenWrite forWrite(Keyspace keyspace, ConsistencyLevel consistencyLevel, Token token, Selector selector) throws UnavailableException
     {
         return forWrite(keyspace, consistencyLevel, ReplicaLayout.forTokenWriteLiveAndDown(keyspace, token), selector);
@@ -345,7 +350,7 @@ public class ReplicaPlans
 
     public static ReplicaPlan.ForTokenWrite forWrite(Keyspace keyspace, ConsistencyLevel consistencyLevel, ReplicaLayout.ForTokenWrite liveAndDown, ReplicaLayout.ForTokenWrite live, Selector selector) throws UnavailableException
     {
-        EndpointsForToken contacts = selector.select(keyspace, liveAndDown, live);
+        EndpointsForToken contacts = selector.select(keyspace, consistencyLevel, liveAndDown, live);
         assureSufficientLiveReplicasForWrite(keyspace, consistencyLevel, live.all(), liveAndDown.pending());
         return new ReplicaPlan.ForTokenWrite(keyspace, consistencyLevel, liveAndDown.pending(), liveAndDown.all(), live.all(), contacts);
     }
@@ -353,7 +358,7 @@ public class ReplicaPlans
     public interface Selector
     {
         <E extends Endpoints<E>, L extends ReplicaLayout.ForWrite<E>>
-        E select(Keyspace keyspace, L liveAndDown, L live);
+        E select(Keyspace keyspace, ConsistencyLevel consistencyLevel, L liveAndDown, L live);
     }
 
     /**
@@ -366,7 +371,7 @@ public class ReplicaPlans
     {
         @Override
         public <E extends Endpoints<E>, L extends ReplicaLayout.ForWrite<E>>
-        E select(Keyspace keyspace, L liveAndDown, L live)
+        E select(Keyspace keyspace, ConsistencyLevel consistencyLevel, L liveAndDown, L live)
         {
             return liveAndDown.all();
         }
@@ -385,7 +390,7 @@ public class ReplicaPlans
     {
         @Override
         public <E extends Endpoints<E>, L extends ReplicaLayout.ForWrite<E>>
-        E select(Keyspace keyspace, L liveAndDown, L live)
+        E select(Keyspace keyspace, ConsistencyLevel consistencyLevel, L liveAndDown, L live)
         {
             if (!any(liveAndDown.all(), Replica::isTransient))
                 return liveAndDown.all();
@@ -417,6 +422,62 @@ public class ReplicaPlans
     };
 
     /**
+     * TODO: Transient Replication C-14404/C-14665
+     * TODO: We employ this even when there is no monotonicity to guarantee,
+     *          e.g. in case of CL.TWO, CL.ONE with speculation, etc.
+     *
+     * Construct a read-repair write plan to provide monotonicity guarantees on any data we return as part of a read.
+     *
+     * Since this is not a regular write, this is just to guarantee future reads will read this data, we select only
+     * the minimal number of nodes to meet the consistency level, and prefer nodes we contacted on read to minimise
+     * data transfer.
+     */
+    public static Selector writeReadRepair(ReplicaPlan.ForRead<?> readPlan)
+    {
+        return new Selector()
+        {
+            @Override
+            public <E extends Endpoints<E>, L extends ReplicaLayout.ForWrite<E>>
+            E select(Keyspace keyspace, ConsistencyLevel consistencyLevel, L liveAndDown, L live)
+            {
+                assert !any(liveAndDown.all(), Replica::isTransient);
+
+                ReplicaCollection.Builder<E> contacts = live.all().newBuilder(live.all().size());
+                // add all live nodes we might write to that we have already contacted on read
+                contacts.addAll(filter(live.all(), r -> readPlan.contacts().endpoints().contains(r.endpoint())));
+
+                // finally, add sufficient nodes to achieve our consistency level
+                if (consistencyLevel != EACH_QUORUM)
+                {
+                    int add = consistencyLevel.blockForWrite(keyspace, liveAndDown.pending()) - contacts.size();
+                    if (add > 0)
+                    {
+                        for (Replica replica : filter(live.all(), r -> !contacts.contains(r)))
+                        {
+                            contacts.add(replica);
+                            if (--add == 0)
+                                break;
+                        }
+                    }
+                }
+                else
+                {
+                    ObjectIntOpenHashMap<String> requiredPerDc = eachQuorumForWrite(keyspace, liveAndDown.pending());
+                    addToCountPerDc(requiredPerDc, contacts.snapshot(), -1);
+                    IEndpointSnitch snitch = DatabaseDescriptor.getEndpointSnitch();
+                    for (Replica replica : filter(live.all(), r -> !contacts.contains(r)))
+                    {
+                        String dc = snitch.getDatacenter(replica);
+                        if (requiredPerDc.addTo(dc, -1) >= 0)
+                            contacts.add(replica);
+                    }
+                }
+                return contacts.build();
+            }
+        };
+    }
+
+    /**
      * Construct the plan for a paxos round - NOT the write or read consistency level for either the write or comparison,
      * but for the paxos linearisation agreement.
      *
diff --git a/src/java/org/apache/cassandra/service/reads/DigestResolver.java b/src/java/org/apache/cassandra/service/reads/DigestResolver.java
index cf7ec31..dbb761b 100644
--- a/src/java/org/apache/cassandra/service/reads/DigestResolver.java
+++ b/src/java/org/apache/cassandra/service/reads/DigestResolver.java
@@ -54,7 +54,7 @@ public class DigestResolver<E extends Endpoints<E>, P extends ReplicaPlan.ForRea
     public void preprocess(Message<ReadResponse> message)
     {
         super.preprocess(message);
-        Replica replica = replicaPlan().getReplicaFor(message.from());
+        Replica replica = replicaPlan().lookup(message.from());
         if (dataResponse == null && !message.payload.isDigestResponse() && replica.isFull())
             dataResponse = message;
     }
@@ -69,7 +69,7 @@ public class DigestResolver<E extends Endpoints<E>, P extends ReplicaPlan.ForRea
     {
         return any(responses,
                 msg -> !msg.payload.isDigestResponse()
-                        && replicaPlan().getReplicaFor(msg.from()).isTransient());
+                        && replicaPlan().lookup(msg.from()).isTransient());
     }
 
     public PartitionIterator getData()
@@ -93,7 +93,7 @@ public class DigestResolver<E extends Endpoints<E>, P extends ReplicaPlan.ForRea
             // Reconcile with transient replicas
             for (Message<ReadResponse> response : responses)
             {
-                Replica replica = replicaPlan().getReplicaFor(response.from());
+                Replica replica = replicaPlan().lookup(response.from());
                 if (replica.isTransient())
                     dataResolver.preprocess(response);
             }
@@ -115,7 +115,7 @@ public class DigestResolver<E extends Endpoints<E>, P extends ReplicaPlan.ForRea
         // TODO: should also not calculate if only one full node
         for (Message<ReadResponse> message : snapshot)
         {
-            if (replicaPlan().getReplicaFor(message.from()).isTransient())
+            if (replicaPlan().lookup(message.from()).isTransient())
                 continue;
 
             ByteBuffer newDigest = message.payload.digest(command);
diff --git a/src/java/org/apache/cassandra/service/reads/ResponseResolver.java b/src/java/org/apache/cassandra/service/reads/ResponseResolver.java
index 8e15c1a..6ae19ac 100644
--- a/src/java/org/apache/cassandra/service/reads/ResponseResolver.java
+++ b/src/java/org/apache/cassandra/service/reads/ResponseResolver.java
@@ -55,7 +55,7 @@ public abstract class ResponseResolver<E extends Endpoints<E>, P extends Replica
 
     public void preprocess(Message<ReadResponse> message)
     {
-        if (replicaPlan().getReplicaFor(message.from()).isTransient() &&
+        if (replicaPlan().lookup(message.from()).isTransient() &&
             message.payload.isDigestResponse())
             throw new IllegalArgumentException("Digest response received from transient replica");
 
diff --git a/src/java/org/apache/cassandra/service/reads/repair/BlockingPartitionRepair.java b/src/java/org/apache/cassandra/service/reads/repair/BlockingPartitionRepair.java
index 01fd7f0..edcf14d 100644
--- a/src/java/org/apache/cassandra/service/reads/repair/BlockingPartitionRepair.java
+++ b/src/java/org/apache/cassandra/service/reads/repair/BlockingPartitionRepair.java
@@ -39,6 +39,7 @@ import org.apache.cassandra.db.Keyspace;
 import org.apache.cassandra.db.Mutation;
 import org.apache.cassandra.db.partitions.PartitionUpdate;
 import org.apache.cassandra.locator.Endpoints;
+import org.apache.cassandra.locator.EndpointsForToken;
 import org.apache.cassandra.locator.InetAddressAndPort;
 import org.apache.cassandra.locator.Replica;
 import org.apache.cassandra.locator.ReplicaPlan;
@@ -53,33 +54,33 @@ import org.apache.cassandra.tracing.Tracing;
 
 import static org.apache.cassandra.net.Verb.*;
 
-public class BlockingPartitionRepair<E extends Endpoints<E>, P extends ReplicaPlan.ForRead<E>>
+public class BlockingPartitionRepair
         extends AbstractFuture<Object> implements RequestCallback<Object>
 {
     private final DecoratedKey key;
-    private final P replicaPlan;
+    private final ReplicaPlan.ForTokenWrite writePlan;
     private final Map<Replica, Mutation> pendingRepairs;
     private final CountDownLatch latch;
     private final Predicate<InetAddressAndPort> shouldBlockOn;
 
     private volatile long mutationsSentTime;
 
-    public BlockingPartitionRepair(DecoratedKey key, Map<Replica, Mutation> repairs, int maxBlockFor, P replicaPlan)
+    public BlockingPartitionRepair(DecoratedKey key, Map<Replica, Mutation> repairs, ReplicaPlan.ForTokenWrite writePlan)
     {
-        this(key, repairs, maxBlockFor, replicaPlan,
-                replicaPlan.consistencyLevel().isDatacenterLocal() ? InOurDcTester.endpoints() : Predicates.alwaysTrue());
+        this(key, repairs, writePlan,
+             writePlan.consistencyLevel().isDatacenterLocal() ? InOurDcTester.endpoints() : Predicates.alwaysTrue());
     }
-    public BlockingPartitionRepair(DecoratedKey key, Map<Replica, Mutation> repairs, int maxBlockFor, P replicaPlan, Predicate<InetAddressAndPort> shouldBlockOn)
+    public BlockingPartitionRepair(DecoratedKey key, Map<Replica, Mutation> repairs, ReplicaPlan.ForTokenWrite writePlan, Predicate<InetAddressAndPort> shouldBlockOn)
     {
         this.key = key;
         this.pendingRepairs = new ConcurrentHashMap<>(repairs);
-        this.replicaPlan = replicaPlan;
+        this.writePlan = writePlan;
         this.shouldBlockOn = shouldBlockOn;
 
+        int blockFor = writePlan.blockFor();
         // here we remove empty repair mutations from the block for total, since
         // we're not sending them mutations
-        int blockFor = maxBlockFor;
-        for (Replica participant: replicaPlan.contacts())
+        for (Replica participant : writePlan.contacts())
         {
             // remote dcs can sometimes get involved in dc-local reads. We want to repair
             // them if they do, but they shouldn't interfere with blocking the client read.
@@ -95,10 +96,15 @@ public class BlockingPartitionRepair<E extends Endpoints<E>, P extends ReplicaPl
         latch = new CountDownLatch(Math.max(blockFor, 0));
     }
 
+    int blockFor()
+    {
+        return writePlan.blockFor();
+    }
+
     @VisibleForTesting
-    long waitingOn()
+    int waitingOn()
     {
-        return latch.getCount();
+        return (int) latch.getCount();
     }
 
     @VisibleForTesting
@@ -106,7 +112,7 @@ public class BlockingPartitionRepair<E extends Endpoints<E>, P extends ReplicaPl
     {
         if (shouldBlockOn.test(from))
         {
-            pendingRepairs.remove(replicaPlan.getReplicaFor(from));
+            pendingRepairs.remove(writePlan.lookup(from));
             latch.countDown();
         }
     }
@@ -199,7 +205,7 @@ public class BlockingPartitionRepair<E extends Endpoints<E>, P extends ReplicaPl
         if (awaitRepairsUntil(timeout + timeoutUnit.convert(mutationsSentTime, TimeUnit.NANOSECONDS), timeoutUnit))
             return;
 
-        E newCandidates = replicaPlan.uncontactedCandidates();
+        EndpointsForToken newCandidates = writePlan.liveUncontacted();
         if (newCandidates.isEmpty())
             return;
 
@@ -221,7 +227,7 @@ public class BlockingPartitionRepair<E extends Endpoints<E>, P extends ReplicaPl
 
             if (mutation == null)
             {
-                mutation = BlockingReadRepairs.createRepairMutation(update, replicaPlan.consistencyLevel(), replica.endpoint(), true);
+                mutation = BlockingReadRepairs.createRepairMutation(update, writePlan.consistencyLevel(), replica.endpoint(), true);
                 versionedMutations[versionIdx] = mutation;
             }
 
@@ -240,7 +246,7 @@ public class BlockingPartitionRepair<E extends Endpoints<E>, P extends ReplicaPl
 
     Keyspace getKeyspace()
     {
-        return replicaPlan.keyspace();
+        return writePlan.keyspace();
     }
 
     DecoratedKey getKey()
@@ -250,6 +256,6 @@ public class BlockingPartitionRepair<E extends Endpoints<E>, P extends ReplicaPl
 
     ConsistencyLevel getConsistency()
     {
-        return replicaPlan.consistencyLevel();
+        return writePlan.consistencyLevel();
     }
 }
diff --git a/src/java/org/apache/cassandra/service/reads/repair/BlockingReadRepair.java b/src/java/org/apache/cassandra/service/reads/repair/BlockingReadRepair.java
index 764765e..fdc8b50 100644
--- a/src/java/org/apache/cassandra/service/reads/repair/BlockingReadRepair.java
+++ b/src/java/org/apache/cassandra/service/reads/repair/BlockingReadRepair.java
@@ -39,7 +39,6 @@ import org.apache.cassandra.locator.ReplicaPlan;
 import org.apache.cassandra.metrics.ReadRepairMetrics;
 import org.apache.cassandra.tracing.Tracing;
 
-import static java.util.concurrent.TimeUnit.MILLISECONDS;
 import static java.util.concurrent.TimeUnit.NANOSECONDS;
 
 /**
@@ -53,12 +52,10 @@ public class BlockingReadRepair<E extends Endpoints<E>, P extends ReplicaPlan.Fo
     private static final Logger logger = LoggerFactory.getLogger(BlockingReadRepair.class);
 
     protected final Queue<BlockingPartitionRepair> repairs = new ConcurrentLinkedQueue<>();
-    private final int blockFor;
 
     BlockingReadRepair(ReadCommand command, ReplicaPlan.Shared<E, P> replicaPlan, long queryStartNanoTime)
     {
         super(command, replicaPlan, queryStartNanoTime);
-        this.blockFor = replicaPlan().consistencyLevel().blockFor(cfs.keyspace);
     }
 
     public UnfilteredPartitionIterators.MergeListener getMergeListener(P replicaPlan)
@@ -84,31 +81,34 @@ public class BlockingReadRepair<E extends Endpoints<E>, P extends ReplicaPlan.Fo
     @Override
     public void awaitWrites()
     {
-        boolean timedOut = false;
-        for (BlockingPartitionRepair repair: repairs)
+        BlockingPartitionRepair timedOut = null;
+        for (BlockingPartitionRepair repair : repairs)
         {
             if (!repair.awaitRepairsUntil(DatabaseDescriptor.getReadRpcTimeout(NANOSECONDS) + queryStartNanoTime, NANOSECONDS))
             {
-                timedOut = true;
+                timedOut = repair;
+                break;
             }
         }
-        if (timedOut)
+        if (timedOut != null)
         {
-            // We got all responses, but timed out while repairing
-            int blockFor = replicaPlan().blockFor();
+            // We got all responses, but timed out while repairing;
+            // pick one of the repairs to throw, as this is better than completely manufacturing the error message
+            int blockFor = timedOut.blockFor();
+            int received = Math.min(blockFor - timedOut.waitingOn(), blockFor - 1);
             if (Tracing.isTracing())
                 Tracing.trace("Timed out while read-repairing after receiving all {} data and digest responses", blockFor);
             else
                 logger.debug("Timeout while read-repairing after receiving all {} data and digest responses", blockFor);
 
-            throw new ReadTimeoutException(replicaPlan().consistencyLevel(), blockFor - 1, blockFor, true);
+            throw new ReadTimeoutException(replicaPlan().consistencyLevel(), received, blockFor, true);
         }
     }
 
     @Override
-    public void repairPartition(DecoratedKey partitionKey, Map<Replica, Mutation> mutations, P replicaPlan)
+    public void repairPartition(DecoratedKey partitionKey, Map<Replica, Mutation> mutations, ReplicaPlan.ForTokenWrite writePlan)
     {
-        BlockingPartitionRepair<E, P> blockingRepair = new BlockingPartitionRepair<>(partitionKey, mutations, blockFor, replicaPlan);
+        BlockingPartitionRepair blockingRepair = new BlockingPartitionRepair(partitionKey, mutations, writePlan);
         blockingRepair.sendInitialRepairs();
         repairs.add(blockingRepair);
     }
diff --git a/src/java/org/apache/cassandra/service/reads/repair/NoopReadRepair.java b/src/java/org/apache/cassandra/service/reads/repair/NoopReadRepair.java
index 6aa6ece..2f82c22 100644
--- a/src/java/org/apache/cassandra/service/reads/repair/NoopReadRepair.java
+++ b/src/java/org/apache/cassandra/service/reads/repair/NoopReadRepair.java
@@ -76,7 +76,7 @@ public class NoopReadRepair<E extends Endpoints<E>, P extends ReplicaPlan.ForRea
     }
 
     @Override
-    public void repairPartition(DecoratedKey partitionKey, Map<Replica, Mutation> mutations, P replicaPlan)
+    public void repairPartition(DecoratedKey partitionKey, Map<Replica, Mutation> mutations, ReplicaPlan.ForTokenWrite writePlan)
     {
 
     }
diff --git a/src/java/org/apache/cassandra/service/reads/repair/ReadOnlyReadRepair.java b/src/java/org/apache/cassandra/service/reads/repair/ReadOnlyReadRepair.java
index 64bfec2..d9293fb 100644
--- a/src/java/org/apache/cassandra/service/reads/repair/ReadOnlyReadRepair.java
+++ b/src/java/org/apache/cassandra/service/reads/repair/ReadOnlyReadRepair.java
@@ -61,7 +61,7 @@ public class ReadOnlyReadRepair<E extends Endpoints<E>, P extends ReplicaPlan.Fo
     }
 
     @Override
-    public void repairPartition(DecoratedKey partitionKey, Map<Replica, Mutation> mutations, P replicaPlan)
+    public void repairPartition(DecoratedKey partitionKey, Map<Replica, Mutation> mutations, ReplicaPlan.ForTokenWrite writePlan)
     {
         throw new UnsupportedOperationException("ReadOnlyReadRepair shouldn't be trying to repair partitions");
     }
diff --git a/src/java/org/apache/cassandra/service/reads/repair/ReadRepair.java b/src/java/org/apache/cassandra/service/reads/repair/ReadRepair.java
index 9441945..4747651 100644
--- a/src/java/org/apache/cassandra/service/reads/repair/ReadRepair.java
+++ b/src/java/org/apache/cassandra/service/reads/repair/ReadRepair.java
@@ -93,5 +93,5 @@ public interface ReadRepair<E extends Endpoints<E>, P extends ReplicaPlan.ForRea
      * Repairs a partition _after_ receiving data responses. This method receives replica list, since
      * we will block repair only on the replicas that have responded.
      */
-    void repairPartition(DecoratedKey partitionKey, Map<Replica, Mutation> mutations, P replicaPlan);
+    void repairPartition(DecoratedKey partitionKey, Map<Replica, Mutation> mutations, ReplicaPlan.ForTokenWrite writePlan);
 }
diff --git a/src/java/org/apache/cassandra/service/reads/repair/RowIteratorMergeListener.java b/src/java/org/apache/cassandra/service/reads/repair/RowIteratorMergeListener.java
index 60e0d41..fc4c351 100644
--- a/src/java/org/apache/cassandra/service/reads/repair/RowIteratorMergeListener.java
+++ b/src/java/org/apache/cassandra/service/reads/repair/RowIteratorMergeListener.java
@@ -19,14 +19,17 @@
 package org.apache.cassandra.service.reads.repair;
 
 import java.util.Arrays;
+import java.util.BitSet;
 import java.util.Map;
+import java.util.function.Consumer;
 
-import com.google.common.base.Preconditions;
+import com.google.common.collect.Iterables;
 import com.google.common.collect.Maps;
 
+import com.carrotsearch.hppc.ObjectIntOpenHashMap;
+import net.nicoulaj.compilecommand.annotations.Inline;
 import org.apache.cassandra.db.Clustering;
 import org.apache.cassandra.db.ClusteringBound;
-import org.apache.cassandra.db.ConsistencyLevel;
 import org.apache.cassandra.db.DecoratedKey;
 import org.apache.cassandra.db.DeletionTime;
 import org.apache.cassandra.db.LivenessInfo;
@@ -45,8 +48,10 @@ import org.apache.cassandra.db.rows.RowDiffListener;
 import org.apache.cassandra.db.rows.Rows;
 import org.apache.cassandra.db.rows.UnfilteredRowIterators;
 import org.apache.cassandra.locator.Endpoints;
+import org.apache.cassandra.locator.InetAddressAndPort;
 import org.apache.cassandra.locator.Replica;
 import org.apache.cassandra.locator.ReplicaPlan;
+import org.apache.cassandra.locator.ReplicaPlans;
 import org.apache.cassandra.schema.ColumnMetadata;
 
 public class RowIteratorMergeListener<E extends Endpoints<E>>
@@ -57,10 +62,14 @@ public class RowIteratorMergeListener<E extends Endpoints<E>>
     private final boolean isReversed;
     private final ReadCommand command;
 
+    private final BitSet writeBackTo;
+    private final boolean buildFullDiff;
+    /** the repairs we will send to each source, suffixed by a complete repair of all differences, if {@link #buildFullDiff} */
     private final PartitionUpdate.Builder[] repairs;
     private final Row.Builder[] currentRows;
     private final RowDiffListener diffListener;
-    private final ReplicaPlan.ForRead<E> replicaPlan;
+    private final ReplicaPlan.ForRead<E> readPlan;
+    private final ReplicaPlan.ForTokenWrite writePlan;
 
     // The partition level deletion for the merge row.
     private DeletionTime partitionLevelDeletion;
@@ -73,17 +82,36 @@ public class RowIteratorMergeListener<E extends Endpoints<E>>
 
     private final ReadRepair readRepair;
 
-    public RowIteratorMergeListener(DecoratedKey partitionKey, RegularAndStaticColumns columns, boolean isReversed, ReplicaPlan.ForRead<E> replicaPlan, ReadCommand command, ReadRepair readRepair)
+    public RowIteratorMergeListener(DecoratedKey partitionKey, RegularAndStaticColumns columns, boolean isReversed, ReplicaPlan.ForRead<E> readPlan, ReadCommand command, ReadRepair readRepair)
     {
         this.partitionKey = partitionKey;
         this.columns = columns;
         this.isReversed = isReversed;
-        this.replicaPlan = replicaPlan;
-        int size = replicaPlan.contacts().size();
-        repairs = new PartitionUpdate.Builder[size];
-        currentRows = new Row.Builder[size];
-        sourceDeletionTime = new DeletionTime[size];
-        markerToRepair = new ClusteringBound[size];
+        this.readPlan = readPlan;
+        this.writePlan = ReplicaPlans.forReadRepair(partitionKey.getToken(), readPlan);
+
+        int size = readPlan.contacts().size();
+        this.writeBackTo = new BitSet(size);
+        {
+            int i = 0;
+            for (Replica replica : readPlan.contacts())
+            {
+                if (writePlan.contacts().endpoints().contains(replica.endpoint()))
+                    writeBackTo.set(i);
+                ++i;
+            }
+        }
+        // If we are contacting any nodes we didn't read from, we are likely handling a range movement.
+        // In this case we need to send all differences to these nodes, as we do not (with present design) know which
+        // node they bootstrapped from, and so which data we need to duplicate.
+        // In reality, there will be situations where we are simply sending the same number of writes to different nodes
+        // and in this case we could probably avoid building a full difference, and only ensure each write makes it to
+        // some other node, but it is probably not worth special casing this scenario.
+        this.buildFullDiff = Iterables.any(writePlan.contacts().endpoints(), e -> !readPlan.contacts().endpoints().contains(e));
+        this.repairs = new PartitionUpdate.Builder[size + (buildFullDiff ? 1 : 0)];
+        this.currentRows = new Row.Builder[size];
+        this.sourceDeletionTime = new DeletionTime[size];
+        this.markerToRepair = new ClusteringBound[size];
         this.command = command;
         this.readRepair = readRepair;
 
@@ -128,13 +156,6 @@ public class RowIteratorMergeListener<E extends Endpoints<E>>
         };
     }
 
-    private PartitionUpdate.Builder update(int i)
-    {
-        if (repairs[i] == null)
-            repairs[i] = new PartitionUpdate.Builder(command.metadata(), partitionKey, columns, 1);
-        return repairs[i];
-    }
-
     /**
      * The partition level deletion with which source {@code i} is currently repaired, or
      * {@code DeletionTime.LIVE} if the source is not repaired on the partition level deletion (meaning it was
@@ -156,13 +177,30 @@ public class RowIteratorMergeListener<E extends Endpoints<E>>
         return currentRows[i];
     }
 
+    @Inline
+    private void applyToPartition(int i, Consumer<PartitionUpdate.Builder> f)
+    {
+        if (writeBackTo.get(i))
+        {
+            if (repairs[i] == null)
+                repairs[i] = new PartitionUpdate.Builder(command.metadata(), partitionKey, columns, 1);
+            f.accept(repairs[i]);
+        }
+        if (buildFullDiff)
+        {
+            if (repairs[repairs.length - 1] == null)
+                repairs[repairs.length - 1] = new PartitionUpdate.Builder(command.metadata(), partitionKey, columns, 1);
+            f.accept(repairs[repairs.length - 1]);
+        }
+    }
+
     public void onMergedPartitionLevelDeletion(DeletionTime mergedDeletion, DeletionTime[] versions)
     {
         this.partitionLevelDeletion = mergedDeletion;
         for (int i = 0; i < versions.length; i++)
         {
             if (mergedDeletion.supersedes(versions[i]))
-                update(i).addPartitionDeletion(mergedDeletion);
+                applyToPartition(i, p -> p.addPartitionDeletion(mergedDeletion));
         }
     }
 
@@ -178,7 +216,10 @@ public class RowIteratorMergeListener<E extends Endpoints<E>>
         for (int i = 0; i < currentRows.length; i++)
         {
             if (currentRows[i] != null)
-                update(i).add(currentRows[i].build());
+            {
+                Row row = currentRows[i].build();
+                applyToPartition(i, p -> p.add(row));
+            }
         }
         Arrays.fill(currentRows, null);
     }
@@ -302,34 +343,44 @@ public class RowIteratorMergeListener<E extends Endpoints<E>>
     private void closeOpenMarker(int i, ClusteringBound close)
     {
         ClusteringBound open = markerToRepair[i];
-        update(i).add(new RangeTombstone(Slice.make(isReversed ? close : open, isReversed ? open : close), currentDeletion()));
+        RangeTombstone rt = new RangeTombstone(Slice.make(isReversed ? close : open, isReversed ? open : close), currentDeletion());
+        applyToPartition(i, p -> p.add(rt));
         markerToRepair[i] = null;
     }
 
     public void close()
     {
-        Map<Replica, Mutation> mutations = null;
-        Endpoints<?> sources = replicaPlan.contacts();
-        for (int i = 0; i < repairs.length; i++)
-        {
-            if (repairs[i] == null)
-                continue;
+        boolean hasRepairs = false;
+        for (int i = 0 ; !hasRepairs && i < repairs.length ; ++i)
+            hasRepairs = repairs[i] != null;
+        if (!hasRepairs)
+            return;
+
+        PartitionUpdate fullDiffRepair = null;
+        if (buildFullDiff && repairs[repairs.length - 1] != null)
+            fullDiffRepair = repairs[repairs.length - 1].build();
 
-            Replica source = sources.get(i);
+        Map<Replica, Mutation> mutations = Maps.newHashMapWithExpectedSize(writePlan.contacts().size());
+        ObjectIntOpenHashMap<InetAddressAndPort> sourceIds = new ObjectIntOpenHashMap<>(((repairs.length + 1) * 4) / 3);
+        for (int i = 0 ; i < readPlan.contacts().size() ; ++i)
+            sourceIds.put(readPlan.contacts().get(i).endpoint(), 1 + i);
 
-            Mutation mutation = BlockingReadRepairs.createRepairMutation(repairs[i].build(), replicaPlan.consistencyLevel(), source.endpoint(), false);
+        for (Replica replica : writePlan.contacts())
+        {
+            PartitionUpdate update = null;
+            int i = -1 + sourceIds.get(replica.endpoint());
+            if (i < 0)
+                update = fullDiffRepair;
+            else if (repairs[i] != null)
+                update = repairs[i].build();
+
+            Mutation mutation = BlockingReadRepairs.createRepairMutation(update, readPlan.consistencyLevel(), replica.endpoint(), false);
             if (mutation == null)
                 continue;
 
-            if (mutations == null)
-                mutations = Maps.newHashMapWithExpectedSize(sources.size());
-
-            mutations.put(source, mutation);
+            mutations.put(replica, mutation);
         }
 
-        if (mutations != null)
-        {
-            readRepair.repairPartition(partitionKey, mutations, replicaPlan);
-        }
+        readRepair.repairPartition(partitionKey, mutations, writePlan);
     }
 }
\ No newline at end of file
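
Note on the RowIteratorMergeListener change above: the routing of repairs can be hard to follow in diff form, so here is a minimal, self-contained sketch of the idea. It is not the Cassandra implementation. The class name RepairRoutingSketch, the use of plain strings for endpoints, and the helper names are all assumptions made for illustration. It only mirrors the three decisions visible in the diff: which read contacts get a per-source repair (writeBackTo), whether an extra "full diff" repair is needed because some write contact was never read from (buildFullDiff), and which repair slot each write contact ends up receiving in close().

```java
import java.util.Arrays;
import java.util.BitSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration only: endpoints are plain strings, not Replica objects.
public class RepairRoutingSketch
{
    /** Sets bit i when the i-th read contact is also a write contact. */
    static BitSet writeBackTo(List<String> readContacts, List<String> writeContacts)
    {
        BitSet bits = new BitSet(readContacts.size());
        for (int i = 0; i < readContacts.size(); i++)
        {
            if (writeContacts.contains(readContacts.get(i)))
                bits.set(i);
        }
        return bits;
    }

    /** True when at least one write contact was not among the nodes we read from. */
    static boolean buildFullDiff(List<String> readContacts, List<String> writeContacts)
    {
        for (String endpoint : writeContacts)
        {
            if (!readContacts.contains(endpoint))
                return true;
        }
        return false;
    }

    /**
     * Maps each write contact to the index of the repair it would receive:
     * its own position among the read contacts, or readContacts.size()
     * (the extra trailing slot) when it was not read from.
     */
    static Map<String, Integer> repairSlots(List<String> readContacts, List<String> writeContacts)
    {
        Map<String, Integer> slots = new LinkedHashMap<>();
        for (String endpoint : writeContacts)
        {
            int i = readContacts.indexOf(endpoint);
            slots.put(endpoint, i >= 0 ? i : readContacts.size());
        }
        return slots;
    }

    public static void main(String[] args)
    {
        // Loosely modelled on movingTokenReadRepairTest: read from n2 and n4,
        // while n1 is taking over the range and must also receive the repair.
        List<String> read = Arrays.asList("n2", "n4");
        List<String> write = Arrays.asList("n2", "n4", "n1");

        System.out.println("writeBackTo   = " + writeBackTo(read, write));
        System.out.println("buildFullDiff = " + buildFullDiff(read, write));
        System.out.println("repairSlots   = " + repairSlots(read, write));
    }
}
```

In this sketch n1 maps to the trailing slot, which corresponds to repairs[repairs.length - 1] in the patch. The real code additionally skips sources whose per-source repair is null and builds Mutation objects through BlockingReadRepairs.createRepairMutation, both of which are left out here.
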
diff --git a/test/distributed/org/apache/cassandra/distributed/impl/AbstractCluster.java b/test/distributed/org/apache/cassandra/distributed/impl/AbstractCluster.java
index 68cc6a2..d487c14 100644
--- a/test/distributed/org/apache/cassandra/distributed/impl/AbstractCluster.java
+++ b/test/distributed/org/apache/cassandra/distributed/impl/AbstractCluster.java
@@ -47,6 +47,8 @@ import org.apache.cassandra.db.ConsistencyLevel;
 import org.apache.cassandra.db.Keyspace;
 import org.apache.cassandra.distributed.api.Feature;
 import org.apache.cassandra.distributed.api.ICluster;
+import org.apache.cassandra.dht.IPartitioner;
+import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.distributed.api.ICoordinator;
 import org.apache.cassandra.distributed.api.IInstance;
 import org.apache.cassandra.distributed.api.IInstanceConfig;
@@ -766,5 +768,23 @@ public abstract class AbstractCluster<I extends IInstance> implements ICluster,
         }
     }
 
+    public List<Token> tokens()
+    {
+        return stream()
+               .map(i ->
+                    {
+                        try
+                        {
+                            IPartitioner partitioner = ((IPartitioner)Class.forName(i.config().getString("partitioner")).newInstance());
+                            return partitioner.getTokenFactory().fromString(i.config().getString("initial_token"));
+                        }
+                        catch (Throwable t)
+                        {
+                            throw new RuntimeException(t);
+                        }
+                    })
+               .collect(Collectors.toList());
+    }
+
 }
 
diff --git a/test/distributed/org/apache/cassandra/distributed/test/DistributedTestBase.java b/test/distributed/org/apache/cassandra/distributed/test/DistributedTestBase.java
index 64dee64..65224ea 100644
--- a/test/distributed/org/apache/cassandra/distributed/test/DistributedTestBase.java
+++ b/test/distributed/org/apache/cassandra/distributed/test/DistributedTestBase.java
@@ -78,7 +78,12 @@ public class DistributedTestBase
 
     protected static <C extends AbstractCluster<?>> C init(C cluster)
     {
-        cluster.schemaChange("CREATE KEYSPACE " + KEYSPACE + " WITH replication = {'class': 'SimpleStrategy', 'replication_factor': " + cluster.size() + "};");
+        return init(cluster, Math.min(3, cluster.size()));
+    }
+
+    protected static <C extends AbstractCluster<?>> C init(C cluster, int replicationFactor)
+    {
+        cluster.schemaChange("CREATE KEYSPACE " + KEYSPACE + " WITH replication = {'class': 'SimpleStrategy', 'replication_factor': " + replicationFactor + "};");
         return cluster;
     }
 
diff --git a/test/distributed/org/apache/cassandra/distributed/test/ReadRepairTest.java b/test/distributed/org/apache/cassandra/distributed/test/ReadRepairTest.java
new file mode 100644
index 0000000..a8c5fa1
--- /dev/null
+++ b/test/distributed/org/apache/cassandra/distributed/test/ReadRepairTest.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.test;
+
+import java.util.List;
+
+import org.junit.Test;
+
+import org.apache.cassandra.db.ConsistencyLevel;
+import org.apache.cassandra.db.marshal.Int32Type;
+import org.apache.cassandra.dht.Murmur3Partitioner;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.distributed.Cluster;
+import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.service.PendingRangeCalculatorService;
+import org.apache.cassandra.service.StorageService;
+
+import static org.apache.cassandra.net.Verb.READ_REPAIR_REQ;
+import static org.apache.cassandra.net.Verb.READ_REQ;
+
+public class ReadRepairTest extends DistributedTestBase
+{
+
+    @Test
+    public void readRepairTest() throws Throwable
+    {
+        try (Cluster cluster = init(Cluster.create(3)))
+        {
+            cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v int, PRIMARY KEY (pk, ck)) WITH read_repair='blocking'");
+
+            cluster.get(1).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, 1, 1)");
+            cluster.get(2).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, 1, 1)");
+
+            assertRows(cluster.get(3).executeInternal("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1"));
+
+            assertRows(cluster.coordinator(1).execute("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1",
+                                                      ConsistencyLevel.ALL),
+                       row(1, 1, 1));
+
+            // Verify that data got repaired to the third node
+            assertRows(cluster.get(3).executeInternal("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1"),
+                       row(1, 1, 1));
+        }
+    }
+
+    @Test
+    public void failingReadRepairTest() throws Throwable
+    {
+        try (Cluster cluster = init(Cluster.create(3)))
+        {
+            cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v int, PRIMARY KEY (pk, ck)) WITH read_repair='blocking'");
+
+            for (int i = 1 ; i <= 2 ; ++i)
+                cluster.get(i).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (1, 1, 1)");
+
+            assertRows(cluster.get(3).executeInternal("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1"));
+
+            cluster.verbs(READ_REPAIR_REQ).to(3).drop();
+            assertRows(cluster.coordinator(1).execute("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1",
+                                                      ConsistencyLevel.QUORUM),
+                       row(1, 1, 1));
+
+            // Data was not repaired
+            assertRows(cluster.get(3).executeInternal("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1"));
+        }
+    }
+
+    @Test
+    public void movingTokenReadRepairTest() throws Throwable
+    {
+        try (Cluster cluster = init(Cluster.create(4), 3))
+        {
+            List<Token> tokens = cluster.tokens();
+
+            cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v int, PRIMARY KEY (pk, ck)) WITH read_repair='blocking'");
+
+            int i = 0;
+            while (true)
+            {
+                Token t = Murmur3Partitioner.instance.getToken(Int32Type.instance.decompose(i));
+                if (t.compareTo(tokens.get(2 - 1)) < 0 && t.compareTo(tokens.get(1 - 1)) > 0)
+                    break;
+                ++i;
+            }
+
+            // write only to #4
+            cluster.get(4).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (?, 1, 1)", i);
+            // mark #2 as leaving in #4
+            cluster.get(4).acceptsOnInstance((InetAddressAndPort endpoint) -> {
+                StorageService.instance.getTokenMetadata().addLeavingEndpoint(endpoint);
+                PendingRangeCalculatorService.instance.update();
+                PendingRangeCalculatorService.instance.blockUntilFinished();
+            }).accept(cluster.get(2).broadcastAddressAndPort());
+
+            // prevent #4 from reading or writing to #3, so our QUORUM must contain #2 and #4
+            // since #1 is taking over the range, this means any read-repair must make it to #1 as well
+            cluster.filters().verbs(READ_REQ.ordinal()).from(4).to(3).drop();
+            cluster.filters().verbs(READ_REPAIR_REQ.ordinal()).from(4).to(3).drop();
+            assertRows(cluster.coordinator(4).execute("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = ?",
+                                                      ConsistencyLevel.ALL, i),
+                       row(i, 1, 1));
+
+            // verify that #1 receives the write
+            assertRows(cluster.get(1).executeInternal("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = ?", i),
+                       row(i, 1, 1));
+        }
+    }
+
+}
diff --git a/test/distributed/org/apache/cassandra/distributed/test/DistributedReadWritePathTest.java b/test/distributed/org/apache/cassandra/distributed/test/SimpleReadWriteTest.java
similarity index 99%
rename from test/distributed/org/apache/cassandra/distributed/test/DistributedReadWritePathTest.java
rename to test/distributed/org/apache/cassandra/distributed/test/SimpleReadWriteTest.java
index 679a90f..4292598 100644
--- a/test/distributed/org/apache/cassandra/distributed/test/DistributedReadWritePathTest.java
+++ b/test/distributed/org/apache/cassandra/distributed/test/SimpleReadWriteTest.java
@@ -18,8 +18,6 @@
 
 package org.apache.cassandra.distributed.test;
 
-import java.util.concurrent.TimeUnit;
-
 import org.junit.Assert;
 import org.junit.BeforeClass;
 import org.junit.Test;
@@ -38,7 +36,7 @@ import static org.junit.Assert.assertEquals;
 import static org.apache.cassandra.net.Verb.READ_REPAIR_REQ;
 import static org.apache.cassandra.net.OutboundConnections.LARGE_MESSAGE_THRESHOLD;
 
-public class DistributedReadWritePathTest extends DistributedTestBase
+public class SimpleReadWriteTest extends DistributedTestBase
 {
     @BeforeClass
     public static void before()
diff --git a/test/unit/org/apache/cassandra/service/reads/AbstractReadResponseTest.java b/test/unit/org/apache/cassandra/service/reads/AbstractReadResponseTest.java
index 545731b..884baa1 100644
--- a/test/unit/org/apache/cassandra/service/reads/AbstractReadResponseTest.java
+++ b/test/unit/org/apache/cassandra/service/reads/AbstractReadResponseTest.java
@@ -82,14 +82,18 @@ import static org.apache.cassandra.net.Verb.READ_REQ;
 public abstract class AbstractReadResponseTest
 {
     public static final String KEYSPACE1 = "DataResolverTest";
+    public static final String KEYSPACE3 = "DataResolverTest3";
     public static final String CF_STANDARD = "Standard1";
     public static final String CF_COLLECTION = "Collection1";
 
     public static Keyspace ks;
+    public static Keyspace ks3;
     public static ColumnFamilyStore cfs;
     public static ColumnFamilyStore cfs2;
+    public static ColumnFamilyStore cfs3;
     public static TableMetadata cfm;
     public static TableMetadata cfm2;
+    public static TableMetadata cfm3;
     public static ColumnMetadata m;
 
     public static DecoratedKey dk;
@@ -128,19 +132,32 @@ public abstract class AbstractReadResponseTest
                      .addRegularColumn("one", AsciiType.instance)
                      .addRegularColumn("two", AsciiType.instance);
 
+        TableMetadata.Builder builder3 =
+        TableMetadata.builder(KEYSPACE3, CF_STANDARD)
+                     .addPartitionKeyColumn("key", BytesType.instance)
+                     .addClusteringColumn("col1", AsciiType.instance)
+                     .addRegularColumn("c1", AsciiType.instance)
+                     .addRegularColumn("c2", AsciiType.instance)
+                     .addRegularColumn("one", AsciiType.instance)
+                     .addRegularColumn("two", AsciiType.instance);
+
         TableMetadata.Builder builder2 =
         TableMetadata.builder(KEYSPACE1, CF_COLLECTION)
                      .addPartitionKeyColumn("k", ByteType.instance)
                      .addRegularColumn("m", MapType.getInstance(IntegerType.instance, IntegerType.instance, true));
 
         SchemaLoader.prepareServer();
-        SchemaLoader.createKeyspace(KEYSPACE1, KeyspaceParams.simple(1), builder1, builder2);
+        SchemaLoader.createKeyspace(KEYSPACE1, KeyspaceParams.simple(2), builder1, builder2);
+        SchemaLoader.createKeyspace(KEYSPACE3, KeyspaceParams.simple(4), builder3);
 
         ks = Keyspace.open(KEYSPACE1);
         cfs = ks.getColumnFamilyStore(CF_STANDARD);
         cfm = cfs.metadata();
         cfs2 = ks.getColumnFamilyStore(CF_COLLECTION);
         cfm2 = cfs2.metadata();
+        ks3 = Keyspace.open(KEYSPACE3);
+        cfs3 = ks3.getColumnFamilyStore(CF_STANDARD);
+        cfm3 = cfs3.metadata();
         m = cfm2.getColumn(new ColumnIdentifier("m", false));
     }
 
diff --git a/test/unit/org/apache/cassandra/service/reads/DataResolverTest.java b/test/unit/org/apache/cassandra/service/reads/DataResolverTest.java
index 17c6e41..50ed09d 100644
--- a/test/unit/org/apache/cassandra/service/reads/DataResolverTest.java
+++ b/test/unit/org/apache/cassandra/service/reads/DataResolverTest.java
@@ -21,9 +21,17 @@ import java.net.UnknownHostException;
 import java.nio.ByteBuffer;
 import java.util.Collections;
 import java.util.Iterator;
+import java.util.UUID;
 
 import com.google.common.collect.Iterators;
 import com.google.common.collect.Sets;
+
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.dht.ByteOrderedPartitioner;
+import org.apache.cassandra.dht.Murmur3Partitioner;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.gms.Gossiper;
 import org.apache.cassandra.locator.ReplicaPlan;
 import org.junit.Assert;
 import org.junit.Before;
@@ -59,6 +67,7 @@ import org.apache.cassandra.locator.InetAddressAndPort;
 import org.apache.cassandra.locator.Replica;
 import org.apache.cassandra.locator.ReplicaUtils;
 import org.apache.cassandra.net.*;
+import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.service.reads.repair.ReadRepair;
 import org.apache.cassandra.service.reads.repair.RepairedDataTracker;
 import org.apache.cassandra.service.reads.repair.RepairedDataVerifier;
@@ -76,28 +85,42 @@ import static org.junit.Assert.assertTrue;
 
 public class DataResolverTest extends AbstractReadResponseTest
 {
-    public static final String KEYSPACE1 = "DataResolverTest";
-    public static final String CF_STANDARD = "Standard1";
-
     private ReadCommand command;
     private TestableReadRepair readRepair;
+    private Keyspace ks;
+    private ColumnFamilyStore cfs;
 
-    @Before
-    public void setup()
+    private EndpointsForRange makeReplicas(int num)
     {
+        StorageService.instance.getTokenMetadata().clearUnsafe();
+
+        switch (num)
+        {
+            case 2:
+                ks = AbstractReadResponseTest.ks;
+                cfs = AbstractReadResponseTest.cfs;
+                break;
+            case 4:
+                ks = AbstractReadResponseTest.ks3;
+                cfs = AbstractReadResponseTest.cfs3;
+                break;
+            default:
+                throw new IllegalStateException("This test needs refactoring to cleanly support different replication factors");
+        }
+
         command = Util.cmd(cfs, dk).withNowInSeconds(nowInSec).build();
         command.trackRepairedStatus();
         readRepair = new TestableReadRepair(command);
-    }
-
-    private static EndpointsForRange makeReplicas(int num)
-    {
+        Token token = Murmur3Partitioner.instance.getMinimumToken();
         EndpointsForRange.Builder replicas = EndpointsForRange.builder(ReplicaUtils.FULL_RANGE, num);
         for (int i = 0; i < num; i++)
         {
             try
             {
-                replicas.add(ReplicaUtils.full(InetAddressAndPort.getByAddress(new byte[]{ 127, 0, 0, (byte) (i + 1) })));
+                InetAddressAndPort endpoint = InetAddressAndPort.getByAddress(new byte[]{ 127, 0, 0, (byte) (i + 1) });
+                replicas.add(ReplicaUtils.full(endpoint));
+                StorageService.instance.getTokenMetadata().updateNormalToken(token = token.increaseSlightly(), endpoint);
+                Gossiper.instance.initializeNodeUnsafe(endpoint, UUID.randomUUID(), 1);
             }
             catch (UnknownHostException e)
             {
@@ -231,30 +254,30 @@ public class DataResolverTest extends AbstractReadResponseTest
 
         RangeTombstone tombstone1 = tombstone("1", "11", 1, nowInSec);
         RangeTombstone tombstone2 = tombstone("3", "31", 1, nowInSec);
-        PartitionUpdate update = new RowUpdateBuilder(cfm, nowInSec, 1L, dk).addRangeTombstone(tombstone1)
+        PartitionUpdate update = new RowUpdateBuilder(cfm3, nowInSec, 1L, dk).addRangeTombstone(tombstone1)
                                                                             .addRangeTombstone(tombstone2)
                                                                             .buildUpdate();
 
         InetAddressAndPort peer1 = replicas.get(0).endpoint();
-        UnfilteredPartitionIterator iter1 = iter(new RowUpdateBuilder(cfm, nowInSec, 1L, dk).addRangeTombstone(tombstone1)
+        UnfilteredPartitionIterator iter1 = iter(new RowUpdateBuilder(cfm3, nowInSec, 1L, dk).addRangeTombstone(tombstone1)
                                                                                             .addRangeTombstone(tombstone2)
                                                                                             .buildUpdate());
         resolver.preprocess(response(command, peer1, iter1));
         // not covered by any range tombstone
         InetAddressAndPort peer2 = replicas.get(1).endpoint();
-        UnfilteredPartitionIterator iter2 = iter(new RowUpdateBuilder(cfm, nowInSec, 0L, dk).clustering("0")
+        UnfilteredPartitionIterator iter2 = iter(new RowUpdateBuilder(cfm3, nowInSec, 0L, dk).clustering("0")
                                                                                             .add("c1", "v0")
                                                                                             .buildUpdate());
         resolver.preprocess(response(command, peer2, iter2));
         // covered by a range tombstone
         InetAddressAndPort peer3 = replicas.get(2).endpoint();
-        UnfilteredPartitionIterator iter3 = iter(new RowUpdateBuilder(cfm, nowInSec, 0L, dk).clustering("10")
+        UnfilteredPartitionIterator iter3 = iter(new RowUpdateBuilder(cfm3, nowInSec, 0L, dk).clustering("10")
                                                                                             .add("c2", "v1")
                                                                                             .buildUpdate());
         resolver.preprocess(response(command, peer3, iter3));
         // range covered by rt, but newer
         InetAddressAndPort peer4 = replicas.get(3).endpoint();
-        UnfilteredPartitionIterator iter4 = iter(new RowUpdateBuilder(cfm, nowInSec, 2L, dk).clustering("3")
+        UnfilteredPartitionIterator iter4 = iter(new RowUpdateBuilder(cfm3, nowInSec, 2L, dk).clustering("3")
                                                                                             .add("one", "A")
                                                                                             .buildUpdate());
         resolver.preprocess(response(command, peer4, iter4));
@@ -1294,7 +1317,7 @@ public class DataResolverTest extends AbstractReadResponseTest
     private void assertRepairMetadata(Mutation mutation)
     {
         PartitionUpdate update = mutation.getPartitionUpdates().iterator().next();
-        assertEquals(update.metadata().keyspace, cfm.keyspace);
+        assertEquals(update.metadata().keyspace, ks.getName());
         assertEquals(update.metadata().name, cfm.name);
     }
 
diff --git a/test/unit/org/apache/cassandra/service/reads/repair/AbstractReadRepairTest.java b/test/unit/org/apache/cassandra/service/reads/repair/AbstractReadRepairTest.java
index 3d39732..ed4ef3f 100644
--- a/test/unit/org/apache/cassandra/service/reads/repair/AbstractReadRepairTest.java
+++ b/test/unit/org/apache/cassandra/service/reads/repair/AbstractReadRepairTest.java
@@ -1,14 +1,24 @@
 package org.apache.cassandra.service.reads.repair;
 
+import java.nio.ByteBuffer;
 import java.util.List;
 import java.util.Set;
+import java.util.UUID;
 import java.util.concurrent.TimeUnit;
 import java.util.function.Consumer;
 
+import com.google.common.base.Predicates;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
 import com.google.common.primitives.Ints;
+
+import org.apache.cassandra.dht.ByteOrderedPartitioner;
+import org.apache.cassandra.dht.Murmur3Partitioner;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.gms.Gossiper;
+import org.apache.cassandra.locator.EndpointsForToken;
 import org.apache.cassandra.locator.ReplicaPlan;
 import org.junit.Assert;
 import org.junit.Before;
@@ -41,6 +51,7 @@ import org.apache.cassandra.db.rows.RowIterator;
 import org.apache.cassandra.locator.EndpointsForRange;
 import org.apache.cassandra.locator.InetAddressAndPort;
 import org.apache.cassandra.locator.Replica;
+import org.apache.cassandra.locator.ReplicaPlans;
 import org.apache.cassandra.locator.ReplicaUtils;
 import org.apache.cassandra.net.Message;
 import org.apache.cassandra.schema.KeyspaceMetadata;
@@ -48,6 +59,7 @@ import org.apache.cassandra.schema.KeyspaceParams;
 import org.apache.cassandra.schema.MigrationManager;
 import org.apache.cassandra.schema.TableMetadata;
 import org.apache.cassandra.schema.Tables;
+import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.utils.ByteBufferUtil;
 
 import static org.apache.cassandra.locator.Replica.fullReplica;
@@ -216,6 +228,14 @@ public abstract  class AbstractReadRepairTest
 
         replicaPlan = replicaPlan(ConsistencyLevel.QUORUM, replicas);
 
+        StorageService.instance.getTokenMetadata().clearUnsafe();
+        StorageService.instance.getTokenMetadata().updateNormalToken(ByteOrderedPartitioner.instance.getToken(ByteBuffer.wrap(new byte[] { 0 })), replica1.endpoint());
+        StorageService.instance.getTokenMetadata().updateNormalToken(ByteOrderedPartitioner.instance.getToken(ByteBuffer.wrap(new byte[] { 1 })), replica2.endpoint());
+        StorageService.instance.getTokenMetadata().updateNormalToken(ByteOrderedPartitioner.instance.getToken(ByteBuffer.wrap(new byte[] { 2 })), replica3.endpoint());
+        Gossiper.instance.initializeNodeUnsafe(replica1.endpoint(), UUID.randomUUID(), 1);
+        Gossiper.instance.initializeNodeUnsafe(replica2.endpoint(), UUID.randomUUID(), 1);
+        Gossiper.instance.initializeNodeUnsafe(replica3.endpoint(), UUID.randomUUID(), 1);
+
         // default test values
         key  = dk(5);
         cell1 = cell("v", "val1", now);
@@ -247,6 +267,22 @@ public abstract  class AbstractReadRepairTest
         return replicaPlan(ks, consistencyLevel, replicas, replicas);
     }
 
+    static ReplicaPlan.ForTokenWrite repairPlan(ReplicaPlan.ForRangeRead readPlan)
+    {
+        return repairPlan(readPlan, readPlan.candidates());
+    }
+
+    static ReplicaPlan.ForTokenWrite repairPlan(EndpointsForRange liveAndDown, EndpointsForRange targets)
+    {
+        return repairPlan(replicaPlan(liveAndDown, targets), liveAndDown);
+    }
+
+    static ReplicaPlan.ForTokenWrite repairPlan(ReplicaPlan.ForRangeRead readPlan, EndpointsForRange liveAndDown)
+    {
+        Token token = readPlan.range().left.getToken();
+        EndpointsForToken pending = EndpointsForToken.empty(token);
+        return ReplicaPlans.forWrite(ks, ConsistencyLevel.TWO, liveAndDown.forToken(token), pending, Predicates.alwaysTrue(), ReplicaPlans.writeReadRepair(readPlan));
+    }
     static ReplicaPlan.ForRangeRead replicaPlan(EndpointsForRange replicas, EndpointsForRange targets)
     {
         return replicaPlan(ks, ConsistencyLevel.QUORUM, replicas, targets);
diff --git a/test/unit/org/apache/cassandra/service/reads/repair/BlockingReadRepairTest.java b/test/unit/org/apache/cassandra/service/reads/repair/BlockingReadRepairTest.java
index 3cc1a63..e4b3a71 100644
--- a/test/unit/org/apache/cassandra/service/reads/repair/BlockingReadRepairTest.java
+++ b/test/unit/org/apache/cassandra/service/reads/repair/BlockingReadRepairTest.java
@@ -44,13 +44,12 @@ import org.apache.cassandra.service.reads.ReadCallback;
 
 public class BlockingReadRepairTest extends AbstractReadRepairTest
 {
-    private static class InstrumentedReadRepairHandler<E extends Endpoints<E>, P extends ReplicaPlan.ForRead<E>>
-            extends BlockingPartitionRepair<E, P>
+    private static class InstrumentedReadRepairHandler
+            extends BlockingPartitionRepair
     {
-        public InstrumentedReadRepairHandler(Map<Replica, Mutation> repairs, int maxBlockFor, P replicaPlan)
+        public InstrumentedReadRepairHandler(Map<Replica, Mutation> repairs, ReplicaPlan.ForTokenWrite writePlan)
         {
-            super(Util.dk("not a real usable value"), repairs, maxBlockFor, replicaPlan,
-                    e -> targets.contains(e));
+            super(Util.dk("not a real usable value"), repairs, writePlan, e -> targets.contains(e));
         }
 
         Map<InetAddressAndPort, Mutation> mutationsSent = new HashMap<>();
@@ -67,16 +66,15 @@ public class BlockingReadRepairTest extends AbstractReadRepairTest
         configureClass(ReadRepairStrategy.BLOCKING);
     }
 
-    private static <E extends Endpoints<E>, P extends ReplicaPlan.ForRead<E>>
-    InstrumentedReadRepairHandler<E, P> createRepairHandler(Map<Replica, Mutation> repairs, int maxBlockFor, P replicaPlan)
+    private static InstrumentedReadRepairHandler createRepairHandler(Map<Replica, Mutation> repairs, ReplicaPlan.ForTokenWrite writePlan)
     {
-        return new InstrumentedReadRepairHandler<>(repairs, maxBlockFor, replicaPlan);
+        return new InstrumentedReadRepairHandler(repairs, writePlan);
     }
 
-    private static InstrumentedReadRepairHandler createRepairHandler(Map<Replica, Mutation> repairs, int maxBlockFor)
+    private static InstrumentedReadRepairHandler createRepairHandler(Map<Replica, Mutation> repairs)
     {
         EndpointsForRange replicas = EndpointsForRange.copyOf(Lists.newArrayList(repairs.keySet()));
-        return createRepairHandler(repairs, maxBlockFor, replicaPlan(replicas, replicas));
+        return createRepairHandler(repairs, repairPlan(replicas, replicas));
     }
 
     private static class InstrumentedBlockingReadRepair<E extends Endpoints<E>, P extends ReplicaPlan.ForRead<E>>
@@ -140,8 +138,8 @@ public class BlockingReadRepairTest extends AbstractReadRepairTest
         repairs.put(replica1, repair1);
         repairs.put(replica2, repair2);
 
-        ReplicaPlan.ForRangeRead replicaPlan = replicaPlan(replicas, EndpointsForRange.copyOf(Lists.newArrayList(repairs.keySet())));
-        InstrumentedReadRepairHandler<?, ?> handler = createRepairHandler(repairs, 2, replicaPlan);
+        ReplicaPlan.ForTokenWrite writePlan = repairPlan(replicas, EndpointsForRange.copyOf(Lists.newArrayList(repairs.keySet())));
+        InstrumentedReadRepairHandler handler = createRepairHandler(repairs, writePlan);
 
         Assert.assertTrue(handler.mutationsSent.isEmpty());
 
@@ -176,7 +174,7 @@ public class BlockingReadRepairTest extends AbstractReadRepairTest
         repairs.put(replica1, mutation(cell2));
         repairs.put(replica2, mutation(cell1));
 
-        InstrumentedReadRepairHandler handler = createRepairHandler(repairs, 2);
+        InstrumentedReadRepairHandler handler = createRepairHandler(repairs);
         handler.sendInitialRepairs();
         handler.ack(target1);
         handler.ack(target2);
@@ -197,7 +195,7 @@ public class BlockingReadRepairTest extends AbstractReadRepairTest
         repairs.put(replica1, mutation(cell2));
         repairs.put(replica2, mutation(cell1));
 
-        InstrumentedReadRepairHandler handler = createRepairHandler(repairs, 2);
+        InstrumentedReadRepairHandler handler = createRepairHandler(repairs);
         handler.sendInitialRepairs();
 
         // we've already sent mutations to all candidates, so we shouldn't send any more
@@ -219,7 +217,7 @@ public class BlockingReadRepairTest extends AbstractReadRepairTest
         repairs.put(replica1, repair1);
 
         // check that the correct initial mutations are sent out
-        InstrumentedReadRepairHandler handler = createRepairHandler(repairs, 2, replicaPlan(replicas, EndpointsForRange.of(replica1, replica2)));
+        InstrumentedReadRepairHandler handler = createRepairHandler(repairs, repairPlan(replicas, EndpointsForRange.of(replica1, replica2)));
         handler.sendInitialRepairs();
         Assert.assertEquals(1, handler.mutationsSent.size());
         Assert.assertTrue(handler.mutationsSent.containsKey(target1));
@@ -240,7 +238,7 @@ public class BlockingReadRepairTest extends AbstractReadRepairTest
         repairs.put(replica3, mutation(cell3));
         Assert.assertEquals(3, repairs.size());
 
-        InstrumentedReadRepairHandler handler = createRepairHandler(repairs, 2);
+        InstrumentedReadRepairHandler handler = createRepairHandler(repairs);
         handler.sendInitialRepairs();
 
         Assert.assertFalse(getCurrentRepairStatus(handler));
@@ -266,8 +264,8 @@ public class BlockingReadRepairTest extends AbstractReadRepairTest
         repairs.put(remote1, mutation(cell1));
 
         EndpointsForRange participants = EndpointsForRange.of(replica1, replica2, remote1, remote2);
-        ReplicaPlan.ForRangeRead replicaPlan = replicaPlan(ks, ConsistencyLevel.LOCAL_QUORUM, participants);
-        InstrumentedReadRepairHandler handler = createRepairHandler(repairs, 2, replicaPlan);
+        ReplicaPlan.ForTokenWrite writePlan = repairPlan(replicaPlan(ks, ConsistencyLevel.LOCAL_QUORUM, participants));
+        InstrumentedReadRepairHandler handler = createRepairHandler(repairs, writePlan);
         handler.sendInitialRepairs();
         Assert.assertEquals(2, handler.mutationsSent.size());
         Assert.assertTrue(handler.mutationsSent.containsKey(replica1.endpoint()));
diff --git a/test/unit/org/apache/cassandra/service/reads/repair/DiagEventsBlockingReadRepairTest.java b/test/unit/org/apache/cassandra/service/reads/repair/DiagEventsBlockingReadRepairTest.java
index c15d7f4..7806a3f 100644
--- a/test/unit/org/apache/cassandra/service/reads/repair/DiagEventsBlockingReadRepairTest.java
+++ b/test/unit/org/apache/cassandra/service/reads/repair/DiagEventsBlockingReadRepairTest.java
@@ -28,6 +28,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.function.Predicate;
 
 import com.google.common.collect.Lists;
+
 import org.apache.cassandra.locator.ReplicaPlan;
 import org.junit.After;
 import org.junit.Assert;
@@ -81,8 +82,8 @@ public class DiagEventsBlockingReadRepairTest extends AbstractReadRepairTest
         repairs.put(replica2, repair2);
 
 
-        ReplicaPlan.ForRangeRead replicaPlan = replicaPlan(replicas, EndpointsForRange.copyOf(Lists.newArrayList(repairs.keySet())));
-        DiagnosticPartitionReadRepairHandler handler = createRepairHandler(repairs, 2, replicaPlan);
+        ReplicaPlan.ForTokenWrite writePlan = repairPlan(replicas, EndpointsForRange.copyOf(Lists.newArrayList(repairs.keySet())));
+        DiagnosticPartitionReadRepairHandler handler = createRepairHandler(repairs, writePlan);
 
         Assert.assertTrue(handler.updatesByEp.isEmpty());
 
@@ -117,15 +118,9 @@ public class DiagEventsBlockingReadRepairTest extends AbstractReadRepairTest
         return new DiagnosticBlockingRepairHandler(command, replicaPlan, queryStartNanoTime);
     }
 
-    private static DiagnosticPartitionReadRepairHandler createRepairHandler(Map<Replica, Mutation> repairs, int maxBlockFor, ReplicaPlan.ForRead<?> replicaPlan)
-    {
-        return new DiagnosticPartitionReadRepairHandler<>(key, repairs, maxBlockFor, replicaPlan);
-    }
-
-    private static DiagnosticPartitionReadRepairHandler createRepairHandler(Map<Replica, Mutation> repairs, int maxBlockFor)
+    private static DiagnosticPartitionReadRepairHandler createRepairHandler(Map<Replica, Mutation> repairs, ReplicaPlan.ForTokenWrite writePlan)
     {
-        EndpointsForRange replicas = EndpointsForRange.copyOf(Lists.newArrayList(repairs.keySet()));
-        return createRepairHandler(repairs, maxBlockFor, replicaPlan(replicas, replicas));
+        return new DiagnosticPartitionReadRepairHandler(key, repairs, writePlan);
     }
 
     private static class DiagnosticBlockingRepairHandler extends BlockingReadRepair implements InstrumentedReadRepair
@@ -170,7 +165,7 @@ public class DiagEventsBlockingReadRepairTest extends AbstractReadRepairTest
     }
 
     private static class DiagnosticPartitionReadRepairHandler<E extends Endpoints<E>, P extends ReplicaPlan.ForRead<E>>
-            extends BlockingPartitionRepair<E, P>
+            extends BlockingPartitionRepair
     {
         private final Map<InetAddressAndPort, String> updatesByEp = new HashMap<>();
 
@@ -180,9 +175,9 @@ public class DiagEventsBlockingReadRepairTest extends AbstractReadRepairTest
             return e -> candidates.contains(e);
         }
 
-        DiagnosticPartitionReadRepairHandler(DecoratedKey key, Map<Replica, Mutation> repairs, int maxBlockFor, P replicaPlan)
+        DiagnosticPartitionReadRepairHandler(DecoratedKey key, Map<Replica, Mutation> repairs, ReplicaPlan.ForTokenWrite writePlan)
         {
-            super(key, repairs, maxBlockFor, replicaPlan, isLocal());
+            super(key, repairs, writePlan, isLocal());
             DiagnosticEventService.instance().subscribe(PartitionRepairEvent.class, this::onRepairEvent);
         }
 
diff --git a/test/unit/org/apache/cassandra/service/reads/repair/ReadOnlyReadRepairTest.java b/test/unit/org/apache/cassandra/service/reads/repair/ReadOnlyReadRepairTest.java
index cc7fdf1..c0af493 100644
--- a/test/unit/org/apache/cassandra/service/reads/repair/ReadOnlyReadRepairTest.java
+++ b/test/unit/org/apache/cassandra/service/reads/repair/ReadOnlyReadRepairTest.java
@@ -92,8 +92,9 @@ public class ReadOnlyReadRepairTest extends AbstractReadRepairTest
     @Test(expected = UnsupportedOperationException.class)
     public void repairPartitionFailure()
     {
-        ReplicaPlan.SharedForRangeRead replicaPlan = ReplicaPlan.shared(replicaPlan(replicas, replicas));
-        InstrumentedReadRepair repair = createInstrumentedReadRepair(replicaPlan);
-        repair.repairPartition(null, Collections.emptyMap(), replicaPlan.get());
+        ReplicaPlan.SharedForRangeRead readPlan = ReplicaPlan.shared(replicaPlan(replicas, replicas));
+        ReplicaPlan.ForTokenWrite writePlan = repairPlan(replicas, replicas);
+        InstrumentedReadRepair repair = createInstrumentedReadRepair(readPlan);
+        repair.repairPartition(null, Collections.emptyMap(), writePlan);
     }
 }
diff --git a/test/unit/org/apache/cassandra/service/reads/repair/ReadRepairTest.java b/test/unit/org/apache/cassandra/service/reads/repair/ReadRepairTest.java
index 5ae9dd8..7458e9b 100644
--- a/test/unit/org/apache/cassandra/service/reads/repair/ReadRepairTest.java
+++ b/test/unit/org/apache/cassandra/service/reads/repair/ReadRepairTest.java
@@ -49,8 +49,6 @@ import org.apache.cassandra.db.rows.Cell;
 import org.apache.cassandra.db.rows.Row;
 import org.apache.cassandra.locator.InetAddressAndPort;
 import org.apache.cassandra.locator.Replica;
-import org.apache.cassandra.locator.ReplicaLayout;
-import org.apache.cassandra.locator.ReplicaUtils;
 import org.apache.cassandra.net.Message;
 import org.apache.cassandra.schema.KeyspaceMetadata;
 import org.apache.cassandra.schema.KeyspaceParams;
@@ -72,12 +70,11 @@ public class ReadRepairTest
     static EndpointsForRange targets;
 
     private static class InstrumentedReadRepairHandler<E extends Endpoints<E>, P extends ReplicaPlan.ForRead<E>>
-            extends BlockingPartitionRepair<E, P>
+            extends BlockingPartitionRepair
     {
-        public InstrumentedReadRepairHandler(Map<Replica, Mutation> repairs, int maxBlockFor, P replicaPlan)
+        public InstrumentedReadRepairHandler(Map<Replica, Mutation> repairs, ReplicaPlan.ForTokenWrite writePlan)
         {
-            super(Util.dk("not a valid key"), repairs, maxBlockFor, replicaPlan,
-                    e -> replicaPlan.consistencyLevel().isDatacenterLocal() && targets.endpoints().contains(e));
+            super(Util.dk("not a valid key"), repairs, writePlan, e -> targets.endpoints().contains(e));
         }
 
         Map<InetAddressAndPort, Mutation> mutationsSent = new HashMap<>();
@@ -161,10 +158,11 @@ public class ReadRepairTest
         return new Mutation(PartitionUpdate.singleRowUpdate(cfm, key, builder.build()));
     }
 
-    private static InstrumentedReadRepairHandler createRepairHandler(Map<Replica, Mutation> repairs, int maxBlockFor, EndpointsForRange all, EndpointsForRange targets)
+    private static InstrumentedReadRepairHandler createRepairHandler(Map<Replica, Mutation> repairs, EndpointsForRange all, EndpointsForRange targets)
     {
-        ReplicaPlan.ForRangeRead replicaPlan = AbstractReadRepairTest.replicaPlan(ks, ConsistencyLevel.LOCAL_QUORUM, all, targets);
-        return new InstrumentedReadRepairHandler<>(repairs, maxBlockFor, replicaPlan);
+        ReplicaPlan.ForRangeRead readPlan = AbstractReadRepairTest.replicaPlan(ks, ConsistencyLevel.LOCAL_QUORUM, all, targets);
+        ReplicaPlan.ForTokenWrite writePlan = AbstractReadRepairTest.repairPlan(readPlan);
+        return new InstrumentedReadRepairHandler(repairs, writePlan);
     }
 
     @Test
@@ -198,8 +196,7 @@ public class ReadRepairTest
         repairs.put(target1, repair1);
         repairs.put(target2, repair2);
 
-        InstrumentedReadRepairHandler<?, ?> handler = createRepairHandler(repairs, 2,
-                                                                    targets, EndpointsForRange.of(target1, target2));
+        InstrumentedReadRepairHandler<?, ?> handler = createRepairHandler(repairs, targets, EndpointsForRange.of(target1, target2));
 
         Assert.assertTrue(handler.mutationsSent.isEmpty());
 
@@ -234,7 +231,7 @@ public class ReadRepairTest
         repairs.put(target2, mutation(cell1));
 
         EndpointsForRange replicas = EndpointsForRange.of(target1, target2);
-        InstrumentedReadRepairHandler handler = createRepairHandler(repairs, 2, replicas, targets);
+        InstrumentedReadRepairHandler handler = createRepairHandler(repairs, replicas, targets);
         handler.sendInitialRepairs();
         handler.ack(target1.endpoint());
         handler.ack(target2.endpoint());
@@ -255,7 +252,7 @@ public class ReadRepairTest
         repairs.put(target1, mutation(cell2));
         repairs.put(target2, mutation(cell1));
 
-        InstrumentedReadRepairHandler handler = createRepairHandler(repairs, 2, EndpointsForRange.of(target1, target2),
+        InstrumentedReadRepairHandler handler = createRepairHandler(repairs, EndpointsForRange.of(target1, target2),
                                                                     EndpointsForRange.of(target1, target2));
         handler.sendInitialRepairs();
 
@@ -278,7 +275,7 @@ public class ReadRepairTest
         repairs.put(target1, repair1);
 
         // check that the correct initial mutations are sent out
-        InstrumentedReadRepairHandler handler = createRepairHandler(repairs, 2, targets, EndpointsForRange.of(target1, target2));
+        InstrumentedReadRepairHandler handler = createRepairHandler(repairs, targets, EndpointsForRange.of(target1, target2));
         handler.sendInitialRepairs();
         Assert.assertEquals(1, handler.mutationsSent.size());
         Assert.assertTrue(handler.mutationsSent.containsKey(target1.endpoint()));
@@ -301,7 +298,7 @@ public class ReadRepairTest
         Assert.assertEquals(3, repairs.size());
 
         EndpointsForRange replicas = EndpointsForRange.of(target1, target2, target3);
-        InstrumentedReadRepairHandler handler = createRepairHandler(repairs, 2, replicas, replicas);
+        InstrumentedReadRepairHandler handler = createRepairHandler(repairs, replicas, replicas);
         handler.sendInitialRepairs();
 
         Assert.assertFalse(getCurrentRepairStatus(handler));
@@ -311,7 +308,6 @@ public class ReadRepairTest
         // here we should stop blocking, even though we've sent 3 repairs
         handler.ack(target2.endpoint());
         Assert.assertTrue(getCurrentRepairStatus(handler));
-
     }
 
     /**
@@ -330,7 +326,7 @@ public class ReadRepairTest
         EndpointsForRange participants = EndpointsForRange.of(target1, target2, remote1, remote2);
         EndpointsForRange targets = EndpointsForRange.of(target1, target2);
 
-        InstrumentedReadRepairHandler handler = createRepairHandler(repairs, 2, participants, targets);
+        InstrumentedReadRepairHandler handler = createRepairHandler(repairs, participants, targets);
         handler.sendInitialRepairs();
         Assert.assertEquals(2, handler.mutationsSent.size());
         Assert.assertTrue(handler.mutationsSent.containsKey(target1.endpoint()));
diff --git a/test/unit/org/apache/cassandra/service/reads/repair/TestableReadRepair.java b/test/unit/org/apache/cassandra/service/reads/repair/TestableReadRepair.java
index 53964cb..84276d5 100644
--- a/test/unit/org/apache/cassandra/service/reads/repair/TestableReadRepair.java
+++ b/test/unit/org/apache/cassandra/service/reads/repair/TestableReadRepair.java
@@ -113,7 +113,7 @@ public class TestableReadRepair<E extends Endpoints<E>, P extends ReplicaPlan.Fo
     }
 
     @Override
-    public void repairPartition(DecoratedKey partitionKey, Map<Replica, Mutation> mutations, P replicaPlan)
+    public void repairPartition(DecoratedKey partitionKey, Map<Replica, Mutation> mutations, ReplicaPlan.ForTokenWrite writePlan)
     {
         for (Map.Entry<Replica, Mutation> entry: mutations.entrySet())
             sent.put(entry.getKey().endpoint(), entry.getValue());


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org


[cassandra] 01/02: Add missing CHANGES.txt entry

Posted by sa...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

samt pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git

commit 5ec9e081bdfd2bdb2f99f6611022f68570083e6e
Author: Sam Tunnicliffe <sa...@beobal.com>
AuthorDate: Tue Feb 4 14:03:21 2020 +0000

    Add missing CHANGES.txt entry
---
 CHANGES.txt | 1 +
 1 file changed, 1 insertion(+)

diff --git a/CHANGES.txt b/CHANGES.txt
index 9554a00..75fae01 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 4.0-alpha3
+ * Separate exceptions for CAS write timeout exceptions caused by contention and unknown result (CASSANDRA-15350)
  * Fix in-jvm dtest java 11 compatibility (CASSANDRA-15463)
  * Remove joda time dependency (CASSANDRA-15257)
  * Exclude purgeable tombstones from repaired data tracking (CASSANDRA-15462)

