You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by ja...@apache.org on 2010/01/25 10:26:02 UTC
svn commit: r902744 - in
/incubator/cassandra/trunk/src/java/org/apache/cassandra:
gms/EndPointState.java gms/FailureDetector.java gms/Gossiper.java
gms/IFailureDetector.java service/StorageService.java
Author: jaakko
Date: Mon Jan 25 09:26:01 2010
New Revision: 902744
URL: http://svn.apache.org/viewvc?rev=902744&view=rev
Log:
modify removetoken to remove node from gossip. remove fat clients automatically after 1h of inactivity. patch by jaakko, reviewed by jbellis. CASSANDRA-644
Modified:
incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/EndPointState.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/FailureDetector.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/Gossiper.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/IFailureDetector.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/EndPointState.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/EndPointState.java?rev=902744&r1=902743&r2=902744&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/EndPointState.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/EndPointState.java Mon Jan 25 09:26:01 2010
@@ -48,6 +48,12 @@
boolean isAlive_;
boolean isAGossiper_;
+ // Whether this endpoint has a token associated with it. Initially false for all
+ // endpoints. After a certain time of inactivity, the gossiper checks whether this node
+ // has a token and sets this to true if one is found. If there is no token, the node is
+ // a fat client and will be removed automatically from gossip.
+ boolean hasToken_;
+
public static ICompactSerializer<EndPointState> serializer()
{
return serializer_;
@@ -59,6 +65,7 @@
updateTimestamp_ = System.currentTimeMillis();
isAlive_ = true;
isAGossiper_ = false;
+ hasToken_ = false;
}
HeartBeatState getHeartBeatState()
@@ -121,6 +128,16 @@
isAGossiper_ = value;
}
+ public synchronized void setHasToken(boolean value)
+ {
+ hasToken_ = value; // initialised to false; the gossiper's status check sets it to true once the endpoint is found in token metadata
+ }
+
+ public boolean getHasToken()
+ {
+ return hasToken_; // NOTE(review): read without the lock that setHasToken() takes - confirm this is intended
+ }
+
public List<Map.Entry<String,ApplicationState>> getSortedApplicationStates()
{
ArrayList<Map.Entry<String, ApplicationState>> entries = new ArrayList<Map.Entry<String, ApplicationState>>();
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/FailureDetector.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/FailureDetector.java?rev=902744&r1=902743&r2=902744&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/FailureDetector.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/FailureDetector.java Mon Jan 25 09:26:01 2010
@@ -157,6 +157,11 @@
}
}
}
+
+ public void remove(InetAddress ep)
+ {
+ arrivalSamples_.remove(ep); // forget whatever arrivalSamples_ holds for this endpoint
+ }
public void registerFailureDetectionEventListener(IFailureDetectionEventListener listener)
{
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/Gossiper.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/Gossiper.java?rev=902744&r1=902743&r2=902744&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/Gossiper.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/Gossiper.java Mon Jan 25 09:26:01 2010
@@ -28,6 +28,7 @@
import org.apache.cassandra.net.Message;
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.io.Streaming;
import org.apache.log4j.Logger;
@@ -106,6 +107,7 @@
private Timer gossipTimer_;
private InetAddress localEndPoint_;
private long aVeryLongTime_;
+ private long FatClientTimeout_;
private Random random_ = new Random();
/* subscribers for interest in EndPointState change */
@@ -123,10 +125,19 @@
/* map where key is the endpoint and value is the state associated with the endpoint */
Map<InetAddress, EndPointState> endPointStateMap_ = new Hashtable<InetAddress, EndPointState>();
+ /* map where key is endpoint and value is timestamp when this endpoint was removed from
+ * gossip. We will ignore any gossip regarding these endpoints for Streaming.RING_DELAY time
+ * after removal to prevent nodes from falsely reincarnating during the time when removal
+ * gossip gets propagated to all nodes */
+ Map<InetAddress, Long> justRemovedEndPoints_ = new Hashtable<InetAddress, Long>();
+
private Gossiper()
{
gossipTimer_ = new Timer(false);
+ // 3 days, in milliseconds (compared against System.currentTimeMillis() deltas in doStatusCheck)
aVeryLongTime_ = 259200 * 1000;
+ // 1 hour, in milliseconds; silent endpoints without a token are dropped from gossip after this long
+ FatClientTimeout_ = 60 * 60 * 1000;
/* register with the Failure Detector for receiving Failure detector events */
FailureDetector.instance.registerFailureDetectionEventListener(this);
}
@@ -201,6 +212,18 @@
}
/**
+ * Removes the endpoint completely from Gossip: drops it from liveEndpoints_,
+ * unreachableEndpoints_ and endPointStateMap_, removes it from the failure detector,
+ * and records the removal time in justRemovedEndPoints_ so that handleNewJoin ignores
+ * the endpoint until doStatusCheck ends its quarantine.
+ *
+ * @param endpoint address of the node to remove
+ */
+ public void removeEndPoint(InetAddress endpoint)
+ {
+ liveEndpoints_.remove(endpoint);
+ unreachableEndpoints_.remove(endpoint);
+ endPointStateMap_.remove(endpoint);
+ FailureDetector.instance.remove(endpoint);
+ justRemovedEndPoints_.put(endpoint, System.currentTimeMillis()); // start of the quarantine window
+ }
+
+ /**
* No locking required since it is called from a method that already
* has acquired a lock. The gossip digest is built based on randomization
* rather than just looping through the collection of live endpoints.
@@ -354,8 +377,9 @@
void doStatusCheck()
{
- Set<InetAddress> eps = endPointStateMap_.keySet();
+ long now = System.currentTimeMillis();
+ // Iterate over a snapshot of the keys: removeEndPoint() in the loop below deletes from
+ // endPointStateMap_, and the key-set iterator of a Hashtable is fail-fast, so iterating
+ // the live view would throw ConcurrentModificationException after a fat client is removed.
+ Set<InetAddress> eps = new Hashtable<InetAddress, EndPointState>(endPointStateMap_).keySet();
for ( InetAddress endpoint : eps )
{
if ( endpoint.equals(localEndPoint_) )
@@ -365,12 +389,40 @@
EndPointState epState = endPointStateMap_.get(endpoint);
if ( epState != null )
{
- long duration = System.currentTimeMillis() - epState.getUpdateTimestamp();
+ long duration = now - epState.getUpdateTimestamp();
+
+ // check if this is a fat client. fat clients are removed automatically from
+ // gossip after FatClientTimeout
+ if (!epState.getHasToken() && !epState.isAlive() && (duration > FatClientTimeout_))
+ {
+ if (StorageService.instance.getTokenMetadata().isMember(endpoint))
+ epState.setHasToken(true);
+ else
+ {
+ logger_.info("FatClient " + endpoint + " has been silent for " + FatClientTimeout_ + "ms, removing from gossip");
+ removeEndPoint(endpoint);
+ }
+ }
+
if ( !epState.isAlive() && (duration > aVeryLongTime_) )
{
evictFromMembership(endpoint);
}
}
+
+ if (!justRemovedEndPoints_.isEmpty())
+ {
+ Hashtable<InetAddress, Long> copy = new Hashtable<InetAddress, Long>(justRemovedEndPoints_);
+ for (Map.Entry<InetAddress, Long> entry : copy.entrySet())
+ {
+ if ((now - entry.getValue()) > Streaming.RING_DELAY)
+ {
+ if (logger_.isDebugEnabled())
+ logger_.debug(Streaming.RING_DELAY + " elapsed, " + entry.getKey() + " gossip quarantine over");
+ justRemovedEndPoints_.remove(entry.getKey());
+ }
+ }
+ }
}
}
@@ -520,6 +572,8 @@
private void handleNewJoin(InetAddress ep, EndPointState epState)
{
+ if (justRemovedEndPoints_.containsKey(ep)) // still quarantined after removeEndPoint(): ignore gossip about it
+ return;
logger_.info("Node " + ep + " is now part of the cluster");
handleMajorStateChange(ep, epState, false);
}
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/IFailureDetector.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/IFailureDetector.java?rev=902744&r1=902743&r2=902744&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/IFailureDetector.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/IFailureDetector.java Mon Jan 25 09:26:01 2010
@@ -55,6 +55,11 @@
* @param ep endpoint being reported.
*/
public void report(InetAddress ep);
+
+ /**
+ * remove endpoint from failure detector
+ *
+ * @param ep endpoint to remove
+ */
+ public void remove(InetAddress ep);
/**
* Register interest for Failure Detector events.
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java?rev=902744&r1=902743&r2=902744&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java Mon Jan 25 09:26:01 2010
@@ -533,12 +533,19 @@
// if we're here, endPoint is not leaving but broadcasting remove token command
assert (typeOfState.equals(REMOVE_TOKEN));
InetAddress endPointThatLeft = tokenMetadata_.getEndPoint(token);
+ // let's make sure that we're not removing ourselves. This can happen when a node
+ // enters ring as a replacement for a removed node. removeToken for the old node is
+ // still in gossip, so we will see it.
+ // endPointThatLeft is null when the token has no known endpoint (that case is handled
+ // below), so the comparison is made from the local address to avoid dereferencing null.
+ if (FBUtilities.getLocalAddress().equals(endPointThatLeft))
+ {
+ logger_.info("Received removeToken gossip about myself. Is this node a replacement for a removed one?");
+ return;
+ }
if (logger_.isDebugEnabled())
logger_.debug("Token " + token + " removed manually (endpoint was " + ((endPointThatLeft == null) ? "unknown" : endPointThatLeft) + ")");
if (endPointThatLeft != null)
{
- restoreReplicaCount(endPointThatLeft);
- tokenMetadata_.removeEndpoint(endPointThatLeft);
+ removeEndPointLocally(endPointThatLeft);
}
}
@@ -548,6 +555,17 @@
}
/**
+ * endPoint was completely removed from ring (as a result of removetoken command). Remove it
+ * from token metadata and gossip and restore replica count.
+ *
+ * @param endPoint the node that was removed from the ring
+ */
+ private void removeEndPointLocally(InetAddress endPoint)
+ {
+ restoreReplicaCount(endPoint); // runs while endPoint is still in token metadata, same order as the inline code this helper replaces
+ Gossiper.instance.removeEndPoint(endPoint);
+ tokenMetadata_.removeEndpoint(endPoint);
+ }
+
+ /**
* Calculate pending ranges according to bootstrapping and leaving nodes. Reasoning is:
*
* (1) When in doubt, it is better to write too much to a node than too little. That is, if
@@ -1378,13 +1396,15 @@
InetAddress endPoint = tokenMetadata_.getEndPoint(token);
if (endPoint != null)
{
+ if (endPoint.equals(FBUtilities.getLocalAddress()))
+ throw new UnsupportedOperationException("Cannot remove node's own token");
+
// Let's make sure however that we're not removing a live
// token (member)
if (Gossiper.instance.getLiveMembers().contains(endPoint))
throw new UnsupportedOperationException("Node " + endPoint + " is alive and owns this token. Use decommission command to remove it from the ring");
- restoreReplicaCount(endPoint);
- tokenMetadata_.removeEndpoint(endPoint);
+ removeEndPointLocally(endPoint);
calculatePendingRanges();
}