You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by al...@apache.org on 2015/11/18 16:46:31 UTC

cassandra git commit: Improve NTS endpoints calculation

Repository: cassandra
Updated Branches:
  refs/heads/trunk 29ec013c2 -> c000da135


Improve NTS endpoints calculation

patch by Branimir Lambov; reviewed by Aleksey Yeschenko for
CASSANDRA-10200


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/c000da13
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/c000da13
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/c000da13

Branch: refs/heads/trunk
Commit: c000da13563907b99fe220a7c8bde3c1dec74ad5
Parents: 29ec013
Author: Branimir Lambov <br...@datastax.com>
Authored: Wed Aug 26 16:08:57 2015 +0300
Committer: Aleksey Yeschenko <al...@apache.org>
Committed: Wed Nov 18 15:44:21 2015 +0000

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../locator/NetworkTopologyStrategy.java        | 157 ++++++++------
 .../apache/cassandra/locator/TokenMetadata.java |  21 +-
 .../locator/NetworkTopologyStrategyTest.java    | 213 ++++++++++++++++++-
 4 files changed, 317 insertions(+), 75 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/c000da13/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 2710ed3..77034ef 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 3.2
+ * Improve NTS endpoints calculation (CASSANDRA-10200)
  * Improve performance of the folderSize function (CASSANDRA-10677)
  * Add support for type casting in selection clause (CASSANDRA-10310)
  * Added graphing option to cassandra-stress (CASSANDRA-7918)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c000da13/src/java/org/apache/cassandra/locator/NetworkTopologyStrategy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/locator/NetworkTopologyStrategy.java b/src/java/org/apache/cassandra/locator/NetworkTopologyStrategy.java
index 307a07f..9f74dcc 100644
--- a/src/java/org/apache/cassandra/locator/NetworkTopologyStrategy.java
+++ b/src/java/org/apache/cassandra/locator/NetworkTopologyStrategy.java
@@ -28,6 +28,7 @@ import org.apache.cassandra.exceptions.ConfigurationException;
 import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.locator.TokenMetadata.Topology;
 import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.Pair;
 
 import com.google.common.collect.Multimap;
 
@@ -48,14 +49,12 @@ import com.google.common.collect.Multimap;
  */
 public class NetworkTopologyStrategy extends AbstractReplicationStrategy
 {
-    private final IEndpointSnitch snitch;
     private final Map<String, Integer> datacenters;
     private static final Logger logger = LoggerFactory.getLogger(NetworkTopologyStrategy.class);
 
     public NetworkTopologyStrategy(String keyspaceName, TokenMetadata tokenMetadata, IEndpointSnitch snitch, Map<String, String> configOptions) throws ConfigurationException
     {
         super(keyspaceName, tokenMetadata, snitch, configOptions);
-        this.snitch = snitch;
 
         Map<String, Integer> newDatacenters = new HashMap<String, Integer>();
         if (configOptions != null)
@@ -75,17 +74,78 @@ public class NetworkTopologyStrategy extends AbstractReplicationStrategy
     }
 
     /**
-     * calculate endpoints in one pass through the tokens by tracking our progress in each DC, rack etc.
+     * Endpoint adder applying the replication rules for a given DC.
+     */
+    private static final class DatacenterEndpoints
+    {
+        /** Set that accepted endpoints get added to. */
+        Set<InetAddress> endpoints;
+        /**
+         * Racks encountered so far. Replicas are put into separate racks while possible.
+         * For efficiency the set is shared between the instances, using the location pair (dc, rack) to make sure
+         * clashing names aren't a problem.
+         */
+        Set<Pair<String, String>> racks;
+
+        /** Number of replicas left to fill from this DC. */
+        int rfLeft;
+        int acceptableRackRepeats;
+
+        DatacenterEndpoints(int rf, int rackCount, int nodeCount, Set<InetAddress> endpoints, Set<Pair<String, String>> racks)
+        {
+            this.endpoints = endpoints;
+            this.racks = racks;
+            // If there aren't enough nodes in this DC to fill the RF, the number of nodes is the effective RF.
+            this.rfLeft = Math.min(rf, nodeCount);
+            // If there aren't enough racks in this DC to fill the RF, we'll still use at least one node from each rack,
+            // and the difference is to be filled by the first encountered nodes.
+            acceptableRackRepeats = rf - rackCount;
+        }
+
+        /**
+         * Attempts to add an endpoint to the replicas for this datacenter, adding to the endpoints set if successful.
+         * Returns true only if the endpoint was added and, with it, this datacenter requires no further replicas.
+         */
+        boolean addEndpointAndCheckIfDone(InetAddress ep, Pair<String,String> location)
+        {
+            if (done())
+                return false;
+
+            if (racks.add(location))
+            {
+                // New rack.
+                --rfLeft;
+                boolean added = endpoints.add(ep);
+                assert added;
+                return done();
+            }
+            if (acceptableRackRepeats <= 0)
+                // There must be rfLeft distinct racks left, do not add any more rack repeats.
+                return false;
+            if (!endpoints.add(ep))
+                // Cannot repeat a node.
+                return false;
+            // Added a node that is from an already met rack to match RF when there aren't enough racks.
+            --acceptableRackRepeats;
+            --rfLeft;
+            return done();
+        }
+
+        boolean done()
+        {
+            assert rfLeft >= 0;
+            return rfLeft == 0;
+        }
+    }
+
+    /**
+     * calculate endpoints in one pass through the tokens by tracking our progress in each DC.
      */
-    @SuppressWarnings("serial")
     public List<InetAddress> calculateNaturalEndpoints(Token searchToken, TokenMetadata tokenMetadata)
     {
         // we want to preserve insertion order so that the first added endpoint becomes primary
         Set<InetAddress> replicas = new LinkedHashSet<>();
-        // replicas we have found in each DC
-        Map<String, Set<InetAddress>> dcReplicas = new HashMap<>(datacenters.size());
-        for (Map.Entry<String, Integer> dc : datacenters.entrySet())
-            dcReplicas.put(dc.getKey(), new HashSet<InetAddress>(dc.getValue()));
+        Set<Pair<String, String>> seenRacks = new HashSet<>();
 
         Topology topology = tokenMetadata.getTopology();
         // all endpoints in each DC, so we can check when we have exhausted all the members of a DC
@@ -94,74 +154,45 @@ public class NetworkTopologyStrategy extends AbstractReplicationStrategy
         Map<String, Multimap<String, InetAddress>> racks = topology.getDatacenterRacks();
         assert !allEndpoints.isEmpty() && !racks.isEmpty() : "not aware of any cluster members";
 
-        // tracks the racks we have already placed replicas in
-        Map<String, Set<String>> seenRacks = new HashMap<>(datacenters.size());
-        for (Map.Entry<String, Integer> dc : datacenters.entrySet())
-            seenRacks.put(dc.getKey(), new HashSet<String>());
+        int dcsToFill = 0;
+        Map<String, DatacenterEndpoints> dcs = new HashMap<>(datacenters.size() * 2);
+
+        // Create a DatacenterEndpoints object for each non-empty DC.
+        for (Map.Entry<String, Integer> en : datacenters.entrySet())
+        {
+            String dc = en.getKey();
+            int rf = en.getValue();
+            int nodeCount = sizeOrZero(allEndpoints.get(dc));
+
+            if (rf <= 0 || nodeCount <= 0)
+                continue;
 
-        // tracks the endpoints that we skipped over while looking for unique racks
-        // when we relax the rack uniqueness we can append this to the current result so we don't have to wind back the iterator
-        Map<String, Set<InetAddress>> skippedDcEndpoints = new HashMap<>(datacenters.size());
-        for (Map.Entry<String, Integer> dc : datacenters.entrySet())
-            skippedDcEndpoints.put(dc.getKey(), new LinkedHashSet<InetAddress>());
+            DatacenterEndpoints dcEndpoints = new DatacenterEndpoints(rf, sizeOrZero(racks.get(dc)), nodeCount, replicas, seenRacks);
+            dcs.put(dc, dcEndpoints);
+            ++dcsToFill;
+        }
 
         Iterator<Token> tokenIter = TokenMetadata.ringIterator(tokenMetadata.sortedTokens(), searchToken, false);
-        while (tokenIter.hasNext() && !hasSufficientReplicas(dcReplicas, allEndpoints))
+        while (dcsToFill > 0 && tokenIter.hasNext())
         {
             Token next = tokenIter.next();
             InetAddress ep = tokenMetadata.getEndpoint(next);
-            String dc = snitch.getDatacenter(ep);
-            // have we already found all replicas for this dc?
-            if (!datacenters.containsKey(dc) || hasSufficientReplicas(dc, dcReplicas, allEndpoints))
-                continue;
-            // can we skip checking the rack?
-            if (seenRacks.get(dc).size() == racks.get(dc).keySet().size())
-            {
-                dcReplicas.get(dc).add(ep);
-                replicas.add(ep);
-            }
-            else
-            {
-                String rack = snitch.getRack(ep);
-                // is this a new rack?
-                if (seenRacks.get(dc).contains(rack))
-                {
-                    skippedDcEndpoints.get(dc).add(ep);
-                }
-                else
-                {
-                    dcReplicas.get(dc).add(ep);
-                    replicas.add(ep);
-                    seenRacks.get(dc).add(rack);
-                    // if we've run out of distinct racks, add the hosts we skipped past already (up to RF)
-                    if (seenRacks.get(dc).size() == racks.get(dc).keySet().size())
-                    {
-                        Iterator<InetAddress> skippedIt = skippedDcEndpoints.get(dc).iterator();
-                        while (skippedIt.hasNext() && !hasSufficientReplicas(dc, dcReplicas, allEndpoints))
-                        {
-                            InetAddress nextSkipped = skippedIt.next();
-                            dcReplicas.get(dc).add(nextSkipped);
-                            replicas.add(nextSkipped);
-                        }
-                    }
-                }
-            }
+            Pair<String, String> location = topology.getLocation(ep);
+            DatacenterEndpoints dcEndpoints = dcs.get(location.left);
+            if (dcEndpoints != null && dcEndpoints.addEndpointAndCheckIfDone(ep, location))
+                --dcsToFill;
         }
-
-        return new ArrayList<InetAddress>(replicas);
+        return new ArrayList<>(replicas);
     }
 
-    private boolean hasSufficientReplicas(String dc, Map<String, Set<InetAddress>> dcReplicas, Multimap<String, InetAddress> allEndpoints)
+    private int sizeOrZero(Multimap<?, ?> collection)
     {
-        return dcReplicas.get(dc).size() >= Math.min(allEndpoints.get(dc).size(), getReplicationFactor(dc));
+        return collection != null ? collection.asMap().size() : 0;
     }
 
-    private boolean hasSufficientReplicas(Map<String, Set<InetAddress>> dcReplicas, Multimap<String, InetAddress> allEndpoints)
+    private int sizeOrZero(Collection<?> collection)
     {
-        for (String dc : datacenters.keySet())
-            if (!hasSufficientReplicas(dc, dcReplicas, allEndpoints))
-                return false;
-        return true;
+        return collection != null ? collection.size() : 0;
     }
 
     public int getReplicationFactor()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c000da13/src/java/org/apache/cassandra/locator/TokenMetadata.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/locator/TokenMetadata.java b/src/java/org/apache/cassandra/locator/TokenMetadata.java
index e65b53e..a3be9de 100644
--- a/src/java/org/apache/cassandra/locator/TokenMetadata.java
+++ b/src/java/org/apache/cassandra/locator/TokenMetadata.java
@@ -828,20 +828,20 @@ public class TokenMetadata
 
     public Token getPredecessor(Token token)
     {
-        List tokens = sortedTokens();
+        List<Token> tokens = sortedTokens();
         int index = Collections.binarySearch(tokens, token);
 //        assert index >= 0 : token + " not found in " + StringUtils.join(tokenToEndpointMap.keySet(), ", ");
         if (index < 0) index = -index-1;
-        return (Token) (index == 0 ? tokens.get(tokens.size() - 1) : tokens.get(index - 1));
+        return index == 0 ? tokens.get(tokens.size() - 1) : tokens.get(index - 1);
     }
 
     public Token getSuccessor(Token token)
     {
-        List tokens = sortedTokens();
+        List<Token> tokens = sortedTokens();
         int index = Collections.binarySearch(tokens, token);
 //        assert index >= 0 : token + " not found in " + StringUtils.join(tokenToEndpointMap.keySet(), ", ");
         if (index < 0) return (Token) tokens.get(-index-1);
-        return (Token) ((index == (tokens.size() - 1)) ? tokens.get(0) : tokens.get(index + 1));
+        return (index == (tokens.size() - 1)) ? tokens.get(0) : tokens.get(index + 1);
     }
 
     /** @return a copy of the bootstrapping tokens map */
@@ -902,7 +902,7 @@ public class TokenMetadata
         }
     }
 
-    public static int firstTokenIndex(final ArrayList ring, Token start, boolean insertMin)
+    public static int firstTokenIndex(final ArrayList<Token> ring, Token start, boolean insertMin)
     {
         assert ring.size() > 0;
         // insert the minimum token (at index == -1) if we were asked to include it and it isn't a member of the ring
@@ -930,7 +930,7 @@ public class TokenMetadata
     {
         if (ring.isEmpty())
             return includeMin ? Iterators.singletonIterator(start.getPartitioner().getMinimumToken())
-                              : Iterators.<Token>emptyIterator();
+                              : Collections.emptyIterator();
 
         final boolean insertMin = includeMin && !ring.get(0).isMinimum();
         final int startIndex = firstTokenIndex(ring, start, insertMin);
@@ -1279,5 +1279,14 @@ public class TokenMetadata
         {
             return dcRacks;
         }
+
+        /**
+         * @return The DC and rack of the given endpoint.
+         */
+        public Pair<String, String> getLocation(InetAddress addr)
+        {
+            return currentLocations.get(addr);
+        }
+
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c000da13/test/unit/org/apache/cassandra/locator/NetworkTopologyStrategyTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/locator/NetworkTopologyStrategyTest.java b/test/unit/org/apache/cassandra/locator/NetworkTopologyStrategyTest.java
index bbfdd3b..3cba328 100644
--- a/test/unit/org/apache/cassandra/locator/NetworkTopologyStrategyTest.java
+++ b/test/unit/org/apache/cassandra/locator/NetworkTopologyStrategyTest.java
@@ -21,24 +21,26 @@ package org.apache.cassandra.locator;
 import java.io.IOException;
 import java.net.InetAddress;
 import java.net.UnknownHostException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
+import java.util.*;
+import java.util.stream.Collectors;
 
 import com.google.common.collect.HashMultimap;
+import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Multimap;
+
 import org.junit.Assert;
 import org.junit.Test;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.dht.Murmur3Partitioner;
 import org.apache.cassandra.dht.OrderPreservingPartitioner.StringToken;
 import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.exceptions.ConfigurationException;
+import org.apache.cassandra.locator.TokenMetadata.Topology;
+import org.apache.cassandra.service.StorageService;
 
 public class NetworkTopologyStrategyTest
 {
@@ -166,4 +168,203 @@ public class NetworkTopologyStrategyTest
         InetAddress add1 = InetAddress.getByAddress(bytes);
         metadata.updateNormalToken(token1, add1);
     }
+
+    @Test
+    public void testCalculateEndpoints() throws UnknownHostException
+    {
+        final int NODES = 100;
+        final int VNODES = 64;
+        final int RUNS = 10;
+        StorageService.instance.setPartitionerUnsafe(Murmur3Partitioner.instance);
+        Map<String, Integer> datacenters = ImmutableMap.of("rf1", 1, "rf3", 3, "rf5_1", 5, "rf5_2", 5, "rf5_3", 5);
+        List<InetAddress> nodes = new ArrayList<>(NODES);
+        for (byte i=0; i<NODES; ++i)
+            nodes.add(InetAddress.getByAddress(new byte[]{127, 0, 0, i}));
+        for (int run=0; run<RUNS; ++run)
+        {
+            Random rand = new Random();
+            IEndpointSnitch snitch = generateSnitch(datacenters, nodes, rand);
+            DatabaseDescriptor.setEndpointSnitch(snitch);
+
+            TokenMetadata meta = new TokenMetadata();
+            for (int i=0; i<NODES; ++i)  // Nodes
+                for (int j=0; j<VNODES; ++j) // tokens/vnodes per node
+                    meta.updateNormalToken(Murmur3Partitioner.instance.getRandomToken(rand), nodes.get(i));
+            testEquivalence(meta, snitch, datacenters, rand);
+        }
+    }
+
+    void testEquivalence(TokenMetadata tokenMetadata, IEndpointSnitch snitch, Map<String, Integer> datacenters, Random rand)
+    {
+        NetworkTopologyStrategy nts = new NetworkTopologyStrategy("ks", tokenMetadata, snitch,
+                                                                  datacenters.entrySet().stream().
+                                                                      collect(Collectors.toMap(x -> x.getKey(), x -> Integer.toString(x.getValue()))));
+        for (int i=0; i<1000; ++i)
+        {
+            Token token = Murmur3Partitioner.instance.getRandomToken(rand);
+            List<InetAddress> expected = calculateNaturalEndpoints(token, tokenMetadata, datacenters, snitch);
+            List<InetAddress> actual = nts.calculateNaturalEndpoints(token, tokenMetadata);
+            if (endpointsDiffer(expected, actual))
+            {
+                System.err.println("Endpoints mismatch for token " + token);
+                System.err.println(" expected: " + expected);
+                System.err.println(" actual  : " + actual);
+                Assert.assertEquals("Endpoints for token " + token + " mismatch.", expected, actual);
+            }
+        }
+    }
+
+    private boolean endpointsDiffer(List<InetAddress> ep1, List<InetAddress> ep2)
+    {
+        // Because the old algorithm does not put the nodes in the correct order in the case where more replicas
+        // are required than there are racks in a dc, we accept different order as long as the primary
+        // replica is the same.
+        if (ep1.equals(ep2))
+            return false;
+        if (!ep1.get(0).equals(ep2.get(0)))
+            return true;
+        Set<InetAddress> s1 = new HashSet<>(ep1);
+        Set<InetAddress> s2 = new HashSet<>(ep2);
+        return !s1.equals(s2);
+    }
+
+    IEndpointSnitch generateSnitch(Map<String, Integer> datacenters, Collection<InetAddress> nodes, Random rand)
+    {
+        final Map<InetAddress, String> nodeToRack = new HashMap<>();
+        final Map<InetAddress, String> nodeToDC = new HashMap<>();
+        Map<String, List<String>> racksPerDC = new HashMap<>();
+        datacenters.forEach((dc, rf) -> racksPerDC.put(dc, randomRacks(rf, rand)));
+        int rf = datacenters.values().stream().mapToInt(x -> x).sum();
+        String[] dcs = new String[rf];
+        int pos = 0;
+        for (Map.Entry<String, Integer> dce : datacenters.entrySet())
+        {
+            for (int i = 0; i < dce.getValue(); ++i)
+                dcs[pos++] = dce.getKey();
+        }
+
+        for (InetAddress node : nodes)
+        {
+            String dc = dcs[rand.nextInt(rf)];
+            List<String> racks = racksPerDC.get(dc);
+            String rack = racks.get(rand.nextInt(racks.size()));
+            nodeToRack.put(node, rack);
+            nodeToDC.put(node, dc);
+        }
+
+        return new AbstractNetworkTopologySnitch()
+        {
+            public String getRack(InetAddress endpoint)
+            {
+                return nodeToRack.get(endpoint);
+            }
+
+            public String getDatacenter(InetAddress endpoint)
+            {
+                return nodeToDC.get(endpoint);
+            }
+        };
+    }
+
+    private List<String> randomRacks(int rf, Random rand)
+    {
+        int rc = rand.nextInt(rf * 3 - 1) + 1;
+        List<String> racks = new ArrayList<>(rc);
+        for (int i=0; i<rc; ++i)
+            racks.add(Integer.toString(i));
+        return racks;
+    }
+
+    // Copy of older endpoints calculation algorithm for comparison
+    public static List<InetAddress> calculateNaturalEndpoints(Token searchToken, TokenMetadata tokenMetadata, Map<String, Integer> datacenters, IEndpointSnitch snitch)
+    {
+        // we want to preserve insertion order so that the first added endpoint becomes primary
+        Set<InetAddress> replicas = new LinkedHashSet<>();
+        // replicas we have found in each DC
+        Map<String, Set<InetAddress>> dcReplicas = new HashMap<>(datacenters.size());
+        for (Map.Entry<String, Integer> dc : datacenters.entrySet())
+            dcReplicas.put(dc.getKey(), new HashSet<InetAddress>(dc.getValue()));
+
+        Topology topology = tokenMetadata.getTopology();
+        // all endpoints in each DC, so we can check when we have exhausted all the members of a DC
+        Multimap<String, InetAddress> allEndpoints = topology.getDatacenterEndpoints();
+        // all racks in a DC so we can check when we have exhausted all racks in a DC
+        Map<String, Multimap<String, InetAddress>> racks = topology.getDatacenterRacks();
+        assert !allEndpoints.isEmpty() && !racks.isEmpty() : "not aware of any cluster members";
+
+        // tracks the racks we have already placed replicas in
+        Map<String, Set<String>> seenRacks = new HashMap<>(datacenters.size());
+        for (Map.Entry<String, Integer> dc : datacenters.entrySet())
+            seenRacks.put(dc.getKey(), new HashSet<String>());
+
+        // tracks the endpoints that we skipped over while looking for unique racks
+        // when we relax the rack uniqueness we can append this to the current result so we don't have to wind back the iterator
+        Map<String, Set<InetAddress>> skippedDcEndpoints = new HashMap<>(datacenters.size());
+        for (Map.Entry<String, Integer> dc : datacenters.entrySet())
+            skippedDcEndpoints.put(dc.getKey(), new LinkedHashSet<InetAddress>());
+
+        Iterator<Token> tokenIter = TokenMetadata.ringIterator(tokenMetadata.sortedTokens(), searchToken, false);
+        while (tokenIter.hasNext() && !hasSufficientReplicas(dcReplicas, allEndpoints, datacenters))
+        {
+            Token next = tokenIter.next();
+            InetAddress ep = tokenMetadata.getEndpoint(next);
+            String dc = snitch.getDatacenter(ep);
+            // have we already found all replicas for this dc?
+            if (!datacenters.containsKey(dc) || hasSufficientReplicas(dc, dcReplicas, allEndpoints, datacenters))
+                continue;
+            // can we skip checking the rack?
+            if (seenRacks.get(dc).size() == racks.get(dc).keySet().size())
+            {
+                dcReplicas.get(dc).add(ep);
+                replicas.add(ep);
+            }
+            else
+            {
+                String rack = snitch.getRack(ep);
+                // is this a new rack?
+                if (seenRacks.get(dc).contains(rack))
+                {
+                    skippedDcEndpoints.get(dc).add(ep);
+                }
+                else
+                {
+                    dcReplicas.get(dc).add(ep);
+                    replicas.add(ep);
+                    seenRacks.get(dc).add(rack);
+                    // if we've run out of distinct racks, add the hosts we skipped past already (up to RF)
+                    if (seenRacks.get(dc).size() == racks.get(dc).keySet().size())
+                    {
+                        Iterator<InetAddress> skippedIt = skippedDcEndpoints.get(dc).iterator();
+                        while (skippedIt.hasNext() && !hasSufficientReplicas(dc, dcReplicas, allEndpoints, datacenters))
+                        {
+                            InetAddress nextSkipped = skippedIt.next();
+                            dcReplicas.get(dc).add(nextSkipped);
+                            replicas.add(nextSkipped);
+                        }
+                    }
+                }
+            }
+        }
+
+        return new ArrayList<InetAddress>(replicas);
+    }
+
+    private static boolean hasSufficientReplicas(String dc, Map<String, Set<InetAddress>> dcReplicas, Multimap<String, InetAddress> allEndpoints, Map<String, Integer> datacenters)
+    {
+        return dcReplicas.get(dc).size() >= Math.min(allEndpoints.get(dc).size(), getReplicationFactor(dc, datacenters));
+    }
+
+    private static boolean hasSufficientReplicas(Map<String, Set<InetAddress>> dcReplicas, Multimap<String, InetAddress> allEndpoints, Map<String, Integer> datacenters)
+    {
+        for (String dc : datacenters.keySet())
+            if (!hasSufficientReplicas(dc, dcReplicas, allEndpoints, datacenters))
+                return false;
+        return true;
+    }
+
+    public static int getReplicationFactor(String dc, Map<String, Integer> datacenters)
+    {
+        Integer replicas = datacenters.get(dc);
+        return replicas == null ? 0 : replicas;
+    }
 }