Posted to commits@cassandra.apache.org by al...@apache.org on 2015/11/18 16:46:31 UTC
cassandra git commit: Improve NTS endpoints calculation
Repository: cassandra
Updated Branches:
refs/heads/trunk 29ec013c2 -> c000da135
Improve NTS endpoints calculation
patch by Branimir Lambov; reviewed by Aleksey Yeschenko for
CASSANDRA-10200
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/c000da13
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/c000da13
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/c000da13
Branch: refs/heads/trunk
Commit: c000da13563907b99fe220a7c8bde3c1dec74ad5
Parents: 29ec013
Author: Branimir Lambov <br...@datastax.com>
Authored: Wed Aug 26 16:08:57 2015 +0300
Committer: Aleksey Yeschenko <al...@apache.org>
Committed: Wed Nov 18 15:44:21 2015 +0000
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../locator/NetworkTopologyStrategy.java | 157 ++++++++------
.../apache/cassandra/locator/TokenMetadata.java | 21 +-
.../locator/NetworkTopologyStrategyTest.java | 213 ++++++++++++++++++-
4 files changed, 317 insertions(+), 75 deletions(-)
----------------------------------------------------------------------
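The gist of the patch, ahead of the per-file diffs: the old calculateNaturalEndpoints
re-checked every DC's replica count on each ring token and tracked seen racks and
skipped endpoints in per-DC maps. The new version keeps one small accounting object
per DC (DatacenterEndpoints) with two counters: rfLeft = min(rf, nodeCount), the
replicas still needed, and acceptableRackRepeats = rf - rackCount, the number of
same-rack picks that are acceptable when a DC has fewer racks than its RF. Below is
a minimal, self-contained sketch of that accept/reject rule (not the committed
class; the shared endpoint set and node-identity checks are omitted):

    import java.util.HashSet;
    import java.util.Set;

    public class DcAcceptSketch
    {
        // Replicas still needed in this DC (min of RF and node count).
        static int rfLeft = 5;
        // Same-rack picks we may still accept (RF minus rack count).
        static int acceptableRackRepeats = 2;
        static final Set<String> seenRacks = new HashSet<>();

        static boolean accept(String rack)
        {
            if (rfLeft == 0)
                return false;              // this DC is already satisfied
            if (seenRacks.add(rack))       // first node seen on this rack
            {
                --rfLeft;
                return true;
            }
            if (acceptableRackRepeats > 0) // repeat rack, tolerated while RF > rack count
            {
                --acceptableRackRepeats;
                --rfLeft;
                return true;
            }
            return false;
        }

        public static void main(String[] args)
        {
            // A DC with RF = 5 but only 3 racks: 5 - 3 = 2 repeats are acceptable.
            for (String rack : new String[]{ "r1", "r1", "r2", "r1", "r3", "r2" })
                System.out.println(rack + " -> " + accept(rack));
            // Prints: r1 -> true, r1 -> true, r2 -> true, r1 -> true, r3 -> true, r2 -> false
        }
    }
----------------------------------------------------------------------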
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c000da13/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 2710ed3..77034ef 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
3.2
+ * Improve NTS endpoints calculation (CASSANDRA-10200)
* Improve performance of the folderSize function (CASSANDRA-10677)
* Add support for type casting in selection clause (CASSANDRA-10310)
* Added graphing option to cassandra-stress (CASSANDRA-7918)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c000da13/src/java/org/apache/cassandra/locator/NetworkTopologyStrategy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/locator/NetworkTopologyStrategy.java b/src/java/org/apache/cassandra/locator/NetworkTopologyStrategy.java
index 307a07f..9f74dcc 100644
--- a/src/java/org/apache/cassandra/locator/NetworkTopologyStrategy.java
+++ b/src/java/org/apache/cassandra/locator/NetworkTopologyStrategy.java
@@ -28,6 +28,7 @@ import org.apache.cassandra.exceptions.ConfigurationException;
import org.apache.cassandra.dht.Token;
import org.apache.cassandra.locator.TokenMetadata.Topology;
import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.Pair;
import com.google.common.collect.Multimap;
@@ -48,14 +49,12 @@ import com.google.common.collect.Multimap;
*/
public class NetworkTopologyStrategy extends AbstractReplicationStrategy
{
- private final IEndpointSnitch snitch;
private final Map<String, Integer> datacenters;
private static final Logger logger = LoggerFactory.getLogger(NetworkTopologyStrategy.class);
public NetworkTopologyStrategy(String keyspaceName, TokenMetadata tokenMetadata, IEndpointSnitch snitch, Map<String, String> configOptions) throws ConfigurationException
{
super(keyspaceName, tokenMetadata, snitch, configOptions);
- this.snitch = snitch;
Map<String, Integer> newDatacenters = new HashMap<String, Integer>();
if (configOptions != null)
@@ -75,17 +74,78 @@ public class NetworkTopologyStrategy extends AbstractReplicationStrategy
}
/**
- * calculate endpoints in one pass through the tokens by tracking our progress in each DC, rack etc.
+ * Endpoint adder applying the replication rules for a given DC.
+ */
+ private static final class DatacenterEndpoints
+ {
+ /** Set that accepted endpoints get pushed into. */
+ Set<InetAddress> endpoints;
+ /**
+ * Racks encountered so far. Replicas are put into separate racks where possible.
+ * For efficiency the set is shared between the instances, keyed by the location pair (dc, rack) so that
+ * clashing rack names across DCs aren't a problem.
+ */
+ Set<Pair<String, String>> racks;
+
+ /** Number of replicas left to fill from this DC. */
+ int rfLeft;
+ int acceptableRackRepeats;
+
+ DatacenterEndpoints(int rf, int rackCount, int nodeCount, Set<InetAddress> endpoints, Set<Pair<String, String>> racks)
+ {
+ this.endpoints = endpoints;
+ this.racks = racks;
+ // If there aren't enough nodes in this DC to fill the RF, the number of nodes is the effective RF.
+ this.rfLeft = Math.min(rf, nodeCount);
+ // If there aren't enough racks in this DC to fill the RF, we'll still use at least one node from each rack,
+ // and the shortfall is filled by the first nodes encountered.
+ acceptableRackRepeats = rf - rackCount;
+ }
+
+ /**
+ * Attempts to add an endpoint to the replicas for this datacenter, adding to the endpoints set if successful.
+ * Returns true if the endpoint was added and this datacenter now needs no further replicas.
+ */
+ boolean addEndpointAndCheckIfDone(InetAddress ep, Pair<String,String> location)
+ {
+ if (done())
+ return false;
+
+ if (racks.add(location))
+ {
+ // New rack.
+ --rfLeft;
+ boolean added = endpoints.add(ep);
+ assert added;
+ return done();
+ }
+ if (acceptableRackRepeats <= 0)
+ // There must be rfLeft distinct racks left; do not add any more rack repeats.
+ return false;
+ if (!endpoints.add(ep))
+ // Cannot repeat a node.
+ return false;
+ // Added a node from an already-seen rack to match the RF when there aren't enough racks.
+ --acceptableRackRepeats;
+ --rfLeft;
+ return done();
+ }
+
+ boolean done()
+ {
+ assert rfLeft >= 0;
+ return rfLeft == 0;
+ }
+ }
+
+ /**
+ * calculate endpoints in one pass through the tokens by tracking our progress in each DC.
*/
- @SuppressWarnings("serial")
public List<InetAddress> calculateNaturalEndpoints(Token searchToken, TokenMetadata tokenMetadata)
{
// we want to preserve insertion order so that the first added endpoint becomes primary
Set<InetAddress> replicas = new LinkedHashSet<>();
- // replicas we have found in each DC
- Map<String, Set<InetAddress>> dcReplicas = new HashMap<>(datacenters.size());
- for (Map.Entry<String, Integer> dc : datacenters.entrySet())
- dcReplicas.put(dc.getKey(), new HashSet<InetAddress>(dc.getValue()));
+ Set<Pair<String, String>> seenRacks = new HashSet<>();
Topology topology = tokenMetadata.getTopology();
// all endpoints in each DC, so we can check when we have exhausted all the members of a DC
@@ -94,74 +154,45 @@ public class NetworkTopologyStrategy extends AbstractReplicationStrategy
Map<String, Multimap<String, InetAddress>> racks = topology.getDatacenterRacks();
assert !allEndpoints.isEmpty() && !racks.isEmpty() : "not aware of any cluster members";
- // tracks the racks we have already placed replicas in
- Map<String, Set<String>> seenRacks = new HashMap<>(datacenters.size());
- for (Map.Entry<String, Integer> dc : datacenters.entrySet())
- seenRacks.put(dc.getKey(), new HashSet<String>());
+ int dcsToFill = 0;
+ Map<String, DatacenterEndpoints> dcs = new HashMap<>(datacenters.size() * 2);
+
+ // Create a DatacenterEndpoints object for each non-empty DC.
+ for (Map.Entry<String, Integer> en : datacenters.entrySet())
+ {
+ String dc = en.getKey();
+ int rf = en.getValue();
+ int nodeCount = sizeOrZero(allEndpoints.get(dc));
+
+ if (rf <= 0 || nodeCount <= 0)
+ continue;
- // tracks the endpoints that we skipped over while looking for unique racks
- // when we relax the rack uniqueness we can append this to the current result so we don't have to wind back the iterator
- Map<String, Set<InetAddress>> skippedDcEndpoints = new HashMap<>(datacenters.size());
- for (Map.Entry<String, Integer> dc : datacenters.entrySet())
- skippedDcEndpoints.put(dc.getKey(), new LinkedHashSet<InetAddress>());
+ DatacenterEndpoints dcEndpoints = new DatacenterEndpoints(rf, sizeOrZero(racks.get(dc)), nodeCount, replicas, seenRacks);
+ dcs.put(dc, dcEndpoints);
+ ++dcsToFill;
+ }
Iterator<Token> tokenIter = TokenMetadata.ringIterator(tokenMetadata.sortedTokens(), searchToken, false);
- while (tokenIter.hasNext() && !hasSufficientReplicas(dcReplicas, allEndpoints))
+ while (dcsToFill > 0 && tokenIter.hasNext())
{
Token next = tokenIter.next();
InetAddress ep = tokenMetadata.getEndpoint(next);
- String dc = snitch.getDatacenter(ep);
- // have we already found all replicas for this dc?
- if (!datacenters.containsKey(dc) || hasSufficientReplicas(dc, dcReplicas, allEndpoints))
- continue;
- // can we skip checking the rack?
- if (seenRacks.get(dc).size() == racks.get(dc).keySet().size())
- {
- dcReplicas.get(dc).add(ep);
- replicas.add(ep);
- }
- else
- {
- String rack = snitch.getRack(ep);
- // is this a new rack?
- if (seenRacks.get(dc).contains(rack))
- {
- skippedDcEndpoints.get(dc).add(ep);
- }
- else
- {
- dcReplicas.get(dc).add(ep);
- replicas.add(ep);
- seenRacks.get(dc).add(rack);
- // if we've run out of distinct racks, add the hosts we skipped past already (up to RF)
- if (seenRacks.get(dc).size() == racks.get(dc).keySet().size())
- {
- Iterator<InetAddress> skippedIt = skippedDcEndpoints.get(dc).iterator();
- while (skippedIt.hasNext() && !hasSufficientReplicas(dc, dcReplicas, allEndpoints))
- {
- InetAddress nextSkipped = skippedIt.next();
- dcReplicas.get(dc).add(nextSkipped);
- replicas.add(nextSkipped);
- }
- }
- }
- }
+ Pair<String, String> location = topology.getLocation(ep);
+ DatacenterEndpoints dcEndpoints = dcs.get(location.left);
+ if (dcEndpoints != null && dcEndpoints.addEndpointAndCheckIfDone(ep, location))
+ --dcsToFill;
}
-
- return new ArrayList<InetAddress>(replicas);
+ return new ArrayList<>(replicas);
}
- private boolean hasSufficientReplicas(String dc, Map<String, Set<InetAddress>> dcReplicas, Multimap<String, InetAddress> allEndpoints)
+ private int sizeOrZero(Multimap<?, ?> collection)
{
- return dcReplicas.get(dc).size() >= Math.min(allEndpoints.get(dc).size(), getReplicationFactor(dc));
+ return collection != null ? collection.asMap().size() : 0;
}
- private boolean hasSufficientReplicas(Map<String, Set<InetAddress>> dcReplicas, Multimap<String, InetAddress> allEndpoints)
+ private int sizeOrZero(Collection<?> collection)
{
- for (String dc : datacenters.keySet())
- if (!hasSufficientReplicas(dc, dcReplicas, allEndpoints))
- return false;
- return true;
+ return collection != null ? collection.size() : 0;
}
public int getReplicationFactor()
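----------------------------------------------------------------------
One subtlety in the diff above: the seenRacks set is shared across all
DatacenterEndpoints instances, so entries are keyed by the (dc, rack) pair rather
than the bare rack name, which is only unique within a DC. A hypothetical
stand-alone illustration, reusing the same org.apache.cassandra.utils.Pair the
patch imports:

    import java.util.HashSet;
    import java.util.Set;
    import org.apache.cassandra.utils.Pair;

    public class RackKeySketch
    {
        public static void main(String[] args)
        {
            Set<Pair<String, String>> seen = new HashSet<>();
            System.out.println(seen.add(Pair.create("dc1", "r1"))); // true: new location
            System.out.println(seen.add(Pair.create("dc2", "r1"))); // true: same rack name, different DC
            System.out.println(seen.add(Pair.create("dc1", "r1"))); // false: a genuine repeat
        }
    }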
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c000da13/src/java/org/apache/cassandra/locator/TokenMetadata.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/locator/TokenMetadata.java b/src/java/org/apache/cassandra/locator/TokenMetadata.java
index e65b53e..a3be9de 100644
--- a/src/java/org/apache/cassandra/locator/TokenMetadata.java
+++ b/src/java/org/apache/cassandra/locator/TokenMetadata.java
@@ -828,20 +828,20 @@ public class TokenMetadata
public Token getPredecessor(Token token)
{
- List tokens = sortedTokens();
+ List<Token> tokens = sortedTokens();
int index = Collections.binarySearch(tokens, token);
// assert index >= 0 : token + " not found in " + StringUtils.join(tokenToEndpointMap.keySet(), ", ");
if (index < 0) index = -index-1;
- return (Token) (index == 0 ? tokens.get(tokens.size() - 1) : tokens.get(index - 1));
+ return index == 0 ? tokens.get(tokens.size() - 1) : tokens.get(index - 1);
}
public Token getSuccessor(Token token)
{
- List tokens = sortedTokens();
+ List<Token> tokens = sortedTokens();
int index = Collections.binarySearch(tokens, token);
// assert index >= 0 : token + " not found in " + StringUtils.join(tokenToEndpointMap.keySet(), ", ");
if (index < 0) return (Token) tokens.get(-index-1);
- return (Token) ((index == (tokens.size() - 1)) ? tokens.get(0) : tokens.get(index + 1));
+ return (index == (tokens.size() - 1)) ? tokens.get(0) : tokens.get(index + 1);
}
/** @return a copy of the bootstrapping tokens map */
@@ -902,7 +902,7 @@ public class TokenMetadata
}
}
- public static int firstTokenIndex(final ArrayList ring, Token start, boolean insertMin)
+ public static int firstTokenIndex(final ArrayList<Token> ring, Token start, boolean insertMin)
{
assert ring.size() > 0;
// insert the minimum token (at index == -1) if we were asked to include it and it isn't a member of the ring
@@ -930,7 +930,7 @@ public class TokenMetadata
{
if (ring.isEmpty())
return includeMin ? Iterators.singletonIterator(start.getPartitioner().getMinimumToken())
- : Iterators.<Token>emptyIterator();
+ : Collections.emptyIterator();
final boolean insertMin = includeMin && !ring.get(0).isMinimum();
final int startIndex = firstTokenIndex(ring, start, insertMin);
@@ -1279,5 +1279,14 @@ public class TokenMetadata
{
return dcRacks;
}
+
+ /**
+ * @return The DC and rack of the given endpoint.
+ */
+ public Pair<String, String> getLocation(InetAddress addr)
+ {
+ return currentLocations.get(addr);
+ }
+
}
}
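----------------------------------------------------------------------
The new Topology.getLocation accessor is what the endpoint loop above relies on:
one lookup against Topology's existing currentLocations map replaces the
snitch.getDatacenter(ep) plus snitch.getRack(ep) pair of calls per ring token.
A self-contained sketch of the shape of that lookup (the map and main method here
are illustrative; only the getLocation body mirrors the patch):

    import java.net.InetAddress;
    import java.net.UnknownHostException;
    import java.util.HashMap;
    import java.util.Map;
    import org.apache.cassandra.utils.Pair;

    public class LocationLookupSketch
    {
        // Stand-in for Topology's currentLocations map.
        static final Map<InetAddress, Pair<String, String>> currentLocations = new HashMap<>();

        static Pair<String, String> getLocation(InetAddress addr)
        {
            return currentLocations.get(addr);
        }

        public static void main(String[] args) throws UnknownHostException
        {
            InetAddress ep = InetAddress.getByAddress(new byte[]{ 127, 0, 0, 1 });
            currentLocations.put(ep, Pair.create("dc1", "r1"));
            Pair<String, String> location = getLocation(ep);
            System.out.println(location.left + " / " + location.right); // dc1 / r1
        }
    }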
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c000da13/test/unit/org/apache/cassandra/locator/NetworkTopologyStrategyTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/locator/NetworkTopologyStrategyTest.java b/test/unit/org/apache/cassandra/locator/NetworkTopologyStrategyTest.java
index bbfdd3b..3cba328 100644
--- a/test/unit/org/apache/cassandra/locator/NetworkTopologyStrategyTest.java
+++ b/test/unit/org/apache/cassandra/locator/NetworkTopologyStrategyTest.java
@@ -21,24 +21,26 @@ package org.apache.cassandra.locator;
import java.io.IOException;
import java.net.InetAddress;
import java.net.UnknownHostException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
+import java.util.*;
+import java.util.stream.Collectors;
import com.google.common.collect.HashMultimap;
+import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Multimap;
+
import org.junit.Assert;
import org.junit.Test;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.dht.Murmur3Partitioner;
import org.apache.cassandra.dht.OrderPreservingPartitioner.StringToken;
import org.apache.cassandra.dht.Token;
import org.apache.cassandra.exceptions.ConfigurationException;
+import org.apache.cassandra.locator.TokenMetadata.Topology;
+import org.apache.cassandra.service.StorageService;
public class NetworkTopologyStrategyTest
{
@@ -166,4 +168,203 @@ public class NetworkTopologyStrategyTest
InetAddress add1 = InetAddress.getByAddress(bytes);
metadata.updateNormalToken(token1, add1);
}
+
+ @Test
+ public void testCalculateEndpoints() throws UnknownHostException
+ {
+ final int NODES = 100;
+ final int VNODES = 64;
+ final int RUNS = 10;
+ StorageService.instance.setPartitionerUnsafe(Murmur3Partitioner.instance);
+ Map<String, Integer> datacenters = ImmutableMap.of("rf1", 1, "rf3", 3, "rf5_1", 5, "rf5_2", 5, "rf5_3", 5);
+ List<InetAddress> nodes = new ArrayList<>(NODES);
+ for (byte i=0; i<NODES; ++i)
+ nodes.add(InetAddress.getByAddress(new byte[]{127, 0, 0, i}));
+ for (int run=0; run<RUNS; ++run)
+ {
+ Random rand = new Random();
+ IEndpointSnitch snitch = generateSnitch(datacenters, nodes, rand);
+ DatabaseDescriptor.setEndpointSnitch(snitch);
+
+ TokenMetadata meta = new TokenMetadata();
+ for (int i=0; i<NODES; ++i) // Nodes
+ for (int j=0; j<VNODES; ++j) // tokens/vnodes per node
+ meta.updateNormalToken(Murmur3Partitioner.instance.getRandomToken(rand), nodes.get(i));
+ testEquivalence(meta, snitch, datacenters, rand);
+ }
+ }
+
+ void testEquivalence(TokenMetadata tokenMetadata, IEndpointSnitch snitch, Map<String, Integer> datacenters, Random rand)
+ {
+ NetworkTopologyStrategy nts = new NetworkTopologyStrategy("ks", tokenMetadata, snitch,
+ datacenters.entrySet().stream().
+ collect(Collectors.toMap(x -> x.getKey(), x -> Integer.toString(x.getValue()))));
+ for (int i=0; i<1000; ++i)
+ {
+ Token token = Murmur3Partitioner.instance.getRandomToken(rand);
+ List<InetAddress> expected = calculateNaturalEndpoints(token, tokenMetadata, datacenters, snitch);
+ List<InetAddress> actual = nts.calculateNaturalEndpoints(token, tokenMetadata);
+ if (endpointsDiffer(expected, actual))
+ {
+ System.err.println("Endpoints mismatch for token " + token);
+ System.err.println(" expected: " + expected);
+ System.err.println(" actual : " + actual);
+ Assert.assertEquals("Endpoints for token " + token + " mismatch.", expected, actual);
+ }
+ }
+ }
+
+ private boolean endpointsDiffer(List<InetAddress> ep1, List<InetAddress> ep2)
+ {
+ // Because the old algorithm does not put the nodes in the correct order when more replicas
+ // are required than there are racks in a DC, we accept a different order as long as the primary
+ // replica is the same.
+ if (ep1.equals(ep2))
+ return false;
+ if (!ep1.get(0).equals(ep2.get(0)))
+ return true;
+ Set<InetAddress> s1 = new HashSet<>(ep1);
+ Set<InetAddress> s2 = new HashSet<>(ep2);
+ return !s1.equals(s2);
+ }
+
+ IEndpointSnitch generateSnitch(Map<String, Integer> datacenters, Collection<InetAddress> nodes, Random rand)
+ {
+ final Map<InetAddress, String> nodeToRack = new HashMap<>();
+ final Map<InetAddress, String> nodeToDC = new HashMap<>();
+ Map<String, List<String>> racksPerDC = new HashMap<>();
+ datacenters.forEach((dc, rf) -> racksPerDC.put(dc, randomRacks(rf, rand)));
+ int rf = datacenters.values().stream().mapToInt(x -> x).sum();
+ String[] dcs = new String[rf];
+ int pos = 0;
+ for (Map.Entry<String, Integer> dce : datacenters.entrySet())
+ {
+ for (int i = 0; i < dce.getValue(); ++i)
+ dcs[pos++] = dce.getKey();
+ }
+
+ for (InetAddress node : nodes)
+ {
+ String dc = dcs[rand.nextInt(rf)];
+ List<String> racks = racksPerDC.get(dc);
+ String rack = racks.get(rand.nextInt(racks.size()));
+ nodeToRack.put(node, rack);
+ nodeToDC.put(node, dc);
+ }
+
+ return new AbstractNetworkTopologySnitch()
+ {
+ public String getRack(InetAddress endpoint)
+ {
+ return nodeToRack.get(endpoint);
+ }
+
+ public String getDatacenter(InetAddress endpoint)
+ {
+ return nodeToDC.get(endpoint);
+ }
+ };
+ }
+
+ private List<String> randomRacks(int rf, Random rand)
+ {
+ int rc = rand.nextInt(rf * 3 - 1) + 1;
+ List<String> racks = new ArrayList<>(rc);
+ for (int i=0; i<rc; ++i)
+ racks.add(Integer.toString(i));
+ return racks;
+ }
+
+ // Copy of older endpoints calculation algorithm for comparison
+ public static List<InetAddress> calculateNaturalEndpoints(Token searchToken, TokenMetadata tokenMetadata, Map<String, Integer> datacenters, IEndpointSnitch snitch)
+ {
+ // we want to preserve insertion order so that the first added endpoint becomes primary
+ Set<InetAddress> replicas = new LinkedHashSet<>();
+ // replicas we have found in each DC
+ Map<String, Set<InetAddress>> dcReplicas = new HashMap<>(datacenters.size());
+ for (Map.Entry<String, Integer> dc : datacenters.entrySet())
+ dcReplicas.put(dc.getKey(), new HashSet<InetAddress>(dc.getValue()));
+
+ Topology topology = tokenMetadata.getTopology();
+ // all endpoints in each DC, so we can check when we have exhausted all the members of a DC
+ Multimap<String, InetAddress> allEndpoints = topology.getDatacenterEndpoints();
+ // all racks in a DC so we can check when we have exhausted all racks in a DC
+ Map<String, Multimap<String, InetAddress>> racks = topology.getDatacenterRacks();
+ assert !allEndpoints.isEmpty() && !racks.isEmpty() : "not aware of any cluster members";
+
+ // tracks the racks we have already placed replicas in
+ Map<String, Set<String>> seenRacks = new HashMap<>(datacenters.size());
+ for (Map.Entry<String, Integer> dc : datacenters.entrySet())
+ seenRacks.put(dc.getKey(), new HashSet<String>());
+
+ // tracks the endpoints that we skipped over while looking for unique racks
+ // when we relax the rack uniqueness we can append this to the current result so we don't have to wind back the iterator
+ Map<String, Set<InetAddress>> skippedDcEndpoints = new HashMap<>(datacenters.size());
+ for (Map.Entry<String, Integer> dc : datacenters.entrySet())
+ skippedDcEndpoints.put(dc.getKey(), new LinkedHashSet<InetAddress>());
+
+ Iterator<Token> tokenIter = TokenMetadata.ringIterator(tokenMetadata.sortedTokens(), searchToken, false);
+ while (tokenIter.hasNext() && !hasSufficientReplicas(dcReplicas, allEndpoints, datacenters))
+ {
+ Token next = tokenIter.next();
+ InetAddress ep = tokenMetadata.getEndpoint(next);
+ String dc = snitch.getDatacenter(ep);
+ // have we already found all replicas for this dc?
+ if (!datacenters.containsKey(dc) || hasSufficientReplicas(dc, dcReplicas, allEndpoints, datacenters))
+ continue;
+ // can we skip checking the rack?
+ if (seenRacks.get(dc).size() == racks.get(dc).keySet().size())
+ {
+ dcReplicas.get(dc).add(ep);
+ replicas.add(ep);
+ }
+ else
+ {
+ String rack = snitch.getRack(ep);
+ // is this a new rack?
+ if (seenRacks.get(dc).contains(rack))
+ {
+ skippedDcEndpoints.get(dc).add(ep);
+ }
+ else
+ {
+ dcReplicas.get(dc).add(ep);
+ replicas.add(ep);
+ seenRacks.get(dc).add(rack);
+ // if we've run out of distinct racks, add the hosts we skipped past already (up to RF)
+ if (seenRacks.get(dc).size() == racks.get(dc).keySet().size())
+ {
+ Iterator<InetAddress> skippedIt = skippedDcEndpoints.get(dc).iterator();
+ while (skippedIt.hasNext() && !hasSufficientReplicas(dc, dcReplicas, allEndpoints, datacenters))
+ {
+ InetAddress nextSkipped = skippedIt.next();
+ dcReplicas.get(dc).add(nextSkipped);
+ replicas.add(nextSkipped);
+ }
+ }
+ }
+ }
+ }
+
+ return new ArrayList<InetAddress>(replicas);
+ }
+
+ private static boolean hasSufficientReplicas(String dc, Map<String, Set<InetAddress>> dcReplicas, Multimap<String, InetAddress> allEndpoints, Map<String, Integer> datacenters)
+ {
+ return dcReplicas.get(dc).size() >= Math.min(allEndpoints.get(dc).size(), getReplicationFactor(dc, datacenters));
+ }
+
+ private static boolean hasSufficientReplicas(Map<String, Set<InetAddress>> dcReplicas, Multimap<String, InetAddress> allEndpoints, Map<String, Integer> datacenters)
+ {
+ for (String dc : datacenters.keySet())
+ if (!hasSufficientReplicas(dc, dcReplicas, allEndpoints, datacenters))
+ return false;
+ return true;
+ }
+
+ public static int getReplicationFactor(String dc, Map<String, Integer> datacenters)
+ {
+ Integer replicas = datacenters.get(dc);
+ return replicas == null ? 0 : replicas;
+ }
}
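----------------------------------------------------------------------
The test above pits the new implementation against a verbatim copy of the old
algorithm over random topologies: randomRacks draws a rack count between 1 and
3 * rf - 1, covering both the rack-scarce (racks < RF) and rack-rich (racks > RF)
regimes, and endpointsDiffer tolerates order differences past the primary replica,
which the old algorithm is known to get wrong when racks are scarce. A hedged
sketch for running just this test outside the build (assumes the usual Cassandra
unit-test classpath; the conventional build target would be ant's test target with
-Dtest.name=NetworkTopologyStrategyTest):

    import org.apache.cassandra.locator.NetworkTopologyStrategyTest;
    import org.junit.runner.JUnitCore;
    import org.junit.runner.Result;
    import org.junit.runner.notification.Failure;

    public class RunNtsTest
    {
        public static void main(String[] args)
        {
            Result result = JUnitCore.runClasses(NetworkTopologyStrategyTest.class);
            for (Failure failure : result.getFailures())
                System.err.println(failure);
            System.out.println(result.wasSuccessful() ? "OK" : "FAILED");
        }
    }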