You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by be...@apache.org on 2018/09/14 09:14:51 UTC

[1/4] cassandra git commit: ReplicaPlan/Layout refactor follow-up/completion

Repository: cassandra
Updated Branches:
  refs/heads/trunk 05dbb3e0a -> 047bcd7ad


http://git-wip-us.apache.org/repos/asf/cassandra/blob/047bcd7a/test/unit/org/apache/cassandra/service/WriteResponseHandlerTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/service/WriteResponseHandlerTest.java b/test/unit/org/apache/cassandra/service/WriteResponseHandlerTest.java
index cf1e06a..a3f13c2 100644
--- a/test/unit/org/apache/cassandra/service/WriteResponseHandlerTest.java
+++ b/test/unit/org/apache/cassandra/service/WriteResponseHandlerTest.java
@@ -27,7 +27,7 @@ import java.util.concurrent.TimeUnit;
 import com.google.common.base.Predicates;
 import org.apache.cassandra.dht.Murmur3Partitioner;
 import org.apache.cassandra.locator.EndpointsForToken;
-import org.apache.cassandra.locator.ReplicaLayout;
+import org.apache.cassandra.locator.ReplicaPlans;
 import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Test;
@@ -236,7 +236,7 @@ public class WriteResponseHandlerTest
 
     private static AbstractWriteResponseHandler createWriteResponseHandler(ConsistencyLevel cl, ConsistencyLevel ideal, long queryStartTime)
     {
-        return ks.getReplicationStrategy().getWriteResponseHandler(ReplicaLayout.forWriteWithDownNodes(ks, cl, targets.token(), targets, pending),
+        return ks.getReplicationStrategy().getWriteResponseHandler(ReplicaPlans.forWrite(ks, cl, targets, pending, Predicates.alwaysTrue(), ReplicaPlans.writeAll),
                                                                    null, WriteType.SIMPLE, queryStartTime, ideal);
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/047bcd7a/test/unit/org/apache/cassandra/service/WriteResponseHandlerTransientTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/service/WriteResponseHandlerTransientTest.java b/test/unit/org/apache/cassandra/service/WriteResponseHandlerTransientTest.java
index c19e65e..b6c95dd 100644
--- a/test/unit/org/apache/cassandra/service/WriteResponseHandlerTransientTest.java
+++ b/test/unit/org/apache/cassandra/service/WriteResponseHandlerTransientTest.java
@@ -24,7 +24,6 @@ import java.util.Set;
 import java.util.UUID;
 import java.util.function.Predicate;
 
-import com.google.common.base.Predicates;
 import com.google.common.collect.Sets;
 
 import org.apache.cassandra.dht.Murmur3Partitioner;
@@ -32,10 +31,10 @@ import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.locator.EndpointsForToken;
 import org.apache.cassandra.locator.ReplicaLayout;
 import org.apache.cassandra.locator.EndpointsForRange;
-import org.apache.cassandra.locator.ReplicaUtils;
+import org.apache.cassandra.locator.ReplicaPlan;
+import org.apache.cassandra.locator.ReplicaPlans;
 import org.junit.Assert;
 import org.junit.BeforeClass;
-import org.junit.Ignore;
 import org.junit.Test;
 
 import org.apache.cassandra.SchemaLoader;
@@ -54,7 +53,6 @@ import org.apache.cassandra.utils.ByteBufferUtil;
 
 import static org.apache.cassandra.locator.Replica.fullReplica;
 import static org.apache.cassandra.locator.Replica.transientReplica;
-import static org.apache.cassandra.locator.ReplicaUtils.FULL_RANGE;
 import static org.apache.cassandra.locator.ReplicaUtils.full;
 import static org.apache.cassandra.locator.ReplicaUtils.trans;
 
@@ -147,32 +145,35 @@ public class WriteResponseHandlerTransientTest
         dummy = DatabaseDescriptor.getPartitioner().getToken(ByteBufferUtil.bytes(0));
     }
 
-    @Ignore("Throws unavailable for quorum as written")
     @Test
     public void checkPendingReplicasAreNotFiltered()
     {
-        EndpointsForToken natural = EndpointsForToken.of(dummy.getToken(), full(EP1), full(EP2), trans(EP3));
-        EndpointsForToken pending = EndpointsForToken.of(dummy.getToken(), full(EP4), full(EP5), trans(EP6));
-        ReplicaLayout.ForToken replicaLayout = ReplicaLayout.forWrite(ks, ConsistencyLevel.QUORUM, dummy.getToken(), 2, natural, pending, Predicates.alwaysTrue());
+        EndpointsForToken natural = EndpointsForToken.of(dummy.getToken(), full(EP1), full(EP2), trans(EP3), full(EP5));
+        EndpointsForToken pending = EndpointsForToken.of(dummy.getToken(), full(EP4), trans(EP6));
+        ReplicaLayout.ForTokenWrite layout = new ReplicaLayout.ForTokenWrite(natural, pending);
+        ReplicaPlan.ForTokenWrite replicaPlan = ReplicaPlans.forWrite(ks, ConsistencyLevel.QUORUM, layout, layout, ReplicaPlans.writeAll);
 
-        Assert.assertEquals(EndpointsForRange.of(full(EP4), full(EP5), trans(EP6)), replicaLayout.pending());
+        Assert.assertEquals(EndpointsForRange.of(full(EP4), trans(EP6)), replicaPlan.pending());
     }
 
-    private static ReplicaLayout.ForToken expected(EndpointsForToken all, EndpointsForToken selected)
+    private static ReplicaPlan.ForTokenWrite expected(EndpointsForToken natural, EndpointsForToken selected)
     {
-        return new ReplicaLayout.ForToken(ks, ConsistencyLevel.QUORUM, dummy.getToken(), all, EndpointsForToken.empty(dummy.getToken()), selected);
+        return new ReplicaPlan.ForTokenWrite(ks, ConsistencyLevel.QUORUM, EndpointsForToken.empty(dummy.getToken()), natural, natural, selected);
     }
 
-    private static ReplicaLayout.ForToken getSpeculationContext(EndpointsForToken replicas, int blockFor, Predicate<InetAddressAndPort> livePredicate)
+    private static ReplicaPlan.ForTokenWrite getSpeculationContext(EndpointsForToken natural, Predicate<InetAddressAndPort> livePredicate)
     {
-        return ReplicaLayout.forWrite(ks, ConsistencyLevel.QUORUM, dummy.getToken(), blockFor, replicas, EndpointsForToken.empty(dummy.getToken()), livePredicate);
+        ReplicaLayout.ForTokenWrite liveAndDown = new ReplicaLayout.ForTokenWrite(natural, EndpointsForToken.empty(dummy.getToken()));
+        ReplicaLayout.ForTokenWrite live = new ReplicaLayout.ForTokenWrite(natural.filter(r -> livePredicate.test(r.endpoint())), EndpointsForToken.empty(dummy.getToken()));
+        return ReplicaPlans.forWrite(ks, ConsistencyLevel.QUORUM, liveAndDown, live, ReplicaPlans.writeNormal);
     }
 
-    private static void assertSpeculationReplicas(ReplicaLayout.ForToken expected, EndpointsForToken replicas, int blockFor, Predicate<InetAddressAndPort> livePredicate)
+    private static void assertSpeculationReplicas(ReplicaPlan.ForTokenWrite expected, EndpointsForToken replicas, Predicate<InetAddressAndPort> livePredicate)
     {
-        ReplicaLayout.ForToken actual = getSpeculationContext(replicas, blockFor, livePredicate);
-        Assert.assertEquals(expected.natural(), actual.natural());
-        Assert.assertEquals(expected.selected(), actual.selected());
+        ReplicaPlan.ForTokenWrite actual = getSpeculationContext(replicas, livePredicate);
+        Assert.assertEquals(expected.pending(), actual.pending());
+        Assert.assertEquals(expected.live(), actual.live());
+        Assert.assertEquals(expected.contacts(), actual.contacts());
     }
 
     private static Predicate<InetAddressAndPort> dead(InetAddressAndPort... endpoints)
@@ -186,39 +187,35 @@ public class WriteResponseHandlerTransientTest
         return EndpointsForToken.of(dummy.getToken(), rr);
     }
 
-    @Ignore("Throws unavailable for quorum as written")
     @Test
     public void checkSpeculationContext()
     {
-        EndpointsForToken all = replicas(full(EP1), full(EP2), trans(EP3));
+        EndpointsForToken all = replicas(full(EP1), full(EP2), trans(EP3), full(EP4), full(EP5), trans(EP6));
         // in happy path, transient replica should be classified as a backup
-        assertSpeculationReplicas(expected(all,
-                                           replicas(full(EP1), full(EP2))),
-                                  replicas(full(EP1), full(EP2), trans(EP3)),
-                                  2, dead());
-
-        // if one of the full replicas is dead, they should all be in the initial contacts
-        assertSpeculationReplicas(expected(all,
-                                           replicas(full(EP1), trans(EP3))),
-                                  replicas(full(EP1), full(EP2), trans(EP3)),
-                                  2, dead(EP2));
-
-        // block only for 1 full replica, use transient as backups
-        assertSpeculationReplicas(expected(all,
-                                           replicas(full(EP1))),
-                                  replicas(full(EP1), full(EP2), trans(EP3)),
-                                  1, dead(EP2));
+        assertSpeculationReplicas(expected(all, replicas(full(EP1), full(EP2), full(EP4), full(EP5))),
+                                  all,
+                                  dead());
+
+        // full replicas must always be in the contact list, and will occur first
+        assertSpeculationReplicas(expected(replicas(full(EP1), trans(EP3), full(EP4), trans(EP6)), replicas(full(EP1), full(EP2), full(EP4), full(EP5), trans(EP3), trans(EP6))),
+                                  all,
+                                  dead(EP2, EP5));
+
+        // only one transient used as backup
+        assertSpeculationReplicas(expected(replicas(full(EP1), trans(EP3), full(EP4), full(EP5), trans(EP6)), replicas(full(EP1), full(EP2), full(EP4), full(EP5), trans(EP3))),
+                all,
+                dead(EP2));
     }
 
     @Test (expected = UnavailableException.class)
     public void noFullReplicas()
     {
-        getSpeculationContext(replicas(full(EP1), trans(EP2), trans(EP3)), 2, dead(EP1));
+        getSpeculationContext(replicas(full(EP1), trans(EP2), trans(EP3)), dead(EP1));
     }
 
     @Test (expected = UnavailableException.class)
     public void notEnoughTransientReplicas()
     {
-        getSpeculationContext(replicas(full(EP1), trans(EP2), trans(EP3)), 2, dead(EP2, EP3));
+        getSpeculationContext(replicas(full(EP1), trans(EP2), trans(EP3)), dead(EP2, EP3));
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/047bcd7a/test/unit/org/apache/cassandra/service/reads/DataResolverTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/service/reads/DataResolverTest.java b/test/unit/org/apache/cassandra/service/reads/DataResolverTest.java
index 968ef16..c49bf3a 100644
--- a/test/unit/org/apache/cassandra/service/reads/DataResolverTest.java
+++ b/test/unit/org/apache/cassandra/service/reads/DataResolverTest.java
@@ -24,6 +24,7 @@ import java.util.Iterator;
 
 import com.google.common.collect.Iterators;
 import com.google.common.collect.Sets;
+import org.apache.cassandra.locator.ReplicaPlan;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
@@ -87,7 +88,7 @@ public class DataResolverTest extends AbstractReadResponseTest
     {
         command = Util.cmd(cfs, dk).withNowInSeconds(nowInSec).build();
         command.trackRepairedStatus();
-        readRepair = new TestableReadRepair(command, ConsistencyLevel.QUORUM);
+        readRepair = new TestableReadRepair(command);
     }
 
     private static EndpointsForRange makeReplicas(int num)
@@ -338,7 +339,7 @@ public class DataResolverTest extends AbstractReadResponseTest
     public void testResolveWithBothEmpty()
     {
         EndpointsForRange replicas = makeReplicas(2);
-        TestableReadRepair readRepair = new TestableReadRepair(command, ConsistencyLevel.QUORUM);
+        TestableReadRepair readRepair = new TestableReadRepair(command);
         DataResolver resolver = new DataResolver(command, plan(replicas, ConsistencyLevel.ALL), readRepair, System.nanoTime());
         resolver.preprocess(response(command, replicas.get(0).endpoint(), EmptyIterators.unfilteredPartition(cfm)));
         resolver.preprocess(response(command, replicas.get(1).endpoint(), EmptyIterators.unfilteredPartition(cfm)));
@@ -715,7 +716,7 @@ public class DataResolverTest extends AbstractReadResponseTest
     {
         EndpointsForRange replicas = makeReplicas(2);
         ReadCommand cmd = Util.cmd(cfs2, dk).withNowInSeconds(nowInSec).build();
-        TestableReadRepair readRepair = new TestableReadRepair(cmd, ConsistencyLevel.QUORUM);
+        TestableReadRepair readRepair = new TestableReadRepair(cmd);
         DataResolver resolver = new DataResolver(cmd, plan(replicas, ConsistencyLevel.ALL), readRepair, System.nanoTime());
 
         long[] ts = {100, 200};
@@ -767,7 +768,7 @@ public class DataResolverTest extends AbstractReadResponseTest
     {
         EndpointsForRange replicas = makeReplicas(2);
         ReadCommand cmd = Util.cmd(cfs2, dk).withNowInSeconds(nowInSec).build();
-        TestableReadRepair readRepair = new TestableReadRepair(cmd, ConsistencyLevel.QUORUM);
+        TestableReadRepair readRepair = new TestableReadRepair(cmd);
         DataResolver resolver = new DataResolver(cmd, plan(replicas, ConsistencyLevel.ALL), readRepair, System.nanoTime());
 
         long[] ts = {100, 200};
@@ -811,7 +812,7 @@ public class DataResolverTest extends AbstractReadResponseTest
     {
         EndpointsForRange replicas = makeReplicas(2);
         ReadCommand cmd = Util.cmd(cfs2, dk).withNowInSeconds(nowInSec).build();
-        TestableReadRepair readRepair = new TestableReadRepair(cmd, ConsistencyLevel.QUORUM);
+        TestableReadRepair readRepair = new TestableReadRepair(cmd);
         DataResolver resolver = new DataResolver(cmd, plan(replicas, ConsistencyLevel.ALL), readRepair, System.nanoTime());
 
         long[] ts = {100, 200};
@@ -861,7 +862,7 @@ public class DataResolverTest extends AbstractReadResponseTest
     {
         EndpointsForRange replicas = makeReplicas(2);
         ReadCommand cmd = Util.cmd(cfs2, dk).withNowInSeconds(nowInSec).build();
-        TestableReadRepair readRepair = new TestableReadRepair(cmd, ConsistencyLevel.QUORUM);
+        TestableReadRepair readRepair = new TestableReadRepair(cmd);
         DataResolver resolver = new DataResolver(cmd, plan(replicas, ConsistencyLevel.ALL), readRepair, System.nanoTime());
 
         long[] ts = {100, 200};
@@ -1224,7 +1225,7 @@ public class DataResolverTest extends AbstractReadResponseTest
     }
 
     private DataResolver resolverWithVerifier(final ReadCommand command,
-                                              final ReplicaLayout.ForRange plan,
+                                              final ReplicaPlan.SharedForRangeRead plan,
                                               final ReadRepair readRepair,
                                               final long queryStartNanoTime,
                                               final RepairedDataVerifier verifier)
@@ -1232,7 +1233,7 @@ public class DataResolverTest extends AbstractReadResponseTest
         class TestableDataResolver extends DataResolver
         {
 
-            public TestableDataResolver(ReadCommand command, ReplicaLayout.ForRange plan, ReadRepair readRepair, long queryStartNanoTime)
+            public TestableDataResolver(ReadCommand command, ReplicaPlan.SharedForRangeRead plan, ReadRepair readRepair, long queryStartNanoTime)
             {
                 super(command, plan, readRepair, queryStartNanoTime);
             }
@@ -1298,9 +1299,9 @@ public class DataResolverTest extends AbstractReadResponseTest
         assertEquals(update.metadata().name, cfm.name);
     }
 
-    private ReplicaLayout.ForRange plan(EndpointsForRange replicas, ConsistencyLevel consistencyLevel)
+    private ReplicaPlan.SharedForRangeRead plan(EndpointsForRange replicas, ConsistencyLevel consistencyLevel)
     {
-        return new ReplicaLayout.ForRange(ks, consistencyLevel, ReplicaUtils.FULL_BOUNDS, replicas, replicas);
+        return ReplicaPlan.shared(new ReplicaPlan.ForRangeRead(ks, consistencyLevel, ReplicaUtils.FULL_BOUNDS, replicas, replicas));
     }
 
     private static void resolveAndConsume(DataResolver resolver)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/047bcd7a/test/unit/org/apache/cassandra/service/reads/DataResolverTransientTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/service/reads/DataResolverTransientTest.java b/test/unit/org/apache/cassandra/service/reads/DataResolverTransientTest.java
new file mode 100644
index 0000000..456cec4
--- /dev/null
+++ b/test/unit/org/apache/cassandra/service/reads/DataResolverTransientTest.java
@@ -0,0 +1,227 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.service.reads;
+
+import java.util.concurrent.TimeUnit;
+
+import com.google.common.primitives.Ints;
+
+import org.apache.cassandra.Util;
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.locator.ReplicaPlan;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.cassandra.db.Clustering;
+import org.apache.cassandra.db.ConsistencyLevel;
+import org.apache.cassandra.db.DeletionTime;
+import org.apache.cassandra.db.EmptyIterators;
+import org.apache.cassandra.db.RangeTombstone;
+import org.apache.cassandra.db.SimpleBuilders;
+import org.apache.cassandra.db.SinglePartitionReadCommand;
+import org.apache.cassandra.db.Slice;
+import org.apache.cassandra.db.partitions.PartitionUpdate;
+import org.apache.cassandra.db.rows.BTreeRow;
+import org.apache.cassandra.db.rows.Row;
+import org.apache.cassandra.locator.EndpointsForToken;
+import org.apache.cassandra.locator.ReplicaLayout;
+import org.apache.cassandra.schema.TableMetadata;
+import org.apache.cassandra.service.reads.repair.TestableReadRepair;
+import org.apache.cassandra.utils.ByteBufferUtil;
+
+import static org.apache.cassandra.db.ConsistencyLevel.QUORUM;
+import static org.apache.cassandra.locator.Replica.fullReplica;
+import static org.apache.cassandra.locator.Replica.transientReplica;
+import static org.apache.cassandra.locator.ReplicaUtils.full;
+import static org.apache.cassandra.locator.ReplicaUtils.trans;
+
+/**
+ * Tests DataResolver's handling of transient replicas
+ */
+public class DataResolverTransientTest extends AbstractReadResponseTest
+{
+    private static DecoratedKey key;
+
+    @Before
+    public void setUp()
+    {
+        key = Util.dk("key1");
+    }
+
+    private static PartitionUpdate.Builder update(TableMetadata metadata, String key, Row... rows)
+    {
+        PartitionUpdate.Builder builder = new PartitionUpdate.Builder(metadata, dk(key), metadata.regularAndStaticColumns(), rows.length, false);
+        for (Row row: rows)
+        {
+            builder.add(row);
+        }
+        return builder;
+    }
+
+    private static PartitionUpdate.Builder update(Row... rows)
+    {
+        return update(cfm, "key1", rows);
+    }
+
+    private static Row.SimpleBuilder rowBuilder(int clustering)
+    {
+        return new SimpleBuilders.RowBuilder(cfm, Integer.toString(clustering));
+    }
+
+    private static Row row(long timestamp, int clustering, int value)
+    {
+        return rowBuilder(clustering).timestamp(timestamp).add("c1", Integer.toString(value)).build();
+    }
+
+    private static DeletionTime deletion(long timeMillis)
+    {
+        TimeUnit MILLIS = TimeUnit.MILLISECONDS;
+        return new DeletionTime(MILLIS.toMicros(timeMillis), Ints.checkedCast(MILLIS.toSeconds(timeMillis)));
+    }
+
+    /**
+     * Tests that the given update doesn't cause data resolver to attempt to repair a transient replica
+     */
+    private void assertNoTransientRepairs(PartitionUpdate update)
+    {
+        SinglePartitionReadCommand command = SinglePartitionReadCommand.fullPartitionRead(update.metadata(), nowInSec, key);
+        EndpointsForToken targetReplicas = EndpointsForToken.of(key.getToken(), full(EP1), full(EP2), trans(EP3));
+        TestableReadRepair repair = new TestableReadRepair(command);
+        DataResolver resolver = new DataResolver(command, plan(targetReplicas, ConsistencyLevel.QUORUM), repair, 0);
+
+        Assert.assertFalse(resolver.isDataPresent());
+        resolver.preprocess(response(command, EP1, iter(update), false));
+        resolver.preprocess(response(command, EP2, iter(update), false));
+        resolver.preprocess(response(command, EP3, EmptyIterators.unfilteredPartition(update.metadata()), false));
+
+        Assert.assertFalse(repair.dataWasConsumed());
+        assertPartitionsEqual(filter(iter(update)), resolver.resolve());
+        Assert.assertTrue(repair.dataWasConsumed());
+        Assert.assertTrue(repair.sent.toString(), repair.sent.isEmpty());
+    }
+
+    @Test
+    public void emptyRowRepair()
+    {
+        assertNoTransientRepairs(update(row(1000, 4, 4), row(1000, 5, 5)).build());
+    }
+
+    @Test
+    public void emptyPartitionDeletionRepairs()
+    {
+        PartitionUpdate.Builder builder = update();
+        builder.addPartitionDeletion(deletion(1999));
+        assertNoTransientRepairs(builder.build());
+    }
+
+    /**
+     * Row level deletion responses shouldn't send data to a transient replica
+     */
+    @Test
+    public void emptyRowDeletionRepairs()
+    {
+        PartitionUpdate.Builder builder = update();
+        builder.add(rowBuilder(1).timestamp(1999).delete().build());
+        assertNoTransientRepairs(builder.build());
+    }
+
+    @Test
+    public void emptyComplexDeletionRepair()
+    {
+
+        long[] ts = {1000, 2000};
+
+        Row.Builder builder = BTreeRow.unsortedBuilder();
+        builder.newRow(Clustering.EMPTY);
+        builder.addComplexDeletion(m, new DeletionTime(ts[0] - 1, nowInSec));
+        assertNoTransientRepairs(update(cfm2, "key", builder.build()).build());
+
+    }
+
+    @Test
+    public void emptyRangeTombstoneRepairs()
+    {
+        Slice slice = Slice.make(Clustering.make(ByteBufferUtil.bytes("a")), Clustering.make(ByteBufferUtil.bytes("b")));
+        PartitionUpdate.Builder builder = update();
+        builder.add(new RangeTombstone(slice, deletion(2000)));
+        assertNoTransientRepairs(builder.build());
+    }
+
+    /**
+     * If the full replicas need to repair each other, repairs shouldn't be sent to transient replicas
+     */
+    @Test
+    public void fullRepairsIgnoreTransientReplicas()
+    {
+        SinglePartitionReadCommand command = SinglePartitionReadCommand.fullPartitionRead(cfm, nowInSec, dk(5));
+        EndpointsForToken targetReplicas = EndpointsForToken.of(key.getToken(), full(EP1), full(EP2), trans(EP3));
+        TestableReadRepair repair = new TestableReadRepair(command);
+        DataResolver resolver = new DataResolver(command, plan(targetReplicas, QUORUM), repair, 0);
+
+        Assert.assertFalse(resolver.isDataPresent());
+        resolver.preprocess(response(command, EP1, iter(update(row(1000, 5, 5)).build()), false));
+        resolver.preprocess(response(command, EP2, iter(update(row(2000, 4, 4)).build()), false));
+        resolver.preprocess(response(command, EP3, EmptyIterators.unfilteredPartition(cfm), false));
+
+        Assert.assertFalse(repair.dataWasConsumed());
+
+        consume(resolver.resolve());
+
+        Assert.assertTrue(repair.dataWasConsumed());
+
+        Assert.assertTrue(repair.sent.containsKey(EP1));
+        Assert.assertTrue(repair.sent.containsKey(EP2));
+        Assert.assertFalse(repair.sent.containsKey(EP3));
+    }
+
+    /**
+     * If the transient replica has new data, the full replicas should be repaired, but the transient one should not
+     */
+    @Test
+    public void transientMismatchesRepairFullReplicas()
+    {
+        SinglePartitionReadCommand command = SinglePartitionReadCommand.fullPartitionRead(cfm, nowInSec, dk(5));
+        EndpointsForToken targetReplicas = EndpointsForToken.of(key.getToken(), full(EP1), full(EP2), trans(EP3));
+        TestableReadRepair<?, ?> repair = new TestableReadRepair(command);
+        DataResolver resolver = new DataResolver(command, plan(targetReplicas, QUORUM), repair, 0);
+
+        Assert.assertFalse(resolver.isDataPresent());
+        PartitionUpdate transData = update(row(1000, 5, 5)).build();
+        resolver.preprocess(response(command, EP1, EmptyIterators.unfilteredPartition(cfm), false));
+        resolver.preprocess(response(command, EP2, EmptyIterators.unfilteredPartition(cfm), false));
+        resolver.preprocess(response(command, EP3, iter(transData), false));
+
+        Assert.assertFalse(repair.dataWasConsumed());
+
+        assertPartitionsEqual(filter(iter(transData)), resolver.resolve());
+
+        Assert.assertTrue(repair.dataWasConsumed());
+
+        assertPartitionsEqual(filter(iter(transData)), filter(iter(repair.sent.get(EP1).getPartitionUpdate(cfm))));
+        assertPartitionsEqual(filter(iter(transData)), filter(iter(repair.sent.get(EP2).getPartitionUpdate(cfm))));
+        Assert.assertFalse(repair.sent.containsKey(EP3));
+
+    }
+
+    private ReplicaPlan.SharedForTokenRead plan(EndpointsForToken replicas, ConsistencyLevel consistencyLevel)
+    {
+        return ReplicaPlan.shared(new ReplicaPlan.ForTokenRead(ks, consistencyLevel, replicas, replicas));
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/047bcd7a/test/unit/org/apache/cassandra/service/reads/DigestResolverTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/service/reads/DigestResolverTest.java b/test/unit/org/apache/cassandra/service/reads/DigestResolverTest.java
index 8454d6a..99101f1 100644
--- a/test/unit/org/apache/cassandra/service/reads/DigestResolverTest.java
+++ b/test/unit/org/apache/cassandra/service/reads/DigestResolverTest.java
@@ -18,6 +18,7 @@
 
 package org.apache.cassandra.service.reads;
 
+import org.apache.cassandra.locator.ReplicaPlan;
 import org.junit.Assert;
 import org.junit.Test;
 
@@ -158,8 +159,8 @@ public class DigestResolverTest extends AbstractReadResponseTest
                               resolver.getData());
     }
 
-    private ReplicaLayout.ForToken plan(ConsistencyLevel consistencyLevel, EndpointsForToken replicas)
+    private ReplicaPlan.SharedForTokenRead plan(ConsistencyLevel consistencyLevel, EndpointsForToken replicas)
     {
-        return new ReplicaLayout.ForToken(ks, consistencyLevel, replicas.token(), replicas, null, replicas);
+        return ReplicaPlan.shared(new ReplicaPlan.ForTokenRead(ks, consistencyLevel, replicas, replicas));
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/047bcd7a/test/unit/org/apache/cassandra/service/reads/ReadExecutorTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/service/reads/ReadExecutorTest.java b/test/unit/org/apache/cassandra/service/reads/ReadExecutorTest.java
index 3b102f2..34be5ee 100644
--- a/test/unit/org/apache/cassandra/service/reads/ReadExecutorTest.java
+++ b/test/unit/org/apache/cassandra/service/reads/ReadExecutorTest.java
@@ -24,6 +24,7 @@ import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.dht.Murmur3Partitioner;
 import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.locator.EndpointsForRange;
+import org.apache.cassandra.locator.ReplicaPlan;
 import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Test;
@@ -224,13 +225,13 @@ public class ReadExecutorTest
 
     }
 
-    private ReplicaLayout.ForToken plan(EndpointsForToken targets, ConsistencyLevel consistencyLevel)
+    private ReplicaPlan.ForTokenRead plan(EndpointsForToken targets, ConsistencyLevel consistencyLevel)
     {
         return plan(consistencyLevel, targets, targets);
     }
 
-    private ReplicaLayout.ForToken plan(ConsistencyLevel consistencyLevel, EndpointsForToken natural, EndpointsForToken selected)
+    private ReplicaPlan.ForTokenRead plan(ConsistencyLevel consistencyLevel, EndpointsForToken natural, EndpointsForToken selected)
     {
-        return new ReplicaLayout.ForToken(ks, consistencyLevel, natural.token(), natural, null, selected);
+        return new ReplicaPlan.ForTokenRead(ks, consistencyLevel, natural, selected);
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/047bcd7a/test/unit/org/apache/cassandra/service/reads/repair/AbstractReadRepairTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/service/reads/repair/AbstractReadRepairTest.java b/test/unit/org/apache/cassandra/service/reads/repair/AbstractReadRepairTest.java
index 7e6ee29..5115581 100644
--- a/test/unit/org/apache/cassandra/service/reads/repair/AbstractReadRepairTest.java
+++ b/test/unit/org/apache/cassandra/service/reads/repair/AbstractReadRepairTest.java
@@ -10,6 +10,7 @@ import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Sets;
 import com.google.common.primitives.Ints;
+import org.apache.cassandra.locator.ReplicaPlan;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Ignore;
@@ -70,7 +71,7 @@ public abstract  class AbstractReadRepairTest
     static Replica replica2;
     static Replica replica3;
     static EndpointsForRange replicas;
-    static ReplicaLayout<?, ?> replicaLayout;
+    static ReplicaPlan.ForRead<?> replicaPlan;
 
     static long now = TimeUnit.NANOSECONDS.toMicros(System.nanoTime());
     static DecoratedKey key;
@@ -217,7 +218,7 @@ public abstract  class AbstractReadRepairTest
         replica3 = fullReplica(target3, FULL_RANGE);
         replicas = EndpointsForRange.of(replica1, replica2, replica3);
 
-        replicaLayout = replicaLayout(ConsistencyLevel.QUORUM, replicas);
+        replicaPlan = replicaPlan(ConsistencyLevel.QUORUM, replicas);
 
         // default test values
         key  = dk(5);
@@ -245,21 +246,30 @@ public abstract  class AbstractReadRepairTest
         cfs.transientWriteLatencyNanos = 0;
     }
 
-    static ReplicaLayout.ForRange replicaLayout(EndpointsForRange replicas, EndpointsForRange targets)
+    static ReplicaPlan.ForRangeRead replicaPlan(ConsistencyLevel consistencyLevel, EndpointsForRange replicas)
     {
-        return new ReplicaLayout.ForRange(ks, ConsistencyLevel.QUORUM, ReplicaUtils.FULL_BOUNDS, replicas, targets);
+        return replicaPlan(ks, consistencyLevel, replicas, replicas);
     }
 
-    static ReplicaLayout.ForRange replicaLayout(ConsistencyLevel consistencyLevel, EndpointsForRange replicas)
+    static ReplicaPlan.ForRangeRead replicaPlan(EndpointsForRange replicas, EndpointsForRange targets)
     {
-        return new ReplicaLayout.ForRange(ks, consistencyLevel, ReplicaUtils.FULL_BOUNDS, replicas, replicas);
+        return replicaPlan(ks, ConsistencyLevel.QUORUM, replicas, targets);
+    }
+    static ReplicaPlan.ForRangeRead replicaPlan(Keyspace keyspace, ConsistencyLevel consistencyLevel, EndpointsForRange replicas)
+    {
+        return replicaPlan(keyspace, consistencyLevel, replicas, replicas);
+    }
+    static ReplicaPlan.ForRangeRead replicaPlan(Keyspace keyspace, ConsistencyLevel consistencyLevel, EndpointsForRange replicas, EndpointsForRange targets)
+    {
+        return new ReplicaPlan.ForRangeRead(keyspace, consistencyLevel,
+                ReplicaUtils.FULL_BOUNDS, replicas, targets);
     }
 
-    public abstract InstrumentedReadRepair createInstrumentedReadRepair(ReadCommand command, ReplicaLayout<?, ?> replicaLayout, long queryStartNanoTime);
+    public abstract InstrumentedReadRepair createInstrumentedReadRepair(ReadCommand command, ReplicaPlan.Shared<?, ?> replicaPlan, long queryStartNanoTime);
 
-    public InstrumentedReadRepair createInstrumentedReadRepair(ReplicaLayout<?, ?> replicaLayout)
+    public InstrumentedReadRepair createInstrumentedReadRepair(ReplicaPlan.Shared<?, ?> replicaPlan)
     {
-        return createInstrumentedReadRepair(command, replicaLayout, System.nanoTime());
+        return createInstrumentedReadRepair(command, replicaPlan, System.nanoTime());
 
     }
 
@@ -270,7 +280,7 @@ public abstract  class AbstractReadRepairTest
     @Test
     public void readSpeculationCycle()
     {
-        InstrumentedReadRepair repair = createInstrumentedReadRepair(replicaLayout(replicas, EndpointsForRange.of(replica1, replica2)));
+        InstrumentedReadRepair repair = createInstrumentedReadRepair(ReplicaPlan.shared(replicaPlan(replicas, EndpointsForRange.of(replica1, replica2))));
         ResultConsumer consumer = new ResultConsumer();
 
         Assert.assertEquals(epSet(), repair.getReadRecipients());
@@ -289,7 +299,7 @@ public abstract  class AbstractReadRepairTest
     @Test
     public void noSpeculationRequired()
     {
-        InstrumentedReadRepair repair = createInstrumentedReadRepair(replicaLayout(replicas, EndpointsForRange.of(replica1, replica2)));
+        InstrumentedReadRepair repair = createInstrumentedReadRepair(ReplicaPlan.shared(replicaPlan(replicas, EndpointsForRange.of(replica1, replica2))));
         ResultConsumer consumer = new ResultConsumer();
 
         Assert.assertEquals(epSet(), repair.getReadRecipients());

http://git-wip-us.apache.org/repos/asf/cassandra/blob/047bcd7a/test/unit/org/apache/cassandra/service/reads/repair/BlockingReadRepairTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/service/reads/repair/BlockingReadRepairTest.java b/test/unit/org/apache/cassandra/service/reads/repair/BlockingReadRepairTest.java
index a4b7615..6bb1b7a 100644
--- a/test/unit/org/apache/cassandra/service/reads/repair/BlockingReadRepairTest.java
+++ b/test/unit/org/apache/cassandra/service/reads/repair/BlockingReadRepairTest.java
@@ -25,6 +25,7 @@ import java.util.Set;
 import java.util.concurrent.TimeUnit;
 
 import com.google.common.collect.Lists;
+import org.apache.cassandra.locator.ReplicaPlan;
 import org.junit.Assert;
 import org.junit.BeforeClass;
 import org.junit.Test;
@@ -44,11 +45,12 @@ import org.apache.cassandra.service.reads.ReadCallback;
 
 public class BlockingReadRepairTest extends AbstractReadRepairTest
 {
-    private static class InstrumentedReadRepairHandler<E extends Endpoints<E>, L extends ReplicaLayout<E, L>> extends BlockingPartitionRepair<E, L>
+    private static class InstrumentedReadRepairHandler<E extends Endpoints<E>, P extends ReplicaPlan.ForRead<E>>
+            extends BlockingPartitionRepair<E, P>
     {
-        public InstrumentedReadRepairHandler(Map<Replica, Mutation> repairs, int maxBlockFor, L replicaLayout)
+        public InstrumentedReadRepairHandler(Map<Replica, Mutation> repairs, int maxBlockFor, P replicaPlan)
         {
-            super(Util.dk("not a real usable value"), repairs, maxBlockFor, replicaLayout);
+            super(Util.dk("not a real usable value"), repairs, maxBlockFor, replicaPlan);
         }
 
         Map<InetAddressAndPort, Mutation> mutationsSent = new HashMap<>();
@@ -71,22 +73,24 @@ public class BlockingReadRepairTest extends AbstractReadRepairTest
         configureClass(ReadRepairStrategy.BLOCKING);
     }
 
-    private static InstrumentedReadRepairHandler createRepairHandler(Map<Replica, Mutation> repairs, int maxBlockFor, ReplicaLayout<?, ?> replicaLayout)
+    private static <E extends Endpoints<E>, P extends ReplicaPlan.ForRead<E>>
+    InstrumentedReadRepairHandler<E, P> createRepairHandler(Map<Replica, Mutation> repairs, int maxBlockFor, P replicaPlan)
     {
-        return new InstrumentedReadRepairHandler(repairs, maxBlockFor, replicaLayout);
+        return new InstrumentedReadRepairHandler<>(repairs, maxBlockFor, replicaPlan);
     }
 
     private static InstrumentedReadRepairHandler createRepairHandler(Map<Replica, Mutation> repairs, int maxBlockFor)
     {
         EndpointsForRange replicas = EndpointsForRange.copyOf(Lists.newArrayList(repairs.keySet()));
-        return createRepairHandler(repairs, maxBlockFor, replicaLayout(replicas, replicas));
+        return createRepairHandler(repairs, maxBlockFor, replicaPlan(replicas, replicas));
     }
 
-    private static class InstrumentedBlockingReadRepair<E extends Endpoints<E>, L extends ReplicaLayout<E, L>> extends BlockingReadRepair<E, L> implements InstrumentedReadRepair<E, L>
+    private static class InstrumentedBlockingReadRepair<E extends Endpoints<E>, P extends ReplicaPlan.ForRead<E>>
+            extends BlockingReadRepair<E, P> implements InstrumentedReadRepair<E, P>
     {
-        public InstrumentedBlockingReadRepair(ReadCommand command, L replicaLayout, long queryStartNanoTime)
+        public InstrumentedBlockingReadRepair(ReadCommand command, ReplicaPlan.Shared<E, P> replicaPlan, long queryStartNanoTime)
         {
-            super(command, replicaLayout, queryStartNanoTime);
+            super(command, replicaPlan, queryStartNanoTime);
         }
 
         Set<InetAddressAndPort> readCommandRecipients = new HashSet<>();
@@ -114,9 +118,9 @@ public class BlockingReadRepairTest extends AbstractReadRepairTest
     }
 
     @Override
-    public InstrumentedReadRepair createInstrumentedReadRepair(ReadCommand command, ReplicaLayout<?, ?> replicaLayout, long queryStartNanoTime)
+    public InstrumentedReadRepair createInstrumentedReadRepair(ReadCommand command, ReplicaPlan.Shared<?, ?> replicaPlan, long queryStartNanoTime)
     {
-        return new InstrumentedBlockingReadRepair(command, replicaLayout, queryStartNanoTime);
+        return new InstrumentedBlockingReadRepair(command, replicaPlan, queryStartNanoTime);
     }
 
     @Test
@@ -142,8 +146,8 @@ public class BlockingReadRepairTest extends AbstractReadRepairTest
         repairs.put(replica1, repair1);
         repairs.put(replica2, repair2);
 
-        ReplicaLayout.ForRange replicaLayout = replicaLayout(replicas, EndpointsForRange.copyOf(Lists.newArrayList(repairs.keySet())));
-        InstrumentedReadRepairHandler<?, ?> handler = createRepairHandler(repairs, 2, replicaLayout);
+        ReplicaPlan.ForRangeRead replicaPlan = replicaPlan(replicas, EndpointsForRange.copyOf(Lists.newArrayList(repairs.keySet())));
+        InstrumentedReadRepairHandler<?, ?> handler = createRepairHandler(repairs, 2, replicaPlan);
 
         Assert.assertTrue(handler.mutationsSent.isEmpty());
 
@@ -221,7 +225,7 @@ public class BlockingReadRepairTest extends AbstractReadRepairTest
         repairs.put(replica1, repair1);
 
         // check that the correct initial mutations are sent out
-        InstrumentedReadRepairHandler handler = createRepairHandler(repairs, 2, replicaLayout(replicas, EndpointsForRange.of(replica1, replica2)));
+        InstrumentedReadRepairHandler handler = createRepairHandler(repairs, 2, replicaPlan(replicas, EndpointsForRange.of(replica1, replica2)));
         handler.sendInitialRepairs();
         Assert.assertEquals(1, handler.mutationsSent.size());
         Assert.assertTrue(handler.mutationsSent.containsKey(target1));
@@ -269,8 +273,8 @@ public class BlockingReadRepairTest extends AbstractReadRepairTest
         repairs.put(remote1, mutation(cell1));
 
         EndpointsForRange participants = EndpointsForRange.of(replica1, replica2, remote1, remote2);
-        ReplicaLayout.ForRange replicaLayout = new ReplicaLayout.ForRange(ks, ConsistencyLevel.LOCAL_QUORUM, ReplicaUtils.FULL_BOUNDS, participants, participants);
-        InstrumentedReadRepairHandler handler = createRepairHandler(repairs, 2, replicaLayout);
+        ReplicaPlan.ForRangeRead replicaPlan = replicaPlan(ks, ConsistencyLevel.LOCAL_QUORUM, participants);
+        InstrumentedReadRepairHandler handler = createRepairHandler(repairs, 2, replicaPlan);
         handler.sendInitialRepairs();
         Assert.assertEquals(2, handler.mutationsSent.size());
         Assert.assertTrue(handler.mutationsSent.containsKey(replica1.endpoint()));

http://git-wip-us.apache.org/repos/asf/cassandra/blob/047bcd7a/test/unit/org/apache/cassandra/service/reads/repair/DiagEventsBlockingReadRepairTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/service/reads/repair/DiagEventsBlockingReadRepairTest.java b/test/unit/org/apache/cassandra/service/reads/repair/DiagEventsBlockingReadRepairTest.java
index a5efe27..c64a73b 100644
--- a/test/unit/org/apache/cassandra/service/reads/repair/DiagEventsBlockingReadRepairTest.java
+++ b/test/unit/org/apache/cassandra/service/reads/repair/DiagEventsBlockingReadRepairTest.java
@@ -27,6 +27,7 @@ import java.util.Set;
 import java.util.concurrent.TimeUnit;
 
 import com.google.common.collect.Lists;
+import org.apache.cassandra.locator.ReplicaPlan;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.BeforeClass;
@@ -80,8 +81,8 @@ public class DiagEventsBlockingReadRepairTest extends AbstractReadRepairTest
         repairs.put(replica2, repair2);
 
 
-        ReplicaLayout.ForRange replicaLayout = replicaLayout(replicas, EndpointsForRange.copyOf(Lists.newArrayList(repairs.keySet())));
-        DiagnosticPartitionReadRepairHandler handler = createRepairHandler(repairs, 2, replicaLayout);
+        ReplicaPlan.ForRangeRead replicaPlan = replicaPlan(replicas, EndpointsForRange.copyOf(Lists.newArrayList(repairs.keySet())));
+        DiagnosticPartitionReadRepairHandler handler = createRepairHandler(repairs, 2, replicaPlan);
 
         Assert.assertTrue(handler.updatesByEp.isEmpty());
 
@@ -106,20 +107,20 @@ public class DiagEventsBlockingReadRepairTest extends AbstractReadRepairTest
         Assert.assertTrue(handler.awaitRepairs(0, TimeUnit.NANOSECONDS));
     }
 
-    public InstrumentedReadRepair createInstrumentedReadRepair(ReadCommand command, ReplicaLayout<?, ?> replicaLayout, long queryStartNanoTime)
+    public InstrumentedReadRepair createInstrumentedReadRepair(ReadCommand command, ReplicaPlan.Shared<?,?> replicaPlan, long queryStartNanoTime)
     {
-        return new DiagnosticBlockingRepairHandler(command, replicaLayout, queryStartNanoTime);
+        return new DiagnosticBlockingRepairHandler(command, replicaPlan, queryStartNanoTime);
     }
 
-    private static DiagnosticPartitionReadRepairHandler createRepairHandler(Map<Replica, Mutation> repairs, int maxBlockFor, ReplicaLayout<?, ?> replicaLayout)
+    private static DiagnosticPartitionReadRepairHandler createRepairHandler(Map<Replica, Mutation> repairs, int maxBlockFor, ReplicaPlan.ForRead<?> replicaPlan)
     {
-        return new DiagnosticPartitionReadRepairHandler(key, repairs, maxBlockFor, replicaLayout);
+        return new DiagnosticPartitionReadRepairHandler<>(key, repairs, maxBlockFor, replicaPlan);
     }
 
     private static DiagnosticPartitionReadRepairHandler createRepairHandler(Map<Replica, Mutation> repairs, int maxBlockFor)
     {
         EndpointsForRange replicas = EndpointsForRange.copyOf(Lists.newArrayList(repairs.keySet()));
-        return createRepairHandler(repairs, maxBlockFor, replicaLayout(replicas, replicas));
+        return createRepairHandler(repairs, maxBlockFor, replicaPlan(replicas, replicas));
     }
 
     private static class DiagnosticBlockingRepairHandler extends BlockingReadRepair implements InstrumentedReadRepair
@@ -127,9 +128,9 @@ public class DiagEventsBlockingReadRepairTest extends AbstractReadRepairTest
         private Set<InetAddressAndPort> recipients = Collections.emptySet();
         private ReadCallback readCallback = null;
 
-        DiagnosticBlockingRepairHandler(ReadCommand command, ReplicaLayout<?, ?> replicaLayout, long queryStartNanoTime)
+        DiagnosticBlockingRepairHandler(ReadCommand command, ReplicaPlan.Shared<?,?> replicaPlan, long queryStartNanoTime)
         {
-            super(command, replicaLayout, queryStartNanoTime);
+            super(command, replicaPlan, queryStartNanoTime);
             DiagnosticEventService.instance().subscribe(ReadRepairEvent.class, this::onRepairEvent);
         }
 
@@ -163,13 +164,14 @@ public class DiagEventsBlockingReadRepairTest extends AbstractReadRepairTest
         }
     }
 
-    private static class DiagnosticPartitionReadRepairHandler<E extends Endpoints<E>, L extends ReplicaLayout<E, L>> extends BlockingPartitionRepair<E, L>
+    private static class DiagnosticPartitionReadRepairHandler<E extends Endpoints<E>, P extends ReplicaPlan.ForRead<E>>
+            extends BlockingPartitionRepair<E, P>
     {
         private final Map<InetAddressAndPort, String> updatesByEp = new HashMap<>();
 
-        DiagnosticPartitionReadRepairHandler(DecoratedKey key, Map<Replica, Mutation> repairs, int maxBlockFor, L replicaLayout)
+        DiagnosticPartitionReadRepairHandler(DecoratedKey key, Map<Replica, Mutation> repairs, int maxBlockFor, P replicaPlan)
         {
-            super(key, repairs, maxBlockFor, replicaLayout);
+            super(key, repairs, maxBlockFor, replicaPlan);
             DiagnosticEventService.instance().subscribe(PartitionRepairEvent.class, this::onRepairEvent);
         }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/047bcd7a/test/unit/org/apache/cassandra/service/reads/repair/InstrumentedReadRepair.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/service/reads/repair/InstrumentedReadRepair.java b/test/unit/org/apache/cassandra/service/reads/repair/InstrumentedReadRepair.java
index f3d2866..81ab07e 100644
--- a/test/unit/org/apache/cassandra/service/reads/repair/InstrumentedReadRepair.java
+++ b/test/unit/org/apache/cassandra/service/reads/repair/InstrumentedReadRepair.java
@@ -23,9 +23,11 @@ import java.util.Set;
 import org.apache.cassandra.locator.Endpoints;
 import org.apache.cassandra.locator.InetAddressAndPort;
 import org.apache.cassandra.locator.ReplicaLayout;
+import org.apache.cassandra.locator.ReplicaPlan;
 import org.apache.cassandra.service.reads.ReadCallback;
 
-public interface InstrumentedReadRepair<E extends Endpoints<E>, L extends ReplicaLayout<E, L>> extends ReadRepair<E, L>
+public interface InstrumentedReadRepair<E extends Endpoints<E>, P extends ReplicaPlan.ForRead<E>>
+        extends ReadRepair<E, P>
 {
     Set<InetAddressAndPort> getReadRecipients();
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/047bcd7a/test/unit/org/apache/cassandra/service/reads/repair/ReadOnlyReadRepairTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/service/reads/repair/ReadOnlyReadRepairTest.java b/test/unit/org/apache/cassandra/service/reads/repair/ReadOnlyReadRepairTest.java
index bee5ddd..cf12265 100644
--- a/test/unit/org/apache/cassandra/service/reads/repair/ReadOnlyReadRepairTest.java
+++ b/test/unit/org/apache/cassandra/service/reads/repair/ReadOnlyReadRepairTest.java
@@ -22,6 +22,7 @@ import java.util.Collections;
 import java.util.HashSet;
 import java.util.Set;
 
+import org.apache.cassandra.locator.ReplicaPlan;
 import org.junit.Assert;
 import org.junit.BeforeClass;
 import org.junit.Test;
@@ -36,11 +37,12 @@ import org.apache.cassandra.service.reads.ReadCallback;
 
 public class ReadOnlyReadRepairTest extends AbstractReadRepairTest
 {
-    private static class InstrumentedReadOnlyReadRepair<E extends Endpoints<E>, L extends ReplicaLayout<E, L>> extends ReadOnlyReadRepair implements InstrumentedReadRepair
+    private static class InstrumentedReadOnlyReadRepair<E extends Endpoints<E>, P extends ReplicaPlan.ForRead<E>>
+            extends ReadOnlyReadRepair implements InstrumentedReadRepair
     {
-        public InstrumentedReadOnlyReadRepair(ReadCommand command, L replicaLayout, long queryStartNanoTime)
+        public InstrumentedReadOnlyReadRepair(ReadCommand command, ReplicaPlan.Shared<E, P> replicaPlan, long queryStartNanoTime)
         {
-            super(command, replicaLayout, queryStartNanoTime);
+            super(command, replicaPlan, queryStartNanoTime);
         }
 
         Set<InetAddressAndPort> readCommandRecipients = new HashSet<>();
@@ -74,24 +76,24 @@ public class ReadOnlyReadRepairTest extends AbstractReadRepairTest
     }
 
     @Override
-    public InstrumentedReadRepair createInstrumentedReadRepair(ReadCommand command, ReplicaLayout<?, ?> replicaLayout, long queryStartNanoTime)
+    public InstrumentedReadRepair createInstrumentedReadRepair(ReadCommand command, ReplicaPlan.Shared<?, ?> replicaPlan, long queryStartNanoTime)
     {
-        return new InstrumentedReadOnlyReadRepair(command, replicaLayout, queryStartNanoTime);
+        return new InstrumentedReadOnlyReadRepair(command, replicaPlan, queryStartNanoTime);
     }
 
     @Test
     public void getMergeListener()
     {
-        ReplicaLayout<?, ?> replicaLayout = replicaLayout(replicas, replicas);
-        InstrumentedReadRepair repair = createInstrumentedReadRepair(replicaLayout);
-        Assert.assertSame(UnfilteredPartitionIterators.MergeListener.NOOP, repair.getMergeListener(replicaLayout));
+        ReplicaPlan.SharedForRangeRead replicaPlan = ReplicaPlan.shared(replicaPlan(replicas, replicas));
+        InstrumentedReadRepair repair = createInstrumentedReadRepair(replicaPlan);
+        Assert.assertSame(UnfilteredPartitionIterators.MergeListener.NOOP, repair.getMergeListener(replicaPlan.get()));
     }
 
     @Test(expected = UnsupportedOperationException.class)
     public void repairPartitionFailure()
     {
-        ReplicaLayout<?, ?> replicaLayout = replicaLayout(replicas, replicas);
-        InstrumentedReadRepair repair = createInstrumentedReadRepair(replicaLayout);
-        repair.repairPartition(null, Collections.emptyMap(), replicaLayout);
+        ReplicaPlan.SharedForRangeRead replicaPlan = ReplicaPlan.shared(replicaPlan(replicas, replicas));
+        InstrumentedReadRepair repair = createInstrumentedReadRepair(replicaPlan);
+        repair.repairPartition(null, Collections.emptyMap(), replicaPlan.get());
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/047bcd7a/test/unit/org/apache/cassandra/service/reads/repair/ReadRepairTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/service/reads/repair/ReadRepairTest.java b/test/unit/org/apache/cassandra/service/reads/repair/ReadRepairTest.java
index e4ba25d..b678b4d 100644
--- a/test/unit/org/apache/cassandra/service/reads/repair/ReadRepairTest.java
+++ b/test/unit/org/apache/cassandra/service/reads/repair/ReadRepairTest.java
@@ -27,6 +27,7 @@ import com.google.common.collect.Iterables;
 import org.apache.cassandra.Util;
 import org.apache.cassandra.locator.Endpoints;
 import org.apache.cassandra.locator.EndpointsForRange;
+import org.apache.cassandra.locator.ReplicaPlan;
 import org.junit.Assert;
 import org.junit.BeforeClass;
 import org.junit.Test;
@@ -70,11 +71,12 @@ public class ReadRepairTest
     static Replica target3;
     static EndpointsForRange targets;
 
-    private static class InstrumentedReadRepairHandler<E extends Endpoints<E>, L extends ReplicaLayout<E, L>> extends BlockingPartitionRepair<E, L>
+    private static class InstrumentedReadRepairHandler<E extends Endpoints<E>, P extends ReplicaPlan.ForRead<E>>
+            extends BlockingPartitionRepair<E, P>
     {
-        public InstrumentedReadRepairHandler(Map<Replica, Mutation> repairs, int maxBlockFor, L replicaLayout)
+        public InstrumentedReadRepairHandler(Map<Replica, Mutation> repairs, int maxBlockFor, P replicaPlan)
         {
-            super(Util.dk("not a valid key"), repairs, maxBlockFor, replicaLayout);
+            super(Util.dk("not a valid key"), repairs, maxBlockFor, replicaPlan);
         }
 
         Map<InetAddressAndPort, Mutation> mutationsSent = new HashMap<>();
@@ -166,8 +168,8 @@ public class ReadRepairTest
 
     private static InstrumentedReadRepairHandler createRepairHandler(Map<Replica, Mutation> repairs, int maxBlockFor, EndpointsForRange all, EndpointsForRange targets)
     {
-        ReplicaLayout.ForRange replicaLayout = new ReplicaLayout.ForRange(ks, ConsistencyLevel.LOCAL_QUORUM, ReplicaUtils.FULL_BOUNDS, all, targets);
-        return new InstrumentedReadRepairHandler(repairs, maxBlockFor, replicaLayout);
+        ReplicaPlan.ForRangeRead replicaPlan = AbstractReadRepairTest.replicaPlan(ks, ConsistencyLevel.LOCAL_QUORUM, all, targets);
+        return new InstrumentedReadRepairHandler<>(repairs, maxBlockFor, replicaPlan);
     }
 
     @Test

http://git-wip-us.apache.org/repos/asf/cassandra/blob/047bcd7a/test/unit/org/apache/cassandra/service/reads/repair/TestableReadRepair.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/service/reads/repair/TestableReadRepair.java b/test/unit/org/apache/cassandra/service/reads/repair/TestableReadRepair.java
index 2a2dec2..53964cb 100644
--- a/test/unit/org/apache/cassandra/service/reads/repair/TestableReadRepair.java
+++ b/test/unit/org/apache/cassandra/service/reads/repair/TestableReadRepair.java
@@ -36,28 +36,28 @@ import org.apache.cassandra.locator.Endpoints;
 import org.apache.cassandra.locator.InetAddressAndPort;
 import org.apache.cassandra.locator.Replica;
 import org.apache.cassandra.locator.ReplicaLayout;
+import org.apache.cassandra.locator.ReplicaPlan;
 import org.apache.cassandra.service.reads.DigestResolver;
 
-public class TestableReadRepair<E extends Endpoints<E>, L extends ReplicaLayout<E, L>> implements ReadRepair<E, L>
+public class TestableReadRepair<E extends Endpoints<E>, P extends ReplicaPlan.ForRead<E>>
+        implements ReadRepair<E, P>
 {
     public final Map<InetAddressAndPort, Mutation> sent = new HashMap<>();
 
     private final ReadCommand command;
-    private final ConsistencyLevel consistency;
 
     private boolean partitionListenerClosed = false;
     private boolean rowListenerClosed = true;
 
-    public TestableReadRepair(ReadCommand command, ConsistencyLevel consistency)
+    public TestableReadRepair(ReadCommand command)
     {
         this.command = command;
-        this.consistency = consistency;
     }
 
     @Override
-    public UnfilteredPartitionIterators.MergeListener getMergeListener(L endpoints)
+    public UnfilteredPartitionIterators.MergeListener getMergeListener(P endpoints)
     {
-        return new PartitionIteratorMergeListener(endpoints, command, consistency, this) {
+        return new PartitionIteratorMergeListener<E>(endpoints, command, this) {
             @Override
             public void close()
             {
@@ -70,7 +70,7 @@ public class TestableReadRepair<E extends Endpoints<E>, L extends ReplicaLayout<
             {
                 assert rowListenerClosed;
                 rowListenerClosed = false;
-                return new RowIteratorMergeListener(partitionKey, columns(versions), isReversed(versions), endpoints, command, consistency, TestableReadRepair.this) {
+                return new RowIteratorMergeListener<E>(partitionKey, columns(versions), isReversed(versions), endpoints, command, TestableReadRepair.this) {
                     @Override
                     public void close()
                     {
@@ -83,7 +83,7 @@ public class TestableReadRepair<E extends Endpoints<E>, L extends ReplicaLayout<
     }
 
     @Override
-    public void startRepair(DigestResolver<E, L> digestResolver, Consumer<PartitionIterator> resultConsumer)
+    public void startRepair(DigestResolver<E, P> digestResolver, Consumer<PartitionIterator> resultConsumer)
     {
 
     }
@@ -113,7 +113,7 @@ public class TestableReadRepair<E extends Endpoints<E>, L extends ReplicaLayout<
     }
 
     @Override
-    public void repairPartition(DecoratedKey partitionKey, Map<Replica, Mutation> mutations, L replicaLayout)
+    public void repairPartition(DecoratedKey partitionKey, Map<Replica, Mutation> mutations, P replicaPlan)
     {
         for (Map.Entry<Replica, Mutation> entry: mutations.entrySet())
             sent.put(entry.getKey().endpoint(), entry.getValue());


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org


[3/4] cassandra git commit: ReplicaPlan/Layout refactor follow-up/completion

Posted by be...@apache.org.
http://git-wip-us.apache.org/repos/asf/cassandra/blob/047bcd7a/src/java/org/apache/cassandra/locator/ReplicaPlans.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/locator/ReplicaPlans.java b/src/java/org/apache/cassandra/locator/ReplicaPlans.java
new file mode 100644
index 0000000..25f42c3
--- /dev/null
+++ b/src/java/org/apache/cassandra/locator/ReplicaPlans.java
@@ -0,0 +1,295 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.locator;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.ConsistencyLevel;
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.db.PartitionPosition;
+import org.apache.cassandra.dht.AbstractBounds;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.exceptions.UnavailableException;
+import org.apache.cassandra.gms.FailureDetector;
+import org.apache.cassandra.service.reads.AlwaysSpeculativeRetryPolicy;
+import org.apache.cassandra.service.reads.SpeculativeRetryPolicy;
+import org.apache.cassandra.utils.FBUtilities;
+
+import java.util.Collection;
+import java.util.function.Predicate;
+
+import static com.google.common.collect.Iterables.any;
+import static com.google.common.collect.Iterables.filter;
+import static com.google.common.collect.Iterables.limit;
+
+public class ReplicaPlans
+{
+
+    /**
+     * Construct a ReplicaPlan for writing to exactly one node, with CL.ONE. This node is *assumed* to be alive.
+     */
+    public static ReplicaPlan.ForTokenWrite forSingleReplicaWrite(Keyspace keyspace, Token token, Replica replica)
+    {
+        EndpointsForToken one = EndpointsForToken.of(token, replica);
+        EndpointsForToken empty = EndpointsForToken.empty(token);
+        return new ReplicaPlan.ForTokenWrite(keyspace, ConsistencyLevel.ONE, empty, one, one, one);
+    }
+
+    /**
+     * A forwarding counter write is always sent to a single owning coordinator for the range, by the original coordinator
+     * (if it is not itself an owner)
+     */
+    public static ReplicaPlan.ForTokenWrite forForwardingCounterWrite(Keyspace keyspace, Token token, Replica replica)
+    {
+        return forSingleReplicaWrite(keyspace, token, replica);
+    }
+
+    /**
+     * Requires that the provided endpoints are alive.  Converts them to their relevant system replicas.
+     * Note that the liveAndDown collection and live are equal to the provided endpoints.
+     *
+     * The semantics are a bit weird, in that CL=ONE iff we have one node provided, and otherwise is equal to TWO.
+     * How these CL were chosen, and why we drop the CL if only one live node is available, are both unclear.
+     */
+    public static ReplicaPlan.ForTokenWrite forBatchlogWrite(Keyspace keyspace, Collection<InetAddressAndPort> endpoints) throws UnavailableException
+    {
+        // A single case we write not for range or token, but multiple mutations to many tokens
+        Token token = DatabaseDescriptor.getPartitioner().getMinimumToken();
+
+        ReplicaLayout.ForTokenWrite liveAndDown = ReplicaLayout.forTokenWrite(
+                SystemReplicas.getSystemReplicas(endpoints).forToken(token),
+                EndpointsForToken.empty(token)
+        );
+        ConsistencyLevel consistencyLevel = liveAndDown.all().size() == 1 ? ConsistencyLevel.ONE : ConsistencyLevel.TWO;
+
+        // assume that we have already been given live endpoints, and skip applying the failure detector
+        return forWrite(keyspace, consistencyLevel, liveAndDown, liveAndDown, writeAll);
+    }
+
+    public static ReplicaPlan.ForTokenWrite forWrite(Keyspace keyspace, ConsistencyLevel consistencyLevel, Token token, Selector selector) throws UnavailableException
+    {
+        return forWrite(keyspace, consistencyLevel, ReplicaLayout.forTokenWriteLiveAndDown(keyspace, token), selector);
+    }
+
+    @VisibleForTesting
+    public static ReplicaPlan.ForTokenWrite forWrite(Keyspace keyspace, ConsistencyLevel consistencyLevel, EndpointsForToken natural, EndpointsForToken pending, Predicate<Replica> isAlive, Selector selector) throws UnavailableException
+    {
+        return forWrite(keyspace, consistencyLevel, ReplicaLayout.forTokenWrite(natural, pending), isAlive, selector);
+    }
+
+    public static ReplicaPlan.ForTokenWrite forWrite(Keyspace keyspace, ConsistencyLevel consistencyLevel, ReplicaLayout.ForTokenWrite liveAndDown, Selector selector) throws UnavailableException
+    {
+        return forWrite(keyspace, consistencyLevel, liveAndDown, FailureDetector.isReplicaAlive, selector);
+    }
+
+    @VisibleForTesting
+    public static ReplicaPlan.ForTokenWrite forWrite(Keyspace keyspace, ConsistencyLevel consistencyLevel, ReplicaLayout.ForTokenWrite liveAndDown, Predicate<Replica> isAlive, Selector selector) throws UnavailableException
+    {
+        ReplicaLayout.ForTokenWrite live = liveAndDown.filter(isAlive);
+        return forWrite(keyspace, consistencyLevel, liveAndDown, live, selector);
+    }
+
+    public static ReplicaPlan.ForTokenWrite forWrite(Keyspace keyspace, ConsistencyLevel consistencyLevel, ReplicaLayout.ForTokenWrite liveAndDown, ReplicaLayout.ForTokenWrite live, Selector selector) throws UnavailableException
+    {
+        EndpointsForToken contacts = selector.select(keyspace, consistencyLevel, liveAndDown, live);
+        ReplicaPlan.ForTokenWrite result = new ReplicaPlan.ForTokenWrite(keyspace, consistencyLevel, liveAndDown.pending(), liveAndDown.all(), live.all(), contacts);
+        result.assureSufficientReplicas();
+        return result;
+    }
+
+    public interface Selector
+    {
+        <E extends Endpoints<E>, L extends ReplicaLayout.ForWrite<E>>
+        E select(Keyspace keyspace, ConsistencyLevel consistencyLevel, L liveAndDown, L live);
+    }
+
+    /**
+     * Select all nodes, transient or otherwise, as targets for the operation.
+     *
+     * This may no longer be useful once we finish implementing transient replication support; however,
+     * it can be of value to stipulate that a location writes to all nodes without regard to transient status.
+     */
+    public static final Selector writeAll = new Selector()
+    {
+        @Override
+        public <E extends Endpoints<E>, L extends ReplicaLayout.ForWrite<E>>
+        E select(Keyspace keyspace, ConsistencyLevel consistencyLevel, L liveAndDown, L live)
+        {
+            return liveAndDown.all();
+        }
+    };
+
+    /**
+     * Select all full nodes, live or down, as write targets.  If there are insufficient nodes to complete the write,
+     * but there are live transient nodes, select a sufficient number of these to reach our consistency level.
+     *
+     * Pending nodes are always contacted, whether or not they are full.  When a transient replica is undergoing
+     * a pending move to a new node, if we write (transiently) to it, this write would not be replicated to the
+     * pending transient node, and so when completing the move, the write could effectively have not reached the
+     * promised consistency level.
+     *
+     * Transient replicas are only ever chosen from the live natural replicas.
+     */
+    public static final Selector writeNormal = new Selector()
+    {
+        @Override
+        public <E extends Endpoints<E>, L extends ReplicaLayout.ForWrite<E>>
+        E select(Keyspace keyspace, ConsistencyLevel consistencyLevel, L liveAndDown, L live)
+        {
+            // Fast path: with no transient replicas there is nothing to choose between, so contact every replica.
+            if (!any(liveAndDown.all(), Replica::isTransient))
+                return liveAndDown.all();
+
+            // The transient-aware selection below has no per-DC accounting, so EACH_QUORUM is rejected outright.
+            assert consistencyLevel != ConsistencyLevel.EACH_QUORUM;
+
+            // Start from every full natural replica (live or down) plus every pending replica (full or transient).
+            ReplicaCollection.Mutable<E> contacts = liveAndDown.all().newMutable(liveAndDown.all().size());
+            contacts.addAll(filter(liveAndDown.natural(), Replica::isFull));
+            contacts.addAll(liveAndDown.pending());
+
+            // TODO: this doesn't correctly handle LOCAL_QUORUM (or EACH_QUORUM at all)
+            // Top up with live transient natural replicas until blockForWrite (which accounts for pending) is met.
+            int liveCount = contacts.count(live.all()::contains);
+            int requiredTransientCount = consistencyLevel.blockForWrite(keyspace, liveAndDown.pending()) - liveCount;
+            if (requiredTransientCount > 0)
+                contacts.addAll(limit(filter(live.natural(), Replica::isTransient), requiredTransientCount));
+            return contacts.asSnapshot();
+        }
+    };
+
+    /**
+     * Construct the plan for a paxos round - NOT the write or read consistency level for either the write or comparison,
+     * but for the paxos linearisation agreement.
+     *
+     * This will select all live nodes as the candidates for the operation.  The required number of participants is a
+     * simple majority of all natural and pending replicas (restricted to the local DC for LOCAL_SERIAL); at least that
+     * many must be live, and that count is recorded on the returned plan.
+     *
+     * @throws UnavailableException if fewer than the required number of participants are live, or if more than one
+     *         pending range movement affects the key (see CASSANDRA-8346)
+     */
+    public static ReplicaPlan.ForPaxosWrite forPaxos(Keyspace keyspace, DecoratedKey key, ConsistencyLevel consistencyForPaxos) throws UnavailableException
+    {
+        Token tk = key.getToken();
+        ReplicaLayout.ForTokenWrite liveAndDown = ReplicaLayout.forTokenWriteLiveAndDown(keyspace, tk);
+
+        Replicas.temporaryAssertFull(liveAndDown.all()); // TODO CASSANDRA-14547
+
+        if (consistencyForPaxos == ConsistencyLevel.LOCAL_SERIAL)
+        {
+            // TODO: we should cleanup our semantics here, as we're filtering ALL nodes to localDC which is unexpected for ReplicaPlan
+            // Restrict natural and pending to nodes in the local DC only
+            IEndpointSnitch snitch = DatabaseDescriptor.getEndpointSnitch();
+            String localDc = snitch.getDatacenter(FBUtilities.getBroadcastAddressAndPort());
+            Predicate<Replica> isLocalDc = replica -> localDc.equals(snitch.getDatacenter(replica));
+
+            liveAndDown = liveAndDown.filter(isLocalDc);
+        }
+
+        ReplicaLayout.ForTokenWrite live = liveAndDown.filter(FailureDetector.isReplicaAlive);
+
+        // TODO: this should use assureSufficientReplicas
+        int participants = liveAndDown.all().size();
+        int requiredParticipants = participants / 2 + 1; // See CASSANDRA-8346, CASSANDRA-833
+
+        EndpointsForToken contacts = live.all();
+        if (contacts.size() < requiredParticipants)
+            throw UnavailableException.create(consistencyForPaxos, requiredParticipants, contacts.size());
+
+        // We cannot allow CAS operations with 2 or more pending endpoints, see #8346.
+        // Note that we fake an impossible number of required nodes in the unavailable exception
+        // to nail home the point that it's an impossible operation no matter how many nodes are live.
+        // The message reports the number of pending replicas (not the total replica count).
+        int pendingCount = liveAndDown.pending().size();
+        if (pendingCount > 1)
+            throw new UnavailableException(String.format("Cannot perform LWT operation as there is more than one (%d) pending range movement", pendingCount),
+                    consistencyForPaxos,
+                    participants + 1,
+                    contacts.size());
+
+        return new ReplicaPlan.ForPaxosWrite(keyspace, consistencyForPaxos, liveAndDown.pending(), liveAndDown.all(), live.all(), contacts, requiredParticipants);
+    }
+
+    /**
+     * Construct a plan that reads a token from exactly one replica at consistency ONE.  The replica is both the only
+     * candidate and the only contact, so the plan permits no speculation or read-repair.
+     */
+    public static ReplicaPlan.ForTokenRead forSingleReplicaRead(Keyspace keyspace, Token token, Replica replica)
+    {
+        EndpointsForToken onlyReplica = EndpointsForToken.of(token, replica);
+        // the same single-element collection serves as candidates and contacts
+        return new ReplicaPlan.ForTokenRead(keyspace, ConsistencyLevel.ONE, onlyReplica, onlyReplica);
+    }
+
+    /**
+     * Construct a plan that reads a range from exactly one replica at consistency ONE.  The replica is both the only
+     * candidate and the only contact, so the plan permits no speculation or read-repair.
+     */
+    public static ReplicaPlan.ForRangeRead forSingleReplicaRead(Keyspace keyspace, AbstractBounds<PartitionPosition> range, Replica replica)
+    {
+        // TODO: this is unsafe, as one.range() may be inconsistent with our supplied range; should refactor Range/AbstractBounds to single class
+        EndpointsForRange onlyReplica = EndpointsForRange.of(replica);
+        // the same single-element collection serves as candidates and contacts
+        return new ReplicaPlan.ForRangeRead(keyspace, ConsistencyLevel.ONE, range, onlyReplica, onlyReplica);
+    }
+
+    /**
+     * Construct a plan for reading the provided token at the provided consistency level.  This translates to a collection of
+     *   - candidates who are: alive, replicate the token, and are sorted by their snitch scores
+     *   - contacts who are: the first blockFor + (retry == ALWAYS ? 1 : 0) candidates
+     *
+     * The candidate collection can be used for speculation, although at present it would break
+     * LOCAL_QUORUM and EACH_QUORUM to do so without further filtering
+     */
+    public static ReplicaPlan.ForTokenRead forRead(Keyspace keyspace, Token token, ConsistencyLevel consistencyLevel, SpeculativeRetryPolicy retry)
+    {
+        ReplicaLayout.ForTokenRead candidates = ReplicaLayout.forTokenReadLiveSorted(keyspace, token);
+        EndpointsForToken contacts = consistencyLevel.filterForQuery(keyspace, candidates.natural(),
+                retry.equals(AlwaysSpeculativeRetryPolicy.INSTANCE));
+
+        ReplicaPlan.ForTokenRead result = new ReplicaPlan.ForTokenRead(keyspace, consistencyLevel, candidates.natural(), contacts);
+        result.assureSufficientReplicas(); // Throw UAE early if we don't have enough replicas.
+        return result;
+    }
+
+    /**
+     * Construct a plan for reading {@code range} at {@code consistencyLevel}.
+     *
+     * The candidates are the live replicas of the range, sorted by their snitch scores; the contacts are the subset
+     * selected by {@link ConsistencyLevel#filterForQuery}.  Range reads never speculate at present, so a failed
+     * response fails the query.
+     *
+     * @throws UnavailableException if there are not enough replicas for {@code consistencyLevel}
+     */
+    public static ReplicaPlan.ForRangeRead forRangeRead(Keyspace keyspace, ConsistencyLevel consistencyLevel, AbstractBounds<PartitionPosition> range)
+    {
+        EndpointsForRange candidates = ReplicaLayout.forRangeReadLiveSorted(keyspace, range).natural();
+        EndpointsForRange contacts = consistencyLevel.filterForQuery(keyspace, candidates);
+
+        ReplicaPlan.ForRangeRead plan = new ReplicaPlan.ForRangeRead(keyspace, consistencyLevel, range, candidates, contacts);
+        plan.assureSufficientReplicas();
+        return plan;
+    }
+
+    /**
+     * Take two range read plans for adjacent ranges, and check if it is OK (and worthwhile) to combine them into a single plan
+     */
+    public static ReplicaPlan.ForRangeRead maybeMerge(Keyspace keyspace, ConsistencyLevel consistencyLevel, ReplicaPlan.ForRangeRead left, ReplicaPlan.ForRangeRead right)
+    {
+        // TODO: should we be asserting that the ranges are adjacent?
+        AbstractBounds<PartitionPosition> newRange = left.range().withNewRight(right.range().right);
+        EndpointsForRange mergedCandidates = left.candidates().keep(right.candidates().endpoints());
+
+        // Check if there are enough shared endpoints for the merge to be possible.
+        if (!consistencyLevel.isSufficientLiveReplicasForRead(keyspace, mergedCandidates))
+            return null;
+
+        EndpointsForRange contacts = consistencyLevel.filterForQuery(keyspace, mergedCandidates);
+
+        // Estimate whether merging will be a win or not
+        if (!DatabaseDescriptor.getEndpointSnitch().isWorthMergingForRangeQuery(contacts, left.contacts(), right.contacts()))
+            return null;
+
+        // If we get there, merge this range and the next one
+        return new ReplicaPlan.ForRangeRead(keyspace, consistencyLevel, newRange, mergedCandidates, contacts);
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/047bcd7a/src/java/org/apache/cassandra/locator/SystemReplicas.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/locator/SystemReplicas.java b/src/java/org/apache/cassandra/locator/SystemReplicas.java
index 13a9d74..0d1fc8d 100644
--- a/src/java/org/apache/cassandra/locator/SystemReplicas.java
+++ b/src/java/org/apache/cassandra/locator/SystemReplicas.java
@@ -18,12 +18,11 @@
 
 package org.apache.cassandra.locator;
 
-import java.util.ArrayList;
 import java.util.Collection;
-import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 
+import com.google.common.collect.Collections2;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.Token;
@@ -50,13 +49,8 @@ public class SystemReplicas
         return systemReplicas.computeIfAbsent(endpoint, SystemReplicas::createSystemReplica);
     }
 
-    public static Collection<Replica> getSystemReplicas(Collection<InetAddressAndPort> endpoints)
+    public static EndpointsForRange getSystemReplicas(Collection<InetAddressAndPort> endpoints)
     {
-        List<Replica> replicas = new ArrayList<>(endpoints.size());
-        for (InetAddressAndPort endpoint: endpoints)
-        {
-            replicas.add(getSystemReplica(endpoint));
-        }
-        return replicas;
+        return EndpointsForRange.copyOf(Collections2.transform(endpoints, SystemReplicas::getSystemReplica));
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/047bcd7a/src/java/org/apache/cassandra/locator/TokenMetadata.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/locator/TokenMetadata.java b/src/java/org/apache/cassandra/locator/TokenMetadata.java
index 4ab34db..ad40d7b 100644
--- a/src/java/org/apache/cassandra/locator/TokenMetadata.java
+++ b/src/java/org/apache/cassandra/locator/TokenMetadata.java
@@ -1224,15 +1224,11 @@ public class TokenMetadata
     /**
      * @deprecated retained for benefit of old tests
      */
+    @Deprecated
     public EndpointsForToken getWriteEndpoints(Token token, String keyspaceName, EndpointsForToken natural)
     {
         EndpointsForToken pending = pendingEndpointsForToken(token, keyspaceName);
-        if (Endpoints.haveConflicts(natural, pending))
-        {
-            natural = Endpoints.resolveConflictsInNatural(natural, pending);
-            pending = Endpoints.resolveConflictsInPending(natural, pending);
-        }
-        return Endpoints.concat(natural, pending);
+        return ReplicaLayout.forTokenWrite(natural, pending).all();
     }
 
     /** @return an endpoint to token multimap representation of tokenToEndpointMap (a copy) */

http://git-wip-us.apache.org/repos/asf/cassandra/blob/047bcd7a/src/java/org/apache/cassandra/net/IAsyncCallback.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/IAsyncCallback.java b/src/java/org/apache/cassandra/net/IAsyncCallback.java
index 253b412..ceaf072 100644
--- a/src/java/org/apache/cassandra/net/IAsyncCallback.java
+++ b/src/java/org/apache/cassandra/net/IAsyncCallback.java
@@ -17,12 +17,6 @@
  */
 package org.apache.cassandra.net;
 
-import com.google.common.base.Predicate;
-
-import org.apache.cassandra.gms.FailureDetector;
-import org.apache.cassandra.locator.InetAddressAndPort;
-import org.apache.cassandra.locator.Replica;
-
 /**
  * implementors of IAsyncCallback need to make sure that any public methods
  * are threadsafe with respect to response() being called from the message
@@ -31,10 +25,6 @@ import org.apache.cassandra.locator.Replica;
  */
 public interface IAsyncCallback<T>
 {
-    final Predicate<InetAddressAndPort> isAlive = FailureDetector.instance::isAlive;
-
-    final Predicate<Replica> isReplicaAlive = replica -> isAlive.apply(replica.endpoint());
-
     /**
      * @param msg response received.
      */

http://git-wip-us.apache.org/repos/asf/cassandra/blob/047bcd7a/src/java/org/apache/cassandra/service/AbstractWriteResponseHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/AbstractWriteResponseHandler.java b/src/java/org/apache/cassandra/service/AbstractWriteResponseHandler.java
index e817cc8..7f51ae7 100644
--- a/src/java/org/apache/cassandra/service/AbstractWriteResponseHandler.java
+++ b/src/java/org/apache/cassandra/service/AbstractWriteResponseHandler.java
@@ -26,8 +26,9 @@ import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
 import java.util.stream.Collectors;
 
 import org.apache.cassandra.db.ConsistencyLevel;
-import org.apache.cassandra.locator.ReplicaLayout;
 
+import org.apache.cassandra.locator.EndpointsForToken;
+import org.apache.cassandra.locator.ReplicaPlan;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -36,7 +37,6 @@ import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.db.IMutation;
 import org.apache.cassandra.db.WriteType;
 import org.apache.cassandra.exceptions.RequestFailureReason;
-import org.apache.cassandra.exceptions.UnavailableException;
 import org.apache.cassandra.exceptions.WriteFailureException;
 import org.apache.cassandra.exceptions.WriteTimeoutException;
 import org.apache.cassandra.locator.InetAddressAndPort;
@@ -53,7 +53,7 @@ public abstract class AbstractWriteResponseHandler<T> implements IAsyncCallbackW
     //Count down until all responses and expirations have occured before deciding whether the ideal CL was reached.
     private AtomicInteger responsesAndExpirations;
     private final SimpleCondition condition = new SimpleCondition();
-    protected final ReplicaLayout.ForToken replicaLayout;
+    protected final ReplicaPlan.ForTokenWrite replicaPlan;
 
     protected final Runnable callback;
     protected final WriteType writeType;
@@ -76,12 +76,12 @@ public abstract class AbstractWriteResponseHandler<T> implements IAsyncCallbackW
      * @param callback           A callback to be called when the write is successful.
      * @param queryStartNanoTime
      */
-    protected AbstractWriteResponseHandler(ReplicaLayout.ForToken replicaLayout,
+    protected AbstractWriteResponseHandler(ReplicaPlan.ForTokenWrite replicaPlan,
                                            Runnable callback,
                                            WriteType writeType,
                                            long queryStartNanoTime)
     {
-        this.replicaLayout = replicaLayout;
+        this.replicaPlan = replicaPlan;
         this.callback = callback;
         this.writeType = writeType;
         this.failureReasonByEndpoint = new ConcurrentHashMap<>();
@@ -104,19 +104,19 @@ public abstract class AbstractWriteResponseHandler<T> implements IAsyncCallbackW
 
         if (!success)
         {
-            int blockedFor = totalBlockFor();
+            int blockedFor = blockFor();
             int acks = ackCount();
             // It's pretty unlikely, but we can race between exiting await above and here, so
             // that we could now have enough acks. In that case, we "lie" on the acks count to
             // avoid sending confusing info to the user (see CASSANDRA-6491).
             if (acks >= blockedFor)
                 acks = blockedFor - 1;
-            throw new WriteTimeoutException(writeType, replicaLayout.consistencyLevel(), acks, blockedFor);
+            throw new WriteTimeoutException(writeType, replicaPlan.consistencyLevel(), acks, blockedFor);
         }
 
-        if (totalBlockFor() + failures > totalEndpoints())
+        if (blockFor() + failures > candidateReplicaCount())
         {
-            throw new WriteFailureException(replicaLayout.consistencyLevel(), ackCount(), totalBlockFor(), writeType, failureReasonByEndpoint);
+            throw new WriteFailureException(replicaPlan.consistencyLevel(), ackCount(), blockFor(), writeType, failureReasonByEndpoint);
         }
     }
 
@@ -135,7 +135,7 @@ public abstract class AbstractWriteResponseHandler<T> implements IAsyncCallbackW
     public void setIdealCLResponseHandler(AbstractWriteResponseHandler handler)
     {
         this.idealCLDelegate = handler;
-        idealCLDelegate.responsesAndExpirations = new AtomicInteger(replicaLayout.selected().size());
+        idealCLDelegate.responsesAndExpirations = new AtomicInteger(replicaPlan.contacts().size());
     }
 
     /**
@@ -189,28 +189,30 @@ public abstract class AbstractWriteResponseHandler<T> implements IAsyncCallbackW
     /**
      * @return the minimum number of endpoints that must reply.
      */
-    protected int totalBlockFor()
+    protected int blockFor()
     {
         // During bootstrap, we have to include the pending endpoints or we may fail the consistency level
         // guarantees (see #833)
-        return replicaLayout.consistencyLevel().blockForWrite(replicaLayout.keyspace(), replicaLayout.pending());
+        return replicaPlan.blockFor();
     }
 
     /**
+     * TODO: this method is brittle for its purpose of deciding when we should fail a query;
+     *       this needs to be CL aware, and of which nodes are live/down
      * @return the total number of endpoints the request can been sent to.
      */
-    protected int totalEndpoints()
+    protected int candidateReplicaCount()
     {
-        return replicaLayout.all().size();
+        return replicaPlan.liveAndDown().size();
     }
 
     public ConsistencyLevel consistencyLevel()
     {
-        return replicaLayout.consistencyLevel();
+        return replicaPlan.consistencyLevel();
     }
 
     /**
-     * @return true if the message counts towards the totalBlockFor() threshold
+     * @return true if the message counts towards the blockFor() threshold
      */
     protected boolean waitingFor(InetAddressAndPort from)
     {
@@ -227,11 +229,6 @@ public abstract class AbstractWriteResponseHandler<T> implements IAsyncCallbackW
      */
     public abstract void response(MessageIn<T> msg);
 
-    public void assureSufficientLiveNodes() throws UnavailableException
-    {
-        replicaLayout.consistencyLevel().assureSufficientLiveNodesForWrite(replicaLayout.keyspace(), replicaLayout.all().filter(isReplicaAlive), replicaLayout.pending());
-    }
-
     protected void signal()
     {
         condition.signalAll();
@@ -250,7 +247,7 @@ public abstract class AbstractWriteResponseHandler<T> implements IAsyncCallbackW
 
         failureReasonByEndpoint.put(from, failureReason);
 
-        if (totalBlockFor() + n > totalEndpoints())
+        if (blockFor() + n > candidateReplicaCount())
             signal();
     }
 
@@ -278,11 +275,11 @@ public abstract class AbstractWriteResponseHandler<T> implements IAsyncCallbackW
             //The condition being signaled is a valid proxy for the CL being achieved
             if (!condition.isSignaled())
             {
-                replicaLayout.keyspace().metric.writeFailedIdealCL.inc();
+                replicaPlan.keyspace().metric.writeFailedIdealCL.inc();
             }
             else
             {
-                replicaLayout.keyspace().metric.idealCLWriteLatency.addNano(System.nanoTime() - queryStartNanoTime);
+                replicaPlan.keyspace().metric.idealCLWriteLatency.addNano(System.nanoTime() - queryStartNanoTime);
             }
         }
     }
@@ -292,7 +289,8 @@ public abstract class AbstractWriteResponseHandler<T> implements IAsyncCallbackW
      */
     public void maybeTryAdditionalReplicas(IMutation mutation, StorageProxy.WritePerformer writePerformer, String localDC)
     {
-        if (replicaLayout.all().size() == replicaLayout.selected().size())
+        EndpointsForToken uncontacted = replicaPlan.liveUncontacted();
+        if (uncontacted.isEmpty())
             return;
 
         long timeout = Long.MAX_VALUE;
@@ -313,7 +311,7 @@ public abstract class AbstractWriteResponseHandler<T> implements IAsyncCallbackW
                 for (ColumnFamilyStore cf : cfs)
                     cf.metric.speculativeWrites.inc();
 
-                writePerformer.apply(mutation, replicaLayout.forNaturalUncontacted(),
+                writePerformer.apply(mutation, replicaPlan.withContact(uncontacted),
                                      (AbstractWriteResponseHandler<IMutation>) this,
                                      localDC);
             }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/047bcd7a/src/java/org/apache/cassandra/service/ActiveRepairService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/ActiveRepairService.java b/src/java/org/apache/cassandra/service/ActiveRepairService.java
index 8ffca6a..b32f67e 100644
--- a/src/java/org/apache/cassandra/service/ActiveRepairService.java
+++ b/src/java/org/apache/cassandra/service/ActiveRepairService.java
@@ -331,7 +331,7 @@ public class ActiveRepairService implements IEndpointStateChangeSubscriber, IFai
             TokenMetadata.Topology topology = ss.getTokenMetadata().cloneOnlyTokenMap().getTopology();
             Multimap<String, InetAddressAndPort> dcEndpointsMap = topology.getDatacenterEndpoints();
             Iterable<InetAddressAndPort> dcEndpoints = concat(transform(dataCenters, dcEndpointsMap::get));
-            return neighbors.keep(dcEndpoints);
+            return neighbors.select(dcEndpoints, true);
         }
         else if (hosts != null && !hosts.isEmpty())
         {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/047bcd7a/src/java/org/apache/cassandra/service/BatchlogResponseHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/BatchlogResponseHandler.java b/src/java/org/apache/cassandra/service/BatchlogResponseHandler.java
index ee74df5..63fbc72 100644
--- a/src/java/org/apache/cassandra/service/BatchlogResponseHandler.java
+++ b/src/java/org/apache/cassandra/service/BatchlogResponseHandler.java
@@ -36,7 +36,7 @@ public class BatchlogResponseHandler<T> extends AbstractWriteResponseHandler<T>
 
     public BatchlogResponseHandler(AbstractWriteResponseHandler<T> wrapped, int requiredBeforeFinish, BatchlogCleanup cleanup, long queryStartNanoTime)
     {
-        super(wrapped.replicaLayout, wrapped.callback, wrapped.writeType, queryStartNanoTime);
+        super(wrapped.replicaPlan, wrapped.callback, wrapped.writeType, queryStartNanoTime);
         this.wrapped = wrapped;
         this.requiredBeforeFinish = requiredBeforeFinish;
         this.cleanup = cleanup;
@@ -64,24 +64,19 @@ public class BatchlogResponseHandler<T> extends AbstractWriteResponseHandler<T>
         wrapped.onFailure(from, failureReason);
     }
 
-    public void assureSufficientLiveNodes()
-    {
-        wrapped.assureSufficientLiveNodes();
-    }
-
     public void get() throws WriteTimeoutException, WriteFailureException
     {
         wrapped.get();
     }
 
-    protected int totalBlockFor()
+    protected int blockFor()
     {
-        return wrapped.totalBlockFor();
+        return wrapped.blockFor();
     }
 
-    protected int totalEndpoints()
+    protected int candidateReplicaCount()
     {
-        return wrapped.totalEndpoints();
+        return wrapped.candidateReplicaCount();
     }
 
     protected boolean waitingFor(InetAddressAndPort from)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/047bcd7a/src/java/org/apache/cassandra/service/DatacenterSyncWriteResponseHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/DatacenterSyncWriteResponseHandler.java b/src/java/org/apache/cassandra/service/DatacenterSyncWriteResponseHandler.java
index d4cdcc6..4c892ff 100644
--- a/src/java/org/apache/cassandra/service/DatacenterSyncWriteResponseHandler.java
+++ b/src/java/org/apache/cassandra/service/DatacenterSyncWriteResponseHandler.java
@@ -22,10 +22,10 @@ import java.util.Map;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.locator.ReplicaLayout;
 import org.apache.cassandra.locator.IEndpointSnitch;
 import org.apache.cassandra.locator.NetworkTopologyStrategy;
 import org.apache.cassandra.locator.Replica;
+import org.apache.cassandra.locator.ReplicaPlan;
 import org.apache.cassandra.net.MessageIn;
 import org.apache.cassandra.db.ConsistencyLevel;
 import org.apache.cassandra.db.WriteType;
@@ -40,16 +40,16 @@ public class DatacenterSyncWriteResponseHandler<T> extends AbstractWriteResponse
     private final Map<String, AtomicInteger> responses = new HashMap<String, AtomicInteger>();
     private final AtomicInteger acks = new AtomicInteger(0);
 
-    public DatacenterSyncWriteResponseHandler(ReplicaLayout.ForToken replicaLayout,
+    public DatacenterSyncWriteResponseHandler(ReplicaPlan.ForTokenWrite replicaPlan,
                                               Runnable callback,
                                               WriteType writeType,
                                               long queryStartNanoTime)
     {
         // Response is been managed by the map so make it 1 for the superclass.
-        super(replicaLayout, callback, writeType, queryStartNanoTime);
-        assert replicaLayout.consistencyLevel() == ConsistencyLevel.EACH_QUORUM;
+        super(replicaPlan, callback, writeType, queryStartNanoTime);
+        assert replicaPlan.consistencyLevel() == ConsistencyLevel.EACH_QUORUM;
 
-        NetworkTopologyStrategy strategy = (NetworkTopologyStrategy) replicaLayout.keyspace().getReplicationStrategy();
+        NetworkTopologyStrategy strategy = (NetworkTopologyStrategy) replicaPlan.keyspace().getReplicationStrategy();
 
         for (String dc : strategy.getDatacenters())
         {
@@ -59,7 +59,7 @@ public class DatacenterSyncWriteResponseHandler<T> extends AbstractWriteResponse
 
         // During bootstrap, we have to include the pending endpoints or we may fail the consistency level
         // guarantees (see #833)
-        for (Replica pending : replicaLayout.pending())
+        for (Replica pending : replicaPlan.pending())
         {
             responses.get(snitch.getDatacenter(pending)).incrementAndGet();
         }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/047bcd7a/src/java/org/apache/cassandra/service/DatacenterWriteResponseHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/DatacenterWriteResponseHandler.java b/src/java/org/apache/cassandra/service/DatacenterWriteResponseHandler.java
index b458a71..a3ef76f 100644
--- a/src/java/org/apache/cassandra/service/DatacenterWriteResponseHandler.java
+++ b/src/java/org/apache/cassandra/service/DatacenterWriteResponseHandler.java
@@ -19,7 +19,7 @@ package org.apache.cassandra.service;
 
 import org.apache.cassandra.db.WriteType;
 import org.apache.cassandra.locator.InetAddressAndPort;
-import org.apache.cassandra.locator.ReplicaLayout;
+import org.apache.cassandra.locator.ReplicaPlan;
 import org.apache.cassandra.net.MessageIn;
 
 /**
@@ -27,13 +27,13 @@ import org.apache.cassandra.net.MessageIn;
  */
 public class DatacenterWriteResponseHandler<T> extends WriteResponseHandler<T>
 {
-    public DatacenterWriteResponseHandler(ReplicaLayout.ForToken replicaLayout,
+    public DatacenterWriteResponseHandler(ReplicaPlan.ForTokenWrite replicaPlan,
                                           Runnable callback,
                                           WriteType writeType,
                                           long queryStartNanoTime)
     {
-        super(replicaLayout, callback, writeType, queryStartNanoTime);
-        assert replicaLayout.consistencyLevel().isDatacenterLocal();
+        super(replicaPlan, callback, writeType, queryStartNanoTime);
+        assert replicaPlan.consistencyLevel().isDatacenterLocal();
     }
 
     @Override
@@ -54,6 +54,6 @@ public class DatacenterWriteResponseHandler<T> extends WriteResponseHandler<T>
     @Override
     protected boolean waitingFor(InetAddressAndPort from)
     {
-        return replicaLayout.consistencyLevel().isLocal(from);
+        return replicaPlan.consistencyLevel().isLocal(from);
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/047bcd7a/src/java/org/apache/cassandra/service/StorageProxy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java b/src/java/org/apache/cassandra/service/StorageProxy.java
index 5eb43cf..fc49330 100644
--- a/src/java/org/apache/cassandra/service/StorageProxy.java
+++ b/src/java/org/apache/cassandra/service/StorageProxy.java
@@ -60,7 +60,6 @@ import org.apache.cassandra.db.rows.RowIterator;
 import org.apache.cassandra.db.view.ViewUtils;
 import org.apache.cassandra.dht.*;
 import org.apache.cassandra.exceptions.*;
-import org.apache.cassandra.gms.FailureDetector;
 import org.apache.cassandra.gms.Gossiper;
 import org.apache.cassandra.hints.Hint;
 import org.apache.cassandra.hints.HintsService;
@@ -135,7 +134,7 @@ public class StorageProxy implements StorageProxyMBean
         standardWritePerformer = (mutation, targets, responseHandler, localDataCenter) ->
         {
             assert mutation instanceof Mutation;
-            sendToHintedReplicas((Mutation) mutation, targets.selected(), responseHandler, localDataCenter, Stage.MUTATION);
+            sendToHintedReplicas((Mutation) mutation, targets, responseHandler, localDataCenter, Stage.MUTATION);
         };
 
         /*
@@ -146,17 +145,17 @@ public class StorageProxy implements StorageProxyMBean
          */
         counterWritePerformer = (mutation, targets, responseHandler, localDataCenter) ->
         {
-            EndpointsForToken selected = targets.selected().withoutSelf();
+            EndpointsForToken selected = targets.contacts().withoutSelf();
             Replicas.temporaryAssertFull(selected); // TODO CASSANDRA-14548
-            counterWriteTask(mutation, selected, responseHandler, localDataCenter).run();
+            counterWriteTask(mutation, targets.withContact(selected), responseHandler, localDataCenter).run();
         };
 
         counterWriteOnCoordinatorPerformer = (mutation, targets, responseHandler, localDataCenter) ->
         {
-            EndpointsForToken selected = targets.selected().withoutSelf();
+            EndpointsForToken selected = targets.contacts().withoutSelf();
             Replicas.temporaryAssertFull(selected); // TODO CASSANDRA-14548
             StageManager.getStage(Stage.COUNTER_MUTATION)
-                        .execute(counterWriteTask(mutation, selected, responseHandler, localDataCenter));
+                        .execute(counterWriteTask(mutation, targets.withContact(selected), responseHandler, localDataCenter));
         };
 
         for(ConsistencyLevel level : ConsistencyLevel.values())
@@ -232,9 +231,9 @@ public class StorageProxy implements StorageProxyMBean
             while (System.nanoTime() - queryStartNanoTime < timeout)
             {
                 // for simplicity, we'll do a single liveness check at the start of each attempt
-                ReplicaLayout.ForPaxos replicaLayout = ReplicaLayout.forPaxos(Keyspace.open(keyspaceName), key, consistencyForPaxos);
+                ReplicaPlan.ForPaxosWrite replicaPlan = ReplicaPlans.forPaxos(Keyspace.open(keyspaceName), key, consistencyForPaxos);
 
-                final PaxosBallotAndContention pair = beginAndRepairPaxos(queryStartNanoTime, key, metadata, replicaLayout, consistencyForPaxos, consistencyForCommit, true, state);
+                final PaxosBallotAndContention pair = beginAndRepairPaxos(queryStartNanoTime, key, metadata, replicaPlan, consistencyForPaxos, consistencyForCommit, true, state);
                 final UUID ballot = pair.ballot;
                 contentions += pair.contentions;
 
@@ -276,7 +275,7 @@ public class StorageProxy implements StorageProxyMBean
 
                 Commit proposal = Commit.newProposal(ballot, updates);
                 Tracing.trace("CAS precondition is met; proposing client-requested updates for {}", ballot);
-                if (proposePaxos(proposal, replicaLayout, true, queryStartNanoTime))
+                if (proposePaxos(proposal, replicaPlan, true, queryStartNanoTime))
                 {
                     commitPaxos(proposal, consistencyForCommit, true, queryStartNanoTime);
                     Tracing.trace("CAS successful");
@@ -334,7 +333,7 @@ public class StorageProxy implements StorageProxyMBean
     private static PaxosBallotAndContention beginAndRepairPaxos(long queryStartNanoTime,
                                                                 DecoratedKey key,
                                                                 TableMetadata metadata,
-                                                                ReplicaLayout.ForPaxos replicaLayout,
+                                                                ReplicaPlan.ForPaxosWrite paxosPlan,
                                                                 ConsistencyLevel consistencyForPaxos,
                                                                 ConsistencyLevel consistencyForCommit,
                                                                 final boolean isWrite,
@@ -360,7 +359,7 @@ public class StorageProxy implements StorageProxyMBean
             // prepare
             Tracing.trace("Preparing {}", ballot);
             Commit toPrepare = Commit.newPrepare(key, metadata, ballot);
-            summary = preparePaxos(toPrepare, replicaLayout, queryStartNanoTime);
+            summary = preparePaxos(toPrepare, paxosPlan, queryStartNanoTime);
             if (!summary.promised)
             {
                 Tracing.trace("Some replicas have already promised a higher ballot than ours; aborting");
@@ -383,7 +382,7 @@ public class StorageProxy implements StorageProxyMBean
                 else
                     casReadMetrics.unfinishedCommit.inc();
                 Commit refreshedInProgress = Commit.newProposal(ballot, inProgress.update);
-                if (proposePaxos(refreshedInProgress, replicaLayout, false, queryStartNanoTime))
+                if (proposePaxos(refreshedInProgress, paxosPlan, false, queryStartNanoTime))
                 {
                     try
                     {
@@ -440,12 +439,12 @@ public class StorageProxy implements StorageProxyMBean
             MessagingService.instance().sendOneWay(message, target);
     }
 
-    private static PrepareCallback preparePaxos(Commit toPrepare, ReplicaLayout.ForPaxos replicaLayout, long queryStartNanoTime)
+    private static PrepareCallback preparePaxos(Commit toPrepare, ReplicaPlan.ForPaxosWrite replicaPlan, long queryStartNanoTime)
     throws WriteTimeoutException
     {
-        PrepareCallback callback = new PrepareCallback(toPrepare.update.partitionKey(), toPrepare.update.metadata(), replicaLayout.getRequiredParticipants(), replicaLayout.consistencyLevel(), queryStartNanoTime);
+        PrepareCallback callback = new PrepareCallback(toPrepare.update.partitionKey(), toPrepare.update.metadata(), replicaPlan.requiredParticipants(), replicaPlan.consistencyLevel(), queryStartNanoTime);
         MessageOut<Commit> message = new MessageOut<Commit>(MessagingService.Verb.PAXOS_PREPARE, toPrepare, Commit.serializer);
-        for (Replica replica: replicaLayout.selected())
+        for (Replica replica: replicaPlan.contacts())
         {
             if (replica.isLocal())
             {
@@ -478,12 +477,12 @@ public class StorageProxy implements StorageProxyMBean
         return callback;
     }
 
-    private static boolean proposePaxos(Commit proposal, ReplicaLayout.ForPaxos replicaLayout, boolean timeoutIfPartial, long queryStartNanoTime)
+    private static boolean proposePaxos(Commit proposal, ReplicaPlan.ForPaxosWrite replicaPlan, boolean timeoutIfPartial, long queryStartNanoTime)
     throws WriteTimeoutException
     {
-        ProposeCallback callback = new ProposeCallback(replicaLayout.selected().size(), replicaLayout.getRequiredParticipants(), !timeoutIfPartial, replicaLayout.consistencyLevel(), queryStartNanoTime);
+        ProposeCallback callback = new ProposeCallback(replicaPlan.contacts().size(), replicaPlan.requiredParticipants(), !timeoutIfPartial, replicaPlan.consistencyLevel(), queryStartNanoTime);
         MessageOut<Commit> message = new MessageOut<Commit>(MessagingService.Verb.PAXOS_PROPOSE, proposal, Commit.serializer);
-        for (Replica replica : replicaLayout.selected())
+        for (Replica replica : replicaPlan.contacts())
         {
             if (replica.isLocal())
             {
@@ -518,7 +517,7 @@ public class StorageProxy implements StorageProxyMBean
             return true;
 
         if (timeoutIfPartial && !callback.isFullyRefused())
-            throw new WriteTimeoutException(WriteType.CAS, replicaLayout.consistencyLevel(), callback.getAcceptCount(), replicaLayout.getRequiredParticipants());
+            throw new WriteTimeoutException(WriteType.CAS, replicaPlan.consistencyLevel(), callback.getAcceptCount(), replicaPlan.requiredParticipants());
 
         return false;
     }
@@ -531,21 +530,22 @@ public class StorageProxy implements StorageProxyMBean
         Token tk = proposal.update.partitionKey().getToken();
 
         AbstractWriteResponseHandler<Commit> responseHandler = null;
-        ReplicaLayout.ForToken replicaLayout = ReplicaLayout.forWrite(keyspace, consistencyLevel, tk, FailureDetector.instance::isAlive);
+        // NOTE: this ReplicaPlan is a lie, this usage of ReplicaPlan could do with being clarified - the selected() collection is essentially (I think) never used
+        ReplicaPlan.ForTokenWrite replicaPlan = ReplicaPlans.forWrite(keyspace, consistencyLevel, tk, ReplicaPlans.writeAll);
         if (shouldBlock)
         {
             AbstractReplicationStrategy rs = keyspace.getReplicationStrategy();
-            responseHandler = rs.getWriteResponseHandler(replicaLayout, null, WriteType.SIMPLE, queryStartNanoTime);
+            responseHandler = rs.getWriteResponseHandler(replicaPlan, null, WriteType.SIMPLE, queryStartNanoTime);
             responseHandler.setSupportsBackPressure(false);
         }
 
-        MessageOut<Commit> message = new MessageOut<Commit>(MessagingService.Verb.PAXOS_COMMIT, proposal, Commit.serializer);
-        for (Replica replica : replicaLayout.all())
+        MessageOut<Commit> message = new MessageOut<>(MessagingService.Verb.PAXOS_COMMIT, proposal, Commit.serializer);
+        for (Replica replica : replicaPlan.liveAndDown())
         {
             InetAddressAndPort destination = replica.endpoint();
             checkHintOverload(replica);
 
-            if (FailureDetector.instance.isAlive(destination))
+            if (replicaPlan.isAlive(replica))
             {
                 if (shouldBlock)
                 {
@@ -616,10 +616,10 @@ public class StorageProxy implements StorageProxyMBean
      * the data across to some other replica.
      *
      * @param mutations the mutations to be applied across the replicas
-     * @param consistency_level the consistency level for the operation
+     * @param consistencyLevel the consistency level for the operation
      * @param queryStartNanoTime the value of System.nanoTime() when the query started to be processed
      */
-    public static void mutate(List<? extends IMutation> mutations, ConsistencyLevel consistency_level, long queryStartNanoTime)
+    public static void mutate(List<? extends IMutation> mutations, ConsistencyLevel consistencyLevel, long queryStartNanoTime)
     throws UnavailableException, OverloadedException, WriteTimeoutException, WriteFailureException
     {
         Tracing.trace("Determining replicas for mutation");
@@ -637,7 +637,7 @@ public class StorageProxy implements StorageProxyMBean
                 if (mutation instanceof CounterMutation)
                     responseHandlers.add(mutateCounter((CounterMutation)mutation, localDataCenter, queryStartNanoTime));
                 else
-                    responseHandlers.add(performWrite(mutation, consistency_level, localDataCenter, standardWritePerformer, null, plainWriteType, queryStartNanoTime));
+                    responseHandlers.add(performWrite(mutation, consistencyLevel, localDataCenter, standardWritePerformer, null, plainWriteType, queryStartNanoTime));
             }
 
             // upgrade to full quorum any failed cheap quorums
@@ -653,7 +653,7 @@ public class StorageProxy implements StorageProxyMBean
         }
         catch (WriteTimeoutException|WriteFailureException ex)
         {
-            if (consistency_level == ConsistencyLevel.ANY)
+            if (consistencyLevel == ConsistencyLevel.ANY)
             {
                 hintMutations(mutations);
             }
@@ -662,7 +662,7 @@ public class StorageProxy implements StorageProxyMBean
                 if (ex instanceof WriteFailureException)
                 {
                     writeMetrics.failures.mark();
-                    writeMetricsMap.get(consistency_level).failures.mark();
+                    writeMetricsMap.get(consistencyLevel).failures.mark();
                     WriteFailureException fe = (WriteFailureException)ex;
                     Tracing.trace("Write failure; received {} of {} required replies, failed {} requests",
                                   fe.received, fe.blockFor, fe.failureReasonByEndpoint.size());
@@ -670,7 +670,7 @@ public class StorageProxy implements StorageProxyMBean
                 else
                 {
                     writeMetrics.timeouts.mark();
-                    writeMetricsMap.get(consistency_level).timeouts.mark();
+                    writeMetricsMap.get(consistencyLevel).timeouts.mark();
                     WriteTimeoutException te = (WriteTimeoutException)ex;
                     Tracing.trace("Write timeout; received {} of {} required replies", te.received, te.blockFor);
                 }
@@ -680,14 +680,14 @@ public class StorageProxy implements StorageProxyMBean
         catch (UnavailableException e)
         {
             writeMetrics.unavailables.mark();
-            writeMetricsMap.get(consistency_level).unavailables.mark();
+            writeMetricsMap.get(consistencyLevel).unavailables.mark();
             Tracing.trace("Unavailable");
             throw e;
         }
         catch (OverloadedException e)
         {
             writeMetrics.unavailables.mark();
-            writeMetricsMap.get(consistency_level).unavailables.mark();
+            writeMetricsMap.get(consistencyLevel).unavailables.mark();
             Tracing.trace("Overloaded");
             throw e;
         }
@@ -695,7 +695,7 @@ public class StorageProxy implements StorageProxyMBean
         {
             long latency = System.nanoTime() - startTime;
             writeMetrics.addNano(latency);
-            writeMetricsMap.get(consistency_level).addNano(latency);
+            writeMetricsMap.get(consistencyLevel).addNano(latency);
             updateCoordinatorWriteLatencyTableMetric(mutations, latency);
         }
     }
@@ -725,7 +725,8 @@ public class StorageProxy implements StorageProxyMBean
 
         // local writes can timeout, but cannot be dropped (see LocalMutationRunnable and CASSANDRA-6510),
         // so there is no need to hint or retry.
-        EndpointsForToken replicasToHint = StorageService.instance.getNaturalAndPendingReplicasForToken(keyspaceName, token)
+        EndpointsForToken replicasToHint = ReplicaLayout.forTokenWriteLiveAndDown(Keyspace.open(keyspaceName), token)
+                .all()
                 .filter(StorageProxy::shouldHint);
 
         submitHint(mutation, replicasToHint, null);
@@ -737,8 +738,8 @@ public class StorageProxy implements StorageProxyMBean
         Token token = mutation.key().getToken();
         InetAddressAndPort local = FBUtilities.getBroadcastAddressAndPort();
 
-        return StorageService.instance.getNaturalReplicasForToken(keyspaceName, token).endpoints().contains(local)
-               || StorageService.instance.getTokenMetadata().pendingEndpointsForToken(token, keyspaceName).endpoints().contains(local);
+        return ReplicaLayout.forTokenWriteLiveAndDown(Keyspace.open(keyspaceName), token)
+                .all().endpoints().contains(local);
     }
 
     /**
@@ -934,7 +935,6 @@ public class StorageProxy implements StorageProxyMBean
                                                                                cleanup,
                                                                                queryStartNanoTime);
                 // exit early if we can't fulfill the CL at this time.
-                wrapper.handler.assureSufficientLiveNodes();
                 wrappers.add(wrapper);
             }
 
@@ -1002,14 +1002,14 @@ public class StorageProxy implements StorageProxyMBean
     throws WriteTimeoutException, WriteFailureException
     {
         Keyspace systemKeypsace = Keyspace.open(SchemaConstants.SYSTEM_KEYSPACE_NAME);
-        ReplicaLayout.ForToken replicaLayout = ReplicaLayout.forBatchlogWrite(systemKeypsace, endpoints);
-        WriteResponseHandler<?> handler = new WriteResponseHandler(replicaLayout,
+        ReplicaPlan.ForTokenWrite replicaPlan = ReplicaPlans.forBatchlogWrite(systemKeypsace, endpoints);
+        WriteResponseHandler<?> handler = new WriteResponseHandler(replicaPlan,
                                                                    WriteType.BATCH_LOG,
                                                                    queryStartNanoTime);
 
         Batch batch = Batch.createLocal(uuid, FBUtilities.timestampMicros(), mutations);
         MessageOut<Batch> message = new MessageOut<>(MessagingService.Verb.BATCH_STORE, batch, Batch.serializer);
-        for (Replica replica : replicaLayout.all())
+        for (Replica replica : replicaPlan.liveAndDown())
         {
             logger.trace("Sending batchlog store request {} to {} for {} mutations", batch.id, replica, batch.size());
 
@@ -1040,12 +1040,12 @@ public class StorageProxy implements StorageProxyMBean
     {
         for (WriteResponseHandlerWrapper wrapper : wrappers)
         {
-            Replicas.temporaryAssertFull(wrapper.handler.replicaLayout.all());  // TODO: CASSANDRA-14549
-            ReplicaLayout.ForToken replicas = wrapper.handler.replicaLayout.withSelected(wrapper.handler.replicaLayout.all());
+            Replicas.temporaryAssertFull(wrapper.handler.replicaPlan.liveAndDown());  // TODO: CASSANDRA-14549
+            ReplicaPlan.ForTokenWrite replicas = wrapper.handler.replicaPlan.withContact(wrapper.handler.replicaPlan.liveAndDown());
 
             try
             {
-                sendToHintedReplicas(wrapper.mutation, replicas.selected(), wrapper.handler, localDataCenter, stage);
+                sendToHintedReplicas(wrapper.mutation, replicas, wrapper.handler, localDataCenter, stage);
             }
             catch (OverloadedException | WriteTimeoutException e)
             {
@@ -1059,11 +1059,11 @@ public class StorageProxy implements StorageProxyMBean
     {
         for (WriteResponseHandlerWrapper wrapper : wrappers)
         {
-            Replicas.temporaryAssertFull(wrapper.handler.replicaLayout.all()); // TODO: CASSANDRA-14549
-            sendToHintedReplicas(wrapper.mutation, wrapper.handler.replicaLayout.all(), wrapper.handler, localDataCenter, stage);
+            EndpointsForToken sendTo = wrapper.handler.replicaPlan.liveAndDown();
+            Replicas.temporaryAssertFull(sendTo); // TODO: CASSANDRA-14549
+            sendToHintedReplicas(wrapper.mutation, wrapper.handler.replicaPlan.withContact(sendTo), wrapper.handler, localDataCenter, stage);
         }
 
-
         for (WriteResponseHandlerWrapper wrapper : wrappers)
             wrapper.handler.get();
     }
@@ -1096,13 +1096,10 @@ public class StorageProxy implements StorageProxyMBean
 
         Token tk = mutation.key().getToken();
 
-        ReplicaLayout.ForToken replicaLayout = ReplicaLayout.forWriteWithDownNodes(keyspace, consistencyLevel, tk);
-        AbstractWriteResponseHandler<IMutation> responseHandler = rs.getWriteResponseHandler(replicaLayout, callback, writeType, queryStartNanoTime);
+        ReplicaPlan.ForTokenWrite replicaPlan = ReplicaPlans.forWrite(keyspace, consistencyLevel, tk, ReplicaPlans.writeNormal);
+        AbstractWriteResponseHandler<IMutation> responseHandler = rs.getWriteResponseHandler(replicaPlan, callback, writeType, queryStartNanoTime);
 
-        // exit early if we can't fulfill the CL at this time
-        responseHandler.assureSufficientLiveNodes();
-
-        performer.apply(mutation, replicaLayout, responseHandler, localDataCenter);
+        performer.apply(mutation, replicaPlan, responseHandler, localDataCenter);
         return responseHandler;
     }
 
@@ -1118,8 +1115,8 @@ public class StorageProxy implements StorageProxyMBean
         AbstractReplicationStrategy rs = keyspace.getReplicationStrategy();
         Token tk = mutation.key().getToken();
 
-        ReplicaLayout.ForToken replicaLayout = ReplicaLayout.forWrite(keyspace, consistencyLevel, tk, FailureDetector.instance::isAlive);
-        AbstractWriteResponseHandler<IMutation> writeHandler = rs.getWriteResponseHandler(replicaLayout,null, writeType, queryStartNanoTime);
+        ReplicaPlan.ForTokenWrite replicaPlan = ReplicaPlans.forWrite(keyspace, consistencyLevel, tk, ReplicaPlans.writeNormal);
+        AbstractWriteResponseHandler<IMutation> writeHandler = rs.getWriteResponseHandler(replicaPlan,null, writeType, queryStartNanoTime);
         BatchlogResponseHandler<IMutation> batchHandler = new BatchlogResponseHandler<>(writeHandler, batchConsistencyLevel.blockFor(keyspace), cleanup, queryStartNanoTime);
         return new WriteResponseHandlerWrapper(batchHandler, mutation);
     }
@@ -1140,10 +1137,11 @@ public class StorageProxy implements StorageProxyMBean
     {
         Keyspace keyspace = Keyspace.open(mutation.getKeyspaceName());
         AbstractReplicationStrategy rs = keyspace.getReplicationStrategy();
-        Token tk = mutation.key().getToken();
-        ReplicaLayout.ForToken replicaLayout = ReplicaLayout.forWriteWithDownNodes(keyspace, consistencyLevel, tk, naturalEndpoints, pendingEndpoints);
 
-        AbstractWriteResponseHandler<IMutation> writeHandler = rs.getWriteResponseHandler(replicaLayout, () -> {
+        ReplicaLayout.ForTokenWrite liveAndDown = ReplicaLayout.forTokenWrite(naturalEndpoints, pendingEndpoints);
+        ReplicaPlan.ForTokenWrite replicaPlan = ReplicaPlans.forWrite(keyspace, consistencyLevel, liveAndDown, ReplicaPlans.writeAll);
+
+        AbstractWriteResponseHandler<IMutation> writeHandler = rs.getWriteResponseHandler(replicaPlan, () -> {
             long delay = Math.max(0, System.currentTimeMillis() - baseComplete.get());
             viewWriteMetrics.viewWriteLatency.update(delay, TimeUnit.MILLISECONDS);
         }, writeType, queryStartNanoTime);
@@ -1208,7 +1206,7 @@ public class StorageProxy implements StorageProxyMBean
      * @throws OverloadedException if the hints cannot be written/enqueued
      */
     public static void sendToHintedReplicas(final Mutation mutation,
-                                            EndpointsForToken targets,
+                                            ReplicaPlan.ForTokenWrite plan,
                                             AbstractWriteResponseHandler<IMutation> responseHandler,
                                             String localDataCenter,
                                             Stage stage)
@@ -1227,11 +1225,11 @@ public class StorageProxy implements StorageProxyMBean
 
         List<InetAddressAndPort> backPressureHosts = null;
 
-        for (Replica destination : targets)
+        for (Replica destination : plan.contacts())
         {
             checkHintOverload(destination);
 
-            if (FailureDetector.instance.isAlive(destination.endpoint()))
+            if (plan.isAlive(destination))
             {
                 if (destination.isLocal())
                 {
@@ -1251,7 +1249,7 @@ public class StorageProxy implements StorageProxyMBean
                     if (localDataCenter.equals(dc))
                     {
                         if (localDc == null)
-                            localDc = new ArrayList<>(targets.size());
+                            localDc = new ArrayList<>(plan.contacts().size());
 
                         localDc.add(destination);
                     }
@@ -1268,7 +1266,7 @@ public class StorageProxy implements StorageProxyMBean
                     }
 
                     if (backPressureHosts == null)
-                        backPressureHosts = new ArrayList<>(targets.size());
+                        backPressureHosts = new ArrayList<>(plan.contacts().size());
 
                     backPressureHosts.add(destination.endpoint());
                 }
@@ -1345,7 +1343,7 @@ public class StorageProxy implements StorageProxyMBean
                                                                   message,
                                                                   destination,
                                                                   message.getTimeout(),
-                                                                  handler.replicaLayout.consistencyLevel(),
+                                                                  handler.replicaPlan.consistencyLevel(),
                                                                   true);
             messageIds[idIdx++] = id;
             logger.trace("Adding FWD message to {}@{}", id, destination);
@@ -1435,14 +1433,13 @@ public class StorageProxy implements StorageProxyMBean
             // Exit now if we can't fulfill the CL here instead of forwarding to the leader replica
             String keyspaceName = cm.getKeyspaceName();
             Keyspace keyspace = Keyspace.open(keyspaceName);
-            AbstractReplicationStrategy rs = Keyspace.open(keyspaceName).getReplicationStrategy();
             Token tk = cm.key().getToken();
 
-            ReplicaLayout.ForToken replicaLayout = ReplicaLayout.forWriteWithDownNodes(keyspace, cm.consistency(), tk);
-            rs.getWriteResponseHandler(replicaLayout, null, WriteType.COUNTER, queryStartNanoTime).assureSufficientLiveNodes();
+            ReplicaPlans.forWrite(keyspace, cm.consistency(), tk, ReplicaPlans.writeAll)
+                    .assureSufficientReplicas();
 
             // Forward the actual update to the chosen leader replica
-            AbstractWriteResponseHandler<IMutation> responseHandler = new WriteResponseHandler<>(ReplicaLayout.forCounterWrite(keyspace, tk, replica),
+            AbstractWriteResponseHandler<IMutation> responseHandler = new WriteResponseHandler<>(ReplicaPlans.forForwardingCounterWrite(keyspace, tk, replica),
                                                                                                  WriteType.COUNTER, queryStartNanoTime);
 
             Tracing.trace("Enqueuing counter update to {}", replica);
@@ -1465,7 +1462,7 @@ public class StorageProxy implements StorageProxyMBean
     {
         Keyspace keyspace = Keyspace.open(keyspaceName);
         IEndpointSnitch snitch = DatabaseDescriptor.getEndpointSnitch();
-        EndpointsForToken replicas = StorageService.instance.getLiveNaturalReplicasForToken(keyspace, key);
+        EndpointsForToken replicas = keyspace.getReplicationStrategy().getNaturalReplicasForToken(key);
 
         // CASSANDRA-13043: filter out those endpoints not accepting clients yet, maybe because still bootstrapping
         replicas = replicas.filter(replica -> StorageService.instance.isRpcReady(replica.endpoint()));
@@ -1511,7 +1508,7 @@ public class StorageProxy implements StorageProxyMBean
     }
 
     private static Runnable counterWriteTask(final IMutation mutation,
-                                             final EndpointsForToken targets,
+                                             final ReplicaPlan.ForTokenWrite replicaPlan,
                                              final AbstractWriteResponseHandler<IMutation> responseHandler,
                                              final String localDataCenter)
     {
@@ -1524,7 +1521,7 @@ public class StorageProxy implements StorageProxyMBean
 
                 Mutation result = ((CounterMutation) mutation).applyCounterMutation();
                 responseHandler.response(null);
-                sendToHintedReplicas(result, targets, responseHandler, localDataCenter, Stage.COUNTER_MUTATION);
+                sendToHintedReplicas(result, replicaPlan, responseHandler, localDataCenter, Stage.COUNTER_MUTATION);
             }
         };
     }
@@ -1592,7 +1589,7 @@ public class StorageProxy implements StorageProxyMBean
         try
         {
             // make sure any in-progress paxos writes are done (i.e., committed to a majority of replicas), before performing a quorum read
-            ReplicaLayout.ForPaxos replicaLayout = ReplicaLayout.forPaxos(Keyspace.open(metadata.keyspace), key, consistencyLevel);
+            ReplicaPlan.ForPaxosWrite replicaPlan = ReplicaPlans.forPaxos(Keyspace.open(metadata.keyspace), key, consistencyLevel);
 
             // does the work of applying in-progress writes; throws UAE or timeout if it can't
             final ConsistencyLevel consistencyForCommitOrFetch = consistencyLevel == ConsistencyLevel.LOCAL_SERIAL
@@ -1601,7 +1598,7 @@ public class StorageProxy implements StorageProxyMBean
 
             try
             {
-                final PaxosBallotAndContention pair = beginAndRepairPaxos(start, key, metadata, replicaLayout, consistencyLevel, consistencyForCommitOrFetch, false, state);
+                final PaxosBallotAndContention pair = beginAndRepairPaxos(start, key, metadata, replicaPlan, consistencyLevel, consistencyForCommitOrFetch, false, state);
                 if (pair.contentions > 0)
                     casReadMetrics.contention.update(pair.contentions);
             }
@@ -1850,21 +1847,6 @@ public class StorageProxy implements StorageProxyMBean
         }
     }
 
-    public static EndpointsForToken getLiveSortedReplicasForToken(Keyspace keyspace, RingPosition pos)
-    {
-        return getLiveSortedReplicas(keyspace, pos).forToken(pos.getToken());
-    }
-
-    public static EndpointsForRange getLiveSortedReplicas(Keyspace keyspace, RingPosition pos)
-    {
-        EndpointsForRange liveReplicas = StorageService.instance.getLiveNaturalReplicas(keyspace, pos);
-        // Replica availability is considered by the query path
-        Preconditions.checkState(liveReplicas.isEmpty() || liveReplicas.stream().anyMatch(Replica::isFull),
-                                 "At least one full replica required for reads: " + liveReplicas);
-
-        return DatabaseDescriptor.getEndpointSnitch().sortedByProximity(FBUtilities.getBroadcastAddressAndPort(), liveReplicas);
-    }
-
     /**
      * Estimate the number of result rows per range in the ring based on our local data.
      * <p>
@@ -1883,7 +1865,7 @@ public class StorageProxy implements StorageProxyMBean
         return (maxExpectedResults / DatabaseDescriptor.getNumTokens()) / keyspace.getReplicationStrategy().getReplicationFactor().allReplicas;
     }
 
-    private static class RangeIterator extends AbstractIterator<ReplicaLayout.ForRange>
+    private static class RangeIterator extends AbstractIterator<ReplicaPlan.ForRangeRead>
     {
         private final Keyspace keyspace;
         private final ConsistencyLevel consistency;
@@ -1907,43 +1889,34 @@ public class StorageProxy implements StorageProxyMBean
             return rangeCount;
         }
 
-        protected ReplicaLayout.ForRange computeNext()
+        protected ReplicaPlan.ForRangeRead computeNext()
         {
             if (!ranges.hasNext())
                 return endOfData();
 
-            AbstractBounds<PartitionPosition> range = ranges.next();
-            EndpointsForRange liveReplicas = getLiveSortedReplicas(keyspace, range.right);
-
-            int blockFor = consistency.blockFor(keyspace);
-            EndpointsForRange targetReplicas = consistency.filterForQuery(keyspace, liveReplicas);
-            int minResponses = Math.min(targetReplicas.size(), blockFor);
-
-            // Endpoints for range here as well
-            return ReplicaLayout.forRangeRead(keyspace, consistency, range,
-                                              liveReplicas, targetReplicas.subList(0, minResponses));
+            return ReplicaPlans.forRangeRead(keyspace, consistency, ranges.next());
         }
     }
 
-    private static class RangeMerger extends AbstractIterator<ReplicaLayout.ForRange>
+    private static class RangeMerger extends AbstractIterator<ReplicaPlan.ForRangeRead>
     {
         private final Keyspace keyspace;
         private final ConsistencyLevel consistency;
-        private final PeekingIterator<ReplicaLayout.ForRange> ranges;
+        private final PeekingIterator<ReplicaPlan.ForRangeRead> ranges;
 
-        private RangeMerger(Iterator<ReplicaLayout.ForRange> iterator, Keyspace keyspace, ConsistencyLevel consistency)
+        private RangeMerger(Iterator<ReplicaPlan.ForRangeRead> iterator, Keyspace keyspace, ConsistencyLevel consistency)
         {
             this.keyspace = keyspace;
             this.consistency = consistency;
             this.ranges = Iterators.peekingIterator(iterator);
         }
 
-        protected ReplicaLayout.ForRange computeNext()
+        protected ReplicaPlan.ForRangeRead computeNext()
         {
             if (!ranges.hasNext())
                 return endOfData();
 
-            ReplicaLayout.ForRange current = ranges.next();
+            ReplicaPlan.ForRangeRead current = ranges.next();
 
             // getRestrictedRange has broken the queried range into per-[vnode] token ranges, but this doesn't take
             // the replication factor into account. If the intersection of live endpoints for 2 consecutive ranges
@@ -1955,25 +1928,15 @@ public class StorageProxy implements StorageProxyMBean
                 // Note: it would be slightly more efficient to have CFS.getRangeSlice on the destination nodes unwraps
                 // the range if necessary and deal with it. However, we can't start sending wrapped range without breaking
                 // wire compatibility, so It's likely easier not to bother;
-                if (current.range.right.isMinimum())
+                if (current.range().right.isMinimum())
                     break;
 
-                ReplicaLayout.ForRange next = ranges.peek();
-
-                EndpointsForRange merged = current.all().keep(next.all().endpoints());
-
-                // Check if there is enough endpoint for the merge to be possible.
-                if (!consistency.isSufficientLiveNodesForRead(keyspace, merged))
-                    break;
-
-                EndpointsForRange filteredMerged = consistency.filterForQuery(keyspace, merged);
-
-                // Estimate whether merging will be a win or not
-                if (!DatabaseDescriptor.getEndpointSnitch().isWorthMergingForRangeQuery(filteredMerged, current.selected(), next.selected()))
+                ReplicaPlan.ForRangeRead next = ranges.peek();
+                ReplicaPlan.ForRangeRead merged = ReplicaPlans.maybeMerge(keyspace, consistency, current, next);
+                if (merged == null)
                     break;
 
-                // If we get there, merge this range and the next one
-                current = ReplicaLayout.forRangeRead(keyspace, consistency, current.range.withNewRight(next.range.right), merged, filteredMerged);
+                current = merged;
                 ranges.next(); // consume the range we just merged since we've only peeked so far
             }
             return current;
@@ -2018,7 +1981,7 @@ public class StorageProxy implements StorageProxyMBean
 
     private static class RangeCommandIterator extends AbstractIterator<RowIterator> implements PartitionIterator
     {
-        private final Iterator<ReplicaLayout.ForRange> ranges;
+        private final Iterator<ReplicaPlan.ForRangeRead> ranges;
         private final int totalRangeCount;
         private final PartitionRangeReadCommand command;
         private final boolean enforceStrictLiveness;
@@ -2108,42 +2071,40 @@ public class StorageProxy implements StorageProxyMBean
         /**
          * Queries the provided sub-range.
          *
-         * @param replicaLayout the subRange to query.
+         * @param replicaPlan the subRange to query.
          * @param isFirst in the case where multiple queries are sent in parallel, whether that's the first query on
          * that batch or not. The reason it matters is that whe paging queries, the command (more specifically the
          * {@code DataLimits}) may have "state" information and that state may only be valid for the first query (in
          * that it's the query that "continues" whatever we're previously queried).
          */
-        private SingleRangeResponse query(ReplicaLayout.ForRange replicaLayout, boolean isFirst)
+        private SingleRangeResponse query(ReplicaPlan.ForRangeRead replicaPlan, boolean isFirst)
         {
-            PartitionRangeReadCommand rangeCommand = command.forSubRange(replicaLayout.range, isFirst);
-            ReadRepair<EndpointsForRange, ReplicaLayout.ForRange> readRepair = ReadRepair.create(command, replicaLayout, queryStartNanoTime);
-            DataResolver<EndpointsForRange, ReplicaLayout.ForRange> resolver = new DataResolver<>(rangeCommand, replicaLayout, readRepair, queryStartNanoTime);
-            Keyspace keyspace = Keyspace.open(command.metadata().keyspace);
-
-            ReadCallback<EndpointsForRange, ReplicaLayout.ForRange> handler = new ReadCallback<>(resolver,
-                                                                                                 replicaLayout.consistencyLevel().blockFor(keyspace),
-                                                                                                 rangeCommand,
-                                                                                                 replicaLayout,
-                                                                                                 queryStartNanoTime);
+            PartitionRangeReadCommand rangeCommand = command.forSubRange(replicaPlan.range(), isFirst);
+            ReplicaPlan.SharedForRangeRead sharedReplicaPlan = ReplicaPlan.shared(replicaPlan);
+            ReadRepair<EndpointsForRange, ReplicaPlan.ForRangeRead> readRepair
+                    = ReadRepair.create(command, sharedReplicaPlan, queryStartNanoTime);
+            DataResolver<EndpointsForRange, ReplicaPlan.ForRangeRead> resolver
+                    = new DataResolver<>(rangeCommand, sharedReplicaPlan, readRepair, queryStartNanoTime);
+            ReadCallback<EndpointsForRange, ReplicaPlan.ForRangeRead> handler
+                    = new ReadCallback<>(resolver, rangeCommand, sharedReplicaPlan, queryStartNanoTime);
 
-            handler.assureSufficientLiveNodes();
+            replicaPlan.assureSufficientReplicas();
 
             // If enabled, request repaired data tracking info from full replicas but
             // only if there are multiple full replicas to compare results from
             if (DatabaseDescriptor.getRepairedDataTrackingForPartitionReadsEnabled()
-                && replicaLayout.selected().filter(Replica::isFull).size() > 1)
+                    && replicaPlan.contacts().filter(Replica::isFull).size() > 1)
             {
                 command.trackRepairedStatus();
             }
 
-            if (replicaLayout.selected().size() == 1 && replicaLayout.selected().get(0).isLocal())
+            if (replicaPlan.contacts().size() == 1 && replicaPlan.contacts().get(0).isLocal())
             {
                 StageManager.getStage(Stage.READ).execute(new LocalReadRunnable(rangeCommand, handler));
             }
             else
             {
-                for (Replica replica : replicaLayout.selected())
+                for (Replica replica : replicaPlan.contacts())
                 {
                     Tracing.trace("Enqueuing request to {}", replica);
                     PartitionRangeReadCommand command = replica.isFull() ? rangeCommand : rangeCommand.copyAsTransientQuery();
@@ -2486,7 +2447,7 @@ public class StorageProxy implements StorageProxyMBean
     public interface WritePerformer
     {
         public void apply(IMutation mutation,
-                          ReplicaLayout.ForToken targets,
+                          ReplicaPlan.ForTokenWrite targets,
                           AbstractWriteResponseHandler<IMutation> responseHandler,
                           String localDataCenter) throws OverloadedException;
     }
@@ -2499,7 +2460,7 @@ public class StorageProxy implements StorageProxyMBean
         public ViewWriteMetricsWrapped(AbstractWriteResponseHandler<IMutation> writeHandler, int i, BatchlogCleanup cleanup, long queryStartNanoTime)
         {
             super(writeHandler, i, cleanup, queryStartNanoTime);
-            viewWriteMetrics.viewReplicasAttempted.inc(totalEndpoints());
+            viewWriteMetrics.viewReplicasAttempted.inc(candidateReplicaCount());
         }
 
         public void response(MessageIn<IMutation> msg)
@@ -2705,7 +2666,7 @@ public class StorageProxy implements StorageProxyMBean
                 HintsService.instance.write(hostIds, Hint.create(mutation, System.currentTimeMillis()));
                 validTargets.forEach(HintsService.instance.metrics::incrCreatedHints);
                 // Notify the handler only for CL == ANY
-                if (responseHandler != null && responseHandler.replicaLayout.consistencyLevel() == ConsistencyLevel.ANY)
+                if (responseHandler != null && responseHandler.replicaPlan.consistencyLevel() == ConsistencyLevel.ANY)
                     responseHandler.response(null);
             }
         };

http://git-wip-us.apache.org/repos/asf/cassandra/blob/047bcd7a/src/java/org/apache/cassandra/service/StorageService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java
index 7f4ae14..a979f1c 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -172,7 +172,8 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
 
     public RangesAtEndpoint getLocalReplicas(String keyspaceName)
     {
-        return getReplicasForEndpoint(keyspaceName, FBUtilities.getBroadcastAddressAndPort());
+        return Keyspace.open(keyspaceName).getReplicationStrategy()
+                .getAddressReplicas(FBUtilities.getBroadcastAddressAndPort());
     }
 
     public List<Range<Token>> getLocalAndPendingRanges(String ks)
@@ -2015,11 +2016,10 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
     */
     private EndpointsByRange constructRangeToEndpointMap(String keyspace, List<Range<Token>> ranges)
     {
+        AbstractReplicationStrategy strategy = Keyspace.open(keyspace).getReplicationStrategy();
         Map<Range<Token>, EndpointsForRange> rangeToEndpointMap = new HashMap<>(ranges.size());
         for (Range<Token> range : ranges)
-        {
-            rangeToEndpointMap.put(range, Keyspace.open(keyspace).getReplicationStrategy().getNaturalReplicas(range.right));
-        }
+            rangeToEndpointMap.put(range, strategy.getNaturalReplicas(range.right));
         return new EndpointsByRange(rangeToEndpointMap);
     }
 
@@ -3878,16 +3878,6 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
     }
 
     /**
-     * Get all ranges an endpoint is responsible for (by keyspace)
-     * @param ep endpoint we are interested in.
-     * @return ranges for the specified endpoint.
-     */
-    RangesAtEndpoint getReplicasForEndpoint(String keyspaceName, InetAddressAndPort ep)
-    {
-        return Keyspace.open(keyspaceName).getReplicationStrategy().getAddressReplicas(ep);
-    }
-
-    /**
      * Get all ranges that span the ring given a set
      * of tokens. All ranges are in sorted order of
      * ranges.
@@ -3936,11 +3926,10 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
         return Replicas.stringify(getNaturalReplicasForToken(keyspaceName, cf, key), true);
     }
 
-
     @Deprecated
     public List<InetAddress> getNaturalEndpoints(String keyspaceName, ByteBuffer key)
     {
-        EndpointsForToken replicas = getNaturalReplicasForToken(keyspaceName, tokenMetadata.partitioner.getToken(key));
+        EndpointsForToken replicas = getNaturalReplicasForToken(keyspaceName, key);
         List<InetAddress> inetList = new ArrayList<>(replicas.size());
         replicas.forEach(r -> inetList.add(r.endpoint().address));
         return inetList;
@@ -3948,7 +3937,8 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
 
     public List<String> getNaturalEndpointsWithPort(String keyspaceName, ByteBuffer key)
     {
-        return Replicas.stringify(getNaturalReplicasForToken(keyspaceName, tokenMetadata.partitioner.getToken(key)), true);
+        EndpointsForToken replicas = getNaturalReplicasForToken(keyspaceName, key);
+        return Replicas.stringify(replicas, true);
     }
 
     public List<String> getReplicas(String keyspaceName, String cf, String key)
@@ -3971,61 +3961,13 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
         if (metadata == null)
             throw new IllegalArgumentException("Unknown table '" + cf + "' in keyspace '" + keyspaceName + "'");
 
-        return getNaturalReplicasForToken(keyspaceName, tokenMetadata.partitioner.getToken(metadata.partitionKeyType.fromString(key)));
-    }
-
-    /**
-     * This method returns the N endpoints that are responsible for storing the
-     * specified key i.e for replication.
-     *
-     * @param keyspaceName keyspace name also known as keyspace
-     * @param pos position for which we need to find the endpoint
-     * @return the endpoint responsible for this token
-     */
-    public static EndpointsForToken getNaturalReplicasForToken(String keyspaceName, RingPosition pos)
-    {
-        return Keyspace.open(keyspaceName).getReplicationStrategy().getNaturalReplicasForToken(pos);
+        return getNaturalReplicasForToken(keyspaceName, metadata.partitionKeyType.fromString(key));
     }
 
-    /**
-     * Returns the endpoints currently responsible for storing the token plus pending ones
-     */
-    public EndpointsForToken getNaturalAndPendingReplicasForToken(String keyspaceName, Token token)
-    {
-        // TODO: race condition to fetch these. impliciations??
-        EndpointsForToken natural = getNaturalReplicasForToken(keyspaceName, token);
-        EndpointsForToken pending = tokenMetadata.pendingEndpointsForToken(token, keyspaceName);
-        if (Endpoints.haveConflicts(natural, pending))
-        {
-            natural = Endpoints.resolveConflictsInNatural(natural, pending);
-            pending = Endpoints.resolveConflictsInPending(natural, pending);
-        }
-        return Endpoints.concat(natural, pending);
-    }
-
-    /**
-     * This method attempts to return N endpoints that are responsible for storing the
-     * specified key i.e for replication.
-     *
-     * @param keyspace keyspace name also known as keyspace
-     * @param pos position for which we need to find the endpoint
-     */
-    public EndpointsForToken getLiveNaturalReplicasForToken(Keyspace keyspace, RingPosition pos)
-    {
-        return getLiveNaturalReplicas(keyspace, pos).forToken(pos.getToken());
-    }
-
-    /**
-     * This method attempts to return N endpoints that are responsible for storing the
-     * specified key i.e for replication.
-     *
-     * @param keyspace keyspace name also known as keyspace
-     * @param pos position for which we need to find the endpoint
-     */
-    public EndpointsForRange getLiveNaturalReplicas(Keyspace keyspace, RingPosition pos)
+    public EndpointsForToken getNaturalReplicasForToken(String keyspaceName, ByteBuffer key)
     {
-        EndpointsForRange replicas = keyspace.getReplicationStrategy().getNaturalReplicas(pos);
-        return replicas.filter(r -> FailureDetector.instance.isAlive(r.endpoint()));
+        Token token = tokenMetadata.partitioner.getToken(key);
+        return Keyspace.open(keyspaceName).getReplicationStrategy().getNaturalReplicasForToken(token);
     }
 
     public void setLoggingLevel(String classQualifier, String rawLevel) throws Exception
@@ -4268,7 +4210,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
                              .filter(endpoint -> FailureDetector.instance.isAlive(endpoint) && !FBUtilities.getBroadcastAddressAndPort().equals(endpoint))
                              .collect(Collectors.toList());
 
-        return EndpointsForRange.copyOf(SystemReplicas.getSystemReplicas(endpoints));
+        return SystemReplicas.getSystemReplicas(endpoints);
     }
     /**
      * Find the best target to stream hints to. Currently the closest peer according to the snitch

http://git-wip-us.apache.org/repos/asf/cassandra/blob/047bcd7a/src/java/org/apache/cassandra/service/WriteResponseHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/WriteResponseHandler.java b/src/java/org/apache/cassandra/service/WriteResponseHandler.java
index a07aae6..f9bfedf 100644
--- a/src/java/org/apache/cassandra/service/WriteResponseHandler.java
+++ b/src/java/org/apache/cassandra/service/WriteResponseHandler.java
@@ -19,9 +19,7 @@ package org.apache.cassandra.service;
 
 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
 
-import org.apache.cassandra.dht.Token;
-import org.apache.cassandra.locator.ReplicaLayout;
-
+import org.apache.cassandra.locator.ReplicaPlan;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -39,18 +37,18 @@ public class WriteResponseHandler<T> extends AbstractWriteResponseHandler<T>
     private static final AtomicIntegerFieldUpdater<WriteResponseHandler> responsesUpdater
             = AtomicIntegerFieldUpdater.newUpdater(WriteResponseHandler.class, "responses");
 
-    public WriteResponseHandler(ReplicaLayout.ForToken replicaLayout,
+    public WriteResponseHandler(ReplicaPlan.ForTokenWrite replicaPlan,
                                 Runnable callback,
                                 WriteType writeType,
                                 long queryStartNanoTime)
     {
-        super(replicaLayout, callback, writeType, queryStartNanoTime);
-        responses = totalBlockFor();
+        super(replicaPlan, callback, writeType, queryStartNanoTime);
+        responses = blockFor();
     }
 
-    public WriteResponseHandler(ReplicaLayout.ForToken replicaLayout, WriteType writeType, long queryStartNanoTime)
+    public WriteResponseHandler(ReplicaPlan.ForTokenWrite replicaPlan, WriteType writeType, long queryStartNanoTime)
     {
-        this(replicaLayout, null, writeType, queryStartNanoTime);
+        this(replicaPlan, null, writeType, queryStartNanoTime);
     }
 
     public void response(MessageIn<T> m)
@@ -65,7 +63,7 @@ public class WriteResponseHandler<T> extends AbstractWriteResponseHandler<T>
 
     protected int ackCount()
     {
-        return totalBlockFor() - responses;
+        return blockFor() - responses;
     }
 
     public boolean isLatencyForSnitch()


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org


[4/4] cassandra git commit: ReplicaPlan/Layout refactor follow-up/completion

Posted by be...@apache.org.
ReplicaPlan/Layout refactor follow-up/completion

Finish much of the work to clarify endpoint selection
that was begun in Transient Replication (CASSANDRA-14404)

Also fixes:
  - commitPaxos was incorrectly selecting only live nodes,
    when it needed to include down nodes as well
  - We were not writing to pending transient replicas
  - On write, we were not hinting to full nodes with transient
    replication
  - rr.maybeSendAdditional{Reads,Writes} would only consult the
    same node we may have speculated a read to
  - transient->full movements mishandled the consistency level upgrade by
    retaining the 'full' pending variant, which increased the CL requirement;
    instead, the 'natural' replica is now upgraded to 'full' for writes

patch by Benedict; reviewed by Alex Petrov and Ariel Weisberg for CASSANDRA-14705


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/047bcd7a
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/047bcd7a
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/047bcd7a

Branch: refs/heads/trunk
Commit: 047bcd7ad171d6a4aa89128c5e6c6ed5f012b1c0
Parents: 05dbb3e
Author: Benedict Elliott Smith <be...@apple.com>
Authored: Fri Sep 7 11:41:28 2018 +0100
Committer: Benedict Elliott Smith <be...@apple.com>
Committed: Fri Sep 14 10:14:37 2018 +0100

----------------------------------------------------------------------
 .../cassandra/batchlog/BatchlogManager.java     |  64 ++-
 .../apache/cassandra/db/ConsistencyLevel.java   |  26 +-
 .../apache/cassandra/gms/FailureDetector.java   |   4 +
 .../apache/cassandra/hints/HintsService.java    |   4 +-
 .../locator/AbstractReplicaCollection.java      |  67 +--
 .../locator/AbstractReplicationStrategy.java    |  20 +-
 .../org/apache/cassandra/locator/Endpoints.java |  59 +--
 .../cassandra/locator/EndpointsForRange.java    |   2 +-
 .../cassandra/locator/EndpointsForToken.java    |   2 +-
 .../cassandra/locator/RangesAtEndpoint.java     |   2 +-
 .../cassandra/locator/ReplicaCollection.java    |  27 +-
 .../apache/cassandra/locator/ReplicaLayout.java | 435 ++++++++-----------
 .../apache/cassandra/locator/ReplicaPlan.java   | 240 ++++++++++
 .../apache/cassandra/locator/ReplicaPlans.java  | 295 +++++++++++++
 .../cassandra/locator/SystemReplicas.java       |  12 +-
 .../apache/cassandra/locator/TokenMetadata.java |   8 +-
 .../apache/cassandra/net/IAsyncCallback.java    |  10 -
 .../service/AbstractWriteResponseHandler.java   |  50 +--
 .../cassandra/service/ActiveRepairService.java  |   2 +-
 .../service/BatchlogResponseHandler.java        |  15 +-
 .../DatacenterSyncWriteResponseHandler.java     |  12 +-
 .../service/DatacenterWriteResponseHandler.java |  10 +-
 .../apache/cassandra/service/StorageProxy.java  | 239 +++++-----
 .../cassandra/service/StorageService.java       |  82 +---
 .../cassandra/service/WriteResponseHandler.java |  16 +-
 .../service/reads/AbstractReadExecutor.java     |  78 ++--
 .../cassandra/service/reads/DataResolver.java   |  26 +-
 .../cassandra/service/reads/DigestResolver.java |  22 +-
 .../cassandra/service/reads/ReadCallback.java   |  62 ++-
 .../service/reads/ResponseResolver.java         |  22 +-
 .../reads/ShortReadPartitionsProtection.java    |  19 +-
 .../reads/repair/AbstractReadRepair.java        |  42 +-
 .../reads/repair/BlockingPartitionRepair.java   |  29 +-
 .../reads/repair/BlockingReadRepair.java        |  25 +-
 .../service/reads/repair/NoopReadRepair.java    |  11 +-
 .../repair/PartitionIteratorMergeListener.java  |  16 +-
 .../reads/repair/ReadOnlyReadRepair.java        |  13 +-
 .../service/reads/repair/ReadRepair.java        |  17 +-
 .../reads/repair/ReadRepairDiagnostics.java     |  11 +-
 .../service/reads/repair/ReadRepairEvent.java   |   2 +-
 .../reads/repair/ReadRepairStrategy.java        |  11 +-
 .../reads/repair/RowIteratorMergeListener.java  |  21 +-
 .../locator/ReplicaCollectionTest.java          |  89 ++--
 .../service/WriteResponseHandlerTest.java       |   4 +-
 .../WriteResponseHandlerTransientTest.java      |  71 ++-
 .../service/reads/DataResolverTest.java         |  21 +-
 .../reads/DataResolverTransientTest.java        | 227 ++++++++++
 .../service/reads/DigestResolverTest.java       |   5 +-
 .../service/reads/ReadExecutorTest.java         |   7 +-
 .../reads/repair/AbstractReadRepairTest.java    |  32 +-
 .../reads/repair/BlockingReadRepairTest.java    |  36 +-
 .../DiagEventsBlockingReadRepairTest.java       |  26 +-
 .../reads/repair/InstrumentedReadRepair.java    |   4 +-
 .../reads/repair/ReadOnlyReadRepairTest.java    |  24 +-
 .../service/reads/repair/ReadRepairTest.java    |  12 +-
 .../reads/repair/TestableReadRepair.java        |  18 +-
 56 files changed, 1655 insertions(+), 1051 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/047bcd7a/src/java/org/apache/cassandra/batchlog/BatchlogManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/batchlog/BatchlogManager.java b/src/java/org/apache/cassandra/batchlog/BatchlogManager.java
index 8dda54e..77f725c 100644
--- a/src/java/org/apache/cassandra/batchlog/BatchlogManager.java
+++ b/src/java/org/apache/cassandra/batchlog/BatchlogManager.java
@@ -26,12 +26,9 @@ import java.util.concurrent.*;
 import javax.management.MBeanServer;
 import javax.management.ObjectName;
 import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Predicates;
 import com.google.common.collect.*;
 import com.google.common.util.concurrent.RateLimiter;
 
-import org.apache.cassandra.locator.ReplicaLayout;
-import org.apache.cassandra.locator.EndpointsForToken;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -54,6 +51,8 @@ import org.apache.cassandra.hints.Hint;
 import org.apache.cassandra.hints.HintsService;
 import org.apache.cassandra.io.util.DataInputBuffer;
 import org.apache.cassandra.io.util.DataOutputBuffer;
+import org.apache.cassandra.locator.ReplicaLayout;
+import org.apache.cassandra.locator.ReplicaPlan;
 import org.apache.cassandra.net.MessageIn;
 import org.apache.cassandra.net.MessageOut;
 import org.apache.cassandra.net.MessagingService;
@@ -459,36 +458,34 @@ public class BatchlogManager implements BatchlogManagerMBean
             Keyspace keyspace = Keyspace.open(ks);
             Token tk = mutation.key().getToken();
 
-            EndpointsForToken replicas = StorageService.instance.getNaturalAndPendingReplicasForToken(ks, tk);
-            Replicas.temporaryAssertFull(replicas); // TODO in CASSANDRA-14549
+            // TODO: this logic could do with revisiting at some point, as it is unclear what its rationale is
+            // we perform a local write, ignoring errors and inline in this thread (potentially slowing replay down)
+            // effectively bumping CL for locally owned writes and also potentially stalling log replay if an error occurs
+            // once we decide how it should work, it can also probably be simplified, and avoid constructing a ReplicaPlan directly
+            ReplicaLayout.ForTokenWrite liveAndDown = ReplicaLayout.forTokenWriteLiveAndDown(keyspace, tk);
+            Replicas.temporaryAssertFull(liveAndDown.all()); // TODO in CASSANDRA-14549
+
+            Replica selfReplica = liveAndDown.all().selfIfPresent();
+            if (selfReplica != null)
+                mutation.apply();
 
-            EndpointsForToken.Builder liveReplicasBuilder = EndpointsForToken.builder(tk);
-            for (Replica replica : replicas)
+            ReplicaLayout.ForTokenWrite liveRemoteOnly = liveAndDown.filter(
+                    r -> FailureDetector.isReplicaAlive.test(r) && r != selfReplica);
+
+            for (Replica replica : liveAndDown.all())
             {
-                if (replica.isLocal())
-                {
-                    mutation.apply();
-                }
-                else if (FailureDetector.instance.isAlive(replica.endpoint()))
-                {
-                    liveReplicasBuilder.add(replica); // will try delivering directly instead of writing a hint.
-                }
-                else
-                {
-                    hintedNodes.add(replica.endpoint());
-                    HintsService.instance.write(StorageService.instance.getHostIdForEndpoint(replica.endpoint()),
-                                                Hint.create(mutation, writtenAt));
-                }
+                if (replica == selfReplica || liveRemoteOnly.all().contains(replica))
+                    continue;
+                hintedNodes.add(replica.endpoint());
+                HintsService.instance.write(StorageService.instance.getHostIdForEndpoint(replica.endpoint()),
+                        Hint.create(mutation, writtenAt));
             }
 
-            EndpointsForToken liveReplicas = liveReplicasBuilder.build();
-            if (liveReplicas.isEmpty())
-                return null;
-
-            Replicas.temporaryAssertFull(liveReplicas);
-            ReplayWriteResponseHandler<Mutation> handler = new ReplayWriteResponseHandler<>(keyspace, liveReplicas, System.nanoTime());
+            ReplicaPlan.ForTokenWrite replicaPlan = new ReplicaPlan.ForTokenWrite(keyspace, ConsistencyLevel.ONE,
+                    liveRemoteOnly.pending(), liveRemoteOnly.all(), liveRemoteOnly.all(), liveRemoteOnly.all());
+            ReplayWriteResponseHandler<Mutation> handler = new ReplayWriteResponseHandler<>(replicaPlan, System.nanoTime());
             MessageOut<Mutation> message = mutation.createMessage();
-            for (Replica replica : liveReplicas)
+            for (Replica replica : liveRemoteOnly.all())
                 MessagingService.instance().sendWriteRR(message, replica, handler, false);
             return handler;
         }
@@ -509,17 +506,16 @@ public class BatchlogManager implements BatchlogManagerMBean
         {
             private final Set<InetAddressAndPort> undelivered = Collections.newSetFromMap(new ConcurrentHashMap<>());
 
-            ReplayWriteResponseHandler(Keyspace keyspace, EndpointsForToken writeReplicas, long queryStartNanoTime)
+            ReplayWriteResponseHandler(ReplicaPlan.ForTokenWrite replicaPlan, long queryStartNanoTime)
             {
-                super(ReplicaLayout.forWriteWithDownNodes(keyspace, null, writeReplicas.token(), writeReplicas, EndpointsForToken.empty(writeReplicas.token())),
-                      null, WriteType.UNLOGGED_BATCH, queryStartNanoTime);
-                Iterables.addAll(undelivered, writeReplicas.endpoints());
+                super(replicaPlan, null, WriteType.UNLOGGED_BATCH, queryStartNanoTime);
+                Iterables.addAll(undelivered, replicaPlan.contacts().endpoints());
             }
 
             @Override
-            protected int totalBlockFor()
+            protected int blockFor()
             {
-                return this.replicaLayout.selected().size();
+                return this.replicaPlan.contacts().size();
             }
 
             @Override

http://git-wip-us.apache.org/repos/asf/cassandra/blob/047bcd7a/src/java/org/apache/cassandra/db/ConsistencyLevel.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ConsistencyLevel.java b/src/java/org/apache/cassandra/db/ConsistencyLevel.java
index 35ba198..5a4baf7 100644
--- a/src/java/org/apache/cassandra/db/ConsistencyLevel.java
+++ b/src/java/org/apache/cassandra/db/ConsistencyLevel.java
@@ -251,16 +251,10 @@ public enum ConsistencyLevel
         if (this == EACH_QUORUM && keyspace.getReplicationStrategy() instanceof NetworkTopologyStrategy)
             return filterForEachQuorum(keyspace, liveReplicas);
 
-        /*
-         * Endpoints are expected to be restricted to live replicas, sorted by snitch preference.
-         * For LOCAL_QUORUM, move local-DC replicas in front first as we need them there whether
-         * we do read repair (since the first replica gets the data read) or not (since we'll take
-         * the blockFor first ones).
-         */
-        if (isDCLocal)
-            liveReplicas = liveReplicas.sorted(DatabaseDescriptor.getLocalComparator());
-
-        return liveReplicas.subList(0, Math.min(liveReplicas.size(), blockFor(keyspace) + (alwaysSpeculate ? 1 : 0)));
+        int count = blockFor(keyspace) + (alwaysSpeculate ? 1 : 0);
+        return isDCLocal
+                ? liveReplicas.filter(ConsistencyLevel::isLocal, count)
+                : liveReplicas.subList(0, Math.min(liveReplicas.size(), count));
     }
 
     private <E extends Endpoints<E>> E filterForEachQuorum(Keyspace keyspace, E liveReplicas)
@@ -285,7 +279,7 @@ public enum ConsistencyLevel
         });
     }
 
-    public boolean isSufficientLiveNodesForRead(Keyspace keyspace, Endpoints<?> liveReplicas)
+    public boolean isSufficientLiveReplicasForRead(Keyspace keyspace, Endpoints<?> liveReplicas)
     {
         switch (this)
         {
@@ -316,15 +310,15 @@ public enum ConsistencyLevel
         }
     }
 
-    public void assureSufficientLiveNodesForRead(Keyspace keyspace, Endpoints<?> liveReplicas) throws UnavailableException
+    public void assureSufficientLiveReplicasForRead(Keyspace keyspace, Endpoints<?> liveReplicas) throws UnavailableException
     {
-        assureSufficientLiveNodes(keyspace, liveReplicas, blockFor(keyspace), 1);
+        assureSufficientLiveReplicas(keyspace, liveReplicas, blockFor(keyspace), 1);
     }
-    public void assureSufficientLiveNodesForWrite(Keyspace keyspace, Endpoints<?> allLive, Endpoints<?> pendingWithDown) throws UnavailableException
+    public void assureSufficientLiveReplicasForWrite(Keyspace keyspace, Endpoints<?> allLive, Endpoints<?> pendingWithDown) throws UnavailableException
     {
-        assureSufficientLiveNodes(keyspace, allLive, blockForWrite(keyspace, pendingWithDown), 0);
+        assureSufficientLiveReplicas(keyspace, allLive, blockForWrite(keyspace, pendingWithDown), 0);
     }
-    public void assureSufficientLiveNodes(Keyspace keyspace, Endpoints<?> allLive, int blockFor, int blockForFullReplicas) throws UnavailableException
+    void assureSufficientLiveReplicas(Keyspace keyspace, Endpoints<?> allLive, int blockFor, int blockForFullReplicas) throws UnavailableException
     {
         switch (this)
         {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/047bcd7a/src/java/org/apache/cassandra/gms/FailureDetector.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/gms/FailureDetector.java b/src/java/org/apache/cassandra/gms/FailureDetector.java
index e567b7b..d7f73ab 100644
--- a/src/java/org/apache/cassandra/gms/FailureDetector.java
+++ b/src/java/org/apache/cassandra/gms/FailureDetector.java
@@ -27,11 +27,13 @@ import java.util.*;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.TimeUnit;
+import java.util.function.Predicate;
 import javax.management.MBeanServer;
 import javax.management.ObjectName;
 import javax.management.openmbean.CompositeData;
 import javax.management.openmbean.*;
 
+import org.apache.cassandra.locator.Replica;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -71,6 +73,8 @@ public class FailureDetector implements IFailureDetector, FailureDetectorMBean
     }
 
     public static final IFailureDetector instance = new FailureDetector();
+    public static final Predicate<InetAddressAndPort> isEndpointAlive = instance::isAlive;
+    public static final Predicate<Replica> isReplicaAlive = r -> isEndpointAlive.test(r.endpoint());
 
     // this is useless except to provide backwards compatibility in phi_convict_threshold,
     // because everyone seems pretty accustomed to the default of 8, and users who have

http://git-wip-us.apache.org/repos/asf/cassandra/blob/047bcd7a/src/java/org/apache/cassandra/hints/HintsService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/hints/HintsService.java b/src/java/org/apache/cassandra/hints/HintsService.java
index c6ad3d9..73840d3 100644
--- a/src/java/org/apache/cassandra/hints/HintsService.java
+++ b/src/java/org/apache/cassandra/hints/HintsService.java
@@ -34,6 +34,8 @@ import javax.management.ObjectName;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.ImmutableMap;
+import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.locator.ReplicaLayout;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -185,7 +187,7 @@ public final class HintsService implements HintsServiceMBean
         String keyspaceName = hint.mutation.getKeyspaceName();
         Token token = hint.mutation.key().getToken();
 
-        EndpointsForToken replicas = StorageService.instance.getNaturalAndPendingReplicasForToken(keyspaceName, token);
+        EndpointsForToken replicas = ReplicaLayout.forTokenWriteLiveAndDown(Keyspace.open(keyspaceName), token).all();
 
         // judicious use of streams: eagerly materializing probably cheaper
         // than performing filters / translations 2x extra via Iterables.filter/transform

http://git-wip-us.apache.org/repos/asf/cassandra/blob/047bcd7a/src/java/org/apache/cassandra/locator/AbstractReplicaCollection.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/locator/AbstractReplicaCollection.java b/src/java/org/apache/cassandra/locator/AbstractReplicaCollection.java
index 6a7a4ff..94ff991 100644
--- a/src/java/org/apache/cassandra/locator/AbstractReplicaCollection.java
+++ b/src/java/org/apache/cassandra/locator/AbstractReplicaCollection.java
@@ -75,7 +75,6 @@ public abstract class AbstractReplicaCollection<C extends AbstractReplicaCollect
      */
     public abstract Mutable<C> newMutable(int initialCapacity);
 
-
     public C snapshot()
     {
         return isSnapshot ? self()
@@ -83,6 +82,7 @@ public abstract class AbstractReplicaCollection<C extends AbstractReplicaCollect
                                                     : new ArrayList<>(list));
     }
 
+    /** see {@link ReplicaCollection#subList(int, int)}*/
     public final C subList(int start, int end)
     {
         List<Replica> subList;
@@ -100,11 +100,23 @@ public abstract class AbstractReplicaCollection<C extends AbstractReplicaCollect
         return snapshot(subList);
     }
 
+    /** see {@link ReplicaCollection#count(Predicate)}*/
+    public int count(Predicate<Replica> predicate)
+    {
+        int count = 0;
+        for (int i = 0 ; i < list.size() ; ++i)
+            if (predicate.test(list.get(i)))
+                ++count;
+        return count;
+    }
+
+    /** see {@link ReplicaCollection#filter(Predicate)}*/
     public final C filter(Predicate<Replica> predicate)
     {
         return filter(predicate, Integer.MAX_VALUE);
     }
 
+    /** see {@link ReplicaCollection#filter(Predicate, int)}*/
     public final C filter(Predicate<Replica> predicate, int limit)
     {
         if (isEmpty())
@@ -148,53 +160,8 @@ public abstract class AbstractReplicaCollection<C extends AbstractReplicaCollect
         return snapshot(copy);
     }
 
-    public final class Select
-    {
-        private final List<Replica> result;
-        public Select(int expectedSize)
-        {
-            this.result = new ArrayList<>(expectedSize);
-        }
-
-        /**
-         * Add matching replica to the result; this predicate should be mutually exclusive with all prior predicates.
-         * Stop once we have targetSize replicas in total, including preceding calls
-         */
-        public Select add(Predicate<Replica> predicate, int targetSize)
-        {
-            assert !Iterables.any(result, predicate::test);
-            for (int i = 0 ; result.size() < targetSize && i < list.size() ; ++i)
-                if (predicate.test(list.get(i)))
-                    result.add(list.get(i));
-            return this;
-        }
-        public Select add(Predicate<Replica> predicate)
-        {
-            return add(predicate, Integer.MAX_VALUE);
-        }
-        public C get()
-        {
-            return snapshot(result);
-        }
-    }
-
-    /**
-     * An efficient method for selecting a subset of replica via a sequence of filters.
-     *
-     * Example: select().add(filter1).add(filter2, 3).get();
-     *
-     * @return a Select object
-     */
-    public final Select select()
-    {
-        return select(list.size());
-    }
-    public final Select select(int expectedSize)
-    {
-        return new Select(expectedSize);
-    }
-
-    public final C sorted(Comparator<Replica> comparator)
+    /** see {@link ReplicaCollection#sorted(Comparator)}*/
+   public final C sorted(Comparator<Replica> comparator)
     {
         List<Replica> copy = new ArrayList<>(list);
         copy.sort(comparator);
@@ -267,9 +234,9 @@ public abstract class AbstractReplicaCollection<C extends AbstractReplicaCollect
         if (replicas.isEmpty())
             return extraReplicas;
         Mutable<C> mutable = replicas.newMutable(replicas.size() + extraReplicas.size());
-        mutable.addAll(replicas);
+        mutable.addAll(replicas, Mutable.Conflict.NONE);
         mutable.addAll(extraReplicas, ignoreConflicts);
-        return mutable.asImmutableView();
+        return mutable.asSnapshot();
     }
 
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/047bcd7a/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java b/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java
index d168052..bad736f 100644
--- a/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java
+++ b/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java
@@ -141,33 +141,33 @@ public abstract class AbstractReplicationStrategy
      */
     public abstract EndpointsForRange calculateNaturalReplicas(Token searchToken, TokenMetadata tokenMetadata);
 
-    public <T> AbstractWriteResponseHandler<T> getWriteResponseHandler(ReplicaLayout.ForToken replicaLayout,
+    public <T> AbstractWriteResponseHandler<T> getWriteResponseHandler(ReplicaPlan.ForTokenWrite replicaPlan,
                                                                        Runnable callback,
                                                                        WriteType writeType,
                                                                        long queryStartNanoTime)
     {
-        return getWriteResponseHandler(replicaLayout, callback, writeType, queryStartNanoTime, DatabaseDescriptor.getIdealConsistencyLevel());
+        return getWriteResponseHandler(replicaPlan, callback, writeType, queryStartNanoTime, DatabaseDescriptor.getIdealConsistencyLevel());
     }
 
-    public <T> AbstractWriteResponseHandler<T> getWriteResponseHandler(ReplicaLayout.ForToken replicaLayout,
+    public <T> AbstractWriteResponseHandler<T> getWriteResponseHandler(ReplicaPlan.ForTokenWrite replicaPlan,
                                                                        Runnable callback,
                                                                        WriteType writeType,
                                                                        long queryStartNanoTime,
                                                                        ConsistencyLevel idealConsistencyLevel)
     {
         AbstractWriteResponseHandler resultResponseHandler;
-        if (replicaLayout.consistencyLevel.isDatacenterLocal())
+        if (replicaPlan.consistencyLevel().isDatacenterLocal())
         {
             // block for in this context will be localnodes block.
-            resultResponseHandler = new DatacenterWriteResponseHandler<T>(replicaLayout, callback, writeType, queryStartNanoTime);
+            resultResponseHandler = new DatacenterWriteResponseHandler<T>(replicaPlan, callback, writeType, queryStartNanoTime);
         }
-        else if (replicaLayout.consistencyLevel == ConsistencyLevel.EACH_QUORUM && (this instanceof NetworkTopologyStrategy))
+        else if (replicaPlan.consistencyLevel() == ConsistencyLevel.EACH_QUORUM && (this instanceof NetworkTopologyStrategy))
         {
-            resultResponseHandler = new DatacenterSyncWriteResponseHandler<T>(replicaLayout, callback, writeType, queryStartNanoTime);
+            resultResponseHandler = new DatacenterSyncWriteResponseHandler<T>(replicaPlan, callback, writeType, queryStartNanoTime);
         }
         else
         {
-            resultResponseHandler = new WriteResponseHandler<T>(replicaLayout, callback, writeType, queryStartNanoTime);
+            resultResponseHandler = new WriteResponseHandler<T>(replicaPlan, callback, writeType, queryStartNanoTime);
         }
 
         //Check if tracking the ideal consistency level is configured
@@ -176,14 +176,14 @@ public abstract class AbstractReplicationStrategy
             //If ideal and requested are the same just use this handler to track the ideal consistency level
             //This is also used so that the ideal consistency level handler when constructed knows it is the ideal
             //one for tracking purposes
-            if (idealConsistencyLevel == replicaLayout.consistencyLevel)
+            if (idealConsistencyLevel == replicaPlan.consistencyLevel())
             {
                 resultResponseHandler.setIdealCLResponseHandler(resultResponseHandler);
             }
             else
             {
                 //Construct a delegate response handler to use to track the ideal consistency level
-                AbstractWriteResponseHandler idealHandler = getWriteResponseHandler(replicaLayout.withConsistencyLevel(idealConsistencyLevel),
+                AbstractWriteResponseHandler idealHandler = getWriteResponseHandler(replicaPlan.withConsistencyLevel(idealConsistencyLevel),
                                                                                     callback,
                                                                                     writeType,
                                                                                     queryStartNanoTime,

http://git-wip-us.apache.org/repos/asf/cassandra/blob/047bcd7a/src/java/org/apache/cassandra/locator/Endpoints.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/locator/Endpoints.java b/src/java/org/apache/cassandra/locator/Endpoints.java
index 3d5faa4..28e578c 100644
--- a/src/java/org/apache/cassandra/locator/Endpoints.java
+++ b/src/java/org/apache/cassandra/locator/Endpoints.java
@@ -29,6 +29,11 @@ import java.util.Map;
 import java.util.Objects;
 import java.util.Set;
 
+/**
+ * A collection of Endpoints for a given ring position. This will typically reside in a ReplicaLayout,
+ * representing some subset of the endpoints for the Token or Range.
+ * @param <E> The concrete type of Endpoints, that will be returned by the modifying methods
+ */
 public abstract class Endpoints<E extends Endpoints<E>> extends AbstractReplicaCollection<E>
 {
     static final Map<InetAddressAndPort, Replica> EMPTY_MAP = Collections.unmodifiableMap(new LinkedHashMap<>());
@@ -89,17 +94,32 @@ public abstract class Endpoints<E extends Endpoints<E>> extends AbstractReplicaC
         return filter(r -> !self.equals(r.endpoint()));
     }
 
+    public Replica selfIfPresent()
+    {
+        InetAddressAndPort self = FBUtilities.getBroadcastAddressAndPort();
+        return byEndpoint().get(self);
+    }
+
+    /**
+     * @return a collection without the provided endpoints, otherwise in the same order as this collection
+     */
     public E without(Set<InetAddressAndPort> remove)
     {
         return filter(r -> !remove.contains(r.endpoint()));
     }
 
+    /**
+     * @return a collection with only the provided endpoints (ignoring any not present), otherwise in the same order as this collection
+     */
     public E keep(Set<InetAddressAndPort> keep)
     {
         return filter(r -> keep.contains(r.endpoint()));
     }
 
-    public E keep(Iterable<InetAddressAndPort> endpoints)
+    /**
+     * @return a collection containing the Replica from this collection for the provided endpoints, in the order of the provided endpoints
+     */
+    public E select(Iterable<InetAddressAndPort> endpoints, boolean ignoreMissing)
     {
         ReplicaCollection.Mutable<E> copy = newMutable(
                 endpoints instanceof Collection<?>
@@ -109,10 +129,14 @@ public abstract class Endpoints<E extends Endpoints<E>> extends AbstractReplicaC
         Map<InetAddressAndPort, Replica> byEndpoint = byEndpoint();
         for (InetAddressAndPort endpoint : endpoints)
         {
-            Replica keep = byEndpoint.get(endpoint);
-            if (keep == null)
+            Replica select = byEndpoint.get(endpoint);
+            if (select == null)
+            {
+                if (!ignoreMissing)
+                    throw new IllegalArgumentException(endpoint + " is not present in " + this);
                 continue;
-            copy.add(keep, ReplicaCollection.Mutable.Conflict.DUPLICATE);
+            }
+            copy.add(select, ReplicaCollection.Mutable.Conflict.DUPLICATE);
         }
         return copy.asSnapshot();
     }
@@ -124,34 +148,19 @@ public abstract class Endpoints<E extends Endpoints<E>> extends AbstractReplicaC
      *   2) because a movement that changes the type of replication from transient to full must be handled
      *      differently for reads and writes (with the reader treating it as transient, and writer as full)
      *
-     * The method haveConflicts() below, and resolveConflictsInX, are used to detect and resolve any issues
+     * The method {@link ReplicaLayout#haveWriteConflicts} can be used to detect and resolve any issues
      */
     public static <E extends Endpoints<E>> E concat(E natural, E pending)
     {
         return AbstractReplicaCollection.concat(natural, pending, Conflict.NONE);
     }
 
-    public static <E extends Endpoints<E>> boolean haveConflicts(E natural, E pending)
-    {
-        Set<InetAddressAndPort> naturalEndpoints = natural.endpoints();
-        for (InetAddressAndPort pendingEndpoint : pending.endpoints())
-        {
-            if (naturalEndpoints.contains(pendingEndpoint))
-                return true;
-        }
-        return false;
-    }
-
-    // must apply first
-    public static <E extends Endpoints<E>> E resolveConflictsInNatural(E natural, E pending)
-    {
-        return natural.filter(r -> !r.isTransient() || !pending.contains(r.endpoint(), true));
-    }
-
-    // must apply second
-    public static <E extends Endpoints<E>> E resolveConflictsInPending(E natural, E pending)
+    public static <E extends Endpoints<E>> E append(E replicas, Replica extraReplica)
     {
-        return pending.without(natural.endpoints());
+        Mutable<E> mutable = replicas.newMutable(replicas.size() + 1);
+        mutable.addAll(replicas);
+        mutable.add(extraReplica, Conflict.NONE);
+        return mutable.asSnapshot();
     }
 
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/047bcd7a/src/java/org/apache/cassandra/locator/EndpointsForRange.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/locator/EndpointsForRange.java b/src/java/org/apache/cassandra/locator/EndpointsForRange.java
index c2d8232..f812951 100644
--- a/src/java/org/apache/cassandra/locator/EndpointsForRange.java
+++ b/src/java/org/apache/cassandra/locator/EndpointsForRange.java
@@ -86,7 +86,7 @@ public class EndpointsForRange extends Endpoints<EndpointsForRange>
     {
         boolean hasSnapshot;
         public Mutable(Range<Token> range) { this(range, 0); }
-        public Mutable(Range<Token> range, int capacity) { super(range, new ArrayList<>(capacity), false, new LinkedHashMap<>()); }
+        public Mutable(Range<Token> range, int capacity) { super(range, new ArrayList<>(capacity), false, new LinkedHashMap<>(capacity)); }
 
         public void add(Replica replica, Conflict ignoreConflict)
         {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/047bcd7a/src/java/org/apache/cassandra/locator/EndpointsForToken.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/locator/EndpointsForToken.java b/src/java/org/apache/cassandra/locator/EndpointsForToken.java
index f24c615..3446dc9 100644
--- a/src/java/org/apache/cassandra/locator/EndpointsForToken.java
+++ b/src/java/org/apache/cassandra/locator/EndpointsForToken.java
@@ -77,7 +77,7 @@ public class EndpointsForToken extends Endpoints<EndpointsForToken>
     {
         boolean hasSnapshot;
         public Mutable(Token token) { this(token, 0); }
-        public Mutable(Token token, int capacity) { super(token, new ArrayList<>(capacity), false, new LinkedHashMap<>()); }
+        public Mutable(Token token, int capacity) { super(token, new ArrayList<>(capacity), false, new LinkedHashMap<>(capacity)); }
 
         public void add(Replica replica, Conflict ignoreConflict)
         {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/047bcd7a/src/java/org/apache/cassandra/locator/RangesAtEndpoint.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/locator/RangesAtEndpoint.java b/src/java/org/apache/cassandra/locator/RangesAtEndpoint.java
index 74828ad..1773173 100644
--- a/src/java/org/apache/cassandra/locator/RangesAtEndpoint.java
+++ b/src/java/org/apache/cassandra/locator/RangesAtEndpoint.java
@@ -173,7 +173,7 @@ public class RangesAtEndpoint extends AbstractReplicaCollection<RangesAtEndpoint
     {
         boolean hasSnapshot;
         public Mutable(InetAddressAndPort endpoint) { this(endpoint, 0); }
-        public Mutable(InetAddressAndPort endpoint, int capacity) { super(endpoint, new ArrayList<>(capacity), false, new LinkedHashMap<>()); }
+        public Mutable(InetAddressAndPort endpoint, int capacity) { super(endpoint, new ArrayList<>(capacity), false, new LinkedHashMap<>(capacity)); }
 
         public void add(Replica replica, Conflict ignoreConflict)
         {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/047bcd7a/src/java/org/apache/cassandra/locator/ReplicaCollection.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/locator/ReplicaCollection.java b/src/java/org/apache/cassandra/locator/ReplicaCollection.java
index 6833f4b..d1006dc 100644
--- a/src/java/org/apache/cassandra/locator/ReplicaCollection.java
+++ b/src/java/org/apache/cassandra/locator/ReplicaCollection.java
@@ -62,6 +62,11 @@ public interface ReplicaCollection<C extends ReplicaCollection<C>> extends Itera
     public abstract boolean contains(Replica replica);
 
     /**
+     * @return the number of replicas that match the predicate
+     */
+    public abstract int count(Predicate<Replica> predicate);
+
+    /**
      * @return a *eagerly constructed* copy of this collection containing the Replica that match the provided predicate.
      * An effort will be made to either return ourself, or a subList, where possible.
      * It is guaranteed that no changes to any upstream Mutable will affect the state of the result.
@@ -108,16 +113,30 @@ public interface ReplicaCollection<C extends ReplicaCollection<C>> extends Itera
         C asImmutableView();
 
         /**
-         * @return an Immutable clone that assumes this Mutable will never be modified again.
-         * If this is not true, behaviour is undefined.
+         * @return an Immutable clone that assumes this Mutable will never be modified again,
+         * so its contents can be reused.
+         *
+         * This Mutable should enforce that it is no longer modified.
          */
         C asSnapshot();
 
-        enum Conflict { NONE, DUPLICATE, ALL}
+        /**
+         * Passed to add() and addAll() to specify which conflicts to ignore. The meaning of a conflict varies by collection type
+         * (for Endpoints, it is a duplicate InetAddressAndPort; for RangesAtEndpoint it is a duplicate Range).
+         */
+        enum Conflict
+        {
+            /** fail on addition of any such conflict */
+            NONE,
+            /** fail on addition of any such conflict where the contents differ (first occurrence and position wins) */
+            DUPLICATE,
+            /** ignore all conflicts (the first occurrence and position wins) */
+            ALL
+        }
 
         /**
          * @param replica add this replica to the end of the collection
-         * @param ignoreConflict if false, fail on any conflicting additions (as defined by C's semantics)
+         * @param ignoreConflict conflicts to ignore, see {@link Conflict}
          */
         void add(Replica replica, Conflict ignoreConflict);
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/047bcd7a/src/java/org/apache/cassandra/locator/ReplicaLayout.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/locator/ReplicaLayout.java b/src/java/org/apache/cassandra/locator/ReplicaLayout.java
index 946a7f8..f48c989 100644
--- a/src/java/org/apache/cassandra/locator/ReplicaLayout.java
+++ b/src/java/org/apache/cassandra/locator/ReplicaLayout.java
@@ -18,364 +18,299 @@
 
 package org.apache.cassandra.locator;
 
-import java.util.Collection;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-import java.util.function.Predicate;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Predicates;
-import com.google.common.collect.Iterables;
-
 import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.db.ConsistencyLevel;
-import org.apache.cassandra.db.DecoratedKey;
 import org.apache.cassandra.db.Keyspace;
 import org.apache.cassandra.db.PartitionPosition;
 import org.apache.cassandra.dht.AbstractBounds;
 import org.apache.cassandra.dht.Token;
-import org.apache.cassandra.exceptions.UnavailableException;
 import org.apache.cassandra.gms.FailureDetector;
-import org.apache.cassandra.net.IAsyncCallback;
-import org.apache.cassandra.service.StorageProxy;
 import org.apache.cassandra.service.StorageService;
-import org.apache.cassandra.service.reads.AlwaysSpeculativeRetryPolicy;
-import org.apache.cassandra.service.reads.SpeculativeRetryPolicy;
 import org.apache.cassandra.utils.FBUtilities;
 
-import static com.google.common.collect.Iterables.any;
+import java.util.Set;
+import java.util.function.Predicate;
 
 /**
- * Encapsulates knowledge about the ring necessary for performing a specific operation, with static accessors
- * for building the relevant layout.
+ * The relevant replicas for an operation over a given range or token.
  *
- * Constitutes:
- *  - the 'natural' replicas replicating the range or token relevant for the operation
- *  - if for performing a write, any 'pending' replicas that are taking ownership of the range, and must receive updates
- *  - the 'selected' replicas, those that should be targeted for any operation
- *  - 'all' replicas represents natural+pending
- *
- * @param <E> the type of Endpoints this ReplayLayout holds (either EndpointsForToken or EndpointsForRange)
- * @param <L> the type of itself, including its type parameters, for return type of modifying methods
+ * @param <E> the concrete type of Endpoints held by this layout (e.g. EndpointsForToken or EndpointsForRange)
  */
-public abstract class ReplicaLayout<E extends Endpoints<E>, L extends ReplicaLayout<E, L>>
+public abstract class ReplicaLayout<E extends Endpoints<E>>
 {
-    private volatile E all;
-    protected final E natural;
-    protected final E pending;
-    protected final E selected;
-
-    protected final Keyspace keyspace;
-    protected final ConsistencyLevel consistencyLevel;
-
-    private ReplicaLayout(Keyspace keyspace, ConsistencyLevel consistencyLevel, E natural, E pending, E selected)
-    {
-        this(keyspace, consistencyLevel, natural, pending, selected, null);
-    }
+    private final E natural;
 
-    private ReplicaLayout(Keyspace keyspace, ConsistencyLevel consistencyLevel, E natural, E pending, E selected, E all)
+    ReplicaLayout(E natural)
     {
-        assert selected != null;
-        assert pending == null || !Endpoints.haveConflicts(natural, pending);
-        this.keyspace = keyspace;
-        this.consistencyLevel = consistencyLevel;
         this.natural = natural;
-        this.pending = pending;
-        this.selected = selected;
-        // if we logically have no pending endpoints (they are null), then 'all' our endpoints are natural
-        if (all == null && pending == null)
-            all = natural;
-        this.all = all;
     }
 
-    public Replica getReplicaFor(InetAddressAndPort endpoint)
-    {
-        return natural.byEndpoint().get(endpoint);
-    }
-
-    public E natural()
+    /**
+     * The 'natural' owners of the ring position(s), as implied by the current ring layout.
+     * This excludes any pending owners, i.e. those that are in the process of taking ownership of a range, but
+     * have not yet finished obtaining their view of the range.
+     */
+    public final E natural()
     {
         return natural;
     }
 
-    public E all()
-    {
-        E result = all;
-        if (result == null)
-            all = result = Endpoints.concat(natural, pending);
-        return result;
-    }
-
-    public E selected()
-    {
-        return selected;
-    }
-
     /**
-     * @return the pending replicas - will be null for read layouts
-     * TODO: ideally we would enforce at compile time that read layouts have no pending to access
+     * All relevant owners of the ring position(s) for this operation, as implied by the current ring layout.
+     * For writes, this will include pending owners, and for reads it will be equivalent to natural()
      */
-    public E pending()
-    {
-        return pending;
-    }
-
-    public int blockFor()
+    public E all()
     {
-        return pending == null
-                ? consistencyLevel.blockFor(keyspace)
-                : consistencyLevel.blockForWrite(keyspace, pending);
+        return natural;
     }
 
-    public Keyspace keyspace()
+    public String toString()
     {
-        return keyspace;
+        return "ReplicaLayout [ natural: " + natural + " ]";
     }
 
-    public ConsistencyLevel consistencyLevel()
+    public static class ForTokenRead extends ReplicaLayout<EndpointsForToken> implements ForToken
     {
-        return consistencyLevel;
-    }
-
-    abstract public L withSelected(E replicas);
-
-    abstract public L withConsistencyLevel(ConsistencyLevel cl);
-
-    public L forNaturalUncontacted()
-    {
-        E more;
-        if (consistencyLevel.isDatacenterLocal() && keyspace.getReplicationStrategy() instanceof NetworkTopologyStrategy)
+        public ForTokenRead(EndpointsForToken natural)
         {
-            IEndpointSnitch snitch = keyspace.getReplicationStrategy().snitch;
-            String localDC = DatabaseDescriptor.getLocalDataCenter();
+            super(natural);
+        }
 
-            more = natural.filter(replica -> !selected.contains(replica) &&
-                    snitch.getDatacenter(replica).equals(localDC));
-        } else
+        @Override
+        public Token token()
         {
-            more = natural.filter(replica -> !selected.contains(replica));
+            return natural().token();
         }
 
-        return withSelected(more);
+        public ReplicaLayout.ForTokenRead filter(Predicate<Replica> filter)
+        {
+            EndpointsForToken filtered = natural().filter(filter);
+            // AbstractReplicaCollection.filter returns itself if all elements match the filter
+            if (filtered == natural()) return this;
+            return new ReplicaLayout.ForTokenRead(filtered);
+        }
     }
 
-    public static class ForRange extends ReplicaLayout<EndpointsForRange, ForRange>
+    public static class ForRangeRead extends ReplicaLayout<EndpointsForRange> implements ForRange
     {
-        public final AbstractBounds<PartitionPosition> range;
+        final AbstractBounds<PartitionPosition> range;
 
-        @VisibleForTesting
-        public ForRange(Keyspace keyspace, ConsistencyLevel consistencyLevel, AbstractBounds<PartitionPosition> range, EndpointsForRange natural, EndpointsForRange selected)
+        public ForRangeRead(AbstractBounds<PartitionPosition> range, EndpointsForRange natural)
         {
-            // Range queries do not contact pending replicas
-            super(keyspace, consistencyLevel, natural, null, selected);
+            super(natural);
             this.range = range;
         }
 
         @Override
-        public ForRange withSelected(EndpointsForRange newSelected)
+        public AbstractBounds<PartitionPosition> range()
         {
-            return new ForRange(keyspace, consistencyLevel, range, natural, newSelected);
+            return range;
         }
 
-        @Override
-        public ForRange withConsistencyLevel(ConsistencyLevel cl)
+        public ReplicaLayout.ForRangeRead filter(Predicate<Replica> filter)
         {
-            return new ForRange(keyspace, cl, range, natural, selected);
+            EndpointsForRange filtered = natural().filter(filter);
+            // AbstractReplicaCollection.filter returns itself if all elements match the filter
+            if (filtered == natural()) return this;
+            return new ReplicaLayout.ForRangeRead(range(), filtered);
         }
     }
 
-    public static class ForToken extends ReplicaLayout<EndpointsForToken, ForToken>
+    public static class ForWrite<E extends Endpoints<E>> extends ReplicaLayout<E>
     {
-        public final Token token;
+        final E all;
+        final E pending;
 
-        @VisibleForTesting
-        public ForToken(Keyspace keyspace, ConsistencyLevel consistencyLevel, Token token, EndpointsForToken natural, EndpointsForToken pending, EndpointsForToken selected)
+        ForWrite(E natural, E pending, E all)
         {
-            super(keyspace, consistencyLevel, natural, pending, selected);
-            this.token = token;
+            super(natural);
+            assert pending != null && !haveWriteConflicts(natural, pending);
+            if (all == null)
+                all = Endpoints.concat(natural, pending);
+            this.all = all;
+            this.pending = pending;
         }
 
-        public ForToken(Keyspace keyspace, ConsistencyLevel consistencyLevel, Token token, EndpointsForToken natural, EndpointsForToken pending, EndpointsForToken selected, EndpointsForToken all)
+        public final E all()
         {
-            super(keyspace, consistencyLevel, natural, pending, selected, all);
-            this.token = token;
+            return all;
         }
 
-        public ForToken withSelected(EndpointsForToken newSelected)
+        public final E pending()
         {
-            return new ForToken(keyspace, consistencyLevel, token, natural, pending, newSelected);
+            return pending;
         }
 
-        @Override
-        public ForToken withConsistencyLevel(ConsistencyLevel cl)
+        public String toString()
         {
-            return new ForToken(keyspace, cl, token, natural, pending, selected);
+            return "ReplicaLayout [ natural: " + natural() + ", pending: " + pending + " ]";
         }
     }
 
-    public static class ForPaxos extends ForToken
+    public static class ForTokenWrite extends ForWrite<EndpointsForToken> implements ForToken
     {
-        private final int requiredParticipants;
-
-        private ForPaxos(Keyspace keyspace, ConsistencyLevel consistencyLevel, Token token, int requiredParticipants, EndpointsForToken natural, EndpointsForToken pending, EndpointsForToken selected, EndpointsForToken all)
+        public ForTokenWrite(EndpointsForToken natural, EndpointsForToken pending)
         {
-            super(keyspace, consistencyLevel, token, natural, pending, selected, all);
-            this.requiredParticipants = requiredParticipants;
+            this(natural, pending, null);
         }
-
-        public int getRequiredParticipants()
+        public ForTokenWrite(EndpointsForToken natural, EndpointsForToken pending, EndpointsForToken all)
         {
-            return requiredParticipants;
+            super(natural, pending, all);
         }
-    }
 
-    public static ForToken forSingleReplica(Keyspace keyspace, Token token, Replica replica)
-    {
-        EndpointsForToken singleReplica = EndpointsForToken.of(token, replica);
-        return new ForToken(keyspace, ConsistencyLevel.ONE, token, singleReplica, EndpointsForToken.empty(token), singleReplica, singleReplica);
-    }
-
-    public static ForRange forSingleReplica(Keyspace keyspace, AbstractBounds<PartitionPosition> range, Replica replica)
-    {
-        EndpointsForRange singleReplica = EndpointsForRange.of(replica);
-        return new ForRange(keyspace, ConsistencyLevel.ONE, range, singleReplica, singleReplica);
-    }
+        @Override
+        public Token token() { return natural().token(); }
 
-    public static ForToken forCounterWrite(Keyspace keyspace, Token token, Replica replica)
-    {
-        return forSingleReplica(keyspace, token, replica);
+        public ReplicaLayout.ForTokenWrite filter(Predicate<Replica> filter)
+        {
+            EndpointsForToken filtered = all().filter(filter);
+            // AbstractReplicaCollection.filter returns itself if all elements match the filter
+            if (filtered == all()) return this;
+            // unique by endpoint, so can for efficiency filter only on endpoint
+            return new ReplicaLayout.ForTokenWrite(
+                    natural().keep(filtered.endpoints()),
+                    pending().keep(filtered.endpoints()),
+                    filtered
+            );
+        }
     }
 
-    public static ForToken forBatchlogWrite(Keyspace keyspace, Collection<InetAddressAndPort> endpoints) throws UnavailableException
+    public interface ForRange
     {
-        // A single case we write not for range or token, but multiple mutations to many tokens
-        Token token = DatabaseDescriptor.getPartitioner().getMinimumToken();
-        EndpointsForToken natural = EndpointsForToken.copyOf(token, SystemReplicas.getSystemReplicas(endpoints));
-        EndpointsForToken pending = EndpointsForToken.empty(token);
-        ConsistencyLevel consistencyLevel = natural.size() == 1 ? ConsistencyLevel.ONE : ConsistencyLevel.TWO;
-
-        return forWriteWithDownNodes(keyspace, consistencyLevel, token, natural, pending);
+        public AbstractBounds<PartitionPosition> range();
     }
 
-    public static ForToken forWriteWithDownNodes(Keyspace keyspace, ConsistencyLevel consistencyLevel, Token token) throws UnavailableException
+    public interface ForToken
     {
-        return forWrite(keyspace, consistencyLevel, token, Predicates.alwaysTrue());
+        public Token token();
     }
 
-    public static ForToken forWrite(Keyspace keyspace, ConsistencyLevel consistencyLevel, Token token, Predicate<InetAddressAndPort> isAlive) throws UnavailableException
+    /**
+     * Gets the 'natural' and 'pending' replicas that own a given token, with no filtering or processing.
+     *
+     * Since a write is intended for all nodes (except, unless necessary, transient replicas), this method's
+     * only responsibility is to fetch the 'natural' and 'pending' replicas, then resolve any conflicts
+     * {@link ReplicaLayout#haveWriteConflicts(Endpoints, Endpoints)}
+     */
+    public static ReplicaLayout.ForTokenWrite forTokenWriteLiveAndDown(Keyspace keyspace, Token token)
     {
-        EndpointsForToken natural = StorageService.getNaturalReplicasForToken(keyspace.getName(), token);
+        // TODO: race condition to fetch these. implications??
+        EndpointsForToken natural = keyspace.getReplicationStrategy().getNaturalReplicasForToken(token);
         EndpointsForToken pending = StorageService.instance.getTokenMetadata().pendingEndpointsForToken(token, keyspace.getName());
-        return forWrite(keyspace, consistencyLevel, token, natural, pending, isAlive);
-    }
-
-    public static ForToken forWriteWithDownNodes(Keyspace keyspace, ConsistencyLevel consistencyLevel, Token token, EndpointsForToken natural, EndpointsForToken pending) throws UnavailableException
-    {
-        return forWrite(keyspace, consistencyLevel, token, natural, pending, Predicates.alwaysTrue());
+        return forTokenWrite(natural, pending);
     }
 
-    public static ForToken forWrite(Keyspace keyspace, ConsistencyLevel consistencyLevel, Token token, EndpointsForToken natural, EndpointsForToken pending, Predicate<InetAddressAndPort> isAlive) throws UnavailableException
+    public static ReplicaLayout.ForTokenWrite forTokenWrite(EndpointsForToken natural, EndpointsForToken pending)
     {
-        if (Endpoints.haveConflicts(natural, pending))
+        if (haveWriteConflicts(natural, pending))
         {
-            natural = Endpoints.resolveConflictsInNatural(natural, pending);
-            pending = Endpoints.resolveConflictsInPending(natural, pending);
+            natural = resolveWriteConflictsInNatural(natural, pending);
+            pending = resolveWriteConflictsInPending(natural, pending);
         }
-
-        if (!any(natural, Replica::isTransient) && !any(pending, Replica::isTransient))
-        {
-            EndpointsForToken selected = Endpoints.concat(natural, pending).filter(r -> isAlive.test(r.endpoint()));
-            return new ForToken(keyspace, consistencyLevel, token, natural, pending, selected);
-        }
-
-        return forWrite(keyspace, consistencyLevel, token, consistencyLevel.blockForWrite(keyspace, pending), natural, pending, isAlive);
+        return new ReplicaLayout.ForTokenWrite(natural, pending);
     }
 
-    public static ReplicaLayout.ForPaxos forPaxos(Keyspace keyspace, DecoratedKey key, ConsistencyLevel consistencyForPaxos) throws UnavailableException
+    /**
+     * Detect if we have any endpoint in both pending and natural; this can occur either due to races (there is no isolation)
+     * or because an endpoint is transitioning between full and transient replication status.
+     *
+     * We essentially always prefer the full version for writes, because this is stricter.
+     *
+     * For transient->full transitions:
+     *
+     *   Since we always write to any pending transient replica, effectively upgrading it to full for the transition duration,
+     *   it might at first seem to be OK to continue treating the conflict replica as its 'natural' transient form,
+     *   as there is always a quorum of nodes receiving the write.  However, ring ownership changes are not atomic or
+     *   consistent across the cluster, and it is possible for writers to see different ring states.
+     *
+     *   Furthermore, an operator would expect that the full node has received all writes, with no extra need for repair
+     *   (as the normal contract dictates) when it completes its transition.
+     *
+     *   While we cannot completely eliminate risks due to ring inconsistencies, this approach is the most conservative
+     *   available to us today to mitigate, and (we think) the easiest to reason about.
+     *
+     * For full->transient transitions:
+     *
+     *   In this case, things are dicier, because in theory we can trigger this change instantly.  All we need to do is
+     *   drop some data, surely?
+     *
+     *   Ring movements can put us in a pickle; any other node could believe us to be full when we have become transient,
+     *   and perform a full data request to us that we believe ourselves capable of answering, but that we are not.
+     *   If the ring is inconsistent, it's even feasible that a transient request would be made to the node that is losing
+     *   its transient status, that also does not know it has yet done so, resulting in all involved nodes being unaware
+     *   of the data inconsistency.
+     *
+     *   This happens because ring ownership changes are implied by a single node; not all owning nodes get a say in when
+     *   the transition takes effect.  As such, a node can hold an incorrect belief about its own ownership ranges.
+     *
+     *   This race condition is somewhat inherent in present day Cassandra, and there's actually a limit to what we can do about it.
+     *   It is a little more dangerous with transient replication, however, because we can completely answer a request without
+     *   ever touching a digest, meaning we are less likely to attempt to repair any inconsistency.
+     *
+     *   We aren't guaranteed to contact any different nodes for the data requests, of course, though we at least have a chance.
+     *
+     * Note: If we have any pending transient->full movement, we need to move the full replica to our 'natural' bucket
+     * to avoid corrupting our count.  This is fine for writes, all we're doing is ensuring we always write to the node,
+     * instead of selectively.
+     *
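+     * @param natural the natural replicas for the token or range
+     * @param pending the pending replicas for the token or range
+     * @param <E> the concrete {@link Endpoints} collection type
+     * @return true if any endpoint appears in both {@code natural} and {@code pending}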
+     */
+    static <E extends Endpoints<E>> boolean haveWriteConflicts(E natural, E pending)
     {
-        Token tk = key.getToken();
-        EndpointsForToken natural = StorageService.getNaturalReplicasForToken(keyspace.getName(), tk);
-        EndpointsForToken pending = StorageService.instance.getTokenMetadata().pendingEndpointsForToken(tk, keyspace.getName());
-        if (Endpoints.haveConflicts(natural, pending))
+        Set<InetAddressAndPort> naturalEndpoints = natural.endpoints();
+        for (InetAddressAndPort pendingEndpoint : pending.endpoints())
         {
-            natural = Endpoints.resolveConflictsInNatural(natural, pending);
-            pending = Endpoints.resolveConflictsInPending(natural, pending);
+            if (naturalEndpoints.contains(pendingEndpoint))
+                return true;
         }
-
-        // TODO CASSANDRA-14547
-        Replicas.temporaryAssertFull(natural);
-        Replicas.temporaryAssertFull(pending);
-
-        if (consistencyForPaxos == ConsistencyLevel.LOCAL_SERIAL)
-        {
-            // Restrict natural and pending to node in the local DC only
-            String localDc = DatabaseDescriptor.getEndpointSnitch().getDatacenter(FBUtilities.getBroadcastAddressAndPort());
-            IEndpointSnitch snitch = DatabaseDescriptor.getEndpointSnitch();
-            Predicate<Replica> isLocalDc = replica -> localDc.equals(snitch.getDatacenter(replica));
-
-            natural = natural.filter(isLocalDc);
-            pending = pending.filter(isLocalDc);
-        }
-
-        int participants = pending.size() + natural.size();
-        int requiredParticipants = participants / 2 + 1; // See CASSANDRA-8346, CASSANDRA-833
-
-        EndpointsForToken all = Endpoints.concat(natural, pending);
-        EndpointsForToken selected = all.filter(IAsyncCallback.isReplicaAlive);
-        if (selected.size() < requiredParticipants)
-            throw UnavailableException.create(consistencyForPaxos, requiredParticipants, selected.size());
-
-        // We cannot allow CAS operations with 2 or more pending endpoints, see #8346.
-        // Note that we fake an impossible number of required nodes in the unavailable exception
-        // to nail home the point that it's an impossible operation no matter how many nodes are live.
-        if (pending.size() > 1)
-            throw new UnavailableException(String.format("Cannot perform LWT operation as there is more than one (%d) pending range movement", pending.size()),
-                                           consistencyForPaxos,
-                                           participants + 1,
-                                           selected.size());
-
-        return new ReplicaLayout.ForPaxos(keyspace, consistencyForPaxos, key.getToken(), requiredParticipants, natural, pending, selected, all);
+        return false;
     }
 
     /**
-     * We want to send mutations to as many full replicas as we can, and just as many transient replicas
-     * as we need to meet blockFor.
+     * MUST APPLY FIRST
+     * See {@link ReplicaLayout#haveWriteConflicts}
+     * @return a 'natural' replica collection, that has had its conflicts with pending repaired
      */
-    @VisibleForTesting
-    public static ForToken forWrite(Keyspace keyspace, ConsistencyLevel consistencyLevel, Token token, int blockFor, EndpointsForToken natural, EndpointsForToken pending, Predicate<InetAddressAndPort> livePredicate) throws UnavailableException
+    private static <E extends Endpoints<E>> E resolveWriteConflictsInNatural(E natural, E pending)
     {
-        EndpointsForToken all = Endpoints.concat(natural, pending);
-        EndpointsForToken selected = all
-                .select()
-                .add(r -> r.isFull() && livePredicate.test(r.endpoint()))
-                .add(r -> r.isTransient() && livePredicate.test(r.endpoint()), blockFor)
-                .get();
-
-        consistencyLevel.assureSufficientLiveNodesForWrite(keyspace, selected, pending);
-
-        return new ForToken(keyspace, consistencyLevel, token, natural, pending, selected, all);
+        return natural.filter(r -> !r.isTransient() || !pending.contains(r.endpoint(), true));
     }
 
-    public static ForToken forRead(Keyspace keyspace, Token token, ConsistencyLevel consistencyLevel, SpeculativeRetryPolicy retry)
+    /**
+     * MUST APPLY SECOND
+     * See {@link ReplicaLayout#haveWriteConflicts}
+     * @return a 'pending' replica collection, that has had its conflicts with natural repaired
+     */
+    private static <E extends Endpoints<E>> E resolveWriteConflictsInPending(E natural, E pending)
     {
-        EndpointsForToken natural = StorageProxy.getLiveSortedReplicasForToken(keyspace, token);
-        EndpointsForToken selected = consistencyLevel.filterForQuery(keyspace, natural, retry.equals(AlwaysSpeculativeRetryPolicy.INSTANCE));
-
-        // Throw UAE early if we don't have enough replicas.
-        consistencyLevel.assureSufficientLiveNodesForRead(keyspace, selected);
-
-        return new ForToken(keyspace, consistencyLevel, token, natural, null, selected);
+        return pending.without(natural.endpoints());
     }
 
-    public static ForRange forRangeRead(Keyspace keyspace, ConsistencyLevel consistencyLevel, AbstractBounds<PartitionPosition> range, EndpointsForRange natural, EndpointsForRange selected)
+    /**
+     * @return the read layout for a token - this includes only live natural replicas, i.e. those that are not pending
+     * and not marked down by the failure detector. These are ordered by the configured snitch's sortedByProximity, closest first
+     */
+    static ReplicaLayout.ForTokenRead forTokenReadLiveSorted(Keyspace keyspace, Token token)
     {
-        return new ForRange(keyspace, consistencyLevel, range, natural, selected);
+        EndpointsForToken replicas = keyspace.getReplicationStrategy().getNaturalReplicasForToken(token);
+        replicas = DatabaseDescriptor.getEndpointSnitch().sortedByProximity(FBUtilities.getBroadcastAddressAndPort(), replicas);
+        replicas = replicas.filter(FailureDetector.isReplicaAlive);
+        return new ReplicaLayout.ForTokenRead(replicas);
     }
 
-    public String toString()
+    /**
+     * TODO: we should really double check that the provided range does not overlap multiple token ring regions
+     * @return the read layout for a range - this includes only live natural replicas, i.e. those that are not pending
+     * and not marked down by the failure detector. These are ordered by the configured snitch's sortedByProximity, closest first
+     */
+    static ReplicaLayout.ForRangeRead forRangeReadLiveSorted(Keyspace keyspace, AbstractBounds<PartitionPosition> range)
     {
-        return "ReplicaLayout [ CL: " + consistencyLevel + " keyspace: " + keyspace + " natural: " + natural + "pending: " + pending + " selected: " + selected + " ]";
+        EndpointsForRange replicas = keyspace.getReplicationStrategy().getNaturalReplicas(range.right);
+        replicas = DatabaseDescriptor.getEndpointSnitch().sortedByProximity(FBUtilities.getBroadcastAddressAndPort(), replicas);
+        replicas = replicas.filter(FailureDetector.isReplicaAlive);
+        return new ReplicaLayout.ForRangeRead(range, replicas);
     }
-}
 
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/047bcd7a/src/java/org/apache/cassandra/locator/ReplicaPlan.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/locator/ReplicaPlan.java b/src/java/org/apache/cassandra/locator/ReplicaPlan.java
new file mode 100644
index 0000000..4d6127b
--- /dev/null
+++ b/src/java/org/apache/cassandra/locator/ReplicaPlan.java
@@ -0,0 +1,240 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.locator;
+
+import com.google.common.collect.Iterables;
+import org.apache.cassandra.db.ConsistencyLevel;
+import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.db.PartitionPosition;
+import org.apache.cassandra.dht.AbstractBounds;
+
+import java.util.function.Predicate;
+
+public abstract class ReplicaPlan<E extends Endpoints<E>>
+{
+    protected final Keyspace keyspace;
+    protected final ConsistencyLevel consistencyLevel;
+
+    // all nodes we will contact via any mechanism, including hints
+    // i.e., for:
+    //  - reads, only live natural replicas
+    //      ==> live.natural().subList(0, blockFor + initial speculate)
+    //  - writes, includes all full, and any pending replicas, (and only any necessary transient ones to make up the difference)
+    //      ==> liveAndDown.natural().filter(isFull) ++ liveAndDown.pending() ++ live.natural.filter(isTransient, req)
+    //  - paxos, includes all live replicas (natural+pending), for this DC if LOCAL_SERIAL
+    //      ==> live.all()  (if consistencyLevel.isDatacenterLocal(), then .filter(consistencyLevel.isLocal))
+    private final E contacts;
+
+    ReplicaPlan(Keyspace keyspace, ConsistencyLevel consistencyLevel, E contacts)
+    {
+        assert contacts != null;
+        this.keyspace = keyspace;
+        this.consistencyLevel = consistencyLevel;
+        this.contacts = contacts;
+    }
+
+    public abstract int blockFor();
+    public abstract void assureSufficientReplicas();
+
+    public E contacts() { return contacts; }
+    public boolean contacts(Replica replica) { return contacts.contains(replica); }
+    public Keyspace keyspace() { return keyspace; }
+    public ConsistencyLevel consistencyLevel() { return consistencyLevel; }
+
+    public static abstract class ForRead<E extends Endpoints<E>> extends ReplicaPlan<E>
+    {
+        // all nodes we *could* contact; typically all natural replicas that are believed to be alive
+        // we will consult this collection to find uncontacted nodes we might contact if we doubt we will meet consistency level
+        private final E candidates;
+
+        ForRead(Keyspace keyspace, ConsistencyLevel consistencyLevel, E candidates, E contact)
+        {
+            super(keyspace, consistencyLevel, contact);
+            this.candidates = candidates;
+        }
+
+        public int blockFor() { return consistencyLevel.blockFor(keyspace); }
+        public void assureSufficientReplicas() { consistencyLevel.assureSufficientLiveReplicasForRead(keyspace, candidates()); }
+
+        public E candidates() { return candidates; }
+
+        public E uncontactedCandidates()
+        {
+            return candidates().filter(r -> !contacts(r));
+        }
+
+        public Replica firstUncontactedCandidate(Predicate<Replica> extraPredicate)
+        {
+            return Iterables.tryFind(candidates(), r -> extraPredicate.test(r) && !contacts(r)).orNull();
+        }
+
+        public Replica getReplicaFor(InetAddressAndPort endpoint)
+        {
+            return candidates().byEndpoint().get(endpoint);
+        }
+
+        public String toString()
+        {
+            return "ReplicaPlan.ForRead [ CL: " + consistencyLevel + " keyspace: " + keyspace + " candidates: " + candidates + " contacts: " + contacts() + " ]";
+        }
+    }
+
+    public static class ForTokenRead extends ForRead<EndpointsForToken>
+    {
+        public ForTokenRead(Keyspace keyspace, ConsistencyLevel consistencyLevel, EndpointsForToken candidates, EndpointsForToken contact)
+        {
+            super(keyspace, consistencyLevel, candidates, contact);
+        }
+
+        ForTokenRead withContact(EndpointsForToken newContact)
+        {
+            return new ForTokenRead(keyspace, consistencyLevel, candidates(), newContact);
+        }
+    }
+
+    public static class ForRangeRead extends ForRead<EndpointsForRange>
+    {
+        final AbstractBounds<PartitionPosition> range;
+
+        public ForRangeRead(Keyspace keyspace, ConsistencyLevel consistencyLevel, AbstractBounds<PartitionPosition> range, EndpointsForRange candidates, EndpointsForRange contact)
+        {
+            super(keyspace, consistencyLevel, candidates, contact);
+            this.range = range;
+        }
+
+        public AbstractBounds<PartitionPosition> range() { return range; }
+
+        ForRangeRead withContact(EndpointsForRange newContact)
+        {
+            return new ForRangeRead(keyspace, consistencyLevel, range, candidates(), newContact);
+        }
+    }
+
+    public static abstract class ForWrite<E extends Endpoints<E>> extends ReplicaPlan<E>
+    {
+        // TODO: this is only needed because of poor isolation of concerns elsewhere - we can remove it soon, and will do so in a follow-up patch
+        final E pending;
+        final E liveAndDown;
+        final E live;
+
+        ForWrite(Keyspace keyspace, ConsistencyLevel consistencyLevel, E pending, E liveAndDown, E live, E contact)
+        {
+            super(keyspace, consistencyLevel, contact);
+            this.pending = pending;
+            this.liveAndDown = liveAndDown;
+            this.live = live;
+        }
+
+        public int blockFor() { return consistencyLevel.blockForWrite(keyspace, pending()); }
+        public void assureSufficientReplicas() { consistencyLevel.assureSufficientLiveReplicasForWrite(keyspace, live(), pending()); }
+
+        /** Replicas that a region of the ring is moving to; not yet ready to serve reads, but should receive writes */
+        public E pending() { return pending; }
+        /** Replicas that can participate in the write - this always includes all nodes (pending and natural) in all DCs, except for paxos LOCAL_SERIAL (which is local DC only) */
+        public E liveAndDown() { return liveAndDown; }
+        /** The live replicas present in liveAndDown, usually derived from FailureDetector.isReplicaAlive */
+        public E live() { return live; }
+        /** Calculate which live endpoints we could have contacted, but chose not to */
+        public E liveUncontacted() { return live().filter(r -> !contacts(r)); }
+        /** Test liveness, consistent with the upfront analysis done for this operation (i.e. test membership of live()) */
+        public boolean isAlive(Replica replica) { return live.endpoints().contains(replica.endpoint()); }
+
+        public String toString()
+        {
+            return "ReplicaPlan.ForWrite [ CL: " + consistencyLevel + " keyspace: " + keyspace + " liveAndDown: " + liveAndDown + " live: " + live + " contacts: " + contacts() +  " ]";
+        }
+    }
+
+    public static class ForTokenWrite extends ForWrite<EndpointsForToken>
+    {
+        public ForTokenWrite(Keyspace keyspace, ConsistencyLevel consistencyLevel, EndpointsForToken pending, EndpointsForToken liveAndDown, EndpointsForToken live, EndpointsForToken contact)
+        {
+            super(keyspace, consistencyLevel, pending, liveAndDown, live, contact);
+        }
+
+        private ReplicaPlan.ForTokenWrite copy(ConsistencyLevel newConsistencyLevel, EndpointsForToken newContact)
+        {
+            return new ReplicaPlan.ForTokenWrite(keyspace, newConsistencyLevel, pending(), liveAndDown(), live(), newContact);
+        }
+
+        ForTokenWrite withConsistencyLevel(ConsistencyLevel newConsistencylevel) { return copy(newConsistencylevel, contacts()); }
+        public ForTokenWrite withContact(EndpointsForToken newContact) { return copy(consistencyLevel, newContact); }
+    }
+
+    public static class ForPaxosWrite extends ForWrite<EndpointsForToken>
+    {
+        final int requiredParticipants;
+
+        ForPaxosWrite(Keyspace keyspace, ConsistencyLevel consistencyLevel, EndpointsForToken pending, EndpointsForToken liveAndDown, EndpointsForToken live, EndpointsForToken contact, int requiredParticipants)
+        {
+            super(keyspace, consistencyLevel, pending, liveAndDown, live, contact);
+            this.requiredParticipants = requiredParticipants;
+        }
+
+        public int requiredParticipants() { return requiredParticipants; }
+    }
+
+    /**
+     * Used by AbstractReadExecutor, {Data,Digest}Resolver and ReadRepair to share a ReplicaPlan whose 'contacts' replicas
+     * we progressively modify via various forms of speculation (initial speculation, rr-read and rr-write)
+     *
+     * The internal reference is not volatile, despite being shared between threads.  The initial reference provided to
+     * the constructor should be visible by the normal process of sharing data between threads (i.e. executors, etc)
+     * and any updates will either be seen or not seen, perhaps not promptly, but certainly not incompletely.
+     * The contained ReplicaPlan has only final member properties, so it cannot be seen partially initialised.
+     */
+    public interface Shared<E extends Endpoints<E>, P extends ReplicaPlan<E>>
+    {
+        /**
+         * add the provided replica to this shared plan, by updating the internal reference
+         */
+        public void addToContacts(Replica replica);
+        /**
+         * get the shared replica plan, non-volatile (so maybe stale) but no risk of partially initialised
+         */
+        public P get();
+        /**
+         * get the shared replica plan, non-volatile (so maybe stale) but no risk of partially initialised,
+         * but replace its 'contacts' with those provided
+         */
+        public abstract P getWithContacts(E endpoints);
+    }
+
+    public static class SharedForTokenRead implements Shared<EndpointsForToken, ForTokenRead>
+    {
+        private ForTokenRead replicaPlan;
+        SharedForTokenRead(ForTokenRead replicaPlan) { this.replicaPlan = replicaPlan; }
+        public void addToContacts(Replica replica) { replicaPlan = replicaPlan.withContact(Endpoints.append(replicaPlan.contacts(), replica)); }
+        public ForTokenRead get() { return replicaPlan; }
+        public ForTokenRead getWithContacts(EndpointsForToken newContact) { return replicaPlan.withContact(newContact); }
+    }
+
+    public static class SharedForRangeRead implements Shared<EndpointsForRange, ForRangeRead>
+    {
+        private ForRangeRead replicaPlan;
+        SharedForRangeRead(ForRangeRead replicaPlan) { this.replicaPlan = replicaPlan; }
+        public void addToContacts(Replica replica) { replicaPlan = replicaPlan.withContact(Endpoints.append(replicaPlan.contacts(), replica)); }
+        public ForRangeRead get() { return replicaPlan; }
+        public ForRangeRead getWithContacts(EndpointsForRange newContact) { return replicaPlan.withContact(newContact); }
+    }
+
+    public static SharedForTokenRead shared(ForTokenRead replicaPlan) { return new SharedForTokenRead(replicaPlan); }
+    public static SharedForRangeRead shared(ForRangeRead replicaPlan) { return new SharedForRangeRead(replicaPlan); }
+
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org


[2/4] cassandra git commit: ReplicaPlan/Layout refactor follow-up/completion

Posted by be...@apache.org.
http://git-wip-us.apache.org/repos/asf/cassandra/blob/047bcd7a/src/java/org/apache/cassandra/service/reads/AbstractReadExecutor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/reads/AbstractReadExecutor.java b/src/java/org/apache/cassandra/service/reads/AbstractReadExecutor.java
index 75885ae..c296cba 100644
--- a/src/java/org/apache/cassandra/service/reads/AbstractReadExecutor.java
+++ b/src/java/org/apache/cassandra/service/reads/AbstractReadExecutor.java
@@ -21,8 +21,9 @@ import java.util.concurrent.TimeUnit;
 
 import com.google.common.base.Preconditions;
 
-import org.apache.cassandra.locator.ReplicaLayout;
-
+import com.google.common.base.Predicates;
+import org.apache.cassandra.locator.ReplicaPlan;
+import org.apache.cassandra.locator.ReplicaPlans;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -50,7 +51,6 @@ import org.apache.cassandra.tracing.TraceState;
 import org.apache.cassandra.tracing.Tracing;
 
 import static com.google.common.collect.Iterables.all;
-import static com.google.common.collect.Iterables.tryFind;
 
 /**
  * Sends a read request to the replicas needed to satisfy a given ConsistencyLevel.
@@ -65,24 +65,25 @@ public abstract class AbstractReadExecutor
     private static final Logger logger = LoggerFactory.getLogger(AbstractReadExecutor.class);
 
     protected final ReadCommand command;
-    private   final ReplicaLayout.ForToken replicaLayout;
-    protected final ReadRepair<EndpointsForToken, ReplicaLayout.ForToken> readRepair;
-    protected final DigestResolver<EndpointsForToken, ReplicaLayout.ForToken> digestResolver;
-    protected final ReadCallback<EndpointsForToken, ReplicaLayout.ForToken> handler;
+    private   final ReplicaPlan.SharedForTokenRead replicaPlan;
+    protected final ReadRepair<EndpointsForToken, ReplicaPlan.ForTokenRead> readRepair;
+    protected final DigestResolver<EndpointsForToken, ReplicaPlan.ForTokenRead> digestResolver;
+    protected final ReadCallback<EndpointsForToken, ReplicaPlan.ForTokenRead> handler;
     protected final TraceState traceState;
     protected final ColumnFamilyStore cfs;
     protected final long queryStartNanoTime;
     private   final int initialDataRequestCount;
     protected volatile PartitionIterator result = null;
 
-    AbstractReadExecutor(ColumnFamilyStore cfs, ReadCommand command, ReplicaLayout.ForToken replicaLayout, int initialDataRequestCount, long queryStartNanoTime)
+    AbstractReadExecutor(ColumnFamilyStore cfs, ReadCommand command, ReplicaPlan.ForTokenRead replicaPlan, int initialDataRequestCount, long queryStartNanoTime)
     {
         this.command = command;
-        this.replicaLayout = replicaLayout;
+        this.replicaPlan = ReplicaPlan.shared(replicaPlan);
         this.initialDataRequestCount = initialDataRequestCount;
-        this.readRepair = ReadRepair.create(command, replicaLayout, queryStartNanoTime);
-        this.digestResolver = new DigestResolver<>(command, replicaLayout, queryStartNanoTime);
-        this.handler = new ReadCallback<>(digestResolver, replicaLayout.consistencyLevel().blockFor(replicaLayout.keyspace()), command, replicaLayout, queryStartNanoTime);
+        // the ReadRepair and DigestResolver both need to see our updated replica plan (e.g. speculated contacts), so they get the shared holder
+        this.readRepair = ReadRepair.create(command, this.replicaPlan, queryStartNanoTime);
+        this.digestResolver = new DigestResolver<>(command, this.replicaPlan, queryStartNanoTime);
+        this.handler = new ReadCallback<>(digestResolver, command, this.replicaPlan, queryStartNanoTime);
         this.cfs = cfs;
         this.traceState = Tracing.instance.get();
         this.queryStartNanoTime = queryStartNanoTime;
@@ -93,7 +94,7 @@ public abstract class AbstractReadExecutor
         // TODO: we need this when talking with pre-3.0 nodes. So if we preserve the digest format moving forward, we can get rid of this once
         // we stop being compatible with pre-3.0 nodes.
         int digestVersion = MessagingService.current_version;
-        for (Replica replica : replicaLayout.selected())
+        for (Replica replica : replicaPlan.contacts())
             digestVersion = Math.min(digestVersion, MessagingService.instance().getVersion(replica.endpoint()));
         command.setDigestVersion(digestVersion);
     }
@@ -168,7 +169,7 @@ public abstract class AbstractReadExecutor
      */
     public void executeAsync()
     {
-        EndpointsForToken selected = replicaLayout().selected();
+        EndpointsForToken selected = replicaPlan().contacts();
         EndpointsForToken fullDataRequests = selected.filter(Replica::isFull, initialDataRequestCount);
         makeFullDataRequests(fullDataRequests);
         makeTransientDataRequests(selected.filter(Replica::isTransient));
@@ -184,30 +185,25 @@ public abstract class AbstractReadExecutor
         ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(command.metadata().id);
         SpeculativeRetryPolicy retry = cfs.metadata().params.speculativeRetry;
 
-        // Endpoints for Token
-        ReplicaLayout.ForToken replicaLayout = ReplicaLayout.forRead(keyspace, command.partitionKey().getToken(), consistencyLevel, retry);
+        ReplicaPlan.ForTokenRead replicaPlan = ReplicaPlans.forRead(keyspace, command.partitionKey().getToken(), consistencyLevel, retry);
 
         // Speculative retry is disabled *OR*
         // 11980: Disable speculative retry if using EACH_QUORUM in order to prevent miscounting DC responses
         if (retry.equals(NeverSpeculativeRetryPolicy.INSTANCE) || consistencyLevel == ConsistencyLevel.EACH_QUORUM)
-            // TODO Looks like we might want to move speculation into the replica layout, but that might be a story for post-4.0
-            return new NeverSpeculatingReadExecutor(cfs, command, replicaLayout, queryStartNanoTime, false);
+            return new NeverSpeculatingReadExecutor(cfs, command, replicaPlan, queryStartNanoTime, false);
 
         // There are simply no extra replicas to speculate.
         // Handle this separately so it can record failed attempts to speculate due to lack of replicas
-        if (replicaLayout.selected().size() == replicaLayout.all().size())
+        if (replicaPlan.contacts().size() == replicaPlan.candidates().size())
         {
             boolean recordFailedSpeculation = consistencyLevel != ConsistencyLevel.ALL;
-            return new NeverSpeculatingReadExecutor(cfs, command, replicaLayout, queryStartNanoTime, recordFailedSpeculation);
+            return new NeverSpeculatingReadExecutor(cfs, command, replicaPlan, queryStartNanoTime, recordFailedSpeculation);
         }
 
-        // If CL.ALL, upgrade to AlwaysSpeculating;
-        // If We are going to contact every node anyway, ask for 2 full data requests instead of 1, for redundancy
-        // (same amount of requests in total, but we turn 1 digest request into a full blown data request)
         if (retry.equals(AlwaysSpeculativeRetryPolicy.INSTANCE))
-            return new AlwaysSpeculatingReadExecutor(cfs, command, replicaLayout, queryStartNanoTime);
+            return new AlwaysSpeculatingReadExecutor(cfs, command, replicaPlan, queryStartNanoTime);
         else // PERCENTILE or CUSTOM.
-            return new SpeculatingReadExecutor(cfs, command, replicaLayout, queryStartNanoTime);
+            return new SpeculatingReadExecutor(cfs, command, replicaPlan, queryStartNanoTime);
     }
 
     /**
@@ -223,9 +219,9 @@ public abstract class AbstractReadExecutor
         return !handler.await(cfs.sampleReadLatencyNanos, TimeUnit.NANOSECONDS);
     }
 
-    ReplicaLayout.ForToken replicaLayout()
+    ReplicaPlan.ForTokenRead replicaPlan()
     {
-        return replicaLayout;
+        return replicaPlan.get();
     }
 
     void onReadTimeout() {}
@@ -239,9 +235,9 @@ public abstract class AbstractReadExecutor
          */
         private final boolean logFailedSpeculation;
 
-        public NeverSpeculatingReadExecutor(ColumnFamilyStore cfs, ReadCommand command, ReplicaLayout.ForToken replicaLayout, long queryStartNanoTime, boolean logFailedSpeculation)
+        public NeverSpeculatingReadExecutor(ColumnFamilyStore cfs, ReadCommand command, ReplicaPlan.ForTokenRead replicaPlan, long queryStartNanoTime, boolean logFailedSpeculation)
         {
-            super(cfs, command, replicaLayout, 1, queryStartNanoTime);
+            super(cfs, command, replicaPlan, 1, queryStartNanoTime);
             this.logFailedSpeculation = logFailedSpeculation;
         }
 
@@ -260,13 +256,13 @@ public abstract class AbstractReadExecutor
 
         public SpeculatingReadExecutor(ColumnFamilyStore cfs,
                                        ReadCommand command,
-                                       ReplicaLayout.ForToken replicaLayout,
+                                       ReplicaPlan.ForTokenRead replicaPlan,
                                        long queryStartNanoTime)
         {
             // We're hitting additional targets for read repair (??).  Since our "extra" replica is the least-
             // preferred by the snitch, we do an extra data read to start with against a replica more
             // likely to reply; better to let RR fail than the entire query.
-            super(cfs, command, replicaLayout, replicaLayout.blockFor() < replicaLayout.selected().size() ? 2 : 1, queryStartNanoTime);
+            super(cfs, command, replicaPlan, replicaPlan.blockFor() < replicaPlan.contacts().size() ? 2 : 1, queryStartNanoTime);
         }
 
         public void maybeTryAdditionalReplicas()
@@ -277,12 +273,12 @@ public abstract class AbstractReadExecutor
                 cfs.metric.speculativeRetries.inc();
                 speculated = true;
 
+                ReplicaPlan.ForTokenRead replicaPlan = replicaPlan();
                 ReadCommand retryCommand = command;
                 Replica extraReplica;
                 if (handler.resolver.isDataPresent())
                 {
-                    extraReplica = tryFind(replicaLayout().all(),
-                            r -> !replicaLayout().selected().contains(r)).orNull();
+                    extraReplica = replicaPlan.firstUncontactedCandidate(Predicates.alwaysTrue());
 
                     // we should only use a SpeculatingReadExecutor if we have an extra replica to speculate against
                     assert extraReplica != null;
@@ -293,8 +289,7 @@ public abstract class AbstractReadExecutor
                 }
                 else
                 {
-                    extraReplica = tryFind(replicaLayout().all(),
-                            r -> r.isFull() && !replicaLayout().selected().contains(r)).orNull();
+                    extraReplica = replicaPlan.firstUncontactedCandidate(Replica::isFull);
                     if (extraReplica == null)
                     {
                         cfs.metric.speculativeInsufficientReplicas.inc();
@@ -304,6 +299,11 @@ public abstract class AbstractReadExecutor
                     }
                 }
 
+                // we must update the plan to include this new node, else when we come to read-repair, we may not include this
+                // speculated response in the data requests we make again, and we will not be able to 'speculate' an extra repair read,
+                // nor would we be able to speculate a new 'write' if the repair writes are insufficient
+                super.replicaPlan.addToContacts(extraReplica);
+
                 if (traceState != null)
                     traceState.trace("speculating read retry on {}", extraReplica);
                 logger.trace("speculating read retry on {}", extraReplica);
@@ -325,10 +325,12 @@ public abstract class AbstractReadExecutor
     {
         public AlwaysSpeculatingReadExecutor(ColumnFamilyStore cfs,
                                              ReadCommand command,
-                                             ReplicaLayout.ForToken replicaLayout,
+                                             ReplicaPlan.ForTokenRead replicaPlan,
                                              long queryStartNanoTime)
         {
-            super(cfs, command, replicaLayout, replicaLayout.selected().size() > 1 ? 2 : 1, queryStartNanoTime);
+            // presumably, we speculate an extra data request here in case it is our data request that fails to respond,
+            // and there are no more nodes to consult
+            super(cfs, command, replicaPlan, replicaPlan.contacts().size() > 1 ? 2 : 1, queryStartNanoTime);
         }
 
         public void maybeTryAdditionalReplicas()
@@ -403,7 +405,7 @@ public abstract class AbstractReadExecutor
                 logger.trace("Timed out waiting on digest mismatch repair requests");
             // the caught exception here will have CL.ALL from the repair command,
             // not whatever CL the initial command was at (CASSANDRA-7947)
-            throw new ReadTimeoutException(replicaLayout().consistencyLevel(), handler.blockfor - 1, handler.blockfor, true);
+            throw new ReadTimeoutException(replicaPlan().consistencyLevel(), handler.blockFor - 1, handler.blockFor, true);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/047bcd7a/src/java/org/apache/cassandra/service/reads/DataResolver.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/reads/DataResolver.java b/src/java/org/apache/cassandra/service/reads/DataResolver.java
index a6901b2..db5f3c8 100644
--- a/src/java/org/apache/cassandra/service/reads/DataResolver.java
+++ b/src/java/org/apache/cassandra/service/reads/DataResolver.java
@@ -42,7 +42,7 @@ import org.apache.cassandra.db.transform.Filter;
 import org.apache.cassandra.db.transform.FilteredPartitions;
 import org.apache.cassandra.db.transform.Transformation;
 import org.apache.cassandra.locator.Endpoints;
-import org.apache.cassandra.locator.ReplicaLayout;
+import org.apache.cassandra.locator.ReplicaPlan;
 import org.apache.cassandra.net.MessageIn;
 import org.apache.cassandra.schema.TableMetadata;
 import org.apache.cassandra.service.reads.repair.ReadRepair;
@@ -52,14 +52,14 @@ import org.apache.cassandra.service.reads.repair.RepairedDataVerifier;
 import static com.google.common.collect.Iterables.*;
 import static org.apache.cassandra.db.partitions.UnfilteredPartitionIterators.MergeListener;
 
-public class DataResolver<E extends Endpoints<E>, L extends ReplicaLayout<E, L>> extends ResponseResolver<E, L>
+public class DataResolver<E extends Endpoints<E>, P extends ReplicaPlan.ForRead<E>> extends ResponseResolver<E, P>
 {
     private final boolean enforceStrictLiveness;
-    private final ReadRepair<E, L> readRepair;
+    private final ReadRepair<E, P> readRepair;
 
-    public DataResolver(ReadCommand command, L replicaLayout, ReadRepair<E, L> readRepair, long queryStartNanoTime)
+    public DataResolver(ReadCommand command, ReplicaPlan.Shared<E, P> replicaPlan, ReadRepair<E, P> readRepair, long queryStartNanoTime)
     {
-        super(command, replicaLayout, queryStartNanoTime);
+        super(command, replicaPlan, queryStartNanoTime);
         this.enforceStrictLiveness = command.metadata().enforceStrictLiveness();
         this.readRepair = readRepair;
     }
@@ -83,7 +83,7 @@ public class DataResolver<E extends Endpoints<E>, L extends ReplicaLayout<E, L>>
         Collection<MessageIn<ReadResponse>> messages = responses.snapshot();
         assert !any(messages, msg -> msg.payload.isDigestResponse());
 
-        E replicas = replicaLayout.all().keep(transform(messages, msg -> msg.from));
+        E replicas = replicaPlan().candidates().select(transform(messages, msg -> msg.from), false);
         List<UnfilteredPartitionIterator> iters = new ArrayList<>(
         Collections2.transform(messages, msg -> msg.payload.makeIterator(command)));
         assert replicas.size() == iters.size();
@@ -121,7 +121,7 @@ public class DataResolver<E extends Endpoints<E>, L extends ReplicaLayout<E, L>>
             command.limits().newCounter(command.nowInSec(), true, command.selectsFullPartition(), enforceStrictLiveness);
 
         UnfilteredPartitionIterator merged = mergeWithShortReadProtection(iters,
-                                                                          replicaLayout.withSelected(replicas),
+                                                                          replicaPlan.getWithContacts(replicas),
                                                                           mergedResultCounter,
                                                                           repairedDataTracker);
         FilteredPartitions filtered = FilteredPartitions.filter(merged, new Filter(command.nowInSec(), command.metadata().enforceStrictLiveness()));
@@ -135,7 +135,7 @@ public class DataResolver<E extends Endpoints<E>, L extends ReplicaLayout<E, L>>
     }
 
     private UnfilteredPartitionIterator mergeWithShortReadProtection(List<UnfilteredPartitionIterator> results,
-                                                                     L sources,
+                                                                     P sources,
                                                                      DataLimits.Counter mergedResultCounter,
                                                                      RepairedDataTracker repairedDataTracker)
     {
@@ -150,7 +150,7 @@ public class DataResolver<E extends Endpoints<E>, L extends ReplicaLayout<E, L>>
          */
         if (!command.limits().isUnlimited())
             for (int i = 0; i < results.size(); i++)
-                results.set(i, ShortReadProtection.extend(sources.selected().get(i), results.get(i), command, mergedResultCounter, queryStartNanoTime, enforceStrictLiveness));
+                results.set(i, ShortReadProtection.extend(sources.contacts().get(i), results.get(i), command, mergedResultCounter, queryStartNanoTime, enforceStrictLiveness));
 
         return UnfilteredPartitionIterators.merge(results, wrapMergeListener(readRepair.getMergeListener(sources), sources, repairedDataTracker));
     }
@@ -161,7 +161,7 @@ public class DataResolver<E extends Endpoints<E>, L extends ReplicaLayout<E, L>>
     }
 
     private UnfilteredPartitionIterators.MergeListener wrapMergeListener(UnfilteredPartitionIterators.MergeListener partitionListener,
-                                                                         L sources,
+                                                                         P sources,
                                                                          RepairedDataTracker repairedDataTracker)
     {
         // Avoid wrapping no-op listeners as it doesn't throw
@@ -191,7 +191,7 @@ public class DataResolver<E extends Endpoints<E>, L extends ReplicaLayout<E, L>>
                                                            table,
                                                            mergedDeletion == null ? "null" : mergedDeletion.toString(),
                                                            '[' + Joiner.on(", ").join(transform(Arrays.asList(versions), rt -> rt == null ? "null" : rt.toString())) + ']',
-                                                           sources.selected(),
+                                                           sources.contacts(),
                                                            makeResponsesDebugString(partitionKey));
                             throw new AssertionError(details, e);
                         }
@@ -212,7 +212,7 @@ public class DataResolver<E extends Endpoints<E>, L extends ReplicaLayout<E, L>>
                                                            table,
                                                            merged == null ? "null" : merged.toString(table),
                                                            '[' + Joiner.on(", ").join(transform(Arrays.asList(versions), rt -> rt == null ? "null" : rt.toString(table))) + ']',
-                                                           sources.selected(),
+                                                           sources.contacts(),
                                                            makeResponsesDebugString(partitionKey));
                             throw new AssertionError(details, e);
                         }
@@ -238,7 +238,7 @@ public class DataResolver<E extends Endpoints<E>, L extends ReplicaLayout<E, L>>
                                                            table,
                                                            merged == null ? "null" : merged.toString(table),
                                                            '[' + Joiner.on(", ").join(transform(Arrays.asList(versions), rt -> rt == null ? "null" : rt.toString(table))) + ']',
-                                                           sources.selected(),
+                                                           sources.contacts(),
                                                            makeResponsesDebugString(partitionKey));
                             throw new AssertionError(details, e);
                         }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/047bcd7a/src/java/org/apache/cassandra/service/reads/DigestResolver.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/reads/DigestResolver.java b/src/java/org/apache/cassandra/service/reads/DigestResolver.java
index 28c2117..0dcae95 100644
--- a/src/java/org/apache/cassandra/service/reads/DigestResolver.java
+++ b/src/java/org/apache/cassandra/service/reads/DigestResolver.java
@@ -29,10 +29,10 @@ import org.apache.cassandra.db.ReadResponse;
 import org.apache.cassandra.db.SinglePartitionReadCommand;
 import org.apache.cassandra.db.partitions.PartitionIterator;
 import org.apache.cassandra.db.partitions.UnfilteredPartitionIterators;
-import org.apache.cassandra.locator.ReplicaLayout;
 import org.apache.cassandra.locator.Endpoints;
 import org.apache.cassandra.locator.Replica;
 import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.locator.ReplicaPlan;
 import org.apache.cassandra.net.MessageIn;
 import org.apache.cassandra.service.reads.repair.NoopReadRepair;
 import org.apache.cassandra.service.reads.repair.ReadRepair;
@@ -40,13 +40,13 @@ import org.apache.cassandra.utils.ByteBufferUtil;
 
 import static com.google.common.collect.Iterables.any;
 
-public class DigestResolver<E extends Endpoints<E>, L extends ReplicaLayout<E, L>> extends ResponseResolver<E, L>
+public class DigestResolver<E extends Endpoints<E>, P extends ReplicaPlan.ForRead<E>> extends ResponseResolver<E, P>
 {
     private volatile MessageIn<ReadResponse> dataResponse;
 
-    public DigestResolver(ReadCommand command, L replicas, long queryStartNanoTime)
+    public DigestResolver(ReadCommand command, ReplicaPlan.Shared<E, P> replicaPlan, long queryStartNanoTime)
     {
-        super(command, replicas, queryStartNanoTime);
+        super(command, replicaPlan, queryStartNanoTime);
         Preconditions.checkArgument(command instanceof SinglePartitionReadCommand,
                                     "DigestResolver can only be used with SinglePartitionReadCommand commands");
     }
@@ -55,7 +55,7 @@ public class DigestResolver<E extends Endpoints<E>, L extends ReplicaLayout<E, L
     public void preprocess(MessageIn<ReadResponse> message)
     {
         super.preprocess(message);
-        Replica replica = replicaLayout.getReplicaFor(message.from);
+        Replica replica = replicaPlan().getReplicaFor(message.from);
         if (dataResponse == null && !message.payload.isDigestResponse() && replica.isFull())
         {
             dataResponse = message;
@@ -76,7 +76,7 @@ public class DigestResolver<E extends Endpoints<E>, L extends ReplicaLayout<E, L
     {
         return any(responses,
                 msg -> !msg.payload.isDigestResponse()
-                        && replicaLayout.getReplicaFor(msg.from).isTransient());
+                        && replicaPlan().getReplicaFor(msg.from).isTransient());
     }
 
     public PartitionIterator getData()
@@ -93,16 +93,14 @@ public class DigestResolver<E extends Endpoints<E>, L extends ReplicaLayout<E, L
         {
             // This path can be triggered only if we've got responses from full replicas and they match, but
             // transient replica response still contains data, which needs to be reconciled.
-            DataResolver<E, L> dataResolver = new DataResolver<>(command,
-                                                                 replicaLayout,
-                                                                 NoopReadRepair.instance,
-                                                                 queryStartNanoTime);
+            DataResolver<E, P> dataResolver
+                    = new DataResolver<>(command, replicaPlan, NoopReadRepair.instance, queryStartNanoTime);
 
             dataResolver.preprocess(dataResponse);
             // Reconcile with transient replicas
             for (MessageIn<ReadResponse> response : responses)
             {
-                Replica replica = replicaLayout.getReplicaFor(response.from);
+                Replica replica = replicaPlan().getReplicaFor(response.from);
                 if (replica.isTransient())
                     dataResolver.preprocess(response);
             }
@@ -119,7 +117,7 @@ public class DigestResolver<E extends Endpoints<E>, L extends ReplicaLayout<E, L
         ByteBuffer digest = null;
         for (MessageIn<ReadResponse> message : responses.snapshot())
         {
-            if (replicaLayout.getReplicaFor(message.from).isTransient())
+            if (replicaPlan().getReplicaFor(message.from).isTransient())
                 continue;
 
             ByteBuffer newDigest = message.payload.digest(command);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/047bcd7a/src/java/org/apache/cassandra/service/reads/ReadCallback.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/reads/ReadCallback.java b/src/java/org/apache/cassandra/service/reads/ReadCallback.java
index 3d39377..7a2385c 100644
--- a/src/java/org/apache/cassandra/service/reads/ReadCallback.java
+++ b/src/java/org/apache/cassandra/service/reads/ReadCallback.java
@@ -23,19 +23,18 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
 
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.locator.ReplicaPlan;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.PartitionRangeReadCommand;
 import org.apache.cassandra.db.ReadCommand;
 import org.apache.cassandra.db.ReadResponse;
 import org.apache.cassandra.exceptions.ReadFailureException;
 import org.apache.cassandra.exceptions.ReadTimeoutException;
 import org.apache.cassandra.exceptions.RequestFailureReason;
-import org.apache.cassandra.exceptions.UnavailableException;
 import org.apache.cassandra.locator.Endpoints;
-import org.apache.cassandra.locator.ReplicaLayout;
 import org.apache.cassandra.locator.InetAddressAndPort;
 import org.apache.cassandra.net.IAsyncCallbackWithFailure;
 import org.apache.cassandra.net.MessageIn;
@@ -44,16 +43,17 @@ import org.apache.cassandra.tracing.Tracing;
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.concurrent.SimpleCondition;
 
-public class ReadCallback<E extends Endpoints<E>, L extends ReplicaLayout<E, L>> implements IAsyncCallbackWithFailure<ReadResponse>
+public class ReadCallback<E extends Endpoints<E>, P extends ReplicaPlan.ForRead<E>> implements IAsyncCallbackWithFailure<ReadResponse>
 {
     protected static final Logger logger = LoggerFactory.getLogger( ReadCallback.class );
 
     public final ResponseResolver resolver;
     final SimpleCondition condition = new SimpleCondition();
     private final long queryStartNanoTime;
-    // TODO: move to replica layout as well?
-    final int blockfor;
-    final L replicaLayout;
+    final int blockFor; // TODO: move to replica plan as well?
+    // this uses a plain reference, but is initialised before handoff to any other threads; the later updates
+    // may not be visible to the threads immediately, but ReplicaPlan only contains final fields, so they will never see an uninitialised object
+    final ReplicaPlan.Shared<E, P> replicaPlan;
     private final ReadCommand command;
     private static final AtomicIntegerFieldUpdater<ReadCallback> recievedUpdater
             = AtomicIntegerFieldUpdater.newUpdater(ReadCallback.class, "received");
@@ -63,19 +63,24 @@ public class ReadCallback<E extends Endpoints<E>, L extends ReplicaLayout<E, L>>
     private volatile int failures = 0;
     private final Map<InetAddressAndPort, RequestFailureReason> failureReasonByEndpoint;
 
-    public ReadCallback(ResponseResolver resolver, int blockfor, ReadCommand command, L replicaLayout, long queryStartNanoTime)
+    public ReadCallback(ResponseResolver resolver, ReadCommand command, ReplicaPlan.Shared<E, P> replicaPlan, long queryStartNanoTime)
     {
         this.command = command;
-        this.blockfor = blockfor;
         this.resolver = resolver;
         this.queryStartNanoTime = queryStartNanoTime;
-        this.replicaLayout = replicaLayout;
+        this.replicaPlan = replicaPlan;
+        this.blockFor = replicaPlan.get().blockFor();
         this.failureReasonByEndpoint = new ConcurrentHashMap<>();
         // we don't support read repair (or rapid read protection) for range scans yet (CASSANDRA-6897)
-        assert !(command instanceof PartitionRangeReadCommand) || blockfor >= replicaLayout.selected().size();
+        assert !(command instanceof PartitionRangeReadCommand) || blockFor >= replicaPlan().contacts().size();
 
         if (logger.isTraceEnabled())
-            logger.trace("Blockfor is {}; setting up requests to {}", blockfor, this.replicaLayout);
+            logger.trace("Blockfor is {}; setting up requests to {}", blockFor, this.replicaPlan);
+    }
+
+    protected P replicaPlan()
+    {
+        return replicaPlan.get();
     }
 
     public boolean await(long timePastStart, TimeUnit unit)
@@ -94,30 +99,30 @@ public class ReadCallback<E extends Endpoints<E>, L extends ReplicaLayout<E, L>>
     public void awaitResults() throws ReadFailureException, ReadTimeoutException
     {
         boolean signaled = await(command.getTimeout(), TimeUnit.MILLISECONDS);
-        boolean failed = failures > 0 && blockfor + failures > replicaLayout.selected().size();
+        boolean failed = failures > 0 && blockFor + failures > replicaPlan().contacts().size();
         if (signaled && !failed)
             return;
 
         if (Tracing.isTracing())
         {
             String gotData = received > 0 ? (resolver.isDataPresent() ? " (including data)" : " (only digests)") : "";
-            Tracing.trace("{}; received {} of {} responses{}", new Object[]{ (failed ? "Failed" : "Timed out"), received, blockfor, gotData });
+            Tracing.trace("{}; received {} of {} responses{}", new Object[]{ (failed ? "Failed" : "Timed out"), received, blockFor, gotData });
         }
         else if (logger.isDebugEnabled())
         {
             String gotData = received > 0 ? (resolver.isDataPresent() ? " (including data)" : " (only digests)") : "";
-            logger.debug("{}; received {} of {} responses{}", new Object[]{ (failed ? "Failed" : "Timed out"), received, blockfor, gotData });
+            logger.debug("{}; received {} of {} responses{}", new Object[]{ (failed ? "Failed" : "Timed out"), received, blockFor, gotData });
         }
 
         // Same as for writes, see AbstractWriteResponseHandler
         throw failed
-            ? new ReadFailureException(replicaLayout.consistencyLevel(), received, blockfor, resolver.isDataPresent(), failureReasonByEndpoint)
-            : new ReadTimeoutException(replicaLayout.consistencyLevel(), received, blockfor, resolver.isDataPresent());
+            ? new ReadFailureException(replicaPlan().consistencyLevel(), received, blockFor, resolver.isDataPresent(), failureReasonByEndpoint)
+            : new ReadTimeoutException(replicaPlan().consistencyLevel(), received, blockFor, resolver.isDataPresent());
     }
 
     public int blockFor()
     {
-        return blockfor;
+        return blockFor;
     }
 
     public void response(MessageIn<ReadResponse> message)
@@ -127,24 +132,16 @@ public class ReadCallback<E extends Endpoints<E>, L extends ReplicaLayout<E, L>>
               ? recievedUpdater.incrementAndGet(this)
               : received;
 
-        if (n >= blockfor && resolver.isDataPresent())
+        if (n >= blockFor && resolver.isDataPresent())
             condition.signalAll();
     }
 
     /**
-     * @return true if the message counts towards the blockfor threshold
+     * @return true if the message counts towards the blockFor threshold
      */
     private boolean waitingFor(InetAddressAndPort from)
     {
-        return !replicaLayout.consistencyLevel().isDatacenterLocal() || DatabaseDescriptor.getLocalDataCenter().equals(DatabaseDescriptor.getEndpointSnitch().getDatacenter(from));
-    }
-
-    /**
-     * @return the current number of received responses
-     */
-    public int getReceivedCount()
-    {
-        return received;
+        return !replicaPlan().consistencyLevel().isDatacenterLocal() || DatabaseDescriptor.getLocalDataCenter().equals(DatabaseDescriptor.getEndpointSnitch().getDatacenter(from));
     }
 
     public void response(ReadResponse result)
@@ -157,11 +154,6 @@ public class ReadCallback<E extends Endpoints<E>, L extends ReplicaLayout<E, L>>
         response(message);
     }
 
-    public void assureSufficientLiveNodes() throws UnavailableException
-    {
-        replicaLayout.consistencyLevel().assureSufficientLiveNodesForRead(replicaLayout.keyspace(), replicaLayout.selected());
-    }
-
     public boolean isLatencyForSnitch()
     {
         return true;
@@ -176,7 +168,7 @@ public class ReadCallback<E extends Endpoints<E>, L extends ReplicaLayout<E, L>>
 
         failureReasonByEndpoint.put(from, failureReason);
 
-        if (blockfor + n > replicaLayout.selected().size())
+        if (blockFor + n > replicaPlan().contacts().size())
             condition.signalAll();
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/047bcd7a/src/java/org/apache/cassandra/service/reads/ResponseResolver.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/reads/ResponseResolver.java b/src/java/org/apache/cassandra/service/reads/ResponseResolver.java
index 298f843..0c1e1ba 100644
--- a/src/java/org/apache/cassandra/service/reads/ResponseResolver.java
+++ b/src/java/org/apache/cassandra/service/reads/ResponseResolver.java
@@ -23,30 +23,34 @@ import org.slf4j.LoggerFactory;
 import org.apache.cassandra.db.ReadCommand;
 import org.apache.cassandra.db.ReadResponse;
 import org.apache.cassandra.locator.Endpoints;
-import org.apache.cassandra.locator.ReplicaLayout;
+import org.apache.cassandra.locator.ReplicaPlan;
 import org.apache.cassandra.net.MessageIn;
 import org.apache.cassandra.utils.concurrent.Accumulator;
 
-public abstract class ResponseResolver<E extends Endpoints<E>, L extends ReplicaLayout<E, L>>
+public abstract class ResponseResolver<E extends Endpoints<E>, P extends ReplicaPlan.ForRead<E>>
 {
     protected static final Logger logger = LoggerFactory.getLogger(ResponseResolver.class);
 
     protected final ReadCommand command;
-    protected final L replicaLayout;
+    protected final ReplicaPlan.Shared<E, P> replicaPlan;
 
     // Accumulator gives us non-blocking thread-safety with optimal algorithmic constraints
     protected final Accumulator<MessageIn<ReadResponse>> responses;
     protected final long queryStartNanoTime;
 
-    public ResponseResolver(ReadCommand command, L replicaLayout, long queryStartNanoTime)
+    public ResponseResolver(ReadCommand command, ReplicaPlan.Shared<E, P> replicaPlan, long queryStartNanoTime)
     {
         this.command = command;
-        this.replicaLayout = replicaLayout;
-        // TODO: calculate max possible replicas for the query (e.g. local dc queries won't contact remotes)
-        this.responses = new Accumulator<>(replicaLayout.all().size());
+        this.replicaPlan = replicaPlan;
+        this.responses = new Accumulator<>(replicaPlan.get().candidates().size());
         this.queryStartNanoTime = queryStartNanoTime;
     }
 
+    protected P replicaPlan()
+    {
+        return replicaPlan.get();
+    }
+
     public abstract boolean isDataPresent();
 
     public void preprocess(MessageIn<ReadResponse> message)
@@ -57,8 +61,8 @@ public abstract class ResponseResolver<E extends Endpoints<E>, L extends Replica
         }
         catch (IllegalStateException e)
         {
-            logger.error("Encountered error while trying to preprocess the message {}, in command {}, replica layout: {}",
-                         message, command, replicaLayout);
+            logger.error("Encountered error while trying to preprocess the message {}, in command {}, replica plan: {}",
+                         message, command, replicaPlan);
             throw e;
         }
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/047bcd7a/src/java/org/apache/cassandra/service/reads/ShortReadPartitionsProtection.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/reads/ShortReadPartitionsProtection.java b/src/java/org/apache/cassandra/service/reads/ShortReadPartitionsProtection.java
index 580b790..b16d105 100644
--- a/src/java/org/apache/cassandra/service/reads/ShortReadPartitionsProtection.java
+++ b/src/java/org/apache/cassandra/service/reads/ShortReadPartitionsProtection.java
@@ -19,6 +19,8 @@
 package org.apache.cassandra.service.reads;
 
 import org.apache.cassandra.locator.Endpoints;
+import org.apache.cassandra.locator.ReplicaPlan;
+import org.apache.cassandra.locator.ReplicaPlans;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -40,7 +42,6 @@ import org.apache.cassandra.db.transform.Transformation;
 import org.apache.cassandra.dht.AbstractBounds;
 import org.apache.cassandra.dht.ExcludingBounds;
 import org.apache.cassandra.dht.Range;
-import org.apache.cassandra.locator.ReplicaLayout;
 import org.apache.cassandra.locator.Replica;
 import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.service.reads.repair.NoopReadRepair;
@@ -87,10 +88,11 @@ public class ShortReadPartitionsProtection extends Transformation<UnfilteredRowI
          * If we don't apply the transformation *after* extending the partition with MoreRows,
          * applyToRow() method of protection will not be called on the first row of the new extension iterator.
          */
-        ReplicaLayout.ForToken replicaLayout = ReplicaLayout.forSingleReplica(Keyspace.open(command.metadata().keyspace), partition.partitionKey().getToken(), source);
+        ReplicaPlan.ForTokenRead replicaPlan = ReplicaPlans.forSingleReplicaRead(Keyspace.open(command.metadata().keyspace), partition.partitionKey().getToken(), source);
+        ReplicaPlan.SharedForTokenRead sharedReplicaPlan = ReplicaPlan.shared(replicaPlan);
         ShortReadRowsProtection protection = new ShortReadRowsProtection(partition.partitionKey(),
                                                                          command, source,
-                                                                         (cmd) -> executeReadCommand(cmd, replicaLayout),
+                                                                         (cmd) -> executeReadCommand(cmd, sharedReplicaPlan),
                                                                          singleResultCounter,
                                                                          mergedResultCounter);
         return Transformation.apply(MoreRows.extend(partition, protection), protection);
@@ -169,14 +171,15 @@ public class ShortReadPartitionsProtection extends Transformation<UnfilteredRowI
                                                       : new ExcludingBounds<>(lastPartitionKey, bounds.right);
         DataRange newDataRange = cmd.dataRange().forSubRange(newBounds);
 
-        ReplicaLayout.ForRange replicaLayout = ReplicaLayout.forSingleReplica(Keyspace.open(command.metadata().keyspace), cmd.dataRange().keyRange(), source);
-        return executeReadCommand(cmd.withUpdatedLimitsAndDataRange(newLimits, newDataRange), replicaLayout);
+        ReplicaPlan.ForRangeRead replicaPlan = ReplicaPlans.forSingleReplicaRead(Keyspace.open(command.metadata().keyspace), cmd.dataRange().keyRange(), source);
+        return executeReadCommand(cmd.withUpdatedLimitsAndDataRange(newLimits, newDataRange), ReplicaPlan.shared(replicaPlan));
     }
 
-    private <E extends Endpoints<E>, L extends ReplicaLayout<E, L>> UnfilteredPartitionIterator executeReadCommand(ReadCommand cmd, L replicaLayout)
+    private <E extends Endpoints<E>, P extends ReplicaPlan.ForRead<E>>
+    UnfilteredPartitionIterator executeReadCommand(ReadCommand cmd, ReplicaPlan.Shared<E, P> replicaPlan)
     {
-        DataResolver<E, L> resolver = new DataResolver<>(cmd, replicaLayout, (NoopReadRepair<E, L>)NoopReadRepair.instance, queryStartNanoTime);
-        ReadCallback<E, L> handler = new ReadCallback<>(resolver, replicaLayout.consistencyLevel().blockFor(replicaLayout.keyspace()), cmd, replicaLayout, queryStartNanoTime);
+        DataResolver<E, P> resolver = new DataResolver<>(cmd, replicaPlan, (NoopReadRepair<E, P>)NoopReadRepair.instance, queryStartNanoTime);
+        ReadCallback<E, P> handler = new ReadCallback<>(resolver, cmd, replicaPlan, queryStartNanoTime);
 
         if (source.isLocal())
             StageManager.getStage(Stage.READ).maybeExecuteImmediately(new StorageProxy.LocalReadRunnable(cmd, handler));

http://git-wip-us.apache.org/repos/asf/cassandra/blob/047bcd7a/src/java/org/apache/cassandra/service/reads/repair/AbstractReadRepair.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/reads/repair/AbstractReadRepair.java b/src/java/org/apache/cassandra/service/reads/repair/AbstractReadRepair.java
index 528d31b..1b213ff 100644
--- a/src/java/org/apache/cassandra/service/reads/repair/AbstractReadRepair.java
+++ b/src/java/org/apache/cassandra/service/reads/repair/AbstractReadRepair.java
@@ -24,6 +24,7 @@ import java.util.function.Consumer;
 import com.google.common.base.Preconditions;
 
 import com.codahale.metrics.Meter;
+import com.google.common.base.Predicates;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.db.ConsistencyLevel;
@@ -34,7 +35,7 @@ import org.apache.cassandra.db.partitions.PartitionIterator;
 import org.apache.cassandra.exceptions.ReadTimeoutException;
 import org.apache.cassandra.locator.Endpoints;
 import org.apache.cassandra.locator.Replica;
-import org.apache.cassandra.locator.ReplicaLayout;
+import org.apache.cassandra.locator.ReplicaPlan;
 import org.apache.cassandra.metrics.ReadRepairMetrics;
 import org.apache.cassandra.net.MessageOut;
 import org.apache.cassandra.net.MessagingService;
@@ -44,11 +45,12 @@ import org.apache.cassandra.service.reads.DigestResolver;
 import org.apache.cassandra.service.reads.ReadCallback;
 import org.apache.cassandra.tracing.Tracing;
 
-public abstract class AbstractReadRepair<E extends Endpoints<E>, L extends ReplicaLayout<E, L>> implements ReadRepair<E, L>
+public abstract class AbstractReadRepair<E extends Endpoints<E>, P extends ReplicaPlan.ForRead<E>>
+        implements ReadRepair<E, P>
 {
     protected final ReadCommand command;
     protected final long queryStartNanoTime;
-    protected final L replicaLayout;
+    protected final ReplicaPlan.Shared<E, P> replicaPlan;
     protected final ColumnFamilyStore cfs;
 
     private volatile DigestRepair digestRepair = null;
@@ -68,15 +70,20 @@ public abstract class AbstractReadRepair<E extends Endpoints<E>, L extends Repli
     }
 
     public AbstractReadRepair(ReadCommand command,
-                              L replicaLayout,
+                              ReplicaPlan.Shared<E, P> replicaPlan,
                               long queryStartNanoTime)
     {
         this.command = command;
         this.queryStartNanoTime = queryStartNanoTime;
-        this.replicaLayout = replicaLayout;
+        this.replicaPlan = replicaPlan;
         this.cfs = Keyspace.openAndGetStore(command.metadata());
     }
 
+    protected P replicaPlan()
+    {
+        return replicaPlan.get();
+    }
+
     void sendReadCommand(Replica to, ReadCallback readCallback)
     {
         MessageOut<ReadCommand> message = command.createMessage();
@@ -90,14 +97,13 @@ public abstract class AbstractReadRepair<E extends Endpoints<E>, L extends Repli
     abstract Meter getRepairMeter();
 
     // digestResolver isn't used here because we resend read requests to all participants
-    public void startRepair(DigestResolver<E, L> digestResolver, Consumer<PartitionIterator> resultConsumer)
+    public void startRepair(DigestResolver<E, P> digestResolver, Consumer<PartitionIterator> resultConsumer)
     {
         getRepairMeter().mark();
 
         // Do a full data read to resolve the correct response (and repair node that need be)
-        DataResolver<E, L> resolver = new DataResolver<>(command, replicaLayout, this, queryStartNanoTime);
-        ReadCallback<E, L> readCallback = new ReadCallback<>(resolver, replicaLayout.consistencyLevel().blockFor(cfs.keyspace),
-                                                             command, replicaLayout, queryStartNanoTime);
+        DataResolver<E, P> resolver = new DataResolver<>(command, replicaPlan, this, queryStartNanoTime);
+        ReadCallback<E, P> readCallback = new ReadCallback<>(resolver, command, replicaPlan, queryStartNanoTime);
 
         digestRepair = new DigestRepair(resolver, readCallback, resultConsumer);
 
@@ -105,12 +111,12 @@ public abstract class AbstractReadRepair<E extends Endpoints<E>, L extends Repli
         if (DatabaseDescriptor.getRepairedDataTrackingForPartitionReadsEnabled())
             command.trackRepairedStatus();
 
-        for (Replica replica : replicaLayout.selected())
+        for (Replica replica : replicaPlan().contacts())
         {
             Tracing.trace("Enqueuing full data read to {}", replica);
             sendReadCommand(replica, readCallback);
         }
-        ReadRepairDiagnostics.startRepair(this, replicaLayout, digestResolver);
+        ReadRepairDiagnostics.startRepair(this, replicaPlan(), digestResolver);
     }
 
     public void awaitReads() throws ReadTimeoutException
@@ -125,7 +131,7 @@ public abstract class AbstractReadRepair<E extends Endpoints<E>, L extends Repli
 
     private boolean shouldSpeculate()
     {
-        ConsistencyLevel consistency = replicaLayout.consistencyLevel();
+        ConsistencyLevel consistency = replicaPlan().consistencyLevel();
         ConsistencyLevel speculativeCL = consistency.isDatacenterLocal() ? ConsistencyLevel.LOCAL_QUORUM : ConsistencyLevel.QUORUM;
         return  consistency != ConsistencyLevel.EACH_QUORUM
                 && consistency.satisfies(speculativeCL, cfs.keyspace)
@@ -142,15 +148,15 @@ public abstract class AbstractReadRepair<E extends Endpoints<E>, L extends Repli
 
         if (shouldSpeculate() && !repair.readCallback.await(cfs.sampleReadLatencyNanos, TimeUnit.NANOSECONDS))
         {
-            L uncontacted = replicaLayout.forNaturalUncontacted();
-            if (uncontacted.selected().isEmpty())
+            Replica uncontacted = replicaPlan().firstUncontactedCandidate(Predicates.alwaysTrue());
+            if (uncontacted == null)
                 return;
 
-            Replica replica = uncontacted.selected().iterator().next();
-            Tracing.trace("Enqueuing speculative full data read to {}", replica);
-            sendReadCommand(replica, repair.readCallback);
+            replicaPlan.addToContacts(uncontacted);
+            Tracing.trace("Enqueuing speculative full data read to {}", uncontacted);
+            sendReadCommand(uncontacted, repair.readCallback);
             ReadRepairMetrics.speculatedRead.mark();
-            ReadRepairDiagnostics.speculatedRead(this, replica.endpoint(), uncontacted);
+            ReadRepairDiagnostics.speculatedRead(this, uncontacted.endpoint(), replicaPlan());
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/047bcd7a/src/java/org/apache/cassandra/service/reads/repair/BlockingPartitionRepair.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/reads/repair/BlockingPartitionRepair.java b/src/java/org/apache/cassandra/service/reads/repair/BlockingPartitionRepair.java
index 54af2cf..f536ea8 100644
--- a/src/java/org/apache/cassandra/service/reads/repair/BlockingPartitionRepair.java
+++ b/src/java/org/apache/cassandra/service/reads/repair/BlockingPartitionRepair.java
@@ -37,9 +37,9 @@ import org.apache.cassandra.db.Keyspace;
 import org.apache.cassandra.db.Mutation;
 import org.apache.cassandra.db.partitions.PartitionUpdate;
 import org.apache.cassandra.locator.Endpoints;
-import org.apache.cassandra.locator.ReplicaLayout;
 import org.apache.cassandra.locator.InetAddressAndPort;
 import org.apache.cassandra.locator.Replica;
+import org.apache.cassandra.locator.ReplicaPlan;
 import org.apache.cassandra.locator.Replicas;
 import org.apache.cassandra.metrics.ReadRepairMetrics;
 import org.apache.cassandra.net.IAsyncCallback;
@@ -49,25 +49,26 @@ import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.schema.TableId;
 import org.apache.cassandra.tracing.Tracing;
 
-public class BlockingPartitionRepair<E extends Endpoints<E>, L extends ReplicaLayout<E, L>> extends AbstractFuture<Object> implements IAsyncCallback<Object>
+public class BlockingPartitionRepair<E extends Endpoints<E>, P extends ReplicaPlan.ForRead<E>>
+        extends AbstractFuture<Object> implements IAsyncCallback<Object>
 {
     private final DecoratedKey key;
-    private final L replicaLayout;
+    private final P replicaPlan;
     private final Map<Replica, Mutation> pendingRepairs;
     private final CountDownLatch latch;
 
     private volatile long mutationsSentTime;
 
-    public BlockingPartitionRepair(DecoratedKey key, Map<Replica, Mutation> repairs, int maxBlockFor, L replicaLayout)
+    public BlockingPartitionRepair(DecoratedKey key, Map<Replica, Mutation> repairs, int maxBlockFor, P replicaPlan)
     {
         this.key = key;
         this.pendingRepairs = new ConcurrentHashMap<>(repairs);
-        this.replicaLayout = replicaLayout;
+        this.replicaPlan = replicaPlan;
 
         // here we remove empty repair mutations from the block for total, since
         // we're not sending them mutations
         int blockFor = maxBlockFor;
-        for (Replica participant: replicaLayout.selected())
+        for (Replica participant: replicaPlan.contacts())
         {
             // remote dcs can sometimes get involved in dc-local reads. We want to repair
             // them if they do, but they shouldn't interfere with blocking the client read.
@@ -97,7 +98,7 @@ public class BlockingPartitionRepair<E extends Endpoints<E>, L extends ReplicaLa
 
     private boolean shouldBlockOn(InetAddressAndPort endpoint)
     {
-        return !replicaLayout.consistencyLevel().isDatacenterLocal() || isLocal(endpoint);
+        return !replicaPlan.consistencyLevel().isDatacenterLocal() || isLocal(endpoint);
     }
 
     @VisibleForTesting
@@ -105,7 +106,7 @@ public class BlockingPartitionRepair<E extends Endpoints<E>, L extends ReplicaLa
     {
         if (shouldBlockOn(from))
         {
-            pendingRepairs.remove(replicaLayout.getReplicaFor(from));
+            pendingRepairs.remove(replicaPlan.getReplicaFor(from));
             latch.countDown();
         }
     }
@@ -198,8 +199,8 @@ public class BlockingPartitionRepair<E extends Endpoints<E>, L extends ReplicaLa
         if (awaitRepairs(timeout, timeoutUnit))
             return;
 
-        L newCandidates = replicaLayout.forNaturalUncontacted();
-        if (newCandidates.selected().isEmpty())
+        E newCandidates = replicaPlan.uncontactedCandidates();
+        if (newCandidates.isEmpty())
             return;
 
         PartitionUpdate update = mergeUnackedUpdates();
@@ -212,7 +213,7 @@ public class BlockingPartitionRepair<E extends Endpoints<E>, L extends ReplicaLa
 
         Mutation[] versionedMutations = new Mutation[msgVersionIdx(MessagingService.current_version) + 1];
 
-        for (Replica replica : newCandidates.selected())
+        for (Replica replica : newCandidates)
         {
             int versionIdx = msgVersionIdx(MessagingService.instance().getVersion(replica.endpoint()));
 
@@ -220,7 +221,7 @@ public class BlockingPartitionRepair<E extends Endpoints<E>, L extends ReplicaLa
 
             if (mutation == null)
             {
-                mutation = BlockingReadRepairs.createRepairMutation(update, replicaLayout.consistencyLevel(), replica.endpoint(), true);
+                mutation = BlockingReadRepairs.createRepairMutation(update, replicaPlan.consistencyLevel(), replica.endpoint(), true);
                 versionedMutations[versionIdx] = mutation;
             }
 
@@ -239,7 +240,7 @@ public class BlockingPartitionRepair<E extends Endpoints<E>, L extends ReplicaLa
 
     Keyspace getKeyspace()
     {
-        return replicaLayout.keyspace();
+        return replicaPlan.keyspace();
     }
 
     DecoratedKey getKey()
@@ -249,6 +250,6 @@ public class BlockingPartitionRepair<E extends Endpoints<E>, L extends ReplicaLa
 
     ConsistencyLevel getConsistency()
     {
-        return replicaLayout.consistencyLevel();
+        return replicaPlan.consistencyLevel();
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/047bcd7a/src/java/org/apache/cassandra/service/reads/repair/BlockingReadRepair.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/reads/repair/BlockingReadRepair.java b/src/java/org/apache/cassandra/service/reads/repair/BlockingReadRepair.java
index 402aed0..938abaf 100644
--- a/src/java/org/apache/cassandra/service/reads/repair/BlockingReadRepair.java
+++ b/src/java/org/apache/cassandra/service/reads/repair/BlockingReadRepair.java
@@ -24,7 +24,6 @@ import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.cassandra.db.DecoratedKey;
-import org.apache.cassandra.locator.Endpoints;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -34,8 +33,9 @@ import org.apache.cassandra.db.Mutation;
 import org.apache.cassandra.db.ReadCommand;
 import org.apache.cassandra.db.partitions.UnfilteredPartitionIterators;
 import org.apache.cassandra.exceptions.ReadTimeoutException;
+import org.apache.cassandra.locator.Endpoints;
 import org.apache.cassandra.locator.Replica;
-import org.apache.cassandra.locator.ReplicaLayout;
+import org.apache.cassandra.locator.ReplicaPlan;
 import org.apache.cassandra.metrics.ReadRepairMetrics;
 import org.apache.cassandra.tracing.Tracing;
 
@@ -44,22 +44,23 @@ import org.apache.cassandra.tracing.Tracing;
  *  updates have been written to nodes needing correction. Breaks write
  *  atomicity in some situations
  */
-public class BlockingReadRepair<E extends Endpoints<E>, L extends ReplicaLayout<E, L>> extends AbstractReadRepair<E, L>
+public class BlockingReadRepair<E extends Endpoints<E>, P extends ReplicaPlan.ForRead<E>>
+        extends AbstractReadRepair<E, P>
 {
     private static final Logger logger = LoggerFactory.getLogger(BlockingReadRepair.class);
 
     protected final Queue<BlockingPartitionRepair> repairs = new ConcurrentLinkedQueue<>();
     private final int blockFor;
 
-    BlockingReadRepair(ReadCommand command, L replicaLayout, long queryStartNanoTime)
+    BlockingReadRepair(ReadCommand command, ReplicaPlan.Shared<E, P> replicaPlan, long queryStartNanoTime)
     {
-        super(command, replicaLayout, queryStartNanoTime);
-        this.blockFor = replicaLayout.consistencyLevel().blockFor(cfs.keyspace);
+        super(command, replicaPlan, queryStartNanoTime);
+        this.blockFor = replicaPlan().consistencyLevel().blockFor(cfs.keyspace);
     }
 
-    public UnfilteredPartitionIterators.MergeListener getMergeListener(L replicaLayout)
+    public UnfilteredPartitionIterators.MergeListener getMergeListener(P replicaPlan)
     {
-        return new PartitionIteratorMergeListener(replicaLayout, command, this.replicaLayout.consistencyLevel(), this);
+        return new PartitionIteratorMergeListener<>(replicaPlan, command, this);
     }
 
     @Override
@@ -91,20 +92,20 @@ public class BlockingReadRepair<E extends Endpoints<E>, L extends ReplicaLayout<
         if (timedOut)
         {
             // We got all responses, but timed out while repairing
-            int blockFor = replicaLayout.consistencyLevel().blockFor(cfs.keyspace);
+            int blockFor = replicaPlan().blockFor();
             if (Tracing.isTracing())
                 Tracing.trace("Timed out while read-repairing after receiving all {} data and digest responses", blockFor);
             else
                 logger.debug("Timeout while read-repairing after receiving all {} data and digest responses", blockFor);
 
-            throw new ReadTimeoutException(replicaLayout.consistencyLevel(), blockFor - 1, blockFor, true);
+            throw new ReadTimeoutException(replicaPlan().consistencyLevel(), blockFor - 1, blockFor, true);
         }
     }
 
     @Override
-    public void repairPartition(DecoratedKey partitionKey, Map<Replica, Mutation> mutations, L replicaLayout)
+    public void repairPartition(DecoratedKey partitionKey, Map<Replica, Mutation> mutations, P replicaPlan)
     {
-        BlockingPartitionRepair<E, L> blockingRepair = new BlockingPartitionRepair<>(partitionKey, mutations, blockFor, replicaLayout);
+        BlockingPartitionRepair<E, P> blockingRepair = new BlockingPartitionRepair<>(partitionKey, mutations, blockFor, replicaPlan);
         blockingRepair.sendInitialRepairs();
         repairs.add(blockingRepair);
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/047bcd7a/src/java/org/apache/cassandra/service/reads/repair/NoopReadRepair.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/reads/repair/NoopReadRepair.java b/src/java/org/apache/cassandra/service/reads/repair/NoopReadRepair.java
index 4af4a92..6aa6ece 100644
--- a/src/java/org/apache/cassandra/service/reads/repair/NoopReadRepair.java
+++ b/src/java/org/apache/cassandra/service/reads/repair/NoopReadRepair.java
@@ -26,28 +26,29 @@ import org.apache.cassandra.db.Mutation;
 import org.apache.cassandra.db.partitions.PartitionIterator;
 import org.apache.cassandra.db.partitions.UnfilteredPartitionIterators;
 import org.apache.cassandra.exceptions.ReadTimeoutException;
-import org.apache.cassandra.locator.ReplicaLayout;
 import org.apache.cassandra.locator.Endpoints;
 import org.apache.cassandra.locator.Replica;
+import org.apache.cassandra.locator.ReplicaPlan;
 import org.apache.cassandra.service.reads.DigestResolver;
 
 /**
  * Bypasses the read repair path for short read protection and testing
  */
-public class NoopReadRepair<E extends Endpoints<E>, L extends ReplicaLayout<E, L>> implements ReadRepair<E, L>
+public class NoopReadRepair<E extends Endpoints<E>, P extends ReplicaPlan.ForRead<E>>
+        implements ReadRepair<E, P>
 {
     public static final NoopReadRepair instance = new NoopReadRepair();
 
     private NoopReadRepair() {}
 
     @Override
-    public UnfilteredPartitionIterators.MergeListener getMergeListener(L replicas)
+    public UnfilteredPartitionIterators.MergeListener getMergeListener(P replicas)
     {
         return UnfilteredPartitionIterators.MergeListener.NOOP;
     }
 
     @Override
-    public void startRepair(DigestResolver<E, L> digestResolver, Consumer<PartitionIterator> resultConsumer)
+    public void startRepair(DigestResolver<E, P> digestResolver, Consumer<PartitionIterator> resultConsumer)
     {
         resultConsumer.accept(digestResolver.getData());
     }
@@ -75,7 +76,7 @@ public class NoopReadRepair<E extends Endpoints<E>, L extends ReplicaLayout<E, L
     }
 
     @Override
-    public void repairPartition(DecoratedKey partitionKey, Map<Replica, Mutation> mutations, L replicaLayout)
+    public void repairPartition(DecoratedKey partitionKey, Map<Replica, Mutation> mutations, P replicaPlan)
     {
 
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/047bcd7a/src/java/org/apache/cassandra/service/reads/repair/PartitionIteratorMergeListener.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/reads/repair/PartitionIteratorMergeListener.java b/src/java/org/apache/cassandra/service/reads/repair/PartitionIteratorMergeListener.java
index 4cae3ae..7247704 100644
--- a/src/java/org/apache/cassandra/service/reads/repair/PartitionIteratorMergeListener.java
+++ b/src/java/org/apache/cassandra/service/reads/repair/PartitionIteratorMergeListener.java
@@ -28,26 +28,26 @@ import org.apache.cassandra.db.RegularAndStaticColumns;
 import org.apache.cassandra.db.partitions.UnfilteredPartitionIterators;
 import org.apache.cassandra.db.rows.UnfilteredRowIterator;
 import org.apache.cassandra.db.rows.UnfilteredRowIterators;
-import org.apache.cassandra.locator.ReplicaLayout;
+import org.apache.cassandra.locator.Endpoints;
+import org.apache.cassandra.locator.ReplicaPlan;
 
-public class PartitionIteratorMergeListener implements UnfilteredPartitionIterators.MergeListener
+public class PartitionIteratorMergeListener<E extends Endpoints<E>>
+        implements UnfilteredPartitionIterators.MergeListener
 {
-    private final ReplicaLayout replicaLayout;
+    private final ReplicaPlan.ForRead<E> replicaPlan;
     private final ReadCommand command;
-    private final ConsistencyLevel consistency;
     private final ReadRepair readRepair;
 
-    public PartitionIteratorMergeListener(ReplicaLayout replicaLayout, ReadCommand command, ConsistencyLevel consistency, ReadRepair readRepair)
+    public PartitionIteratorMergeListener(ReplicaPlan.ForRead<E> replicaPlan, ReadCommand command, ReadRepair readRepair)
     {
-        this.replicaLayout = replicaLayout;
+        this.replicaPlan = replicaPlan;
         this.command = command;
-        this.consistency = consistency;
         this.readRepair = readRepair;
     }
 
     public UnfilteredRowIterators.MergeListener getRowMergeListener(DecoratedKey partitionKey, List<UnfilteredRowIterator> versions)
     {
-        return new RowIteratorMergeListener(partitionKey, columns(versions), isReversed(versions), replicaLayout, command, consistency, readRepair);
+        return new RowIteratorMergeListener<>(partitionKey, columns(versions), isReversed(versions), replicaPlan, command, readRepair);
     }
 
     protected RegularAndStaticColumns columns(List<UnfilteredRowIterator> versions)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/047bcd7a/src/java/org/apache/cassandra/service/reads/repair/ReadOnlyReadRepair.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/reads/repair/ReadOnlyReadRepair.java b/src/java/org/apache/cassandra/service/reads/repair/ReadOnlyReadRepair.java
index c13e2d6..64bfec2 100644
--- a/src/java/org/apache/cassandra/service/reads/repair/ReadOnlyReadRepair.java
+++ b/src/java/org/apache/cassandra/service/reads/repair/ReadOnlyReadRepair.java
@@ -27,22 +27,23 @@ import org.apache.cassandra.db.ReadCommand;
 import org.apache.cassandra.db.partitions.UnfilteredPartitionIterators;
 import org.apache.cassandra.locator.Endpoints;
 import org.apache.cassandra.locator.Replica;
-import org.apache.cassandra.locator.ReplicaLayout;
+import org.apache.cassandra.locator.ReplicaPlan;
 import org.apache.cassandra.metrics.ReadRepairMetrics;
 
 /**
  * Only performs the collection of data responses and reconciliation of them, doesn't send repair mutations
  * to replicas. This preserves write atomicity, but doesn't provide monotonic quorum reads
  */
-public class ReadOnlyReadRepair<E extends Endpoints<E>, L extends ReplicaLayout<E, L>> extends AbstractReadRepair<E, L>
+public class ReadOnlyReadRepair<E extends Endpoints<E>, P extends ReplicaPlan.ForRead<E>>
+        extends AbstractReadRepair<E, P>
 {
-    ReadOnlyReadRepair(ReadCommand command, L replicaLayout, long queryStartNanoTime)
+    ReadOnlyReadRepair(ReadCommand command, ReplicaPlan.Shared<E, P> replicaPlan, long queryStartNanoTime)
     {
-        super(command, replicaLayout, queryStartNanoTime);
+        super(command, replicaPlan, queryStartNanoTime);
     }
 
     @Override
-    public UnfilteredPartitionIterators.MergeListener getMergeListener(L replicaLayout)
+    public UnfilteredPartitionIterators.MergeListener getMergeListener(P replicaPlan)
     {
         return UnfilteredPartitionIterators.MergeListener.NOOP;
     }
@@ -60,7 +61,7 @@ public class ReadOnlyReadRepair<E extends Endpoints<E>, L extends ReplicaLayout<
     }
 
     @Override
-    public void repairPartition(DecoratedKey partitionKey, Map<Replica, Mutation> mutations, L replicaLayout)
+    public void repairPartition(DecoratedKey partitionKey, Map<Replica, Mutation> mutations, P replicaPlan)
     {
         throw new UnsupportedOperationException("ReadOnlyReadRepair shouldn't be trying to repair partitions");
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/047bcd7a/src/java/org/apache/cassandra/service/reads/repair/ReadRepair.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/reads/repair/ReadRepair.java b/src/java/org/apache/cassandra/service/reads/repair/ReadRepair.java
index 168f003..9441945 100644
--- a/src/java/org/apache/cassandra/service/reads/repair/ReadRepair.java
+++ b/src/java/org/apache/cassandra/service/reads/repair/ReadRepair.java
@@ -19,6 +19,7 @@ package org.apache.cassandra.service.reads.repair;
 
 import java.util.Map;
 import java.util.function.Consumer;
+import java.util.function.Supplier;
 
 import org.apache.cassandra.db.DecoratedKey;
 import org.apache.cassandra.locator.Endpoints;
@@ -29,17 +30,19 @@ import org.apache.cassandra.db.partitions.PartitionIterator;
 import org.apache.cassandra.db.partitions.UnfilteredPartitionIterators;
 import org.apache.cassandra.exceptions.ReadTimeoutException;
 import org.apache.cassandra.locator.Replica;
-import org.apache.cassandra.locator.ReplicaLayout;
+import org.apache.cassandra.locator.ReplicaPlan;
 import org.apache.cassandra.service.reads.DigestResolver;
 
-public interface ReadRepair<E extends Endpoints<E>, L extends ReplicaLayout<E, L>>
+public interface ReadRepair<E extends Endpoints<E>, P extends ReplicaPlan.ForRead<E>>
 {
     public interface Factory
     {
-        <E extends Endpoints<E>, L extends ReplicaLayout<E, L>> ReadRepair<E, L> create(ReadCommand command, L replicaLayout, long queryStartNanoTime);
+        <E extends Endpoints<E>, P extends ReplicaPlan.ForRead<E>>
+        ReadRepair<E, P> create(ReadCommand command, ReplicaPlan.Shared<E, P> replicaPlan, long queryStartNanoTime);
     }
 
-    static <E extends Endpoints<E>, L extends ReplicaLayout<E, L>> ReadRepair<E, L> create(ReadCommand command, L replicaPlan, long queryStartNanoTime)
+    static <E extends Endpoints<E>, P extends ReplicaPlan.ForRead<E>>
+    ReadRepair<E, P> create(ReadCommand command, ReplicaPlan.Shared<E, P> replicaPlan, long queryStartNanoTime)
     {
         return command.metadata().params.readRepair.create(command, replicaPlan, queryStartNanoTime);
     }
@@ -47,7 +50,7 @@ public interface ReadRepair<E extends Endpoints<E>, L extends ReplicaLayout<E, L
     /**
      * Used by DataResolver to generate corrections as the partition iterator is consumed
      */
-    UnfilteredPartitionIterators.MergeListener getMergeListener(L replicaLayout);
+    UnfilteredPartitionIterators.MergeListener getMergeListener(P replicaPlan);
 
     /**
      * Called when the digests from the initial read don't match. Reads may block on the
@@ -55,7 +58,7 @@ public interface ReadRepair<E extends Endpoints<E>, L extends ReplicaLayout<E, L
      * @param digestResolver supplied so we can get the original data response
      * @param resultConsumer hook for the repair to set it's result on completion
      */
-    public void startRepair(DigestResolver<E, L> digestResolver, Consumer<PartitionIterator> resultConsumer);
+    public void startRepair(DigestResolver<E, P> digestResolver, Consumer<PartitionIterator> resultConsumer);
 
     /**
      * Block on the reads (or timeout) sent out in {@link ReadRepair#startRepair}
@@ -90,5 +93,5 @@ public interface ReadRepair<E extends Endpoints<E>, L extends ReplicaLayout<E, L
      * Repairs a partition _after_ receiving data responses. This method receives replica list, since
      * we will block repair only on the replicas that have responded.
      */
-    void repairPartition(DecoratedKey partitionKey, Map<Replica, Mutation> mutations, L replicaLayout);
+    void repairPartition(DecoratedKey partitionKey, Map<Replica, Mutation> mutations, P replicaPlan);
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/047bcd7a/src/java/org/apache/cassandra/service/reads/repair/ReadRepairDiagnostics.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/reads/repair/ReadRepairDiagnostics.java b/src/java/org/apache/cassandra/service/reads/repair/ReadRepairDiagnostics.java
index 4c74a89..b9167bd 100644
--- a/src/java/org/apache/cassandra/service/reads/repair/ReadRepairDiagnostics.java
+++ b/src/java/org/apache/cassandra/service/reads/repair/ReadRepairDiagnostics.java
@@ -26,6 +26,7 @@ import org.apache.cassandra.db.Mutation;
 import org.apache.cassandra.diag.DiagnosticEventService;
 import org.apache.cassandra.locator.InetAddressAndPort;
 import org.apache.cassandra.locator.ReplicaLayout;
+import org.apache.cassandra.locator.ReplicaPlan;
 import org.apache.cassandra.service.reads.DigestResolver;
 import org.apache.cassandra.service.reads.repair.PartitionRepairEvent.PartitionRepairEventType;
 import org.apache.cassandra.service.reads.repair.ReadRepairEvent.ReadRepairEventType;
@@ -38,22 +39,22 @@ final class ReadRepairDiagnostics
     {
     }
 
-    static void startRepair(AbstractReadRepair readRepair, ReplicaLayout<?, ?> layout, DigestResolver digestResolver)
+    static void startRepair(AbstractReadRepair readRepair, ReplicaPlan.ForRead<?> fullPlan, DigestResolver digestResolver)
     {
         if (service.isEnabled(ReadRepairEvent.class, ReadRepairEventType.START_REPAIR))
             service.publish(new ReadRepairEvent(ReadRepairEventType.START_REPAIR,
                                                 readRepair,
-                                                layout.selected().endpoints(),
-                                                layout.all().endpoints(), digestResolver));
+                                                fullPlan.contacts().endpoints(),
+                                                fullPlan.candidates().endpoints(), digestResolver));
     }
 
     static void speculatedRead(AbstractReadRepair readRepair, InetAddressAndPort endpoint,
-                               ReplicaLayout<?, ?> replicaLayout)
+                               ReplicaPlan.ForRead<?> fullPlan)
     {
         if (service.isEnabled(ReadRepairEvent.class, ReadRepairEventType.SPECULATED_READ))
             service.publish(new ReadRepairEvent(ReadRepairEventType.SPECULATED_READ,
                                                 readRepair, Collections.singletonList(endpoint),
-                                                Lists.newArrayList(replicaLayout.all().endpoints()), null));
+                                                Lists.newArrayList(fullPlan.candidates().endpoints()), null));
     }
 
     static void sendInitialRepair(BlockingPartitionRepair partitionRepair, InetAddressAndPort destination, Mutation mutation)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/047bcd7a/src/java/org/apache/cassandra/service/reads/repair/ReadRepairEvent.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/reads/repair/ReadRepairEvent.java b/src/java/org/apache/cassandra/service/reads/repair/ReadRepairEvent.java
index 9e14362..5cec802 100644
--- a/src/java/org/apache/cassandra/service/reads/repair/ReadRepairEvent.java
+++ b/src/java/org/apache/cassandra/service/reads/repair/ReadRepairEvent.java
@@ -67,7 +67,7 @@ final class ReadRepairEvent extends DiagnosticEvent
         this.keyspace = readRepair.cfs.keyspace;
         this.tableName = readRepair.cfs.getTableName();
         this.cqlCommand = readRepair.command.toCQLString();
-        this.consistency = readRepair.replicaLayout.consistencyLevel();
+        this.consistency = readRepair.replicaPlan().consistencyLevel();
         this.speculativeRetry = readRepair.cfs.metadata().params.speculativeRetry.kind();
         this.destinations = destinations;
         this.allEndpoints = allEndpoints;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/047bcd7a/src/java/org/apache/cassandra/service/reads/repair/ReadRepairStrategy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/reads/repair/ReadRepairStrategy.java b/src/java/org/apache/cassandra/service/reads/repair/ReadRepairStrategy.java
index 28c0e9e..7a4b795 100644
--- a/src/java/org/apache/cassandra/service/reads/repair/ReadRepairStrategy.java
+++ b/src/java/org/apache/cassandra/service/reads/repair/ReadRepairStrategy.java
@@ -21,22 +21,25 @@ package org.apache.cassandra.service.reads.repair;
 import org.apache.cassandra.db.ReadCommand;
 import org.apache.cassandra.locator.Endpoints;
 import org.apache.cassandra.locator.ReplicaLayout;
+import org.apache.cassandra.locator.ReplicaPlan;
 
 public enum ReadRepairStrategy implements ReadRepair.Factory
 {
     NONE
     {
-        public <E extends Endpoints<E>, L extends ReplicaLayout<E, L>> ReadRepair<E, L> create(ReadCommand command, L replicaLayout, long queryStartNanoTime)
+        public <E extends Endpoints<E>, P extends ReplicaPlan.ForRead<E>>
+        ReadRepair<E, P> create(ReadCommand command, ReplicaPlan.Shared<E, P> replicaPlan, long queryStartNanoTime)
         {
-            return new ReadOnlyReadRepair<>(command, replicaLayout, queryStartNanoTime);
+            return new ReadOnlyReadRepair<>(command, replicaPlan, queryStartNanoTime);
         }
     },
 
     BLOCKING
     {
-        public <E extends Endpoints<E>, L extends ReplicaLayout<E, L>> ReadRepair<E, L> create(ReadCommand command, L replicaLayout, long queryStartNanoTime)
+        public <E extends Endpoints<E>, P extends ReplicaPlan.ForRead<E>>
+        ReadRepair<E, P> create(ReadCommand command, ReplicaPlan.Shared<E, P> replicaPlan, long queryStartNanoTime)
         {
-            return new BlockingReadRepair<>(command, replicaLayout, queryStartNanoTime);
+            return new BlockingReadRepair<>(command, replicaPlan, queryStartNanoTime);
         }
     };
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/047bcd7a/src/java/org/apache/cassandra/service/reads/repair/RowIteratorMergeListener.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/reads/repair/RowIteratorMergeListener.java b/src/java/org/apache/cassandra/service/reads/repair/RowIteratorMergeListener.java
index 7fe797a..60e0d41 100644
--- a/src/java/org/apache/cassandra/service/reads/repair/RowIteratorMergeListener.java
+++ b/src/java/org/apache/cassandra/service/reads/repair/RowIteratorMergeListener.java
@@ -46,21 +46,21 @@ import org.apache.cassandra.db.rows.Rows;
 import org.apache.cassandra.db.rows.UnfilteredRowIterators;
 import org.apache.cassandra.locator.Endpoints;
 import org.apache.cassandra.locator.Replica;
-import org.apache.cassandra.locator.ReplicaLayout;
+import org.apache.cassandra.locator.ReplicaPlan;
 import org.apache.cassandra.schema.ColumnMetadata;
 
-public class RowIteratorMergeListener implements UnfilteredRowIterators.MergeListener
+public class RowIteratorMergeListener<E extends Endpoints<E>>
+        implements UnfilteredRowIterators.MergeListener
 {
     private final DecoratedKey partitionKey;
     private final RegularAndStaticColumns columns;
     private final boolean isReversed;
     private final ReadCommand command;
-    private final ConsistencyLevel consistency;
 
     private final PartitionUpdate.Builder[] repairs;
     private final Row.Builder[] currentRows;
     private final RowDiffListener diffListener;
-    private final ReplicaLayout layout;
+    private final ReplicaPlan.ForRead<E> replicaPlan;
 
     // The partition level deletion for the merge row.
     private DeletionTime partitionLevelDeletion;
@@ -73,19 +73,18 @@ public class RowIteratorMergeListener implements UnfilteredRowIterators.MergeLis
 
     private final ReadRepair readRepair;
 
-    public RowIteratorMergeListener(DecoratedKey partitionKey, RegularAndStaticColumns columns, boolean isReversed, ReplicaLayout layout, ReadCommand command, ConsistencyLevel consistency, ReadRepair readRepair)
+    public RowIteratorMergeListener(DecoratedKey partitionKey, RegularAndStaticColumns columns, boolean isReversed, ReplicaPlan.ForRead<E> replicaPlan, ReadCommand command, ReadRepair readRepair)
     {
         this.partitionKey = partitionKey;
         this.columns = columns;
         this.isReversed = isReversed;
-        this.layout = layout;
-        int size = layout.selected().size();
+        this.replicaPlan = replicaPlan;
+        int size = replicaPlan.contacts().size();
         repairs = new PartitionUpdate.Builder[size];
         currentRows = new Row.Builder[size];
         sourceDeletionTime = new DeletionTime[size];
         markerToRepair = new ClusteringBound[size];
         this.command = command;
-        this.consistency = consistency;
         this.readRepair = readRepair;
 
         this.diffListener = new RowDiffListener()
@@ -310,7 +309,7 @@ public class RowIteratorMergeListener implements UnfilteredRowIterators.MergeLis
     public void close()
     {
         Map<Replica, Mutation> mutations = null;
-        Endpoints<?> sources = layout.selected();
+        Endpoints<?> sources = replicaPlan.contacts();
         for (int i = 0; i < repairs.length; i++)
         {
             if (repairs[i] == null)
@@ -318,7 +317,7 @@ public class RowIteratorMergeListener implements UnfilteredRowIterators.MergeLis
 
             Replica source = sources.get(i);
 
-            Mutation mutation = BlockingReadRepairs.createRepairMutation(repairs[i].build(), consistency, source.endpoint(), false);
+            Mutation mutation = BlockingReadRepairs.createRepairMutation(repairs[i].build(), replicaPlan.consistencyLevel(), source.endpoint(), false);
             if (mutation == null)
                 continue;
 
@@ -330,7 +329,7 @@ public class RowIteratorMergeListener implements UnfilteredRowIterators.MergeLis
 
         if (mutations != null)
         {
-            readRepair.repairPartition(partitionKey, mutations, layout);
+            readRepair.repairPartition(partitionKey, mutations, replicaPlan);
         }
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/cassandra/blob/047bcd7a/test/unit/org/apache/cassandra/locator/ReplicaCollectionTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/locator/ReplicaCollectionTest.java b/test/unit/org/apache/cassandra/locator/ReplicaCollectionTest.java
index 66eff23..f937f96 100644
--- a/test/unit/org/apache/cassandra/locator/ReplicaCollectionTest.java
+++ b/test/unit/org/apache/cassandra/locator/ReplicaCollectionTest.java
@@ -22,7 +22,6 @@ import com.google.common.base.Predicates;
 import com.google.common.collect.HashMultimap;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Multimap;
 import org.apache.cassandra.dht.Murmur3Partitioner;
@@ -34,13 +33,14 @@ import org.junit.Assert;
 import org.junit.Test;
 
 import java.net.UnknownHostException;
-import java.util.Collections;
 import java.util.Comparator;
 import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.function.Predicate;
 import java.util.stream.Collectors;
 
+import static com.google.common.collect.Iterables.*;
+import static com.google.common.collect.Iterables.filter;
 import static org.apache.cassandra.locator.Replica.fullReplica;
 import static org.apache.cassandra.locator.Replica.transientReplica;
 
@@ -111,7 +111,7 @@ public class ReplicaCollectionTest
 
         void testEquals()
         {
-            Assert.assertTrue(Iterables.elementsEqual(canonicalList, test));
+            Assert.assertTrue(elementsEqual(canonicalList, test));
         }
 
         void testEndpoints()
@@ -144,28 +144,6 @@ public class ReplicaCollectionTest
             Assert.assertEquals(new LinkedHashSet<>(Lists.transform(canonicalList, Replica::endpoint)), test.endpoints());
         }
 
-        void testSelect(int subListDepth, int filterDepth, int sortDepth, int selectDepth)
-        {
-            TestCase<C> allMatchZeroCapacity = new TestCase<>(test.select().add(Predicates.alwaysTrue(), 0).get(), Collections.emptyList());
-            allMatchZeroCapacity.testAll(subListDepth, filterDepth, sortDepth, selectDepth - 1);
-
-            TestCase<C> noMatchFullCapacity = new TestCase<>(test.select().add(Predicates.alwaysFalse(), canonicalList.size()).get(), Collections.emptyList());
-            noMatchFullCapacity.testAll(subListDepth, filterDepth, sortDepth,selectDepth - 1);
-
-            if (canonicalList.size() <= 2)
-                return;
-
-            List<Replica> newOrderList = ImmutableList.of(canonicalList.get(2), canonicalList.get(1), canonicalList.get(0));
-            TestCase<C> newOrder = new TestCase<>(
-                    test.select()
-                            .add(r -> r == newOrderList.get(0), 3)
-                            .add(r -> r == newOrderList.get(1), 3)
-                            .add(r -> r == newOrderList.get(2), 3)
-                            .get(), newOrderList
-            );
-            newOrder.testAll(subListDepth, filterDepth, sortDepth,selectDepth - 1);
-        }
-
         private void assertSubList(C subCollection, int from, int to)
         {
             Assert.assertTrue(subCollection.isSnapshot);
@@ -182,7 +160,7 @@ public class ReplicaCollectionTest
             }
         }
 
-        void testSubList(int subListDepth, int filterDepth, int sortDepth, int selectDepth)
+        void testSubList(int subListDepth, int filterDepth, int sortDepth)
         {
             if (test.isSnapshot)
                 Assert.assertSame(test, test.subList(0, test.size()));
@@ -192,34 +170,62 @@ public class ReplicaCollectionTest
 
             TestCase<C> skipFront = new TestCase<>(test.subList(1, test.size()), canonicalList.subList(1, canonicalList.size()));
             assertSubList(skipFront.test, 1, canonicalList.size());
-            skipFront.testAll(subListDepth - 1, filterDepth, sortDepth, selectDepth);
+            skipFront.testAll(subListDepth - 1, filterDepth, sortDepth);
             TestCase<C> skipBack = new TestCase<>(test.subList(0, test.size() - 1), canonicalList.subList(0, canonicalList.size() - 1));
             assertSubList(skipBack.test, 0, canonicalList.size() - 1);
-            skipBack.testAll(subListDepth - 1, filterDepth, sortDepth, selectDepth);
+            skipBack.testAll(subListDepth - 1, filterDepth, sortDepth);
         }
 
-        void testFilter(int subListDepth, int filterDepth, int sortDepth, int selectDepth)
+        void testFilter(int subListDepth, int filterDepth, int sortDepth)
         {
             if (test.isSnapshot)
                 Assert.assertSame(test, test.filter(Predicates.alwaysTrue()));
 
             if (test.isEmpty())
                 return;
+
             // remove start
             // we recurse on the same subset in testSubList, so just corroborate we have the correct list here
-            assertSubList(test.filter(r -> r != canonicalList.get(0)), 1, canonicalList.size());
+            {
+                Predicate<Replica> removeFirst = r -> r != canonicalList.get(0);
+                assertSubList(test.filter(removeFirst), 1, canonicalList.size());
+                assertSubList(test.filter(removeFirst, 1), 1, Math.min(canonicalList.size(), 2));
+            }
 
             if (test.size() <= 1)
                 return;
+
             // remove end
             // we recurse on the same subset in testSubList, so just corroborate we have the correct list here
-            assertSubList(test.filter(r -> r != canonicalList.get(canonicalList.size() - 1)), 0, canonicalList.size() - 1);
+            {
+                int last = canonicalList.size() - 1;
+                Predicate<Replica> removeLast = r -> r != canonicalList.get(last);
+                assertSubList(test.filter(removeLast), 0, last);
+            }
 
             if (test.size() <= 2)
                 return;
+
             Predicate<Replica> removeMiddle = r -> r != canonicalList.get(canonicalList.size() / 2);
-            TestCase<C> filtered = new TestCase<>(test.filter(removeMiddle), ImmutableList.copyOf(Iterables.filter(canonicalList, removeMiddle::test)));
-            filtered.testAll(subListDepth, filterDepth - 1, sortDepth, selectDepth);
+            TestCase<C> filtered = new TestCase<>(test.filter(removeMiddle), ImmutableList.copyOf(filter(canonicalList, removeMiddle::test)));
+            filtered.testAll(subListDepth, filterDepth - 1, sortDepth);
+        }
+
+        void testCount()
+        {
+            Assert.assertEquals(0, test.count(Predicates.alwaysFalse()));
+
+            if (test.isEmpty())
+            {
+                Assert.assertEquals(0, test.count(Predicates.alwaysTrue()));
+                return;
+            }
+
+            for (int i = 0 ; i < canonicalList.size() ; ++i)
+            {
+                Replica discount = canonicalList.get(i);
+                Assert.assertEquals(canonicalList.size() - 1, test.count(r -> r != discount));
+            }
         }
 
         void testContains()
@@ -235,7 +241,7 @@ public class ReplicaCollectionTest
                 Assert.assertEquals(canonicalList.get(i), test.get(i));
         }
 
-        void testSort(int subListDepth, int filterDepth, int sortDepth, int selectDepth)
+        void testSort(int subListDepth, int filterDepth, int sortDepth)
         {
             final Comparator<Replica> comparator = (o1, o2) ->
             {
@@ -244,10 +250,10 @@ public class ReplicaCollectionTest
                 return f1 == f2 ? 0 : f1 ? 1 : -1;
             };
             TestCase<C> sorted = new TestCase<>(test.sorted(comparator), ImmutableList.sortedCopyOf(comparator, canonicalList));
-            sorted.testAll(subListDepth, filterDepth, sortDepth - 1, selectDepth);
+            sorted.testAll(subListDepth, filterDepth, sortDepth - 1);
         }
 
-        private void testAll(int subListDepth, int filterDepth, int sortDepth, int selectDepth)
+        private void testAll(int subListDepth, int filterDepth, int sortDepth)
         {
             testEndpoints();
             testOrderOfIteration();
@@ -255,19 +261,18 @@ public class ReplicaCollectionTest
             testGet();
             testEquals();
             testSize();
+            testCount();
             if (subListDepth > 0)
-                testSubList(subListDepth, filterDepth, sortDepth, selectDepth);
+                testSubList(subListDepth, filterDepth, sortDepth);
             if (filterDepth > 0)
-                testFilter(subListDepth, filterDepth, sortDepth, selectDepth);
+                testFilter(subListDepth, filterDepth, sortDepth);
             if (sortDepth > 0)
-                testSort(subListDepth, filterDepth, sortDepth, selectDepth);
-            if (selectDepth > 0)
-                testSelect(subListDepth, filterDepth, sortDepth, selectDepth);
+                testSort(subListDepth, filterDepth, sortDepth);
         }
 
         public void testAll()
         {
-            testAll(2, 2, 2, 2);
+            testAll(2, 2, 2);
         }
     }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org