Posted to commits@cassandra.apache.org by aw...@apache.org on 2018/09/01 01:36:05 UTC

[13/18] cassandra git commit: Transient Replication and Cheap Quorums

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/locator/NetworkTopologyStrategy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/locator/NetworkTopologyStrategy.java b/src/java/org/apache/cassandra/locator/NetworkTopologyStrategy.java
index cb2ea46..c63f4f3 100644
--- a/src/java/org/apache/cassandra/locator/NetworkTopologyStrategy.java
+++ b/src/java/org/apache/cassandra/locator/NetworkTopologyStrategy.java
@@ -20,10 +20,12 @@ package org.apache.cassandra.locator;
 import java.util.*;
 import java.util.Map.Entry;
 
+import org.apache.cassandra.locator.ReplicaCollection.Mutable.Conflict;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.dht.Datacenters;
+import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.exceptions.ConfigurationException;
 import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.locator.TokenMetadata.Topology;
@@ -49,14 +51,17 @@ import com.google.common.collect.Multimap;
  */
 public class NetworkTopologyStrategy extends AbstractReplicationStrategy
 {
-    private final Map<String, Integer> datacenters;
+    private final Map<String, ReplicationFactor> datacenters;
+    private final ReplicationFactor aggregateRf;
     private static final Logger logger = LoggerFactory.getLogger(NetworkTopologyStrategy.class);
 
     public NetworkTopologyStrategy(String keyspaceName, TokenMetadata tokenMetadata, IEndpointSnitch snitch, Map<String, String> configOptions) throws ConfigurationException
     {
         super(keyspaceName, tokenMetadata, snitch, configOptions);
 
-        Map<String, Integer> newDatacenters = new HashMap<String, Integer>();
+        int replicas = 0;
+        int trans = 0;
+        Map<String, ReplicationFactor> newDatacenters = new HashMap<>();
         if (configOptions != null)
         {
             for (Entry<String, String> entry : configOptions.entrySet())
@@ -64,12 +69,15 @@ public class NetworkTopologyStrategy extends AbstractReplicationStrategy
                 String dc = entry.getKey();
                 if (dc.equalsIgnoreCase("replication_factor"))
                     throw new ConfigurationException("replication_factor is an option for SimpleStrategy, not NetworkTopologyStrategy");
-                Integer replicas = Integer.valueOf(entry.getValue());
-                newDatacenters.put(dc, replicas);
+                ReplicationFactor rf = ReplicationFactor.fromString(entry.getValue());
+                replicas += rf.allReplicas;
+                trans += rf.transientReplicas();
+                newDatacenters.put(dc, rf);
             }
         }
 
         datacenters = Collections.unmodifiableMap(newDatacenters);
+        aggregateRf = ReplicationFactor.withTransient(replicas, trans);
         logger.info("Configured datacenter replicas are {}", FBUtilities.toString(datacenters));
     }
 
@@ -79,7 +87,8 @@ public class NetworkTopologyStrategy extends AbstractReplicationStrategy
     private static final class DatacenterEndpoints
     {
         /** The list that accepted endpoints get pushed into. */
-        Set<InetAddressAndPort> endpoints;
+        EndpointsForRange.Mutable replicas;
+
         /**
          * Racks encountered so far. Replicas are put into separate racks while possible.
          * For efficiency the set is shared between the instances, using the location pair (dc, rack) to make sure
@@ -90,41 +99,51 @@ public class NetworkTopologyStrategy extends AbstractReplicationStrategy
         /** Number of replicas left to fill from this DC. */
         int rfLeft;
         int acceptableRackRepeats;
+        int transients;
 
-        DatacenterEndpoints(int rf, int rackCount, int nodeCount, Set<InetAddressAndPort> endpoints, Set<Pair<String, String>> racks)
+        DatacenterEndpoints(ReplicationFactor rf, int rackCount, int nodeCount, EndpointsForRange.Mutable replicas, Set<Pair<String, String>> racks)
         {
-            this.endpoints = endpoints;
+            this.replicas = replicas;
             this.racks = racks;
             // If there aren't enough nodes in this DC to fill the RF, the number of nodes is the effective RF.
-            this.rfLeft = Math.min(rf, nodeCount);
+            this.rfLeft = Math.min(rf.allReplicas, nodeCount);
             // If there aren't enough racks in this DC to fill the RF, we'll still use at least one node from each rack,
             // and the difference is to be filled by the first encountered nodes.
-            acceptableRackRepeats = rf - rackCount;
+            acceptableRackRepeats = rf.allReplicas - rackCount;
+
+            // if we have fewer replicas than rf calls for, reduce transients accordingly
+            int reduceTransients = rf.allReplicas - this.rfLeft;
+            transients = Math.max(rf.transientReplicas() - reduceTransients, 0);
+            ReplicationFactor.validate(rfLeft, transients);
         }
 
         /**
-         * Attempts to add an endpoint to the replicas for this datacenter, adding to the endpoints set if successful.
+         * Attempts to add an endpoint to the replicas for this datacenter, adding to the replicas set if successful.
          * Returns true if the endpoint was added, and this datacenter does not require further replicas.
          */
-        boolean addEndpointAndCheckIfDone(InetAddressAndPort ep, Pair<String,String> location)
+        boolean addEndpointAndCheckIfDone(InetAddressAndPort ep, Pair<String,String> location, Range<Token> replicatedRange)
         {
             if (done())
                 return false;
 
+            if (replicas.endpoints().contains(ep))
+                // Cannot repeat a node.
+                return false;
+
+            Replica replica = new Replica(ep, replicatedRange, rfLeft > transients);
+
             if (racks.add(location))
             {
                 // New rack.
                 --rfLeft;
-                boolean added = endpoints.add(ep);
-                assert added;
+                replicas.add(replica, Conflict.NONE);
                 return done();
             }
             if (acceptableRackRepeats <= 0)
                 // There must be rfLeft distinct racks left, do not add any more rack repeats.
                 return false;
-            if (!endpoints.add(ep))
-                // Cannot repeat a node.
-                return false;
+
+            replicas.add(replica, Conflict.NONE);
             // Added a node that is from an already met rack to match RF when there aren't enough racks.
             --acceptableRackRepeats;
             --rfLeft;
@@ -141,10 +160,15 @@ public class NetworkTopologyStrategy extends AbstractReplicationStrategy
     /**
      * calculate endpoints in one pass through the tokens by tracking our progress in each DC.
      */
-    public List<InetAddressAndPort> calculateNaturalEndpoints(Token searchToken, TokenMetadata tokenMetadata)
+    public EndpointsForRange calculateNaturalReplicas(Token searchToken, TokenMetadata tokenMetadata)
     {
         // we want to preserve insertion order so that the first added endpoint becomes primary
-        Set<InetAddressAndPort> replicas = new LinkedHashSet<>();
+        ArrayList<Token> sortedTokens = tokenMetadata.sortedTokens();
+        Token replicaEnd = TokenMetadata.firstToken(sortedTokens, searchToken);
+        Token replicaStart = tokenMetadata.getPredecessor(replicaEnd);
+        Range<Token> replicatedRange = new Range<>(replicaStart, replicaEnd);
+
+        EndpointsForRange.Mutable builder = new EndpointsForRange.Mutable(replicatedRange);
         Set<Pair<String, String>> seenRacks = new HashSet<>();
 
         Topology topology = tokenMetadata.getTopology();
@@ -158,31 +182,31 @@ public class NetworkTopologyStrategy extends AbstractReplicationStrategy
         Map<String, DatacenterEndpoints> dcs = new HashMap<>(datacenters.size() * 2);
 
         // Create a DatacenterEndpoints object for each non-empty DC.
-        for (Map.Entry<String, Integer> en : datacenters.entrySet())
+        for (Map.Entry<String, ReplicationFactor> en : datacenters.entrySet())
         {
             String dc = en.getKey();
-            int rf = en.getValue();
+            ReplicationFactor rf = en.getValue();
             int nodeCount = sizeOrZero(allEndpoints.get(dc));
 
-            if (rf <= 0 || nodeCount <= 0)
+            if (rf.allReplicas <= 0 || nodeCount <= 0)
                 continue;
 
-            DatacenterEndpoints dcEndpoints = new DatacenterEndpoints(rf, sizeOrZero(racks.get(dc)), nodeCount, replicas, seenRacks);
+            DatacenterEndpoints dcEndpoints = new DatacenterEndpoints(rf, sizeOrZero(racks.get(dc)), nodeCount, builder, seenRacks);
             dcs.put(dc, dcEndpoints);
             ++dcsToFill;
         }
 
-        Iterator<Token> tokenIter = TokenMetadata.ringIterator(tokenMetadata.sortedTokens(), searchToken, false);
+        Iterator<Token> tokenIter = TokenMetadata.ringIterator(sortedTokens, searchToken, false);
         while (dcsToFill > 0 && tokenIter.hasNext())
         {
             Token next = tokenIter.next();
             InetAddressAndPort ep = tokenMetadata.getEndpoint(next);
             Pair<String, String> location = topology.getLocation(ep);
             DatacenterEndpoints dcEndpoints = dcs.get(location.left);
-            if (dcEndpoints != null && dcEndpoints.addEndpointAndCheckIfDone(ep, location))
+            if (dcEndpoints != null && dcEndpoints.addEndpointAndCheckIfDone(ep, location, replicatedRange))
                 --dcsToFill;
         }
-        return new ArrayList<>(replicas);
+        return builder.asImmutableView();
     }
 
     private int sizeOrZero(Multimap<?, ?> collection)
@@ -195,18 +219,15 @@ public class NetworkTopologyStrategy extends AbstractReplicationStrategy
         return collection != null ? collection.size() : 0;
     }
 
-    public int getReplicationFactor()
+    public ReplicationFactor getReplicationFactor()
     {
-        int total = 0;
-        for (int repFactor : datacenters.values())
-            total += repFactor;
-        return total;
+        return aggregateRf;
     }
 
-    public int getReplicationFactor(String dc)
+    public ReplicationFactor getReplicationFactor(String dc)
     {
-        Integer replicas = datacenters.get(dc);
-        return replicas == null ? 0 : replicas;
+        ReplicationFactor replicas = datacenters.get(dc);
+        return replicas == null ? ReplicationFactor.ZERO : replicas;
     }
 
     public Set<String> getDatacenters()
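
The hunks above replace the per-DC integer RF with a ReplicationFactor that also carries a transient count, and
DatacenterEndpoints trims the transient count when a DC has fewer nodes than the configured RF. The following
standalone sketch mirrors that arithmetic; it is illustrative only, not part of this patch, and the RfSketch class
and the "<total>/<transient>" option format are assumptions inferred from the fromString()/withTransient() calls
above.

import java.util.LinkedHashMap;
import java.util.Map;

public class RfSketch
{
    final int allReplicas;       // full + transient
    final int transientReplicas;

    RfSketch(int allReplicas, int transientReplicas)
    {
        this.allReplicas = allReplicas;
        this.transientReplicas = transientReplicas;
    }

    /** Parse "3" (all full) or "3/1" (3 replicas, 1 of them transient) -- assumed format. */
    static RfSketch fromString(String s)
    {
        int slash = s.indexOf('/');
        if (slash < 0)
            return new RfSketch(Integer.parseInt(s.trim()), 0);
        return new RfSketch(Integer.parseInt(s.substring(0, slash).trim()),
                            Integer.parseInt(s.substring(slash + 1).trim()));
    }

    public static void main(String[] args)
    {
        // Mirrors the constructor loop above: aggregate the per-DC factors into one total.
        Map<String, String> options = new LinkedHashMap<>();
        options.put("DC1", "3/1");
        options.put("DC2", "3");

        int replicas = 0, trans = 0;
        for (Map.Entry<String, String> e : options.entrySet())
        {
            RfSketch rf = fromString(e.getValue());
            replicas += rf.allReplicas;
            trans += rf.transientReplicas;
        }
        System.out.println("aggregate = " + replicas + "/" + trans); // aggregate = 6/1

        // Mirrors DatacenterEndpoints: with only 2 nodes in DC1, rfLeft shrinks to 2 and the
        // transient count is reduced by the shortfall (3 - 2 = 1), leaving 0 transient replicas.
        RfSketch dc1 = fromString(options.get("DC1"));
        int nodeCount = 2;
        int rfLeft = Math.min(dc1.allReplicas, nodeCount);
        int reduceTransients = dc1.allReplicas - rfLeft;
        int transients = Math.max(dc1.transientReplicas - reduceTransients, 0);
        System.out.println("rfLeft = " + rfLeft + ", transients = " + transients); // rfLeft = 2, transients = 0
    }
}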

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/locator/OldNetworkTopologyStrategy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/locator/OldNetworkTopologyStrategy.java b/src/java/org/apache/cassandra/locator/OldNetworkTopologyStrategy.java
index 93e629e..449c51e 100644
--- a/src/java/org/apache/cassandra/locator/OldNetworkTopologyStrategy.java
+++ b/src/java/org/apache/cassandra/locator/OldNetworkTopologyStrategy.java
@@ -21,9 +21,9 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Collection;
 import java.util.Iterator;
-import java.util.List;
 import java.util.Map;
 
+import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.exceptions.ConfigurationException;
 import org.apache.cassandra.dht.Token;
 
@@ -36,27 +36,32 @@ import org.apache.cassandra.dht.Token;
  */
 public class OldNetworkTopologyStrategy extends AbstractReplicationStrategy
 {
+    private final ReplicationFactor rf;
     public OldNetworkTopologyStrategy(String keyspaceName, TokenMetadata tokenMetadata, IEndpointSnitch snitch, Map<String, String> configOptions)
     {
         super(keyspaceName, tokenMetadata, snitch, configOptions);
+        this.rf = ReplicationFactor.fromString(this.configOptions.get("replication_factor"));
     }
 
-    public List<InetAddressAndPort> calculateNaturalEndpoints(Token token, TokenMetadata metadata)
+    public EndpointsForRange calculateNaturalReplicas(Token token, TokenMetadata metadata)
     {
-        int replicas = getReplicationFactor();
-        List<InetAddressAndPort> endpoints = new ArrayList<>(replicas);
         ArrayList<Token> tokens = metadata.sortedTokens();
-
         if (tokens.isEmpty())
-            return endpoints;
+            return EndpointsForRange.empty(new Range<>(metadata.partitioner.getMinimumToken(), metadata.partitioner.getMinimumToken()));
 
         Iterator<Token> iter = TokenMetadata.ringIterator(tokens, token, false);
         Token primaryToken = iter.next();
-        endpoints.add(metadata.getEndpoint(primaryToken));
+        Token previousToken = metadata.getPredecessor(primaryToken);
+        Range<Token> tokenRange = new Range<>(previousToken, primaryToken);
+
+        EndpointsForRange.Builder replicas = EndpointsForRange.builder(tokenRange, rf.allReplicas);
+
+        assert !rf.hasTransientReplicas() : "support transient replicas";
+        replicas.add(new Replica(metadata.getEndpoint(primaryToken), previousToken, primaryToken, true));
 
         boolean bDataCenter = false;
         boolean bOtherRack = false;
-        while (endpoints.size() < replicas && iter.hasNext())
+        while (replicas.size() < rf.allReplicas && iter.hasNext())
         {
             // First try to find one in a different data center
             Token t = iter.next();
@@ -65,7 +70,7 @@ public class OldNetworkTopologyStrategy extends AbstractReplicationStrategy
                 // If we have already found something in a diff datacenter no need to find another
                 if (!bDataCenter)
                 {
-                    endpoints.add(metadata.getEndpoint(t));
+                    replicas.add(new Replica(metadata.getEndpoint(t), previousToken, primaryToken, true));
                     bDataCenter = true;
                 }
                 continue;
@@ -77,7 +82,7 @@ public class OldNetworkTopologyStrategy extends AbstractReplicationStrategy
                 // If we have already found something in a diff rack no need to find another
                 if (!bOtherRack)
                 {
-                    endpoints.add(metadata.getEndpoint(t));
+                    replicas.add(new Replica(metadata.getEndpoint(t), previousToken, primaryToken, true));
                     bOtherRack = true;
                 }
             }
@@ -86,23 +91,24 @@ public class OldNetworkTopologyStrategy extends AbstractReplicationStrategy
 
         // If we found N nodes we are good. This loop will just exit. Otherwise just
         // loop through the list and add until we have N nodes.
-        if (endpoints.size() < replicas)
+        if (replicas.size() < rf.allReplicas)
         {
             iter = TokenMetadata.ringIterator(tokens, token, false);
-            while (endpoints.size() < replicas && iter.hasNext())
+            while (replicas.size() < rf.allReplicas && iter.hasNext())
             {
                 Token t = iter.next();
-                if (!endpoints.contains(metadata.getEndpoint(t)))
-                    endpoints.add(metadata.getEndpoint(t));
+                Replica replica = new Replica(metadata.getEndpoint(t), previousToken, primaryToken, true);
+                if (!replicas.containsEndpoint(replica.endpoint()))
+                    replicas.add(replica);
             }
         }
 
-        return endpoints;
+        return replicas.build();
     }
 
-    public int getReplicationFactor()
+    public ReplicationFactor getReplicationFactor()
     {
-        return Integer.parseInt(this.configOptions.get("replication_factor"));
+        return rf;
     }
 
     public void validateOptions() throws ConfigurationException

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/locator/PendingRangeMaps.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/locator/PendingRangeMaps.java b/src/java/org/apache/cassandra/locator/PendingRangeMaps.java
index 92307a3..b8b7bc6 100644
--- a/src/java/org/apache/cassandra/locator/PendingRangeMaps.java
+++ b/src/java/org/apache/cassandra/locator/PendingRangeMaps.java
@@ -23,166 +23,147 @@ package org.apache.cassandra.locator;
 import com.google.common.collect.Iterators;
 import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.Token;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.apache.cassandra.locator.ReplicaCollection.Mutable.Conflict;
 
 import java.util.*;
 
-public class PendingRangeMaps implements Iterable<Map.Entry<Range<Token>, List<InetAddressAndPort>>>
+public class PendingRangeMaps implements Iterable<Map.Entry<Range<Token>, EndpointsForRange.Mutable>>
 {
-    private static final Logger logger = LoggerFactory.getLogger(PendingRangeMaps.class);
-
     /**
      * We use a NavigableMap to be able to search for ranges containing a token efficiently.
      *
      * First two are for non-wrap-around ranges, and the last two are for wrap-around ranges.
      */
     // ascendingMap will sort the ranges by the ascending order of right token
-    final NavigableMap<Range<Token>, List<InetAddressAndPort>> ascendingMap;
+    private final NavigableMap<Range<Token>, EndpointsForRange.Mutable> ascendingMap;
+
     /**
      * sorting end ascending, if ends are same, sorting begin descending, so that token (end, end) will
      * come before (begin, end] with the same end, and (begin, end) will be selected in the tailMap.
      */
-    static final Comparator<Range<Token>> ascendingComparator = new Comparator<Range<Token>>()
-        {
-            @Override
-            public int compare(Range<Token> o1, Range<Token> o2)
-            {
-                int res = o1.right.compareTo(o2.right);
-                if (res != 0)
-                    return res;
+    private static final Comparator<Range<Token>> ascendingComparator = (o1, o2) -> {
+        int res = o1.right.compareTo(o2.right);
+        if (res != 0)
+            return res;
 
-                return o2.left.compareTo(o1.left);
-            }
-        };
+        return o2.left.compareTo(o1.left);
+    };
 
     // descendingMap will sort the ranges by the descending order of left token
-    final NavigableMap<Range<Token>, List<InetAddressAndPort>> descendingMap;
+    private final NavigableMap<Range<Token>, EndpointsForRange.Mutable> descendingMap;
+
     /**
      * sorting begin descending, if begins are same, sorting end descending, so that token (begin, begin) will
      * come after (begin, end] with the same begin, and (begin, end) won't be selected in the tailMap.
      */
-    static final Comparator<Range<Token>> descendingComparator = new Comparator<Range<Token>>()
-        {
-            @Override
-            public int compare(Range<Token> o1, Range<Token> o2)
-            {
-                int res = o2.left.compareTo(o1.left);
-                if (res != 0)
-                    return res;
+    private static final Comparator<Range<Token>> descendingComparator = (o1, o2) -> {
+        int res = o2.left.compareTo(o1.left);
+        if (res != 0)
+            return res;
 
-                // if left tokens are same, sort by the descending of the right tokens.
-                return o2.right.compareTo(o1.right);
-            }
-        };
+        // if left tokens are same, sort by the descending of the right tokens.
+        return o2.right.compareTo(o1.right);
+    };
 
     // these two maps are for wrap-around ranges.
-    final NavigableMap<Range<Token>, List<InetAddressAndPort>> ascendingMapForWrapAround;
+    private final NavigableMap<Range<Token>, EndpointsForRange.Mutable> ascendingMapForWrapAround;
+
     /**
      * for wrap around range (begin, end], which begin > end.
      * Sorting end ascending, if ends are same, sorting begin ascending,
      * so that token (end, end) will come before (begin, end] with the same end, and (begin, end] will be selected in
      * the tailMap.
      */
-    static final Comparator<Range<Token>> ascendingComparatorForWrapAround = new Comparator<Range<Token>>()
-    {
-        @Override
-        public int compare(Range<Token> o1, Range<Token> o2)
-        {
-            int res = o1.right.compareTo(o2.right);
-            if (res != 0)
-                return res;
+    private static final Comparator<Range<Token>> ascendingComparatorForWrapAround = (o1, o2) -> {
+        int res = o1.right.compareTo(o2.right);
+        if (res != 0)
+            return res;
 
-            return o1.left.compareTo(o2.left);
-        }
+        return o1.left.compareTo(o2.left);
     };
 
-    final NavigableMap<Range<Token>, List<InetAddressAndPort>> descendingMapForWrapAround;
+    private final NavigableMap<Range<Token>, EndpointsForRange.Mutable> descendingMapForWrapAround;
+
     /**
      * for wrap around ranges, which begin > end.
      * Sorting end ascending, so that token (begin, begin) will come after (begin, end] with the same begin,
      * and (begin, end) won't be selected in the tailMap.
      */
-    static final Comparator<Range<Token>> descendingComparatorForWrapAround = new Comparator<Range<Token>>()
-    {
-        @Override
-        public int compare(Range<Token> o1, Range<Token> o2)
-        {
-            int res = o2.left.compareTo(o1.left);
-            if (res != 0)
-                return res;
-            return o1.right.compareTo(o2.right);
-        }
+    private static final Comparator<Range<Token>> descendingComparatorForWrapAround = (o1, o2) -> {
+        int res = o2.left.compareTo(o1.left);
+        if (res != 0)
+            return res;
+        return o1.right.compareTo(o2.right);
     };
 
     public PendingRangeMaps()
     {
-        this.ascendingMap = new TreeMap<Range<Token>, List<InetAddressAndPort>>(ascendingComparator);
-        this.descendingMap = new TreeMap<Range<Token>, List<InetAddressAndPort>>(descendingComparator);
-        this.ascendingMapForWrapAround = new TreeMap<Range<Token>, List<InetAddressAndPort>>(ascendingComparatorForWrapAround);
-        this.descendingMapForWrapAround = new TreeMap<Range<Token>, List<InetAddressAndPort>>(descendingComparatorForWrapAround);
+        this.ascendingMap = new TreeMap<>(ascendingComparator);
+        this.descendingMap = new TreeMap<>(descendingComparator);
+        this.ascendingMapForWrapAround = new TreeMap<>(ascendingComparatorForWrapAround);
+        this.descendingMapForWrapAround = new TreeMap<>(descendingComparatorForWrapAround);
     }
 
     static final void addToMap(Range<Token> range,
-                               InetAddressAndPort address,
-                               NavigableMap<Range<Token>, List<InetAddressAndPort>> ascendingMap,
-                               NavigableMap<Range<Token>, List<InetAddressAndPort>> descendingMap)
+                               Replica replica,
+                               NavigableMap<Range<Token>, EndpointsForRange.Mutable> ascendingMap,
+                               NavigableMap<Range<Token>, EndpointsForRange.Mutable> descendingMap)
     {
-        List<InetAddressAndPort> addresses = ascendingMap.get(range);
-        if (addresses == null)
+        EndpointsForRange.Mutable replicas = ascendingMap.get(range);
+        if (replicas == null)
         {
-            addresses = new ArrayList<>(1);
-            ascendingMap.put(range, addresses);
-            descendingMap.put(range, addresses);
+            replicas = new EndpointsForRange.Mutable(range,1);
+            ascendingMap.put(range, replicas);
+            descendingMap.put(range, replicas);
         }
-        addresses.add(address);
+        replicas.add(replica, Conflict.DUPLICATE);
     }
 
-    public void addPendingRange(Range<Token> range, InetAddressAndPort address)
+    public void addPendingRange(Range<Token> range, Replica replica)
     {
         if (Range.isWrapAround(range.left, range.right))
         {
-            addToMap(range, address, ascendingMapForWrapAround, descendingMapForWrapAround);
+            addToMap(range, replica, ascendingMapForWrapAround, descendingMapForWrapAround);
         }
         else
         {
-            addToMap(range, address, ascendingMap, descendingMap);
+            addToMap(range, replica, ascendingMap, descendingMap);
         }
     }
 
-    static final void addIntersections(Set<InetAddressAndPort> endpointsToAdd,
-                                       NavigableMap<Range<Token>, List<InetAddressAndPort>> smallerMap,
-                                       NavigableMap<Range<Token>, List<InetAddressAndPort>> biggerMap)
+    static final void addIntersections(EndpointsForToken.Builder replicasToAdd,
+                                       NavigableMap<Range<Token>, EndpointsForRange.Mutable> smallerMap,
+                                       NavigableMap<Range<Token>, EndpointsForRange.Mutable> biggerMap)
     {
         // find the intersection of two sets
         for (Range<Token> range : smallerMap.keySet())
         {
-            List<InetAddressAndPort> addresses = biggerMap.get(range);
-            if (addresses != null)
+            EndpointsForRange.Mutable replicas = biggerMap.get(range);
+            if (replicas != null)
             {
-                endpointsToAdd.addAll(addresses);
+                replicasToAdd.addAll(replicas);
             }
         }
     }
 
-    public Collection<InetAddressAndPort> pendingEndpointsFor(Token token)
+    public EndpointsForToken pendingEndpointsFor(Token token)
     {
-        Set<InetAddressAndPort> endpoints = new HashSet<>();
+        EndpointsForToken.Builder replicas = EndpointsForToken.builder(token);
 
-        Range searchRange = new Range(token, token);
+        Range<Token> searchRange = new Range<>(token, token);
 
         // search for non-wrap-around maps
-        NavigableMap<Range<Token>, List<InetAddressAndPort>> ascendingTailMap = ascendingMap.tailMap(searchRange, true);
-        NavigableMap<Range<Token>, List<InetAddressAndPort>> descendingTailMap = descendingMap.tailMap(searchRange, false);
+        NavigableMap<Range<Token>, EndpointsForRange.Mutable> ascendingTailMap = ascendingMap.tailMap(searchRange, true);
+        NavigableMap<Range<Token>, EndpointsForRange.Mutable> descendingTailMap = descendingMap.tailMap(searchRange, false);
 
         // add intersections of two maps
         if (ascendingTailMap.size() < descendingTailMap.size())
         {
-            addIntersections(endpoints, ascendingTailMap, descendingTailMap);
+            addIntersections(replicas, ascendingTailMap, descendingTailMap);
         }
         else
         {
-            addIntersections(endpoints, descendingTailMap, ascendingTailMap);
+            addIntersections(replicas, descendingTailMap, ascendingTailMap);
         }
 
         // search for wrap-around sets
@@ -190,29 +171,29 @@ public class PendingRangeMaps implements Iterable<Map.Entry<Range<Token>, List<I
         descendingTailMap = descendingMapForWrapAround.tailMap(searchRange, false);
 
         // add them since they are all necessary.
-        for (Map.Entry<Range<Token>, List<InetAddressAndPort>> entry : ascendingTailMap.entrySet())
+        for (Map.Entry<Range<Token>, EndpointsForRange.Mutable> entry : ascendingTailMap.entrySet())
         {
-            endpoints.addAll(entry.getValue());
+            replicas.addAll(entry.getValue());
         }
-        for (Map.Entry<Range<Token>, List<InetAddressAndPort>> entry : descendingTailMap.entrySet())
+        for (Map.Entry<Range<Token>, EndpointsForRange.Mutable> entry : descendingTailMap.entrySet())
         {
-            endpoints.addAll(entry.getValue());
+            replicas.addAll(entry.getValue());
         }
 
-        return endpoints;
+        return replicas.build();
     }
 
     public String printPendingRanges()
     {
         StringBuilder sb = new StringBuilder();
 
-        for (Map.Entry<Range<Token>, List<InetAddressAndPort>> entry : this)
+        for (Map.Entry<Range<Token>, EndpointsForRange.Mutable> entry : this)
         {
             Range<Token> range = entry.getKey();
 
-            for (InetAddressAndPort address : entry.getValue())
+            for (Replica replica : entry.getValue())
             {
-                sb.append(address).append(':').append(range);
+                sb.append(replica).append(':').append(range);
                 sb.append(System.getProperty("line.separator"));
             }
         }
@@ -221,7 +202,7 @@ public class PendingRangeMaps implements Iterable<Map.Entry<Range<Token>, List<I
     }
 
     @Override
-    public Iterator<Map.Entry<Range<Token>, List<InetAddressAndPort>>> iterator()
+    public Iterator<Map.Entry<Range<Token>, EndpointsForRange.Mutable>> iterator()
     {
         return Iterators.concat(ascendingMap.entrySet().iterator(), ascendingMapForWrapAround.entrySet().iterator());
     }
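
PendingRangeMaps answers pendingEndpointsFor(token) for non-wrap-around ranges by intersecting two tailMap views:
the ascending map yields ranges whose right token is at or past the search token, the descending map yields ranges
whose left token is strictly before it, and only ranges present in both contain the token. A minimal sketch of that
lookup idea, using plain integer endpoints in place of Token/Range (illustrative only, not part of this patch):

import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

public class PendingLookupSketch
{
    public static void main(String[] args)
    {
        // A range is int[]{left, right}, meaning (left, right] with left exclusive and right inclusive.
        // ascending: right token ascending, ties by left descending, so the (t, t] probe sorts before (l, t].
        NavigableMap<int[], String> ascending = new TreeMap<>((a, b) -> {
            int c = Integer.compare(a[1], b[1]);
            return c != 0 ? c : Integer.compare(b[0], a[0]);
        });
        // descending: left token descending, ties by right descending, so ranges starting exactly at the
        // token sort before the probe and fall outside the exclusive tailMap.
        NavigableMap<int[], String> descending = new TreeMap<>((a, b) -> {
            int c = Integer.compare(b[0], a[0]);
            return c != 0 ? c : Integer.compare(b[1], a[1]);
        });

        int[][] ranges = { {0, 10}, {5, 20}, {15, 30} };
        for (int[] r : ranges)
        {
            String name = "(" + r[0] + "," + r[1] + "]";
            ascending.put(r, name);
            descending.put(r, name);
        }

        int token = 8;
        int[] probe = { token, token };

        // ranges whose right end is at or past the token...
        NavigableMap<int[], String> rightCandidates = ascending.tailMap(probe, true);
        // ...intersected with ranges whose left end is strictly before it
        NavigableMap<int[], String> leftCandidates = descending.tailMap(probe, false);

        for (Map.Entry<int[], String> e : rightCandidates.entrySet())
            if (leftCandidates.containsKey(e.getKey()))
                System.out.println(e.getValue() + " contains " + token);
        // prints (0,10] and (5,20]; (15,30] is filtered out by the descending map
    }
}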

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/locator/RangesAtEndpoint.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/locator/RangesAtEndpoint.java b/src/java/org/apache/cassandra/locator/RangesAtEndpoint.java
new file mode 100644
index 0000000..74828ad
--- /dev/null
+++ b/src/java/org/apache/cassandra/locator/RangesAtEndpoint.java
@@ -0,0 +1,313 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.locator;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableSet;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
+
+import java.net.UnknownHostException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.stream.Collector;
+import java.util.stream.Collectors;
+
+import static org.apache.cassandra.locator.ReplicaCollection.Mutable.Conflict.*;
+
+/**
+ * A ReplicaCollection for Ranges occurring at an endpoint. All Replica will be for the same endpoint,
+ * and must be unique Ranges (though overlapping ranges are presently permitted, these should probably not be permitted to occur)
+ */
+public class RangesAtEndpoint extends AbstractReplicaCollection<RangesAtEndpoint>
+{
+    private static final Map<Range<Token>, Replica> EMPTY_MAP = Collections.unmodifiableMap(new LinkedHashMap<>());
+
+    private final InetAddressAndPort endpoint;
+    private volatile Map<Range<Token>, Replica> byRange;
+    private volatile RangesAtEndpoint fullRanges;
+    private volatile RangesAtEndpoint transRanges;
+
+    private RangesAtEndpoint(InetAddressAndPort endpoint, List<Replica> list, boolean isSnapshot)
+    {
+        this(endpoint, list, isSnapshot, null);
+    }
+    private RangesAtEndpoint(InetAddressAndPort endpoint, List<Replica> list, boolean isSnapshot, Map<Range<Token>, Replica> byRange)
+    {
+        super(list, isSnapshot);
+        this.endpoint = endpoint;
+        this.byRange = byRange;
+        assert endpoint != null;
+    }
+
+    public InetAddressAndPort endpoint()
+    {
+        return endpoint;
+    }
+
+    @Override
+    public Set<InetAddressAndPort> endpoints()
+    {
+        return Collections.unmodifiableSet(list.isEmpty()
+                ? Collections.emptySet()
+                : Collections.singleton(endpoint)
+        );
+    }
+
+    public Set<Range<Token>> ranges()
+    {
+        return byRange().keySet();
+    }
+
+    public Map<Range<Token>, Replica> byRange()
+    {
+        Map<Range<Token>, Replica> map = byRange;
+        if (map == null)
+            byRange = map = buildByRange(list);
+        return map;
+    }
+
+    @Override
+    protected RangesAtEndpoint snapshot(List<Replica> subList)
+    {
+        if (subList.isEmpty()) return empty(endpoint);
+        return new RangesAtEndpoint(endpoint, subList, true);
+    }
+
+    @Override
+    public RangesAtEndpoint self()
+    {
+        return this;
+    }
+
+    @Override
+    public ReplicaCollection.Mutable<RangesAtEndpoint> newMutable(int initialCapacity)
+    {
+        return new Mutable(endpoint, initialCapacity);
+    }
+
+    @Override
+    public boolean contains(Replica replica)
+    {
+        return replica != null
+                && Objects.equals(
+                        byRange().get(replica.range()),
+                        replica);
+    }
+
+    public RangesAtEndpoint full()
+    {
+        RangesAtEndpoint coll = fullRanges;
+        if (fullRanges == null)
+            fullRanges = coll = filter(Replica::isFull);
+        return coll;
+    }
+
+    public RangesAtEndpoint trans()
+    {
+        RangesAtEndpoint coll = transRanges;
+        if (transRanges == null)
+            transRanges = coll = filter(Replica::isTransient);
+        return coll;
+    }
+
+    public Collection<Range<Token>> fullRanges()
+    {
+        return full().ranges();
+    }
+
+    public Collection<Range<Token>> transientRanges()
+    {
+        return trans().ranges();
+    }
+
+    public boolean contains(Range<Token> range, boolean isFull)
+    {
+        Replica replica = byRange().get(range);
+        return replica != null && replica.isFull() == isFull;
+    }
+
+    private static Map<Range<Token>, Replica> buildByRange(List<Replica> list)
+    {
+        // TODO: implement a delegating map that uses our superclass' list, and is immutable
+        Map<Range<Token>, Replica> byRange = new LinkedHashMap<>(list.size());
+        for (Replica replica : list)
+        {
+            Replica prev = byRange.put(replica.range(), replica);
+            assert prev == null : "duplicate range in RangesAtEndpoint: " + prev + " and " + replica;
+        }
+
+        return Collections.unmodifiableMap(byRange);
+    }
+
+    public static Collector<Replica, Builder, RangesAtEndpoint> collector(InetAddressAndPort endpoint)
+    {
+        return collector(ImmutableSet.of(), () -> new Builder(endpoint));
+    }
+
+    public static class Mutable extends RangesAtEndpoint implements ReplicaCollection.Mutable<RangesAtEndpoint>
+    {
+        boolean hasSnapshot;
+        public Mutable(InetAddressAndPort endpoint) { this(endpoint, 0); }
+        public Mutable(InetAddressAndPort endpoint, int capacity) { super(endpoint, new ArrayList<>(capacity), false, new LinkedHashMap<>()); }
+
+        public void add(Replica replica, Conflict ignoreConflict)
+        {
+            if (hasSnapshot) throw new IllegalStateException();
+            Preconditions.checkNotNull(replica);
+            if (!Objects.equals(super.endpoint, replica.endpoint()))
+                throw new IllegalArgumentException("Replica " + replica + " has incorrect endpoint (expected " + super.endpoint + ")");
+
+            Replica prev = super.byRange.put(replica.range(), replica);
+            if (prev != null)
+            {
+                super.byRange.put(replica.range(), prev); // restore prev
+                switch (ignoreConflict)
+                {
+                    case DUPLICATE:
+                        if (prev.equals(replica))
+                            break;
+                    case NONE:
+                        throw new IllegalArgumentException("Conflicting replica added (expected unique ranges): " + replica + "; existing: " + prev);
+                    case ALL:
+                }
+                return;
+            }
+
+            list.add(replica);
+        }
+
+        @Override
+        public Map<Range<Token>, Replica> byRange()
+        {
+            // our internal map is modifiable, but it is unsafe to modify the map externally
+            // it would be possible to implement a safe modifiable map, but it is probably not valuable
+            return Collections.unmodifiableMap(super.byRange());
+        }
+
+        public RangesAtEndpoint get(boolean isSnapshot)
+        {
+            return new RangesAtEndpoint(super.endpoint, super.list, isSnapshot, Collections.unmodifiableMap(super.byRange));
+        }
+
+        public RangesAtEndpoint asImmutableView()
+        {
+            return get(false);
+        }
+
+        public RangesAtEndpoint asSnapshot()
+        {
+            hasSnapshot = true;
+            return get(true);
+        }
+    }
+
+    public static class Builder extends ReplicaCollection.Builder<RangesAtEndpoint, Mutable, RangesAtEndpoint.Builder>
+    {
+        public Builder(InetAddressAndPort endpoint) { this(endpoint, 0); }
+        public Builder(InetAddressAndPort endpoint, int capacity) { super(new Mutable(endpoint, capacity)); }
+    }
+
+    public static RangesAtEndpoint.Builder builder(InetAddressAndPort endpoint)
+    {
+        return new RangesAtEndpoint.Builder(endpoint);
+    }
+
+    public static RangesAtEndpoint.Builder builder(InetAddressAndPort endpoint, int capacity)
+    {
+        return new RangesAtEndpoint.Builder(endpoint, capacity);
+    }
+
+    public static RangesAtEndpoint empty(InetAddressAndPort endpoint)
+    {
+        return new RangesAtEndpoint(endpoint, EMPTY_LIST, true, EMPTY_MAP);
+    }
+
+    public static RangesAtEndpoint of(Replica replica)
+    {
+        ArrayList<Replica> one = new ArrayList<>(1);
+        one.add(replica);
+        return new RangesAtEndpoint(replica.endpoint(), one, true, Collections.unmodifiableMap(Collections.singletonMap(replica.range(), replica)));
+    }
+
+    public static RangesAtEndpoint of(Replica ... replicas)
+    {
+        return copyOf(Arrays.asList(replicas));
+    }
+
+    public static RangesAtEndpoint copyOf(List<Replica> replicas)
+    {
+        if (replicas.isEmpty())
+            throw new IllegalArgumentException("Must specify a non-empty collection of replicas");
+        return builder(replicas.get(0).endpoint(), replicas.size()).addAll(replicas).build();
+    }
+
+
+    /**
+     * Use of this method to synthesize Replicas is almost always wrong. In repair, the concerns of transient
+     * vs. non-transient replication are handled at a higher level; by the time repair asks streaming to actually
+     * move the data, it no longer has a good handle on what the replicas are, and it no longer matters.
+     *
+     * Streaming expects to be given Replicas, with each replica indicating what type of data (transient or not)
+     * should be sent.
+     *
+     * So in this one instance we can lie to streaming: we pretend all the replicas are full and use a dummy address.
+     * That is safe because streaming does not rely on the address for anything other than debugging, and "full"
+     * is a valid value for transientness because streaming selects candidate tables from the repaired/unrepaired
+     * sets already.
+     * @param ranges the ranges to wrap in dummy full Replicas
+     * @return a RangesAtEndpoint of full Replicas on a dummy endpoint, one per input range
+     */
+    @VisibleForTesting
+    public static RangesAtEndpoint toDummyList(Collection<Range<Token>> ranges)
+    {
+        InetAddressAndPort dummy;
+        try
+        {
+            dummy = InetAddressAndPort.getByNameOverrideDefaults("0.0.0.0", 0);
+        }
+        catch (UnknownHostException e)
+        {
+            throw new RuntimeException(e);
+        }
+
+        // For repair we are less concerned with full vs. transient since repair is already dealing with those concerns.
+        // Always say full; whether the repair is incremental or not will then determine what is streamed.
+        return ranges.stream()
+                .map(range -> new Replica(dummy, range, true))
+                .collect(collector(dummy));
+    }
+
+    /**
+     * @return the concatenation of two DISJOINT collections
+     */
+    public static RangesAtEndpoint concat(RangesAtEndpoint replicas, RangesAtEndpoint extraReplicas)
+    {
+        return AbstractReplicaCollection.concat(replicas, extraReplicas, NONE);
+    }
+
+}
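
RangesAtEndpoint pins every Replica to a single endpoint, indexes them by range, and exposes full()/trans()
projections over the collection. A hedged usage sketch of the API added above; it compiles only inside the
Cassandra tree, and the loopback endpoint and Murmur3 token values are placeholders chosen for illustration,
not anything this patch prescribes.

import org.apache.cassandra.dht.Murmur3Partitioner;
import org.apache.cassandra.dht.Token;
import org.apache.cassandra.locator.InetAddressAndPort;
import org.apache.cassandra.locator.RangesAtEndpoint;
import org.apache.cassandra.locator.Replica;

public class RangesAtEndpointSketch
{
    public static void main(String[] args) throws Exception
    {
        InetAddressAndPort node = InetAddressAndPort.getByName("127.0.0.1");
        Token t0 = new Murmur3Partitioner.LongToken(0);
        Token t100 = new Murmur3Partitioner.LongToken(100);
        Token t200 = new Murmur3Partitioner.LongToken(200);

        // One full and one transient range owned by the same endpoint.
        RangesAtEndpoint ranges = RangesAtEndpoint.builder(node)
                                                  .add(Replica.fullReplica(node, t0, t100))
                                                  .add(Replica.transientReplica(node, t100, t200))
                                                  .build();

        System.out.println(ranges.fullRanges());      // e.g. [(0,100]]
        System.out.println(ranges.transientRanges()); // e.g. [(100,200]]

        // Adding a second Replica for (0,100] with the default Conflict.NONE would throw:
        // ranges must be unique for a given endpoint.
    }
}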

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/locator/RangesByEndpoint.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/locator/RangesByEndpoint.java b/src/java/org/apache/cassandra/locator/RangesByEndpoint.java
new file mode 100644
index 0000000..698b133
--- /dev/null
+++ b/src/java/org/apache/cassandra/locator/RangesByEndpoint.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.locator;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Maps;
+
+import java.util.Collections;
+import java.util.Map;
+
+public class RangesByEndpoint extends ReplicaMultimap<InetAddressAndPort, RangesAtEndpoint>
+{
+    public RangesByEndpoint(Map<InetAddressAndPort, RangesAtEndpoint> map)
+    {
+        super(map);
+    }
+
+    public RangesAtEndpoint get(InetAddressAndPort endpoint)
+    {
+        Preconditions.checkNotNull(endpoint);
+        return map.getOrDefault(endpoint, RangesAtEndpoint.empty(endpoint));
+    }
+
+    public static class Mutable extends ReplicaMultimap.Mutable<InetAddressAndPort, RangesAtEndpoint.Mutable>
+    {
+        @Override
+        protected RangesAtEndpoint.Mutable newMutable(InetAddressAndPort endpoint)
+        {
+            return new RangesAtEndpoint.Mutable(endpoint);
+        }
+
+        public RangesByEndpoint asImmutableView()
+        {
+            return new RangesByEndpoint(Collections.unmodifiableMap(Maps.transformValues(map, RangesAtEndpoint.Mutable::asImmutableView)));
+        }
+    }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/locator/Replica.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/locator/Replica.java b/src/java/org/apache/cassandra/locator/Replica.java
new file mode 100644
index 0000000..37b6050
--- /dev/null
+++ b/src/java/org/apache/cassandra/locator/Replica.java
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.locator;
+
+import java.util.Objects;
+import java.util.Set;
+
+import com.google.common.base.Preconditions;
+
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.utils.FBUtilities;
+
+/**
+ * A Replica represents an owning node for a copy of a portion of the token ring.
+ *
+ * It consists of:
+ *  - the logical token range that is being replicated (i.e. for the first logical replica only, this will be equal
+ *      to one of its owned portions of the token ring; all other replicas will have this token range also)
+ *  - an endpoint (IP and port)
+ *  - whether the range is replicated in full, or transiently (CASSANDRA-14404)
+ *
+ * In general, it is preferable to use a Replica rather than a Range&lt;Token&gt;, particularly when users of the concept
+ *      depend on knowledge of the full/transient status of the copy.
+ *
+ * That means you should avoid unwrapping and rewrapping these objects, and you should think carefully about what
+ * operations such as subtraction mean with respect to transientness. Definitely avoid creating fake Replicas with
+ * misinformation about endpoints, ranges, or transientness.
+ */
+public final class Replica implements Comparable<Replica>
+{
+    private final Range<Token> range;
+    private final InetAddressAndPort endpoint;
+    private final boolean full;
+
+    public Replica(InetAddressAndPort endpoint, Range<Token> range, boolean full)
+    {
+        Preconditions.checkNotNull(endpoint);
+        Preconditions.checkNotNull(range);
+        this.endpoint = endpoint;
+        this.range = range;
+        this.full = full;
+    }
+
+    public Replica(InetAddressAndPort endpoint, Token start, Token end, boolean full)
+    {
+        this(endpoint, new Range<>(start, end), full);
+    }
+
+    public boolean equals(Object o)
+    {
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) return false;
+        Replica replica = (Replica) o;
+        return full == replica.full &&
+               Objects.equals(endpoint, replica.endpoint) &&
+               Objects.equals(range, replica.range);
+    }
+
+    @Override
+    public int compareTo(Replica o)
+    {
+        int c = range.compareTo(o.range);
+        if (c == 0)
+            c = endpoint.compareTo(o.endpoint);
+        if (c == 0)
+            c =  Boolean.compare(full, o.full);
+        return c;
+    }
+
+    public int hashCode()
+    {
+        return Objects.hash(endpoint, range, full);
+    }
+
+    @Override
+    public String toString()
+    {
+        return (full ? "Full" : "Transient") + '(' + endpoint() + ',' + range + ')';
+    }
+
+    public final InetAddressAndPort endpoint()
+    {
+        return endpoint;
+    }
+
+    public boolean isLocal()
+    {
+        return endpoint.equals(FBUtilities.getBroadcastAddressAndPort());
+    }
+
+    public Range<Token> range()
+    {
+        return range;
+    }
+
+    public boolean isFull()
+    {
+        return full;
+    }
+
+    public final boolean isTransient()
+    {
+        return !isFull();
+    }
+
+    /**
+     * This is used exclusively in TokenMetadata to check whether a portion of a range is already replicated
+     * by an endpoint, so that we only mark as pending the portion that is either not replicated sufficiently
+     * (transient when we need full) or not replicated at all.
+     *
+     * If it's not replicated at all it needs to be pending, because there is no data.
+     * If it's replicated only transiently and we need to replicate it fully, it must be marked as pending until it
+     * is available fully; otherwise a read might treat this replica as full and not read from a full replica that
+     * has the data.
+     */
+    public RangesAtEndpoint subtractSameReplication(RangesAtEndpoint toSubtract)
+    {
+        Set<Range<Token>> subtractedRanges = range().subtractAll(toSubtract.filter(r -> r.isFull() == isFull()).ranges());
+        RangesAtEndpoint.Builder result = RangesAtEndpoint.builder(endpoint, subtractedRanges.size());
+        for (Range<Token> range : subtractedRanges)
+        {
+            result.add(decorateSubrange(range));
+        }
+        return result.build();
+    }
+
+    /**
+     * Don't use this method and ignore transient status unless you are explicitly handling it outside this method.
+     *
+     * This helper method is used by StorageService.calculateStreamAndFetchRanges to perform subtraction.
+     * It ignores transient status because it's already being handled in calculateStreamAndFetchRanges.
+     */
+    public RangesAtEndpoint subtractIgnoreTransientStatus(Range<Token> subtract)
+    {
+        Set<Range<Token>> ranges = this.range.subtract(subtract);
+        RangesAtEndpoint.Builder result = RangesAtEndpoint.builder(endpoint, ranges.size());
+        for (Range<Token> subrange : ranges)
+            result.add(decorateSubrange(subrange));
+        return result.build();
+    }
+
+    public boolean contains(Range<Token> that)
+    {
+        return range().contains(that);
+    }
+
+    public boolean intersectsOnRange(Replica replica)
+    {
+        return range().intersects(replica.range());
+    }
+
+    public Replica decorateSubrange(Range<Token> subrange)
+    {
+        Preconditions.checkArgument(range.contains(subrange));
+        return new Replica(endpoint(), subrange, isFull());
+    }
+
+    public static Replica fullReplica(InetAddressAndPort endpoint, Range<Token> range)
+    {
+        return new Replica(endpoint, range, true);
+    }
+
+    public static Replica fullReplica(InetAddressAndPort endpoint, Token start, Token end)
+    {
+        return fullReplica(endpoint, new Range<>(start, end));
+    }
+
+    public static Replica transientReplica(InetAddressAndPort endpoint, Range<Token> range)
+    {
+        return new Replica(endpoint, range, false);
+    }
+
+    public static Replica transientReplica(InetAddressAndPort endpoint, Token start, Token end)
+    {
+        return transientReplica(endpoint, new Range<>(start, end));
+    }
+
+}
+
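
The subtractSameReplication contract above is easiest to see with concrete ranges: subtraction only considers
ranges of the same replication type, so a transient copy of part of the range does not shrink what a full replica
still needs. A hedged sketch against the API in this patch (placeholder endpoint and token values, compiles only
inside the tree):

import org.apache.cassandra.dht.Murmur3Partitioner;
import org.apache.cassandra.locator.InetAddressAndPort;
import org.apache.cassandra.locator.RangesAtEndpoint;
import org.apache.cassandra.locator.Replica;

public class SubtractSketch
{
    public static void main(String[] args) throws Exception
    {
        InetAddressAndPort node = InetAddressAndPort.getByName("127.0.0.1");
        Replica fullRange = Replica.fullReplica(node,
                                                new Murmur3Partitioner.LongToken(0),
                                                new Murmur3Partitioner.LongToken(100));

        // A transient copy of (0,50] does not count: the full replica still needs all of (0,100].
        RangesAtEndpoint transientHalf =
            RangesAtEndpoint.of(Replica.transientReplica(node,
                                                         new Murmur3Partitioner.LongToken(0),
                                                         new Murmur3Partitioner.LongToken(50)));
        System.out.println(fullRange.subtractSameReplication(transientHalf).ranges()); // e.g. [(0,100]]

        // A full copy of (0,50] does count: only (50,100] is left to mark as pending.
        RangesAtEndpoint fullHalf =
            RangesAtEndpoint.of(Replica.fullReplica(node,
                                                    new Murmur3Partitioner.LongToken(0),
                                                    new Murmur3Partitioner.LongToken(50)));
        System.out.println(fullRange.subtractSameReplication(fullHalf).ranges()); // e.g. [(50,100]]
    }
}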

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/locator/ReplicaCollection.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/locator/ReplicaCollection.java b/src/java/org/apache/cassandra/locator/ReplicaCollection.java
new file mode 100644
index 0000000..6833f4b
--- /dev/null
+++ b/src/java/org/apache/cassandra/locator/ReplicaCollection.java
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.locator;
+
+import org.apache.cassandra.locator.ReplicaCollection.Mutable.Conflict;
+
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.Set;
+import java.util.function.Predicate;
+import java.util.stream.Stream;
+
+/**
+ * A collection like class for Replica objects. Represents both a well defined order on the contained Replica objects,
+ * and efficient methods for accessing the contained Replicas, directly and as a projection onto their endpoints and ranges.
+ */
+public interface ReplicaCollection<C extends ReplicaCollection<C>> extends Iterable<Replica>
+{
+    /**
+     * @return a Set of the endpoints of the contained Replicas.
+     * Iteration order is maintained where there is a 1:1 relationship between endpoint and Replica
+     * Typically this collection offers O(1) access methods, and this is true for all but ReplicaList.
+     */
+    public abstract Set<InetAddressAndPort> endpoints();
+
+    /**
+     * @param i a value in the range [0..size())
+     * @return the i'th Replica, in our iteration order
+     */
+    public abstract Replica get(int i);
+
+    /**
+     * @return the number of Replica contained
+     */
+    public abstract int size();
+
+    /**
+     * @return true iff size() == 0
+     */
+    public abstract boolean isEmpty();
+
+    /**
+     * @return true iff a Replica in this collection is equal to the provided Replica.
+     * Typically this method is expected to take O(1) time, and this is true for all but ReplicaList.
+     */
+    public abstract boolean contains(Replica replica);
+
+    /**
+     * @return an *eagerly constructed* copy of this collection containing the Replica that match the provided predicate.
+     * An effort will be made to either return ourself, or a subList, where possible.
+     * It is guaranteed that no changes to any upstream Mutable will affect the state of the result.
+     */
+    public abstract C filter(Predicate<Replica> predicate);
+
+    /**
+     * @return an *eagerly constructed* copy of this collection containing the Replica that match the provided predicate.
+     * An effort will be made to either return ourself, or a subList, where possible.
+     * It is guaranteed that no changes to any upstream Mutable will affect the state of the result.
+     * Only the first maxSize items will be returned.
+     */
+    public abstract C filter(Predicate<Replica> predicate, int maxSize);
+
+    /**
+     * @return an *eagerly constructed* copy of this collection containing the Replica at positions [start..end);
+     * An effort will be made to either return ourself, or a subList, where possible.
+     * It is guaranteed that no changes to any upstream Mutable will affect the state of the result.
+     */
+    public abstract C subList(int start, int end);
+
+    /**
+     * @return an *eagerly constructed* copy of this collection containing the Replica re-ordered according to this comparator
+     * It is guaranteed that no changes to any upstream Mutable will affect the state of the result.
+     */
+    public abstract C sorted(Comparator<Replica> comparator);
+
+    public abstract Iterator<Replica> iterator();
+    public abstract Stream<Replica> stream();
+
+    public abstract boolean equals(Object o);
+    public abstract int hashCode();
+    public abstract String toString();
+
+    /**
+     * A mutable extension of a ReplicaCollection.  This is append-only, so it is safe to select a subList,
+     * or at any time take an asImmutableView() snapshot.
+     */
+    public interface Mutable<C extends ReplicaCollection<C>> extends ReplicaCollection<C>
+    {
+        /**
+         * @return an Immutable clone that mirrors any modifications to this Mutable instance.
+         */
+        C asImmutableView();
+
+        /**
+         * @return an Immutable clone that assumes this Mutable will never be modified again.
+         * If this is not true, behaviour is undefined.
+         */
+        C asSnapshot();
+
+        enum Conflict { NONE, DUPLICATE, ALL}
+
+        /**
+         * @param replica add this replica to the end of the collection
+         * @param ignoreConflict the kind of conflicting additions to tolerate (as defined by C's semantics); Conflict.NONE fails on any conflict
+         */
+        void add(Replica replica, Conflict ignoreConflict);
+
+        default public void add(Replica replica)
+        {
+            add(replica, Conflict.NONE);
+        }
+
+        default public void addAll(Iterable<Replica> replicas, Conflict ignoreConflicts)
+        {
+            for (Replica replica : replicas)
+                add(replica, ignoreConflicts);
+        }
+
+        default public void addAll(Iterable<Replica> replicas)
+        {
+            addAll(replicas, Conflict.NONE);
+        }
+    }
+
+    public static class Builder<C extends ReplicaCollection<C>, M extends Mutable<C>, B extends Builder<C, M, B>>
+    {
+        Mutable<C> mutable;
+        public Builder(Mutable<C> mutable) { this.mutable = mutable; }
+
+        public int size() { return mutable.size(); }
+        public B add(Replica replica) { mutable.add(replica); return (B) this; }
+        public B add(Replica replica, Conflict ignoreConflict) { mutable.add(replica, ignoreConflict); return (B) this; }
+        public B addAll(Iterable<Replica> replica) { mutable.addAll(replica); return (B) this; }
+        public B addAll(Iterable<Replica> replica, Conflict ignoreConflict) { mutable.addAll(replica, ignoreConflict); return (B) this; }
+
+        public C build()
+        {
+            C result = mutable.asSnapshot();
+            mutable = null;
+            return result;
+        }
+    }
+
+}
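
Illustrative usage of the Mutable/Builder pattern above (not part of this patch):
EndpointsForRange.builder(...), Replica.fullReplica(...) and Replica.transientReplica(...) are
assumed factory methods, and range/endpoint1/endpoint2 are placeholders.

    // Sketch: append replicas, then snapshot into an immutable collection.
    EndpointsForRange.Builder builder = EndpointsForRange.builder(range);          // assumed factory
    builder.add(Replica.fullReplica(endpoint1, range));                            // Conflict.NONE: fail on any conflict
    builder.add(Replica.transientReplica(endpoint2, range),
                ReplicaCollection.Mutable.Conflict.ALL);                           // tolerate conflicting additions
    EndpointsForRange replicas = builder.build();                                  // asSnapshot(); do not reuse the builder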

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/locator/ReplicaLayout.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/locator/ReplicaLayout.java b/src/java/org/apache/cassandra/locator/ReplicaLayout.java
new file mode 100644
index 0000000..946a7f8
--- /dev/null
+++ b/src/java/org/apache/cassandra/locator/ReplicaLayout.java
@@ -0,0 +1,381 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.locator;
+
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.function.Predicate;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Predicates;
+import com.google.common.collect.Iterables;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.ConsistencyLevel;
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.db.PartitionPosition;
+import org.apache.cassandra.dht.AbstractBounds;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.exceptions.UnavailableException;
+import org.apache.cassandra.gms.FailureDetector;
+import org.apache.cassandra.net.IAsyncCallback;
+import org.apache.cassandra.service.StorageProxy;
+import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.service.reads.AlwaysSpeculativeRetryPolicy;
+import org.apache.cassandra.service.reads.SpeculativeRetryPolicy;
+import org.apache.cassandra.utils.FBUtilities;
+
+import static com.google.common.collect.Iterables.any;
+
+/**
+ * Encapsulates knowledge about the ring necessary for performing a specific operation, with static accessors
+ * for building the relevant layout.
+ *
+ * It comprises:
+ *  - the 'natural' replicas replicating the range or token relevant to the operation
+ *  - for writes, any 'pending' replicas that are taking ownership of the range and must receive updates
+ *  - the 'selected' replicas, i.e. those that should be targeted for the operation
+ *  - 'all' replicas, i.e. natural + pending
+ *
+ * @param <E> the type of Endpoints this ReplicaLayout holds (either EndpointsForToken or EndpointsForRange)
+ * @param <L> the type of itself, including its type parameters, for return type of modifying methods
+ */
+public abstract class ReplicaLayout<E extends Endpoints<E>, L extends ReplicaLayout<E, L>>
+{
+    private volatile E all;
+    protected final E natural;
+    protected final E pending;
+    protected final E selected;
+
+    protected final Keyspace keyspace;
+    protected final ConsistencyLevel consistencyLevel;
+
+    private ReplicaLayout(Keyspace keyspace, ConsistencyLevel consistencyLevel, E natural, E pending, E selected)
+    {
+        this(keyspace, consistencyLevel, natural, pending, selected, null);
+    }
+
+    private ReplicaLayout(Keyspace keyspace, ConsistencyLevel consistencyLevel, E natural, E pending, E selected, E all)
+    {
+        assert selected != null;
+        assert pending == null || !Endpoints.haveConflicts(natural, pending);
+        this.keyspace = keyspace;
+        this.consistencyLevel = consistencyLevel;
+        this.natural = natural;
+        this.pending = pending;
+        this.selected = selected;
+        // if we logically have no pending endpoints (they are null), then 'all' our endpoints are natural
+        if (all == null && pending == null)
+            all = natural;
+        this.all = all;
+    }
+
+    public Replica getReplicaFor(InetAddressAndPort endpoint)
+    {
+        return natural.byEndpoint().get(endpoint);
+    }
+
+    public E natural()
+    {
+        return natural;
+    }
+
+    public E all()
+    {
+        E result = all;
+        if (result == null)
+            all = result = Endpoints.concat(natural, pending);
+        return result;
+    }
+
+    public E selected()
+    {
+        return selected;
+    }
+
+    /**
+     * @return the pending replicas - will be null for read layouts
+     * TODO: ideally we would enforce at compile time that read layouts have no pending to access
+     */
+    public E pending()
+    {
+        return pending;
+    }
+
+    public int blockFor()
+    {
+        return pending == null
+                ? consistencyLevel.blockFor(keyspace)
+                : consistencyLevel.blockForWrite(keyspace, pending);
+    }
+
+    public Keyspace keyspace()
+    {
+        return keyspace;
+    }
+
+    public ConsistencyLevel consistencyLevel()
+    {
+        return consistencyLevel;
+    }
+
+    abstract public L withSelected(E replicas);
+
+    abstract public L withConsistencyLevel(ConsistencyLevel cl);
+
+    public L forNaturalUncontacted()
+    {
+        E more;
+        if (consistencyLevel.isDatacenterLocal() && keyspace.getReplicationStrategy() instanceof NetworkTopologyStrategy)
+        {
+            IEndpointSnitch snitch = keyspace.getReplicationStrategy().snitch;
+            String localDC = DatabaseDescriptor.getLocalDataCenter();
+
+            more = natural.filter(replica -> !selected.contains(replica) &&
+                    snitch.getDatacenter(replica).equals(localDC));
+        } else
+        {
+            more = natural.filter(replica -> !selected.contains(replica));
+        }
+
+        return withSelected(more);
+    }
+
+    public static class ForRange extends ReplicaLayout<EndpointsForRange, ForRange>
+    {
+        public final AbstractBounds<PartitionPosition> range;
+
+        @VisibleForTesting
+        public ForRange(Keyspace keyspace, ConsistencyLevel consistencyLevel, AbstractBounds<PartitionPosition> range, EndpointsForRange natural, EndpointsForRange selected)
+        {
+            // Range queries do not contact pending replicas
+            super(keyspace, consistencyLevel, natural, null, selected);
+            this.range = range;
+        }
+
+        @Override
+        public ForRange withSelected(EndpointsForRange newSelected)
+        {
+            return new ForRange(keyspace, consistencyLevel, range, natural, newSelected);
+        }
+
+        @Override
+        public ForRange withConsistencyLevel(ConsistencyLevel cl)
+        {
+            return new ForRange(keyspace, cl, range, natural, selected);
+        }
+    }
+
+    public static class ForToken extends ReplicaLayout<EndpointsForToken, ForToken>
+    {
+        public final Token token;
+
+        @VisibleForTesting
+        public ForToken(Keyspace keyspace, ConsistencyLevel consistencyLevel, Token token, EndpointsForToken natural, EndpointsForToken pending, EndpointsForToken selected)
+        {
+            super(keyspace, consistencyLevel, natural, pending, selected);
+            this.token = token;
+        }
+
+        public ForToken(Keyspace keyspace, ConsistencyLevel consistencyLevel, Token token, EndpointsForToken natural, EndpointsForToken pending, EndpointsForToken selected, EndpointsForToken all)
+        {
+            super(keyspace, consistencyLevel, natural, pending, selected, all);
+            this.token = token;
+        }
+
+        public ForToken withSelected(EndpointsForToken newSelected)
+        {
+            return new ForToken(keyspace, consistencyLevel, token, natural, pending, newSelected);
+        }
+
+        @Override
+        public ForToken withConsistencyLevel(ConsistencyLevel cl)
+        {
+            return new ForToken(keyspace, cl, token, natural, pending, selected);
+        }
+    }
+
+    public static class ForPaxos extends ForToken
+    {
+        private final int requiredParticipants;
+
+        private ForPaxos(Keyspace keyspace, ConsistencyLevel consistencyLevel, Token token, int requiredParticipants, EndpointsForToken natural, EndpointsForToken pending, EndpointsForToken selected, EndpointsForToken all)
+        {
+            super(keyspace, consistencyLevel, token, natural, pending, selected, all);
+            this.requiredParticipants = requiredParticipants;
+        }
+
+        public int getRequiredParticipants()
+        {
+            return requiredParticipants;
+        }
+    }
+
+    public static ForToken forSingleReplica(Keyspace keyspace, Token token, Replica replica)
+    {
+        EndpointsForToken singleReplica = EndpointsForToken.of(token, replica);
+        return new ForToken(keyspace, ConsistencyLevel.ONE, token, singleReplica, EndpointsForToken.empty(token), singleReplica, singleReplica);
+    }
+
+    public static ForRange forSingleReplica(Keyspace keyspace, AbstractBounds<PartitionPosition> range, Replica replica)
+    {
+        EndpointsForRange singleReplica = EndpointsForRange.of(replica);
+        return new ForRange(keyspace, ConsistencyLevel.ONE, range, singleReplica, singleReplica);
+    }
+
+    public static ForToken forCounterWrite(Keyspace keyspace, Token token, Replica replica)
+    {
+        return forSingleReplica(keyspace, token, replica);
+    }
+
+    public static ForToken forBatchlogWrite(Keyspace keyspace, Collection<InetAddressAndPort> endpoints) throws UnavailableException
+    {
+        // The one case where we write neither for a range nor a single token, but multiple mutations spanning many tokens
+        Token token = DatabaseDescriptor.getPartitioner().getMinimumToken();
+        EndpointsForToken natural = EndpointsForToken.copyOf(token, SystemReplicas.getSystemReplicas(endpoints));
+        EndpointsForToken pending = EndpointsForToken.empty(token);
+        ConsistencyLevel consistencyLevel = natural.size() == 1 ? ConsistencyLevel.ONE : ConsistencyLevel.TWO;
+
+        return forWriteWithDownNodes(keyspace, consistencyLevel, token, natural, pending);
+    }
+
+    public static ForToken forWriteWithDownNodes(Keyspace keyspace, ConsistencyLevel consistencyLevel, Token token) throws UnavailableException
+    {
+        return forWrite(keyspace, consistencyLevel, token, Predicates.alwaysTrue());
+    }
+
+    public static ForToken forWrite(Keyspace keyspace, ConsistencyLevel consistencyLevel, Token token, Predicate<InetAddressAndPort> isAlive) throws UnavailableException
+    {
+        EndpointsForToken natural = StorageService.getNaturalReplicasForToken(keyspace.getName(), token);
+        EndpointsForToken pending = StorageService.instance.getTokenMetadata().pendingEndpointsForToken(token, keyspace.getName());
+        return forWrite(keyspace, consistencyLevel, token, natural, pending, isAlive);
+    }
+
+    public static ForToken forWriteWithDownNodes(Keyspace keyspace, ConsistencyLevel consistencyLevel, Token token, EndpointsForToken natural, EndpointsForToken pending) throws UnavailableException
+    {
+        return forWrite(keyspace, consistencyLevel, token, natural, pending, Predicates.alwaysTrue());
+    }
+
+    public static ForToken forWrite(Keyspace keyspace, ConsistencyLevel consistencyLevel, Token token, EndpointsForToken natural, EndpointsForToken pending, Predicate<InetAddressAndPort> isAlive) throws UnavailableException
+    {
+        if (Endpoints.haveConflicts(natural, pending))
+        {
+            natural = Endpoints.resolveConflictsInNatural(natural, pending);
+            pending = Endpoints.resolveConflictsInPending(natural, pending);
+        }
+
+        if (!any(natural, Replica::isTransient) && !any(pending, Replica::isTransient))
+        {
+            EndpointsForToken selected = Endpoints.concat(natural, pending).filter(r -> isAlive.test(r.endpoint()));
+            return new ForToken(keyspace, consistencyLevel, token, natural, pending, selected);
+        }
+
+        return forWrite(keyspace, consistencyLevel, token, consistencyLevel.blockForWrite(keyspace, pending), natural, pending, isAlive);
+    }
+
+    public static ReplicaLayout.ForPaxos forPaxos(Keyspace keyspace, DecoratedKey key, ConsistencyLevel consistencyForPaxos) throws UnavailableException
+    {
+        Token tk = key.getToken();
+        EndpointsForToken natural = StorageService.getNaturalReplicasForToken(keyspace.getName(), tk);
+        EndpointsForToken pending = StorageService.instance.getTokenMetadata().pendingEndpointsForToken(tk, keyspace.getName());
+        if (Endpoints.haveConflicts(natural, pending))
+        {
+            natural = Endpoints.resolveConflictsInNatural(natural, pending);
+            pending = Endpoints.resolveConflictsInPending(natural, pending);
+        }
+
+        // TODO CASSANDRA-14547
+        Replicas.temporaryAssertFull(natural);
+        Replicas.temporaryAssertFull(pending);
+
+        if (consistencyForPaxos == ConsistencyLevel.LOCAL_SERIAL)
+        {
+            // Restrict natural and pending to nodes in the local DC only
+            String localDc = DatabaseDescriptor.getEndpointSnitch().getDatacenter(FBUtilities.getBroadcastAddressAndPort());
+            IEndpointSnitch snitch = DatabaseDescriptor.getEndpointSnitch();
+            Predicate<Replica> isLocalDc = replica -> localDc.equals(snitch.getDatacenter(replica));
+
+            natural = natural.filter(isLocalDc);
+            pending = pending.filter(isLocalDc);
+        }
+
+        int participants = pending.size() + natural.size();
+        int requiredParticipants = participants / 2 + 1; // See CASSANDRA-8346, CASSANDRA-833
+
+        EndpointsForToken all = Endpoints.concat(natural, pending);
+        EndpointsForToken selected = all.filter(IAsyncCallback.isReplicaAlive);
+        if (selected.size() < requiredParticipants)
+            throw UnavailableException.create(consistencyForPaxos, requiredParticipants, selected.size());
+
+        // We cannot allow CAS operations with 2 or more pending endpoints, see #8346.
+        // Note that we fake an impossible number of required nodes in the unavailable exception
+        // to nail home the point that it's an impossible operation no matter how many nodes are live.
+        if (pending.size() > 1)
+            throw new UnavailableException(String.format("Cannot perform LWT operation as there is more than one (%d) pending range movement", pending.size()),
+                                           consistencyForPaxos,
+                                           participants + 1,
+                                           selected.size());
+
+        return new ReplicaLayout.ForPaxos(keyspace, consistencyForPaxos, key.getToken(), requiredParticipants, natural, pending, selected, all);
+    }
+
+    /**
+     * We want to send mutations to as many full replicas as we can, and just as many transient replicas
+     * as we need to meet blockFor.
+     */
+    @VisibleForTesting
+    public static ForToken forWrite(Keyspace keyspace, ConsistencyLevel consistencyLevel, Token token, int blockFor, EndpointsForToken natural, EndpointsForToken pending, Predicate<InetAddressAndPort> livePredicate) throws UnavailableException
+    {
+        EndpointsForToken all = Endpoints.concat(natural, pending);
+        EndpointsForToken selected = all
+                .select()
+                .add(r -> r.isFull() && livePredicate.test(r.endpoint()))
+                .add(r -> r.isTransient() && livePredicate.test(r.endpoint()), blockFor)
+                .get();
+
+        consistencyLevel.assureSufficientLiveNodesForWrite(keyspace, selected, pending);
+
+        return new ForToken(keyspace, consistencyLevel, token, natural, pending, selected, all);
+    }
+
+    public static ForToken forRead(Keyspace keyspace, Token token, ConsistencyLevel consistencyLevel, SpeculativeRetryPolicy retry)
+    {
+        EndpointsForToken natural = StorageProxy.getLiveSortedReplicasForToken(keyspace, token);
+        EndpointsForToken selected = consistencyLevel.filterForQuery(keyspace, natural, retry.equals(AlwaysSpeculativeRetryPolicy.INSTANCE));
+
+        // Throw UAE early if we don't have enough replicas.
+        consistencyLevel.assureSufficientLiveNodesForRead(keyspace, selected);
+
+        return new ForToken(keyspace, consistencyLevel, token, natural, null, selected);
+    }
+
+    public static ForRange forRangeRead(Keyspace keyspace, ConsistencyLevel consistencyLevel, AbstractBounds<PartitionPosition> range, EndpointsForRange natural, EndpointsForRange selected)
+    {
+        return new ForRange(keyspace, consistencyLevel, range, natural, selected);
+    }
+
+    public String toString()
+    {
+        return "ReplicaLayout [ CL: " + consistencyLevel + " keyspace: " + keyspace + " natural: " + natural + "pending: " + pending + " selected: " + selected + " ]";
+    }
+}
+
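
Illustrative only, to make natural/pending/selected concrete (not part of this patch): keyspace
and token are placeholders, and the FailureDetector lambda is just one way a liveness predicate
could be supplied.

    // Sketch: build a write layout for a token and address the selected replicas.
    ReplicaLayout.ForToken layout =
        ReplicaLayout.forWrite(keyspace, ConsistencyLevel.QUORUM, token,
                               endpoint -> FailureDetector.instance.isAlive(endpoint));  // throws UnavailableException if too few are live
    int blockFor = layout.blockFor();           // derived from the CL plus any pending replicas
    for (Replica target : layout.selected())    // live full replicas, plus enough transient ones to meet blockFor
    {
        // send the mutation to target.endpoint(); transient targets can be identified via target.isTransient()
    }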

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/locator/ReplicaMultimap.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/locator/ReplicaMultimap.java b/src/java/org/apache/cassandra/locator/ReplicaMultimap.java
new file mode 100644
index 0000000..3e3fcb4
--- /dev/null
+++ b/src/java/org/apache/cassandra/locator/ReplicaMultimap.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.locator;
+
+import java.util.AbstractMap;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.stream.Stream;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Iterables;
+
+public abstract class ReplicaMultimap<K, C extends ReplicaCollection<?>>
+{
+    final Map<K, C> map;
+    ReplicaMultimap(Map<K, C> map)
+    {
+        this.map = map;
+    }
+
+    public abstract C get(K key);
+    public C getIfPresent(K key) { return map.get(key); }
+
+    public static abstract class Mutable
+            <K, MutableCollection extends ReplicaCollection.Mutable<?>>
+            extends ReplicaMultimap<K, MutableCollection>
+    {
+        protected abstract MutableCollection newMutable(K key);
+
+        Mutable()
+        {
+            super(new HashMap<>());
+        }
+
+        public MutableCollection get(K key)
+        {
+            Preconditions.checkNotNull(key);
+            return map.computeIfAbsent(key, k -> newMutable(key));
+        }
+
+        public void put(K key, Replica replica)
+        {
+            Preconditions.checkNotNull(key);
+            Preconditions.checkNotNull(replica);
+            get(key).add(replica);
+        }
+    }
+
+    public Iterable<Replica> flattenValues()
+    {
+        return Iterables.concat(map.values());
+    }
+
+    public Iterable<Map.Entry<K, Replica>> flattenEntries()
+    {
+        return () -> {
+            Stream<Map.Entry<K, Replica>> s = map.entrySet()
+                                                 .stream()
+                                                 .flatMap(entry -> entry.getValue()
+                                                                        .stream()
+                                                                        .map(replica -> (Map.Entry<K, Replica>)new AbstractMap.SimpleImmutableEntry<>(entry.getKey(), replica)));
+            return s.iterator();
+        };
+    }
+
+    public boolean isEmpty()
+    {
+        return map.isEmpty();
+    }
+
+    public boolean containsKey(Object key)
+    {
+        return map.containsKey(key);
+    }
+
+    public Set<K> keySet()
+    {
+        return map.keySet();
+    }
+
+    public Set<Map.Entry<K, C>> entrySet()
+    {
+        return map.entrySet();
+    }
+
+    public Map<K, C> asMap()
+    {
+        return map;
+    }
+
+    public boolean equals(Object o)
+    {
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) return false;
+        ReplicaMultimap<?, ?> that = (ReplicaMultimap<?, ?>) o;
+        return Objects.equals(map, that.map);
+    }
+
+    public int hashCode()
+    {
+        return map.hashCode();
+    }
+
+    @Override
+    public String toString()
+    {
+        return map.toString();
+    }
+}
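
Illustrative only (not part of this patch): the Mutable multimap is meant to be specialised per
key/collection pairing, e.g. a sketch like the following, assuming it lives in the same package
and that EndpointsForRange.Mutable exposes a constructor taking the key range.

    // Sketch: a per-range multimap that lazily creates its value collections on first put().
    static class ReplicasByRange extends ReplicaMultimap.Mutable<Range<Token>, EndpointsForRange.Mutable>
    {
        protected EndpointsForRange.Mutable newMutable(Range<Token> range)
        {
            return new EndpointsForRange.Mutable(range);   // assumed constructor
        }
    }

    ReplicasByRange byRange = new ReplicasByRange();
    byRange.put(range, replica);                           // get() creates the collection via newMutable()
    for (Map.Entry<Range<Token>, Replica> entry : byRange.flattenEntries())
    {
        // one entry per (range, replica) pair
    }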

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/locator/Replicas.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/locator/Replicas.java b/src/java/org/apache/cassandra/locator/Replicas.java
new file mode 100644
index 0000000..299e6ec
--- /dev/null
+++ b/src/java/org/apache/cassandra/locator/Replicas.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.locator;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import com.google.common.collect.Iterables;
+
+import static com.google.common.collect.Iterables.all;
+
+public class Replicas
+{
+
+    public static int countFull(ReplicaCollection<?> liveReplicas)
+    {
+        int count = 0;
+        for (Replica replica : liveReplicas)
+            if (replica.isFull())
+                ++count;
+        return count;
+    }
+
+    /**
+     * A placeholder for areas of the code that cannot yet handle transient replicas, but should do so in future
+     */
+    public static void temporaryAssertFull(Replica replica)
+    {
+        if (!replica.isFull())
+        {
+            throw new UnsupportedOperationException("transient replicas are currently unsupported: " + replica);
+        }
+    }
+
+    /**
+     * A placeholder for areas of the code that cannot yet handle transient replicas, but should do so in future
+     */
+    public static void temporaryAssertFull(Iterable<Replica> replicas)
+    {
+        if (!all(replicas, Replica::isFull))
+        {
+            throw new UnsupportedOperationException("transient replicas are currently unsupported: " + Iterables.toString(replicas));
+        }
+    }
+
+    /**
+     * For areas of the code that should never see a transient replica
+     */
+    public static void assertFull(Iterable<Replica> replicas)
+    {
+        if (!all(replicas, Replica::isFull))
+        {
+            throw new UnsupportedOperationException("transient replicas are not supported by this operation: " + Iterables.toString(replicas));
+        }
+    }
+
+    public static List<String> stringify(ReplicaCollection<?> replicas, boolean withPort)
+    {
+        List<String> stringEndpoints = new ArrayList<>(replicas.size());
+        for (Replica replica: replicas)
+        {
+            stringEndpoints.add(replica.endpoint().getHostAddress(withPort));
+        }
+        return stringEndpoints;
+    }
+
+}
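
Illustrative only (not part of this patch): the temporaryAssertFull() guards mark call sites that
still assume full replicas, while transient-aware call sites count full replicas explicitly;
replicas/liveReplicas are placeholders.

    Replicas.temporaryAssertFull(replicas);                            // throws if any replica is transient
    int fullCount = Replicas.countFull(liveReplicas);                  // transient-aware paths count full replicas
    List<String> endpoints = Replicas.stringify(liveReplicas, true);   // endpoint strings, with ports, for logging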

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/locator/ReplicationFactor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/locator/ReplicationFactor.java b/src/java/org/apache/cassandra/locator/ReplicationFactor.java
new file mode 100644
index 0000000..c0ed31f
--- /dev/null
+++ b/src/java/org/apache/cassandra/locator/ReplicationFactor.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.locator;
+
+import java.util.List;
+import java.util.Objects;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import com.google.common.base.Preconditions;
+import com.google.common.base.Predicates;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.gms.Gossiper;
+import org.apache.cassandra.utils.FBUtilities;
+
+public class ReplicationFactor
+{
+    public static final ReplicationFactor ZERO = new ReplicationFactor(0);
+
+    public final int allReplicas;
+    public final int fullReplicas;
+
+    private ReplicationFactor(int allReplicas, int transientReplicas)
+    {
+        validate(allReplicas, transientReplicas);
+        this.allReplicas = allReplicas;
+        this.fullReplicas = allReplicas - transientReplicas;
+    }
+
+    public int transientReplicas()
+    {
+        return allReplicas - fullReplicas;
+    }
+
+    public boolean hasTransientReplicas()
+    {
+        return allReplicas != fullReplicas;
+    }
+
+    private ReplicationFactor(int allReplicas)
+    {
+        this(allReplicas, 0);
+    }
+
+    static void validate(int totalRF, int transientRF)
+    {
+        Preconditions.checkArgument(transientRF == 0 || DatabaseDescriptor.isTransientReplicationEnabled(),
+                                    "Transient replication is not enabled on this node");
+        Preconditions.checkArgument(totalRF >= 0,
+                                    "Replication factor must be non-negative, found %s", totalRF);
+        Preconditions.checkArgument(transientRF == 0 || transientRF < totalRF,
+                                    "Transient replicas must be zero, or less than total replication factor. For %s/%s", totalRF, transientRF);
+        if (transientRF > 0)
+        {
+            Preconditions.checkArgument(DatabaseDescriptor.getNumTokens() == 1,
+                                        "Transient nodes are not allowed with multiple tokens");
+            Stream<InetAddressAndPort> endpoints = Stream.concat(Gossiper.instance.getLiveMembers().stream(), Gossiper.instance.getUnreachableMembers().stream());
+            List<InetAddressAndPort> badVersionEndpoints = endpoints.filter(Predicates.not(FBUtilities.getBroadcastAddressAndPort()::equals))
+                                                                    .filter(endpoint -> Gossiper.instance.getReleaseVersion(endpoint) != null && Gossiper.instance.getReleaseVersion(endpoint).major < 4)
+                                                                    .collect(Collectors.toList());
+            if (!badVersionEndpoints.isEmpty())
+                throw new AssertionError("Transient replication is not supported in mixed version clusters with nodes < 4.0. Bad nodes: " + badVersionEndpoints);
+        }
+        else if (transientRF < 0)
+        {
+            throw new AssertionError(String.format("Transient replica count must be non-negative, but was: '%d'", transientRF));
+        }
+    }
+
+    public boolean equals(Object o)
+    {
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) return false;
+        ReplicationFactor that = (ReplicationFactor) o;
+        return allReplicas == that.allReplicas && fullReplicas == that.fullReplicas;
+    }
+
+    public int hashCode()
+    {
+        return Objects.hash(allReplicas, fullReplicas);
+    }
+
+    public static ReplicationFactor fullOnly(int totalReplicas)
+    {
+        return new ReplicationFactor(totalReplicas);
+    }
+
+    public static ReplicationFactor withTransient(int totalReplicas, int transientReplicas)
+    {
+        return new ReplicationFactor(totalReplicas, transientReplicas);
+    }
+
+    public static ReplicationFactor fromString(String s)
+    {
+        if (s.contains("/"))
+        {
+            String[] parts = s.split("/");
+            Preconditions.checkArgument(parts.length == 2,
+                                        "Replication factor format is <replicas> or <replicas>/<transient>");
+            return new ReplicationFactor(Integer.valueOf(parts[0]), Integer.valueOf(parts[1]));
+        }
+        else
+        {
+            return new ReplicationFactor(Integer.valueOf(s), 0);
+        }
+    }
+
+    @Override
+    public String toString()
+    {
+        return "rf(" + allReplicas + (hasTransientReplicas() ? '/' + transientReplicas() : "") + ')';
+    }
+}
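
Illustrative only (not part of this patch), assuming transient replication is enabled on the node
(validate() rejects a transient count otherwise):

    // "3/1" means 3 replicas in total: 2 full and 1 transient.
    ReplicationFactor rf = ReplicationFactor.fromString("3/1");
    assert rf.allReplicas == 3;
    assert rf.fullReplicas == 2;
    assert rf.transientReplicas() == 1;
    assert rf.toString().equals("rf(3/1)");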

http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/locator/SimpleSnitch.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/locator/SimpleSnitch.java b/src/java/org/apache/cassandra/locator/SimpleSnitch.java
index e31fc6b..d605b6e 100644
--- a/src/java/org/apache/cassandra/locator/SimpleSnitch.java
+++ b/src/java/org/apache/cassandra/locator/SimpleSnitch.java
@@ -17,8 +17,6 @@
  */
 package org.apache.cassandra.locator;
 
-import java.util.List;
-
 /**
  * A simple endpoint snitch implementation that treats Strategy order as proximity,
  * allowing non-read-repaired reads to prefer a single endpoint, which improves
@@ -37,12 +35,14 @@ public class SimpleSnitch extends AbstractEndpointSnitch
     }
 
     @Override
-    public void sortByProximity(final InetAddressAndPort address, List<InetAddressAndPort> addresses)
+    public <C extends ReplicaCollection<? extends C>> C sortedByProximity(final InetAddressAndPort address, C unsortedAddress)
     {
         // Optimization to avoid walking the list
+        return unsortedAddress;
     }
 
-    public int compareEndpoints(InetAddressAndPort target, InetAddressAndPort a1, InetAddressAndPort a2)
+    @Override
+    public int compareEndpoints(InetAddressAndPort target, Replica r1, Replica r2)
     {
         // Making all endpoints equal ensures we won't change the original ordering (since
         // Collections.sort is guaranteed to be stable)
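
Illustrative only (not part of this patch): the snitch now returns a (possibly re-ordered)
ReplicaCollection instead of sorting a List of endpoints in place; snitch and replicas are
placeholders.

    EndpointsForRange sorted = snitch.sortedByProximity(FBUtilities.getBroadcastAddressAndPort(), replicas);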

