You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by be...@apache.org on 2018/09/14 09:14:53 UTC

[3/4] cassandra git commit: ReplicaPlan/Layout refactor follow-up/completion

http://git-wip-us.apache.org/repos/asf/cassandra/blob/047bcd7a/src/java/org/apache/cassandra/locator/ReplicaPlans.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/locator/ReplicaPlans.java b/src/java/org/apache/cassandra/locator/ReplicaPlans.java
new file mode 100644
index 0000000..25f42c3
--- /dev/null
+++ b/src/java/org/apache/cassandra/locator/ReplicaPlans.java
@@ -0,0 +1,295 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.locator;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.ConsistencyLevel;
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.db.PartitionPosition;
+import org.apache.cassandra.dht.AbstractBounds;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.exceptions.UnavailableException;
+import org.apache.cassandra.gms.FailureDetector;
+import org.apache.cassandra.service.reads.AlwaysSpeculativeRetryPolicy;
+import org.apache.cassandra.service.reads.SpeculativeRetryPolicy;
+import org.apache.cassandra.utils.FBUtilities;
+
+import java.util.Collection;
+import java.util.function.Predicate;
+
+import static com.google.common.collect.Iterables.any;
+import static com.google.common.collect.Iterables.filter;
+import static com.google.common.collect.Iterables.limit;
+
+public class ReplicaPlans
+{
+
+    /**
+     * Construct a ReplicaPlan for writing to exactly one node, with CL.ONE. This node is *assumed* to be alive.
+     */
+    public static ReplicaPlan.ForTokenWrite forSingleReplicaWrite(Keyspace keyspace, Token token, Replica replica)
+    {
+        EndpointsForToken one = EndpointsForToken.of(token, replica);
+        EndpointsForToken empty = EndpointsForToken.empty(token);
+        return new ReplicaPlan.ForTokenWrite(keyspace, ConsistencyLevel.ONE, empty, one, one, one);
+    }
+
+    /**
+     * A forwarding counter write is always sent to a single owning coordinator for the range, by the original coordinator
+     * (if it is not itself an owner)
+     */
+    public static ReplicaPlan.ForTokenWrite forForwardingCounterWrite(Keyspace keyspace, Token token, Replica replica)
+    {
+        return forSingleReplicaWrite(keyspace, token, replica);
+    }
+
+    /**
+     * Requires that the provided endpoints are alive.  Converts them to their relevant system replicas.
+     * Note that the liveAndDown collection and live are equal to the provided endpoints.
+     *
+     * The semantics are a bit weird, in that CL=ONE iff we have one node provided, and otherwise is equal to TWO.
+     * How these CL were chosen, and why we drop the CL if only one live node is available, are both unclear.
+     */
+    public static ReplicaPlan.ForTokenWrite forBatchlogWrite(Keyspace keyspace, Collection<InetAddressAndPort> endpoints) throws UnavailableException
+    {
+        // A single case we write not for range or token, but multiple mutations to many tokens
+        Token token = DatabaseDescriptor.getPartitioner().getMinimumToken();
+
+        ReplicaLayout.ForTokenWrite liveAndDown = ReplicaLayout.forTokenWrite(
+                SystemReplicas.getSystemReplicas(endpoints).forToken(token),
+                EndpointsForToken.empty(token)
+        );
+        ConsistencyLevel consistencyLevel = liveAndDown.all().size() == 1 ? ConsistencyLevel.ONE : ConsistencyLevel.TWO;
+
+        // assume that we have already been given live endpoints, and skip applying the failure detector
+        return forWrite(keyspace, consistencyLevel, liveAndDown, liveAndDown, writeAll);
+    }
+
+    public static ReplicaPlan.ForTokenWrite forWrite(Keyspace keyspace, ConsistencyLevel consistencyLevel, Token token, Selector selector) throws UnavailableException
+    {
+        return forWrite(keyspace, consistencyLevel, ReplicaLayout.forTokenWriteLiveAndDown(keyspace, token), selector);
+    }
+
+    @VisibleForTesting
+    public static ReplicaPlan.ForTokenWrite forWrite(Keyspace keyspace, ConsistencyLevel consistencyLevel, EndpointsForToken natural, EndpointsForToken pending, Predicate<Replica> isAlive, Selector selector) throws UnavailableException
+    {
+        return forWrite(keyspace, consistencyLevel, ReplicaLayout.forTokenWrite(natural, pending), isAlive, selector);
+    }
+
+    public static ReplicaPlan.ForTokenWrite forWrite(Keyspace keyspace, ConsistencyLevel consistencyLevel, ReplicaLayout.ForTokenWrite liveAndDown, Selector selector) throws UnavailableException
+    {
+        return forWrite(keyspace, consistencyLevel, liveAndDown, FailureDetector.isReplicaAlive, selector);
+    }
+
+    @VisibleForTesting
+    public static ReplicaPlan.ForTokenWrite forWrite(Keyspace keyspace, ConsistencyLevel consistencyLevel, ReplicaLayout.ForTokenWrite liveAndDown, Predicate<Replica> isAlive, Selector selector) throws UnavailableException
+    {
+        ReplicaLayout.ForTokenWrite live = liveAndDown.filter(isAlive);
+        return forWrite(keyspace, consistencyLevel, liveAndDown, live, selector);
+    }
+
+    public static ReplicaPlan.ForTokenWrite forWrite(Keyspace keyspace, ConsistencyLevel consistencyLevel, ReplicaLayout.ForTokenWrite liveAndDown, ReplicaLayout.ForTokenWrite live, Selector selector) throws UnavailableException
+    {
+        EndpointsForToken contacts = selector.select(keyspace, consistencyLevel, liveAndDown, live);
+        ReplicaPlan.ForTokenWrite result = new ReplicaPlan.ForTokenWrite(keyspace, consistencyLevel, liveAndDown.pending(), liveAndDown.all(), live.all(), contacts);
+        result.assureSufficientReplicas();
+        return result;
+    }
+
+    public interface Selector
+    {
+        <E extends Endpoints<E>, L extends ReplicaLayout.ForWrite<E>>
+        E select(Keyspace keyspace, ConsistencyLevel consistencyLevel, L liveAndDown, L live);
+    }
+
+    /**
+     * Select all nodes, transient or otherwise, as targets for the operation.
+     *
+     * This is may no longer be useful until we finish implementing transient replication support, however
+     * it can be of value to stipulate that a location writes to all nodes without regard to transient status.
+     */
+    public static final Selector writeAll = new Selector()
+    {
+        @Override
+        public <E extends Endpoints<E>, L extends ReplicaLayout.ForWrite<E>>
+        E select(Keyspace keyspace, ConsistencyLevel consistencyLevel, L liveAndDown, L live)
+        {
+            return liveAndDown.all();
+        }
+    };
+
+    /**
+     * Select all full nodes, live or down, as write targets.  If there are insufficient nodes to complete the write,
+     * but there are live transient nodes, select a sufficient number of these to reach our consistency level.
+     *
+     * Pending nodes are always contacted, whether or not they are full.  When a transient replica is undergoing
+     * a pending move to a new node, if we write (transiently) to it, this write would not be replicated to the
+     * pending transient node, and so when completing the move, the write could effectively have not reached the
+     * promised consistency level.
+     */
+    public static final Selector writeNormal = new Selector()
+    {
+        @Override
+        public <E extends Endpoints<E>, L extends ReplicaLayout.ForWrite<E>>
+        E select(Keyspace keyspace, ConsistencyLevel consistencyLevel, L liveAndDown, L live)
+        {
+            if (!any(liveAndDown.all(), Replica::isTransient))
+                return liveAndDown.all();
+
+            assert consistencyLevel != ConsistencyLevel.EACH_QUORUM;
+
+            ReplicaCollection.Mutable<E> contacts = liveAndDown.all().newMutable(liveAndDown.all().size());
+            contacts.addAll(filter(liveAndDown.natural(), Replica::isFull));
+            contacts.addAll(liveAndDown.pending());
+
+            // TODO: this doesn't correctly handle LOCAL_QUORUM (or EACH_QUORUM at all)
+            int liveCount = contacts.count(live.all()::contains);
+            int requiredTransientCount = consistencyLevel.blockForWrite(keyspace, liveAndDown.pending()) - liveCount;
+            if (requiredTransientCount > 0)
+                contacts.addAll(limit(filter(live.natural(), Replica::isTransient), requiredTransientCount));
+            return contacts.asSnapshot();
+        }
+    };
+
+    /**
+     * Construct the plan for a paxos round - NOT the write or read consistency level for either the write or comparison,
+     * but for the paxos linearisation agreement.
+     *
+     * This will select all live nodes as the candidates for the operation.  Only the required number of participants
+     */
+    public static ReplicaPlan.ForPaxosWrite forPaxos(Keyspace keyspace, DecoratedKey key, ConsistencyLevel consistencyForPaxos) throws UnavailableException
+    {
+        Token tk = key.getToken();
+        ReplicaLayout.ForTokenWrite liveAndDown = ReplicaLayout.forTokenWriteLiveAndDown(keyspace, tk);
+
+        Replicas.temporaryAssertFull(liveAndDown.all()); // TODO CASSANDRA-14547
+
+        if (consistencyForPaxos == ConsistencyLevel.LOCAL_SERIAL)
+        {
+            // TODO: we should cleanup our semantics here, as we're filtering ALL nodes to localDC which is unexpected for ReplicaPlan
+            // Restrict natural and pending to node in the local DC only
+            String localDc = DatabaseDescriptor.getEndpointSnitch().getDatacenter(FBUtilities.getBroadcastAddressAndPort());
+            IEndpointSnitch snitch = DatabaseDescriptor.getEndpointSnitch();
+            Predicate<Replica> isLocalDc = replica -> localDc.equals(snitch.getDatacenter(replica));
+
+            liveAndDown = liveAndDown.filter(isLocalDc);
+        }
+
+        ReplicaLayout.ForTokenWrite live = liveAndDown.filter(FailureDetector.isReplicaAlive);
+
+        // TODO: this should use assureSufficientReplicas
+        int participants = liveAndDown.all().size();
+        int requiredParticipants = participants / 2 + 1; // See CASSANDRA-8346, CASSANDRA-833
+
+        EndpointsForToken contacts = live.all();
+        if (contacts.size() < requiredParticipants)
+            throw UnavailableException.create(consistencyForPaxos, requiredParticipants, contacts.size());
+
+        // We cannot allow CAS operations with 2 or more pending endpoints, see #8346.
+        // Note that we fake an impossible number of required nodes in the unavailable exception
+        // to nail home the point that it's an impossible operation no matter how many nodes are live.
+        if (liveAndDown.pending().size() > 1)
+            throw new UnavailableException(String.format("Cannot perform LWT operation as there is more than one (%d) pending range movement", liveAndDown.all().size()),
+                    consistencyForPaxos,
+                    participants + 1,
+                    contacts.size());
+
+        return new ReplicaPlan.ForPaxosWrite(keyspace, consistencyForPaxos, liveAndDown.pending(), liveAndDown.all(), live.all(), contacts, requiredParticipants);
+    }
+
+    /**
+     * Construct a plan for reading from a single node - this permits no speculation or read-repair
+     */
+    public static ReplicaPlan.ForTokenRead forSingleReplicaRead(Keyspace keyspace, Token token, Replica replica)
+    {
+        EndpointsForToken one = EndpointsForToken.of(token, replica);
+        return new ReplicaPlan.ForTokenRead(keyspace, ConsistencyLevel.ONE, one, one);
+    }
+
+    /**
+     * Construct a plan for reading from a single node - this permits no speculation or read-repair
+     */
+    public static ReplicaPlan.ForRangeRead forSingleReplicaRead(Keyspace keyspace, AbstractBounds<PartitionPosition> range, Replica replica)
+    {
+        // TODO: this is unsafe, as one.range() may be inconsistent with our supplied range; should refactor Range/AbstractBounds to single class
+        EndpointsForRange one = EndpointsForRange.of(replica);
+        return new ReplicaPlan.ForRangeRead(keyspace, ConsistencyLevel.ONE, range, one, one);
+    }
+
+    /**
+     * Construct a plan for reading the provided token at the provided consistency level.  This translates to a collection of
+     *   - candidates who are: alive, replicate the token, and are sorted by their snitch scores
+     *   - contacts who are: the first blockFor + (retry == ALWAYS ? 1 : 0) candidates
+     *
+     * The candidate collection can be used for speculation, although at present it would break
+     * LOCAL_QUORUM and EACH_QUORUM to do so without further filtering
+     */
+    public static ReplicaPlan.ForTokenRead forRead(Keyspace keyspace, Token token, ConsistencyLevel consistencyLevel, SpeculativeRetryPolicy retry)
+    {
+        ReplicaLayout.ForTokenRead candidates = ReplicaLayout.forTokenReadLiveSorted(keyspace, token);
+        EndpointsForToken contacts = consistencyLevel.filterForQuery(keyspace, candidates.natural(),
+                retry.equals(AlwaysSpeculativeRetryPolicy.INSTANCE));
+
+        ReplicaPlan.ForTokenRead result = new ReplicaPlan.ForTokenRead(keyspace, consistencyLevel, candidates.natural(), contacts);
+        result.assureSufficientReplicas(); // Throw UAE early if we don't have enough replicas.
+        return result;
+    }
+
+    /**
+     * Construct a plan for reading the provided range at the provided consistency level.  This translates to a collection of
+     *   - candidates who are: alive, replicate the range, and are sorted by their snitch scores
+     *   - contacts who are: the first blockFor candidates
+     *
+     * There is no speculation for range read queries at present, so we never 'always speculate' here, and a failed response fails the query.
+     */
+    public static ReplicaPlan.ForRangeRead forRangeRead(Keyspace keyspace, ConsistencyLevel consistencyLevel, AbstractBounds<PartitionPosition> range)
+    {
+        ReplicaLayout.ForRangeRead candidates = ReplicaLayout.forRangeReadLiveSorted(keyspace, range);
+        EndpointsForRange contacts = consistencyLevel.filterForQuery(keyspace, candidates.natural());
+
+        ReplicaPlan.ForRangeRead result = new ReplicaPlan.ForRangeRead(keyspace, consistencyLevel, range, candidates.natural(), contacts);
+        result.assureSufficientReplicas();
+        return result;
+    }
+
+    /**
+     * Take two range read plans for adjacent ranges, and check if it is OK (and worthwhile) to combine them into a single plan
+     */
+    public static ReplicaPlan.ForRangeRead maybeMerge(Keyspace keyspace, ConsistencyLevel consistencyLevel, ReplicaPlan.ForRangeRead left, ReplicaPlan.ForRangeRead right)
+    {
+        // TODO: should we be asserting that the ranges are adjacent?
+        AbstractBounds<PartitionPosition> newRange = left.range().withNewRight(right.range().right);
+        EndpointsForRange mergedCandidates = left.candidates().keep(right.candidates().endpoints());
+
+        // Check if there are enough shared endpoints for the merge to be possible.
+        if (!consistencyLevel.isSufficientLiveReplicasForRead(keyspace, mergedCandidates))
+            return null;
+
+        EndpointsForRange contacts = consistencyLevel.filterForQuery(keyspace, mergedCandidates);
+
+        // Estimate whether merging will be a win or not
+        if (!DatabaseDescriptor.getEndpointSnitch().isWorthMergingForRangeQuery(contacts, left.contacts(), right.contacts()))
+            return null;
+
+        // If we get there, merge this range and the next one
+        return new ReplicaPlan.ForRangeRead(keyspace, consistencyLevel, newRange, mergedCandidates, contacts);
+    }
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/047bcd7a/src/java/org/apache/cassandra/locator/SystemReplicas.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/locator/SystemReplicas.java b/src/java/org/apache/cassandra/locator/SystemReplicas.java
index 13a9d74..0d1fc8d 100644
--- a/src/java/org/apache/cassandra/locator/SystemReplicas.java
+++ b/src/java/org/apache/cassandra/locator/SystemReplicas.java
@@ -18,12 +18,11 @@
 
 package org.apache.cassandra.locator;
 
-import java.util.ArrayList;
 import java.util.Collection;
-import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 
+import com.google.common.collect.Collections2;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.Token;
@@ -50,13 +49,8 @@ public class SystemReplicas
         return systemReplicas.computeIfAbsent(endpoint, SystemReplicas::createSystemReplica);
     }
 
-    public static Collection<Replica> getSystemReplicas(Collection<InetAddressAndPort> endpoints)
+    public static EndpointsForRange getSystemReplicas(Collection<InetAddressAndPort> endpoints)
     {
-        List<Replica> replicas = new ArrayList<>(endpoints.size());
-        for (InetAddressAndPort endpoint: endpoints)
-        {
-            replicas.add(getSystemReplica(endpoint));
-        }
-        return replicas;
+        return EndpointsForRange.copyOf(Collections2.transform(endpoints, SystemReplicas::getSystemReplica));
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/047bcd7a/src/java/org/apache/cassandra/locator/TokenMetadata.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/locator/TokenMetadata.java b/src/java/org/apache/cassandra/locator/TokenMetadata.java
index 4ab34db..ad40d7b 100644
--- a/src/java/org/apache/cassandra/locator/TokenMetadata.java
+++ b/src/java/org/apache/cassandra/locator/TokenMetadata.java
@@ -1224,15 +1224,11 @@ public class TokenMetadata
     /**
      * @deprecated retained for benefit of old tests
      */
+    @Deprecated
     public EndpointsForToken getWriteEndpoints(Token token, String keyspaceName, EndpointsForToken natural)
     {
         EndpointsForToken pending = pendingEndpointsForToken(token, keyspaceName);
-        if (Endpoints.haveConflicts(natural, pending))
-        {
-            natural = Endpoints.resolveConflictsInNatural(natural, pending);
-            pending = Endpoints.resolveConflictsInPending(natural, pending);
-        }
-        return Endpoints.concat(natural, pending);
+        return ReplicaLayout.forTokenWrite(natural, pending).all();
     }
 
     /** @return an endpoint to token multimap representation of tokenToEndpointMap (a copy) */

http://git-wip-us.apache.org/repos/asf/cassandra/blob/047bcd7a/src/java/org/apache/cassandra/net/IAsyncCallback.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/IAsyncCallback.java b/src/java/org/apache/cassandra/net/IAsyncCallback.java
index 253b412..ceaf072 100644
--- a/src/java/org/apache/cassandra/net/IAsyncCallback.java
+++ b/src/java/org/apache/cassandra/net/IAsyncCallback.java
@@ -17,12 +17,6 @@
  */
 package org.apache.cassandra.net;
 
-import com.google.common.base.Predicate;
-
-import org.apache.cassandra.gms.FailureDetector;
-import org.apache.cassandra.locator.InetAddressAndPort;
-import org.apache.cassandra.locator.Replica;
-
 /**
  * implementors of IAsyncCallback need to make sure that any public methods
  * are threadsafe with respect to response() being called from the message
@@ -31,10 +25,6 @@ import org.apache.cassandra.locator.Replica;
  */
 public interface IAsyncCallback<T>
 {
-    final Predicate<InetAddressAndPort> isAlive = FailureDetector.instance::isAlive;
-
-    final Predicate<Replica> isReplicaAlive = replica -> isAlive.apply(replica.endpoint());
-
     /**
      * @param msg response received.
      */

http://git-wip-us.apache.org/repos/asf/cassandra/blob/047bcd7a/src/java/org/apache/cassandra/service/AbstractWriteResponseHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/AbstractWriteResponseHandler.java b/src/java/org/apache/cassandra/service/AbstractWriteResponseHandler.java
index e817cc8..7f51ae7 100644
--- a/src/java/org/apache/cassandra/service/AbstractWriteResponseHandler.java
+++ b/src/java/org/apache/cassandra/service/AbstractWriteResponseHandler.java
@@ -26,8 +26,9 @@ import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
 import java.util.stream.Collectors;
 
 import org.apache.cassandra.db.ConsistencyLevel;
-import org.apache.cassandra.locator.ReplicaLayout;
 
+import org.apache.cassandra.locator.EndpointsForToken;
+import org.apache.cassandra.locator.ReplicaPlan;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -36,7 +37,6 @@ import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.db.IMutation;
 import org.apache.cassandra.db.WriteType;
 import org.apache.cassandra.exceptions.RequestFailureReason;
-import org.apache.cassandra.exceptions.UnavailableException;
 import org.apache.cassandra.exceptions.WriteFailureException;
 import org.apache.cassandra.exceptions.WriteTimeoutException;
 import org.apache.cassandra.locator.InetAddressAndPort;
@@ -53,7 +53,7 @@ public abstract class AbstractWriteResponseHandler<T> implements IAsyncCallbackW
     //Count down until all responses and expirations have occured before deciding whether the ideal CL was reached.
     private AtomicInteger responsesAndExpirations;
     private final SimpleCondition condition = new SimpleCondition();
-    protected final ReplicaLayout.ForToken replicaLayout;
+    protected final ReplicaPlan.ForTokenWrite replicaPlan;
 
     protected final Runnable callback;
     protected final WriteType writeType;
@@ -76,12 +76,12 @@ public abstract class AbstractWriteResponseHandler<T> implements IAsyncCallbackW
      * @param callback           A callback to be called when the write is successful.
      * @param queryStartNanoTime
      */
-    protected AbstractWriteResponseHandler(ReplicaLayout.ForToken replicaLayout,
+    protected AbstractWriteResponseHandler(ReplicaPlan.ForTokenWrite replicaPlan,
                                            Runnable callback,
                                            WriteType writeType,
                                            long queryStartNanoTime)
     {
-        this.replicaLayout = replicaLayout;
+        this.replicaPlan = replicaPlan;
         this.callback = callback;
         this.writeType = writeType;
         this.failureReasonByEndpoint = new ConcurrentHashMap<>();
@@ -104,19 +104,19 @@ public abstract class AbstractWriteResponseHandler<T> implements IAsyncCallbackW
 
         if (!success)
         {
-            int blockedFor = totalBlockFor();
+            int blockedFor = blockFor();
             int acks = ackCount();
             // It's pretty unlikely, but we can race between exiting await above and here, so
             // that we could now have enough acks. In that case, we "lie" on the acks count to
             // avoid sending confusing info to the user (see CASSANDRA-6491).
             if (acks >= blockedFor)
                 acks = blockedFor - 1;
-            throw new WriteTimeoutException(writeType, replicaLayout.consistencyLevel(), acks, blockedFor);
+            throw new WriteTimeoutException(writeType, replicaPlan.consistencyLevel(), acks, blockedFor);
         }
 
-        if (totalBlockFor() + failures > totalEndpoints())
+        if (blockFor() + failures > candidateReplicaCount())
         {
-            throw new WriteFailureException(replicaLayout.consistencyLevel(), ackCount(), totalBlockFor(), writeType, failureReasonByEndpoint);
+            throw new WriteFailureException(replicaPlan.consistencyLevel(), ackCount(), blockFor(), writeType, failureReasonByEndpoint);
         }
     }
 
@@ -135,7 +135,7 @@ public abstract class AbstractWriteResponseHandler<T> implements IAsyncCallbackW
     public void setIdealCLResponseHandler(AbstractWriteResponseHandler handler)
     {
         this.idealCLDelegate = handler;
-        idealCLDelegate.responsesAndExpirations = new AtomicInteger(replicaLayout.selected().size());
+        idealCLDelegate.responsesAndExpirations = new AtomicInteger(replicaPlan.contacts().size());
     }
 
     /**
@@ -189,28 +189,30 @@ public abstract class AbstractWriteResponseHandler<T> implements IAsyncCallbackW
     /**
      * @return the minimum number of endpoints that must reply.
      */
-    protected int totalBlockFor()
+    protected int blockFor()
     {
         // During bootstrap, we have to include the pending endpoints or we may fail the consistency level
         // guarantees (see #833)
-        return replicaLayout.consistencyLevel().blockForWrite(replicaLayout.keyspace(), replicaLayout.pending());
+        return replicaPlan.blockFor();
     }
 
     /**
+     * TODO: this method is brittle for its purpose of deciding when we should fail a query;
+     *       this needs to be CL aware, and of which nodes are live/down
      * @return the total number of endpoints the request can been sent to.
      */
-    protected int totalEndpoints()
+    protected int candidateReplicaCount()
     {
-        return replicaLayout.all().size();
+        return replicaPlan.liveAndDown().size();
     }
 
     public ConsistencyLevel consistencyLevel()
     {
-        return replicaLayout.consistencyLevel();
+        return replicaPlan.consistencyLevel();
     }
 
     /**
-     * @return true if the message counts towards the totalBlockFor() threshold
+     * @return true if the message counts towards the blockFor() threshold
      */
     protected boolean waitingFor(InetAddressAndPort from)
     {
@@ -227,11 +229,6 @@ public abstract class AbstractWriteResponseHandler<T> implements IAsyncCallbackW
      */
     public abstract void response(MessageIn<T> msg);
 
-    public void assureSufficientLiveNodes() throws UnavailableException
-    {
-        replicaLayout.consistencyLevel().assureSufficientLiveNodesForWrite(replicaLayout.keyspace(), replicaLayout.all().filter(isReplicaAlive), replicaLayout.pending());
-    }
-
     protected void signal()
     {
         condition.signalAll();
@@ -250,7 +247,7 @@ public abstract class AbstractWriteResponseHandler<T> implements IAsyncCallbackW
 
         failureReasonByEndpoint.put(from, failureReason);
 
-        if (totalBlockFor() + n > totalEndpoints())
+        if (blockFor() + n > candidateReplicaCount())
             signal();
     }
 
@@ -278,11 +275,11 @@ public abstract class AbstractWriteResponseHandler<T> implements IAsyncCallbackW
             //The condition being signaled is a valid proxy for the CL being achieved
             if (!condition.isSignaled())
             {
-                replicaLayout.keyspace().metric.writeFailedIdealCL.inc();
+                replicaPlan.keyspace().metric.writeFailedIdealCL.inc();
             }
             else
             {
-                replicaLayout.keyspace().metric.idealCLWriteLatency.addNano(System.nanoTime() - queryStartNanoTime);
+                replicaPlan.keyspace().metric.idealCLWriteLatency.addNano(System.nanoTime() - queryStartNanoTime);
             }
         }
     }
@@ -292,7 +289,8 @@ public abstract class AbstractWriteResponseHandler<T> implements IAsyncCallbackW
      */
     public void maybeTryAdditionalReplicas(IMutation mutation, StorageProxy.WritePerformer writePerformer, String localDC)
     {
-        if (replicaLayout.all().size() == replicaLayout.selected().size())
+        EndpointsForToken uncontacted = replicaPlan.liveUncontacted();
+        if (uncontacted.isEmpty())
             return;
 
         long timeout = Long.MAX_VALUE;
@@ -313,7 +311,7 @@ public abstract class AbstractWriteResponseHandler<T> implements IAsyncCallbackW
                 for (ColumnFamilyStore cf : cfs)
                     cf.metric.speculativeWrites.inc();
 
-                writePerformer.apply(mutation, replicaLayout.forNaturalUncontacted(),
+                writePerformer.apply(mutation, replicaPlan.withContact(uncontacted),
                                      (AbstractWriteResponseHandler<IMutation>) this,
                                      localDC);
             }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/047bcd7a/src/java/org/apache/cassandra/service/ActiveRepairService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/ActiveRepairService.java b/src/java/org/apache/cassandra/service/ActiveRepairService.java
index 8ffca6a..b32f67e 100644
--- a/src/java/org/apache/cassandra/service/ActiveRepairService.java
+++ b/src/java/org/apache/cassandra/service/ActiveRepairService.java
@@ -331,7 +331,7 @@ public class ActiveRepairService implements IEndpointStateChangeSubscriber, IFai
             TokenMetadata.Topology topology = ss.getTokenMetadata().cloneOnlyTokenMap().getTopology();
             Multimap<String, InetAddressAndPort> dcEndpointsMap = topology.getDatacenterEndpoints();
             Iterable<InetAddressAndPort> dcEndpoints = concat(transform(dataCenters, dcEndpointsMap::get));
-            return neighbors.keep(dcEndpoints);
+            return neighbors.select(dcEndpoints, true);
         }
         else if (hosts != null && !hosts.isEmpty())
         {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/047bcd7a/src/java/org/apache/cassandra/service/BatchlogResponseHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/BatchlogResponseHandler.java b/src/java/org/apache/cassandra/service/BatchlogResponseHandler.java
index ee74df5..63fbc72 100644
--- a/src/java/org/apache/cassandra/service/BatchlogResponseHandler.java
+++ b/src/java/org/apache/cassandra/service/BatchlogResponseHandler.java
@@ -36,7 +36,7 @@ public class BatchlogResponseHandler<T> extends AbstractWriteResponseHandler<T>
 
     public BatchlogResponseHandler(AbstractWriteResponseHandler<T> wrapped, int requiredBeforeFinish, BatchlogCleanup cleanup, long queryStartNanoTime)
     {
-        super(wrapped.replicaLayout, wrapped.callback, wrapped.writeType, queryStartNanoTime);
+        super(wrapped.replicaPlan, wrapped.callback, wrapped.writeType, queryStartNanoTime);
         this.wrapped = wrapped;
         this.requiredBeforeFinish = requiredBeforeFinish;
         this.cleanup = cleanup;
@@ -64,24 +64,19 @@ public class BatchlogResponseHandler<T> extends AbstractWriteResponseHandler<T>
         wrapped.onFailure(from, failureReason);
     }
 
-    public void assureSufficientLiveNodes()
-    {
-        wrapped.assureSufficientLiveNodes();
-    }
-
     public void get() throws WriteTimeoutException, WriteFailureException
     {
         wrapped.get();
     }
 
-    protected int totalBlockFor()
+    protected int blockFor()
     {
-        return wrapped.totalBlockFor();
+        return wrapped.blockFor();
     }
 
-    protected int totalEndpoints()
+    protected int candidateReplicaCount()
     {
-        return wrapped.totalEndpoints();
+        return wrapped.candidateReplicaCount();
     }
 
     protected boolean waitingFor(InetAddressAndPort from)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/047bcd7a/src/java/org/apache/cassandra/service/DatacenterSyncWriteResponseHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/DatacenterSyncWriteResponseHandler.java b/src/java/org/apache/cassandra/service/DatacenterSyncWriteResponseHandler.java
index d4cdcc6..4c892ff 100644
--- a/src/java/org/apache/cassandra/service/DatacenterSyncWriteResponseHandler.java
+++ b/src/java/org/apache/cassandra/service/DatacenterSyncWriteResponseHandler.java
@@ -22,10 +22,10 @@ import java.util.Map;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.locator.ReplicaLayout;
 import org.apache.cassandra.locator.IEndpointSnitch;
 import org.apache.cassandra.locator.NetworkTopologyStrategy;
 import org.apache.cassandra.locator.Replica;
+import org.apache.cassandra.locator.ReplicaPlan;
 import org.apache.cassandra.net.MessageIn;
 import org.apache.cassandra.db.ConsistencyLevel;
 import org.apache.cassandra.db.WriteType;
@@ -40,16 +40,16 @@ public class DatacenterSyncWriteResponseHandler<T> extends AbstractWriteResponse
     private final Map<String, AtomicInteger> responses = new HashMap<String, AtomicInteger>();
     private final AtomicInteger acks = new AtomicInteger(0);
 
-    public DatacenterSyncWriteResponseHandler(ReplicaLayout.ForToken replicaLayout,
+    public DatacenterSyncWriteResponseHandler(ReplicaPlan.ForTokenWrite replicaPlan,
                                               Runnable callback,
                                               WriteType writeType,
                                               long queryStartNanoTime)
     {
         // Response is been managed by the map so make it 1 for the superclass.
-        super(replicaLayout, callback, writeType, queryStartNanoTime);
-        assert replicaLayout.consistencyLevel() == ConsistencyLevel.EACH_QUORUM;
+        super(replicaPlan, callback, writeType, queryStartNanoTime);
+        assert replicaPlan.consistencyLevel() == ConsistencyLevel.EACH_QUORUM;
 
-        NetworkTopologyStrategy strategy = (NetworkTopologyStrategy) replicaLayout.keyspace().getReplicationStrategy();
+        NetworkTopologyStrategy strategy = (NetworkTopologyStrategy) replicaPlan.keyspace().getReplicationStrategy();
 
         for (String dc : strategy.getDatacenters())
         {
@@ -59,7 +59,7 @@ public class DatacenterSyncWriteResponseHandler<T> extends AbstractWriteResponse
 
         // During bootstrap, we have to include the pending endpoints or we may fail the consistency level
         // guarantees (see #833)
-        for (Replica pending : replicaLayout.pending())
+        for (Replica pending : replicaPlan.pending())
         {
             responses.get(snitch.getDatacenter(pending)).incrementAndGet();
         }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/047bcd7a/src/java/org/apache/cassandra/service/DatacenterWriteResponseHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/DatacenterWriteResponseHandler.java b/src/java/org/apache/cassandra/service/DatacenterWriteResponseHandler.java
index b458a71..a3ef76f 100644
--- a/src/java/org/apache/cassandra/service/DatacenterWriteResponseHandler.java
+++ b/src/java/org/apache/cassandra/service/DatacenterWriteResponseHandler.java
@@ -19,7 +19,7 @@ package org.apache.cassandra.service;
 
 import org.apache.cassandra.db.WriteType;
 import org.apache.cassandra.locator.InetAddressAndPort;
-import org.apache.cassandra.locator.ReplicaLayout;
+import org.apache.cassandra.locator.ReplicaPlan;
 import org.apache.cassandra.net.MessageIn;
 
 /**
@@ -27,13 +27,13 @@ import org.apache.cassandra.net.MessageIn;
  */
 public class DatacenterWriteResponseHandler<T> extends WriteResponseHandler<T>
 {
-    public DatacenterWriteResponseHandler(ReplicaLayout.ForToken replicaLayout,
+    public DatacenterWriteResponseHandler(ReplicaPlan.ForTokenWrite replicaPlan,
                                           Runnable callback,
                                           WriteType writeType,
                                           long queryStartNanoTime)
     {
-        super(replicaLayout, callback, writeType, queryStartNanoTime);
-        assert replicaLayout.consistencyLevel().isDatacenterLocal();
+        super(replicaPlan, callback, writeType, queryStartNanoTime);
+        assert replicaPlan.consistencyLevel().isDatacenterLocal();
     }
 
     @Override
@@ -54,6 +54,6 @@ public class DatacenterWriteResponseHandler<T> extends WriteResponseHandler<T>
     @Override
     protected boolean waitingFor(InetAddressAndPort from)
     {
-        return replicaLayout.consistencyLevel().isLocal(from);
+        return replicaPlan.consistencyLevel().isLocal(from);
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/047bcd7a/src/java/org/apache/cassandra/service/StorageProxy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java b/src/java/org/apache/cassandra/service/StorageProxy.java
index 5eb43cf..fc49330 100644
--- a/src/java/org/apache/cassandra/service/StorageProxy.java
+++ b/src/java/org/apache/cassandra/service/StorageProxy.java
@@ -60,7 +60,6 @@ import org.apache.cassandra.db.rows.RowIterator;
 import org.apache.cassandra.db.view.ViewUtils;
 import org.apache.cassandra.dht.*;
 import org.apache.cassandra.exceptions.*;
-import org.apache.cassandra.gms.FailureDetector;
 import org.apache.cassandra.gms.Gossiper;
 import org.apache.cassandra.hints.Hint;
 import org.apache.cassandra.hints.HintsService;
@@ -135,7 +134,7 @@ public class StorageProxy implements StorageProxyMBean
         standardWritePerformer = (mutation, targets, responseHandler, localDataCenter) ->
         {
             assert mutation instanceof Mutation;
-            sendToHintedReplicas((Mutation) mutation, targets.selected(), responseHandler, localDataCenter, Stage.MUTATION);
+            sendToHintedReplicas((Mutation) mutation, targets, responseHandler, localDataCenter, Stage.MUTATION);
         };
 
         /*
@@ -146,17 +145,17 @@ public class StorageProxy implements StorageProxyMBean
          */
         counterWritePerformer = (mutation, targets, responseHandler, localDataCenter) ->
         {
-            EndpointsForToken selected = targets.selected().withoutSelf();
+            EndpointsForToken selected = targets.contacts().withoutSelf();
             Replicas.temporaryAssertFull(selected); // TODO CASSANDRA-14548
-            counterWriteTask(mutation, selected, responseHandler, localDataCenter).run();
+            counterWriteTask(mutation, targets.withContact(selected), responseHandler, localDataCenter).run();
         };
 
         counterWriteOnCoordinatorPerformer = (mutation, targets, responseHandler, localDataCenter) ->
         {
-            EndpointsForToken selected = targets.selected().withoutSelf();
+            EndpointsForToken selected = targets.contacts().withoutSelf();
             Replicas.temporaryAssertFull(selected); // TODO CASSANDRA-14548
             StageManager.getStage(Stage.COUNTER_MUTATION)
-                        .execute(counterWriteTask(mutation, selected, responseHandler, localDataCenter));
+                        .execute(counterWriteTask(mutation, targets.withContact(selected), responseHandler, localDataCenter));
         };
 
         for(ConsistencyLevel level : ConsistencyLevel.values())
@@ -232,9 +231,9 @@ public class StorageProxy implements StorageProxyMBean
             while (System.nanoTime() - queryStartNanoTime < timeout)
             {
                 // for simplicity, we'll do a single liveness check at the start of each attempt
-                ReplicaLayout.ForPaxos replicaLayout = ReplicaLayout.forPaxos(Keyspace.open(keyspaceName), key, consistencyForPaxos);
+                ReplicaPlan.ForPaxosWrite replicaPlan = ReplicaPlans.forPaxos(Keyspace.open(keyspaceName), key, consistencyForPaxos);
 
-                final PaxosBallotAndContention pair = beginAndRepairPaxos(queryStartNanoTime, key, metadata, replicaLayout, consistencyForPaxos, consistencyForCommit, true, state);
+                final PaxosBallotAndContention pair = beginAndRepairPaxos(queryStartNanoTime, key, metadata, replicaPlan, consistencyForPaxos, consistencyForCommit, true, state);
                 final UUID ballot = pair.ballot;
                 contentions += pair.contentions;
 
@@ -276,7 +275,7 @@ public class StorageProxy implements StorageProxyMBean
 
                 Commit proposal = Commit.newProposal(ballot, updates);
                 Tracing.trace("CAS precondition is met; proposing client-requested updates for {}", ballot);
-                if (proposePaxos(proposal, replicaLayout, true, queryStartNanoTime))
+                if (proposePaxos(proposal, replicaPlan, true, queryStartNanoTime))
                 {
                     commitPaxos(proposal, consistencyForCommit, true, queryStartNanoTime);
                     Tracing.trace("CAS successful");
@@ -334,7 +333,7 @@ public class StorageProxy implements StorageProxyMBean
     private static PaxosBallotAndContention beginAndRepairPaxos(long queryStartNanoTime,
                                                                 DecoratedKey key,
                                                                 TableMetadata metadata,
-                                                                ReplicaLayout.ForPaxos replicaLayout,
+                                                                ReplicaPlan.ForPaxosWrite paxosPlan,
                                                                 ConsistencyLevel consistencyForPaxos,
                                                                 ConsistencyLevel consistencyForCommit,
                                                                 final boolean isWrite,
@@ -360,7 +359,7 @@ public class StorageProxy implements StorageProxyMBean
             // prepare
             Tracing.trace("Preparing {}", ballot);
             Commit toPrepare = Commit.newPrepare(key, metadata, ballot);
-            summary = preparePaxos(toPrepare, replicaLayout, queryStartNanoTime);
+            summary = preparePaxos(toPrepare, paxosPlan, queryStartNanoTime);
             if (!summary.promised)
             {
                 Tracing.trace("Some replicas have already promised a higher ballot than ours; aborting");
@@ -383,7 +382,7 @@ public class StorageProxy implements StorageProxyMBean
                 else
                     casReadMetrics.unfinishedCommit.inc();
                 Commit refreshedInProgress = Commit.newProposal(ballot, inProgress.update);
-                if (proposePaxos(refreshedInProgress, replicaLayout, false, queryStartNanoTime))
+                if (proposePaxos(refreshedInProgress, paxosPlan, false, queryStartNanoTime))
                 {
                     try
                     {
@@ -440,12 +439,12 @@ public class StorageProxy implements StorageProxyMBean
             MessagingService.instance().sendOneWay(message, target);
     }
 
-    private static PrepareCallback preparePaxos(Commit toPrepare, ReplicaLayout.ForPaxos replicaLayout, long queryStartNanoTime)
+    private static PrepareCallback preparePaxos(Commit toPrepare, ReplicaPlan.ForPaxosWrite replicaPlan, long queryStartNanoTime)
     throws WriteTimeoutException
     {
-        PrepareCallback callback = new PrepareCallback(toPrepare.update.partitionKey(), toPrepare.update.metadata(), replicaLayout.getRequiredParticipants(), replicaLayout.consistencyLevel(), queryStartNanoTime);
+        PrepareCallback callback = new PrepareCallback(toPrepare.update.partitionKey(), toPrepare.update.metadata(), replicaPlan.requiredParticipants(), replicaPlan.consistencyLevel(), queryStartNanoTime);
         MessageOut<Commit> message = new MessageOut<Commit>(MessagingService.Verb.PAXOS_PREPARE, toPrepare, Commit.serializer);
-        for (Replica replica: replicaLayout.selected())
+        for (Replica replica: replicaPlan.contacts())
         {
             if (replica.isLocal())
             {
@@ -478,12 +477,12 @@ public class StorageProxy implements StorageProxyMBean
         return callback;
     }
 
-    private static boolean proposePaxos(Commit proposal, ReplicaLayout.ForPaxos replicaLayout, boolean timeoutIfPartial, long queryStartNanoTime)
+    private static boolean proposePaxos(Commit proposal, ReplicaPlan.ForPaxosWrite replicaPlan, boolean timeoutIfPartial, long queryStartNanoTime)
     throws WriteTimeoutException
     {
-        ProposeCallback callback = new ProposeCallback(replicaLayout.selected().size(), replicaLayout.getRequiredParticipants(), !timeoutIfPartial, replicaLayout.consistencyLevel(), queryStartNanoTime);
+        ProposeCallback callback = new ProposeCallback(replicaPlan.contacts().size(), replicaPlan.requiredParticipants(), !timeoutIfPartial, replicaPlan.consistencyLevel(), queryStartNanoTime);
         MessageOut<Commit> message = new MessageOut<Commit>(MessagingService.Verb.PAXOS_PROPOSE, proposal, Commit.serializer);
-        for (Replica replica : replicaLayout.selected())
+        for (Replica replica : replicaPlan.contacts())
         {
             if (replica.isLocal())
             {
@@ -518,7 +517,7 @@ public class StorageProxy implements StorageProxyMBean
             return true;
 
         if (timeoutIfPartial && !callback.isFullyRefused())
-            throw new WriteTimeoutException(WriteType.CAS, replicaLayout.consistencyLevel(), callback.getAcceptCount(), replicaLayout.getRequiredParticipants());
+            throw new WriteTimeoutException(WriteType.CAS, replicaPlan.consistencyLevel(), callback.getAcceptCount(), replicaPlan.requiredParticipants());
 
         return false;
     }
@@ -531,21 +530,22 @@ public class StorageProxy implements StorageProxyMBean
         Token tk = proposal.update.partitionKey().getToken();
 
         AbstractWriteResponseHandler<Commit> responseHandler = null;
-        ReplicaLayout.ForToken replicaLayout = ReplicaLayout.forWrite(keyspace, consistencyLevel, tk, FailureDetector.instance::isAlive);
+        // NOTE: this ReplicaPlan is a lie, this usage of ReplicaPlan could do with being clarified - the selected() collection is essentially (I think) never used
+        ReplicaPlan.ForTokenWrite replicaPlan = ReplicaPlans.forWrite(keyspace, consistencyLevel, tk, ReplicaPlans.writeAll);
         if (shouldBlock)
         {
             AbstractReplicationStrategy rs = keyspace.getReplicationStrategy();
-            responseHandler = rs.getWriteResponseHandler(replicaLayout, null, WriteType.SIMPLE, queryStartNanoTime);
+            responseHandler = rs.getWriteResponseHandler(replicaPlan, null, WriteType.SIMPLE, queryStartNanoTime);
             responseHandler.setSupportsBackPressure(false);
         }
 
-        MessageOut<Commit> message = new MessageOut<Commit>(MessagingService.Verb.PAXOS_COMMIT, proposal, Commit.serializer);
-        for (Replica replica : replicaLayout.all())
+        MessageOut<Commit> message = new MessageOut<>(MessagingService.Verb.PAXOS_COMMIT, proposal, Commit.serializer);
+        for (Replica replica : replicaPlan.liveAndDown())
         {
             InetAddressAndPort destination = replica.endpoint();
             checkHintOverload(replica);
 
-            if (FailureDetector.instance.isAlive(destination))
+            if (replicaPlan.isAlive(replica))
             {
                 if (shouldBlock)
                 {
@@ -616,10 +616,10 @@ public class StorageProxy implements StorageProxyMBean
      * the data across to some other replica.
      *
      * @param mutations the mutations to be applied across the replicas
-     * @param consistency_level the consistency level for the operation
+     * @param consistencyLevel the consistency level for the operation
      * @param queryStartNanoTime the value of System.nanoTime() when the query started to be processed
      */
-    public static void mutate(List<? extends IMutation> mutations, ConsistencyLevel consistency_level, long queryStartNanoTime)
+    public static void mutate(List<? extends IMutation> mutations, ConsistencyLevel consistencyLevel, long queryStartNanoTime)
     throws UnavailableException, OverloadedException, WriteTimeoutException, WriteFailureException
     {
         Tracing.trace("Determining replicas for mutation");
@@ -637,7 +637,7 @@ public class StorageProxy implements StorageProxyMBean
                 if (mutation instanceof CounterMutation)
                     responseHandlers.add(mutateCounter((CounterMutation)mutation, localDataCenter, queryStartNanoTime));
                 else
-                    responseHandlers.add(performWrite(mutation, consistency_level, localDataCenter, standardWritePerformer, null, plainWriteType, queryStartNanoTime));
+                    responseHandlers.add(performWrite(mutation, consistencyLevel, localDataCenter, standardWritePerformer, null, plainWriteType, queryStartNanoTime));
             }
 
             // upgrade to full quorum any failed cheap quorums
@@ -653,7 +653,7 @@ public class StorageProxy implements StorageProxyMBean
         }
         catch (WriteTimeoutException|WriteFailureException ex)
         {
-            if (consistency_level == ConsistencyLevel.ANY)
+            if (consistencyLevel == ConsistencyLevel.ANY)
             {
                 hintMutations(mutations);
             }
@@ -662,7 +662,7 @@ public class StorageProxy implements StorageProxyMBean
                 if (ex instanceof WriteFailureException)
                 {
                     writeMetrics.failures.mark();
-                    writeMetricsMap.get(consistency_level).failures.mark();
+                    writeMetricsMap.get(consistencyLevel).failures.mark();
                     WriteFailureException fe = (WriteFailureException)ex;
                     Tracing.trace("Write failure; received {} of {} required replies, failed {} requests",
                                   fe.received, fe.blockFor, fe.failureReasonByEndpoint.size());
@@ -670,7 +670,7 @@ public class StorageProxy implements StorageProxyMBean
                 else
                 {
                     writeMetrics.timeouts.mark();
-                    writeMetricsMap.get(consistency_level).timeouts.mark();
+                    writeMetricsMap.get(consistencyLevel).timeouts.mark();
                     WriteTimeoutException te = (WriteTimeoutException)ex;
                     Tracing.trace("Write timeout; received {} of {} required replies", te.received, te.blockFor);
                 }
@@ -680,14 +680,14 @@ public class StorageProxy implements StorageProxyMBean
         catch (UnavailableException e)
         {
             writeMetrics.unavailables.mark();
-            writeMetricsMap.get(consistency_level).unavailables.mark();
+            writeMetricsMap.get(consistencyLevel).unavailables.mark();
             Tracing.trace("Unavailable");
             throw e;
         }
         catch (OverloadedException e)
         {
             writeMetrics.unavailables.mark();
-            writeMetricsMap.get(consistency_level).unavailables.mark();
+            writeMetricsMap.get(consistencyLevel).unavailables.mark();
             Tracing.trace("Overloaded");
             throw e;
         }
@@ -695,7 +695,7 @@ public class StorageProxy implements StorageProxyMBean
         {
             long latency = System.nanoTime() - startTime;
             writeMetrics.addNano(latency);
-            writeMetricsMap.get(consistency_level).addNano(latency);
+            writeMetricsMap.get(consistencyLevel).addNano(latency);
             updateCoordinatorWriteLatencyTableMetric(mutations, latency);
         }
     }
@@ -725,7 +725,8 @@ public class StorageProxy implements StorageProxyMBean
 
         // local writes can timeout, but cannot be dropped (see LocalMutationRunnable and CASSANDRA-6510),
         // so there is no need to hint or retry.
-        EndpointsForToken replicasToHint = StorageService.instance.getNaturalAndPendingReplicasForToken(keyspaceName, token)
+        EndpointsForToken replicasToHint = ReplicaLayout.forTokenWriteLiveAndDown(Keyspace.open(keyspaceName), token)
+                .all()
                 .filter(StorageProxy::shouldHint);
 
         submitHint(mutation, replicasToHint, null);
@@ -737,8 +738,8 @@ public class StorageProxy implements StorageProxyMBean
         Token token = mutation.key().getToken();
         InetAddressAndPort local = FBUtilities.getBroadcastAddressAndPort();
 
-        return StorageService.instance.getNaturalReplicasForToken(keyspaceName, token).endpoints().contains(local)
-               || StorageService.instance.getTokenMetadata().pendingEndpointsForToken(token, keyspaceName).endpoints().contains(local);
+        return ReplicaLayout.forTokenWriteLiveAndDown(Keyspace.open(keyspaceName), token)
+                .all().endpoints().contains(local);
     }
 
     /**
@@ -934,7 +935,6 @@ public class StorageProxy implements StorageProxyMBean
                                                                                cleanup,
                                                                                queryStartNanoTime);
                 // exit early if we can't fulfill the CL at this time.
-                wrapper.handler.assureSufficientLiveNodes();
                 wrappers.add(wrapper);
             }
 
@@ -1002,14 +1002,14 @@ public class StorageProxy implements StorageProxyMBean
     throws WriteTimeoutException, WriteFailureException
     {
         Keyspace systemKeypsace = Keyspace.open(SchemaConstants.SYSTEM_KEYSPACE_NAME);
-        ReplicaLayout.ForToken replicaLayout = ReplicaLayout.forBatchlogWrite(systemKeypsace, endpoints);
-        WriteResponseHandler<?> handler = new WriteResponseHandler(replicaLayout,
+        ReplicaPlan.ForTokenWrite replicaPlan = ReplicaPlans.forBatchlogWrite(systemKeypsace, endpoints);
+        WriteResponseHandler<?> handler = new WriteResponseHandler(replicaPlan,
                                                                    WriteType.BATCH_LOG,
                                                                    queryStartNanoTime);
 
         Batch batch = Batch.createLocal(uuid, FBUtilities.timestampMicros(), mutations);
         MessageOut<Batch> message = new MessageOut<>(MessagingService.Verb.BATCH_STORE, batch, Batch.serializer);
-        for (Replica replica : replicaLayout.all())
+        for (Replica replica : replicaPlan.liveAndDown())
         {
             logger.trace("Sending batchlog store request {} to {} for {} mutations", batch.id, replica, batch.size());
 
@@ -1040,12 +1040,12 @@ public class StorageProxy implements StorageProxyMBean
     {
         for (WriteResponseHandlerWrapper wrapper : wrappers)
         {
-            Replicas.temporaryAssertFull(wrapper.handler.replicaLayout.all());  // TODO: CASSANDRA-14549
-            ReplicaLayout.ForToken replicas = wrapper.handler.replicaLayout.withSelected(wrapper.handler.replicaLayout.all());
+            Replicas.temporaryAssertFull(wrapper.handler.replicaPlan.liveAndDown());  // TODO: CASSANDRA-14549
+            ReplicaPlan.ForTokenWrite replicas = wrapper.handler.replicaPlan.withContact(wrapper.handler.replicaPlan.liveAndDown());
 
             try
             {
-                sendToHintedReplicas(wrapper.mutation, replicas.selected(), wrapper.handler, localDataCenter, stage);
+                sendToHintedReplicas(wrapper.mutation, replicas, wrapper.handler, localDataCenter, stage);
             }
             catch (OverloadedException | WriteTimeoutException e)
             {
@@ -1059,11 +1059,11 @@ public class StorageProxy implements StorageProxyMBean
     {
         for (WriteResponseHandlerWrapper wrapper : wrappers)
         {
-            Replicas.temporaryAssertFull(wrapper.handler.replicaLayout.all()); // TODO: CASSANDRA-14549
-            sendToHintedReplicas(wrapper.mutation, wrapper.handler.replicaLayout.all(), wrapper.handler, localDataCenter, stage);
+            EndpointsForToken sendTo = wrapper.handler.replicaPlan.liveAndDown();
+            Replicas.temporaryAssertFull(sendTo); // TODO: CASSANDRA-14549
+            sendToHintedReplicas(wrapper.mutation, wrapper.handler.replicaPlan.withContact(sendTo), wrapper.handler, localDataCenter, stage);
         }
 
-
         for (WriteResponseHandlerWrapper wrapper : wrappers)
             wrapper.handler.get();
     }
@@ -1096,13 +1096,10 @@ public class StorageProxy implements StorageProxyMBean
 
         Token tk = mutation.key().getToken();
 
-        ReplicaLayout.ForToken replicaLayout = ReplicaLayout.forWriteWithDownNodes(keyspace, consistencyLevel, tk);
-        AbstractWriteResponseHandler<IMutation> responseHandler = rs.getWriteResponseHandler(replicaLayout, callback, writeType, queryStartNanoTime);
+        ReplicaPlan.ForTokenWrite replicaPlan = ReplicaPlans.forWrite(keyspace, consistencyLevel, tk, ReplicaPlans.writeNormal);
+        AbstractWriteResponseHandler<IMutation> responseHandler = rs.getWriteResponseHandler(replicaPlan, callback, writeType, queryStartNanoTime);
 
-        // exit early if we can't fulfill the CL at this time
-        responseHandler.assureSufficientLiveNodes();
-
-        performer.apply(mutation, replicaLayout, responseHandler, localDataCenter);
+        performer.apply(mutation, replicaPlan, responseHandler, localDataCenter);
         return responseHandler;
     }
 
@@ -1118,8 +1115,8 @@ public class StorageProxy implements StorageProxyMBean
         AbstractReplicationStrategy rs = keyspace.getReplicationStrategy();
         Token tk = mutation.key().getToken();
 
-        ReplicaLayout.ForToken replicaLayout = ReplicaLayout.forWrite(keyspace, consistencyLevel, tk, FailureDetector.instance::isAlive);
-        AbstractWriteResponseHandler<IMutation> writeHandler = rs.getWriteResponseHandler(replicaLayout,null, writeType, queryStartNanoTime);
+        ReplicaPlan.ForTokenWrite replicaPlan = ReplicaPlans.forWrite(keyspace, consistencyLevel, tk, ReplicaPlans.writeNormal);
+        AbstractWriteResponseHandler<IMutation> writeHandler = rs.getWriteResponseHandler(replicaPlan,null, writeType, queryStartNanoTime);
         BatchlogResponseHandler<IMutation> batchHandler = new BatchlogResponseHandler<>(writeHandler, batchConsistencyLevel.blockFor(keyspace), cleanup, queryStartNanoTime);
         return new WriteResponseHandlerWrapper(batchHandler, mutation);
     }
@@ -1140,10 +1137,11 @@ public class StorageProxy implements StorageProxyMBean
     {
         Keyspace keyspace = Keyspace.open(mutation.getKeyspaceName());
         AbstractReplicationStrategy rs = keyspace.getReplicationStrategy();
-        Token tk = mutation.key().getToken();
-        ReplicaLayout.ForToken replicaLayout = ReplicaLayout.forWriteWithDownNodes(keyspace, consistencyLevel, tk, naturalEndpoints, pendingEndpoints);
 
-        AbstractWriteResponseHandler<IMutation> writeHandler = rs.getWriteResponseHandler(replicaLayout, () -> {
+        ReplicaLayout.ForTokenWrite liveAndDown = ReplicaLayout.forTokenWrite(naturalEndpoints, pendingEndpoints);
+        ReplicaPlan.ForTokenWrite replicaPlan = ReplicaPlans.forWrite(keyspace, consistencyLevel, liveAndDown, ReplicaPlans.writeAll);
+
+        AbstractWriteResponseHandler<IMutation> writeHandler = rs.getWriteResponseHandler(replicaPlan, () -> {
             long delay = Math.max(0, System.currentTimeMillis() - baseComplete.get());
             viewWriteMetrics.viewWriteLatency.update(delay, TimeUnit.MILLISECONDS);
         }, writeType, queryStartNanoTime);
@@ -1208,7 +1206,7 @@ public class StorageProxy implements StorageProxyMBean
      * @throws OverloadedException if the hints cannot be written/enqueued
      */
     public static void sendToHintedReplicas(final Mutation mutation,
-                                            EndpointsForToken targets,
+                                            ReplicaPlan.ForTokenWrite plan,
                                             AbstractWriteResponseHandler<IMutation> responseHandler,
                                             String localDataCenter,
                                             Stage stage)
@@ -1227,11 +1225,11 @@ public class StorageProxy implements StorageProxyMBean
 
         List<InetAddressAndPort> backPressureHosts = null;
 
-        for (Replica destination : targets)
+        for (Replica destination : plan.contacts())
         {
             checkHintOverload(destination);
 
-            if (FailureDetector.instance.isAlive(destination.endpoint()))
+            if (plan.isAlive(destination))
             {
                 if (destination.isLocal())
                 {
@@ -1251,7 +1249,7 @@ public class StorageProxy implements StorageProxyMBean
                     if (localDataCenter.equals(dc))
                     {
                         if (localDc == null)
-                            localDc = new ArrayList<>(targets.size());
+                            localDc = new ArrayList<>(plan.contacts().size());
 
                         localDc.add(destination);
                     }
@@ -1268,7 +1266,7 @@ public class StorageProxy implements StorageProxyMBean
                     }
 
                     if (backPressureHosts == null)
-                        backPressureHosts = new ArrayList<>(targets.size());
+                        backPressureHosts = new ArrayList<>(plan.contacts().size());
 
                     backPressureHosts.add(destination.endpoint());
                 }
@@ -1345,7 +1343,7 @@ public class StorageProxy implements StorageProxyMBean
                                                                   message,
                                                                   destination,
                                                                   message.getTimeout(),
-                                                                  handler.replicaLayout.consistencyLevel(),
+                                                                  handler.replicaPlan.consistencyLevel(),
                                                                   true);
             messageIds[idIdx++] = id;
             logger.trace("Adding FWD message to {}@{}", id, destination);
@@ -1435,14 +1433,13 @@ public class StorageProxy implements StorageProxyMBean
             // Exit now if we can't fulfill the CL here instead of forwarding to the leader replica
             String keyspaceName = cm.getKeyspaceName();
             Keyspace keyspace = Keyspace.open(keyspaceName);
-            AbstractReplicationStrategy rs = Keyspace.open(keyspaceName).getReplicationStrategy();
             Token tk = cm.key().getToken();
 
-            ReplicaLayout.ForToken replicaLayout = ReplicaLayout.forWriteWithDownNodes(keyspace, cm.consistency(), tk);
-            rs.getWriteResponseHandler(replicaLayout, null, WriteType.COUNTER, queryStartNanoTime).assureSufficientLiveNodes();
+            ReplicaPlans.forWrite(keyspace, cm.consistency(), tk, ReplicaPlans.writeAll)
+                    .assureSufficientReplicas();
 
             // Forward the actual update to the chosen leader replica
-            AbstractWriteResponseHandler<IMutation> responseHandler = new WriteResponseHandler<>(ReplicaLayout.forCounterWrite(keyspace, tk, replica),
+            AbstractWriteResponseHandler<IMutation> responseHandler = new WriteResponseHandler<>(ReplicaPlans.forForwardingCounterWrite(keyspace, tk, replica),
                                                                                                  WriteType.COUNTER, queryStartNanoTime);
 
             Tracing.trace("Enqueuing counter update to {}", replica);
@@ -1465,7 +1462,7 @@ public class StorageProxy implements StorageProxyMBean
     {
         Keyspace keyspace = Keyspace.open(keyspaceName);
         IEndpointSnitch snitch = DatabaseDescriptor.getEndpointSnitch();
-        EndpointsForToken replicas = StorageService.instance.getLiveNaturalReplicasForToken(keyspace, key);
+        EndpointsForToken replicas = keyspace.getReplicationStrategy().getNaturalReplicasForToken(key);
 
         // CASSANDRA-13043: filter out those endpoints not accepting clients yet, maybe because still bootstrapping
         replicas = replicas.filter(replica -> StorageService.instance.isRpcReady(replica.endpoint()));
@@ -1511,7 +1508,7 @@ public class StorageProxy implements StorageProxyMBean
     }
 
     private static Runnable counterWriteTask(final IMutation mutation,
-                                             final EndpointsForToken targets,
+                                             final ReplicaPlan.ForTokenWrite replicaPlan,
                                              final AbstractWriteResponseHandler<IMutation> responseHandler,
                                              final String localDataCenter)
     {
@@ -1524,7 +1521,7 @@ public class StorageProxy implements StorageProxyMBean
 
                 Mutation result = ((CounterMutation) mutation).applyCounterMutation();
                 responseHandler.response(null);
-                sendToHintedReplicas(result, targets, responseHandler, localDataCenter, Stage.COUNTER_MUTATION);
+                sendToHintedReplicas(result, replicaPlan, responseHandler, localDataCenter, Stage.COUNTER_MUTATION);
             }
         };
     }
@@ -1592,7 +1589,7 @@ public class StorageProxy implements StorageProxyMBean
         try
         {
             // make sure any in-progress paxos writes are done (i.e., committed to a majority of replicas), before performing a quorum read
-            ReplicaLayout.ForPaxos replicaLayout = ReplicaLayout.forPaxos(Keyspace.open(metadata.keyspace), key, consistencyLevel);
+            ReplicaPlan.ForPaxosWrite replicaPlan = ReplicaPlans.forPaxos(Keyspace.open(metadata.keyspace), key, consistencyLevel);
 
             // does the work of applying in-progress writes; throws UAE or timeout if it can't
             final ConsistencyLevel consistencyForCommitOrFetch = consistencyLevel == ConsistencyLevel.LOCAL_SERIAL
@@ -1601,7 +1598,7 @@ public class StorageProxy implements StorageProxyMBean
 
             try
             {
-                final PaxosBallotAndContention pair = beginAndRepairPaxos(start, key, metadata, replicaLayout, consistencyLevel, consistencyForCommitOrFetch, false, state);
+                final PaxosBallotAndContention pair = beginAndRepairPaxos(start, key, metadata, replicaPlan, consistencyLevel, consistencyForCommitOrFetch, false, state);
                 if (pair.contentions > 0)
                     casReadMetrics.contention.update(pair.contentions);
             }
@@ -1850,21 +1847,6 @@ public class StorageProxy implements StorageProxyMBean
         }
     }
 
-    public static EndpointsForToken getLiveSortedReplicasForToken(Keyspace keyspace, RingPosition pos)
-    {
-        return getLiveSortedReplicas(keyspace, pos).forToken(pos.getToken());
-    }
-
-    public static EndpointsForRange getLiveSortedReplicas(Keyspace keyspace, RingPosition pos)
-    {
-        EndpointsForRange liveReplicas = StorageService.instance.getLiveNaturalReplicas(keyspace, pos);
-        // Replica availability is considered by the query path
-        Preconditions.checkState(liveReplicas.isEmpty() || liveReplicas.stream().anyMatch(Replica::isFull),
-                                 "At least one full replica required for reads: " + liveReplicas);
-
-        return DatabaseDescriptor.getEndpointSnitch().sortedByProximity(FBUtilities.getBroadcastAddressAndPort(), liveReplicas);
-    }
-
     /**
      * Estimate the number of result rows per range in the ring based on our local data.
      * <p>
@@ -1883,7 +1865,7 @@ public class StorageProxy implements StorageProxyMBean
         return (maxExpectedResults / DatabaseDescriptor.getNumTokens()) / keyspace.getReplicationStrategy().getReplicationFactor().allReplicas;
     }
 
-    private static class RangeIterator extends AbstractIterator<ReplicaLayout.ForRange>
+    private static class RangeIterator extends AbstractIterator<ReplicaPlan.ForRangeRead>
     {
         private final Keyspace keyspace;
         private final ConsistencyLevel consistency;
@@ -1907,43 +1889,34 @@ public class StorageProxy implements StorageProxyMBean
             return rangeCount;
         }
 
-        protected ReplicaLayout.ForRange computeNext()
+        protected ReplicaPlan.ForRangeRead computeNext()
         {
             if (!ranges.hasNext())
                 return endOfData();
 
-            AbstractBounds<PartitionPosition> range = ranges.next();
-            EndpointsForRange liveReplicas = getLiveSortedReplicas(keyspace, range.right);
-
-            int blockFor = consistency.blockFor(keyspace);
-            EndpointsForRange targetReplicas = consistency.filterForQuery(keyspace, liveReplicas);
-            int minResponses = Math.min(targetReplicas.size(), blockFor);
-
-            // Endpoints for range here as well
-            return ReplicaLayout.forRangeRead(keyspace, consistency, range,
-                                              liveReplicas, targetReplicas.subList(0, minResponses));
+            return ReplicaPlans.forRangeRead(keyspace, consistency, ranges.next());
         }
     }
 
-    private static class RangeMerger extends AbstractIterator<ReplicaLayout.ForRange>
+    private static class RangeMerger extends AbstractIterator<ReplicaPlan.ForRangeRead>
     {
         private final Keyspace keyspace;
         private final ConsistencyLevel consistency;
-        private final PeekingIterator<ReplicaLayout.ForRange> ranges;
+        private final PeekingIterator<ReplicaPlan.ForRangeRead> ranges;
 
-        private RangeMerger(Iterator<ReplicaLayout.ForRange> iterator, Keyspace keyspace, ConsistencyLevel consistency)
+        private RangeMerger(Iterator<ReplicaPlan.ForRangeRead> iterator, Keyspace keyspace, ConsistencyLevel consistency)
         {
             this.keyspace = keyspace;
             this.consistency = consistency;
             this.ranges = Iterators.peekingIterator(iterator);
         }
 
-        protected ReplicaLayout.ForRange computeNext()
+        protected ReplicaPlan.ForRangeRead computeNext()
         {
             if (!ranges.hasNext())
                 return endOfData();
 
-            ReplicaLayout.ForRange current = ranges.next();
+            ReplicaPlan.ForRangeRead current = ranges.next();
 
             // getRestrictedRange has broken the queried range into per-[vnode] token ranges, but this doesn't take
             // the replication factor into account. If the intersection of live endpoints for 2 consecutive ranges
@@ -1955,25 +1928,15 @@ public class StorageProxy implements StorageProxyMBean
                 // Note: it would be slightly more efficient to have CFS.getRangeSlice on the destination nodes unwraps
                 // the range if necessary and deal with it. However, we can't start sending wrapped range without breaking
                 // wire compatibility, so It's likely easier not to bother;
-                if (current.range.right.isMinimum())
+                if (current.range().right.isMinimum())
                     break;
 
-                ReplicaLayout.ForRange next = ranges.peek();
-
-                EndpointsForRange merged = current.all().keep(next.all().endpoints());
-
-                // Check if there is enough endpoint for the merge to be possible.
-                if (!consistency.isSufficientLiveNodesForRead(keyspace, merged))
-                    break;
-
-                EndpointsForRange filteredMerged = consistency.filterForQuery(keyspace, merged);
-
-                // Estimate whether merging will be a win or not
-                if (!DatabaseDescriptor.getEndpointSnitch().isWorthMergingForRangeQuery(filteredMerged, current.selected(), next.selected()))
+                ReplicaPlan.ForRangeRead next = ranges.peek();
+                ReplicaPlan.ForRangeRead merged = ReplicaPlans.maybeMerge(keyspace, consistency, current, next);
+                if (merged == null)
                     break;
 
-                // If we get there, merge this range and the next one
-                current = ReplicaLayout.forRangeRead(keyspace, consistency, current.range.withNewRight(next.range.right), merged, filteredMerged);
+                current = merged;
                 ranges.next(); // consume the range we just merged since we've only peeked so far
             }
             return current;
@@ -2018,7 +1981,7 @@ public class StorageProxy implements StorageProxyMBean
 
     private static class RangeCommandIterator extends AbstractIterator<RowIterator> implements PartitionIterator
     {
-        private final Iterator<ReplicaLayout.ForRange> ranges;
+        private final Iterator<ReplicaPlan.ForRangeRead> ranges;
         private final int totalRangeCount;
         private final PartitionRangeReadCommand command;
         private final boolean enforceStrictLiveness;
@@ -2108,42 +2071,40 @@ public class StorageProxy implements StorageProxyMBean
         /**
          * Queries the provided sub-range.
          *
-         * @param replicaLayout the subRange to query.
+         * @param replicaPlan the subRange to query.
          * @param isFirst in the case where multiple queries are sent in parallel, whether that's the first query on
          * that batch or not. The reason it matters is that whe paging queries, the command (more specifically the
          * {@code DataLimits}) may have "state" information and that state may only be valid for the first query (in
          * that it's the query that "continues" whatever we're previously queried).
          */
-        private SingleRangeResponse query(ReplicaLayout.ForRange replicaLayout, boolean isFirst)
+        private SingleRangeResponse query(ReplicaPlan.ForRangeRead replicaPlan, boolean isFirst)
         {
-            PartitionRangeReadCommand rangeCommand = command.forSubRange(replicaLayout.range, isFirst);
-            ReadRepair<EndpointsForRange, ReplicaLayout.ForRange> readRepair = ReadRepair.create(command, replicaLayout, queryStartNanoTime);
-            DataResolver<EndpointsForRange, ReplicaLayout.ForRange> resolver = new DataResolver<>(rangeCommand, replicaLayout, readRepair, queryStartNanoTime);
-            Keyspace keyspace = Keyspace.open(command.metadata().keyspace);
-
-            ReadCallback<EndpointsForRange, ReplicaLayout.ForRange> handler = new ReadCallback<>(resolver,
-                                                                                                 replicaLayout.consistencyLevel().blockFor(keyspace),
-                                                                                                 rangeCommand,
-                                                                                                 replicaLayout,
-                                                                                                 queryStartNanoTime);
+            PartitionRangeReadCommand rangeCommand = command.forSubRange(replicaPlan.range(), isFirst);
+            ReplicaPlan.SharedForRangeRead sharedReplicaPlan = ReplicaPlan.shared(replicaPlan);
+            ReadRepair<EndpointsForRange, ReplicaPlan.ForRangeRead> readRepair
+                    = ReadRepair.create(command, sharedReplicaPlan, queryStartNanoTime);
+            DataResolver<EndpointsForRange, ReplicaPlan.ForRangeRead> resolver
+                    = new DataResolver<>(rangeCommand, sharedReplicaPlan, readRepair, queryStartNanoTime);
+            ReadCallback<EndpointsForRange, ReplicaPlan.ForRangeRead> handler
+                    = new ReadCallback<>(resolver, rangeCommand, sharedReplicaPlan, queryStartNanoTime);
 
-            handler.assureSufficientLiveNodes();
+            replicaPlan.assureSufficientReplicas();
 
             // If enabled, request repaired data tracking info from full replicas but
             // only if there are multiple full replicas to compare results from
             if (DatabaseDescriptor.getRepairedDataTrackingForPartitionReadsEnabled()
-                && replicaLayout.selected().filter(Replica::isFull).size() > 1)
+                    && replicaPlan.contacts().filter(Replica::isFull).size() > 1)
             {
                 command.trackRepairedStatus();
             }
 
-            if (replicaLayout.selected().size() == 1 && replicaLayout.selected().get(0).isLocal())
+            if (replicaPlan.contacts().size() == 1 && replicaPlan.contacts().get(0).isLocal())
             {
                 StageManager.getStage(Stage.READ).execute(new LocalReadRunnable(rangeCommand, handler));
             }
             else
             {
-                for (Replica replica : replicaLayout.selected())
+                for (Replica replica : replicaPlan.contacts())
                 {
                     Tracing.trace("Enqueuing request to {}", replica);
                     PartitionRangeReadCommand command = replica.isFull() ? rangeCommand : rangeCommand.copyAsTransientQuery();
@@ -2486,7 +2447,7 @@ public class StorageProxy implements StorageProxyMBean
     public interface WritePerformer
     {
         public void apply(IMutation mutation,
-                          ReplicaLayout.ForToken targets,
+                          ReplicaPlan.ForTokenWrite targets,
                           AbstractWriteResponseHandler<IMutation> responseHandler,
                           String localDataCenter) throws OverloadedException;
     }
@@ -2499,7 +2460,7 @@ public class StorageProxy implements StorageProxyMBean
         public ViewWriteMetricsWrapped(AbstractWriteResponseHandler<IMutation> writeHandler, int i, BatchlogCleanup cleanup, long queryStartNanoTime)
         {
             super(writeHandler, i, cleanup, queryStartNanoTime);
-            viewWriteMetrics.viewReplicasAttempted.inc(totalEndpoints());
+            viewWriteMetrics.viewReplicasAttempted.inc(candidateReplicaCount());
         }
 
         public void response(MessageIn<IMutation> msg)
@@ -2705,7 +2666,7 @@ public class StorageProxy implements StorageProxyMBean
                 HintsService.instance.write(hostIds, Hint.create(mutation, System.currentTimeMillis()));
                 validTargets.forEach(HintsService.instance.metrics::incrCreatedHints);
                 // Notify the handler only for CL == ANY
-                if (responseHandler != null && responseHandler.replicaLayout.consistencyLevel() == ConsistencyLevel.ANY)
+                if (responseHandler != null && responseHandler.replicaPlan.consistencyLevel() == ConsistencyLevel.ANY)
                     responseHandler.response(null);
             }
         };

http://git-wip-us.apache.org/repos/asf/cassandra/blob/047bcd7a/src/java/org/apache/cassandra/service/StorageService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java
index 7f4ae14..a979f1c 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -172,7 +172,8 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
 
     public RangesAtEndpoint getLocalReplicas(String keyspaceName)
     {
-        return getReplicasForEndpoint(keyspaceName, FBUtilities.getBroadcastAddressAndPort());
+        return Keyspace.open(keyspaceName).getReplicationStrategy()
+                .getAddressReplicas(FBUtilities.getBroadcastAddressAndPort());
     }
 
     public List<Range<Token>> getLocalAndPendingRanges(String ks)
@@ -2015,11 +2016,10 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
     */
     private EndpointsByRange constructRangeToEndpointMap(String keyspace, List<Range<Token>> ranges)
     {
+        AbstractReplicationStrategy strategy = Keyspace.open(keyspace).getReplicationStrategy();
         Map<Range<Token>, EndpointsForRange> rangeToEndpointMap = new HashMap<>(ranges.size());
         for (Range<Token> range : ranges)
-        {
-            rangeToEndpointMap.put(range, Keyspace.open(keyspace).getReplicationStrategy().getNaturalReplicas(range.right));
-        }
+            rangeToEndpointMap.put(range, strategy.getNaturalReplicas(range.right));
         return new EndpointsByRange(rangeToEndpointMap);
     }
 
@@ -3878,16 +3878,6 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
     }
 
     /**
-     * Get all ranges an endpoint is responsible for (by keyspace)
-     * @param ep endpoint we are interested in.
-     * @return ranges for the specified endpoint.
-     */
-    RangesAtEndpoint getReplicasForEndpoint(String keyspaceName, InetAddressAndPort ep)
-    {
-        return Keyspace.open(keyspaceName).getReplicationStrategy().getAddressReplicas(ep);
-    }
-
-    /**
      * Get all ranges that span the ring given a set
      * of tokens. All ranges are in sorted order of
      * ranges.
@@ -3936,11 +3926,10 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
         return Replicas.stringify(getNaturalReplicasForToken(keyspaceName, cf, key), true);
     }
 
-
     @Deprecated
     public List<InetAddress> getNaturalEndpoints(String keyspaceName, ByteBuffer key)
     {
-        EndpointsForToken replicas = getNaturalReplicasForToken(keyspaceName, tokenMetadata.partitioner.getToken(key));
+        EndpointsForToken replicas = getNaturalReplicasForToken(keyspaceName, key);
         List<InetAddress> inetList = new ArrayList<>(replicas.size());
         replicas.forEach(r -> inetList.add(r.endpoint().address));
         return inetList;
@@ -3948,7 +3937,8 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
 
     public List<String> getNaturalEndpointsWithPort(String keyspaceName, ByteBuffer key)
     {
-        return Replicas.stringify(getNaturalReplicasForToken(keyspaceName, tokenMetadata.partitioner.getToken(key)), true);
+        EndpointsForToken replicas = getNaturalReplicasForToken(keyspaceName, key);
+        return Replicas.stringify(replicas, true);
     }
 
     public List<String> getReplicas(String keyspaceName, String cf, String key)
@@ -3971,61 +3961,13 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
         if (metadata == null)
             throw new IllegalArgumentException("Unknown table '" + cf + "' in keyspace '" + keyspaceName + "'");
 
-        return getNaturalReplicasForToken(keyspaceName, tokenMetadata.partitioner.getToken(metadata.partitionKeyType.fromString(key)));
-    }
-
-    /**
-     * This method returns the N endpoints that are responsible for storing the
-     * specified key i.e for replication.
-     *
-     * @param keyspaceName keyspace name also known as keyspace
-     * @param pos position for which we need to find the endpoint
-     * @return the endpoint responsible for this token
-     */
-    public static EndpointsForToken getNaturalReplicasForToken(String keyspaceName, RingPosition pos)
-    {
-        return Keyspace.open(keyspaceName).getReplicationStrategy().getNaturalReplicasForToken(pos);
+        return getNaturalReplicasForToken(keyspaceName, metadata.partitionKeyType.fromString(key));
     }
 
-    /**
-     * Returns the endpoints currently responsible for storing the token plus pending ones
-     */
-    public EndpointsForToken getNaturalAndPendingReplicasForToken(String keyspaceName, Token token)
-    {
-        // TODO: race condition to fetch these. impliciations??
-        EndpointsForToken natural = getNaturalReplicasForToken(keyspaceName, token);
-        EndpointsForToken pending = tokenMetadata.pendingEndpointsForToken(token, keyspaceName);
-        if (Endpoints.haveConflicts(natural, pending))
-        {
-            natural = Endpoints.resolveConflictsInNatural(natural, pending);
-            pending = Endpoints.resolveConflictsInPending(natural, pending);
-        }
-        return Endpoints.concat(natural, pending);
-    }
-
-    /**
-     * This method attempts to return N endpoints that are responsible for storing the
-     * specified key i.e for replication.
-     *
-     * @param keyspace keyspace name also known as keyspace
-     * @param pos position for which we need to find the endpoint
-     */
-    public EndpointsForToken getLiveNaturalReplicasForToken(Keyspace keyspace, RingPosition pos)
-    {
-        return getLiveNaturalReplicas(keyspace, pos).forToken(pos.getToken());
-    }
-
-    /**
-     * This method attempts to return N endpoints that are responsible for storing the
-     * specified key i.e for replication.
-     *
-     * @param keyspace keyspace name also known as keyspace
-     * @param pos position for which we need to find the endpoint
-     */
-    public EndpointsForRange getLiveNaturalReplicas(Keyspace keyspace, RingPosition pos)
+    public EndpointsForToken getNaturalReplicasForToken(String keyspaceName, ByteBuffer key)
     {
-        EndpointsForRange replicas = keyspace.getReplicationStrategy().getNaturalReplicas(pos);
-        return replicas.filter(r -> FailureDetector.instance.isAlive(r.endpoint()));
+        Token token = tokenMetadata.partitioner.getToken(key);
+        return Keyspace.open(keyspaceName).getReplicationStrategy().getNaturalReplicasForToken(token);
     }
 
     public void setLoggingLevel(String classQualifier, String rawLevel) throws Exception
@@ -4268,7 +4210,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
                              .filter(endpoint -> FailureDetector.instance.isAlive(endpoint) && !FBUtilities.getBroadcastAddressAndPort().equals(endpoint))
                              .collect(Collectors.toList());
 
-        return EndpointsForRange.copyOf(SystemReplicas.getSystemReplicas(endpoints));
+        return SystemReplicas.getSystemReplicas(endpoints);
     }
     /**
      * Find the best target to stream hints to. Currently the closest peer according to the snitch

http://git-wip-us.apache.org/repos/asf/cassandra/blob/047bcd7a/src/java/org/apache/cassandra/service/WriteResponseHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/WriteResponseHandler.java b/src/java/org/apache/cassandra/service/WriteResponseHandler.java
index a07aae6..f9bfedf 100644
--- a/src/java/org/apache/cassandra/service/WriteResponseHandler.java
+++ b/src/java/org/apache/cassandra/service/WriteResponseHandler.java
@@ -19,9 +19,7 @@ package org.apache.cassandra.service;
 
 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
 
-import org.apache.cassandra.dht.Token;
-import org.apache.cassandra.locator.ReplicaLayout;
-
+import org.apache.cassandra.locator.ReplicaPlan;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -39,18 +37,18 @@ public class WriteResponseHandler<T> extends AbstractWriteResponseHandler<T>
     private static final AtomicIntegerFieldUpdater<WriteResponseHandler> responsesUpdater
             = AtomicIntegerFieldUpdater.newUpdater(WriteResponseHandler.class, "responses");
 
-    public WriteResponseHandler(ReplicaLayout.ForToken replicaLayout,
+    public WriteResponseHandler(ReplicaPlan.ForTokenWrite replicaPlan,
                                 Runnable callback,
                                 WriteType writeType,
                                 long queryStartNanoTime)
     {
-        super(replicaLayout, callback, writeType, queryStartNanoTime);
-        responses = totalBlockFor();
+        super(replicaPlan, callback, writeType, queryStartNanoTime);
+        responses = blockFor();
     }
 
-    public WriteResponseHandler(ReplicaLayout.ForToken replicaLayout, WriteType writeType, long queryStartNanoTime)
+    public WriteResponseHandler(ReplicaPlan.ForTokenWrite replicaPlan, WriteType writeType, long queryStartNanoTime)
     {
-        this(replicaLayout, null, writeType, queryStartNanoTime);
+        this(replicaPlan, null, writeType, queryStartNanoTime);
     }
 
     public void response(MessageIn<T> m)
@@ -65,7 +63,7 @@ public class WriteResponseHandler<T> extends AbstractWriteResponseHandler<T>
 
     protected int ackCount()
     {
-        return totalBlockFor() - responses;
+        return blockFor() - responses;
     }
 
     public boolean isLatencyForSnitch()


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org