You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by ee...@apache.org on 2012/07/18 20:49:39 UTC
[5/5] git commit: virtual nodes support for bootstrap and decommission
virtual nodes support for bootstrap and decommission
Patch by Sam Overton and eevans; reviewed by Brandon Williams for CASSANDRA-4122
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/66b96ee5
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/66b96ee5
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/66b96ee5
Branch: refs/heads/trunk
Commit: 66b96ee5357b639729b6f220b1d5453027b866c6
Parents: 72aa824
Author: Eric Evans <ee...@apache.org>
Authored: Wed Jul 18 13:27:56 2012 -0500
Committer: Eric Evans <ee...@apache.org>
Committed: Wed Jul 18 13:27:56 2012 -0500
----------------------------------------------------------------------
NEWS.txt | 3 +
conf/cassandra.yaml | 17 +-
src/java/org/apache/cassandra/config/Config.java | 1 +
.../cassandra/config/DatabaseDescriptor.java | 25 +-
src/java/org/apache/cassandra/db/SystemTable.java | 102 +++++--
.../org/apache/cassandra/dht/BootStrapper.java | 60 +++-
src/java/org/apache/cassandra/gms/Gossiper.java | 10 +-
.../org/apache/cassandra/gms/VersionedValue.java | 38 ++-
.../locator/AbstractReplicationStrategy.java | 7 +-
.../apache/cassandra/locator/TokenMetadata.java | 24 ++-
.../apache/cassandra/service/StorageService.java | 276 +++++++++------
test/unit/org/apache/cassandra/Util.java | 5 +-
.../org/apache/cassandra/db/SystemTableTest.java | 32 ++-
.../org/apache/cassandra/dht/BootStrapperTest.java | 7 +-
.../apache/cassandra/gms/SerializationsTest.java | 5 +-
.../service/AntiEntropyServiceTestAbstract.java | 2 +-
.../cassandra/service/LeaveAndBootstrapTest.java | 70 +++--
.../org/apache/cassandra/service/MoveTest.java | 20 +-
18 files changed, 492 insertions(+), 212 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/66b96ee5/NEWS.txt
----------------------------------------------------------------------
diff --git a/NEWS.txt b/NEWS.txt
index fd13667..c52bd89 100644
--- a/NEWS.txt
+++ b/NEWS.txt
@@ -45,6 +45,9 @@ Features
as well as other updates
- rpc_timeout has been split up to allow finer-grained control
on timeouts for different operation types
+ - num_tokens can now be specified in cassandra.yaml. This defines the
+ number of tokens assigned to the host on the ring (default: 1).
+ Also specifying initial_token will override any num_tokens setting.
1.1.2
http://git-wip-us.apache.org/repos/asf/cassandra/blob/66b96ee5/conf/cassandra.yaml
----------------------------------------------------------------------
diff --git a/conf/cassandra.yaml b/conf/cassandra.yaml
index e4a46b8..1b89b2e 100644
--- a/conf/cassandra.yaml
+++ b/conf/cassandra.yaml
@@ -9,7 +9,22 @@
# one logical cluster from joining another.
cluster_name: 'Test Cluster'
-# You should always specify InitialToken when setting up a production
+# This defines the number of tokens randomly assigned to this node on the ring
+# The more tokens, relative to other nodes, the larger the proportion of data
+# that this node will store. You probably want all nodes to have the same number
+# of tokens assuming they have equal hardware capability.
+#
+# If you leave this unspecified, Cassandra will use the default of 1 token for legacy compatibility,
+# and will use the initial_token as described below.
+#
+# Specifying initial_token will override this setting.
+#
+# If you already have a cluster with 1 token per node, and wish to migrate to
+# multiple tokens per node, see http://wiki.apache.org/cassandra/Operations
+# num_tokens: 256
+
+# If you haven't specified num_tokens, or have set it to the default of 1 then
+# you should always specify InitialToken when setting up a production
# cluster for the first time, and often when adding capacity later.
# The principle is that each node should be given an equal slice of
# the token ring; see http://wiki.apache.org/cassandra/Operations
http://git-wip-us.apache.org/repos/asf/cassandra/blob/66b96ee5/src/java/org/apache/cassandra/config/Config.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/Config.java b/src/java/org/apache/cassandra/config/Config.java
index f5f4ef6..8da9777 100644
--- a/src/java/org/apache/cassandra/config/Config.java
+++ b/src/java/org/apache/cassandra/config/Config.java
@@ -38,6 +38,7 @@ public class Config
/* initial token in the ring */
public String initial_token;
+ public Integer num_tokens = 1;
public Long rpc_timeout_in_ms = new Long(10000);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/66b96ee5/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
index 11d8417..878c6f1 100644
--- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
+++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
@@ -437,7 +437,8 @@ public class DatabaseDescriptor
}
if (conf.initial_token != null)
- partitioner.getTokenFactory().validate(conf.initial_token);
+ for (String token : tokensFromString(conf.initial_token))
+ partitioner.getTokenFactory().validate(token);
try
{
@@ -667,14 +668,28 @@ public class DatabaseDescriptor
return conf.column_index_size_in_kb * 1024;
}
- public static String getInitialToken()
+ public static Collection<String> getInitialTokens()
{
- return System.getProperty("cassandra.initial_token", conf.initial_token);
+ return tokensFromString(System.getProperty("cassandra.initial_token", conf.initial_token));
}
- public static String getReplaceToken()
+ public static Collection<String> tokensFromString(String tokenString)
{
- return System.getProperty("cassandra.replace_token", null);
+ List<String> tokens = new ArrayList<String>();
+ if (tokenString != null)
+ for (String token : tokenString.split(","))
+ tokens.add(token.replaceAll("^\\s+", "").replaceAll("\\s+$", ""));
+ return tokens;
+ }
+
+ public static Integer getNumTokens()
+ {
+ return conf.num_tokens;
+ }
+
+ public static Collection<String> getReplaceTokens()
+ {
+ return tokensFromString(System.getProperty("cassandra.replace_token", null));
}
public static String getClusterName()
http://git-wip-us.apache.org/repos/asf/cassandra/blob/66b96ee5/src/java/org/apache/cassandra/db/SystemTable.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/SystemTable.java b/src/java/org/apache/cassandra/db/SystemTable.java
index 77e064f..9634515 100644
--- a/src/java/org/apache/cassandra/db/SystemTable.java
+++ b/src/java/org/apache/cassandra/db/SystemTable.java
@@ -24,8 +24,11 @@ import java.nio.ByteBuffer;
import java.util.*;
import java.util.concurrent.ExecutionException;
+import com.google.common.collect.HashMultimap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.ImmutableSortedSet;
+import com.google.common.collect.Multimap;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -122,7 +125,9 @@ public class SystemTable
Iterator<IColumn> oldColumns = oldCf.columns.iterator();
String clusterName = ByteBufferUtil.string(oldColumns.next().value());
- String tokenBytes = ByteBufferUtil.bytesToHex(oldColumns.next().value());
+ // serialize the old token as a collection of (one )tokens.
+ Token token = StorageService.getPartitioner().getTokenFactory().fromByteArray(oldColumns.next().value());
+ String tokenBytes = ByteBufferUtil.bytesToHex(serializeTokens(Collections.singleton(token)));
// (assume that any node getting upgraded was bootstrapped, since that was stored in a separate row for no particular reason)
String req = "INSERT INTO system.%s (key, cluster_name, token_bytes, bootstrapped) VALUES ('%s', '%s', '%s', 'true')";
processInternal(String.format(req, LOCAL_CF, LOCAL_KEY, clusterName, tokenBytes));
@@ -139,47 +144,102 @@ public class SystemTable
}
/**
- * Record token being used by another node
+ * Record tokens being used by another node
*/
- public static synchronized void updateToken(InetAddress ep, Token token)
+ public static synchronized void updateTokens(InetAddress ep, Collection<Token> tokens)
{
if (ep.equals(FBUtilities.getBroadcastAddress()))
{
- removeToken(token);
+ removeTokens(tokens);
return;
}
IPartitioner p = StorageService.getPartitioner();
- String req = "INSERT INTO system.%s (token_bytes, peer) VALUES ('%s', '%s')";
- String tokenBytes = ByteBufferUtil.bytesToHex(p.getTokenFactory().toByteArray(token));
- processInternal(String.format(req, PEERS_CF, tokenBytes, ep.getHostAddress()));
+ for (Token token : tokens)
+ {
+ String req = "INSERT INTO system.%s (token_bytes, peer) VALUES ('%s', '%s')";
+ String tokenBytes = ByteBufferUtil.bytesToHex(p.getTokenFactory().toByteArray(token));
+ processInternal(String.format(req, PEERS_CF, tokenBytes, ep.getHostAddress()));
+ }
forceBlockingFlush(PEERS_CF);
}
/**
- * Remove stored token being used by another node
+ * Remove stored tokens being used by another node
*/
- public static synchronized void removeToken(Token token)
+ public static synchronized void removeTokens(Collection<Token> tokens)
{
IPartitioner p = StorageService.getPartitioner();
- String req = "DELETE FROM system.%s WHERE token_bytes = '%s'";
- String tokenBytes = ByteBufferUtil.bytesToHex(p.getTokenFactory().toByteArray(token));
- processInternal(String.format(req, PEERS_CF, tokenBytes));
+
+ for (Token token : tokens)
+ {
+ String req = "DELETE FROM system.%s WHERE token_bytes = '%s'";
+ String tokenBytes = ByteBufferUtil.bytesToHex(p.getTokenFactory().toByteArray(token));
+ processInternal(String.format(req, PEERS_CF, tokenBytes));
+ }
forceBlockingFlush(PEERS_CF);
}
/**
- * This method is used to update the System Table with the new token for this node
+ * This method is used to update the System Table with the new tokens for this node
*/
- public static synchronized void updateToken(Token token)
+ public static synchronized void updateTokens(Collection<Token> tokens)
{
IPartitioner p = StorageService.getPartitioner();
+
String req = "INSERT INTO system.%s (key, token_bytes) VALUES ('%s', '%s')";
- String tokenBytes = ByteBufferUtil.bytesToHex(p.getTokenFactory().toByteArray(token));
+ String tokenBytes = ByteBufferUtil.bytesToHex(serializeTokens(tokens));
processInternal(String.format(req, LOCAL_CF, LOCAL_KEY, tokenBytes));
forceBlockingFlush(LOCAL_CF);
}
+ /** Serialize a collection of tokens to bytes */
+ private static ByteBuffer serializeTokens(Collection<Token> tokens)
+ {
+ // Guesstimate the total number of bytes needed
+ int estCapacity = (tokens.size() * 16) + (tokens.size() * 2);
+ ByteBuffer toks = ByteBuffer.allocate(estCapacity);
+ IPartitioner p = StorageService.getPartitioner();
+
+ for (Token token : tokens)
+ {
+ ByteBuffer tokenBytes = p.getTokenFactory().toByteArray(token);
+
+ // If we blow the buffer, grow it by double
+ if (toks.remaining() < (2 + tokenBytes.remaining()))
+ {
+ estCapacity = estCapacity * 2;
+ ByteBuffer newToks = ByteBuffer.allocate(estCapacity);
+ toks.flip();
+ newToks.put(toks);
+ toks = newToks;
+ }
+
+ toks.putShort((short)tokenBytes.remaining());
+ toks.put(tokenBytes);
+ }
+
+ toks.flip();
+ return toks;
+ }
+
+ private static Collection<Token> deserializeTokens(ByteBuffer tokenBytes)
+ {
+ List<Token> tokens = new ArrayList<Token>();
+ IPartitioner p = StorageService.getPartitioner();
+
+ while(tokenBytes.hasRemaining())
+ {
+ short len = tokenBytes.getShort();
+ ByteBuffer dup = tokenBytes.slice();
+ dup.limit(len);
+ tokenBytes.position(tokenBytes.position() + len);
+ tokens.add(p.getTokenFactory().fromByteArray(dup));
+ }
+
+ return tokens;
+ }
+
private static void forceBlockingFlush(String cfname)
{
try
@@ -200,13 +260,13 @@ public class SystemTable
* Return a map of stored tokens to IP addresses
*
*/
- public static HashMap<Token, InetAddress> loadTokens()
+ public static Multimap<InetAddress, Token> loadTokens()
{
IPartitioner p = StorageService.getPartitioner();
- HashMap<Token, InetAddress> tokenMap = new HashMap<Token, InetAddress>();
+ Multimap<InetAddress, Token> tokenMap = HashMultimap.create();
for (UntypedResultSet.Row row : processInternal("SELECT * FROM system." + PEERS_CF))
- tokenMap.put(p.getTokenFactory().fromByteArray(row.getBytes("token_bytes")), row.getInetAddress("peer"));
+ tokenMap.put(row.getInetAddress("peer"), p.getTokenFactory().fromByteArray(row.getBytes("token_bytes")));
return tokenMap;
}
@@ -254,13 +314,13 @@ public class SystemTable
throw new ConfigurationException("Saved cluster name " + savedClusterName + " != configured name " + DatabaseDescriptor.getClusterName());
}
- public static Token getSavedToken()
+ public static Collection<Token> getSavedTokens()
{
String req = "SELECT token_bytes FROM system.%s WHERE key='%s'";
UntypedResultSet result = processInternal(String.format(req, LOCAL_CF, LOCAL_KEY));
return result.isEmpty() || !result.one().has("token_bytes")
- ? null
- : StorageService.getPartitioner().getTokenFactory().fromByteArray(result.one().getBytes("token_bytes"));
+ ? Collections.<Token>emptyList()
+ : deserializeTokens(result.one().getBytes("token_bytes"));
}
public static int incrementAndGetGeneration() throws IOException
http://git-wip-us.apache.org/repos/asf/cassandra/blob/66b96ee5/src/java/org/apache/cassandra/dht/BootStrapper.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/dht/BootStrapper.java b/src/java/org/apache/cassandra/dht/BootStrapper.java
index 8abc48d..ba14c32 100644
--- a/src/java/org/apache/cassandra/dht/BootStrapper.java
+++ b/src/java/org/apache/cassandra/dht/BootStrapper.java
@@ -50,17 +50,17 @@ public class BootStrapper
/* endpoint that needs to be bootstrapped */
protected final InetAddress address;
/* token of the node being bootstrapped. */
- protected final Token<?> token;
+ protected final Collection<Token> tokens;
protected final TokenMetadata tokenMetadata;
private static final long BOOTSTRAP_TIMEOUT = 30000; // default bootstrap timeout of 30s
- public BootStrapper(InetAddress address, Token token, TokenMetadata tmd)
+ public BootStrapper(InetAddress address, Collection<Token> tokens, TokenMetadata tmd)
{
assert address != null;
- assert token != null;
+ assert tokens != null && !tokens.isEmpty();
this.address = address;
- this.token = token;
+ this.tokens = tokens;
tokenMetadata = tmd;
}
@@ -75,7 +75,7 @@ public class BootStrapper
for (String table : Schema.instance.getNonSystemTables())
{
AbstractReplicationStrategy strategy = Table.open(table).getReplicationStrategy();
- streamer.addRanges(table, strategy.getPendingAddressRanges(tokenMetadata, token, address));
+ streamer.addRanges(table, strategy.getPendingAddressRanges(tokenMetadata, tokens, address));
}
streamer.fetch();
@@ -83,23 +83,49 @@ public class BootStrapper
}
/**
- * if initialtoken was specified, use that.
- * otherwise, pick a token to assume half the load of the most-loaded node.
+ * if initialtoken was specified, use that (split on comma).
+ * otherwise, if num_tokens == 1, pick a token to assume half the load of the most-loaded node.
+ * else choose num_tokens tokens at random
*/
- public static Token getBootstrapToken(final TokenMetadata metadata, final Map<InetAddress, Double> load) throws IOException, ConfigurationException
+ public static Collection<Token> getBootstrapTokens(final TokenMetadata metadata, Map<InetAddress, Double> load) throws IOException, ConfigurationException
{
- if (DatabaseDescriptor.getInitialToken() != null)
+ Collection<String> initialTokens = DatabaseDescriptor.getInitialTokens();
+ if (initialTokens.size() > 0)
{
- logger.debug("token manually specified as " + DatabaseDescriptor.getInitialToken());
- Token token = StorageService.getPartitioner().getTokenFactory().fromString(DatabaseDescriptor.getInitialToken());
- if (metadata.getEndpoint(token) != null)
- throw new ConfigurationException("Bootstraping to existing token " + token + " is not allowed (decommission/removetoken the old node first).");
- return token;
+ logger.debug("tokens manually specified as {}", initialTokens);
+ List<Token> tokens = new ArrayList<Token>();
+ for (String tokenString : initialTokens)
+ {
+ Token token = StorageService.getPartitioner().getTokenFactory().fromString(tokenString);
+ if (metadata.getEndpoint(token) != null)
+ throw new ConfigurationException("Bootstraping to existing token " + tokenString + " is not allowed (decommission/removetoken the old node first).");
+ tokens.add(token);
+ }
+ return tokens;
}
- return getBalancedToken(metadata, load);
+ int numTokens = DatabaseDescriptor.getNumTokens();
+ if (numTokens < 1)
+ throw new ConfigurationException("num_tokens must be >= 1");
+ if (numTokens == 1)
+ return Collections.singleton(getBalancedToken(metadata, load));
+
+ return getRandomTokens(metadata, numTokens);
+ }
+
+ public static Collection<Token> getRandomTokens(TokenMetadata metadata, int numTokens)
+ {
+ Set<Token> tokens = new HashSet<Token>(numTokens);
+ while (tokens.size() < numTokens)
+ {
+ Token token = StorageService.getPartitioner().getRandomToken();
+ if (metadata.getEndpoint(token) == null)
+ tokens.add(token);
+ }
+ return tokens;
}
+ @Deprecated
public static Token getBalancedToken(TokenMetadata metadata, Map<InetAddress, Double> load)
{
InetAddress maxEndpoint = getBootstrapSource(metadata, load);
@@ -108,6 +134,7 @@ public class BootStrapper
return t;
}
+ @Deprecated
static InetAddress getBootstrapSource(final TokenMetadata metadata, final Map<InetAddress, Double> load)
{
// sort first by number of nodes already bootstrapping into a source node's range, then by load.
@@ -151,6 +178,7 @@ public class BootStrapper
return maxEndpoint;
}
+ @Deprecated
static Token<?> getBootstrapTokenFrom(InetAddress maxEndpoint)
{
MessageOut message = new MessageOut(MessagingService.Verb.BOOTSTRAP_TOKEN);
@@ -170,6 +198,7 @@ public class BootStrapper
throw new RuntimeException("Bootstrap failed, could not obtain token from: " + maxEndpoint);
}
+ @Deprecated
public static class BootstrapTokenVerbHandler implements IVerbHandler
{
public void doVerb(MessageIn message, String id)
@@ -181,6 +210,7 @@ public class BootStrapper
}
}
+ @Deprecated
private static class BootstrapTokenCallback implements IAsyncCallback<String>
{
private volatile Token<?> token;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/66b96ee5/src/java/org/apache/cassandra/gms/Gossiper.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/gms/Gossiper.java b/src/java/org/apache/cassandra/gms/Gossiper.java
index 4208c07..8d55dbf 100644
--- a/src/java/org/apache/cassandra/gms/Gossiper.java
+++ b/src/java/org/apache/cassandra/gms/Gossiper.java
@@ -412,7 +412,7 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean
{
InetAddress endpoint = InetAddress.getByName(address);
EndpointState epState = endpointStateMap.get(endpoint);
- Token token = null;
+ Collection<Token> tokens = null;
logger.warn("Assassinating {} via gossip", endpoint);
if (epState == null)
{
@@ -422,7 +422,7 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean
{
try
{
- token = StorageService.instance.getTokenMetadata().getToken(endpoint);
+ tokens = StorageService.instance.getTokenMetadata().getTokens(endpoint);
}
catch (AssertionError e)
{
@@ -444,10 +444,10 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean
epState.updateTimestamp(); // make sure we don't evict it too soon
epState.getHeartBeatState().forceNewerGenerationUnsafe();
}
- if (token == null)
- token = StorageService.instance.getBootstrapToken();
+ if (tokens == null)
+ tokens = Arrays.asList(StorageService.instance.getBootstrapToken());
// do not pass go, do not collect 200 dollars, just gtfo
- epState.addApplicationState(ApplicationState.STATUS, StorageService.instance.valueFactory.left(token, computeExpireTime()));
+ epState.addApplicationState(ApplicationState.STATUS, StorageService.instance.valueFactory.left(tokens, computeExpireTime()));
handleMajorStateChange(endpoint, epState);
try
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/66b96ee5/src/java/org/apache/cassandra/gms/VersionedValue.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/gms/VersionedValue.java b/src/java/org/apache/cassandra/gms/VersionedValue.java
index ff41fcb..a5dc9cd 100644
--- a/src/java/org/apache/cassandra/gms/VersionedValue.java
+++ b/src/java/org/apache/cassandra/gms/VersionedValue.java
@@ -19,6 +19,10 @@ package org.apache.cassandra.gms;
import java.io.*;
import java.net.InetAddress;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
import java.util.UUID;
import org.apache.cassandra.db.TypeSizes;
@@ -107,18 +111,26 @@ public class VersionedValue implements Comparable<VersionedValue>
this.partitioner = partitioner;
}
- public VersionedValue bootstrapping(Token token, UUID hostId)
+ public VersionedValue bootstrapping(Collection<Token> tokens, UUID hostId)
{
return new VersionedValue(versionString(VersionedValue.STATUS_BOOTSTRAPPING,
hostId.toString(),
- partitioner.getTokenFactory().toString(token)));
+ makeTokenString(tokens)));
}
- public VersionedValue normal(Token token, UUID hostId)
+ public VersionedValue normal(Collection<Token> tokens, UUID hostId)
{
return new VersionedValue(versionString(VersionedValue.STATUS_NORMAL,
hostId.toString(),
- partitioner.getTokenFactory().toString(token)));
+ makeTokenString(tokens)));
+ }
+
+ private String makeTokenString(Collection<Token> tokens)
+ {
+ List<String> tokenStrings = new ArrayList<String>();
+ for (Token<?> token : tokens)
+ tokenStrings.add(partitioner.getTokenFactory().toString(token));
+ return StringUtils.join(tokenStrings, VersionedValue.DELIMITER);
}
public VersionedValue load(double load)
@@ -131,15 +143,17 @@ public class VersionedValue implements Comparable<VersionedValue>
return new VersionedValue(newVersion.toString());
}
- public VersionedValue leaving(Token token)
+ public VersionedValue leaving(Collection<Token> tokens)
{
- return new VersionedValue(VersionedValue.STATUS_LEAVING + VersionedValue.DELIMITER + partitioner.getTokenFactory().toString(token));
+ return new VersionedValue(versionString(VersionedValue.STATUS_LEAVING,
+ makeTokenString(tokens)));
}
- public VersionedValue left(Token token, long expireTime)
+ public VersionedValue left(Collection<Token> tokens, long expireTime)
{
- return new VersionedValue(VersionedValue.STATUS_LEFT + VersionedValue.DELIMITER
- + partitioner.getTokenFactory().toString(token) + VersionedValue.DELIMITER + expireTime);
+ return new VersionedValue(versionString(VersionedValue.STATUS_LEFT,
+ Long.toString(expireTime),
+ makeTokenString(tokens)));
}
public VersionedValue moving(Token token)
@@ -226,6 +240,12 @@ public class VersionedValue implements Comparable<VersionedValue>
outValue = versionString(pieces[0], pieces[2]);
}
+ if (type.equals(STATUS_LEFT))
+ {
+ assert pieces.length >= 3;
+ outValue = versionString(pieces[0], pieces[2], pieces[1]);
+ }
+
if ((type.equals(REMOVAL_COORDINATOR)) || (type.equals(REMOVING_TOKEN)) || (type.equals(REMOVED_TOKEN)))
throw new RuntimeException(String.format("Unable to serialize %s(%s...) for nodes older than 1.2",
VersionedValue.class.getName(), type));
http://git-wip-us.apache.org/repos/asf/cassandra/blob/66b96ee5/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java b/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java
index 7fa431a..66fb849 100644
--- a/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java
+++ b/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java
@@ -181,8 +181,13 @@ public abstract class AbstractReplicationStrategy
public Collection<Range<Token>> getPendingAddressRanges(TokenMetadata metadata, Token pendingToken, InetAddress pendingAddress)
{
+ return getPendingAddressRanges(metadata, Arrays.asList(pendingToken), pendingAddress);
+ }
+
+ public Collection<Range<Token>> getPendingAddressRanges(TokenMetadata metadata, Collection<Token> pendingTokens, InetAddress pendingAddress)
+ {
TokenMetadata temp = metadata.cloneOnlyTokenMap();
- temp.updateNormalToken(pendingToken, pendingAddress);
+ temp.updateNormalTokens(pendingTokens, pendingAddress);
return getAddressRanges(temp).get(pendingAddress);
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/66b96ee5/src/java/org/apache/cassandra/locator/TokenMetadata.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/locator/TokenMetadata.java b/src/java/org/apache/cassandra/locator/TokenMetadata.java
index 8fb63d5..7ad962b 100644
--- a/src/java/org/apache/cassandra/locator/TokenMetadata.java
+++ b/src/java/org/apache/cassandra/locator/TokenMetadata.java
@@ -296,14 +296,15 @@ public class TokenMetadata
}
}
- public void removeBootstrapToken(Token token)
+ public void removeBootstrapTokens(Collection<Token> tokens)
{
- assert token != null;
+ assert tokens != null && !tokens.isEmpty();
lock.writeLock().lock();
try
{
- bootstrapTokens.remove(token);
+ for (Token token : tokens)
+ bootstrapTokens.remove(token);
}
finally
{
@@ -847,6 +848,23 @@ public class TokenMetadata
return endpoints;
}
+ /** @return an endpoint to token multimap representation of tokenToEndpointMap (a copy) */
+ public Multimap<InetAddress, Token> getEndpointToTokenMapForReading()
+ {
+ lock.readLock().lock();
+ try
+ {
+ Multimap<InetAddress, Token> cloned = HashMultimap.<InetAddress, Token>create();
+ for (Map.Entry<Token, InetAddress> entry : tokenToEndpointMap.entrySet())
+ cloned.put(entry.getValue(), entry.getKey());
+ return cloned;
+ }
+ finally
+ {
+ lock.readLock().unlock();
+ }
+ }
+
/**
* @return a (stable copy, won't be modified) Token to Endpoint map for all the normal and bootstrapping nodes
* in the cluster.
http://git-wip-us.apache.org/repos/asf/cassandra/blob/66b96ee5/src/java/org/apache/cassandra/service/StorageService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java
index 583b3a7..a58653d 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -169,14 +169,14 @@ public class StorageService implements IEndpointStateChangeSubscriber, StorageSe
}
/** This method updates the local token on disk */
- public void setToken(Token token)
+ public void setTokens(Collection<Token> tokens)
{
if (logger.isDebugEnabled())
- logger.debug("Setting token to {}", token);
- SystemTable.updateToken(token);
- tokenMetadata.updateNormalToken(token, FBUtilities.getBroadcastAddress());
+ logger.debug("Setting tokens to {}", tokens);
+ SystemTable.updateTokens(tokens);
+ tokenMetadata.updateNormalTokens(tokens, FBUtilities.getBroadcastAddress());
Gossiper.instance.addLocalApplicationState(ApplicationState.STATUS,
- valueFactory.normal(getLocalToken(), SystemTable.getLocalHostId()));
+ valueFactory.normal(getLocalTokens(), SystemTable.getLocalHostId()));
setMode(Mode.NORMAL, false);
}
@@ -402,17 +402,18 @@ public class StorageService implements IEndpointStateChangeSubscriber, StorageSe
if (Boolean.parseBoolean(System.getProperty("cassandra.load_ring_state", "true")))
{
logger.info("Loading persisted ring state");
- for (Map.Entry<Token, InetAddress> entry : SystemTable.loadTokens().entrySet())
+ Multimap<InetAddress, Token> loadedTokens = SystemTable.loadTokens();
+ for (InetAddress ep : loadedTokens.keySet())
{
- if (entry.getValue() == FBUtilities.getLocalAddress())
+ if (ep.equals(FBUtilities.getBroadcastAddress()))
{
// entry has been mistakenly added, delete it
- SystemTable.removeToken(entry.getKey());
+ SystemTable.removeTokens(loadedTokens.get(ep));
}
else
{
- tokenMetadata.updateNormalToken(entry.getKey(), entry.getValue());
- Gossiper.instance.addSavedEndpoint(entry.getValue());
+ tokenMetadata.updateNormalTokens(loadedTokens.get(ep), ep);
+ Gossiper.instance.addSavedEndpoint(ep);
}
}
}
@@ -500,7 +501,7 @@ public class StorageService implements IEndpointStateChangeSubscriber, StorageSe
Schema.instance.updateVersionAndAnnounce();
// add rpc listening info
Gossiper.instance.addLocalApplicationState(ApplicationState.RPC_ADDRESS, valueFactory.rpcaddress(DatabaseDescriptor.getRpcAddress()));
- if (null != DatabaseDescriptor.getReplaceToken())
+ if (0 != DatabaseDescriptor.getReplaceTokens().size())
Gossiper.instance.addLocalApplicationState(ApplicationState.STATUS, valueFactory.hibernate(true));
MessagingService.instance().listen(FBUtilities.getLocalAddress());
@@ -515,9 +516,9 @@ public class StorageService implements IEndpointStateChangeSubscriber, StorageSe
&& !SystemTable.isBootstrapped())
logger.info("This node will not auto bootstrap because it is configured to be a seed node.");
- InetAddress current = null;
+ Set<InetAddress> current = new HashSet<InetAddress>();
// first startup is only chance to bootstrap
- Token<?> token;
+ Collection<Token> tokens;
if (DatabaseDescriptor.isAutoBootstrap()
&& !(SystemTable.isBootstrapped()
|| DatabaseDescriptor.getSeeds().contains(FBUtilities.getBroadcastAddress())
@@ -552,7 +553,7 @@ public class StorageService implements IEndpointStateChangeSubscriber, StorageSe
if (logger.isDebugEnabled())
logger.debug("... got ring + schema info");
- if (DatabaseDescriptor.getReplaceToken() == null)
+ if (DatabaseDescriptor.getReplaceTokens().size() == 0)
{
if (tokenMetadata.isMember(FBUtilities.getBroadcastAddress()))
{
@@ -560,7 +561,7 @@ public class StorageService implements IEndpointStateChangeSubscriber, StorageSe
throw new UnsupportedOperationException(s);
}
setMode(Mode.JOINING, "getting bootstrap token", true);
- token = BootStrapper.getBootstrapToken(tokenMetadata, LoadBroadcaster.instance.getLoadInfo());
+ tokens = BootStrapper.getBootstrapTokens(tokenMetadata, LoadBroadcaster.instance.getLoadInfo());
}
else
{
@@ -574,37 +575,53 @@ public class StorageService implements IEndpointStateChangeSubscriber, StorageSe
{
throw new AssertionError(e);
}
- token = StorageService.getPartitioner().getTokenFactory().fromString(DatabaseDescriptor.getReplaceToken());
+ tokens = new ArrayList<Token>();
+ for (String token : DatabaseDescriptor.getReplaceTokens())
+ tokens.add(StorageService.getPartitioner().getTokenFactory().fromString(token));
+
// check for operator errors...
- current = tokenMetadata.getEndpoint(token);
- if (null != current && Gossiper.instance.getEndpointStateForEndpoint(current).getUpdateTimestamp() > (System.currentTimeMillis() - delay))
- throw new UnsupportedOperationException("Cannnot replace a token for a Live node... ");
- setMode(Mode.JOINING, "Replacing a node with token: " + token, true);
+ for (Token token : tokens)
+ {
+ InetAddress existing = tokenMetadata.getEndpoint(token);
+ if (null != existing)
+ {
+ if (Gossiper.instance.getEndpointStateForEndpoint(existing).getUpdateTimestamp() > (System.currentTimeMillis() - delay))
+ throw new UnsupportedOperationException("Cannnot replace a token for a Live node... ");
+ current.add(existing);
+ }
+ }
+
+ setMode(Mode.JOINING, "Replacing a node with token: " + tokens, true);
}
- bootstrap(token);
+ bootstrap(tokens);
assert !isBootstrapMode; // bootstrap will block until finished
}
else
{
- token = SystemTable.getSavedToken();
- if (token == null)
+ tokens = SystemTable.getSavedTokens();
+ if (tokens.isEmpty())
{
- String initialToken = DatabaseDescriptor.getInitialToken();
- if (initialToken == null)
+ Collection<String> initialTokens = DatabaseDescriptor.getInitialTokens();
+ if (initialTokens.size() < 1)
{
- token = getPartitioner().getRandomToken();
- logger.warn("Generated random token " + token + ". Random tokens will result in an unbalanced ring; see http://wiki.apache.org/cassandra/Operations");
+ tokens = BootStrapper.getRandomTokens(tokenMetadata, DatabaseDescriptor.getNumTokens());
+ if (DatabaseDescriptor.getNumTokens() == 1)
+ logger.warn("Generated random token " + tokens + ". Random tokens will result in an unbalanced ring; see http://wiki.apache.org/cassandra/Operations");
+ else
+ logger.info("Generated random tokens.");
}
else
{
- token = getPartitioner().getTokenFactory().fromString(initialToken);
- logger.info("Saved token not found. Using " + token + " from configuration");
+ tokens = new ArrayList<Token>();
+ for (String token : initialTokens)
+ tokens.add(getPartitioner().getTokenFactory().fromString(token));
+ logger.info("Saved token not found. Using " + tokens + " from configuration");
}
}
else
{
- logger.info("Using saved token " + token);
+ logger.info("Using saved token " + tokens);
}
}
@@ -612,10 +629,11 @@ public class StorageService implements IEndpointStateChangeSubscriber, StorageSe
{
// start participating in the ring.
SystemTable.setBootstrapped(true);
- setToken(token);
+ setTokens(tokens);
// remove the existing info about the replaced node.
- if (current != null)
- Gossiper.instance.replacedEndpoint(current);
+ if (!current.isEmpty())
+ for (InetAddress existing : current)
+ Gossiper.instance.replacedEndpoint(existing);
logger.info("Bootstrap/Replace/Move completed! Now serving reads.");
assert tokenMetadata.sortedTokens().size() > 0;
}
@@ -634,7 +652,7 @@ public class StorageService implements IEndpointStateChangeSubscriber, StorageSe
}
else if (isSurveyMode)
{
- setToken(SystemTable.getSavedToken());
+ setTokens(SystemTable.getSavedTokens());
SystemTable.setBootstrapped(true);
isSurveyMode = false;
logger.info("Leaving write survey mode and joining ring at operator request");
@@ -708,15 +726,15 @@ public class StorageService implements IEndpointStateChangeSubscriber, StorageSe
logger.debug(logMsg);
}
- private void bootstrap(Token token) throws IOException
+ private void bootstrap(Collection<Token> tokens) throws IOException
{
isBootstrapMode = true;
- SystemTable.updateToken(token); // DON'T use setToken, that makes us part of the ring locally which is incorrect until we are done bootstrapping
- if (null == DatabaseDescriptor.getReplaceToken())
+ SystemTable.updateTokens(tokens); // DON'T use setToken, that makes us part of the ring locally which is incorrect until we are done bootstrapping
+ if (0 == DatabaseDescriptor.getReplaceTokens().size())
{
// if not an existing token then bootstrap
Gossiper.instance.addLocalApplicationState(ApplicationState.STATUS,
- valueFactory.bootstrapping(token, SystemTable.getLocalHostId()));
+ valueFactory.bootstrapping(tokens, SystemTable.getLocalHostId()));
setMode(Mode.JOINING, "sleeping " + RING_DELAY + " ms for pending range setup", true);
try
{
@@ -730,10 +748,10 @@ public class StorageService implements IEndpointStateChangeSubscriber, StorageSe
else
{
// Dont set any state for the node which is bootstrapping the existing token...
- tokenMetadata.updateNormalToken(token, FBUtilities.getBroadcastAddress());
+ tokenMetadata.updateNormalTokens(tokens, FBUtilities.getBroadcastAddress());
}
setMode(Mode.JOINING, "Starting to bootstrap...", true);
- new BootStrapper(FBUtilities.getBroadcastAddress(), token, tokenMetadata).bootstrap(); // handles token update
+ new BootStrapper(FBUtilities.getBroadcastAddress(), tokens, tokenMetadata).bootstrap(); // handles token update
}
public boolean isBootstrapMode()
@@ -1052,10 +1070,12 @@ public class StorageService implements IEndpointStateChangeSubscriber, StorageSe
}
else tokenPos = 1;
- Token token = getPartitioner().getTokenFactory().fromString(pieces[tokenPos]);
+ Collection<Token> tokens = new ArrayList<Token>();
+ for (int i = tokenPos; i < pieces.length; ++i)
+ tokens.add(getPartitioner().getTokenFactory().fromString(pieces[i]));
if (logger.isDebugEnabled())
- logger.debug("Node " + endpoint + " state bootstrapping, token " + token);
+ logger.debug("Node " + endpoint + " state bootstrapping, token " + tokens);
// if this node is present in token metadata, either we have missed intermediate states
// or the node had crashed. Print warning if needed, clear obsolete stuff and
@@ -1072,7 +1092,7 @@ public class StorageService implements IEndpointStateChangeSubscriber, StorageSe
tokenMetadata.removeEndpoint(endpoint);
}
- tokenMetadata.addBootstrapToken(token, endpoint);
+ tokenMetadata.addBootstrapTokens(tokens, endpoint);
calculatePendingRanges();
if (usesHostId(endpoint))
@@ -1103,10 +1123,12 @@ public class StorageService implements IEndpointStateChangeSubscriber, StorageSe
tokensPos = 1;
logger.debug("Using token position {} for {}", tokensPos, endpoint);
- Token token = getPartitioner().getTokenFactory().fromString(pieces[tokensPos]);
+ Collection<Token> tokens = new ArrayList<Token>();
+ for (int i = tokensPos; i < pieces.length; ++i)
+ tokens.add(getPartitioner().getTokenFactory().fromString(pieces[i]));
if (logger.isDebugEnabled())
- logger.debug("Node " + endpoint + " state normal, token " + token);
+ logger.debug("Node " + endpoint + " state normal, token " + tokens);
if (tokenMetadata.isMember(endpoint))
logger.info("Node " + endpoint + " state jump to normal");
@@ -1115,36 +1137,61 @@ public class StorageService implements IEndpointStateChangeSubscriber, StorageSe
if (usesHostId(endpoint))
tokenMetadata.updateHostId(UUID.fromString(pieces[1]), endpoint);
- // we don't want to update if this node is responsible for the token and it has a later startup time than endpoint.
- InetAddress currentOwner = tokenMetadata.getEndpoint(token);
- if (currentOwner == null)
- {
- logger.debug("New node " + endpoint + " at token " + token);
- tokenMetadata.updateNormalToken(token, endpoint);
- if (!isClientMode)
- SystemTable.updateToken(endpoint, token);
- }
- else if (endpoint.equals(currentOwner))
- {
- // set state back to normal, since the node may have tried to leave, but failed and is now back up
- // no need to persist, token/ip did not change
- tokenMetadata.updateNormalToken(token, endpoint);
- }
- else if (Gossiper.instance.compareEndpointStartup(endpoint, currentOwner) > 0)
- {
- logger.info(String.format("Nodes %s and %s have the same token %s. %s is the new owner",
- endpoint, currentOwner, token, endpoint));
- tokenMetadata.updateNormalToken(token, endpoint);
- Gossiper.instance.removeEndpoint(currentOwner);
- if (!isClientMode)
- SystemTable.updateToken(endpoint, token);
- }
- else
+ Set<Token> tokensToUpdateInMetadata = new HashSet<Token>();
+ Set<Token> tokensToUpdateInSystemTable = new HashSet<Token>();
+ Set<InetAddress> endpointsToRemove = new HashSet<InetAddress>();
+ Multimap<InetAddress, Token> epToTokenCopy = getTokenMetadata().getEndpointToTokenMapForReading();
+
+ for (Token token : tokens)
{
- logger.info(String.format("Nodes %s and %s have the same token %s. Ignoring %s",
- endpoint, currentOwner, token, endpoint));
+ // we don't want to update if this node is responsible for the token and it has a later startup time than endpoint.
+ InetAddress currentOwner = tokenMetadata.getEndpoint(token);
+ if (currentOwner == null)
+ {
+ logger.debug("New node " + endpoint + " at token " + token);
+ tokensToUpdateInMetadata.add(token);
+ if (!isClientMode)
+ tokensToUpdateInSystemTable.add(token);
+ }
+ else if (endpoint.equals(currentOwner))
+ {
+ // set state back to normal, since the node may have tried to leave, but failed and is now back up
+ // no need to persist, token/ip did not change
+ tokensToUpdateInMetadata.add(token);
+ }
+ else if (Gossiper.instance.compareEndpointStartup(endpoint, currentOwner) > 0)
+ {
+ tokensToUpdateInMetadata.add(token);
+ if (!isClientMode)
+ tokensToUpdateInSystemTable.add(token);
+
+ // currentOwner is no longer current, endpoint is. Keep track of these moves, because when
+ // a host no longer has any tokens, we'll want to remove it.
+ epToTokenCopy.get(currentOwner).remove(token);
+ if (epToTokenCopy.get(currentOwner).size() < 1)
+ endpointsToRemove.add(currentOwner);
+
+ logger.info(String.format("Nodes %s and %s have the same token %s. %s is the new owner",
+ endpoint,
+ currentOwner,
+ token,
+ endpoint));
+ }
+ else
+ {
+ logger.info(String.format("Nodes %s and %s have the same token %s. Ignoring %s",
+ endpoint,
+ currentOwner,
+ token,
+ endpoint));
+ }
}
+ tokenMetadata.updateNormalTokens(tokensToUpdateInMetadata, endpoint);
+ for (InetAddress ep : endpointsToRemove)
+ Gossiper.instance.removeEndpoint(ep);
+ SystemTable.updateTokens(endpoint, tokensToUpdateInSystemTable);
+
if (tokenMetadata.isMoving(endpoint)) // if endpoint was moving to a new token
tokenMetadata.removeFromMoving(endpoint);
@@ -1160,11 +1207,12 @@ public class StorageService implements IEndpointStateChangeSubscriber, StorageSe
private void handleStateLeaving(InetAddress endpoint, String[] pieces)
{
assert pieces.length >= 2;
- String moveValue = pieces[1];
- Token token = getPartitioner().getTokenFactory().fromString(moveValue);
+ Collection<Token> tokens = new ArrayList<Token>();
+ for (int i = 1; i < pieces.length; ++i)
+ tokens.add(getPartitioner().getTokenFactory().fromString(pieces[i]));
if (logger.isDebugEnabled())
- logger.debug("Node " + endpoint + " state leaving, token " + token);
+ logger.debug("Node " + endpoint + " state leaving, tokens " + tokens);
// If the node is previously unknown or tokens do not match, update tokenmetadata to
// have this node as 'normal' (it must have been using this token before the
@@ -1172,12 +1220,12 @@ public class StorageService implements IEndpointStateChangeSubscriber, StorageSe
if (!tokenMetadata.isMember(endpoint))
{
logger.info("Node " + endpoint + " state jump to leaving");
- tokenMetadata.updateNormalToken(token, endpoint);
+ tokenMetadata.updateNormalTokens(tokens, endpoint);
}
- else if (!tokenMetadata.getTokens(endpoint).contains(token))
+ else if (!tokenMetadata.getTokens(endpoint).containsAll(tokens))
{
logger.warn("Node " + endpoint + " 'leaving' token mismatch. Long network partition?");
- tokenMetadata.updateNormalToken(token, endpoint);
+ tokenMetadata.updateNormalTokens(tokens, endpoint);
}
// at this point the endpoint is certainly a member with this token, so let's proceed
@@ -1195,12 +1243,21 @@ public class StorageService implements IEndpointStateChangeSubscriber, StorageSe
private void handleStateLeft(InetAddress endpoint, String[] pieces)
{
assert pieces.length >= 2;
- Token token = getPartitioner().getTokenFactory().fromString(pieces[1]);
+ Collection<Token> tokens = null;
+ Integer version = MessagingService.instance().getVersion(endpoint);
+ if (version < MessagingService.VERSION_12)
+ tokens = Arrays.asList(getPartitioner().getTokenFactory().fromString(pieces[1]));
+ else
+ {
+ tokens = new ArrayList<Token>(pieces.length - 2);
+ for (int i = 2; i < pieces.length; ++i)
+ tokens.add(getPartitioner().getTokenFactory().fromString(pieces[i]));
+ }
if (logger.isDebugEnabled())
- logger.debug("Node " + endpoint + " state left, token " + token);
+ logger.debug("Node " + endpoint + " state left, tokens " + tokens);
- excise(token, endpoint, extractExpireTime(pieces));
+ excise(tokens, endpoint, extractExpireTime(pieces, version));
}
/**
@@ -1252,7 +1309,9 @@ public class StorageService implements IEndpointStateChangeSubscriber, StorageSe
if (VersionedValue.REMOVED_TOKEN.equals(state))
{
- excise(removeToken, endpoint, extractExpireTime(pieces));
+ excise(Collections.singleton(removeToken),
+ endpoint,
+ extractExpireTime(pieces, MessagingService.instance().getVersion(endpoint)));
}
else if (VersionedValue.REMOVING_TOKEN.equals(state))
{
@@ -1272,24 +1331,24 @@ public class StorageService implements IEndpointStateChangeSubscriber, StorageSe
} // not a member, nothing to do
}
- private void excise(Token token, InetAddress endpoint)
+ private void excise(Collection<Token> tokens, InetAddress endpoint)
{
HintedHandOffManager.instance.deleteHintsForEndpoint(endpoint);
Gossiper.instance.removeEndpoint(endpoint);
tokenMetadata.removeEndpoint(endpoint);
- tokenMetadata.removeBootstrapToken(token);
+ tokenMetadata.removeBootstrapTokens(tokens);
calculatePendingRanges();
if (!isClientMode)
{
- logger.info("Removing token " + token + " for " + endpoint);
- SystemTable.removeToken(token);
+ logger.info("Removing tokens " + tokens + " for " + endpoint);
+ SystemTable.removeTokens(tokens);
}
}
- private void excise(Token token, InetAddress endpoint, long expireTime)
+ private void excise(Collection<Token> tokens, InetAddress endpoint, long expireTime)
{
addExpireTimeIfFound(endpoint, expireTime);
- excise(token, endpoint);
+ excise(tokens, endpoint);
}
protected void addExpireTimeIfFound(InetAddress endpoint, long expireTime)
@@ -1300,14 +1359,21 @@ public class StorageService implements IEndpointStateChangeSubscriber, StorageSe
}
}
- protected long extractExpireTime(String[] pieces)
+ protected long extractExpireTime(String[] pieces, int version)
{
- long expireTime = 0L;
- if (pieces.length >= 3)
+ if (version < MessagingService.VERSION_12)
{
- expireTime = Long.parseLong(pieces[2]);
+ if (pieces.length >= 3)
+ return Long.parseLong(pieces[2]);
+ else
+ return 0L;
+ } else
+ {
+ if (VersionedValue.STATUS_LEFT.equals(pieces[0]))
+ return Long.parseLong(pieces[1]);
+ else
+ return Long.parseLong(pieces[2]);
}
- return expireTime;
}
/**
@@ -1653,18 +1719,18 @@ public class StorageService implements IEndpointStateChangeSubscriber, StorageSe
HintedHandOffManager.instance.scheduleHintDelivery(host);
}
- public Token getLocalToken()
+ public Collection<Token> getLocalTokens()
{
- Token token = SystemTable.getSavedToken();
- assert token != null; // should not be called before initServer sets this
- return token;
+ Collection<Token> tokens = SystemTable.getSavedTokens();
+ assert tokens != null && !tokens.isEmpty(); // should not be called before initServer sets this
+ return tokens;
}
/* These methods belong to the MBean interface */
public String getToken()
{
- return getLocalToken().toString();
+ return getLocalTokens().iterator().next().toString();
}
public String getReleaseVersion()
@@ -2255,7 +2321,7 @@ public class StorageService implements IEndpointStateChangeSubscriber, StorageSe
*/
private void startLeaving()
{
- Gossiper.instance.addLocalApplicationState(ApplicationState.STATUS, valueFactory.leaving(getLocalToken()));
+ Gossiper.instance.addLocalApplicationState(ApplicationState.STATUS, valueFactory.leaving(getLocalTokens()));
tokenMetadata.addLeavingEndpoint(FBUtilities.getBroadcastAddress());
calculatePendingRanges();
}
@@ -2299,7 +2365,7 @@ public class StorageService implements IEndpointStateChangeSubscriber, StorageSe
tokenMetadata.removeEndpoint(FBUtilities.getBroadcastAddress());
calculatePendingRanges();
- Gossiper.instance.addLocalApplicationState(ApplicationState.STATUS, valueFactory.left(getLocalToken(),Gossiper.computeExpireTime()));
+ Gossiper.instance.addLocalApplicationState(ApplicationState.STATUS, valueFactory.left(getLocalTokens(),Gossiper.computeExpireTime()));
int delay = Math.max(RING_DELAY, Gossiper.intervalInMillis * 2);
logger.info("Announcing that I have left the ring for " + delay + "ms");
try
@@ -2378,7 +2444,7 @@ public class StorageService implements IEndpointStateChangeSubscriber, StorageSe
}
Gossiper.instance.addLocalApplicationState(ApplicationState.STATUS, valueFactory.moving(newToken));
- setMode(Mode.MOVING, String.format("Moving %s from %s to %s.", localAddress, getLocalToken(), newToken), true);
+ setMode(Mode.MOVING, String.format("Moving %s from %s to %s.", localAddress, getLocalTokens().iterator().next(), newToken), true);
IEndpointSnitch snitch = DatabaseDescriptor.getEndpointSnitch();
@@ -2481,10 +2547,10 @@ public class StorageService implements IEndpointStateChangeSubscriber, StorageSe
}
}
- setToken(newToken); // setting new token as we have everything settled
+ setTokens(Collections.singleton(newToken)); // setting new token as we have everything settled
if (logger.isDebugEnabled())
- logger.debug("Successfully moved to new token {}", getLocalToken());
+ logger.debug("Successfully moved to new token {}", getLocalTokens().iterator().next());
}
/**
@@ -2515,7 +2581,7 @@ public class StorageService implements IEndpointStateChangeSubscriber, StorageSe
UUID hostId = tokenMetadata.getHostId(endpoint);
Gossiper.instance.advertiseTokenRemoved(endpoint, hostId);
Token token = tokenMetadata.getToken(endpoint);
- excise(token, endpoint);
+ excise(Collections.singleton(token), endpoint);
}
replicatingNodes.clear();
removingNode = null;
@@ -2603,7 +2669,7 @@ public class StorageService implements IEndpointStateChangeSubscriber, StorageSe
}
}
- excise(token, endpoint);
+ excise(Collections.singleton(token), endpoint);
// gossiper will indicate the token has left
Gossiper.instance.advertiseTokenRemoved(endpoint, hostId);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/66b96ee5/test/unit/org/apache/cassandra/Util.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/Util.java b/test/unit/org/apache/cassandra/Util.java
index 7f8c227..d783413 100644
--- a/test/unit/org/apache/cassandra/Util.java
+++ b/test/unit/org/apache/cassandra/Util.java
@@ -26,6 +26,7 @@ import java.net.UnknownHostException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collection;
+import java.util.Collections;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.Callable;
@@ -230,7 +231,9 @@ public class Util
{
InetAddress ep = InetAddress.getByName("127.0.0." + String.valueOf(i + 1));
Gossiper.instance.initializeNodeUnsafe(ep, 1);
- ss.onChange(ep, ApplicationState.STATUS, new VersionedValue.VersionedValueFactory(partitioner).normal(endpointTokens.get(i), hostIds.get(i)));
+ ss.onChange(ep,
+ ApplicationState.STATUS,
+ new VersionedValue.VersionedValueFactory(partitioner).normal(Collections.singleton(endpointTokens.get(i)), hostIds.get(i)));
hosts.add(ep);
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/66b96ee5/test/unit/org/apache/cassandra/db/SystemTableTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/SystemTableTest.java b/test/unit/org/apache/cassandra/db/SystemTableTest.java
index 0c7b8a3..7c7eb24 100644
--- a/test/unit/org/apache/cassandra/db/SystemTableTest.java
+++ b/test/unit/org/apache/cassandra/db/SystemTableTest.java
@@ -23,24 +23,36 @@ package org.apache.cassandra.db;
import java.net.InetAddress;
import java.net.UnknownHostException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
import java.util.UUID;
-import com.google.common.base.Charsets;
import org.junit.Test;
import org.apache.cassandra.dht.BytesToken;
+import org.apache.cassandra.dht.Token;
import org.apache.cassandra.utils.ByteBufferUtil;
public class SystemTableTest
{
@Test
- public void testLocalToken()
+ public void testLocalTokens()
{
- SystemTable.updateToken(new BytesToken(ByteBufferUtil.bytes("token")));
- assert new String(((BytesToken) SystemTable.getSavedToken()).token, Charsets.UTF_8).equals("token");
+ // Remove all existing tokens
+ SystemTable.updateTokens(Collections.<Token> emptySet());
- SystemTable.updateToken(new BytesToken(ByteBufferUtil.bytes("token2")));
- assert new String(((BytesToken) SystemTable.getSavedToken()).token, Charsets.UTF_8).equals("token2");
+ List<Token> tokens = new ArrayList<Token>()
+ {{
+ for (int i = 0; i < 9; i++)
+ add(new BytesToken(ByteBufferUtil.bytes(String.format("token%d", i))));
+ }};
+
+ SystemTable.updateTokens(tokens);
+ int count = 0;
+
+ for (Token tok : SystemTable.getSavedTokens())
+ assert tokens.get(count++).equals(tok);
}
@Test
@@ -48,10 +60,10 @@ public class SystemTableTest
{
BytesToken token = new BytesToken(ByteBufferUtil.bytes("token3"));
InetAddress address = InetAddress.getByName("127.0.0.2");
- SystemTable.updateToken(address, token);
- assert SystemTable.loadTokens().get(token).equals(address);
- SystemTable.removeToken(token);
- assert !SystemTable.loadTokens().containsKey(token);
+ SystemTable.updateTokens(address, Collections.<Token>singletonList(token));
+ assert SystemTable.loadTokens().get(address).contains(token);
+ SystemTable.removeTokens(Collections.<Token>singletonList(token));
+ assert !SystemTable.loadTokens().containsValue(token);
}
@Test
http://git-wip-us.apache.org/repos/asf/cassandra/blob/66b96ee5/test/unit/org/apache/cassandra/dht/BootStrapperTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/dht/BootStrapperTest.java b/test/unit/org/apache/cassandra/dht/BootStrapperTest.java
index 3c8cd3b..997c5e4 100644
--- a/test/unit/org/apache/cassandra/dht/BootStrapperTest.java
+++ b/test/unit/org/apache/cassandra/dht/BootStrapperTest.java
@@ -22,6 +22,7 @@ import java.io.IOException;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.Collection;
+import java.util.Collections;
import java.util.HashSet;
import java.util.HashMap;
import java.util.Set;
@@ -105,7 +106,7 @@ public class BootStrapperTest extends SchemaLoader
assert range.contains(token);
ss.onChange(bootstrapAddrs[i],
ApplicationState.STATUS,
- StorageService.instance.valueFactory.bootstrapping(token, bootstrapHostIds[i]));
+ StorageService.instance.valueFactory.bootstrapping(Collections.<Token>singleton(token), bootstrapHostIds[i]));
}
// any further attempt to bootsrtap should fail since every node in the cluster is splitting.
@@ -124,7 +125,7 @@ public class BootStrapperTest extends SchemaLoader
Token token = StorageService.getPartitioner().midpoint(range.left, range.right);
ss.onChange(bootstrapAddrs[2],
ApplicationState.STATUS,
- StorageService.instance.valueFactory.normal(token, bootstrapHostIds[2]));
+ StorageService.instance.valueFactory.normal(Collections.singleton(token), bootstrapHostIds[2]));
load.put(bootstrapAddrs[2], 0d);
InetAddress addr = BootStrapper.getBootstrapSource(ss.getTokenMetadata(), load);
assert addr != null && addr.equals(addrs[2]);
@@ -158,7 +159,7 @@ public class BootStrapperTest extends SchemaLoader
assert range5.contains(fakeToken);
ss.onChange(myEndpoint,
ApplicationState.STATUS,
- StorageService.instance.valueFactory.bootstrapping(fakeToken, UUID.randomUUID()));
+ StorageService.instance.valueFactory.bootstrapping(Collections.<Token>singleton(fakeToken), UUID.randomUUID()));
tmd = ss.getTokenMetadata();
InetAddress source4 = BootStrapper.getBootstrapSource(tmd, load);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/66b96ee5/test/unit/org/apache/cassandra/gms/SerializationsTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/gms/SerializationsTest.java b/test/unit/org/apache/cassandra/gms/SerializationsTest.java
index afeaab3..4267d72 100644
--- a/test/unit/org/apache/cassandra/gms/SerializationsTest.java
+++ b/test/unit/org/apache/cassandra/gms/SerializationsTest.java
@@ -19,6 +19,7 @@
package org.apache.cassandra.gms;
import org.apache.cassandra.AbstractSerializationsTester;
+import org.apache.cassandra.dht.Token;
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.utils.FBUtilities;
import org.junit.Test;
@@ -28,6 +29,7 @@ import java.io.DataOutputStream;
import java.io.IOException;
import java.net.InetAddress;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -112,7 +114,8 @@ public class SerializationsTest extends AbstractSerializationsTester
private static EndpointState EndpointSt = new EndpointState(HeartbeatSt);
private static VersionedValue.VersionedValueFactory vvFact = new VersionedValue.VersionedValueFactory(StorageService.getPartitioner());
private static VersionedValue vv0 = vvFact.load(23d);
- private static VersionedValue vv1 = vvFact.bootstrapping(StorageService.getPartitioner().getRandomToken(), UUID.randomUUID());
+ private static VersionedValue vv1 = vvFact.bootstrapping(Collections.<Token>singleton(StorageService.getPartitioner().getRandomToken()),
+ UUID.randomUUID());
private static List<GossipDigest> Digests = new ArrayList<GossipDigest>();
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/66b96ee5/test/unit/org/apache/cassandra/service/AntiEntropyServiceTestAbstract.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/service/AntiEntropyServiceTestAbstract.java b/test/unit/org/apache/cassandra/service/AntiEntropyServiceTestAbstract.java
index c373fa8..68b34b4 100644
--- a/test/unit/org/apache/cassandra/service/AntiEntropyServiceTestAbstract.java
+++ b/test/unit/org/apache/cassandra/service/AntiEntropyServiceTestAbstract.java
@@ -94,7 +94,7 @@ public abstract class AntiEntropyServiceTestAbstract extends SchemaLoader
aes = AntiEntropyService.instance;
TokenMetadata tmd = StorageService.instance.getTokenMetadata();
tmd.clearUnsafe();
- StorageService.instance.setToken(StorageService.getPartitioner().getRandomToken());
+ StorageService.instance.setTokens(Collections.singleton(StorageService.getPartitioner().getRandomToken()));
tmd.updateNormalToken(StorageService.getPartitioner().getMinimumToken(), REMOTE);
assert tmd.isMember(REMOTE);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/66b96ee5/test/unit/org/apache/cassandra/service/LeaveAndBootstrapTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/service/LeaveAndBootstrapTest.java b/test/unit/org/apache/cassandra/service/LeaveAndBootstrapTest.java
index fc3d8a6..3b76c2c 100644
--- a/test/unit/org/apache/cassandra/service/LeaveAndBootstrapTest.java
+++ b/test/unit/org/apache/cassandra/service/LeaveAndBootstrapTest.java
@@ -104,7 +104,7 @@ public class LeaveAndBootstrapTest
// Third node leaves
ss.onChange(hosts.get(LEAVING_NODE),
ApplicationState.STATUS,
- valueFactory.leaving(endpointTokens.get(LEAVING_NODE)));
+ valueFactory.leaving(Collections.singleton(endpointTokens.get(LEAVING_NODE))));
assertTrue(tmd.isLeaving(hosts.get(LEAVING_NODE)));
AbstractReplicationStrategy strategy;
@@ -158,16 +158,22 @@ public class LeaveAndBootstrapTest
// nodes 6, 8 and 9 leave
final int[] LEAVING = new int[] {6, 8, 9};
for (int leaving : LEAVING)
- ss.onChange(hosts.get(leaving), ApplicationState.STATUS, valueFactory.leaving(endpointTokens.get(leaving)));
+ ss.onChange(hosts.get(leaving),
+ ApplicationState.STATUS,
+ valueFactory.leaving(Collections.singleton(endpointTokens.get(leaving))));
// boot two new nodes with keyTokens.get(5) and keyTokens.get(7)
InetAddress boot1 = InetAddress.getByName("127.0.1.1");
Gossiper.instance.initializeNodeUnsafe(boot1, 1);
UUID boot1Id = UUID.randomUUID();
- ss.onChange(boot1, ApplicationState.STATUS, valueFactory.bootstrapping(keyTokens.get(5), boot1Id));
+ ss.onChange(boot1,
+ ApplicationState.STATUS,
+ valueFactory.bootstrapping(Collections.<Token>singleton(keyTokens.get(5)), boot1Id));
InetAddress boot2 = InetAddress.getByName("127.0.1.2");
Gossiper.instance.initializeNodeUnsafe(boot2, 1);
- ss.onChange(boot2, ApplicationState.STATUS, valueFactory.bootstrapping(keyTokens.get(7), UUID.randomUUID()));
+ ss.onChange(boot2,
+ ApplicationState.STATUS,
+ valueFactory.bootstrapping(Collections.<Token>singleton(keyTokens.get(7)), UUID.randomUUID()));
Collection<InetAddress> endpoints = null;
@@ -320,10 +326,10 @@ public class LeaveAndBootstrapTest
// Now finish node 6 and node 9 leaving, as well as boot1 (after this node 8 is still
// leaving and boot2 in progress
ss.onChange(hosts.get(LEAVING[0]), ApplicationState.STATUS,
- valueFactory.left(endpointTokens.get(LEAVING[0]), Gossiper.computeExpireTime()));
+ valueFactory.left(Collections.singleton(endpointTokens.get(LEAVING[0])), Gossiper.computeExpireTime()));
ss.onChange(hosts.get(LEAVING[2]), ApplicationState.STATUS,
- valueFactory.left(endpointTokens.get(LEAVING[2]), Gossiper.computeExpireTime()));
- ss.onChange(boot1, ApplicationState.STATUS, valueFactory.normal(keyTokens.get(5), boot1Id));
+ valueFactory.left(Collections.singleton(endpointTokens.get(LEAVING[2])), Gossiper.computeExpireTime()));
+ ss.onChange(boot1, ApplicationState.STATUS, valueFactory.normal(Collections.singleton(keyTokens.get(5)), boot1Id));
// adjust precalcuated results. this changes what the epected endpoints are.
expectedEndpoints.get("Keyspace1").get(new BigIntegerToken("55")).removeAll(makeAddrs("127.0.0.7", "127.0.0.8"));
@@ -445,7 +451,9 @@ public class LeaveAndBootstrapTest
Util.createInitialRing(ss, partitioner, endpointTokens, keyTokens, hosts, hostIds, 7);
// node 2 leaves
- ss.onChange(hosts.get(2), ApplicationState.STATUS, valueFactory.leaving(endpointTokens.get(2)));
+ ss.onChange(hosts.get(2),
+ ApplicationState.STATUS,
+ valueFactory.leaving(Collections.singleton(endpointTokens.get(2))));
// don't bother to test pending ranges here, that is extensively tested by other
// tests. Just check that the node is in appropriate lists.
@@ -454,14 +462,18 @@ public class LeaveAndBootstrapTest
assertTrue(tmd.getBootstrapTokens().isEmpty());
// Bootstrap the node immedidiately to keyTokens.get(4) without going through STATE_LEFT
- ss.onChange(hosts.get(2), ApplicationState.STATUS, valueFactory.bootstrapping(keyTokens.get(4), hostIds.get(2)));
+ ss.onChange(hosts.get(2),
+ ApplicationState.STATUS,
+ valueFactory.bootstrapping(Collections.<Token>singleton(keyTokens.get(4)), hostIds.get(2)));
assertFalse(tmd.isMember(hosts.get(2)));
assertFalse(tmd.isLeaving(hosts.get(2)));
assertTrue(tmd.getBootstrapTokens().get(keyTokens.get(4)).equals(hosts.get(2)));
// Bootstrap node hosts.get(3) to keyTokens.get(1)
- ss.onChange(hosts.get(3), ApplicationState.STATUS, valueFactory.bootstrapping(keyTokens.get(1), hostIds.get(3)));
+ ss.onChange(hosts.get(3),
+ ApplicationState.STATUS,
+ valueFactory.bootstrapping(Collections.<Token>singleton(keyTokens.get(1)), hostIds.get(3)));
assertFalse(tmd.isMember(hosts.get(3)));
assertFalse(tmd.isLeaving(hosts.get(3)));
@@ -469,7 +481,9 @@ public class LeaveAndBootstrapTest
assertTrue(tmd.getBootstrapTokens().get(keyTokens.get(1)).equals(hosts.get(3)));
// Bootstrap node hosts.get(2) further to keyTokens.get(3)
- ss.onChange(hosts.get(2), ApplicationState.STATUS, valueFactory.bootstrapping(keyTokens.get(3), hostIds.get(2)));
+ ss.onChange(hosts.get(2),
+ ApplicationState.STATUS,
+ valueFactory.bootstrapping(Collections.<Token>singleton(keyTokens.get(3)), hostIds.get(2)));
assertFalse(tmd.isMember(hosts.get(2)));
assertFalse(tmd.isLeaving(hosts.get(2)));
@@ -478,8 +492,10 @@ public class LeaveAndBootstrapTest
assertTrue(tmd.getBootstrapTokens().get(keyTokens.get(1)).equals(hosts.get(3)));
// Go to normal again for both nodes
- ss.onChange(hosts.get(2), ApplicationState.STATUS, valueFactory.normal(keyTokens.get(3), hostIds.get(2)));
- ss.onChange(hosts.get(3), ApplicationState.STATUS, valueFactory.normal(keyTokens.get(2), hostIds.get(3)));
+ ss.onChange(hosts.get(2), ApplicationState.STATUS, valueFactory.normal(Collections.singleton(keyTokens.get(3)),
+ hostIds.get(2)));
+ ss.onChange(hosts.get(3), ApplicationState.STATUS, valueFactory.normal(Collections.singleton(keyTokens.get(2)),
+ hostIds.get(3)));
assertTrue(tmd.isMember(hosts.get(2)));
assertFalse(tmd.isLeaving(hosts.get(2)));
@@ -509,22 +525,24 @@ public class LeaveAndBootstrapTest
Util.createInitialRing(ss, partitioner, endpointTokens, keyTokens, hosts, hostIds, 6);
// node 2 leaves
- ss.onChange(hosts.get(2), ApplicationState.STATUS, valueFactory.leaving(endpointTokens.get(2)));
+ ss.onChange(hosts.get(2), ApplicationState.STATUS, valueFactory.leaving(Collections.singleton(endpointTokens.get(2))));
assertTrue(tmd.isLeaving(hosts.get(2)));
assertTrue(tmd.getToken(hosts.get(2)).equals(endpointTokens.get(2)));
// back to normal
- ss.onChange(hosts.get(2), ApplicationState.STATUS, valueFactory.normal(keyTokens.get(2), hostIds.get(2)));
+ ss.onChange(hosts.get(2), ApplicationState.STATUS, valueFactory.normal(Collections.singleton(keyTokens.get(2)),
+ hostIds.get(2)));
assertTrue(tmd.getLeavingEndpoints().isEmpty());
assertTrue(tmd.getToken(hosts.get(2)).equals(keyTokens.get(2)));
// node 3 goes through leave and left and then jumps to normal at its new token
- ss.onChange(hosts.get(2), ApplicationState.STATUS, valueFactory.leaving(keyTokens.get(2)));
+ ss.onChange(hosts.get(2), ApplicationState.STATUS, valueFactory.leaving(Collections.singleton(keyTokens.get(2))));
ss.onChange(hosts.get(2), ApplicationState.STATUS,
- valueFactory.left(keyTokens.get(2), Gossiper.computeExpireTime()));
- ss.onChange(hosts.get(2), ApplicationState.STATUS, valueFactory.normal(keyTokens.get(4), hostIds.get(2)));
+ valueFactory.left(Collections.singleton(keyTokens.get(2)), Gossiper.computeExpireTime()));
+ ss.onChange(hosts.get(2), ApplicationState.STATUS, valueFactory.normal(Collections.singleton(keyTokens.get(4)),
+ hostIds.get(2)));
assertTrue(tmd.getBootstrapTokens().isEmpty());
assertTrue(tmd.getLeavingEndpoints().isEmpty());
@@ -549,21 +567,23 @@ public class LeaveAndBootstrapTest
Util.createInitialRing(ss, partitioner, endpointTokens, keyTokens, hosts, hostIds, 6);
// node 2 leaves with _different_ token
- ss.onChange(hosts.get(2), ApplicationState.STATUS, valueFactory.leaving(keyTokens.get(0)));
+ ss.onChange(hosts.get(2), ApplicationState.STATUS, valueFactory.leaving(Collections.singleton(keyTokens.get(0))));
assertTrue(tmd.getToken(hosts.get(2)).equals(keyTokens.get(0)));
assertTrue(tmd.isLeaving(hosts.get(2)));
assertTrue(tmd.getEndpoint(endpointTokens.get(2)) == null);
// go to boostrap
- ss.onChange(hosts.get(2), ApplicationState.STATUS, valueFactory.bootstrapping(keyTokens.get(1), hostIds.get(2)));
+ ss.onChange(hosts.get(2),
+ ApplicationState.STATUS,
+ valueFactory.bootstrapping(Collections.<Token>singleton(keyTokens.get(1)), hostIds.get(2)));
assertFalse(tmd.isLeaving(hosts.get(2)));
assertTrue(tmd.getBootstrapTokens().size() == 1);
assertTrue(tmd.getBootstrapTokens().get(keyTokens.get(1)).equals(hosts.get(2)));
// jump to leaving again
- ss.onChange(hosts.get(2), ApplicationState.STATUS, valueFactory.leaving(keyTokens.get(1)));
+ ss.onChange(hosts.get(2), ApplicationState.STATUS, valueFactory.leaving(Collections.singleton(keyTokens.get(1))));
assertTrue(tmd.getEndpoint(keyTokens.get(1)).equals(hosts.get(2)));
assertTrue(tmd.isLeaving(hosts.get(2)));
@@ -571,7 +591,7 @@ public class LeaveAndBootstrapTest
// go to state left
ss.onChange(hosts.get(2), ApplicationState.STATUS,
- valueFactory.left(keyTokens.get(1), Gossiper.computeExpireTime()));
+ valueFactory.left(Collections.singleton(keyTokens.get(1)), Gossiper.computeExpireTime()));
assertFalse(tmd.isMember(hosts.get(2)));
assertFalse(tmd.isLeaving(hosts.get(2)));
@@ -596,12 +616,12 @@ public class LeaveAndBootstrapTest
// node hosts.get(2) goes jumps to left
ss.onChange(hosts.get(2), ApplicationState.STATUS,
- valueFactory.left(endpointTokens.get(2), Gossiper.computeExpireTime()));
+ valueFactory.left(Collections.singleton(endpointTokens.get(2)), Gossiper.computeExpireTime()));
assertFalse(tmd.isMember(hosts.get(2)));
// node hosts.get(4) goes to bootstrap
- ss.onChange(hosts.get(3), ApplicationState.STATUS, valueFactory.bootstrapping(keyTokens.get(1), hostIds.get(3)));
+ ss.onChange(hosts.get(3), ApplicationState.STATUS, valueFactory.bootstrapping(Collections.<Token>singleton(keyTokens.get(1)), hostIds.get(3)));
assertFalse(tmd.isMember(hosts.get(3)));
assertTrue(tmd.getBootstrapTokens().size() == 1);
@@ -609,7 +629,7 @@ public class LeaveAndBootstrapTest
// and then directly to 'left'
ss.onChange(hosts.get(2), ApplicationState.STATUS,
- valueFactory.left(keyTokens.get(1), Gossiper.computeExpireTime()));
+ valueFactory.left(Collections.singleton(keyTokens.get(1)), Gossiper.computeExpireTime()));
assertTrue(tmd.getBootstrapTokens().size() == 0);
assertFalse(tmd.isMember(hosts.get(2)));
http://git-wip-us.apache.org/repos/asf/cassandra/blob/66b96ee5/test/unit/org/apache/cassandra/service/MoveTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/service/MoveTest.java b/test/unit/org/apache/cassandra/service/MoveTest.java
index 42f20fc..cf42564 100644
--- a/test/unit/org/apache/cassandra/service/MoveTest.java
+++ b/test/unit/org/apache/cassandra/service/MoveTest.java
@@ -136,7 +136,7 @@ public class MoveTest
}
// moving endpoint back to the normal state
- ss.onChange(hosts.get(MOVING_NODE), ApplicationState.STATUS, valueFactory.normal(newToken, hostIds.get(MOVING_NODE)));
+ ss.onChange(hosts.get(MOVING_NODE), ApplicationState.STATUS, valueFactory.normal(Collections.singleton(newToken), hostIds.get(MOVING_NODE)));
}
/*
@@ -182,10 +182,14 @@ public class MoveTest
// boot two new nodes with keyTokens.get(5) and keyTokens.get(7)
InetAddress boot1 = InetAddress.getByName("127.0.1.1");
Gossiper.instance.initializeNodeUnsafe(boot1, 1);
- ss.onChange(boot1, ApplicationState.STATUS, valueFactory.bootstrapping(keyTokens.get(5), UUID.randomUUID()));
+ ss.onChange(boot1,
+ ApplicationState.STATUS,
+ valueFactory.bootstrapping(Collections.<Token>singleton(keyTokens.get(5)), UUID.randomUUID()));
InetAddress boot2 = InetAddress.getByName("127.0.1.2");
Gossiper.instance.initializeNodeUnsafe(boot2, 1);
- ss.onChange(boot2, ApplicationState.STATUS, valueFactory.bootstrapping(keyTokens.get(7), UUID.randomUUID()));
+ ss.onChange(boot2,
+ ApplicationState.STATUS,
+ valueFactory.bootstrapping(Collections.<Token>singleton(keyTokens.get(7)), UUID.randomUUID()));
// don't require test update every time a new keyspace is added to test/conf/cassandra.yaml
Map<String, AbstractReplicationStrategy> tableStrategyMap = new HashMap<String, AbstractReplicationStrategy>();
@@ -471,7 +475,9 @@ public class MoveTest
// all moving nodes are back to the normal state
for (Integer movingIndex : MOVING)
{
- ss.onChange(hosts.get(movingIndex), ApplicationState.STATUS, valueFactory.normal(newTokens.get(movingIndex), hostIds.get(movingIndex)));
+ ss.onChange(hosts.get(movingIndex),
+ ApplicationState.STATUS,
+ valueFactory.normal(Collections.singleton(newTokens.get(movingIndex)), hostIds.get(movingIndex)));
}
}
@@ -500,7 +506,8 @@ public class MoveTest
assertTrue(tmd.getToken(hosts.get(2)).equals(endpointTokens.get(2)));
// back to normal
- ss.onChange(hosts.get(2), ApplicationState.STATUS, valueFactory.normal(newToken, hostIds.get(2)));
+ ss.onChange(hosts.get(2), ApplicationState.STATUS, valueFactory.normal(Collections.singleton(newToken),
+ hostIds.get(2)));
assertTrue(tmd.getMovingEndpoints().isEmpty());
assertTrue(tmd.getToken(hosts.get(2)).equals(newToken));
@@ -508,7 +515,8 @@ public class MoveTest
newToken = positionToken(8);
// node 2 goes through leave and left and then jumps to normal at its new token
ss.onChange(hosts.get(2), ApplicationState.STATUS, valueFactory.moving(newToken));
- ss.onChange(hosts.get(2), ApplicationState.STATUS, valueFactory.normal(newToken, hostIds.get(2)));
+ ss.onChange(hosts.get(2), ApplicationState.STATUS, valueFactory.normal(Collections.singleton(newToken),
+ hostIds.get(2)));
assertTrue(tmd.getBootstrapTokens().isEmpty());
assertTrue(tmd.getMovingEndpoints().isEmpty());