You are viewing a plain-text version of this content. (The link to the canonical version is not preserved in this plain-text rendering.)
Posted to commits@cassandra.apache.org by sn...@apache.org on 2019/02/01 11:29:38 UTC
[cassandra] 01/01: Merge branch cassandra-3.0 into cassandra-3.11
This is an automated email from the ASF dual-hosted git repository.
snazy pushed a commit to branch cassandra-3.11
in repository https://gitbox.apache.org/repos/asf/cassandra.git
commit 16c96c20dadfbda98d4b5daf7f6c169b691459b9
Merge: 2d05bff bc18b4d
Author: Robert Stupp <sn...@snazy.de>
AuthorDate: Fri Feb 1 12:26:43 2019 +0100
Merge branch cassandra-3.0 into cassandra-3.11
.../org/apache/cassandra/db/ColumnFamilyStore.java | 1 -
.../db/compaction/AbstractCompactionStrategy.java | 14 +-
.../compaction/DateTieredCompactionStrategy.java | 20 +-
.../compaction/SizeTieredCompactionStrategy.java | 6 +-
.../compaction/TimeWindowCompactionStrategy.java | 4 +-
.../apache/cassandra/db/marshal/TypeParser.java | 30 +-
.../cassandra/locator/NetworkTopologyStrategy.java | 3 +-
.../apache/cassandra/locator/TokenMetadata.java | 364 ++++++++++++---------
.../locator/NetworkTopologyStrategyTest.java | 3 +-
.../cassandra/locator/TokenMetadataTest.java | 23 +-
10 files changed, 280 insertions(+), 188 deletions(-)
diff --cc src/java/org/apache/cassandra/db/compaction/DateTieredCompactionStrategy.java
index bb9f4b9,7c38fa8..83d77a9
--- a/src/java/org/apache/cassandra/db/compaction/DateTieredCompactionStrategy.java
+++ b/src/java/org/apache/cassandra/db/compaction/DateTieredCompactionStrategy.java
@@@ -105,10 -89,10 +105,14 @@@ public class DateTieredCompactionStrate
*/
private synchronized List<SSTableReader> getNextBackgroundSSTables(final int gcBefore)
{
- if (sstables.isEmpty())
- if (Iterables.isEmpty(cfs.getSSTables(SSTableSet.LIVE)))
-- return Collections.emptyList();
++ Set<SSTableReader> uncompacting;
++ synchronized (sstables)
++ {
++ if (sstables.isEmpty())
++ return Collections.emptyList();
-- Set<SSTableReader> uncompacting = ImmutableSet.copyOf(filter(cfs.getUncompactingSSTables(), sstables::contains));
++ uncompacting = ImmutableSet.copyOf(filter(cfs.getUncompactingSSTables(), sstables::contains));
++ }
Set<SSTableReader> expired = Collections.emptySet();
// we only check for expired sstables every 10 minutes (by default) due to it being an expensive operation
diff --cc src/java/org/apache/cassandra/db/marshal/TypeParser.java
index 78af800,590eea3..7416d49
--- a/src/java/org/apache/cassandra/db/marshal/TypeParser.java
+++ b/src/java/org/apache/cassandra/db/marshal/TypeParser.java
@@@ -23,7 -23,9 +23,10 @@@ import java.lang.reflect.Method
import java.nio.ByteBuffer;
import java.util.*;
+ import com.google.common.base.Verify;
+ import com.google.common.collect.ImmutableMap;
+
+import org.apache.cassandra.cql3.FieldIdentifier;
import org.apache.cassandra.exceptions.*;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.utils.FBUtilities;
diff --cc src/java/org/apache/cassandra/locator/NetworkTopologyStrategy.java
index 756b689,82183bb..d48dec3
--- a/src/java/org/apache/cassandra/locator/NetworkTopologyStrategy.java
+++ b/src/java/org/apache/cassandra/locator/NetworkTopologyStrategy.java
@@@ -28,8 -28,8 +28,9 @@@ import org.apache.cassandra.exceptions.
import org.apache.cassandra.dht.Token;
import org.apache.cassandra.locator.TokenMetadata.Topology;
import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.Pair;
+ import com.google.common.collect.ImmutableMultimap;
import com.google.common.collect.Multimap;
/**
@@@ -151,29 -92,22 +152,29 @@@ public class NetworkTopologyStrategy ex
// all endpoints in each DC, so we can check when we have exhausted all the members of a DC
Multimap<String, InetAddress> allEndpoints = topology.getDatacenterEndpoints();
// all racks in a DC so we can check when we have exhausted all racks in a DC
- Map<String, Multimap<String, InetAddress>> racks = topology.getDatacenterRacks();
+ Map<String, ImmutableMultimap<String, InetAddress>> racks = topology.getDatacenterRacks();
assert !allEndpoints.isEmpty() && !racks.isEmpty() : "not aware of any cluster members";
- // tracks the racks we have already placed replicas in
- Map<String, Set<String>> seenRacks = new HashMap<>(datacenters.size());
- for (Map.Entry<String, Integer> dc : datacenters.entrySet())
- seenRacks.put(dc.getKey(), new HashSet<String>());
+ int dcsToFill = 0;
+ Map<String, DatacenterEndpoints> dcs = new HashMap<>(datacenters.size() * 2);
- // tracks the endpoints that we skipped over while looking for unique racks
- // when we relax the rack uniqueness we can append this to the current result so we don't have to wind back the iterator
- Map<String, Set<InetAddress>> skippedDcEndpoints = new HashMap<>(datacenters.size());
- for (Map.Entry<String, Integer> dc : datacenters.entrySet())
- skippedDcEndpoints.put(dc.getKey(), new LinkedHashSet<InetAddress>());
+ // Create a DatacenterEndpoints object for each non-empty DC.
+ for (Map.Entry<String, Integer> en : datacenters.entrySet())
+ {
+ String dc = en.getKey();
+ int rf = en.getValue();
+ int nodeCount = sizeOrZero(allEndpoints.get(dc));
+
+ if (rf <= 0 || nodeCount <= 0)
+ continue;
+
+ DatacenterEndpoints dcEndpoints = new DatacenterEndpoints(rf, sizeOrZero(racks.get(dc)), nodeCount, replicas, seenRacks);
+ dcs.put(dc, dcEndpoints);
+ ++dcsToFill;
+ }
Iterator<Token> tokenIter = TokenMetadata.ringIterator(tokenMetadata.sortedTokens(), searchToken, false);
- while (tokenIter.hasNext() && !hasSufficientReplicas(dcReplicas, allEndpoints))
+ while (dcsToFill > 0 && tokenIter.hasNext())
{
Token next = tokenIter.next();
InetAddress ep = tokenMetadata.getEndpoint(next);
diff --cc src/java/org/apache/cassandra/locator/TokenMetadata.java
index 8712916,3978eeb..8a1d9d0
--- a/src/java/org/apache/cassandra/locator/TokenMetadata.java
+++ b/src/java/org/apache/cassandra/locator/TokenMetadata.java
@@@ -958,20 -975,33 +975,33 @@@ public class TokenMetadat
public Token getPredecessor(Token token)
{
- List tokens = sortedTokens();
+ List<Token> tokens = sortedTokens();
int index = Collections.binarySearch(tokens, token);
- assert index >= 0 : token + " not found in " + StringUtils.join(tokenToEndpointMap.keySet(), ", ");
+ assert index >= 0 : token + " not found in " + tokenToEndpointMapKeysAsStrings();
- return (Token) (index == 0 ? tokens.get(tokens.size() - 1) : tokens.get(index - 1));
+ return index == 0 ? tokens.get(tokens.size() - 1) : tokens.get(index - 1);
}
public Token getSuccessor(Token token)
{
- List tokens = sortedTokens();
+ List<Token> tokens = sortedTokens();
int index = Collections.binarySearch(tokens, token);
- assert index >= 0 : token + " not found in " + StringUtils.join(tokenToEndpointMap.keySet(), ", ");
+ assert index >= 0 : token + " not found in " + tokenToEndpointMapKeysAsStrings();
- return (Token) ((index == (tokens.size() - 1)) ? tokens.get(0) : tokens.get(index + 1));
+ return (index == (tokens.size() - 1)) ? tokens.get(0) : tokens.get(index + 1);
}
+ private String tokenToEndpointMapKeysAsStrings()
+ {
+ lock.readLock().lock();
+ try
+ {
+ return StringUtils.join(tokenToEndpointMap.keySet(), ", ");
+ }
+ finally
+ {
+ lock.readLock().unlock();
+ }
+ }
+
/** @return a copy of the bootstrapping tokens map */
public BiMultiValMap<Token, InetAddress> getBootstrapTokens()
{
@@@ -1293,118 -1328,143 +1329,150 @@@
}
/**
- * construct deep-copy of other
+ * @return map of DC to multi-map of rack to endpoints in that rack
*/
- Topology(Topology other)
+ public ImmutableMap<String, ImmutableMultimap<String, InetAddress>> getDatacenterRacks()
{
- dcEndpoints = HashMultimap.create(other.dcEndpoints);
- dcRacks = new HashMap<>();
- for (String dc : other.dcRacks.keySet())
- dcRacks.put(dc, HashMultimap.create(other.dcRacks.get(dc)));
- currentLocations = new HashMap<>(other.currentLocations);
+ return dcRacks;
}
+ /**
- * Stores current DC/rack assignment for ep
++ * @return The DC and rack of the given endpoint.
+ */
- void addEndpoint(InetAddress ep)
++ public Pair<String, String> getLocation(InetAddress addr)
+ {
- IEndpointSnitch snitch = DatabaseDescriptor.getEndpointSnitch();
- String dc = snitch.getDatacenter(ep);
- String rack = snitch.getRack(ep);
- Pair<String, String> current = currentLocations.get(ep);
- if (current != null)
- {
- if (current.left.equals(dc) && current.right.equals(rack))
- return;
- doRemoveEndpoint(ep, current);
- }
-
- doAddEndpoint(ep, dc, rack);
++ return currentLocations.get(addr);
+ }
+
- private void doAddEndpoint(InetAddress ep, String dc, String rack)
+ Builder unbuild()
{
- dcEndpoints.put(dc, ep);
-
- if (!dcRacks.containsKey(dc))
- dcRacks.put(dc, HashMultimap.<String, InetAddress>create());
- dcRacks.get(dc).put(rack, ep);
-
- currentLocations.put(ep, Pair.create(dc, rack));
+ return new Builder(this);
}
- /**
- * Removes current DC/rack assignment for ep
- */
- void removeEndpoint(InetAddress ep)
+ static Builder builder()
{
- if (!currentLocations.containsKey(ep))
- return;
-
- doRemoveEndpoint(ep, currentLocations.remove(ep));
+ return new Builder();
}
- private void doRemoveEndpoint(InetAddress ep, Pair<String, String> current)
+ static Topology empty()
{
- dcRacks.get(current.left).remove(current.right, ep);
- dcEndpoints.remove(current.left, ep);
+ return builder().build();
}
- void updateEndpoint(InetAddress ep)
+ private static class Builder
{
- IEndpointSnitch snitch = DatabaseDescriptor.getEndpointSnitch();
- if (snitch == null || !currentLocations.containsKey(ep))
- return;
+ /** multi-map of DC to endpoints in that DC */
+ private final Multimap<String, InetAddress> dcEndpoints;
+ /** map of DC to multi-map of rack to endpoints in that rack */
+ private final Map<String, Multimap<String, InetAddress>> dcRacks;
+ /** reverse-lookup map for endpoint to current known dc/rack assignment */
+ private final Map<InetAddress, Pair<String, String>> currentLocations;
- updateEndpoint(ep, snitch);
- }
+ Builder()
+ {
+ this.dcEndpoints = HashMultimap.create();
+ this.dcRacks = new HashMap<>();
+ this.currentLocations = new HashMap<>();
+ }
- void updateEndpoints()
- {
- IEndpointSnitch snitch = DatabaseDescriptor.getEndpointSnitch();
- if (snitch == null)
- return;
+ Builder(Topology from)
+ {
+ this.dcEndpoints = HashMultimap.create(from.dcEndpoints);
+
+ this.dcRacks = Maps.newHashMapWithExpectedSize(from.dcRacks.size());
+ for (Map.Entry<String, ImmutableMultimap<String, InetAddress>> entry : from.dcRacks.entrySet())
+ dcRacks.put(entry.getKey(), HashMultimap.create(entry.getValue()));
+
+ this.currentLocations = new HashMap<>(from.currentLocations);
+ }
+
+ /**
+ * Stores current DC/rack assignment for ep
+ */
+ Builder addEndpoint(InetAddress ep)
+ {
+ IEndpointSnitch snitch = DatabaseDescriptor.getEndpointSnitch();
+ String dc = snitch.getDatacenter(ep);
+ String rack = snitch.getRack(ep);
+ Pair<String, String> current = currentLocations.get(ep);
+ if (current != null)
+ {
+ if (current.left.equals(dc) && current.right.equals(rack))
+ return this;
+ doRemoveEndpoint(ep, current);
+ }
+
+ doAddEndpoint(ep, dc, rack);
+ return this;
+ }
+
+ private void doAddEndpoint(InetAddress ep, String dc, String rack)
+ {
+ dcEndpoints.put(dc, ep);
+
+ if (!dcRacks.containsKey(dc))
+ dcRacks.put(dc, HashMultimap.<String, InetAddress>create());
+ dcRacks.get(dc).put(rack, ep);
+
+ currentLocations.put(ep, Pair.create(dc, rack));
+ }
+
+ /**
+ * Removes current DC/rack assignment for ep
+ */
+ Builder removeEndpoint(InetAddress ep)
+ {
+ if (!currentLocations.containsKey(ep))
+ return this;
+
+ doRemoveEndpoint(ep, currentLocations.remove(ep));
+ return this;
+ }
+
+ private void doRemoveEndpoint(InetAddress ep, Pair<String, String> current)
+ {
+ dcRacks.get(current.left).remove(current.right, ep);
+ dcEndpoints.remove(current.left, ep);
+ }
+
+ Builder updateEndpoint(InetAddress ep)
+ {
+ IEndpointSnitch snitch = DatabaseDescriptor.getEndpointSnitch();
+ if (snitch == null || !currentLocations.containsKey(ep))
+ return this;
- for (InetAddress ep : currentLocations.keySet())
updateEndpoint(ep, snitch);
- }
+ return this;
+ }
- private void updateEndpoint(InetAddress ep, IEndpointSnitch snitch)
- {
- Pair<String, String> current = currentLocations.get(ep);
- String dc = snitch.getDatacenter(ep);
- String rack = snitch.getRack(ep);
- if (dc.equals(current.left) && rack.equals(current.right))
- return;
+ Builder updateEndpoints()
+ {
+ IEndpointSnitch snitch = DatabaseDescriptor.getEndpointSnitch();
+ if (snitch == null)
+ return this;
- doRemoveEndpoint(ep, current);
- doAddEndpoint(ep, dc, rack);
- }
+ for (InetAddress ep : currentLocations.keySet())
+ updateEndpoint(ep, snitch);
- /**
- * @return multi-map of DC to endpoints in that DC
- */
- public Multimap<String, InetAddress> getDatacenterEndpoints()
- {
- return dcEndpoints;
- }
+ return this;
+ }
- /**
- * @return map of DC to multi-map of rack to endpoints in that rack
- */
- public Map<String, Multimap<String, InetAddress>> getDatacenterRacks()
- {
- return dcRacks;
- }
+ private void updateEndpoint(InetAddress ep, IEndpointSnitch snitch)
+ {
+ Pair<String, String> current = currentLocations.get(ep);
+ String dc = snitch.getDatacenter(ep);
+ String rack = snitch.getRack(ep);
+ if (dc.equals(current.left) && rack.equals(current.right))
+ return;
- /**
- * @return The DC and rack of the given endpoint.
- */
- public Pair<String, String> getLocation(InetAddress addr)
- {
- return currentLocations.get(addr);
- }
+ doRemoveEndpoint(ep, current);
+ doAddEndpoint(ep, dc, rack);
+ }
+ Topology build()
+ {
+ return new Topology(this);
+ }
+ }
-
}
}
diff --cc test/unit/org/apache/cassandra/locator/NetworkTopologyStrategyTest.java
index f64b84a,bbfdd3b..48dd573
--- a/test/unit/org/apache/cassandra/locator/NetworkTopologyStrategyTest.java
+++ b/test/unit/org/apache/cassandra/locator/NetworkTopologyStrategyTest.java
@@@ -21,17 -21,17 +21,18 @@@ package org.apache.cassandra.locator
import java.io.IOException;
import java.net.InetAddress;
import java.net.UnknownHostException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
+import java.util.*;
+import java.util.stream.Collectors;
import com.google.common.collect.HashMultimap;
+import com.google.common.collect.ImmutableMap;
++import com.google.common.collect.ImmutableMultimap;
import com.google.common.collect.Multimap;
+
import org.junit.Assert;
+import org.junit.BeforeClass;
import org.junit.Test;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@@ -175,203 -166,4 +176,203 @@@ public class NetworkTopologyStrategyTes
InetAddress add1 = InetAddress.getByAddress(bytes);
metadata.updateNormalToken(token1, add1);
}
+
+ @Test
+ public void testCalculateEndpoints() throws UnknownHostException
+ {
+ final int NODES = 100;
+ final int VNODES = 64;
+ final int RUNS = 10;
+ StorageService.instance.setPartitionerUnsafe(Murmur3Partitioner.instance);
+ Map<String, Integer> datacenters = ImmutableMap.of("rf1", 1, "rf3", 3, "rf5_1", 5, "rf5_2", 5, "rf5_3", 5);
+ List<InetAddress> nodes = new ArrayList<>(NODES);
+ for (byte i=0; i<NODES; ++i)
+ nodes.add(InetAddress.getByAddress(new byte[]{127, 0, 0, i}));
+ for (int run=0; run<RUNS; ++run)
+ {
+ Random rand = new Random();
+ IEndpointSnitch snitch = generateSnitch(datacenters, nodes, rand);
+ DatabaseDescriptor.setEndpointSnitch(snitch);
+
+ TokenMetadata meta = new TokenMetadata();
+ for (int i=0; i<NODES; ++i) // Nodes
+ for (int j=0; j<VNODES; ++j) // tokens/vnodes per node
+ meta.updateNormalToken(Murmur3Partitioner.instance.getRandomToken(rand), nodes.get(i));
+ testEquivalence(meta, snitch, datacenters, rand);
+ }
+ }
+
+ void testEquivalence(TokenMetadata tokenMetadata, IEndpointSnitch snitch, Map<String, Integer> datacenters, Random rand)
+ {
+ NetworkTopologyStrategy nts = new NetworkTopologyStrategy("ks", tokenMetadata, snitch,
+ datacenters.entrySet().stream().
+ collect(Collectors.toMap(x -> x.getKey(), x -> Integer.toString(x.getValue()))));
+ for (int i=0; i<1000; ++i)
+ {
+ Token token = Murmur3Partitioner.instance.getRandomToken(rand);
+ List<InetAddress> expected = calculateNaturalEndpoints(token, tokenMetadata, datacenters, snitch);
+ List<InetAddress> actual = nts.calculateNaturalEndpoints(token, tokenMetadata);
+ if (endpointsDiffer(expected, actual))
+ {
+ System.err.println("Endpoints mismatch for token " + token);
+ System.err.println(" expected: " + expected);
+ System.err.println(" actual : " + actual);
+ Assert.assertEquals("Endpoints for token " + token + " mismatch.", expected, actual);
+ }
+ }
+ }
+
+ private boolean endpointsDiffer(List<InetAddress> ep1, List<InetAddress> ep2)
+ {
+ // Because the old algorithm does not put the nodes in the correct order in the case where more replicas
+ // are required than there are racks in a dc, we accept different order as long as the primary
+ // replica is the same.
+ if (ep1.equals(ep2))
+ return false;
+ if (!ep1.get(0).equals(ep2.get(0)))
+ return true;
+ Set<InetAddress> s1 = new HashSet<>(ep1);
+ Set<InetAddress> s2 = new HashSet<>(ep2);
+ return !s1.equals(s2);
+ }
+
+ IEndpointSnitch generateSnitch(Map<String, Integer> datacenters, Collection<InetAddress> nodes, Random rand)
+ {
+ final Map<InetAddress, String> nodeToRack = new HashMap<>();
+ final Map<InetAddress, String> nodeToDC = new HashMap<>();
+ Map<String, List<String>> racksPerDC = new HashMap<>();
+ datacenters.forEach((dc, rf) -> racksPerDC.put(dc, randomRacks(rf, rand)));
+ int rf = datacenters.values().stream().mapToInt(x -> x).sum();
+ String[] dcs = new String[rf];
+ int pos = 0;
+ for (Map.Entry<String, Integer> dce : datacenters.entrySet())
+ {
+ for (int i = 0; i < dce.getValue(); ++i)
+ dcs[pos++] = dce.getKey();
+ }
+
+ for (InetAddress node : nodes)
+ {
+ String dc = dcs[rand.nextInt(rf)];
+ List<String> racks = racksPerDC.get(dc);
+ String rack = racks.get(rand.nextInt(racks.size()));
+ nodeToRack.put(node, rack);
+ nodeToDC.put(node, dc);
+ }
+
+ return new AbstractNetworkTopologySnitch()
+ {
+ public String getRack(InetAddress endpoint)
+ {
+ return nodeToRack.get(endpoint);
+ }
+
+ public String getDatacenter(InetAddress endpoint)
+ {
+ return nodeToDC.get(endpoint);
+ }
+ };
+ }
+
+ private List<String> randomRacks(int rf, Random rand)
+ {
+ int rc = rand.nextInt(rf * 3 - 1) + 1;
+ List<String> racks = new ArrayList<>(rc);
+ for (int i=0; i<rc; ++i)
+ racks.add(Integer.toString(i));
+ return racks;
+ }
+
+ // Copy of older endpoints calculation algorithm for comparison
+ public static List<InetAddress> calculateNaturalEndpoints(Token searchToken, TokenMetadata tokenMetadata, Map<String, Integer> datacenters, IEndpointSnitch snitch)
+ {
+ // we want to preserve insertion order so that the first added endpoint becomes primary
+ Set<InetAddress> replicas = new LinkedHashSet<>();
+ // replicas we have found in each DC
+ Map<String, Set<InetAddress>> dcReplicas = new HashMap<>(datacenters.size());
+ for (Map.Entry<String, Integer> dc : datacenters.entrySet())
+ dcReplicas.put(dc.getKey(), new HashSet<InetAddress>(dc.getValue()));
+
+ Topology topology = tokenMetadata.getTopology();
+ // all endpoints in each DC, so we can check when we have exhausted all the members of a DC
+ Multimap<String, InetAddress> allEndpoints = topology.getDatacenterEndpoints();
+ // all racks in a DC so we can check when we have exhausted all racks in a DC
- Map<String, Multimap<String, InetAddress>> racks = topology.getDatacenterRacks();
++ Map<String, ImmutableMultimap<String, InetAddress>> racks = topology.getDatacenterRacks();
+ assert !allEndpoints.isEmpty() && !racks.isEmpty() : "not aware of any cluster members";
+
+ // tracks the racks we have already placed replicas in
+ Map<String, Set<String>> seenRacks = new HashMap<>(datacenters.size());
+ for (Map.Entry<String, Integer> dc : datacenters.entrySet())
+ seenRacks.put(dc.getKey(), new HashSet<String>());
+
+ // tracks the endpoints that we skipped over while looking for unique racks
+ // when we relax the rack uniqueness we can append this to the current result so we don't have to wind back the iterator
+ Map<String, Set<InetAddress>> skippedDcEndpoints = new HashMap<>(datacenters.size());
+ for (Map.Entry<String, Integer> dc : datacenters.entrySet())
+ skippedDcEndpoints.put(dc.getKey(), new LinkedHashSet<InetAddress>());
+
+ Iterator<Token> tokenIter = TokenMetadata.ringIterator(tokenMetadata.sortedTokens(), searchToken, false);
+ while (tokenIter.hasNext() && !hasSufficientReplicas(dcReplicas, allEndpoints, datacenters))
+ {
+ Token next = tokenIter.next();
+ InetAddress ep = tokenMetadata.getEndpoint(next);
+ String dc = snitch.getDatacenter(ep);
+ // have we already found all replicas for this dc?
+ if (!datacenters.containsKey(dc) || hasSufficientReplicas(dc, dcReplicas, allEndpoints, datacenters))
+ continue;
+ // can we skip checking the rack?
+ if (seenRacks.get(dc).size() == racks.get(dc).keySet().size())
+ {
+ dcReplicas.get(dc).add(ep);
+ replicas.add(ep);
+ }
+ else
+ {
+ String rack = snitch.getRack(ep);
+ // is this a new rack?
+ if (seenRacks.get(dc).contains(rack))
+ {
+ skippedDcEndpoints.get(dc).add(ep);
+ }
+ else
+ {
+ dcReplicas.get(dc).add(ep);
+ replicas.add(ep);
+ seenRacks.get(dc).add(rack);
+ // if we've run out of distinct racks, add the hosts we skipped past already (up to RF)
+ if (seenRacks.get(dc).size() == racks.get(dc).keySet().size())
+ {
+ Iterator<InetAddress> skippedIt = skippedDcEndpoints.get(dc).iterator();
+ while (skippedIt.hasNext() && !hasSufficientReplicas(dc, dcReplicas, allEndpoints, datacenters))
+ {
+ InetAddress nextSkipped = skippedIt.next();
+ dcReplicas.get(dc).add(nextSkipped);
+ replicas.add(nextSkipped);
+ }
+ }
+ }
+ }
+ }
+
+ return new ArrayList<InetAddress>(replicas);
+ }
+
+ private static boolean hasSufficientReplicas(String dc, Map<String, Set<InetAddress>> dcReplicas, Multimap<String, InetAddress> allEndpoints, Map<String, Integer> datacenters)
+ {
+ return dcReplicas.get(dc).size() >= Math.min(allEndpoints.get(dc).size(), getReplicationFactor(dc, datacenters));
+ }
+
+ private static boolean hasSufficientReplicas(Map<String, Set<InetAddress>> dcReplicas, Multimap<String, InetAddress> allEndpoints, Map<String, Integer> datacenters)
+ {
+ for (String dc : datacenters.keySet())
+ if (!hasSufficientReplicas(dc, dcReplicas, allEndpoints, datacenters))
+ return false;
+ return true;
+ }
+
+ public static int getReplicationFactor(String dc, Map<String, Integer> datacenters)
+ {
+ Integer replicas = datacenters.get(dc);
+ return replicas == null ? 0 : replicas;
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org