You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by br...@apache.org on 2011/07/25 20:58:28 UTC
svn commit: r1150847 - in /cassandra/branches/cassandra-0.8: ./
src/java/org/apache/cassandra/gms/ src/java/org/apache/cassandra/net/
src/java/org/apache/cassandra/service/
Author: brandonwilliams
Date: Mon Jul 25 18:58:27 2011
New Revision: 1150847
URL: http://svn.apache.org/viewvc?rev=1150847&view=rev
Log:
Gossip handles dead states, token removal actually works, gossip states
are held for aVeryLongTime.
Patch by brandonwilliams and Paul Cannon, reviewed by Paul Cannon for
CASSANDRA-2496.
Modified:
cassandra/branches/cassandra-0.8/CHANGES.txt
cassandra/branches/cassandra-0.8/NEWS.txt
cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/gms/ApplicationState.java
cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/gms/Gossiper.java
cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/gms/HeartBeatState.java
cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/gms/VersionedValue.java
cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/net/MessagingService.java
cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/service/StorageService.java
Modified: cassandra/branches/cassandra-0.8/CHANGES.txt
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/CHANGES.txt?rev=1150847&r1=1150846&r2=1150847&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.8/CHANGES.txt (original)
+++ cassandra/branches/cassandra-0.8/CHANGES.txt Mon Jul 25 18:58:27 2011
@@ -1,6 +1,7 @@
0.8.3
* add ability to drop local reads/writes that are going to timeout
(CASSANDRA-2943)
+ * revamp token removal process, keep gossip states for 3 days (CASSANDRA-2496)
0.8.2
Modified: cassandra/branches/cassandra-0.8/NEWS.txt
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/NEWS.txt?rev=1150847&r1=1150846&r2=1150847&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.8/NEWS.txt (original)
+++ cassandra/branches/cassandra-0.8/NEWS.txt Mon Jul 25 18:58:27 2011
@@ -1,3 +1,12 @@
+0.8.3
+=====
+
+Upgrading
+---------
+ - Token removal has been revamped. Removing tokens in a mixed cluster with
+ 0.8.3 will not work, so the entire cluster will need to be running 0.8.3
+ first, except for the dead node.
+
0.8.2
=====
Modified: cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/gms/ApplicationState.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/gms/ApplicationState.java?rev=1150847&r1=1150846&r2=1150847&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/gms/ApplicationState.java (original)
+++ cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/gms/ApplicationState.java Mon Jul 25 18:58:27 2011
@@ -29,6 +29,7 @@ public enum ApplicationState
DC,
RACK,
RELEASE_VERSION,
+ REMOVAL_COORDINATOR,
// pad to allow adding new states to existing cluster
X1,
X2,
Modified: cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/gms/Gossiper.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/gms/Gossiper.java?rev=1150847&r1=1150846&r2=1150847&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/gms/Gossiper.java (original)
+++ cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/gms/Gossiper.java Mon Jul 25 18:58:27 2011
@@ -27,6 +27,7 @@ import java.util.*;
import java.util.Map.Entry;
import java.util.concurrent.*;
+import org.apache.cassandra.dht.Token;
import org.apache.cassandra.db.SystemTable;
import org.apache.cassandra.net.MessageProducer;
import org.apache.cassandra.config.ConfigurationException;
@@ -58,6 +59,8 @@ public class Gossiper implements IFailur
private static final RetryingScheduledThreadPoolExecutor executor = new RetryingScheduledThreadPoolExecutor("GossipTasks");
static final ApplicationState[] STATES = ApplicationState.values();
+ static final List<String> DEAD_STATES = Arrays.asList(VersionedValue.REMOVING_TOKEN, VersionedValue.REMOVED_TOKEN, VersionedValue.STATUS_LEFT);
+
private ScheduledFuture<?> scheduledGossipTask;
public final static int intervalInMillis = 1000;
public final static int QUARANTINE_DELAY = StorageService.RING_DELAY * 2;
@@ -264,17 +267,21 @@ public class Gossiper implements IFailur
}
/**
- * Removes the endpoint from unreachable endpoint set
+ * Removes the endpoint from gossip completely
*
* @param endpoint endpoint to be removed from the current membership.
*/
private void evictFromMembership(InetAddress endpoint)
{
unreachableEndpoints.remove(endpoint);
+ endpointStateMap.remove(endpoint);
+ justRemovedEndpoints.put(endpoint, System.currentTimeMillis());
+ if (logger.isDebugEnabled())
+ logger.debug("evicting " + endpoint + " from gossip");
}
/**
- * Removes the endpoint completely from Gossip
+ * Removes the endpoint from Gossip but retains endpoint state
*/
public void removeEndpoint(InetAddress endpoint)
{
@@ -288,6 +295,8 @@ public class Gossiper implements IFailur
FailureDetector.instance.remove(endpoint);
versions.remove(endpoint);
justRemovedEndpoints.put(endpoint, System.currentTimeMillis());
+ if (logger.isDebugEnabled())
+ logger.debug("removing endpoint " + endpoint);
}
/**
@@ -328,6 +337,67 @@ public class Gossiper implements IFailur
}
}
+ /**
+ * This method will begin removing an existing endpoint from the cluster by spoofing its state
+ * This should never be called unless this coordinator has had 'removetoken' invoked
+ *
+ * @param endpoint - the endpoint being removed
+ * @param token - the token being removed
+ * @param mytoken - my own token for replication coordination
+ */
+ public void advertiseRemoving(InetAddress endpoint, Token token, Token mytoken)
+ {
+ EndpointState epState = endpointStateMap.get(endpoint);
+ // remember this node's generation
+ int generation = epState.getHeartBeatState().getGeneration();
+ logger.info("Removing token: " + token);
+ logger.info("Sleeping for " + StorageService.RING_DELAY + "ms to ensure " + endpoint + " does not change");
+ try
+ {
+ Thread.sleep(StorageService.RING_DELAY);
+ }
+ catch (InterruptedException e)
+ {
+ throw new AssertionError(e);
+ }
+ // make sure it did not change
+ epState = endpointStateMap.get(endpoint);
+ if (epState.getHeartBeatState().getGeneration() != generation)
+ throw new RuntimeException("Endpoint " + endpoint + " generation changed while trying to remove it");
+ // update the other node's generation to mimic it as if it had changed it itself
+ logger.info("Advertising removal for " + endpoint);
+ epState.updateTimestamp(); // make sure we don't evict it too soon
+ epState.getHeartBeatState().forceNewerGenerationUnsafe();
+ epState.addApplicationState(ApplicationState.STATUS, StorageService.instance.valueFactory.removingNonlocal(token));
+ epState.addApplicationState(ApplicationState.REMOVAL_COORDINATOR, StorageService.instance.valueFactory.removalCoordinator(mytoken));
+ endpointStateMap.put(endpoint, epState);
+ }
+
+ /**
+ * Handles switching the endpoint's state from REMOVING_TOKEN to REMOVED_TOKEN
+ * This should only be called after advertiseRemoving
+ * @param endpoint
+ * @param token
+ */
+ public void advertiseTokenRemoved(InetAddress endpoint, Token token)
+ {
+ EndpointState epState = endpointStateMap.get(endpoint);
+ epState.updateTimestamp(); // make sure we don't evict it too soon
+ epState.getHeartBeatState().forceNewerGenerationUnsafe();
+ epState.addApplicationState(ApplicationState.STATUS, StorageService.instance.valueFactory.removedNonlocal(token));
+ logger.info("Completing removal of " + endpoint);
+ endpointStateMap.put(endpoint, epState);
+ // ensure at least one gossip round occurs before returning
+ try
+ {
+ Thread.sleep(intervalInMillis * 2);
+ }
+ catch (InterruptedException e)
+ {
+ throw new AssertionError(e);
+ }
+ }
+
public boolean isKnownEndpoint(InetAddress endpoint)
{
return endpointStateMap.containsKey(endpoint);
@@ -456,23 +526,18 @@ public class Gossiper implements IFailur
{
long duration = now - epState.getUpdateTimestamp();
+ if (StorageService.instance.getTokenMetadata().isMember(endpoint))
+ epState.setHasToken(true);
// check if this is a fat client. fat clients are removed automatically from
// gossip after FatClientTimeout
- if (!epState.hasToken() && !epState.isAlive() && (duration > FatClientTimeout))
+ if (!epState.hasToken() && !epState.isAlive() && !justRemovedEndpoints.containsKey(endpoint) && (duration > FatClientTimeout))
{
- if (StorageService.instance.getTokenMetadata().isMember(endpoint))
- epState.setHasToken(true);
- else
- {
- if (!justRemovedEndpoints.containsKey(endpoint)) // if the node was decommissioned, it will have been removed but still appear as a fat client
- {
- logger.info("FatClient " + endpoint + " has been silent for " + FatClientTimeout + "ms, removing from gossip");
- removeEndpoint(endpoint); // after quarantine justRemoveEndpoints will remove the state
- }
- }
+ logger.info("FatClient " + endpoint + " has been silent for " + FatClientTimeout + "ms, removing from gossip");
+ removeEndpoint(endpoint); // will put it in justRemovedEndpoints to respect quarantine delay
+ evictFromMembership(endpoint); // can get rid of the state immediately
}
- if ( !epState.isAlive() && (duration > aVeryLongTime) )
+ if ( !epState.isAlive() && (duration > aVeryLongTime) && (!StorageService.instance.getTokenMetadata().isMember(endpoint)))
{
evictFromMembership(endpoint);
}
@@ -488,7 +553,6 @@ public class Gossiper implements IFailur
if (logger.isDebugEnabled())
logger.debug(QUARANTINE_DELAY + " elapsed, " + entry.getKey() + " gossip quarantine over");
justRemovedEndpoints.remove(entry.getKey());
- endpointStateMap.remove(entry.getKey());
}
}
}
@@ -585,6 +649,7 @@ public class Gossiper implements IFailur
int remoteGeneration = remoteEndpointState.getHeartBeatState().getGeneration();
if ( remoteGeneration > localGeneration )
{
+ localEndpointState.updateTimestamp();
fd.report(endpoint);
return;
}
@@ -595,6 +660,7 @@ public class Gossiper implements IFailur
int remoteVersion = remoteEndpointState.getHeartBeatState().getHeartBeatVersion();
if ( remoteVersion > localVersion )
{
+ localEndpointState.updateTimestamp();
fd.report(endpoint);
}
}
@@ -607,6 +673,7 @@ public class Gossiper implements IFailur
if (logger.isTraceEnabled())
logger.trace("marking as alive {}", addr);
localState.markAlive();
+ localState.updateTimestamp(); // prevents doStatusCheck from racing us and evicting if it was down > aVeryLongTime
liveEndpoints.add(addr);
unreachableEndpoints.remove(addr);
logger.info("InetAddress {} is now UP", addr);
@@ -638,10 +705,13 @@ public class Gossiper implements IFailur
*/
private void handleMajorStateChange(InetAddress ep, EndpointState epState)
{
- if (endpointStateMap.get(ep) != null)
- logger.info("Node {} has restarted, now UP again", ep);
- else
- logger.info("Node {} is now part of the cluster", ep);
+ if (epState.getApplicationState(ApplicationState.STATUS) != null && !isDeadState(epState.getApplicationState(ApplicationState.STATUS).value))
+ {
+ if (endpointStateMap.get(ep) != null)
+ logger.info("Node {} has restarted, now UP again", ep);
+ else
+ logger.info("Node {} is now part of the cluster", ep);
+ }
if (logger.isTraceEnabled())
logger.trace("Adding endpoint state for " + ep);
endpointStateMap.put(ep, epState);
@@ -651,11 +721,31 @@ public class Gossiper implements IFailur
for (IEndpointStateChangeSubscriber subscriber : subscribers)
subscriber.onDead(ep, epState);
}
- markAlive(ep, epState);
+ if (epState.getApplicationState(ApplicationState.STATUS) != null && !isDeadState(epState.getApplicationState(ApplicationState.STATUS).value))
+ markAlive(ep, epState);
+ else
+ {
+ logger.debug("Not marking " + ep + " alive due to dead state");
+ epState.markDead();
+ epState.setHasToken(true); // fat clients won't have a dead state
+ }
for (IEndpointStateChangeSubscriber subscriber : subscribers)
subscriber.onJoin(ep, epState);
}
+ private Boolean isDeadState(String value)
+ {
+ String[] pieces = value.split(VersionedValue.DELIMITER_STR, -1);
+ assert (pieces.length > 0);
+ String state = pieces[0];
+ for (String deadstate : DEAD_STATES)
+ {
+ if (state.equals(deadstate))
+ return true;
+ }
+ return false;
+ }
+
void applyStateLocally(Map<InetAddress, EndpointState> epStateMap)
{
for (Entry<InetAddress, EndpointState> entry : epStateMap.entrySet())
Modified: cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/gms/HeartBeatState.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/gms/HeartBeatState.java?rev=1150847&r1=1150846&r2=1150847&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/gms/HeartBeatState.java (original)
+++ cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/gms/HeartBeatState.java Mon Jul 25 18:58:27 2011
@@ -71,6 +71,11 @@ class HeartBeatState
{
return version;
}
+
+ void forceNewerGenerationUnsafe()
+ {
+ generation += 1;
+ }
}
class HeartBeatStateSerializer implements ICompactSerializer<HeartBeatState>
Modified: cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/gms/VersionedValue.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/gms/VersionedValue.java?rev=1150847&r1=1150846&r2=1150847&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/gms/VersionedValue.java (original)
+++ cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/gms/VersionedValue.java Mon Jul 25 18:58:27 2011
@@ -49,7 +49,7 @@ public class VersionedValue implements C
public final static char DELIMITER = ',';
public final static String DELIMITER_STR = new String(new char[] { DELIMITER });
- // values for State.STATUS
+ // values for ApplicationState.STATUS
public final static String STATUS_BOOTSTRAPPING = "BOOT";
public final static String STATUS_NORMAL = "NORMAL";
public final static String STATUS_LEAVING = "LEAVING";
@@ -59,6 +59,9 @@ public class VersionedValue implements C
public final static String REMOVING_TOKEN = "removing";
public final static String REMOVED_TOKEN = "removed";
+ // values for ApplicationState.REMOVAL_COORDINATOR
+ public final static String REMOVAL_COORDINATOR = "REMOVER";
+
public final int version;
public final String value;
@@ -129,20 +132,19 @@ public class VersionedValue implements C
return new VersionedValue(VersionedValue.STATUS_MOVING + VersionedValue.DELIMITER + partitioner.getTokenFactory().toString(token));
}
- public VersionedValue removingNonlocal(Token localToken, Token token)
+ public VersionedValue removingNonlocal(Token token)
+ {
+ return new VersionedValue(VersionedValue.REMOVING_TOKEN + VersionedValue.DELIMITER + partitioner.getTokenFactory().toString(token));
+ }
+
+ public VersionedValue removedNonlocal(Token token)
{
- return new VersionedValue(VersionedValue.STATUS_NORMAL
- + VersionedValue.DELIMITER + partitioner.getTokenFactory().toString(localToken)
- + VersionedValue.DELIMITER + VersionedValue.REMOVING_TOKEN
- + VersionedValue.DELIMITER + partitioner.getTokenFactory().toString(token));
+ return new VersionedValue(VersionedValue.REMOVED_TOKEN + VersionedValue.DELIMITER + partitioner.getTokenFactory().toString(token));
}
- public VersionedValue removedNonlocal(Token localToken, Token token)
+ public VersionedValue removalCoordinator(Token token)
{
- return new VersionedValue(VersionedValue.STATUS_NORMAL
- + VersionedValue.DELIMITER + partitioner.getTokenFactory().toString(localToken)
- + VersionedValue.DELIMITER + VersionedValue.REMOVED_TOKEN
- + VersionedValue.DELIMITER + partitioner.getTokenFactory().toString(token));
+ return new VersionedValue(VersionedValue.REMOVAL_COORDINATOR + VersionedValue.DELIMITER + partitioner.getTokenFactory().toString(token));
}
public VersionedValue datacenter(String dcId)
Modified: cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/net/MessagingService.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/net/MessagingService.java?rev=1150847&r1=1150846&r2=1150847&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/net/MessagingService.java (original)
+++ cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/net/MessagingService.java Mon Jul 25 18:58:27 2011
@@ -456,6 +456,10 @@ public final class MessagingService impl
public void receive(Message message, String id)
{
+ if (logger_.isTraceEnabled())
+ logger_.trace(FBUtilities.getLocalAddress() + " received " + message.getVerb()
+ + " from " + id + "@" + message.getFrom());
+
message = SinkManager.processServerMessage(message, id);
if (message == null)
return;
Modified: cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/service/StorageService.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/service/StorageService.java?rev=1150847&r1=1150846&r2=1150847&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/service/StorageService.java (original)
+++ cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/service/StorageService.java Mon Jul 25 18:58:27 2011
@@ -623,29 +623,35 @@ public class StorageService implements I
}
/*
- * onChange only ever sees one ApplicationState piece change at a time, so we perform a kind of state machine here.
- * We are concerned with two events: knowing the token associated with an endpoint, and knowing its operation mode.
- * Nodes can start in either bootstrap or normal mode, and from bootstrap mode can change mode to normal.
- * A node in bootstrap mode needs to have pendingranges set in TokenMetadata; a node in normal mode
- * should instead be part of the token ring.
+ * Handle the reception of a new particular ApplicationState for a particular endpoint. Note that the value of the
+ * ApplicationState has not necessarily "changed" since the last known value, if we already received the same update
+ * from somewhere else.
+ *
+ * onChange only ever sees one ApplicationState piece change at a time (even if many ApplicationState updates were
+ * received at the same time), so we perform a kind of state machine here. We are concerned with two events: knowing
+ * the token associated with an endpoint, and knowing its operation mode. Nodes can start in either bootstrap or
+ * normal mode, and from bootstrap mode can change mode to normal. A node in bootstrap mode needs to have
+ * pendingranges set in TokenMetadata; a node in normal mode should instead be part of the token ring.
*
- * Normal MOVE_STATE progression of a node should be like this:
- * STATE_BOOTSTRAPPING,token
+ * Normal progression of ApplicationState.STATUS values for a node should be like this:
+ * STATUS_BOOTSTRAPPING,token
* if bootstrapping. stays this way until all files are received.
- * STATE_NORMAL,token
+ * STATUS_NORMAL,token
* ready to serve reads and writes.
- * STATE_NORMAL,token,REMOVE_TOKEN,token
- * specialized normal state in which this node acts as a proxy to tell the cluster about a dead node whose
- * token is being removed. this value becomes the permanent state of this node (unless it coordinates another
- * removetoken in the future).
- * STATE_LEAVING,token
- * get ready to leave the cluster as part of a decommission or move
- * STATE_LEFT,token
- * set after decommission or move is completed.
- * STATE_MOVE,token
- * set if node if currently moving to a new token in the ring
- *
- * Note: Any time a node state changes from STATE_NORMAL, it will not be visible to new nodes. So it follows that
+ * STATUS_LEAVING,token
+ * get ready to leave the cluster as part of a decommission
+ * STATUS_LEFT,token
+ * set after decommission is completed.
+ *
+ * Other STATUS values that may be seen (possibly anywhere in the normal progression):
+ * STATUS_MOVING,newtoken
+ * set if node is currently moving to a new token in the ring
+ * REMOVING_TOKEN,deadtoken
+ * set if the node is dead and is being removed by its REMOVAL_COORDINATOR
+ * REMOVED_TOKEN,deadtoken
+ * set if the node is dead and has been removed by its REMOVAL_COORDINATOR
+ *
+ * Note: Any time a node state changes from STATUS_NORMAL, it will not be visible to new nodes. So it follows that
* you should never bootstrap a new node during a removetoken, decommission or move.
*/
public void onChange(InetAddress endpoint, ApplicationState state, VersionedValue value)
@@ -666,6 +672,8 @@ public class StorageService implements I
handleStateBootstrap(endpoint, pieces);
else if (moveName.equals(VersionedValue.STATUS_NORMAL))
handleStateNormal(endpoint, pieces);
+ else if (moveName.equals(VersionedValue.REMOVING_TOKEN) || moveName.equals(VersionedValue.REMOVED_TOKEN))
+ handleStateRemoving(endpoint, pieces);
else if (moveName.equals(VersionedValue.STATUS_LEAVING))
handleStateLeaving(endpoint, pieces);
else if (moveName.equals(VersionedValue.STATUS_LEFT))
@@ -732,7 +740,7 @@ public class StorageService implements I
* in reads.
*
* @param endpoint node
- * @param pieces STATE_NORMAL,token[,other_state,token]
+ * @param pieces STATUS_NORMAL,token
*/
private void handleStateNormal(InetAddress endpoint, String[] pieces)
{
@@ -774,12 +782,6 @@ public class StorageService implements I
endpoint, currentOwner, token, endpoint));
}
- if (pieces.length > 2)
- {
- assert pieces.length == 4;
- handleStateRemoving(endpoint, getPartitioner().getTokenFactory().fromString(pieces[3]), pieces[2]);
- }
-
if (tokenMetadata_.isMoving(endpoint)) // if endpoint was moving to a new token
tokenMetadata_.removeFromMoving(endpoint);
@@ -861,37 +863,50 @@ public class StorageService implements I
* Handle notification that a node being actively removed from the ring via 'removetoken'
*
* @param endpoint node
- * @param state either REMOVED_TOKEN (node is gone) or REMOVING_TOKEN (replicas need to be restored)
+ * @param pieces either REMOVED_TOKEN (node is gone) or REMOVING_TOKEN (replicas need to be restored)
*/
- private void handleStateRemoving(InetAddress endpoint, Token removeToken, String state)
+ private void handleStateRemoving(InetAddress endpoint, String[] pieces)
{
- InetAddress removeEndpoint = tokenMetadata_.getEndpoint(removeToken);
-
- if (removeEndpoint == null)
- return;
-
- if (removeEndpoint.equals(FBUtilities.getLocalAddress()))
- {
- logger_.info("Received removeToken gossip about myself. Is this node a replacement for a removed one?");
- return;
- }
+ String state = pieces[0];
+ assert (pieces.length > 0);
- if (VersionedValue.REMOVED_TOKEN.equals(state))
+ if (endpoint.equals(FBUtilities.getLocalAddress()))
{
- excise(removeToken, removeEndpoint);
+ logger_.info("Received removeToken gossip about myself. Is this node rejoining after an explicit removetoken?");
+ try
+ {
+ drain();
+ }
+ catch (Exception e)
+ {
+ throw new RuntimeException(e);
+ }
+ return;
}
- else if (VersionedValue.REMOVING_TOKEN.equals(state))
+ if (tokenMetadata_.isMember(endpoint))
{
- if (logger_.isDebugEnabled())
- logger_.debug("Token " + removeToken + " removed manually (endpoint was " + removeEndpoint + ")");
+ Token removeToken = tokenMetadata_.getToken(endpoint);
- // Note that the endpoint is being removed
- tokenMetadata_.addLeavingEndpoint(removeEndpoint);
- calculatePendingRanges();
+ if (VersionedValue.REMOVED_TOKEN.equals(state))
+ {
+ excise(removeToken, endpoint);
+ }
+ else if (VersionedValue.REMOVING_TOKEN.equals(state))
+ {
+ if (logger_.isDebugEnabled())
+ logger_.debug("Token " + removeToken + " removed manually (endpoint was " + endpoint + ")");
- // grab any data we are now responsible for and notify responsible node
- restoreReplicaCount(removeEndpoint, endpoint);
- }
+ // Note that the endpoint is being removed
+ tokenMetadata_.addLeavingEndpoint(endpoint);
+ calculatePendingRanges();
+
+ // find the endpoint coordinating this removal that we need to notify when we're done
+ String[] coordinator = Gossiper.instance.getEndpointStateForEndpoint(endpoint).getApplicationState(ApplicationState.REMOVAL_COORDINATOR).value.split(VersionedValue.DELIMITER_STR, -1);
+ Token coordtoken = getPartitioner().getTokenFactory().fromString(coordinator[1]);
+ // grab any data we are now responsible for and notify responsible node
+ restoreReplicaCount(endpoint, tokenMetadata_.getEndpoint(coordtoken));
+ }
+ } // not a member, nothing to do
}
private void excise(Token token, InetAddress endpoint)
@@ -1060,6 +1075,8 @@ public class StorageService implements I
// notify the remote token
Message msg = new Message(local, StorageService.Verb.REPLICATION_FINISHED, new byte[0], Gossiper.instance.getVersion(remote));
IFailureDetector failureDetector = FailureDetector.instance;
+ if (logger_.isDebugEnabled())
+ logger_.debug("Notifying " + remote.toString() + " of replication completion\n");
while (failureDetector.isAlive(remote))
{
IAsyncResult iar = MessagingService.instance().sendRR(msg, remote);
@@ -2003,9 +2020,14 @@ public class StorageService implements I
*/
public void forceRemoveCompletion()
{
- if (!replicatingNodes.isEmpty())
+ if (!replicatingNodes.isEmpty() || !tokenMetadata_.getLeavingEndpoints().isEmpty())
{
logger_.warn("Removal not confirmed for for " + StringUtils.join(this.replicatingNodes, ","));
+ for (InetAddress endpoint : tokenMetadata_.getLeavingEndpoints())
+ {
+ Gossiper.instance.advertiseTokenRemoved(endpoint, tokenMetadata_.getToken(endpoint));
+ tokenMetadata_.removeEndpoint(endpoint);
+ }
replicatingNodes.clear();
}
else
@@ -2069,9 +2091,9 @@ public class StorageService implements I
tokenMetadata_.addLeavingEndpoint(endpoint);
calculatePendingRanges();
- // bundle two states together. include this nodes state to keep the status quo,
- // but indicate the leaving token so that it can be dealt with.
- Gossiper.instance.addLocalApplicationState(ApplicationState.STATUS, valueFactory.removingNonlocal(localToken, token));
+ // the gossiper will handle spoofing this node's state to REMOVING_TOKEN for us
+ // we add our own token so other nodes can let us know when they're done
+ Gossiper.instance.advertiseRemoving(endpoint, token, localToken);
// kick off streaming commands
restoreReplicaCount(endpoint, myAddress);
@@ -2091,8 +2113,8 @@ public class StorageService implements I
excise(token, endpoint);
- // indicate the token has left
- Gossiper.instance.addLocalApplicationState(ApplicationState.STATUS, valueFactory.removedNonlocal(localToken, token));
+ // gossiper will indicate the token has left
+ Gossiper.instance.advertiseTokenRemoved(endpoint, token);
replicatingNodes.clear();
removingNode = null;
@@ -2100,8 +2122,18 @@ public class StorageService implements I
public void confirmReplication(InetAddress node)
{
- assert !replicatingNodes.isEmpty();
- replicatingNodes.remove(node);
+ // replicatingNodes can be empty in the case where this node used to be a removal coordinator,
+ // but restarted before all 'replication finished' messages arrived. In that case, we'll
+ // still go ahead and acknowledge it.
+ if (!replicatingNodes.isEmpty())
+ {
+ replicatingNodes.remove(node);
+ }
+ else
+ {
+ logger_.info("Received unexpected REPLICATION_FINISHED message from " + node
+ + ". Was this node recently a removal coordinator?");
+ }
}
public boolean isClientMode()