You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by ja...@apache.org on 2010/01/25 10:26:02 UTC

svn commit: r902744 - in /incubator/cassandra/trunk/src/java/org/apache/cassandra: gms/EndPointState.java gms/FailureDetector.java gms/Gossiper.java gms/IFailureDetector.java service/StorageService.java

Author: jaakko
Date: Mon Jan 25 09:26:01 2010
New Revision: 902744

URL: http://svn.apache.org/viewvc?rev=902744&view=rev
Log:
modify removetoken to remove node from gossip. remove fat clients automatically after 1h of inactivity. patch by jaakko, reviewed by jbellis. CASSANDRA-644

Modified:
    incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/EndPointState.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/FailureDetector.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/Gossiper.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/IFailureDetector.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/EndPointState.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/EndPointState.java?rev=902744&r1=902743&r2=902744&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/EndPointState.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/EndPointState.java Mon Jan 25 09:26:01 2010
@@ -48,6 +48,12 @@
     boolean isAlive_;
     boolean isAGossiper_;
 
+    // Whether this endpoint has a token associated with it. Initially false for all
+    // endpoints. After a certain time of inactivity, the gossiper checks whether this node
+    // has a token and sets this to true if one is found. If there is no token, the node is
+    // a fat client and will be removed automatically from gossip.
+    boolean hasToken_;
+
     public static ICompactSerializer<EndPointState> serializer()
     {
         return serializer_;
@@ -59,6 +65,7 @@
         updateTimestamp_ = System.currentTimeMillis(); 
         isAlive_ = true; 
         isAGossiper_ = false;
+        hasToken_ = false;
     }
         
     HeartBeatState getHeartBeatState()
@@ -121,6 +128,16 @@
         isAGossiper_ = value;        
     }
 
+    public synchronized void setHasToken(boolean value)
+    {
+        hasToken_ = value;
+    }
+
+    public boolean getHasToken()
+    {
+        return hasToken_;
+    }
+
     public List<Map.Entry<String,ApplicationState>> getSortedApplicationStates()
     {
         ArrayList<Map.Entry<String, ApplicationState>> entries = new ArrayList<Map.Entry<String, ApplicationState>>();

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/FailureDetector.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/FailureDetector.java?rev=902744&r1=902743&r2=902744&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/FailureDetector.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/FailureDetector.java Mon Jan 25 09:26:01 2010
@@ -157,6 +157,11 @@
             }
         }        
     }
+
+    public void remove(InetAddress ep)
+    {
+        arrivalSamples_.remove(ep);
+    }
     
     public void registerFailureDetectionEventListener(IFailureDetectionEventListener listener)
     {

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/Gossiper.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/Gossiper.java?rev=902744&r1=902743&r2=902744&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/Gossiper.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/Gossiper.java Mon Jan 25 09:26:01 2010
@@ -28,6 +28,7 @@
 import org.apache.cassandra.net.Message;
 import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.io.Streaming;
 
 import org.apache.log4j.Logger;
 
@@ -106,6 +107,7 @@
     private Timer gossipTimer_;
     private InetAddress localEndPoint_;
     private long aVeryLongTime_;
+    private long FatClientTimeout_;
     private Random random_ = new Random();
 
     /* subscribers for interest in EndPointState change */
@@ -123,10 +125,19 @@
     /* map where key is the endpoint and value is the state associated with the endpoint */
     Map<InetAddress, EndPointState> endPointStateMap_ = new Hashtable<InetAddress, EndPointState>();
 
+    /* map where key is endpoint and value is timestamp when this endpoint was removed from
+     * gossip. We will ignore any gossip regarding these endpoints for Streaming.RING_DELAY time
+     * after removal to prevent nodes from falsely reincarnating during the time when removal
+     * gossip gets propagated to all nodes */
+    Map<InetAddress, Long> justRemovedEndPoints_ = new Hashtable<InetAddress, Long>();
+
     private Gossiper()
     {
         gossipTimer_ = new Timer(false);
+        // 3 days
         aVeryLongTime_ = 259200 * 1000;
+        // 1 hour
+        FatClientTimeout_ = 60 * 60 * 1000;
         /* register with the Failure Detector for receiving Failure detector events */
         FailureDetector.instance.registerFailureDetectionEventListener(this);
     }
@@ -201,6 +212,18 @@
     }
 
     /**
+     * Removes the endpoint completely from Gossip
+     */
+    public void removeEndPoint(InetAddress endpoint)
+    {
+        liveEndpoints_.remove(endpoint);
+        unreachableEndpoints_.remove(endpoint);
+        endPointStateMap_.remove(endpoint);
+        FailureDetector.instance.remove(endpoint);
+        justRemovedEndPoints_.put(endpoint, System.currentTimeMillis());
+    }
+
+    /**
      * No locking required since it is called from a method that already
      * has acquired a lock. The gossip digest is built based on randomization
      * rather than just looping through the collection of live endpoints.
@@ -354,8 +377,9 @@
 
     void doStatusCheck()
     {
-        Set<InetAddress> eps = endPointStateMap_.keySet();
+        long now = System.currentTimeMillis();
 
+        Set<InetAddress> eps = endPointStateMap_.keySet();
         for ( InetAddress endpoint : eps )
         {
             if ( endpoint.equals(localEndPoint_) )
@@ -365,12 +389,40 @@
             EndPointState epState = endPointStateMap_.get(endpoint);
             if ( epState != null )
             {
-                long duration = System.currentTimeMillis() - epState.getUpdateTimestamp();
+                long duration = now - epState.getUpdateTimestamp();
+
+                // check if this is a fat client. fat clients are removed automatically from
+                // gossip after FatClientTimeout
+                if (!epState.getHasToken() && !epState.isAlive() && (duration > FatClientTimeout_))
+                {
+                    if (StorageService.instance.getTokenMetadata().isMember(endpoint))
+                        epState.setHasToken(true);
+                    else
+                    {
+                        logger_.info("FatClient " + endpoint + " has been silent for " + FatClientTimeout_ + "ms, removing from gossip");
+                        removeEndPoint(endpoint);
+                    }
+                }
+
                 if ( !epState.isAlive() && (duration > aVeryLongTime_) )
                 {
                     evictFromMembership(endpoint);
                 }
             }
+
+            if (!justRemovedEndPoints_.isEmpty())
+            {
+                Hashtable<InetAddress, Long> copy = new Hashtable<InetAddress, Long>(justRemovedEndPoints_);
+                for (Map.Entry<InetAddress, Long> entry : copy.entrySet())
+                {
+                    if ((now - entry.getValue()) > Streaming.RING_DELAY)
+                    {
+                        if (logger_.isDebugEnabled())
+                            logger_.debug(Streaming.RING_DELAY + " elapsed, " + entry.getKey() + " gossip quarantine over");
+                        justRemovedEndPoints_.remove(entry.getKey());
+                    }
+                }
+            }
         }
     }
 
@@ -520,6 +572,8 @@
 
     private void handleNewJoin(InetAddress ep, EndPointState epState)
     {
+        if (justRemovedEndPoints_.containsKey(ep))
+            return;
     	logger_.info("Node " + ep + " is now part of the cluster");
         handleMajorStateChange(ep, epState, false);
     }

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/IFailureDetector.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/IFailureDetector.java?rev=902744&r1=902743&r2=902744&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/IFailureDetector.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/IFailureDetector.java Mon Jan 25 09:26:01 2010
@@ -55,6 +55,11 @@
      * param ep endpoint being reported.
     */
     public void report(InetAddress ep);
+
+    /**
+     * remove endpoint from failure detector
+     */
+    public void remove(InetAddress ep);
     
     /**
      * Register interest for Failure Detector events. 

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java?rev=902744&r1=902743&r2=902744&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java Mon Jan 25 09:26:01 2010
@@ -533,12 +533,19 @@
             // if we're here, endPoint is not leaving but broadcasting remove token command
             assert (typeOfState.equals(REMOVE_TOKEN));
             InetAddress endPointThatLeft = tokenMetadata_.getEndPoint(token);
+            // let's make sure that we're not removing ourselves. This can happen when a node
+            // enters ring as a replacement for a removed node. removeToken for the old node is
+            // still in gossip, so we will see it.
+            if (endPointThatLeft.equals(FBUtilities.getLocalAddress()))
+            {
+                logger_.info("Received removeToken gossip about myself. Is this node a replacement for a removed one?");
+                return;
+            }
             if (logger_.isDebugEnabled())
                 logger_.debug("Token " + token + " removed manually (endpoint was " + ((endPointThatLeft == null) ? "unknown" : endPointThatLeft) + ")");
             if (endPointThatLeft != null)
             {
-                restoreReplicaCount(endPointThatLeft);
-                tokenMetadata_.removeEndpoint(endPointThatLeft);
+                removeEndPointLocally(endPointThatLeft);
             }
         }
 
@@ -548,6 +555,17 @@
     }
 
     /**
+     * endPoint was completely removed from ring (as a result of removetoken command). Remove it
+     * from token metadata and gossip and restore replica count.
+     */
+    private void removeEndPointLocally(InetAddress endPoint)
+    {
+        restoreReplicaCount(endPoint);
+        Gossiper.instance.removeEndPoint(endPoint);
+        tokenMetadata_.removeEndpoint(endPoint);
+    }
+
+    /**
      * Calculate pending ranges according to bootsrapping and leaving nodes. Reasoning is:
      *
      * (1) When in doubt, it is better to write too much to a node than too little. That is, if
@@ -1378,13 +1396,15 @@
         InetAddress endPoint = tokenMetadata_.getEndPoint(token);
         if (endPoint != null)
         {
+            if (endPoint.equals(FBUtilities.getLocalAddress()))
+                throw new UnsupportedOperationException("Cannot remove node's own token");
+
             // Let's make sure however that we're not removing a live
             // token (member)
             if (Gossiper.instance.getLiveMembers().contains(endPoint))
                 throw new UnsupportedOperationException("Node " + endPoint + " is alive and owns this token. Use decommission command to remove it from the ring");
 
-            restoreReplicaCount(endPoint);
-            tokenMetadata_.removeEndpoint(endPoint);
+            removeEndPointLocally(endPoint);
             calculatePendingRanges();
         }