You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by ee...@apache.org on 2012/07/18 20:49:39 UTC

[5/5] git commit: virtual nodes support for bootstrap and decommission

virtual nodes support for bootstrap and decommission

Patch by Sam Overton and eevans; reviewed by Brandon Williams for CASSANDRA-4122


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/66b96ee5
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/66b96ee5
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/66b96ee5

Branch: refs/heads/trunk
Commit: 66b96ee5357b639729b6f220b1d5453027b866c6
Parents: 72aa824
Author: Eric Evans <ee...@apache.org>
Authored: Wed Jul 18 13:27:56 2012 -0500
Committer: Eric Evans <ee...@apache.org>
Committed: Wed Jul 18 13:27:56 2012 -0500

----------------------------------------------------------------------
 NEWS.txt                                           |    3 +
 conf/cassandra.yaml                                |   17 +-
 src/java/org/apache/cassandra/config/Config.java   |    1 +
 .../cassandra/config/DatabaseDescriptor.java       |   25 +-
 src/java/org/apache/cassandra/db/SystemTable.java  |  102 +++++--
 .../org/apache/cassandra/dht/BootStrapper.java     |   60 +++-
 src/java/org/apache/cassandra/gms/Gossiper.java    |   10 +-
 .../org/apache/cassandra/gms/VersionedValue.java   |   38 ++-
 .../locator/AbstractReplicationStrategy.java       |    7 +-
 .../apache/cassandra/locator/TokenMetadata.java    |   24 ++-
 .../apache/cassandra/service/StorageService.java   |  276 +++++++++------
 test/unit/org/apache/cassandra/Util.java           |    5 +-
 .../org/apache/cassandra/db/SystemTableTest.java   |   32 ++-
 .../org/apache/cassandra/dht/BootStrapperTest.java |    7 +-
 .../apache/cassandra/gms/SerializationsTest.java   |    5 +-
 .../service/AntiEntropyServiceTestAbstract.java    |    2 +-
 .../cassandra/service/LeaveAndBootstrapTest.java   |   70 +++--
 .../org/apache/cassandra/service/MoveTest.java     |   20 +-
 18 files changed, 492 insertions(+), 212 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/66b96ee5/NEWS.txt
----------------------------------------------------------------------
diff --git a/NEWS.txt b/NEWS.txt
index fd13667..c52bd89 100644
--- a/NEWS.txt
+++ b/NEWS.txt
@@ -45,6 +45,9 @@ Features
       as well as other updates
     - rpc_timeout has been split up to allow finer-grained control
       on timeouts for different operation types
+    - num_tokens can now be specified in cassandra.yaml. This defines the
+      number of tokens assigned to the host on the ring (default: 1).
+      Note that specifying initial_token overrides any num_tokens setting.
 
 
 1.1.2

http://git-wip-us.apache.org/repos/asf/cassandra/blob/66b96ee5/conf/cassandra.yaml
----------------------------------------------------------------------
diff --git a/conf/cassandra.yaml b/conf/cassandra.yaml
index e4a46b8..1b89b2e 100644
--- a/conf/cassandra.yaml
+++ b/conf/cassandra.yaml
@@ -9,7 +9,22 @@
 # one logical cluster from joining another.
 cluster_name: 'Test Cluster'
 
-# You should always specify InitialToken when setting up a production
+# This defines the number of tokens randomly assigned to this node on the ring.
+# The more tokens, relative to other nodes, the larger the proportion of data
+# that this node will store. You probably want all nodes to have the same number
+# of tokens assuming they have equal hardware capability.
+#
+# If you leave this unspecified, Cassandra will use the default of 1 token for legacy compatibility,
+# and will use the initial_token as described below.
+#
+# Specifying initial_token will override this setting.
+#
+# If you already have a cluster with 1 token per node, and wish to migrate to 
+# multiple tokens per node, see http://wiki.apache.org/cassandra/Operations
+# num_tokens: 256
+
+# If you haven't specified num_tokens, or have set it to the default of 1, then
+# you should always specify initial_token when setting up a production
 # cluster for the first time, and often when adding capacity later.
 # The principle is that each node should be given an equal slice of
 # the token ring; see http://wiki.apache.org/cassandra/Operations

http://git-wip-us.apache.org/repos/asf/cassandra/blob/66b96ee5/src/java/org/apache/cassandra/config/Config.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/Config.java b/src/java/org/apache/cassandra/config/Config.java
index f5f4ef6..8da9777 100644
--- a/src/java/org/apache/cassandra/config/Config.java
+++ b/src/java/org/apache/cassandra/config/Config.java
@@ -38,6 +38,7 @@ public class Config
 
     /* initial token in the ring */
     public String initial_token;
+    public Integer num_tokens = 1;
 
     public Long rpc_timeout_in_ms = new Long(10000);
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/66b96ee5/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
index 11d8417..878c6f1 100644
--- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
+++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
@@ -437,7 +437,8 @@ public class DatabaseDescriptor
             }
 
             if (conf.initial_token != null)
-                partitioner.getTokenFactory().validate(conf.initial_token);
+                for (String token : tokensFromString(conf.initial_token))
+                    partitioner.getTokenFactory().validate(token);
 
             try
             {
@@ -667,14 +668,28 @@ public class DatabaseDescriptor
         return conf.column_index_size_in_kb * 1024;
     }
 
-    public static String getInitialToken()
+    public static Collection<String> getInitialTokens()
     {
-        return System.getProperty("cassandra.initial_token", conf.initial_token);
+        return tokensFromString(System.getProperty("cassandra.initial_token", conf.initial_token));
     }
 
-    public static String getReplaceToken()
+    public static Collection<String> tokensFromString(String tokenString)
     {
-        return System.getProperty("cassandra.replace_token", null);
+        List<String> tokens = new ArrayList<String>();
+        if (tokenString != null)
+            for (String token : tokenString.split(","))
+                tokens.add(token.replaceAll("^\\s+", "").replaceAll("\\s+$", ""));
+        return tokens;
+    }
+
+    public static Integer getNumTokens()
+    {
+        return conf.num_tokens;
+    }
+
+    public static Collection<String> getReplaceTokens()
+    {
+        return tokensFromString(System.getProperty("cassandra.replace_token", null));
     }
 
     public static String getClusterName()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/66b96ee5/src/java/org/apache/cassandra/db/SystemTable.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/SystemTable.java b/src/java/org/apache/cassandra/db/SystemTable.java
index 77e064f..9634515 100644
--- a/src/java/org/apache/cassandra/db/SystemTable.java
+++ b/src/java/org/apache/cassandra/db/SystemTable.java
@@ -24,8 +24,11 @@ import java.nio.ByteBuffer;
 import java.util.*;
 import java.util.concurrent.ExecutionException;
 
+import com.google.common.collect.HashMultimap;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.ImmutableSortedSet;
+import com.google.common.collect.Multimap;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -122,7 +125,9 @@ public class SystemTable
             Iterator<IColumn> oldColumns = oldCf.columns.iterator();
 
             String clusterName = ByteBufferUtil.string(oldColumns.next().value());
-            String tokenBytes = ByteBufferUtil.bytesToHex(oldColumns.next().value());
+            // serialize the old token as a collection of (one) token.
+            Token token = StorageService.getPartitioner().getTokenFactory().fromByteArray(oldColumns.next().value());
+            String tokenBytes = ByteBufferUtil.bytesToHex(serializeTokens(Collections.singleton(token)));
             // (assume that any node getting upgraded was bootstrapped, since that was stored in a separate row for no particular reason)
             String req = "INSERT INTO system.%s (key, cluster_name, token_bytes, bootstrapped) VALUES ('%s', '%s', '%s', 'true')";
             processInternal(String.format(req, LOCAL_CF, LOCAL_KEY, clusterName, tokenBytes));
@@ -139,47 +144,102 @@ public class SystemTable
     }
 
     /**
-     * Record token being used by another node
+     * Record tokens being used by another node
      */
-    public static synchronized void updateToken(InetAddress ep, Token token)
+    public static synchronized void updateTokens(InetAddress ep, Collection<Token> tokens)
     {
         if (ep.equals(FBUtilities.getBroadcastAddress()))
         {
-            removeToken(token);
+            removeTokens(tokens);
             return;
         }
 
         IPartitioner p = StorageService.getPartitioner();
-        String req = "INSERT INTO system.%s (token_bytes, peer) VALUES ('%s', '%s')";
-        String tokenBytes = ByteBufferUtil.bytesToHex(p.getTokenFactory().toByteArray(token));
-        processInternal(String.format(req, PEERS_CF, tokenBytes, ep.getHostAddress()));
+        for (Token token : tokens)
+        {
+            String req = "INSERT INTO system.%s (token_bytes, peer) VALUES ('%s', '%s')";
+            String tokenBytes = ByteBufferUtil.bytesToHex(p.getTokenFactory().toByteArray(token));
+            processInternal(String.format(req, PEERS_CF, tokenBytes, ep.getHostAddress()));   
+        }
         forceBlockingFlush(PEERS_CF);
     }
 
     /**
-     * Remove stored token being used by another node
+     * Remove stored tokens being used by another node
      */
-    public static synchronized void removeToken(Token token)
+    public static synchronized void removeTokens(Collection<Token> tokens)
     {
         IPartitioner p = StorageService.getPartitioner();
-        String req = "DELETE FROM system.%s WHERE token_bytes = '%s'";
-        String tokenBytes = ByteBufferUtil.bytesToHex(p.getTokenFactory().toByteArray(token));
-        processInternal(String.format(req, PEERS_CF, tokenBytes));
+
+        for (Token token : tokens)
+        {
+            String req = "DELETE FROM system.%s WHERE token_bytes = '%s'";
+            String tokenBytes = ByteBufferUtil.bytesToHex(p.getTokenFactory().toByteArray(token));
+            processInternal(String.format(req, PEERS_CF, tokenBytes));   
+        }
         forceBlockingFlush(PEERS_CF);
     }
 
     /**
-     * This method is used to update the System Table with the new token for this node
+     * This method is used to update the System Table with the new tokens for this node
     */
-    public static synchronized void updateToken(Token token)
+    public static synchronized void updateTokens(Collection<Token> tokens)
     {
         IPartitioner p = StorageService.getPartitioner();
+
         String req = "INSERT INTO system.%s (key, token_bytes) VALUES ('%s', '%s')";
-        String tokenBytes = ByteBufferUtil.bytesToHex(p.getTokenFactory().toByteArray(token));
+        String tokenBytes = ByteBufferUtil.bytesToHex(serializeTokens(tokens));
         processInternal(String.format(req, LOCAL_CF, LOCAL_KEY, tokenBytes));
         forceBlockingFlush(LOCAL_CF);
     }
 
+    /** Serialize a collection of tokens to bytes */
+    private static ByteBuffer serializeTokens(Collection<Token> tokens)
+    {
+        // Guesstimate the total number of bytes needed
+        int estCapacity = (tokens.size() * 16) + (tokens.size() * 2);
+        ByteBuffer toks = ByteBuffer.allocate(estCapacity);
+        IPartitioner p = StorageService.getPartitioner();
+
+        for (Token token : tokens)
+        {
+            ByteBuffer tokenBytes = p.getTokenFactory().toByteArray(token);
+
+            // If we blow the buffer, grow it by double
+            if (toks.remaining() < (2 + tokenBytes.remaining()))
+            {
+                estCapacity = estCapacity * 2;
+                ByteBuffer newToks = ByteBuffer.allocate(estCapacity);
+                toks.flip();
+                newToks.put(toks);
+                toks = newToks;
+            }
+            
+            toks.putShort((short)tokenBytes.remaining());
+            toks.put(tokenBytes);
+        }
+        
+        toks.flip();
+        return toks;
+    }
+    
+    private static Collection<Token> deserializeTokens(ByteBuffer tokenBytes)
+    {
+        List<Token> tokens = new ArrayList<Token>();
+        IPartitioner p = StorageService.getPartitioner();
+
+        while(tokenBytes.hasRemaining())
+        {
+            short len = tokenBytes.getShort();
+            ByteBuffer dup = tokenBytes.slice();
+            dup.limit(len);
+            tokenBytes.position(tokenBytes.position() + len);
+            tokens.add(p.getTokenFactory().fromByteArray(dup));
+        }
+
+        return tokens;
+    }
+
     private static void forceBlockingFlush(String cfname)
     {
         try
@@ -200,13 +260,13 @@ public class SystemTable
      * Return a map of stored tokens to IP addresses
      *
      */
-    public static HashMap<Token, InetAddress> loadTokens()
+    public static Multimap<InetAddress, Token> loadTokens()
     {
         IPartitioner p = StorageService.getPartitioner();
 
-        HashMap<Token, InetAddress> tokenMap = new HashMap<Token, InetAddress>();
+        Multimap<InetAddress, Token> tokenMap = HashMultimap.create();
         for (UntypedResultSet.Row row : processInternal("SELECT * FROM system." + PEERS_CF))
-            tokenMap.put(p.getTokenFactory().fromByteArray(row.getBytes("token_bytes")), row.getInetAddress("peer"));
+            tokenMap.put(row.getInetAddress("peer"), p.getTokenFactory().fromByteArray(row.getBytes("token_bytes")));
 
         return tokenMap;
     }
@@ -254,13 +314,13 @@ public class SystemTable
             throw new ConfigurationException("Saved cluster name " + savedClusterName + " != configured name " + DatabaseDescriptor.getClusterName());
     }
 
-    public static Token getSavedToken()
+    public static Collection<Token> getSavedTokens()
     {
         String req = "SELECT token_bytes FROM system.%s WHERE key='%s'";
         UntypedResultSet result = processInternal(String.format(req, LOCAL_CF, LOCAL_KEY));
         return result.isEmpty() || !result.one().has("token_bytes")
-             ? null
-             : StorageService.getPartitioner().getTokenFactory().fromByteArray(result.one().getBytes("token_bytes"));
+             ? Collections.<Token>emptyList()
+             : deserializeTokens(result.one().getBytes("token_bytes"));
     }
 
     public static int incrementAndGetGeneration() throws IOException

http://git-wip-us.apache.org/repos/asf/cassandra/blob/66b96ee5/src/java/org/apache/cassandra/dht/BootStrapper.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/dht/BootStrapper.java b/src/java/org/apache/cassandra/dht/BootStrapper.java
index 8abc48d..ba14c32 100644
--- a/src/java/org/apache/cassandra/dht/BootStrapper.java
+++ b/src/java/org/apache/cassandra/dht/BootStrapper.java
@@ -50,17 +50,17 @@ public class BootStrapper
     /* endpoint that needs to be bootstrapped */
     protected final InetAddress address;
     /* token of the node being bootstrapped. */
-    protected final Token<?> token;
+    protected final Collection<Token> tokens;
     protected final TokenMetadata tokenMetadata;
     private static final long BOOTSTRAP_TIMEOUT = 30000; // default bootstrap timeout of 30s
 
-    public BootStrapper(InetAddress address, Token token, TokenMetadata tmd)
+    public BootStrapper(InetAddress address, Collection<Token> tokens, TokenMetadata tmd)
     {
         assert address != null;
-        assert token != null;
+        assert tokens != null && !tokens.isEmpty();
 
         this.address = address;
-        this.token = token;
+        this.tokens = tokens;
         tokenMetadata = tmd;
     }
 
@@ -75,7 +75,7 @@ public class BootStrapper
         for (String table : Schema.instance.getNonSystemTables())
         {
             AbstractReplicationStrategy strategy = Table.open(table).getReplicationStrategy();
-            streamer.addRanges(table, strategy.getPendingAddressRanges(tokenMetadata, token, address));
+            streamer.addRanges(table, strategy.getPendingAddressRanges(tokenMetadata, tokens, address));
         }
 
         streamer.fetch();
@@ -83,23 +83,49 @@ public class BootStrapper
     }
 
     /**
-     * if initialtoken was specified, use that.
-     * otherwise, pick a token to assume half the load of the most-loaded node.
+     * if initialtoken was specified, use that (split on comma).
+     * otherwise, if num_tokens == 1, pick a token to assume half the load of the most-loaded node.
+     * else choose num_tokens tokens at random
      */
-    public static Token getBootstrapToken(final TokenMetadata metadata, final Map<InetAddress, Double> load) throws IOException, ConfigurationException
+    public static Collection<Token> getBootstrapTokens(final TokenMetadata metadata, Map<InetAddress, Double> load) throws IOException, ConfigurationException
     {
-        if (DatabaseDescriptor.getInitialToken() != null)
+        Collection<String> initialTokens = DatabaseDescriptor.getInitialTokens();
+        if (initialTokens.size() > 0)
         {
-            logger.debug("token manually specified as " + DatabaseDescriptor.getInitialToken());
-            Token token = StorageService.getPartitioner().getTokenFactory().fromString(DatabaseDescriptor.getInitialToken());
-            if (metadata.getEndpoint(token) != null)
-                throw new ConfigurationException("Bootstraping to existing token " + token + " is not allowed (decommission/removetoken the old node first).");
-            return token;
+            logger.debug("tokens manually specified as {}",  initialTokens);
+            List<Token> tokens = new ArrayList<Token>();
+            for (String tokenString : initialTokens)
+            {
+                Token token = StorageService.getPartitioner().getTokenFactory().fromString(tokenString);
+                if (metadata.getEndpoint(token) != null)
+                    throw new ConfigurationException("Bootstraping to existing token " + tokenString + " is not allowed (decommission/removetoken the old node first).");
+                tokens.add(token);
+            }
+            return tokens;
         }
 
-        return getBalancedToken(metadata, load);
+        int numTokens = DatabaseDescriptor.getNumTokens();
+        if (numTokens < 1)
+            throw new ConfigurationException("num_tokens must be >= 1");
+        if (numTokens == 1)
+            return Collections.singleton(getBalancedToken(metadata, load));
+
+        return getRandomTokens(metadata, numTokens);
+    }
+
+    public static Collection<Token> getRandomTokens(TokenMetadata metadata, int numTokens)
+    {
+        Set<Token> tokens = new HashSet<Token>(numTokens);
+        while (tokens.size() < numTokens)
+        {
+            Token token = StorageService.getPartitioner().getRandomToken();
+            if (metadata.getEndpoint(token) == null)
+                tokens.add(token);
+        }
+        return tokens;
     }
 
+    @Deprecated
     public static Token getBalancedToken(TokenMetadata metadata, Map<InetAddress, Double> load)
     {
         InetAddress maxEndpoint = getBootstrapSource(metadata, load);
@@ -108,6 +134,7 @@ public class BootStrapper
         return t;
     }
 
+    @Deprecated
     static InetAddress getBootstrapSource(final TokenMetadata metadata, final Map<InetAddress, Double> load)
     {
         // sort first by number of nodes already bootstrapping into a source node's range, then by load.
@@ -151,6 +178,7 @@ public class BootStrapper
         return maxEndpoint;
     }
 
+    @Deprecated
     static Token<?> getBootstrapTokenFrom(InetAddress maxEndpoint)
     {
         MessageOut message = new MessageOut(MessagingService.Verb.BOOTSTRAP_TOKEN);
@@ -170,6 +198,7 @@ public class BootStrapper
         throw new RuntimeException("Bootstrap failed, could not obtain token from: " + maxEndpoint);
     }
 
+    @Deprecated
     public static class BootstrapTokenVerbHandler implements IVerbHandler
     {
         public void doVerb(MessageIn message, String id)
@@ -181,6 +210,7 @@ public class BootStrapper
         }
     }
 
+    @Deprecated
     private static class BootstrapTokenCallback implements IAsyncCallback<String>
     {
         private volatile Token<?> token;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/66b96ee5/src/java/org/apache/cassandra/gms/Gossiper.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/gms/Gossiper.java b/src/java/org/apache/cassandra/gms/Gossiper.java
index 4208c07..8d55dbf 100644
--- a/src/java/org/apache/cassandra/gms/Gossiper.java
+++ b/src/java/org/apache/cassandra/gms/Gossiper.java
@@ -412,7 +412,7 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean
     {
         InetAddress endpoint = InetAddress.getByName(address);
         EndpointState epState = endpointStateMap.get(endpoint);
-        Token token = null;
+        Collection<Token> tokens = null;
         logger.warn("Assassinating {} via gossip", endpoint);
         if (epState == null)
         {
@@ -422,7 +422,7 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean
         {
             try
             {
-                token = StorageService.instance.getTokenMetadata().getToken(endpoint);
+                tokens = StorageService.instance.getTokenMetadata().getTokens(endpoint);
             }
             catch (AssertionError e)
             {
@@ -444,10 +444,10 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean
             epState.updateTimestamp(); // make sure we don't evict it too soon
             epState.getHeartBeatState().forceNewerGenerationUnsafe();
         }
-        if (token == null)
-            token = StorageService.instance.getBootstrapToken();
+        if (tokens == null)
+            tokens = Arrays.asList(StorageService.instance.getBootstrapToken());
         // do not pass go, do not collect 200 dollars, just gtfo
-        epState.addApplicationState(ApplicationState.STATUS, StorageService.instance.valueFactory.left(token, computeExpireTime()));
+        epState.addApplicationState(ApplicationState.STATUS, StorageService.instance.valueFactory.left(tokens, computeExpireTime()));
         handleMajorStateChange(endpoint, epState);
         try
         {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/66b96ee5/src/java/org/apache/cassandra/gms/VersionedValue.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/gms/VersionedValue.java b/src/java/org/apache/cassandra/gms/VersionedValue.java
index ff41fcb..a5dc9cd 100644
--- a/src/java/org/apache/cassandra/gms/VersionedValue.java
+++ b/src/java/org/apache/cassandra/gms/VersionedValue.java
@@ -19,6 +19,10 @@ package org.apache.cassandra.gms;
 
 import java.io.*;
 import java.net.InetAddress;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
 import java.util.UUID;
 
 import org.apache.cassandra.db.TypeSizes;
@@ -107,18 +111,26 @@ public class VersionedValue implements Comparable<VersionedValue>
             this.partitioner = partitioner;
         }
 
-        public VersionedValue bootstrapping(Token token, UUID hostId)
+        public VersionedValue bootstrapping(Collection<Token> tokens, UUID hostId)
         {
             return new VersionedValue(versionString(VersionedValue.STATUS_BOOTSTRAPPING,
                                                     hostId.toString(),
-                                                    partitioner.getTokenFactory().toString(token)));
+                                                    makeTokenString(tokens)));
         }
 
-        public VersionedValue normal(Token token, UUID hostId)
+        public VersionedValue normal(Collection<Token> tokens, UUID hostId)
         {
             return new VersionedValue(versionString(VersionedValue.STATUS_NORMAL,
                                                     hostId.toString(),
-                                                    partitioner.getTokenFactory().toString(token)));
+                                                    makeTokenString(tokens)));
+        }
+
+        private String makeTokenString(Collection<Token> tokens)
+        {
+            List<String> tokenStrings = new ArrayList<String>();
+            for (Token<?> token : tokens)
+                tokenStrings.add(partitioner.getTokenFactory().toString(token));
+            return StringUtils.join(tokenStrings, VersionedValue.DELIMITER);
         }
 
         public VersionedValue load(double load)
@@ -131,15 +143,17 @@ public class VersionedValue implements Comparable<VersionedValue>
             return new VersionedValue(newVersion.toString());
         }
 
-        public VersionedValue leaving(Token token)
+        public VersionedValue leaving(Collection<Token> tokens)
         {
-            return new VersionedValue(VersionedValue.STATUS_LEAVING + VersionedValue.DELIMITER + partitioner.getTokenFactory().toString(token));
+            return new VersionedValue(versionString(VersionedValue.STATUS_LEAVING,
+                                                    makeTokenString(tokens)));
         }
 
-        public VersionedValue left(Token token, long expireTime)
+        public VersionedValue left(Collection<Token> tokens, long expireTime)
         {
-            return new VersionedValue(VersionedValue.STATUS_LEFT + VersionedValue.DELIMITER
-                    + partitioner.getTokenFactory().toString(token) + VersionedValue.DELIMITER + expireTime);
+            return new VersionedValue(versionString(VersionedValue.STATUS_LEFT,
+                                                    Long.toString(expireTime),
+                                                    makeTokenString(tokens)));
         }
 
         public VersionedValue moving(Token token)
@@ -226,6 +240,12 @@ public class VersionedValue implements Comparable<VersionedValue>
                     outValue = versionString(pieces[0], pieces[2]);
                 }
 
+                if (type.equals(STATUS_LEFT))
+                {
+                    assert pieces.length >= 3;
+                    outValue = versionString(pieces[0], pieces[2], pieces[1]);
+                }
+
                 if ((type.equals(REMOVAL_COORDINATOR)) || (type.equals(REMOVING_TOKEN)) || (type.equals(REMOVED_TOKEN)))
                     throw new RuntimeException(String.format("Unable to serialize %s(%s...) for nodes older than 1.2",
                                                              VersionedValue.class.getName(), type));

http://git-wip-us.apache.org/repos/asf/cassandra/blob/66b96ee5/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java b/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java
index 7fa431a..66fb849 100644
--- a/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java
+++ b/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java
@@ -181,8 +181,13 @@ public abstract class AbstractReplicationStrategy
 
     public Collection<Range<Token>> getPendingAddressRanges(TokenMetadata metadata, Token pendingToken, InetAddress pendingAddress)
     {
+        return getPendingAddressRanges(metadata, Arrays.asList(pendingToken), pendingAddress);
+    }
+
+    public Collection<Range<Token>> getPendingAddressRanges(TokenMetadata metadata, Collection<Token> pendingTokens, InetAddress pendingAddress)
+    {
         TokenMetadata temp = metadata.cloneOnlyTokenMap();
-        temp.updateNormalToken(pendingToken, pendingAddress);
+        temp.updateNormalTokens(pendingTokens, pendingAddress);
         return getAddressRanges(temp).get(pendingAddress);
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/66b96ee5/src/java/org/apache/cassandra/locator/TokenMetadata.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/locator/TokenMetadata.java b/src/java/org/apache/cassandra/locator/TokenMetadata.java
index 8fb63d5..7ad962b 100644
--- a/src/java/org/apache/cassandra/locator/TokenMetadata.java
+++ b/src/java/org/apache/cassandra/locator/TokenMetadata.java
@@ -296,14 +296,15 @@ public class TokenMetadata
         }
     }
 
-    public void removeBootstrapToken(Token token)
+    public void removeBootstrapTokens(Collection<Token> tokens)
     {
-        assert token != null;
+        assert tokens != null && !tokens.isEmpty();
 
         lock.writeLock().lock();
         try
         {
-            bootstrapTokens.remove(token);
+            for (Token token : tokens)
+                bootstrapTokens.remove(token);
         }
         finally
         {
@@ -847,6 +848,23 @@ public class TokenMetadata
         return endpoints;
     }
 
+    /** @return an endpoint to token multimap representation of tokenToEndpointMap (a copy) */
+    public Multimap<InetAddress, Token> getEndpointToTokenMapForReading()
+    {
+        lock.readLock().lock();
+        try
+        {
+            Multimap<InetAddress, Token> cloned = HashMultimap.<InetAddress, Token>create();
+            for (Map.Entry<Token, InetAddress> entry : tokenToEndpointMap.entrySet())
+                cloned.put(entry.getValue(), entry.getKey());
+            return cloned;
+        }
+        finally
+        {
+            lock.readLock().unlock();
+        }
+    }
+
     /**
      * @return a (stable copy, won't be modified) Token to Endpoint map for all the normal and bootstrapping nodes
      *         in the cluster.

http://git-wip-us.apache.org/repos/asf/cassandra/blob/66b96ee5/src/java/org/apache/cassandra/service/StorageService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java
index 583b3a7..a58653d 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -169,14 +169,14 @@ public class StorageService implements IEndpointStateChangeSubscriber, StorageSe
     }
 
     /** This method updates the local token on disk  */
-    public void setToken(Token token)
+    public void setTokens(Collection<Token> tokens)
     {
         if (logger.isDebugEnabled())
-            logger.debug("Setting token to {}", token);
-        SystemTable.updateToken(token);
-        tokenMetadata.updateNormalToken(token, FBUtilities.getBroadcastAddress());
+            logger.debug("Setting tokens to {}", tokens);
+        SystemTable.updateTokens(tokens);
+        tokenMetadata.updateNormalTokens(tokens, FBUtilities.getBroadcastAddress());
         Gossiper.instance.addLocalApplicationState(ApplicationState.STATUS,
-                                                   valueFactory.normal(getLocalToken(), SystemTable.getLocalHostId()));
+                                                   valueFactory.normal(getLocalTokens(), SystemTable.getLocalHostId()));
         setMode(Mode.NORMAL, false);
     }
 
@@ -402,17 +402,18 @@ public class StorageService implements IEndpointStateChangeSubscriber, StorageSe
         if (Boolean.parseBoolean(System.getProperty("cassandra.load_ring_state", "true")))
         {
             logger.info("Loading persisted ring state");
-            for (Map.Entry<Token, InetAddress> entry : SystemTable.loadTokens().entrySet())
+            Multimap<InetAddress, Token> loadedTokens = SystemTable.loadTokens();
+            for (InetAddress ep : loadedTokens.keySet())
             {
-                if (entry.getValue() == FBUtilities.getLocalAddress())
+                if (ep.equals(FBUtilities.getBroadcastAddress()))
                 {
                     // entry has been mistakenly added, delete it
-                    SystemTable.removeToken(entry.getKey());
+                    SystemTable.removeTokens(loadedTokens.get(ep));
                 }
                 else
                 {
-                    tokenMetadata.updateNormalToken(entry.getKey(), entry.getValue());
-                    Gossiper.instance.addSavedEndpoint(entry.getValue());
+                    tokenMetadata.updateNormalTokens(loadedTokens.get(ep), ep);
+                    Gossiper.instance.addSavedEndpoint(ep);
                 }
             }
         }
@@ -500,7 +501,7 @@ public class StorageService implements IEndpointStateChangeSubscriber, StorageSe
         Schema.instance.updateVersionAndAnnounce();
         // add rpc listening info
         Gossiper.instance.addLocalApplicationState(ApplicationState.RPC_ADDRESS, valueFactory.rpcaddress(DatabaseDescriptor.getRpcAddress()));
-        if (null != DatabaseDescriptor.getReplaceToken())
+        if (0 != DatabaseDescriptor.getReplaceTokens().size())
             Gossiper.instance.addLocalApplicationState(ApplicationState.STATUS, valueFactory.hibernate(true));
 
         MessagingService.instance().listen(FBUtilities.getLocalAddress());
@@ -515,9 +516,9 @@ public class StorageService implements IEndpointStateChangeSubscriber, StorageSe
                 && !SystemTable.isBootstrapped())
             logger.info("This node will not auto bootstrap because it is configured to be a seed node.");
 
-        InetAddress current = null;
+        Set<InetAddress> current = new HashSet<InetAddress>();
         // first startup is only chance to bootstrap
-        Token<?> token;
+        Collection<Token> tokens;
         if (DatabaseDescriptor.isAutoBootstrap()
             && !(SystemTable.isBootstrapped()
                  || DatabaseDescriptor.getSeeds().contains(FBUtilities.getBroadcastAddress())
@@ -552,7 +553,7 @@ public class StorageService implements IEndpointStateChangeSubscriber, StorageSe
             if (logger.isDebugEnabled())
                 logger.debug("... got ring + schema info");
 
-            if (DatabaseDescriptor.getReplaceToken() == null)
+            if (DatabaseDescriptor.getReplaceTokens().size() == 0)
             {
                 if (tokenMetadata.isMember(FBUtilities.getBroadcastAddress()))
                 {
@@ -560,7 +561,7 @@ public class StorageService implements IEndpointStateChangeSubscriber, StorageSe
                     throw new UnsupportedOperationException(s);
                 }
                 setMode(Mode.JOINING, "getting bootstrap token", true);
-                token = BootStrapper.getBootstrapToken(tokenMetadata, LoadBroadcaster.instance.getLoadInfo());
+                tokens = BootStrapper.getBootstrapTokens(tokenMetadata, LoadBroadcaster.instance.getLoadInfo());
             }
             else
             {
@@ -574,37 +575,53 @@ public class StorageService implements IEndpointStateChangeSubscriber, StorageSe
                 {
                     throw new AssertionError(e);
                 }
-                token = StorageService.getPartitioner().getTokenFactory().fromString(DatabaseDescriptor.getReplaceToken());
+                tokens = new ArrayList<Token>();
+                for (String token : DatabaseDescriptor.getReplaceTokens())
+                    tokens.add(StorageService.getPartitioner().getTokenFactory().fromString(token));
+
                 // check for operator errors...
-                current = tokenMetadata.getEndpoint(token);
-                if (null != current && Gossiper.instance.getEndpointStateForEndpoint(current).getUpdateTimestamp() > (System.currentTimeMillis() - delay))
-                    throw new UnsupportedOperationException("Cannnot replace a token for a Live node... ");
-                setMode(Mode.JOINING, "Replacing a node with token: " + token, true);
+                for (Token token : tokens)
+                {
+                    InetAddress existing = tokenMetadata.getEndpoint(token);
+                    if (null != existing)
+                    {
+                        if (Gossiper.instance.getEndpointStateForEndpoint(existing).getUpdateTimestamp() > (System.currentTimeMillis() - delay))
+                            throw new UnsupportedOperationException("Cannnot replace a token for a Live node... ");
+                        current.add(existing);
+                    }
+                }
+
+                setMode(Mode.JOINING, "Replacing a node with token: " + tokens, true);
             }
 
-            bootstrap(token);
+            bootstrap(tokens);
             assert !isBootstrapMode; // bootstrap will block until finished
         }
         else
         {
-            token = SystemTable.getSavedToken();
-            if (token == null)
+            tokens = SystemTable.getSavedTokens();
+            if (tokens.isEmpty())
             {
-                String initialToken = DatabaseDescriptor.getInitialToken();
-                if (initialToken == null)
+                Collection<String> initialTokens = DatabaseDescriptor.getInitialTokens();
+                if (initialTokens.size() < 1)
                 {
-                    token = getPartitioner().getRandomToken();
-                    logger.warn("Generated random token " + token + ". Random tokens will result in an unbalanced ring; see http://wiki.apache.org/cassandra/Operations");
+                    tokens = BootStrapper.getRandomTokens(tokenMetadata, DatabaseDescriptor.getNumTokens());
+                    if (DatabaseDescriptor.getNumTokens() == 1)
+                        logger.warn("Generated random token " + tokens + ". Random tokens will result in an unbalanced ring; see http://wiki.apache.org/cassandra/Operations");
+                    else
+                        logger.info("Generated random tokens.");
                 }
                 else
                 {
-                    token = getPartitioner().getTokenFactory().fromString(initialToken);
-                    logger.info("Saved token not found. Using " + token + " from configuration");
+                    tokens = new ArrayList<Token>();
+                    for (String token : initialTokens)
+                        tokens.add(getPartitioner().getTokenFactory().fromString(token));
+                    logger.info("Saved token not found. Using " + tokens + " from configuration");
                 }
             }
             else
             {
-                logger.info("Using saved token " + token);
+                logger.info("Using saved token " + tokens);
             }
         }
 
@@ -612,10 +629,11 @@ public class StorageService implements IEndpointStateChangeSubscriber, StorageSe
         {
             // start participating in the ring.
             SystemTable.setBootstrapped(true);
-            setToken(token);
+            setTokens(tokens);
             // remove the existing info about the replaced node.
-            if (current != null)
-                Gossiper.instance.replacedEndpoint(current);
+            if (!current.isEmpty())
+                for (InetAddress existing : current)
+                    Gossiper.instance.replacedEndpoint(existing);
             logger.info("Bootstrap/Replace/Move completed! Now serving reads.");
             assert tokenMetadata.sortedTokens().size() > 0;
         }
@@ -634,7 +652,7 @@ public class StorageService implements IEndpointStateChangeSubscriber, StorageSe
         }
         else if (isSurveyMode)
         {
-            setToken(SystemTable.getSavedToken());
+            setTokens(SystemTable.getSavedTokens());
             SystemTable.setBootstrapped(true);
             isSurveyMode = false;
             logger.info("Leaving write survey mode and joining ring at operator request");
@@ -708,15 +726,15 @@ public class StorageService implements IEndpointStateChangeSubscriber, StorageSe
             logger.debug(logMsg);
     }
 
-    private void bootstrap(Token token) throws IOException
+    private void bootstrap(Collection<Token> tokens) throws IOException
     {
         isBootstrapMode = true;
-        SystemTable.updateToken(token); // DON'T use setToken, that makes us part of the ring locally which is incorrect until we are done bootstrapping
-        if (null == DatabaseDescriptor.getReplaceToken())
+        SystemTable.updateTokens(tokens); // DON'T use setTokens, that makes us part of the ring locally which is incorrect until we are done bootstrapping
+        if (0 == DatabaseDescriptor.getReplaceTokens().size())
         {
             // if not an existing token then bootstrap
             Gossiper.instance.addLocalApplicationState(ApplicationState.STATUS,
-                                                       valueFactory.bootstrapping(token, SystemTable.getLocalHostId()));
+                                                       valueFactory.bootstrapping(tokens, SystemTable.getLocalHostId()));
             setMode(Mode.JOINING, "sleeping " + RING_DELAY + " ms for pending range setup", true);
             try
             {
@@ -730,10 +748,10 @@ public class StorageService implements IEndpointStateChangeSubscriber, StorageSe
         else
         {
             // Dont set any state for the node which is bootstrapping the existing token...
-            tokenMetadata.updateNormalToken(token, FBUtilities.getBroadcastAddress());
+            tokenMetadata.updateNormalTokens(tokens, FBUtilities.getBroadcastAddress());
         }
         setMode(Mode.JOINING, "Starting to bootstrap...", true);
-        new BootStrapper(FBUtilities.getBroadcastAddress(), token, tokenMetadata).bootstrap(); // handles token update
+        new BootStrapper(FBUtilities.getBroadcastAddress(), tokens, tokenMetadata).bootstrap(); // handles token update
     }
 
     public boolean isBootstrapMode()
@@ -1052,10 +1070,12 @@ public class StorageService implements IEndpointStateChangeSubscriber, StorageSe
         }
             else tokenPos = 1;
 
-        Token token = getPartitioner().getTokenFactory().fromString(pieces[tokenPos]);
+        Collection<Token> tokens = new ArrayList<Token>();
+        for (int i = tokenPos; i < pieces.length; ++i)
+            tokens.add(getPartitioner().getTokenFactory().fromString(pieces[i]));
 
         if (logger.isDebugEnabled())
-            logger.debug("Node " + endpoint + " state bootstrapping, token " + token);
+            logger.debug("Node " + endpoint + " state bootstrapping, token " + tokens);
 
         // if this node is present in token metadata, either we have missed intermediate states
         // or the node had crashed. Print warning if needed, clear obsolete stuff and
@@ -1072,7 +1092,7 @@ public class StorageService implements IEndpointStateChangeSubscriber, StorageSe
             tokenMetadata.removeEndpoint(endpoint);
         }
 
-        tokenMetadata.addBootstrapToken(token, endpoint);
+        tokenMetadata.addBootstrapTokens(tokens, endpoint);
         calculatePendingRanges();
 
         if (usesHostId(endpoint))
@@ -1103,10 +1123,12 @@ public class StorageService implements IEndpointStateChangeSubscriber, StorageSe
             tokensPos = 1;
         logger.debug("Using token position {} for {}", tokensPos, endpoint);
 
-        Token token = getPartitioner().getTokenFactory().fromString(pieces[tokensPos]);
+        Collection<Token> tokens = new ArrayList<Token>();
+        for (int i = tokensPos; i < pieces.length; ++i)
+            tokens.add(getPartitioner().getTokenFactory().fromString(pieces[i]));
 
         if (logger.isDebugEnabled())
-            logger.debug("Node " + endpoint + " state normal, token " + token);
+            logger.debug("Node " + endpoint + " state normal, token " + tokens);
 
         if (tokenMetadata.isMember(endpoint))
             logger.info("Node " + endpoint + " state jump to normal");
@@ -1115,36 +1137,61 @@ public class StorageService implements IEndpointStateChangeSubscriber, StorageSe
         if (usesHostId(endpoint))
             tokenMetadata.updateHostId(UUID.fromString(pieces[1]), endpoint);
 
-        // we don't want to update if this node is responsible for the token and it has a later startup time than endpoint.
-        InetAddress currentOwner = tokenMetadata.getEndpoint(token);
-        if (currentOwner == null)
-        {
-            logger.debug("New node " + endpoint + " at token " + token);
-            tokenMetadata.updateNormalToken(token, endpoint);
-            if (!isClientMode)
-                SystemTable.updateToken(endpoint, token);
-        }
-        else if (endpoint.equals(currentOwner))
-        {
-            // set state back to normal, since the node may have tried to leave, but failed and is now back up
-            // no need to persist, token/ip did not change
-            tokenMetadata.updateNormalToken(token, endpoint);
-        }
-        else if (Gossiper.instance.compareEndpointStartup(endpoint, currentOwner) > 0)
-        {
-            logger.info(String.format("Nodes %s and %s have the same token %s.  %s is the new owner",
-                                       endpoint, currentOwner, token, endpoint));
-            tokenMetadata.updateNormalToken(token, endpoint);
-            Gossiper.instance.removeEndpoint(currentOwner);
-            if (!isClientMode)
-                SystemTable.updateToken(endpoint, token);
-        }
-        else
+        Set<Token> tokensToUpdateInMetadata = new HashSet<Token>();
+        Set<Token> tokensToUpdateInSystemTable = new HashSet<Token>();
+        Set<InetAddress> endpointsToRemove = new HashSet<InetAddress>();
+        Multimap<InetAddress, Token> epToTokenCopy = getTokenMetadata().getEndpointToTokenMapForReading();
+
+        for (Token token : tokens)
         {
-            logger.info(String.format("Nodes %s and %s have the same token %s.  Ignoring %s",
-                                       endpoint, currentOwner, token, endpoint));
+            // we don't want to update if this node is responsible for the token and it has a later startup time than endpoint.
+            InetAddress currentOwner = tokenMetadata.getEndpoint(token);
+            if (currentOwner == null)
+            {
+                logger.debug("New node " + endpoint + " at token " + token);
+                tokensToUpdateInMetadata.add(token);
+                if (!isClientMode)
+                    tokensToUpdateInSystemTable.add(token);
+            }
+            else if (endpoint.equals(currentOwner))
+            {
+                // set state back to normal, since the node may have tried to leave, but failed and is now back up
+                // no need to persist, token/ip did not change
+                tokensToUpdateInMetadata.add(token);
+            }
+            else if (Gossiper.instance.compareEndpointStartup(endpoint, currentOwner) > 0)
+            {
+                tokensToUpdateInMetadata.add(token);
+                if (!isClientMode)
+                    tokensToUpdateInSystemTable.add(token);
+
+                // currentOwner is no longer current, endpoint is.  Keep track of these moves, because when
+                // a host no longer has any tokens, we'll want to remove it.
+                epToTokenCopy.get(currentOwner).remove(token);
+                if (epToTokenCopy.get(currentOwner).size() < 1)
+                    endpointsToRemove.add(currentOwner);
+
+                logger.info(String.format("Nodes %s and %s have the same token %s.  %s is the new owner",
+                                          endpoint,
+                                          currentOwner,
+                                          token,
+                                          endpoint));
+            }
+            else
+            {
+                logger.info(String.format("Nodes %s and %s have the same token %s.  Ignoring %s",
+                                           endpoint,
+                                           currentOwner,
+                                           token,
+                                           endpoint));
+            }
         }
 
+        tokenMetadata.updateNormalTokens(tokensToUpdateInMetadata, endpoint);
+        for (InetAddress ep : endpointsToRemove)
+            Gossiper.instance.removeEndpoint(ep);
+        SystemTable.updateTokens(endpoint, tokensToUpdateInSystemTable);
+
         if (tokenMetadata.isMoving(endpoint)) // if endpoint was moving to a new token
             tokenMetadata.removeFromMoving(endpoint);
 
@@ -1160,11 +1207,12 @@ public class StorageService implements IEndpointStateChangeSubscriber, StorageSe
     private void handleStateLeaving(InetAddress endpoint, String[] pieces)
     {
         assert pieces.length >= 2;
-        String moveValue = pieces[1];
-        Token token = getPartitioner().getTokenFactory().fromString(moveValue);
+        Collection<Token> tokens = new ArrayList<Token>();
+        for (int i = 1; i < pieces.length; ++i)
+            tokens.add(getPartitioner().getTokenFactory().fromString(pieces[i]));
 
         if (logger.isDebugEnabled())
-            logger.debug("Node " + endpoint + " state leaving, token " + token);
+            logger.debug("Node " + endpoint + " state leaving, tokens " + tokens);
 
         // If the node is previously unknown or tokens do not match, update tokenmetadata to
         // have this node as 'normal' (it must have been using this token before the
@@ -1172,12 +1220,12 @@ public class StorageService implements IEndpointStateChangeSubscriber, StorageSe
         if (!tokenMetadata.isMember(endpoint))
         {
             logger.info("Node " + endpoint + " state jump to leaving");
-            tokenMetadata.updateNormalToken(token, endpoint);
+            tokenMetadata.updateNormalTokens(tokens, endpoint);
         }
-        else if (!tokenMetadata.getTokens(endpoint).contains(token))
+        else if (!tokenMetadata.getTokens(endpoint).containsAll(tokens))
         {
             logger.warn("Node " + endpoint + " 'leaving' token mismatch. Long network partition?");
-            tokenMetadata.updateNormalToken(token, endpoint);
+            tokenMetadata.updateNormalTokens(tokens, endpoint);
         }
 
         // at this point the endpoint is certainly a member with this token, so let's proceed
@@ -1195,12 +1243,21 @@ public class StorageService implements IEndpointStateChangeSubscriber, StorageSe
     private void handleStateLeft(InetAddress endpoint, String[] pieces)
     {
         assert pieces.length >= 2;
-        Token token = getPartitioner().getTokenFactory().fromString(pieces[1]);
+        Collection<Token> tokens = null;
+        Integer version = MessagingService.instance().getVersion(endpoint);
+        if (version < MessagingService.VERSION_12)
+            tokens = Arrays.asList(getPartitioner().getTokenFactory().fromString(pieces[1]));
+        else
+        {
+            tokens = new ArrayList<Token>(pieces.length - 2);
+            for (int i = 2; i < pieces.length; ++i)
+                tokens.add(getPartitioner().getTokenFactory().fromString(pieces[i]));
+        }
 
         if (logger.isDebugEnabled())
-            logger.debug("Node " + endpoint + " state left, token " + token);
+            logger.debug("Node " + endpoint + " state left, tokens " + tokens);
 
-        excise(token, endpoint, extractExpireTime(pieces));
+        excise(tokens, endpoint, extractExpireTime(pieces, version));
     }
 
     /**
@@ -1252,7 +1309,9 @@ public class StorageService implements IEndpointStateChangeSubscriber, StorageSe
 
             if (VersionedValue.REMOVED_TOKEN.equals(state))
             {
-                excise(removeToken, endpoint, extractExpireTime(pieces));
+                excise(Collections.singleton(removeToken),
+                       endpoint,
+                       extractExpireTime(pieces, MessagingService.instance().getVersion(endpoint)));
             }
             else if (VersionedValue.REMOVING_TOKEN.equals(state))
             {
@@ -1272,24 +1331,24 @@ public class StorageService implements IEndpointStateChangeSubscriber, StorageSe
         } // not a member, nothing to do
     }
 
-    private void excise(Token token, InetAddress endpoint)
+    private void excise(Collection<Token> tokens, InetAddress endpoint)
     {
         HintedHandOffManager.instance.deleteHintsForEndpoint(endpoint);
         Gossiper.instance.removeEndpoint(endpoint);
         tokenMetadata.removeEndpoint(endpoint);
-        tokenMetadata.removeBootstrapToken(token);
+        tokenMetadata.removeBootstrapTokens(tokens);
         calculatePendingRanges();
         if (!isClientMode)
         {
-            logger.info("Removing token " + token + " for " + endpoint);
-            SystemTable.removeToken(token);
+            logger.info("Removing tokens " + tokens + " for " + endpoint);
+            SystemTable.removeTokens(tokens);
         }
     }
 
-    private void excise(Token token, InetAddress endpoint, long expireTime)
+    private void excise(Collection<Token> tokens, InetAddress endpoint, long expireTime)
     {
         addExpireTimeIfFound(endpoint, expireTime);
-        excise(token, endpoint);
+        excise(tokens, endpoint);
     }
 
     protected void addExpireTimeIfFound(InetAddress endpoint, long expireTime)
@@ -1300,14 +1359,21 @@ public class StorageService implements IEndpointStateChangeSubscriber, StorageSe
         }
     }
 
-    protected long extractExpireTime(String[] pieces)
+    protected long extractExpireTime(String[] pieces, int version)
     {
-        long expireTime = 0L;
-        if (pieces.length >= 3)
+        if (version < MessagingService.VERSION_12)
         {
-            expireTime = Long.parseLong(pieces[2]);
+            if (pieces.length >= 3)
+                return Long.parseLong(pieces[2]);
+            else
+                return 0L;
+        } else
+        {
+            if (VersionedValue.STATUS_LEFT.equals(pieces[0]))
+                return Long.parseLong(pieces[1]);
+            else
+                return Long.parseLong(pieces[2]);
         }
-        return expireTime;
     }
 
     /**
@@ -1653,18 +1719,18 @@ public class StorageService implements IEndpointStateChangeSubscriber, StorageSe
         HintedHandOffManager.instance.scheduleHintDelivery(host);
     }
 
-    public Token getLocalToken()
+    public Collection<Token> getLocalTokens()
     {
-        Token token = SystemTable.getSavedToken();
-        assert token != null; // should not be called before initServer sets this
-        return token;
+        Collection<Token> tokens = SystemTable.getSavedTokens();
+        assert tokens != null && !tokens.isEmpty(); // should not be called before initServer sets this
+        return tokens;
     }
 
     /* These methods belong to the MBean interface */
 
     public String getToken()
     {
-        return getLocalToken().toString();
+        return getLocalTokens().iterator().next().toString();
     }
 
     public String getReleaseVersion()
@@ -2255,7 +2321,7 @@ public class StorageService implements IEndpointStateChangeSubscriber, StorageSe
      */
     private void startLeaving()
     {
-        Gossiper.instance.addLocalApplicationState(ApplicationState.STATUS, valueFactory.leaving(getLocalToken()));
+        Gossiper.instance.addLocalApplicationState(ApplicationState.STATUS, valueFactory.leaving(getLocalTokens()));
         tokenMetadata.addLeavingEndpoint(FBUtilities.getBroadcastAddress());
         calculatePendingRanges();
     }
@@ -2299,7 +2365,7 @@ public class StorageService implements IEndpointStateChangeSubscriber, StorageSe
         tokenMetadata.removeEndpoint(FBUtilities.getBroadcastAddress());
         calculatePendingRanges();
 
-        Gossiper.instance.addLocalApplicationState(ApplicationState.STATUS, valueFactory.left(getLocalToken(),Gossiper.computeExpireTime()));
+        Gossiper.instance.addLocalApplicationState(ApplicationState.STATUS, valueFactory.left(getLocalTokens(),Gossiper.computeExpireTime()));
         int delay = Math.max(RING_DELAY, Gossiper.intervalInMillis * 2);
         logger.info("Announcing that I have left the ring for " + delay + "ms");
         try
@@ -2378,7 +2444,7 @@ public class StorageService implements IEndpointStateChangeSubscriber, StorageSe
         }
 
         Gossiper.instance.addLocalApplicationState(ApplicationState.STATUS, valueFactory.moving(newToken));
-        setMode(Mode.MOVING, String.format("Moving %s from %s to %s.", localAddress, getLocalToken(), newToken), true);
+        setMode(Mode.MOVING, String.format("Moving %s from %s to %s.", localAddress, getLocalTokens().iterator().next(), newToken), true);
 
         IEndpointSnitch snitch = DatabaseDescriptor.getEndpointSnitch();
 
@@ -2481,10 +2547,10 @@ public class StorageService implements IEndpointStateChangeSubscriber, StorageSe
             }
         }
 
-        setToken(newToken); // setting new token as we have everything settled
+        setTokens(Collections.singleton(newToken)); // setting new token as we have everything settled
 
         if (logger.isDebugEnabled())
-            logger.debug("Successfully moved to new token {}", getLocalToken());
+            logger.debug("Successfully moved to new token {}", getLocalTokens().iterator().next());
     }
 
     /**
@@ -2515,7 +2581,7 @@ public class StorageService implements IEndpointStateChangeSubscriber, StorageSe
                 UUID hostId = tokenMetadata.getHostId(endpoint);
                 Gossiper.instance.advertiseTokenRemoved(endpoint, hostId);
                 Token token = tokenMetadata.getToken(endpoint);
-                excise(token, endpoint);
+                excise(Collections.singleton(token), endpoint);
             }
             replicatingNodes.clear();
             removingNode = null;
@@ -2603,7 +2669,7 @@ public class StorageService implements IEndpointStateChangeSubscriber, StorageSe
             }
         }
 
-        excise(token, endpoint);
+        excise(Collections.singleton(token), endpoint);
 
         // gossiper will indicate the token has left
         Gossiper.instance.advertiseTokenRemoved(endpoint, hostId);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/66b96ee5/test/unit/org/apache/cassandra/Util.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/Util.java b/test/unit/org/apache/cassandra/Util.java
index 7f8c227..d783413 100644
--- a/test/unit/org/apache/cassandra/Util.java
+++ b/test/unit/org/apache/cassandra/Util.java
@@ -26,6 +26,7 @@ import java.net.UnknownHostException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.List;
 import java.util.UUID;
 import java.util.concurrent.Callable;
@@ -230,7 +231,9 @@ public class Util
         {
             InetAddress ep = InetAddress.getByName("127.0.0." + String.valueOf(i + 1));
             Gossiper.instance.initializeNodeUnsafe(ep, 1);
-            ss.onChange(ep, ApplicationState.STATUS, new VersionedValue.VersionedValueFactory(partitioner).normal(endpointTokens.get(i), hostIds.get(i)));
+            ss.onChange(ep,
+                        ApplicationState.STATUS,
+                        new VersionedValue.VersionedValueFactory(partitioner).normal(Collections.singleton(endpointTokens.get(i)), hostIds.get(i)));
             hosts.add(ep);
         }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/66b96ee5/test/unit/org/apache/cassandra/db/SystemTableTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/SystemTableTest.java b/test/unit/org/apache/cassandra/db/SystemTableTest.java
index 0c7b8a3..7c7eb24 100644
--- a/test/unit/org/apache/cassandra/db/SystemTableTest.java
+++ b/test/unit/org/apache/cassandra/db/SystemTableTest.java
@@ -23,24 +23,36 @@ package org.apache.cassandra.db;
 
 import java.net.InetAddress;
 import java.net.UnknownHostException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
 import java.util.UUID;
 
-import com.google.common.base.Charsets;
 import org.junit.Test;
 
 import org.apache.cassandra.dht.BytesToken;
+import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.utils.ByteBufferUtil;
 
 public class SystemTableTest
 {
     @Test
-    public void testLocalToken()
+    public void testLocalTokens()
     {
-        SystemTable.updateToken(new BytesToken(ByteBufferUtil.bytes("token")));
-        assert new String(((BytesToken) SystemTable.getSavedToken()).token, Charsets.UTF_8).equals("token");
+        // Remove all existing tokens
+        SystemTable.updateTokens(Collections.<Token> emptySet());
 
-        SystemTable.updateToken(new BytesToken(ByteBufferUtil.bytes("token2")));
-        assert new String(((BytesToken) SystemTable.getSavedToken()).token, Charsets.UTF_8).equals("token2");
+        List<Token> tokens = new ArrayList<Token>()
+        {{
+            for (int i = 0; i < 9; i++)
+                add(new BytesToken(ByteBufferUtil.bytes(String.format("token%d", i))));
+        }};
+
+        SystemTable.updateTokens(tokens);
+        int count = 0;
+
+        for (Token tok : SystemTable.getSavedTokens())
+            assert tokens.get(count++).equals(tok);
     }
 
     @Test
@@ -48,10 +60,10 @@ public class SystemTableTest
     {
         BytesToken token = new BytesToken(ByteBufferUtil.bytes("token3"));
         InetAddress address = InetAddress.getByName("127.0.0.2");
-        SystemTable.updateToken(address, token);
-        assert SystemTable.loadTokens().get(token).equals(address);
-        SystemTable.removeToken(token);
-        assert !SystemTable.loadTokens().containsKey(token);
+        SystemTable.updateTokens(address, Collections.<Token>singletonList(token));
+        assert SystemTable.loadTokens().get(address).contains(token);
+        SystemTable.removeTokens(Collections.<Token>singletonList(token));
+        assert !SystemTable.loadTokens().containsValue(token);
     }
 
     @Test

http://git-wip-us.apache.org/repos/asf/cassandra/blob/66b96ee5/test/unit/org/apache/cassandra/dht/BootStrapperTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/dht/BootStrapperTest.java b/test/unit/org/apache/cassandra/dht/BootStrapperTest.java
index 3c8cd3b..997c5e4 100644
--- a/test/unit/org/apache/cassandra/dht/BootStrapperTest.java
+++ b/test/unit/org/apache/cassandra/dht/BootStrapperTest.java
@@ -22,6 +22,7 @@ import java.io.IOException;
 import java.net.InetAddress;
 import java.net.UnknownHostException;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.HashSet;
 import java.util.HashMap;
 import java.util.Set;
@@ -105,7 +106,7 @@ public class BootStrapperTest extends SchemaLoader
             assert range.contains(token);
             ss.onChange(bootstrapAddrs[i],
                         ApplicationState.STATUS,
-                        StorageService.instance.valueFactory.bootstrapping(token, bootstrapHostIds[i]));
+                        StorageService.instance.valueFactory.bootstrapping(Collections.<Token>singleton(token), bootstrapHostIds[i]));
         }
 
         // any further attempt to bootsrtap should fail since every node in the cluster is splitting.
@@ -124,7 +125,7 @@ public class BootStrapperTest extends SchemaLoader
         Token token = StorageService.getPartitioner().midpoint(range.left, range.right);
         ss.onChange(bootstrapAddrs[2],
                     ApplicationState.STATUS,
-                    StorageService.instance.valueFactory.normal(token, bootstrapHostIds[2]));
+                    StorageService.instance.valueFactory.normal(Collections.singleton(token), bootstrapHostIds[2]));
         load.put(bootstrapAddrs[2], 0d);
         InetAddress addr = BootStrapper.getBootstrapSource(ss.getTokenMetadata(), load);
         assert addr != null && addr.equals(addrs[2]);
@@ -158,7 +159,7 @@ public class BootStrapperTest extends SchemaLoader
         assert range5.contains(fakeToken);
         ss.onChange(myEndpoint,
                     ApplicationState.STATUS,
-                    StorageService.instance.valueFactory.bootstrapping(fakeToken, UUID.randomUUID()));
+                    StorageService.instance.valueFactory.bootstrapping(Collections.<Token>singleton(fakeToken), UUID.randomUUID()));
         tmd = ss.getTokenMetadata();
 
         InetAddress source4 = BootStrapper.getBootstrapSource(tmd, load);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/66b96ee5/test/unit/org/apache/cassandra/gms/SerializationsTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/gms/SerializationsTest.java b/test/unit/org/apache/cassandra/gms/SerializationsTest.java
index afeaab3..4267d72 100644
--- a/test/unit/org/apache/cassandra/gms/SerializationsTest.java
+++ b/test/unit/org/apache/cassandra/gms/SerializationsTest.java
@@ -19,6 +19,7 @@
 package org.apache.cassandra.gms;
 
 import org.apache.cassandra.AbstractSerializationsTester;
+import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.utils.FBUtilities;
 import org.junit.Test;
@@ -28,6 +29,7 @@ import java.io.DataOutputStream;
 import java.io.IOException;
 import java.net.InetAddress;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -112,7 +114,8 @@ public class SerializationsTest extends AbstractSerializationsTester
         private static EndpointState EndpointSt = new EndpointState(HeartbeatSt);
         private static VersionedValue.VersionedValueFactory vvFact = new VersionedValue.VersionedValueFactory(StorageService.getPartitioner());
         private static VersionedValue vv0 = vvFact.load(23d);
-        private static VersionedValue vv1 = vvFact.bootstrapping(StorageService.getPartitioner().getRandomToken(), UUID.randomUUID());
+        private static VersionedValue vv1 = vvFact.bootstrapping(Collections.<Token>singleton(StorageService.getPartitioner().getRandomToken()),
+                                                                 UUID.randomUUID());
         private static List<GossipDigest> Digests = new ArrayList<GossipDigest>();
 
         {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/66b96ee5/test/unit/org/apache/cassandra/service/AntiEntropyServiceTestAbstract.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/service/AntiEntropyServiceTestAbstract.java b/test/unit/org/apache/cassandra/service/AntiEntropyServiceTestAbstract.java
index c373fa8..68b34b4 100644
--- a/test/unit/org/apache/cassandra/service/AntiEntropyServiceTestAbstract.java
+++ b/test/unit/org/apache/cassandra/service/AntiEntropyServiceTestAbstract.java
@@ -94,7 +94,7 @@ public abstract class AntiEntropyServiceTestAbstract extends SchemaLoader
         aes = AntiEntropyService.instance;
         TokenMetadata tmd = StorageService.instance.getTokenMetadata();
         tmd.clearUnsafe();
-        StorageService.instance.setToken(StorageService.getPartitioner().getRandomToken());
+        StorageService.instance.setTokens(Collections.singleton(StorageService.getPartitioner().getRandomToken()));
         tmd.updateNormalToken(StorageService.getPartitioner().getMinimumToken(), REMOTE);
         assert tmd.isMember(REMOTE);
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/66b96ee5/test/unit/org/apache/cassandra/service/LeaveAndBootstrapTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/service/LeaveAndBootstrapTest.java b/test/unit/org/apache/cassandra/service/LeaveAndBootstrapTest.java
index fc3d8a6..3b76c2c 100644
--- a/test/unit/org/apache/cassandra/service/LeaveAndBootstrapTest.java
+++ b/test/unit/org/apache/cassandra/service/LeaveAndBootstrapTest.java
@@ -104,7 +104,7 @@ public class LeaveAndBootstrapTest
         // Third node leaves
         ss.onChange(hosts.get(LEAVING_NODE),
                 ApplicationState.STATUS,
-                valueFactory.leaving(endpointTokens.get(LEAVING_NODE)));
+                valueFactory.leaving(Collections.singleton(endpointTokens.get(LEAVING_NODE))));
         assertTrue(tmd.isLeaving(hosts.get(LEAVING_NODE)));
 
         AbstractReplicationStrategy strategy;
@@ -158,16 +158,22 @@ public class LeaveAndBootstrapTest
         // nodes 6, 8 and 9 leave
         final int[] LEAVING = new int[] {6, 8, 9};
         for (int leaving : LEAVING)
-            ss.onChange(hosts.get(leaving), ApplicationState.STATUS, valueFactory.leaving(endpointTokens.get(leaving)));
+            ss.onChange(hosts.get(leaving),
+                        ApplicationState.STATUS,
+                        valueFactory.leaving(Collections.singleton(endpointTokens.get(leaving))));
 
         // boot two new nodes with keyTokens.get(5) and keyTokens.get(7)
         InetAddress boot1 = InetAddress.getByName("127.0.1.1");
         Gossiper.instance.initializeNodeUnsafe(boot1, 1);
         UUID boot1Id = UUID.randomUUID();
-        ss.onChange(boot1, ApplicationState.STATUS, valueFactory.bootstrapping(keyTokens.get(5), boot1Id));
+        ss.onChange(boot1,
+                    ApplicationState.STATUS,
+                    valueFactory.bootstrapping(Collections.<Token>singleton(keyTokens.get(5)), boot1Id));
         InetAddress boot2 = InetAddress.getByName("127.0.1.2");
         Gossiper.instance.initializeNodeUnsafe(boot2, 1);
-        ss.onChange(boot2, ApplicationState.STATUS, valueFactory.bootstrapping(keyTokens.get(7), UUID.randomUUID()));
+        ss.onChange(boot2,
+                    ApplicationState.STATUS,
+                    valueFactory.bootstrapping(Collections.<Token>singleton(keyTokens.get(7)), UUID.randomUUID()));
 
         Collection<InetAddress> endpoints = null;
 
@@ -320,10 +326,10 @@ public class LeaveAndBootstrapTest
         // Now finish node 6 and node 9 leaving, as well as boot1 (after this node 8 is still
         // leaving and boot2 in progress)
         ss.onChange(hosts.get(LEAVING[0]), ApplicationState.STATUS,
-                valueFactory.left(endpointTokens.get(LEAVING[0]), Gossiper.computeExpireTime()));
+                valueFactory.left(Collections.singleton(endpointTokens.get(LEAVING[0])), Gossiper.computeExpireTime()));
         ss.onChange(hosts.get(LEAVING[2]), ApplicationState.STATUS,
-                valueFactory.left(endpointTokens.get(LEAVING[2]), Gossiper.computeExpireTime()));
-        ss.onChange(boot1, ApplicationState.STATUS, valueFactory.normal(keyTokens.get(5), boot1Id));
+                valueFactory.left(Collections.singleton(endpointTokens.get(LEAVING[2])), Gossiper.computeExpireTime()));
+        ss.onChange(boot1, ApplicationState.STATUS, valueFactory.normal(Collections.singleton(keyTokens.get(5)), boot1Id));
 
         // adjust precalculated results.  this changes what the expected endpoints are.
         expectedEndpoints.get("Keyspace1").get(new BigIntegerToken("55")).removeAll(makeAddrs("127.0.0.7", "127.0.0.8"));
@@ -445,7 +451,9 @@ public class LeaveAndBootstrapTest
         Util.createInitialRing(ss, partitioner, endpointTokens, keyTokens, hosts, hostIds, 7);
 
         // node 2 leaves
-        ss.onChange(hosts.get(2), ApplicationState.STATUS, valueFactory.leaving(endpointTokens.get(2)));
+        ss.onChange(hosts.get(2),
+                    ApplicationState.STATUS,
+                    valueFactory.leaving(Collections.singleton(endpointTokens.get(2))));
 
         // don't bother to test pending ranges here, that is extensively tested by other
         // tests. Just check that the node is in appropriate lists.
@@ -454,14 +462,18 @@ public class LeaveAndBootstrapTest
         assertTrue(tmd.getBootstrapTokens().isEmpty());
 
         // Bootstrap the node immediately to keyTokens.get(4) without going through STATE_LEFT
-        ss.onChange(hosts.get(2), ApplicationState.STATUS, valueFactory.bootstrapping(keyTokens.get(4), hostIds.get(2)));
+        ss.onChange(hosts.get(2),
+                    ApplicationState.STATUS,
+                    valueFactory.bootstrapping(Collections.<Token>singleton(keyTokens.get(4)), hostIds.get(2)));
 
         assertFalse(tmd.isMember(hosts.get(2)));
         assertFalse(tmd.isLeaving(hosts.get(2)));
         assertTrue(tmd.getBootstrapTokens().get(keyTokens.get(4)).equals(hosts.get(2)));
 
         // Bootstrap node hosts.get(3) to keyTokens.get(1)
-        ss.onChange(hosts.get(3), ApplicationState.STATUS, valueFactory.bootstrapping(keyTokens.get(1), hostIds.get(3)));
+        ss.onChange(hosts.get(3),
+                    ApplicationState.STATUS,
+                    valueFactory.bootstrapping(Collections.<Token>singleton(keyTokens.get(1)), hostIds.get(3)));
 
         assertFalse(tmd.isMember(hosts.get(3)));
         assertFalse(tmd.isLeaving(hosts.get(3)));
@@ -469,7 +481,9 @@ public class LeaveAndBootstrapTest
         assertTrue(tmd.getBootstrapTokens().get(keyTokens.get(1)).equals(hosts.get(3)));
 
         // Bootstrap node hosts.get(2) further to keyTokens.get(3)
-        ss.onChange(hosts.get(2), ApplicationState.STATUS, valueFactory.bootstrapping(keyTokens.get(3), hostIds.get(2)));
+        ss.onChange(hosts.get(2),
+                    ApplicationState.STATUS,
+                    valueFactory.bootstrapping(Collections.<Token>singleton(keyTokens.get(3)), hostIds.get(2)));
 
         assertFalse(tmd.isMember(hosts.get(2)));
         assertFalse(tmd.isLeaving(hosts.get(2)));
@@ -478,8 +492,10 @@ public class LeaveAndBootstrapTest
         assertTrue(tmd.getBootstrapTokens().get(keyTokens.get(1)).equals(hosts.get(3)));
 
         // Go to normal again for both nodes
-        ss.onChange(hosts.get(2), ApplicationState.STATUS, valueFactory.normal(keyTokens.get(3), hostIds.get(2)));
-        ss.onChange(hosts.get(3), ApplicationState.STATUS, valueFactory.normal(keyTokens.get(2), hostIds.get(3)));
+        ss.onChange(hosts.get(2), ApplicationState.STATUS, valueFactory.normal(Collections.singleton(keyTokens.get(3)),
+                                                                               hostIds.get(2)));
+        ss.onChange(hosts.get(3), ApplicationState.STATUS, valueFactory.normal(Collections.singleton(keyTokens.get(2)),
+                                                                               hostIds.get(3)));
 
         assertTrue(tmd.isMember(hosts.get(2)));
         assertFalse(tmd.isLeaving(hosts.get(2)));
@@ -509,22 +525,24 @@ public class LeaveAndBootstrapTest
         Util.createInitialRing(ss, partitioner, endpointTokens, keyTokens, hosts, hostIds, 6);
 
         // node 2 leaves
-        ss.onChange(hosts.get(2), ApplicationState.STATUS, valueFactory.leaving(endpointTokens.get(2)));
+        ss.onChange(hosts.get(2), ApplicationState.STATUS, valueFactory.leaving(Collections.singleton(endpointTokens.get(2))));
 
         assertTrue(tmd.isLeaving(hosts.get(2)));
         assertTrue(tmd.getToken(hosts.get(2)).equals(endpointTokens.get(2)));
 
         // back to normal
-        ss.onChange(hosts.get(2), ApplicationState.STATUS, valueFactory.normal(keyTokens.get(2), hostIds.get(2)));
+        ss.onChange(hosts.get(2), ApplicationState.STATUS, valueFactory.normal(Collections.singleton(keyTokens.get(2)),
+                                                                               hostIds.get(2)));
 
         assertTrue(tmd.getLeavingEndpoints().isEmpty());
         assertTrue(tmd.getToken(hosts.get(2)).equals(keyTokens.get(2)));
 
         // node 2 goes through leave and left and then jumps to normal at its new token
-        ss.onChange(hosts.get(2), ApplicationState.STATUS, valueFactory.leaving(keyTokens.get(2)));
+        ss.onChange(hosts.get(2), ApplicationState.STATUS, valueFactory.leaving(Collections.singleton(keyTokens.get(2))));
         ss.onChange(hosts.get(2), ApplicationState.STATUS,
-                valueFactory.left(keyTokens.get(2), Gossiper.computeExpireTime()));
-        ss.onChange(hosts.get(2), ApplicationState.STATUS, valueFactory.normal(keyTokens.get(4), hostIds.get(2)));
+                valueFactory.left(Collections.singleton(keyTokens.get(2)), Gossiper.computeExpireTime()));
+        ss.onChange(hosts.get(2), ApplicationState.STATUS, valueFactory.normal(Collections.singleton(keyTokens.get(4)),
+                                                                               hostIds.get(2)));
 
         assertTrue(tmd.getBootstrapTokens().isEmpty());
         assertTrue(tmd.getLeavingEndpoints().isEmpty());
@@ -549,21 +567,23 @@ public class LeaveAndBootstrapTest
         Util.createInitialRing(ss, partitioner, endpointTokens, keyTokens, hosts, hostIds, 6);
 
         // node 2 leaves with _different_ token
-        ss.onChange(hosts.get(2), ApplicationState.STATUS, valueFactory.leaving(keyTokens.get(0)));
+        ss.onChange(hosts.get(2), ApplicationState.STATUS, valueFactory.leaving(Collections.singleton(keyTokens.get(0))));
 
         assertTrue(tmd.getToken(hosts.get(2)).equals(keyTokens.get(0)));
         assertTrue(tmd.isLeaving(hosts.get(2)));
         assertTrue(tmd.getEndpoint(endpointTokens.get(2)) == null);
 
         // go to bootstrap
-        ss.onChange(hosts.get(2), ApplicationState.STATUS, valueFactory.bootstrapping(keyTokens.get(1), hostIds.get(2)));
+        ss.onChange(hosts.get(2),
+                    ApplicationState.STATUS,
+                    valueFactory.bootstrapping(Collections.<Token>singleton(keyTokens.get(1)), hostIds.get(2)));
 
         assertFalse(tmd.isLeaving(hosts.get(2)));
         assertTrue(tmd.getBootstrapTokens().size() == 1);
         assertTrue(tmd.getBootstrapTokens().get(keyTokens.get(1)).equals(hosts.get(2)));
 
         // jump to leaving again
-        ss.onChange(hosts.get(2), ApplicationState.STATUS, valueFactory.leaving(keyTokens.get(1)));
+        ss.onChange(hosts.get(2), ApplicationState.STATUS, valueFactory.leaving(Collections.singleton(keyTokens.get(1))));
 
         assertTrue(tmd.getEndpoint(keyTokens.get(1)).equals(hosts.get(2)));
         assertTrue(tmd.isLeaving(hosts.get(2)));
@@ -571,7 +591,7 @@ public class LeaveAndBootstrapTest
 
         // go to state left
         ss.onChange(hosts.get(2), ApplicationState.STATUS,
-                valueFactory.left(keyTokens.get(1), Gossiper.computeExpireTime()));
+                valueFactory.left(Collections.singleton(keyTokens.get(1)), Gossiper.computeExpireTime()));
 
         assertFalse(tmd.isMember(hosts.get(2)));
         assertFalse(tmd.isLeaving(hosts.get(2)));
@@ -596,12 +616,12 @@ public class LeaveAndBootstrapTest
 
         // node hosts.get(2) jumps to left
         ss.onChange(hosts.get(2), ApplicationState.STATUS,
-                valueFactory.left(endpointTokens.get(2), Gossiper.computeExpireTime()));
+                valueFactory.left(Collections.singleton(endpointTokens.get(2)), Gossiper.computeExpireTime()));
 
         assertFalse(tmd.isMember(hosts.get(2)));
 
         // node hosts.get(3) goes to bootstrap
-        ss.onChange(hosts.get(3), ApplicationState.STATUS, valueFactory.bootstrapping(keyTokens.get(1), hostIds.get(3)));
+        ss.onChange(hosts.get(3), ApplicationState.STATUS, valueFactory.bootstrapping(Collections.<Token>singleton(keyTokens.get(1)), hostIds.get(3)));
 
         assertFalse(tmd.isMember(hosts.get(3)));
         assertTrue(tmd.getBootstrapTokens().size() == 1);
@@ -609,7 +629,7 @@ public class LeaveAndBootstrapTest
 
         // and then directly to 'left'
         ss.onChange(hosts.get(2), ApplicationState.STATUS,
-                valueFactory.left(keyTokens.get(1), Gossiper.computeExpireTime()));
+                valueFactory.left(Collections.singleton(keyTokens.get(1)), Gossiper.computeExpireTime()));
 
         assertTrue(tmd.getBootstrapTokens().size() == 0);
         assertFalse(tmd.isMember(hosts.get(2)));

http://git-wip-us.apache.org/repos/asf/cassandra/blob/66b96ee5/test/unit/org/apache/cassandra/service/MoveTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/service/MoveTest.java b/test/unit/org/apache/cassandra/service/MoveTest.java
index 42f20fc..cf42564 100644
--- a/test/unit/org/apache/cassandra/service/MoveTest.java
+++ b/test/unit/org/apache/cassandra/service/MoveTest.java
@@ -136,7 +136,7 @@ public class MoveTest
         }
 
         // moving endpoint back to the normal state
-        ss.onChange(hosts.get(MOVING_NODE), ApplicationState.STATUS, valueFactory.normal(newToken, hostIds.get(MOVING_NODE)));
+        ss.onChange(hosts.get(MOVING_NODE), ApplicationState.STATUS, valueFactory.normal(Collections.singleton(newToken), hostIds.get(MOVING_NODE)));
     }
 
     /*
@@ -182,10 +182,14 @@ public class MoveTest
         // boot two new nodes with keyTokens.get(5) and keyTokens.get(7)
         InetAddress boot1 = InetAddress.getByName("127.0.1.1");
         Gossiper.instance.initializeNodeUnsafe(boot1, 1);
-        ss.onChange(boot1, ApplicationState.STATUS, valueFactory.bootstrapping(keyTokens.get(5), UUID.randomUUID()));
+        ss.onChange(boot1,
+                    ApplicationState.STATUS,
+                    valueFactory.bootstrapping(Collections.<Token>singleton(keyTokens.get(5)), UUID.randomUUID()));
         InetAddress boot2 = InetAddress.getByName("127.0.1.2");
         Gossiper.instance.initializeNodeUnsafe(boot2, 1);
-        ss.onChange(boot2, ApplicationState.STATUS, valueFactory.bootstrapping(keyTokens.get(7), UUID.randomUUID()));
+        ss.onChange(boot2,
+                    ApplicationState.STATUS,
+                    valueFactory.bootstrapping(Collections.<Token>singleton(keyTokens.get(7)), UUID.randomUUID()));
 
         // don't require test update every time a new keyspace is added to test/conf/cassandra.yaml
         Map<String, AbstractReplicationStrategy> tableStrategyMap = new HashMap<String, AbstractReplicationStrategy>();
@@ -471,7 +475,9 @@ public class MoveTest
         // all moving nodes are back to the normal state
         for (Integer movingIndex : MOVING)
         {
-            ss.onChange(hosts.get(movingIndex), ApplicationState.STATUS, valueFactory.normal(newTokens.get(movingIndex), hostIds.get(movingIndex)));
+            ss.onChange(hosts.get(movingIndex),
+                        ApplicationState.STATUS,
+                        valueFactory.normal(Collections.singleton(newTokens.get(movingIndex)), hostIds.get(movingIndex)));
         }
     }
 
@@ -500,7 +506,8 @@ public class MoveTest
         assertTrue(tmd.getToken(hosts.get(2)).equals(endpointTokens.get(2)));
 
         // back to normal
-        ss.onChange(hosts.get(2), ApplicationState.STATUS, valueFactory.normal(newToken, hostIds.get(2)));
+        ss.onChange(hosts.get(2), ApplicationState.STATUS, valueFactory.normal(Collections.singleton(newToken),
+                                                                               hostIds.get(2)));
 
         assertTrue(tmd.getMovingEndpoints().isEmpty());
         assertTrue(tmd.getToken(hosts.get(2)).equals(newToken));
@@ -508,7 +515,8 @@ public class MoveTest
         newToken = positionToken(8);
         // node 2 goes through moving and then jumps to normal at its new token
         ss.onChange(hosts.get(2), ApplicationState.STATUS, valueFactory.moving(newToken));
-        ss.onChange(hosts.get(2), ApplicationState.STATUS, valueFactory.normal(newToken, hostIds.get(2)));
+        ss.onChange(hosts.get(2), ApplicationState.STATUS, valueFactory.normal(Collections.singleton(newToken),
+                                                                               hostIds.get(2)));
 
         assertTrue(tmd.getBootstrapTokens().isEmpty());
         assertTrue(tmd.getMovingEndpoints().isEmpty());