You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by al...@apache.org on 2015/09/16 12:40:59 UTC
cassandra git commit: Fix consolidating racks violating the RF
contract
Repository: cassandra
Updated Branches:
refs/heads/cassandra-2.0 0bb32f0b6 -> 257cdaa08
Fix consolidating racks violating the RF contract
patch by Stefania Alborghetti; reviewed by Blake Eggleston for
CASSANDRA-10238
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/257cdaa0
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/257cdaa0
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/257cdaa0
Branch: refs/heads/cassandra-2.0
Commit: 257cdaa08dc12f747a25c03b9b0ad3ffc76ace9b
Parents: 0bb32f0
Author: Stefania Alborghetti <st...@datastax.com>
Authored: Fri Sep 11 16:31:40 2015 +0800
Committer: Aleksey Yeschenko <al...@apache.org>
Committed: Wed Sep 16 11:40:31 2015 +0100
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../cassandra/locator/PropertyFileSnitch.java | 1 +
.../apache/cassandra/locator/TokenMetadata.java | 142 +++++++++----
.../locator/YamlFileNetworkTopologySnitch.java | 1 +
.../cassandra/service/StorageService.java | 16 ++
.../cassandra/locator/TokenMetadataTest.java | 209 ++++++++++++++++++-
6 files changed, 332 insertions(+), 38 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/257cdaa0/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index d4cc15f..3c47427 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
2.0.17
+ * Fix consolidating racks violating the RF contract (CASSANDRA-10238)
* Disallow decommission when node is in drained state (CASSANDRA-8741)
* Backport CASSANDRA-8013 to 2.0 (CASSANDRA-10144)
* Make getFullyExpiredSSTables less expensive (CASSANDRA-9882)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/257cdaa0/src/java/org/apache/cassandra/locator/PropertyFileSnitch.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/locator/PropertyFileSnitch.java b/src/java/org/apache/cassandra/locator/PropertyFileSnitch.java
index 4f822c6..745eeb8 100644
--- a/src/java/org/apache/cassandra/locator/PropertyFileSnitch.java
+++ b/src/java/org/apache/cassandra/locator/PropertyFileSnitch.java
@@ -69,6 +69,7 @@ public class PropertyFileSnitch extends AbstractNetworkTopologySnitch
protected void runMayThrow() throws ConfigurationException
{
reloadConfiguration();
+ StorageService.instance.updateTopology();
}
};
ResourceWatcher.watch(SNITCH_PROPERTIES_FILENAME, runnable, 60 * 1000);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/257cdaa0/src/java/org/apache/cassandra/locator/TokenMetadata.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/locator/TokenMetadata.java b/src/java/org/apache/cassandra/locator/TokenMetadata.java
index a673c94..b1b25e8 100644
--- a/src/java/org/apache/cassandra/locator/TokenMetadata.java
+++ b/src/java/org/apache/cassandra/locator/TokenMetadata.java
@@ -78,14 +78,14 @@ public class TokenMetadata
// Finally, note that recording the tokens of joining nodes in bootstrapTokens also
// means we can detect and reject the addition of multiple nodes at the same token
// before one becomes part of the ring.
- private final BiMultiValMap<Token, InetAddress> bootstrapTokens = new BiMultiValMap<Token, InetAddress>();
+ private final BiMultiValMap<Token, InetAddress> bootstrapTokens = new BiMultiValMap<>();
// (don't need to record Token here since it's still part of tokenToEndpointMap until it's done leaving)
- private final Set<InetAddress> leavingEndpoints = new HashSet<InetAddress>();
+ private final Set<InetAddress> leavingEndpoints = new HashSet<>();
// this is a cache of the calculation from {tokenToEndpointMap, bootstrapTokens, leavingEndpoints}
- private final ConcurrentMap<String, Multimap<Range<Token>, InetAddress>> pendingRanges = new ConcurrentHashMap<String, Multimap<Range<Token>, InetAddress>>();
+ private final ConcurrentMap<String, Multimap<Range<Token>, InetAddress>> pendingRanges = new ConcurrentHashMap<>();
// nodes which are migrating to the new tokens in the ring
- private final Set<Pair<Token, InetAddress>> movingEndpoints = new HashSet<Pair<Token, InetAddress>>();
+ private final Set<Pair<Token, InetAddress>> movingEndpoints = new HashSet<>();
/* Use this lock for manipulating the token map */
private final ReadWriteLock lock = new ReentrantReadWriteLock(true);
@@ -121,7 +121,7 @@ public class TokenMetadata
private ArrayList<Token> sortTokens()
{
- return new ArrayList<Token>(tokenToEndpointMap.keySet());
+ return new ArrayList<>(tokenToEndpointMap.keySet());
}
/** @return the number of nodes bootstrapping into source's primary range */
@@ -165,8 +165,6 @@ public class TokenMetadata
*
* Prefer this whenever there are multiple pairs to update, as each update (whether a single or multiple)
* is expensive (CASSANDRA-3831).
- *
- * @param endpointTokens
*/
public void updateNormalTokens(Multimap<InetAddress, Token> endpointTokens)
{
@@ -213,9 +211,6 @@ public class TokenMetadata
/**
* Store an end-point to host ID mapping. Each ID must be unique, and
* cannot be changed after the fact.
- *
- * @param hostId
- * @param endpoint
*/
public void updateHostId(UUID hostId, InetAddress endpoint)
{
@@ -284,7 +279,7 @@ public class TokenMetadata
lock.readLock().lock();
try
{
- Map<InetAddress, UUID> readMap = new HashMap<InetAddress, UUID>();
+ Map<InetAddress, UUID> readMap = new HashMap<>();
readMap.putAll(endpointToHostIdMap);
return readMap;
}
@@ -407,6 +402,43 @@ public class TokenMetadata
}
/**
+ * This is called when the snitch properties for this endpoint are updated, see CASSANDRA-10238.
+ */
+ public void updateTopology(InetAddress endpoint)
+ {
+ assert endpoint != null;
+
+ lock.writeLock().lock();
+ try
+ {
+ logger.info("Updating topology for {}", endpoint);
+ topology.updateEndpoint(endpoint);
+ }
+ finally
+ {
+ lock.writeLock().unlock();
+ }
+ }
+
+ /**
+ * This is called when the snitch properties for many endpoints are updated, it will update
+ * the topology mappings of any endpoints whose snitch has changed, see CASSANDRA-10238.
+ */
+ public void updateTopology()
+ {
+ lock.writeLock().lock();
+ try
+ {
+ logger.info("Updating topology for all endpoints that have changed");
+ topology.updateEndpoints();
+ }
+ finally
+ {
+ lock.writeLock().unlock();
+ }
+ }
+
+ /**
* Remove pair of token/address from moving endpoints
* @param endpoint address of the moving node
*/
@@ -442,7 +474,7 @@ public class TokenMetadata
lock.readLock().lock();
try
{
- return new ArrayList<Token>(tokenToEndpointMap.inverse().get(endpoint));
+ return new ArrayList<>(tokenToEndpointMap.inverse().get(endpoint));
}
finally
{
@@ -508,7 +540,7 @@ public class TokenMetadata
}
}
- private final AtomicReference<TokenMetadata> cachedTokenMap = new AtomicReference<TokenMetadata>();
+ private final AtomicReference<TokenMetadata> cachedTokenMap = new AtomicReference<>();
/**
* Create a copy of TokenMetadata with only tokenToEndpointMap. That is, pending ranges,
@@ -519,7 +551,7 @@ public class TokenMetadata
lock.readLock().lock();
try
{
- return new TokenMetadata(SortedBiMultiValMap.<Token, InetAddress>create(tokenToEndpointMap, null, inetaddressCmp),
+ return new TokenMetadata(SortedBiMultiValMap.create(tokenToEndpointMap, null, inetaddressCmp),
HashBiMap.create(endpointToHostIdMap),
new Topology(topology));
}
@@ -622,9 +654,9 @@ public class TokenMetadata
public Collection<Range<Token>> getPrimaryRangesFor(Collection<Token> tokens)
{
- Collection<Range<Token>> ranges = new ArrayList<Range<Token>>(tokens.size());
+ Collection<Range<Token>> ranges = new ArrayList<>(tokens.size());
for (Token right : tokens)
- ranges.add(new Range<Token>(getPredecessor(right), right));
+ ranges.add(new Range<>(getPredecessor(right), right));
return ranges;
}
@@ -660,7 +692,7 @@ public class TokenMetadata
public List<Range<Token>> getPendingRanges(String keyspaceName, InetAddress endpoint)
{
- List<Range<Token>> ranges = new ArrayList<Range<Token>>();
+ List<Range<Token>> ranges = new ArrayList<>();
for (Map.Entry<Range<Token>, InetAddress> entry : getPendingRangesMM(keyspaceName).entries())
{
if (entry.getValue().equals(endpoint))
@@ -845,7 +877,7 @@ public class TokenMetadata
for (InetAddress ep : eps)
{
sb.append(ep);
- sb.append(":");
+ sb.append(':');
sb.append(tokenToEndpointMap.inverse().get(ep));
sb.append(System.getProperty("line.separator"));
}
@@ -857,7 +889,7 @@ public class TokenMetadata
sb.append(System.getProperty("line.separator"));
for (Map.Entry<Token, InetAddress> entry : bootstrapTokens.entrySet())
{
- sb.append(entry.getValue()).append(":").append(entry.getKey());
+ sb.append(entry.getValue()).append(':').append(entry.getKey());
sb.append(System.getProperty("line.separator"));
}
}
@@ -896,7 +928,7 @@ public class TokenMetadata
{
for (Map.Entry<Range<Token>, InetAddress> rmap : entry.getValue().entries())
{
- sb.append(rmap.getValue()).append(":").append(rmap.getKey());
+ sb.append(rmap.getValue()).append(':').append(rmap.getKey());
sb.append(System.getProperty("line.separator"));
}
}
@@ -910,7 +942,7 @@ public class TokenMetadata
if (ranges.isEmpty())
return Collections.emptyList();
- Set<InetAddress> endpoints = new HashSet<InetAddress>();
+ Set<InetAddress> endpoints = new HashSet<>();
for (Map.Entry<Range<Token>, Collection<InetAddress>> entry : ranges.entrySet())
{
if (entry.getKey().contains(token))
@@ -954,7 +986,7 @@ public class TokenMetadata
lock.readLock().lock();
try
{
- Map<Token, InetAddress> map = new HashMap<Token, InetAddress>(tokenToEndpointMap.size() + bootstrapTokens.size());
+ Map<Token, InetAddress> map = new HashMap<>(tokenToEndpointMap.size() + bootstrapTokens.size());
map.putAll(tokenToEndpointMap);
map.putAll(bootstrapTokens);
return map;
@@ -1001,14 +1033,14 @@ public class TokenMetadata
/** reverse-lookup map for endpoint to current known dc/rack assignment */
private final Map<InetAddress, Pair<String, String>> currentLocations;
- protected Topology()
+ Topology()
{
dcEndpoints = HashMultimap.create();
- dcRacks = new HashMap<String, Multimap<String, InetAddress>>();
- currentLocations = new HashMap<InetAddress, Pair<String, String>>();
+ dcRacks = new HashMap<>();
+ currentLocations = new HashMap<>();
}
- protected void clear()
+ void clear()
{
dcEndpoints.clear();
dcRacks.clear();
@@ -1018,19 +1050,19 @@ public class TokenMetadata
/**
* construct deep-copy of other
*/
- protected Topology(Topology other)
+ Topology(Topology other)
{
dcEndpoints = HashMultimap.create(other.dcEndpoints);
- dcRacks = new HashMap<String, Multimap<String, InetAddress>>();
+ dcRacks = new HashMap<>();
for (String dc : other.dcRacks.keySet())
dcRacks.put(dc, HashMultimap.create(other.dcRacks.get(dc)));
- currentLocations = new HashMap<InetAddress, Pair<String, String>>(other.currentLocations);
+ currentLocations = new HashMap<>(other.currentLocations);
}
/**
* Stores current DC/rack assignment for ep
*/
- protected void addEndpoint(InetAddress ep)
+ void addEndpoint(InetAddress ep)
{
IEndpointSnitch snitch = DatabaseDescriptor.getEndpointSnitch();
String dc = snitch.getDatacenter(ep);
@@ -1040,10 +1072,14 @@ public class TokenMetadata
{
if (current.left.equals(dc) && current.right.equals(rack))
return;
- dcRacks.get(current.left).remove(current.right, ep);
- dcEndpoints.remove(current.left, ep);
+ doRemoveEndpoint(ep, current);
}
+ doAddEndpoint(ep, dc, rack);
+ }
+
+ private void doAddEndpoint(InetAddress ep, String dc, String rack)
+ {
dcEndpoints.put(dc, ep);
if (!dcRacks.containsKey(dc))
@@ -1056,13 +1092,49 @@ public class TokenMetadata
/**
* Removes current DC/rack assignment for ep
*/
- protected void removeEndpoint(InetAddress ep)
+ void removeEndpoint(InetAddress ep)
{
if (!currentLocations.containsKey(ep))
return;
- Pair<String, String> current = currentLocations.remove(ep);
- dcEndpoints.remove(current.left, ep);
+
+ doRemoveEndpoint(ep, currentLocations.remove(ep));
+ }
+
+ private void doRemoveEndpoint(InetAddress ep, Pair<String, String> current)
+ {
dcRacks.get(current.left).remove(current.right, ep);
+ dcEndpoints.remove(current.left, ep);
+ }
+
+ void updateEndpoint(InetAddress ep)
+ {
+ IEndpointSnitch snitch = DatabaseDescriptor.getEndpointSnitch();
+ if (snitch == null || !currentLocations.containsKey(ep))
+ return;
+
+ updateEndpoint(ep, snitch);
+ }
+
+ void updateEndpoints()
+ {
+ IEndpointSnitch snitch = DatabaseDescriptor.getEndpointSnitch();
+ if (snitch == null)
+ return;
+
+ for (InetAddress ep : currentLocations.keySet())
+ updateEndpoint(ep, snitch);
+ }
+
+ private void updateEndpoint(InetAddress ep, IEndpointSnitch snitch)
+ {
+ Pair<String, String> current = currentLocations.get(ep);
+ String dc = snitch.getDatacenter(ep);
+ String rack = snitch.getRack(ep);
+ if (dc.equals(current.left) && rack.equals(current.right))
+ return;
+
+ doRemoveEndpoint(ep, current);
+ doAddEndpoint(ep, dc, rack);
}
/**
http://git-wip-us.apache.org/repos/asf/cassandra/blob/257cdaa0/src/java/org/apache/cassandra/locator/YamlFileNetworkTopologySnitch.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/locator/YamlFileNetworkTopologySnitch.java b/src/java/org/apache/cassandra/locator/YamlFileNetworkTopologySnitch.java
index 3237979..e6691c4 100644
--- a/src/java/org/apache/cassandra/locator/YamlFileNetworkTopologySnitch.java
+++ b/src/java/org/apache/cassandra/locator/YamlFileNetworkTopologySnitch.java
@@ -120,6 +120,7 @@ public class YamlFileNetworkTopologySnitch
protected void runMayThrow() throws ConfigurationException
{
loadTopologyConfiguration();
+ StorageService.instance.updateTopology();
}
};
ResourceWatcher.watch(topologyConfigFilename, runnable,
http://git-wip-us.apache.org/repos/asf/cassandra/blob/257cdaa0/src/java/org/apache/cassandra/service/StorageService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java
index 5ac4980..c5f159e 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -1378,9 +1378,11 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
SystemKeyspace.updatePeerInfo(endpoint, "release_version", quote(value.value));
break;
case DC:
+ updateTopology(endpoint);
SystemKeyspace.updatePeerInfo(endpoint, "data_center", quote(value.value));
break;
case RACK:
+ updateTopology(endpoint);
SystemKeyspace.updatePeerInfo(endpoint, "rack", quote(value.value));
break;
case RPC_ADDRESS:
@@ -1398,6 +1400,20 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
}
}
+ public void updateTopology(InetAddress endpoint)
+ {
+ if (getTokenMetadata().isMember(endpoint))
+ {
+ getTokenMetadata().updateTopology(endpoint);
+ }
+ }
+
+ public void updateTopology()
+ {
+ getTokenMetadata().updateTopology();
+
+ }
+
private void updatePeerInfo(InetAddress endpoint)
{
EndpointState epState = Gossiper.instance.getEndpointStateForEndpoint(endpoint);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/257cdaa0/test/unit/org/apache/cassandra/locator/TokenMetadataTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/locator/TokenMetadataTest.java b/test/unit/org/apache/cassandra/locator/TokenMetadataTest.java
index 95118dc..fc8095d 100644
--- a/test/unit/org/apache/cassandra/locator/TokenMetadataTest.java
+++ b/test/unit/org/apache/cassandra/locator/TokenMetadataTest.java
@@ -19,19 +19,27 @@
package org.apache.cassandra.locator;
import java.net.InetAddress;
+import java.net.UnknownHostException;
import java.util.ArrayList;
+import java.util.Map;
+
import com.google.common.collect.Iterators;
+import com.google.common.collect.Multimap;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.runner.RunWith;
+
+import static junit.framework.Assert.assertNotNull;
import static org.junit.Assert.assertEquals;
import static org.apache.cassandra.Util.token;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
import org.apache.cassandra.OrderedJUnit4ClassRunner;
+import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.dht.Token;
-import org.apache.cassandra.locator.TokenMetadata;
import org.apache.cassandra.service.StorageService;
@RunWith(OrderedJUnit4ClassRunner.class)
@@ -50,9 +58,9 @@ public class TokenMetadataTest
tmd.updateNormalToken(token(SIX), InetAddress.getByName("127.0.0.6"));
}
- private void testRingIterator(ArrayList<Token> ring, String start, boolean includeMin, String... expected)
+ private static void testRingIterator(ArrayList<Token> ring, String start, boolean includeMin, String... expected)
{
- ArrayList<Token> actual = new ArrayList<Token>();
+ ArrayList<Token> actual = new ArrayList<>();
Iterators.addAll(actual, TokenMetadata.ringIterator(ring, token(start), includeMin));
assertEquals(actual.toString(), expected.length, actual.size());
for (int i = 0; i < expected.length; i++)
@@ -84,4 +92,199 @@ public class TokenMetadataTest
{
testRingIterator(new ArrayList<Token>(), "2", false);
}
+
+ @Test
+ public void testTopologyUpdate_RackConsolidation() throws UnknownHostException
+ {
+ final InetAddress first = InetAddress.getByName("127.0.0.1");
+ final InetAddress second = InetAddress.getByName("127.0.0.6");
+ final String DATA_CENTER = "datacenter1";
+ final String RACK1 = "rack1";
+ final String RACK2 = "rack2";
+
+ DatabaseDescriptor.setEndpointSnitch(new AbstractEndpointSnitch()
+ {
+ @Override
+ public String getRack(InetAddress endpoint)
+ {
+ return endpoint.equals(first) ? RACK1 : RACK2;
+ }
+
+ @Override
+ public String getDatacenter(InetAddress endpoint)
+ {
+ return DATA_CENTER;
+ }
+
+ @Override
+ public int compareEndpoints(InetAddress target, InetAddress a1, InetAddress a2)
+ {
+ return 0;
+ }
+ });
+
+ tmd.updateNormalToken(token(ONE), first);
+ tmd.updateNormalToken(token(SIX), second);
+
+ TokenMetadata tokenMetadata = tmd.cloneOnlyTokenMap();
+ assertNotNull(tokenMetadata);
+
+ TokenMetadata.Topology topology = tokenMetadata.getTopology();
+ assertNotNull(topology);
+
+ Multimap<String, InetAddress> allEndpoints = topology.getDatacenterEndpoints();
+ assertNotNull(allEndpoints);
+ assertTrue(allEndpoints.size() == 2);
+ assertTrue(allEndpoints.containsKey(DATA_CENTER));
+ assertTrue(allEndpoints.get(DATA_CENTER).contains(first));
+ assertTrue(allEndpoints.get(DATA_CENTER).contains(second));
+
+ Map<String, Multimap<String, InetAddress>> racks = topology.getDatacenterRacks();
+ assertNotNull(racks);
+ assertTrue(racks.size() == 1);
+ assertTrue(racks.containsKey(DATA_CENTER));
+ assertTrue(racks.get(DATA_CENTER).size() == 2);
+ assertTrue(racks.get(DATA_CENTER).containsKey(RACK1));
+ assertTrue(racks.get(DATA_CENTER).containsKey(RACK2));
+ assertTrue(racks.get(DATA_CENTER).get(RACK1).contains(first));
+ assertTrue(racks.get(DATA_CENTER).get(RACK2).contains(second));
+
+ DatabaseDescriptor.setEndpointSnitch(new AbstractEndpointSnitch()
+ {
+ @Override
+ public String getRack(InetAddress endpoint)
+ {
+ return RACK1;
+ }
+
+ @Override
+ public String getDatacenter(InetAddress endpoint)
+ {
+ return DATA_CENTER;
+ }
+
+ @Override
+ public int compareEndpoints(InetAddress target, InetAddress a1, InetAddress a2)
+ {
+ return 0;
+ }
+ });
+
+ tokenMetadata.updateTopology(first);
+ tokenMetadata.updateTopology(second);
+
+ allEndpoints = topology.getDatacenterEndpoints();
+ assertNotNull(allEndpoints);
+ assertTrue(allEndpoints.size() == 2);
+ assertTrue(allEndpoints.containsKey(DATA_CENTER));
+ assertTrue(allEndpoints.get(DATA_CENTER).contains(first));
+ assertTrue(allEndpoints.get(DATA_CENTER).contains(second));
+
+ racks = topology.getDatacenterRacks();
+ assertNotNull(racks);
+ assertTrue(racks.size() == 1);
+ assertTrue(racks.containsKey(DATA_CENTER));
+ assertTrue(racks.get(DATA_CENTER).size() == 2);
+ assertTrue(racks.get(DATA_CENTER).containsKey(RACK1));
+ assertFalse(racks.get(DATA_CENTER).containsKey(RACK2));
+ assertTrue(racks.get(DATA_CENTER).get(RACK1).contains(first));
+ assertTrue(racks.get(DATA_CENTER).get(RACK1).contains(second));
+ }
+
+ @Test
+ public void testTopologyUpdate_RackExpansion() throws UnknownHostException
+ {
+ final InetAddress first = InetAddress.getByName("127.0.0.1");
+ final InetAddress second = InetAddress.getByName("127.0.0.6");
+ final String DATA_CENTER = "datacenter1";
+ final String RACK1 = "rack1";
+ final String RACK2 = "rack2";
+
+ DatabaseDescriptor.setEndpointSnitch(new AbstractEndpointSnitch()
+ {
+ @Override
+ public String getRack(InetAddress endpoint)
+ {
+ return RACK1;
+ }
+
+ @Override
+ public String getDatacenter(InetAddress endpoint)
+ {
+ return DATA_CENTER;
+ }
+
+ @Override
+ public int compareEndpoints(InetAddress target, InetAddress a1, InetAddress a2)
+ {
+ return 0;
+ }
+ });
+
+ tmd.updateNormalToken(token(ONE), first);
+ tmd.updateNormalToken(token(SIX), second);
+
+ TokenMetadata tokenMetadata = tmd.cloneOnlyTokenMap();
+ assertNotNull(tokenMetadata);
+
+ TokenMetadata.Topology topology = tokenMetadata.getTopology();
+ assertNotNull(topology);
+
+ Multimap<String, InetAddress> allEndpoints = topology.getDatacenterEndpoints();
+ assertNotNull(allEndpoints);
+ assertTrue(allEndpoints.size() == 2);
+ assertTrue(allEndpoints.containsKey(DATA_CENTER));
+ assertTrue(allEndpoints.get(DATA_CENTER).contains(first));
+ assertTrue(allEndpoints.get(DATA_CENTER).contains(second));
+
+ Map<String, Multimap<String, InetAddress>> racks = topology.getDatacenterRacks();
+ assertNotNull(racks);
+ assertTrue(racks.size() == 1);
+ assertTrue(racks.containsKey(DATA_CENTER));
+ assertTrue(racks.get(DATA_CENTER).size() == 2);
+ assertTrue(racks.get(DATA_CENTER).containsKey(RACK1));
+ assertFalse(racks.get(DATA_CENTER).containsKey(RACK2));
+ assertTrue(racks.get(DATA_CENTER).get(RACK1).contains(first));
+ assertTrue(racks.get(DATA_CENTER).get(RACK1).contains(second));
+
+ DatabaseDescriptor.setEndpointSnitch(new AbstractEndpointSnitch()
+ {
+ @Override
+ public String getRack(InetAddress endpoint)
+ {
+ return endpoint.equals(first) ? RACK1 : RACK2;
+ }
+
+ @Override
+ public String getDatacenter(InetAddress endpoint)
+ {
+ return DATA_CENTER;
+ }
+
+ @Override
+ public int compareEndpoints(InetAddress target, InetAddress a1, InetAddress a2)
+ {
+ return 0;
+ }
+ });
+
+ tokenMetadata.updateTopology();
+
+ allEndpoints = topology.getDatacenterEndpoints();
+ assertNotNull(allEndpoints);
+ assertTrue(allEndpoints.size() == 2);
+ assertTrue(allEndpoints.containsKey(DATA_CENTER));
+ assertTrue(allEndpoints.get(DATA_CENTER).contains(first));
+ assertTrue(allEndpoints.get(DATA_CENTER).contains(second));
+
+ racks = topology.getDatacenterRacks();
+ assertNotNull(racks);
+ assertTrue(racks.size() == 1);
+ assertTrue(racks.containsKey(DATA_CENTER));
+ assertTrue(racks.get(DATA_CENTER).size() == 2);
+ assertTrue(racks.get(DATA_CENTER).containsKey(RACK1));
+ assertTrue(racks.get(DATA_CENTER).containsKey(RACK2));
+ assertTrue(racks.get(DATA_CENTER).get(RACK1).contains(first));
+ assertTrue(racks.get(DATA_CENTER).get(RACK2).contains(second));
+ }
}