You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by ee...@apache.org on 2012/07/10 00:07:08 UTC
git commit: update TokenMetadata in support of many tokens per node
Updated Branches:
refs/heads/trunk c22dd0821 -> e85afdc5b
update TokenMetadata in support of many tokens per node
Patch by Sam Overton; reviewed by jbellis for CASSANDRA-4121
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/e85afdc5
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/e85afdc5
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/e85afdc5
Branch: refs/heads/trunk
Commit: e85afdc5b6a691591834bd32f766087560c60a39
Parents: c22dd08
Author: Eric Evans <ee...@apache.org>
Authored: Mon Jul 9 16:07:34 2012 -0600
Committer: Eric Evans <ee...@apache.org>
Committed: Mon Jul 9 16:07:34 2012 -0600
----------------------------------------------------------------------
.../apache/cassandra/locator/SimpleStrategy.java | 4 +-
.../apache/cassandra/locator/TokenMetadata.java | 157 +++++++++------
.../apache/cassandra/service/StorageService.java | 111 ++++++-----
.../locator/NetworkTopologyStrategyTest.java | 7 +-
4 files changed, 160 insertions(+), 119 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e85afdc5/src/java/org/apache/cassandra/locator/SimpleStrategy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/locator/SimpleStrategy.java b/src/java/org/apache/cassandra/locator/SimpleStrategy.java
index 11c2aa8..50d470f 100644
--- a/src/java/org/apache/cassandra/locator/SimpleStrategy.java
+++ b/src/java/org/apache/cassandra/locator/SimpleStrategy.java
@@ -54,7 +54,9 @@ public class SimpleStrategy extends AbstractReplicationStrategy
Iterator<Token> iter = TokenMetadata.ringIterator(tokens, token, false);
while (endpoints.size() < replicas && iter.hasNext())
{
- endpoints.add(metadata.getEndpoint(iter.next()));
+ InetAddress ep = metadata.getEndpoint(iter.next());
+ if (!endpoints.contains(ep))
+ endpoints.add(ep);
}
return endpoints;
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e85afdc5/src/java/org/apache/cassandra/locator/TokenMetadata.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/locator/TokenMetadata.java b/src/java/org/apache/cassandra/locator/TokenMetadata.java
index 3340b2b..8fb63d5 100644
--- a/src/java/org/apache/cassandra/locator/TokenMetadata.java
+++ b/src/java/org/apache/cassandra/locator/TokenMetadata.java
@@ -18,6 +18,7 @@
package org.apache.cassandra.locator;
import java.net.InetAddress;
+import java.nio.ByteBuffer;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
@@ -27,7 +28,9 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
import com.google.common.collect.*;
+import org.apache.cassandra.utils.BiMultiValMap;
import org.apache.cassandra.utils.Pair;
+import org.apache.cassandra.utils.SortedBiMultiValMap;
import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -43,7 +46,7 @@ public class TokenMetadata
private static final Logger logger = LoggerFactory.getLogger(TokenMetadata.class);
/* Maintains token to endpoint map of every node in the cluster. */
- private final BiMap<Token, InetAddress> tokenToEndpointMap;
+ private final BiMultiValMap<Token, InetAddress> tokenToEndpointMap;
/* Maintains endpoint to host ID map of every node in the cluster */
private final BiMap<InetAddress, UUID> endpointToHostIdMap;
@@ -72,7 +75,7 @@ public class TokenMetadata
// Finally, note that recording the tokens of joining nodes in bootstrapTokens also
// means we can detect and reject the addition of multiple nodes at the same token
// before one becomes part of the ring.
- private final BiMap<Token, InetAddress> bootstrapTokens = HashBiMap.<Token, InetAddress>create();
+ private final BiMultiValMap<Token, InetAddress> bootstrapTokens = new BiMultiValMap<Token, InetAddress>();
// (don't need to record Token here since it's still part of tokenToEndpointMap until it's done leaving)
private final Set<InetAddress> leavingEndpoints = new HashSet<InetAddress>();
// this is a cache of the calculation from {tokenToEndpointMap, bootstrapTokens, leavingEndpoints}
@@ -90,12 +93,21 @@ public class TokenMetadata
/* list of subscribers that are notified when the tokenToEndpointMap changed */
private final CopyOnWriteArrayList<AbstractReplicationStrategy> subscribers = new CopyOnWriteArrayList<AbstractReplicationStrategy>();
+ private static final Comparator<InetAddress> inetaddressCmp = new Comparator<InetAddress>()
+ {
+ @Override
+ public int compare(InetAddress o1, InetAddress o2)
+ {
+ return ByteBuffer.wrap(o1.getAddress()).compareTo(ByteBuffer.wrap(o2.getAddress()));
+ }
+ };
+
public TokenMetadata()
{
- this(HashBiMap.<Token, InetAddress>create(), new Topology());
+ this(SortedBiMultiValMap.<Token, InetAddress>create(null, inetaddressCmp), new Topology());
}
- public TokenMetadata(BiMap<Token, InetAddress> tokenToEndpointMap, Topology topology)
+ public TokenMetadata(BiMultiValMap<Token, InetAddress> tokenToEndpointMap, Topology topology)
{
this.tokenToEndpointMap = tokenToEndpointMap;
this.topology = topology;
@@ -105,22 +117,21 @@ public class TokenMetadata
private ArrayList<Token> sortTokens()
{
- ArrayList<Token> tokens = new ArrayList<Token>(tokenToEndpointMap.keySet());
- Collections.sort(tokens);
- return tokens;
+ return new ArrayList<Token>(tokenToEndpointMap.keySet());
}
/** @return the number of nodes bootstrapping into source's primary range */
public int pendingRangeChanges(InetAddress source)
{
int n = 0;
- Range<Token> sourceRange = getPrimaryRangeFor(getToken(source));
+ Collection<Range<Token>> sourceRanges = getPrimaryRangesFor(getTokens(source));
lock.readLock().lock();
try
{
for (Token token : bootstrapTokens.keySet())
- if (sourceRange.contains(token))
- n++;
+ for (Range<Token> range : sourceRanges)
+ if (range.contains(token))
+ n++;
}
finally
{
@@ -134,7 +145,15 @@ public class TokenMetadata
*/
public void updateNormalToken(Token token, InetAddress endpoint)
{
- updateNormalTokens(Collections.singleton(Pair.create(token, endpoint)));
+ updateNormalTokens(Collections.singleton(token), endpoint);
+ }
+
+ public void updateNormalTokens(Collection<Token> tokens, InetAddress endpoint)
+ {
+ Multimap<InetAddress, Token> endpointTokens = HashMultimap.create();
+ for (Token token : tokens)
+ endpointTokens.put(endpoint, token);
+ updateNormalTokens(endpointTokens);
}
/**
@@ -143,40 +162,39 @@ public class TokenMetadata
* Prefer this whenever there are multiple pairs to update, as each update (whether a single or multiple)
* is expensive (CASSANDRA-3831).
*
- * @param tokenPairs
+ * @param endpointTokens
*/
- public void updateNormalTokens(Set<Pair<Token, InetAddress>> tokenPairs)
+ public void updateNormalTokens(Multimap<InetAddress, Token> endpointTokens)
{
- if (tokenPairs.isEmpty())
+ if (endpointTokens.isEmpty())
return;
lock.writeLock().lock();
try
{
boolean shouldSortTokens = false;
- for (Pair<Token, InetAddress> tokenEndpointPair : tokenPairs)
+ for (InetAddress endpoint : endpointTokens.keySet())
{
- Token token = tokenEndpointPair.left;
- InetAddress endpoint = tokenEndpointPair.right;
+ Collection<Token> tokens = endpointTokens.get(endpoint);
- assert token != null;
- assert endpoint != null;
+ assert tokens != null && !tokens.isEmpty();
- bootstrapTokens.inverse().remove(endpoint);
- tokenToEndpointMap.inverse().remove(endpoint);
- InetAddress prev = tokenToEndpointMap.put(token, endpoint);
- if (!endpoint.equals(prev))
+ bootstrapTokens.removeValue(endpoint);
+ tokenToEndpointMap.removeValue(endpoint);
+ topology.addEndpoint(endpoint);
+ leavingEndpoints.remove(endpoint);
+ removeFromMoving(endpoint); // also removing this endpoint from moving
+
+ for (Token token : tokens)
{
- if (prev != null)
+ InetAddress prev = tokenToEndpointMap.put(token, endpoint);
+ if (!endpoint.equals(prev))
{
- logger.warn("Token " + token + " changing ownership from " + prev + " to " + endpoint);
- topology.removeEndpoint(prev);
+ if (prev != null)
+ logger.warn("Token " + token + " changing ownership from " + prev + " to " + endpoint);
+ shouldSortTokens = true;
}
- shouldSortTokens = true;
}
- topology.addEndpoint(endpoint);
- leavingEndpoints.remove(endpoint);
- removeFromMoving(endpoint); // also removing this endpoint from moving
}
if (shouldSortTokens)
@@ -239,26 +257,38 @@ public class TokenMetadata
return readMap;
}
+ @Deprecated
public void addBootstrapToken(Token token, InetAddress endpoint)
{
- assert token != null;
+ addBootstrapTokens(Collections.singleton(token), endpoint);
+ }
+
+ public void addBootstrapTokens(Collection<Token> tokens, InetAddress endpoint)
+ {
+ assert tokens != null && !tokens.isEmpty();
assert endpoint != null;
lock.writeLock().lock();
try
{
+
InetAddress oldEndpoint;
- oldEndpoint = bootstrapTokens.get(token);
- if (oldEndpoint != null && !oldEndpoint.equals(endpoint))
- throw new RuntimeException("Bootstrap Token collision between " + oldEndpoint + " and " + endpoint + " (token " + token);
+ for (Token token : tokens)
+ {
+ oldEndpoint = bootstrapTokens.get(token);
+ if (oldEndpoint != null && !oldEndpoint.equals(endpoint))
+ throw new RuntimeException("Bootstrap Token collision between " + oldEndpoint + " and " + endpoint + " (token " + token);
+
+ oldEndpoint = tokenToEndpointMap.get(token);
+ if (oldEndpoint != null && !oldEndpoint.equals(endpoint))
+ throw new RuntimeException("Bootstrap Token collision between " + oldEndpoint + " and " + endpoint + " (token " + token);
+ }
- oldEndpoint = tokenToEndpointMap.get(token);
- if (oldEndpoint != null && !oldEndpoint.equals(endpoint))
- throw new RuntimeException("Bootstrap Token collision between " + oldEndpoint + " and " + endpoint + " (token " + token);
+ bootstrapTokens.removeValue(endpoint);
- bootstrapTokens.inverse().remove(endpoint);
- bootstrapTokens.put(token, endpoint);
+ for (Token token : tokens)
+ bootstrapTokens.put(token, endpoint);
}
finally
{
@@ -324,8 +354,8 @@ public class TokenMetadata
lock.writeLock().lock();
try
{
- bootstrapTokens.inverse().remove(endpoint);
- tokenToEndpointMap.inverse().remove(endpoint);
+ bootstrapTokens.removeValue(endpoint);
+ tokenToEndpointMap.removeValue(endpoint);
topology.removeEndpoint(endpoint);
leavingEndpoints.remove(endpoint);
endpointToHostIdMap.remove(endpoint);
@@ -366,7 +396,7 @@ public class TokenMetadata
}
}
- public Token getToken(InetAddress endpoint)
+ public Collection<Token> getTokens(InetAddress endpoint)
{
assert endpoint != null;
assert isMember(endpoint); // don't want to return nulls
@@ -374,7 +404,7 @@ public class TokenMetadata
lock.readLock().lock();
try
{
- return tokenToEndpointMap.inverse().get(endpoint);
+ return new ArrayList<Token>(tokenToEndpointMap.inverse().get(endpoint));
}
finally
{
@@ -382,6 +412,12 @@ public class TokenMetadata
}
}
+ @Deprecated
+ public Token getToken(InetAddress endpoint)
+ {
+ return getTokens(endpoint).iterator().next();
+ }
+
public boolean isMember(InetAddress endpoint)
{
assert endpoint != null;
@@ -443,7 +479,7 @@ public class TokenMetadata
lock.readLock().lock();
try
{
- return new TokenMetadata(HashBiMap.create(tokenToEndpointMap), new Topology(topology));
+ return new TokenMetadata(SortedBiMultiValMap.<Token, InetAddress>create(tokenToEndpointMap, null, inetaddressCmp), new Topology(topology));
}
finally
{
@@ -517,9 +553,18 @@ public class TokenMetadata
}
}
+ public Collection<Range<Token>> getPrimaryRangesFor(Collection<Token> tokens)
+ {
+ Collection<Range<Token>> ranges = new ArrayList<Range<Token>>(tokens.size());
+ for (Token right : tokens)
+ ranges.add(new Range<Token>(getPredecessor(right), right));
+ return ranges;
+ }
+
+ @Deprecated
public Range<Token> getPrimaryRangeFor(Token right)
{
- return new Range<Token>(getPredecessor(right), right);
+ return getPrimaryRangesFor(Arrays.asList(right)).iterator().next();
}
public ArrayList<Token> sortedTokens()
@@ -581,12 +626,12 @@ public class TokenMetadata
}
/** @return a copy of the bootstrapping tokens map */
- public Map<Token, InetAddress> getBootstrapTokens()
+ public BiMultiValMap<Token, InetAddress> getBootstrapTokens()
{
lock.readLock().lock();
try
{
- return ImmutableMap.copyOf(bootstrapTokens);
+ return new BiMultiValMap<Token, InetAddress>(bootstrapTokens);
}
finally
{
@@ -803,24 +848,6 @@ public class TokenMetadata
}
/**
- * @return a token to endpoint map to consider for read operations on the cluster.
- */
- public Map<Token, InetAddress> getTokenToEndpointMapForReading()
- {
- lock.readLock().lock();
- try
- {
- Map<Token, InetAddress> map = new HashMap<Token, InetAddress>(tokenToEndpointMap.size());
- map.putAll(tokenToEndpointMap);
- return map;
- }
- finally
- {
- lock.readLock().unlock();
- }
- }
-
- /**
* @return a (stable copy, won't be modified) Token to Endpoint map for all the normal and bootstrapping nodes
* in the cluster.
*/
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e85afdc5/src/java/org/apache/cassandra/service/StorageService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java
index ce14352..583b3a7 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -30,7 +30,6 @@ import java.util.concurrent.atomic.AtomicInteger;
import javax.management.MBeanServer;
import javax.management.ObjectName;
-import com.google.common.base.Function;
import com.google.common.collect.*;
import org.apache.log4j.Level;
import org.apache.commons.lang.StringUtils;
@@ -56,6 +55,7 @@ import org.apache.cassandra.net.IAsyncResult;
import org.apache.cassandra.net.MessageOut;
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.net.ResponseVerbHandler;
+import org.apache.cassandra.service.AntiEntropyService.RepairFuture;
import org.apache.cassandra.service.AntiEntropyService.TreeRequestVerbHandler;
import org.apache.cassandra.streaming.*;
import org.apache.cassandra.thrift.*;
@@ -120,6 +120,12 @@ public class StorageService implements IEndpointStateChangeSubscriber, StorageSe
return getRangesForEndpoint(table, FBUtilities.getBroadcastAddress());
}
+ public Collection<Range<Token>> getLocalPrimaryRanges()
+ {
+ return getPrimaryRangesForEndpoint(FBUtilities.getBroadcastAddress());
+ }
+
+ @Deprecated
public Range<Token> getLocalPrimaryRange()
{
return getPrimaryRangeForEndpoint(FBUtilities.getBroadcastAddress());
@@ -1168,7 +1174,7 @@ public class StorageService implements IEndpointStateChangeSubscriber, StorageSe
logger.info("Node " + endpoint + " state jump to leaving");
tokenMetadata.updateNormalToken(token, endpoint);
}
- else if (!tokenMetadata.getToken(endpoint).equals(token))
+ else if (!tokenMetadata.getTokens(endpoint).contains(token))
{
logger.warn("Node " + endpoint + " 'leaving' token mismatch. Long network partition?");
tokenMetadata.updateNormalToken(token, endpoint);
@@ -1338,7 +1344,7 @@ public class StorageService implements IEndpointStateChangeSubscriber, StorageSe
{
TokenMetadata tm = StorageService.instance.getTokenMetadata();
Multimap<Range<Token>, InetAddress> pendingRanges = HashMultimap.create();
- Map<Token, InetAddress> bootstrapTokens = tm.getBootstrapTokens();
+ BiMultiValMap<Token, InetAddress> bootstrapTokens = tm.getBootstrapTokens();
Set<InetAddress> leavingEndpoints = tm.getLeavingEndpoints();
if (bootstrapTokens.isEmpty() && leavingEndpoints.isEmpty() && tm.getMovingEndpoints().isEmpty())
@@ -1373,11 +1379,11 @@ public class StorageService implements IEndpointStateChangeSubscriber, StorageSe
// For each of the bootstrapping nodes, simply add and remove them one by one to
// allLeftMetadata and check in between what their ranges would be.
- for (Map.Entry<Token, InetAddress> entry : bootstrapTokens.entrySet())
+ for (InetAddress endpoint : bootstrapTokens.inverse().keySet())
{
- InetAddress endpoint = entry.getValue();
-
- allLeftMetadata.updateNormalToken(entry.getKey(), endpoint);
+ Collection<Token> tokens = bootstrapTokens.inverse().get(endpoint);
+
+ allLeftMetadata.updateNormalTokens(tokens, endpoint);
for (Range<Token> range : strategy.getAddressRanges(allLeftMetadata).get(endpoint))
pendingRanges.put(range, endpoint);
allLeftMetadata.removeEndpoint(endpoint);
@@ -1977,18 +1983,17 @@ public class StorageService implements IEndpointStateChangeSubscriber, StorageSe
if (Table.SYSTEM_TABLE.equals(tableName))
return;
- AntiEntropyService.RepairFuture future = forceTableRepair(getLocalPrimaryRange(), tableName, isSequential, columnFamilies);
- if (future == null)
- return;
- try
- {
- future.get();
- }
- catch (Exception e)
+ List<AntiEntropyService.RepairFuture> futures = new ArrayList<AntiEntropyService.RepairFuture>();
+ for (Range<Token> range : getLocalPrimaryRanges())
{
- logger.error("Repair session " + future.session.getName() + " failed.", e);
- throw new IOException("Some repair session(s) failed (see log for details).");
+ RepairFuture future = forceTableRepair(range, tableName, isSequential, columnFamilies);
+ if (future != null)
+ futures.add(future);
}
+ if (futures.isEmpty())
+ return;
+ for (AntiEntropyService.RepairFuture future : futures)
+ FBUtilities.waitOnFuture(future);
}
public void forceTableRepairRange(String beginToken, String endToken, final String tableName, boolean isSequential, final String... columnFamilies) throws IOException
@@ -2041,9 +2046,8 @@ public class StorageService implements IEndpointStateChangeSubscriber, StorageSe
* This method returns the predecessor of the endpoint ep on the identifier
* space.
*/
- InetAddress getPredecessor(InetAddress ep)
+ InetAddress getPredecessor(Token token)
{
- Token token = tokenMetadata.getToken(ep);
return tokenMetadata.getEndpoint(tokenMetadata.getPredecessor(token));
}
@@ -2051,17 +2055,27 @@ public class StorageService implements IEndpointStateChangeSubscriber, StorageSe
* This method returns the successor of the endpoint ep on the identifier
* space.
*/
- public InetAddress getSuccessor(InetAddress ep)
+ public InetAddress getSuccessor(Token token)
{
- Token token = tokenMetadata.getToken(ep);
return tokenMetadata.getEndpoint(tokenMetadata.getSuccessor(token));
}
/**
+ * Get the primary ranges for the specified endpoint.
+ * @param ep endpoint we are interested in.
+ * @return collection of ranges for the specified endpoint.
+ */
+ public Collection<Range<Token>> getPrimaryRangesForEndpoint(InetAddress ep)
+ {
+ return tokenMetadata.getPrimaryRangesFor(tokenMetadata.getTokens(ep));
+ }
+
+ /**
* Get the primary range for the specified endpoint.
* @param ep endpoint we are interested in.
* @return range for the specified endpoint.
*/
+ @Deprecated
public Range<Token> getPrimaryRangeForEndpoint(InetAddress ep)
{
return tokenMetadata.getPrimaryRangeFor(tokenMetadata.getToken(ep));
@@ -2741,14 +2755,12 @@ public class StorageService implements IEndpointStateChangeSubscriber, StorageSe
public Map<InetAddress, Float> getOwnership()
{
- Map<Token, InetAddress> tokensToEndpoints = tokenMetadata.getTokenToEndpointMapForReading();
- List<Token> sortedTokens = new ArrayList<Token>(tokensToEndpoints.keySet());
- Collections.sort(sortedTokens);
+ List<Token> sortedTokens = tokenMetadata.sortedTokens();
// describeOwnership returns tokens in an unspecified order, let's re-order them
Map<Token, Float> tokenMap = new TreeMap<Token, Float>(getPartitioner().describeOwnership(sortedTokens));
Map<InetAddress, Float> stringMap = new LinkedHashMap<InetAddress, Float>();
for (Map.Entry<Token, Float> entry : tokenMap.entrySet())
- stringMap.put(tokensToEndpoints.get(entry.getKey()), entry.getValue());
+ stringMap.put(tokenMetadata.getEndpoint(entry.getKey()), entry.getValue());
return stringMap;
}
@@ -2773,22 +2785,14 @@ public class StorageService implements IEndpointStateChangeSubscriber, StorageSe
if (keyspace == null)
keyspace = Schema.instance.getNonSystemTables().get(0);
- final BiMap<InetAddress, Token> endpointsToTokens = ImmutableBiMap.copyOf(metadata.getTokenToEndpointMapForReading()).inverse();
-
Collection<Collection<InetAddress>> endpointsGroupedByDc = new ArrayList<Collection<InetAddress>>();
- if (isDcAwareReplicationStrategy(keyspace))
- {
- // mapping of dc's to nodes, use sorted map so that we get dcs sorted
- SortedMap<String, Collection<InetAddress>> sortedDcsToEndpoints = new TreeMap<String, Collection<InetAddress>>();
- sortedDcsToEndpoints.putAll(metadata.getTopology().getDatacenterEndpoints().asMap());
- for (Collection<InetAddress> endpoints : sortedDcsToEndpoints.values())
- endpointsGroupedByDc.add(endpoints);
- }
- else
- {
- endpointsGroupedByDc.add(endpointsToTokens.keySet());
- }
+ // mapping of dc's to nodes, use sorted map so that we get dcs sorted
+ SortedMap<String, Collection<InetAddress>> sortedDcsToEndpoints = new TreeMap<String, Collection<InetAddress>>();
+ sortedDcsToEndpoints.putAll(metadata.getTopology().getDatacenterEndpoints().asMap());
+ for (Collection<InetAddress> endpoints : sortedDcsToEndpoints.values())
+ endpointsGroupedByDc.add(endpoints);
+ Map<Token, Float> tokenOwnership = getPartitioner().describeOwnership(tokenMetadata.sortedTokens());
LinkedHashMap<InetAddress, Float> finalOwnership = Maps.newLinkedHashMap();
// calculate ownership per dc
@@ -2802,19 +2806,22 @@ public class StorageService implements IEndpointStateChangeSubscriber, StorageSe
{
public int compare(InetAddress o1, InetAddress o2)
{
- return endpointsToTokens.get(o1).compareTo(endpointsToTokens.get(o2));
- }
- });
+ byte[] b1 = o1.getAddress();
+ byte[] b2 = o2.getAddress();
- // calculate the ownership without replication
- Function<InetAddress, Token> f = new Function<InetAddress, Token>()
- {
- public Token apply(InetAddress arg0)
- {
- return endpointsToTokens.get(arg0);
+ if(b1.length < b2.length) return -1;
+ if(b1.length > b2.length) return 1;
+
+ for(int i = 0; i < b1.length; i++)
+ {
+ int left = (int)b1[i] & 0xFF;
+ int right = (int)b2[i] & 0xFF;
+ if (left < right) return -1;
+ else if (left > right) return 1;
+ }
+ return 0;
}
- };
- Map<Token, Float> tokenOwnership = getPartitioner().describeOwnership(Lists.transform(sortedEndpoints, f));
+ });
// calculate the ownership with replication and add the endpoint to the final ownership map
for (InetAddress endpoint : endpoints)
@@ -3144,7 +3151,9 @@ public class StorageService implements IEndpointStateChangeSubscriber, StorageSe
*/
public List<String> getRangeKeySample()
{
- List<DecoratedKey> keys = keySamples(ColumnFamilyStore.allUserDefined(), getLocalPrimaryRange());
+ List<DecoratedKey> keys = new ArrayList<DecoratedKey>();
+ for (Range<Token> range : getLocalPrimaryRanges())
+ keys.addAll(keySamples(ColumnFamilyStore.allUserDefined(), range));
List<String> sampledKeys = new ArrayList<String>(keys.size());
for (DecoratedKey key : keys)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e85afdc5/test/unit/org/apache/cassandra/locator/NetworkTopologyStrategyTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/locator/NetworkTopologyStrategyTest.java b/test/unit/org/apache/cassandra/locator/NetworkTopologyStrategyTest.java
index 9e3d684..bd03766 100644
--- a/test/unit/org/apache/cassandra/locator/NetworkTopologyStrategyTest.java
+++ b/test/unit/org/apache/cassandra/locator/NetworkTopologyStrategyTest.java
@@ -41,6 +41,9 @@ import org.apache.cassandra.dht.StringToken;
import org.apache.cassandra.dht.Token;
import org.apache.cassandra.utils.Pair;
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.Multimap;
+
public class NetworkTopologyStrategyTest
{
private String table = "Keyspace1";
@@ -105,7 +108,7 @@ public class NetworkTopologyStrategyTest
DatabaseDescriptor.setEndpointSnitch(snitch);
TokenMetadata metadata = new TokenMetadata();
Map<String, String> configOptions = new HashMap<String, String>();
- Set<Pair<Token, InetAddress>> tokens = new HashSet<Pair<Token, InetAddress>>();
+ Multimap<InetAddress, Token> tokens = HashMultimap.create();
int totalRF = 0;
for (int dc = 0; dc < dcRacks.length; ++dc)
@@ -120,7 +123,7 @@ public class NetworkTopologyStrategyTest
InetAddress address = InetAddress.getByAddress(ipBytes);
StringToken token = new StringToken(String.format("%02x%02x%02x", ep, rack, dc));
logger.debug("adding node " + address + " at " + token);
- tokens.add(new Pair<Token, InetAddress>(token, address));
+ tokens.put(address, token);
}
}
}