You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by br...@apache.org on 2011/07/25 20:58:28 UTC

svn commit: r1150847 - in /cassandra/branches/cassandra-0.8: ./ src/java/org/apache/cassandra/gms/ src/java/org/apache/cassandra/net/ src/java/org/apache/cassandra/service/

Author: brandonwilliams
Date: Mon Jul 25 18:58:27 2011
New Revision: 1150847

URL: http://svn.apache.org/viewvc?rev=1150847&view=rev
Log:
Gossip handles dead states, token removal actually works, gossip states
are held for aVeryLongTime.
Patch by brandonwilliams and Paul Cannon, reviewed by Paul Cannon for
CASSANDRA-2496.

Modified:
    cassandra/branches/cassandra-0.8/CHANGES.txt
    cassandra/branches/cassandra-0.8/NEWS.txt
    cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/gms/ApplicationState.java
    cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/gms/Gossiper.java
    cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/gms/HeartBeatState.java
    cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/gms/VersionedValue.java
    cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/net/MessagingService.java
    cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/service/StorageService.java

Modified: cassandra/branches/cassandra-0.8/CHANGES.txt
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/CHANGES.txt?rev=1150847&r1=1150846&r2=1150847&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.8/CHANGES.txt (original)
+++ cassandra/branches/cassandra-0.8/CHANGES.txt Mon Jul 25 18:58:27 2011
@@ -1,6 +1,7 @@
 0.8.3
  * add ability to drop local reads/writes that are going to timeout
    (CASSANDRA-2943)
+ * revamp token removal process, keep gossip states for 3 days (CASSANDRA-2496)
 
 
 0.8.2

Modified: cassandra/branches/cassandra-0.8/NEWS.txt
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/NEWS.txt?rev=1150847&r1=1150846&r2=1150847&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.8/NEWS.txt (original)
+++ cassandra/branches/cassandra-0.8/NEWS.txt Mon Jul 25 18:58:27 2011
@@ -1,3 +1,12 @@
+0.8.3
+=====
+
+Upgrading
+---------
+    - Token removal has been revamped.  Removing tokens will not work in a cluster
+      that mixes 0.8.3 with earlier releases, so every node except the dead one
+      must be running 0.8.3 before you run removetoken.
+
 0.8.2
 =====
 

Modified: cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/gms/ApplicationState.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/gms/ApplicationState.java?rev=1150847&r1=1150846&r2=1150847&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/gms/ApplicationState.java (original)
+++ cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/gms/ApplicationState.java Mon Jul 25 18:58:27 2011
@@ -29,6 +29,7 @@ public enum ApplicationState
     DC,
     RACK,
     RELEASE_VERSION,
+    REMOVAL_COORDINATOR,
     // pad to allow adding new states to existing cluster
     X1,
     X2,

Modified: cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/gms/Gossiper.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/gms/Gossiper.java?rev=1150847&r1=1150846&r2=1150847&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/gms/Gossiper.java (original)
+++ cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/gms/Gossiper.java Mon Jul 25 18:58:27 2011
@@ -27,6 +27,7 @@ import java.util.*;
 import java.util.Map.Entry;
 import java.util.concurrent.*;
 
+import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.db.SystemTable;
 import org.apache.cassandra.net.MessageProducer;
 import org.apache.cassandra.config.ConfigurationException;
@@ -58,6 +59,8 @@ public class Gossiper implements IFailur
     private static final RetryingScheduledThreadPoolExecutor executor = new RetryingScheduledThreadPoolExecutor("GossipTasks");
 
     static final ApplicationState[] STATES = ApplicationState.values();
+    static final List<String> DEAD_STATES = Arrays.asList(VersionedValue.REMOVING_TOKEN, VersionedValue.REMOVED_TOKEN, VersionedValue.STATUS_LEFT);
+
     private ScheduledFuture<?> scheduledGossipTask;
     public final static int intervalInMillis = 1000;
     public final static int QUARANTINE_DELAY = StorageService.RING_DELAY * 2;
@@ -264,17 +267,21 @@ public class Gossiper implements IFailur
     }
 
     /**
-     * Removes the endpoint from unreachable endpoint set
+     * Removes the endpoint from gossip completely
      *
      * @param endpoint endpoint to be removed from the current membership.
     */
     private void evictFromMembership(InetAddress endpoint)
     {
         unreachableEndpoints.remove(endpoint);
+        endpointStateMap.remove(endpoint);
+        justRemovedEndpoints.put(endpoint, System.currentTimeMillis());
+        if (logger.isDebugEnabled())
+            logger.debug("evicting " + endpoint + " from gossip");
     }
 
     /**
-     * Removes the endpoint completely from Gossip
+     * Removes the endpoint from Gossip but retains endpoint state
      */
     public void removeEndpoint(InetAddress endpoint)
     {
@@ -288,6 +295,8 @@ public class Gossiper implements IFailur
         FailureDetector.instance.remove(endpoint);
         versions.remove(endpoint);
         justRemovedEndpoints.put(endpoint, System.currentTimeMillis());
+        if (logger.isDebugEnabled())
+            logger.debug("removing endpoint " + endpoint);
     }
 
     /**
@@ -328,6 +337,67 @@ public class Gossiper implements IFailur
         }
     }
 
+    /**
+     * This method will begin removing an existing endpoint from the cluster by spoofing its state
+     * This should never be called unless this coordinator has had 'removetoken' invoked
+     *
+     * @param endpoint - the endpoint being removed
+     * @param token - the token being removed
+     * @param mytoken - my own token for replication coordination
+     */
+    public void advertiseRemoving(InetAddress endpoint, Token token, Token mytoken)
+    {
+        EndpointState epState = endpointStateMap.get(endpoint);
+        // remember this node's generation
+        int generation = epState.getHeartBeatState().getGeneration();
+        logger.info("Removing token: " + token);
+        logger.info("Sleeping for " + StorageService.RING_DELAY + "ms to ensure " + endpoint + " does not change");
+        try
+        {
+            Thread.sleep(StorageService.RING_DELAY);
+        }
+        catch (InterruptedException e)
+        {
+            throw new AssertionError(e);
+        }
+        // make sure it did not change
+        epState = endpointStateMap.get(endpoint);
+        if (epState.getHeartBeatState().getGeneration() != generation)
+            throw new RuntimeException("Endpoint " + endpoint + " generation changed while trying to remove it");
+        // update the other node's generation to mimic it as if it had changed it itself
+        logger.info("Advertising removal for " + endpoint);
+        epState.updateTimestamp(); // make sure we don't evict it too soon
+        epState.getHeartBeatState().forceNewerGenerationUnsafe();
+        epState.addApplicationState(ApplicationState.STATUS, StorageService.instance.valueFactory.removingNonlocal(token));
+        epState.addApplicationState(ApplicationState.REMOVAL_COORDINATOR, StorageService.instance.valueFactory.removalCoordinator(mytoken));
+        endpointStateMap.put(endpoint, epState);
+    }
+
+    /**
+     * Handles switching the endpoint's state from REMOVING_TOKEN to REMOVED_TOKEN
+     * This should only be called after advertiseRemoving
+     * @param endpoint
+     * @param token
+     */
+    public void advertiseTokenRemoved(InetAddress endpoint, Token token)
+    {
+        EndpointState epState = endpointStateMap.get(endpoint);
+        epState.updateTimestamp(); // make sure we don't evict it too soon
+        epState.getHeartBeatState().forceNewerGenerationUnsafe();
+        epState.addApplicationState(ApplicationState.STATUS, StorageService.instance.valueFactory.removedNonlocal(token));
+        logger.info("Completing removal of " + endpoint);
+        endpointStateMap.put(endpoint, epState);
+        // ensure at least one gossip round occurs before returning
+        try
+        {
+            Thread.sleep(intervalInMillis * 2);
+        }
+        catch (InterruptedException e)
+        {
+            throw new AssertionError(e);
+        }
+    }
+
     public boolean isKnownEndpoint(InetAddress endpoint)
     {
         return endpointStateMap.containsKey(endpoint);
@@ -456,23 +526,18 @@ public class Gossiper implements IFailur
             {
                 long duration = now - epState.getUpdateTimestamp();
 
+                if (StorageService.instance.getTokenMetadata().isMember(endpoint))
+                    epState.setHasToken(true);
                 // check if this is a fat client. fat clients are removed automatically from
                 // gosip after FatClientTimeout
-                if (!epState.hasToken() && !epState.isAlive() && (duration > FatClientTimeout))
+                if (!epState.hasToken() && !epState.isAlive() && !justRemovedEndpoints.containsKey(endpoint) && (duration > FatClientTimeout))
                 {
-                    if (StorageService.instance.getTokenMetadata().isMember(endpoint))
-                        epState.setHasToken(true);
-                    else
-                    {
-                        if (!justRemovedEndpoints.containsKey(endpoint)) // if the node was decommissioned, it will have been removed but still appear as a fat client
-                        {
-                            logger.info("FatClient " + endpoint + " has been silent for " + FatClientTimeout + "ms, removing from gossip");
-                            removeEndpoint(endpoint); // after quarantine justRemoveEndpoints will remove the state
-                        }
-                    }
+                    logger.info("FatClient " + endpoint + " has been silent for " + FatClientTimeout + "ms, removing from gossip");
+                    removeEndpoint(endpoint); // will put it in justRemovedEndpoints to respect quarantine delay
+                    evictFromMembership(endpoint); // can get rid of the state immediately
                 }
 
-                if ( !epState.isAlive() && (duration > aVeryLongTime) )
+                if ( !epState.isAlive() && (duration > aVeryLongTime) && (!StorageService.instance.getTokenMetadata().isMember(endpoint)))
                 {
                     evictFromMembership(endpoint);
                 }
@@ -488,7 +553,6 @@ public class Gossiper implements IFailur
                     if (logger.isDebugEnabled())
                         logger.debug(QUARANTINE_DELAY + " elapsed, " + entry.getKey() + " gossip quarantine over");
                     justRemovedEndpoints.remove(entry.getKey());
-                    endpointStateMap.remove(entry.getKey());
                 }
             }
         }
@@ -585,6 +649,7 @@ public class Gossiper implements IFailur
             int remoteGeneration = remoteEndpointState.getHeartBeatState().getGeneration();
             if ( remoteGeneration > localGeneration )
             {
+                localEndpointState.updateTimestamp();
                 fd.report(endpoint);
                 return;
             }
@@ -595,6 +660,7 @@ public class Gossiper implements IFailur
                 int remoteVersion = remoteEndpointState.getHeartBeatState().getHeartBeatVersion();
                 if ( remoteVersion > localVersion )
                 {
+                    localEndpointState.updateTimestamp();
                     fd.report(endpoint);
                 }
             }
@@ -607,6 +673,7 @@ public class Gossiper implements IFailur
         if (logger.isTraceEnabled())
             logger.trace("marking as alive {}", addr);
         localState.markAlive();
+        localState.updateTimestamp(); // prevents doStatusCheck from racing us and evicting if it was down > aVeryLongTime
         liveEndpoints.add(addr);
         unreachableEndpoints.remove(addr);
         logger.info("InetAddress {} is now UP", addr);
@@ -638,10 +705,13 @@ public class Gossiper implements IFailur
      */
     private void handleMajorStateChange(InetAddress ep, EndpointState epState)
     {
-        if (endpointStateMap.get(ep) != null)
-            logger.info("Node {} has restarted, now UP again", ep);
-        else
-            logger.info("Node {} is now part of the cluster", ep);
+        if (epState.getApplicationState(ApplicationState.STATUS) != null && !isDeadState(epState.getApplicationState(ApplicationState.STATUS).value))
+        {
+            if (endpointStateMap.get(ep) != null)
+                logger.info("Node {} has restarted, now UP again", ep);
+            else
+                logger.info("Node {} is now part of the cluster", ep);
+        }
         if (logger.isTraceEnabled())
             logger.trace("Adding endpoint state for " + ep);
         endpointStateMap.put(ep, epState);
@@ -651,11 +721,31 @@ public class Gossiper implements IFailur
             for (IEndpointStateChangeSubscriber subscriber : subscribers)
                 subscriber.onDead(ep, epState);
         }
-        markAlive(ep, epState);
+        if (epState.getApplicationState(ApplicationState.STATUS) != null && !isDeadState(epState.getApplicationState(ApplicationState.STATUS).value))
+            markAlive(ep, epState);
+        else
+        {
+            logger.debug("Not marking " + ep + " alive due to dead state");
+            epState.markDead();
+            epState.setHasToken(true); // fat clients won't have a dead state
+        }
         for (IEndpointStateChangeSubscriber subscriber : subscribers)
             subscriber.onJoin(ep, epState);
     }
 
+    private Boolean isDeadState(String value)
+    {
+        String[] pieces = value.split(VersionedValue.DELIMITER_STR, -1);
+        assert (pieces.length > 0);
+        String state = pieces[0];
+        for (String deadstate : DEAD_STATES)
+        {
+            if (state.equals(deadstate))
+                return true;
+        }
+        return false;
+    }
+
     void applyStateLocally(Map<InetAddress, EndpointState> epStateMap)
     {
         for (Entry<InetAddress, EndpointState> entry : epStateMap.entrySet())

Modified: cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/gms/HeartBeatState.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/gms/HeartBeatState.java?rev=1150847&r1=1150846&r2=1150847&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/gms/HeartBeatState.java (original)
+++ cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/gms/HeartBeatState.java Mon Jul 25 18:58:27 2011
@@ -71,6 +71,11 @@ class HeartBeatState
     {
         return version;
     }
+
+    void forceNewerGenerationUnsafe()
+    {
+        generation += 1;
+    }
 }
 
 class HeartBeatStateSerializer implements ICompactSerializer<HeartBeatState>

Modified: cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/gms/VersionedValue.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/gms/VersionedValue.java?rev=1150847&r1=1150846&r2=1150847&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/gms/VersionedValue.java (original)
+++ cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/gms/VersionedValue.java Mon Jul 25 18:58:27 2011
@@ -49,7 +49,7 @@ public class VersionedValue implements C
     public final static char DELIMITER = ',';
     public final static String DELIMITER_STR = new String(new char[] { DELIMITER });
 
-    // values for State.STATUS
+    // values for ApplicationState.STATUS
     public final static String STATUS_BOOTSTRAPPING = "BOOT";
     public final static String STATUS_NORMAL = "NORMAL";
     public final static String STATUS_LEAVING = "LEAVING";
@@ -59,6 +59,9 @@ public class VersionedValue implements C
     public final static String REMOVING_TOKEN = "removing";
     public final static String REMOVED_TOKEN = "removed";
 
+    // values for ApplicationState.REMOVAL_COORDINATOR
+    public final static String REMOVAL_COORDINATOR = "REMOVER";
+
     public final int version;
     public final String value;
 
@@ -129,20 +132,19 @@ public class VersionedValue implements C
             return new VersionedValue(VersionedValue.STATUS_MOVING + VersionedValue.DELIMITER + partitioner.getTokenFactory().toString(token));
         }
 
-        public VersionedValue removingNonlocal(Token localToken, Token token)
+        public VersionedValue removingNonlocal(Token token)
+        {
+            return new VersionedValue(VersionedValue.REMOVING_TOKEN + VersionedValue.DELIMITER + partitioner.getTokenFactory().toString(token));
+        }
+
+        public VersionedValue removedNonlocal(Token token)
         {
-            return new VersionedValue(VersionedValue.STATUS_NORMAL
-                                        + VersionedValue.DELIMITER + partitioner.getTokenFactory().toString(localToken)
-                                        + VersionedValue.DELIMITER + VersionedValue.REMOVING_TOKEN
-                                        + VersionedValue.DELIMITER + partitioner.getTokenFactory().toString(token));
+            return new VersionedValue(VersionedValue.REMOVED_TOKEN + VersionedValue.DELIMITER + partitioner.getTokenFactory().toString(token));
         }
 
-        public VersionedValue removedNonlocal(Token localToken, Token token)
+        public VersionedValue removalCoordinator(Token token)
         {
-            return new VersionedValue(VersionedValue.STATUS_NORMAL
-                                        + VersionedValue.DELIMITER + partitioner.getTokenFactory().toString(localToken)
-                                        + VersionedValue.DELIMITER + VersionedValue.REMOVED_TOKEN
-                                        + VersionedValue.DELIMITER + partitioner.getTokenFactory().toString(token));
+            return new VersionedValue(VersionedValue.REMOVAL_COORDINATOR + VersionedValue.DELIMITER + partitioner.getTokenFactory().toString(token));
         }
 
         public VersionedValue datacenter(String dcId)

Modified: cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/net/MessagingService.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/net/MessagingService.java?rev=1150847&r1=1150846&r2=1150847&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/net/MessagingService.java (original)
+++ cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/net/MessagingService.java Mon Jul 25 18:58:27 2011
@@ -456,6 +456,10 @@ public final class MessagingService impl
 
     public void receive(Message message, String id)
     {
+        if (logger_.isTraceEnabled())
+            logger_.trace(FBUtilities.getLocalAddress() + " received " + message.getVerb()
+                          + " from " + id + "@" + message.getFrom());
+
         message = SinkManager.processServerMessage(message, id);
         if (message == null)
             return;

Modified: cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/service/StorageService.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/service/StorageService.java?rev=1150847&r1=1150846&r2=1150847&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/service/StorageService.java (original)
+++ cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/service/StorageService.java Mon Jul 25 18:58:27 2011
@@ -623,29 +623,35 @@ public class StorageService implements I
     }
 
     /*
-     * onChange only ever sees one ApplicationState piece change at a time, so we perform a kind of state machine here.
-     * We are concerned with two events: knowing the token associated with an endpoint, and knowing its operation mode.
-     * Nodes can start in either bootstrap or normal mode, and from bootstrap mode can change mode to normal.
-     * A node in bootstrap mode needs to have pendingranges set in TokenMetadata; a node in normal mode
-     * should instead be part of the token ring.
+     * Handle the reception of a new particular ApplicationState for a particular endpoint. Note that the value of the
+     * ApplicationState has not necessarily "changed" since the last known value, if we already received the same update
+     * from somewhere else.
+     *
+     * onChange only ever sees one ApplicationState piece change at a time (even if many ApplicationState updates were
+     * received at the same time), so we perform a kind of state machine here. We are concerned with two events: knowing
+     * the token associated with an endpoint, and knowing its operation mode. Nodes can start in either bootstrap or
+     * normal mode, and from bootstrap mode can change mode to normal. A node in bootstrap mode needs to have
+     * pendingranges set in TokenMetadata; a node in normal mode should instead be part of the token ring.
      * 
-     * Normal MOVE_STATE progression of a node should be like this:
-     * STATE_BOOTSTRAPPING,token
+     * Normal progression of ApplicationState.STATUS values for a node should be like this:
+     * STATUS_BOOTSTRAPPING,token
      *   if bootstrapping. stays this way until all files are received.
-     * STATE_NORMAL,token 
+     * STATUS_NORMAL,token
      *   ready to serve reads and writes.
-     * STATE_NORMAL,token,REMOVE_TOKEN,token
-     *   specialized normal state in which this node acts as a proxy to tell the cluster about a dead node whose
-     *   token is being removed. this value becomes the permanent state of this node (unless it coordinates another
-     *   removetoken in the future).
-     * STATE_LEAVING,token 
-     *   get ready to leave the cluster as part of a decommission or move
-     * STATE_LEFT,token 
-     *   set after decommission or move is completed.
-     * STATE_MOVE,token
-     *   set if node if currently moving to a new token in the ring
-     * 
-     * Note: Any time a node state changes from STATE_NORMAL, it will not be visible to new nodes. So it follows that
+     * STATUS_LEAVING,token
+     *   get ready to leave the cluster as part of a decommission
+     * STATUS_LEFT,token
+     *   set after decommission is completed.
+     *
+     * Other STATUS values that may be seen (possibly anywhere in the normal progression):
+     * STATUS_MOVING,newtoken
+     *   set if node is currently moving to a new token in the ring
+     * REMOVING_TOKEN,deadtoken
+     *   set if the node is dead and is being removed by its REMOVAL_COORDINATOR
+     * REMOVED_TOKEN,deadtoken
+     *   set if the node is dead and has been removed by its REMOVAL_COORDINATOR
+     *
+     * Note: Any time a node state changes from STATUS_NORMAL, it will not be visible to new nodes. So it follows that
      * you should never bootstrap a new node during a removetoken, decommission or move.
      */
     public void onChange(InetAddress endpoint, ApplicationState state, VersionedValue value)
@@ -666,6 +672,8 @@ public class StorageService implements I
                     handleStateBootstrap(endpoint, pieces);
                 else if (moveName.equals(VersionedValue.STATUS_NORMAL))
                     handleStateNormal(endpoint, pieces);
+                else if (moveName.equals(VersionedValue.REMOVING_TOKEN) || moveName.equals(VersionedValue.REMOVED_TOKEN))
+                    handleStateRemoving(endpoint, pieces);
                 else if (moveName.equals(VersionedValue.STATUS_LEAVING))
                     handleStateLeaving(endpoint, pieces);
                 else if (moveName.equals(VersionedValue.STATUS_LEFT))
@@ -732,7 +740,7 @@ public class StorageService implements I
      * in reads.
      *
      * @param endpoint node
-     * @param pieces STATE_NORMAL,token[,other_state,token]
+     * @param pieces STATUS_NORMAL,token
      */
     private void handleStateNormal(InetAddress endpoint, String[] pieces)
     {
@@ -774,12 +782,6 @@ public class StorageService implements I
                                        endpoint, currentOwner, token, endpoint));
         }
 
-        if (pieces.length > 2)
-        {
-            assert pieces.length == 4;
-            handleStateRemoving(endpoint, getPartitioner().getTokenFactory().fromString(pieces[3]), pieces[2]);
-        }
-
         if (tokenMetadata_.isMoving(endpoint)) // if endpoint was moving to a new token
             tokenMetadata_.removeFromMoving(endpoint);
 
@@ -861,37 +863,50 @@ public class StorageService implements I
      * Handle notification that a node being actively removed from the ring via 'removetoken'
      *
      * @param endpoint node
-     * @param state either REMOVED_TOKEN (node is gone) or REMOVING_TOKEN (replicas need to be restored)
+     * @param pieces REMOVED_TOKEN,token (node is gone) or REMOVING_TOKEN,token (replicas need to be restored)
      */
-    private void handleStateRemoving(InetAddress endpoint, Token removeToken, String state)
+    private void handleStateRemoving(InetAddress endpoint, String[] pieces)
     {
-        InetAddress removeEndpoint = tokenMetadata_.getEndpoint(removeToken);
-        
-        if (removeEndpoint == null)
-            return;
-        
-        if (removeEndpoint.equals(FBUtilities.getLocalAddress()))
-        {
-            logger_.info("Received removeToken gossip about myself. Is this node a replacement for a removed one?");
-            return;
-        }
+        String state = pieces[0];
+        assert (pieces.length > 0);
 
-        if (VersionedValue.REMOVED_TOKEN.equals(state))
+        if (endpoint.equals(FBUtilities.getLocalAddress()))
         {
-            excise(removeToken, removeEndpoint);
+            logger_.info("Received removeToken gossip about myself. Is this node rejoining after an explicit removetoken?");
+            try
+            {
+                drain();
+            }
+            catch (Exception e)
+            {
+                throw new RuntimeException(e);
+            }
+            return;
         }
-        else if (VersionedValue.REMOVING_TOKEN.equals(state))
+        if (tokenMetadata_.isMember(endpoint))
         {
-            if (logger_.isDebugEnabled())
-                logger_.debug("Token " + removeToken + " removed manually (endpoint was " + removeEndpoint + ")");
+            Token removeToken = tokenMetadata_.getToken(endpoint);
 
-            // Note that the endpoint is being removed
-            tokenMetadata_.addLeavingEndpoint(removeEndpoint);
-            calculatePendingRanges();
+            if (VersionedValue.REMOVED_TOKEN.equals(state))
+            {
+                excise(removeToken, endpoint);
+            }
+            else if (VersionedValue.REMOVING_TOKEN.equals(state))
+            {
+                if (logger_.isDebugEnabled())
+                    logger_.debug("Token " + removeToken + " removed manually (endpoint was " + endpoint + ")");
 
-            // grab any data we are now responsible for and notify responsible node
-            restoreReplicaCount(removeEndpoint, endpoint);
-        }
+                // Note that the endpoint is being removed
+                tokenMetadata_.addLeavingEndpoint(endpoint);
+                calculatePendingRanges();
+
+                // find the endpoint coordinating this removal that we need to notify when we're done
+                String[] coordinator = Gossiper.instance.getEndpointStateForEndpoint(endpoint).getApplicationState(ApplicationState.REMOVAL_COORDINATOR).value.split(VersionedValue.DELIMITER_STR, -1);
+                Token coordtoken = getPartitioner().getTokenFactory().fromString(coordinator[1]);
+                // grab any data we are now responsible for and notify responsible node
+                restoreReplicaCount(endpoint, tokenMetadata_.getEndpoint(coordtoken));
+            }
+        } // not a member, nothing to do
     }
 
     private void excise(Token token, InetAddress endpoint)
@@ -1060,6 +1075,8 @@ public class StorageService implements I
         // notify the remote token
         Message msg = new Message(local, StorageService.Verb.REPLICATION_FINISHED, new byte[0], Gossiper.instance.getVersion(remote));
         IFailureDetector failureDetector = FailureDetector.instance;
+        if (logger_.isDebugEnabled())
+            logger_.debug("Notifying " + remote.toString() + " of replication completion\n");
         while (failureDetector.isAlive(remote))
         {
             IAsyncResult iar = MessagingService.instance().sendRR(msg, remote);
@@ -2003,9 +2020,14 @@ public class StorageService implements I
      */
     public void forceRemoveCompletion()
     {
-        if (!replicatingNodes.isEmpty())
+        if (!replicatingNodes.isEmpty()  || !tokenMetadata_.getLeavingEndpoints().isEmpty())
         {
             logger_.warn("Removal not confirmed for for " + StringUtils.join(this.replicatingNodes, ","));
+            for (InetAddress endpoint : tokenMetadata_.getLeavingEndpoints())
+            {
+                Gossiper.instance.advertiseTokenRemoved(endpoint, tokenMetadata_.getToken(endpoint));
+                tokenMetadata_.removeEndpoint(endpoint);
+            }
             replicatingNodes.clear();
         }
         else
@@ -2069,9 +2091,9 @@ public class StorageService implements I
 
         tokenMetadata_.addLeavingEndpoint(endpoint);
         calculatePendingRanges();
-        // bundle two states together. include this nodes state to keep the status quo, 
-        // but indicate the leaving token so that it can be dealt with.
-        Gossiper.instance.addLocalApplicationState(ApplicationState.STATUS, valueFactory.removingNonlocal(localToken, token));
+        // the gossiper will handle spoofing the removed endpoint's state to REMOVING_TOKEN for us
+        // we add our own token so other nodes can let us know when they're done
+        Gossiper.instance.advertiseRemoving(endpoint, token, localToken);
 
         // kick off streaming commands
         restoreReplicaCount(endpoint, myAddress);
@@ -2091,8 +2113,8 @@ public class StorageService implements I
 
         excise(token, endpoint);
 
-        // indicate the token has left
-        Gossiper.instance.addLocalApplicationState(ApplicationState.STATUS, valueFactory.removedNonlocal(localToken, token));
+        // gossiper will indicate the token has left
+        Gossiper.instance.advertiseTokenRemoved(endpoint, token);
 
         replicatingNodes.clear();
         removingNode = null;
@@ -2100,8 +2122,18 @@ public class StorageService implements I
 
     public void confirmReplication(InetAddress node)
     {
-        assert !replicatingNodes.isEmpty();
-        replicatingNodes.remove(node);
+        // replicatingNodes can be empty in the case where this node used to be a removal coordinator,
+        // but restarted before all 'replication finished' messages arrived. In that case, we'll
+        // still go ahead and acknowledge it.
+        if (!replicatingNodes.isEmpty())
+        {
+            replicatingNodes.remove(node);
+        }
+        else
+        {
+            logger_.info("Received unexpected REPLICATION_FINISHED message from " + node
+                         + ". Was this node recently a removal coordinator?");
+        }
     }
 
     public boolean isClientMode()