You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2009/07/30 17:30:27 UTC
svn commit: r799331 [14/29] - in /incubator/cassandra/trunk: ./
src/java/org/apache/cassandra/concurrent/
src/java/org/apache/cassandra/config/ src/java/org/apache/cassandra/db/
src/java/org/apache/cassandra/dht/ src/java/org/apache/cassandra/gms/ src/...
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/Gossiper.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/Gossiper.java?rev=799331&r1=799330&r2=799331&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/Gossiper.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/gms/Gossiper.java Thu Jul 30 15:30:21 2009
@@ -1,1121 +1,1121 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.cassandra.gms;
-
-import java.io.*;
-import java.util.*;
-import java.net.InetAddress;
-
-import org.apache.cassandra.concurrent.SingleThreadedStage;
-import org.apache.cassandra.concurrent.StageManager;
-import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.net.EndPoint;
-import org.apache.cassandra.net.IVerbHandler;
-import org.apache.cassandra.net.Message;
-import org.apache.cassandra.net.MessagingService;
-import org.apache.cassandra.utils.LogUtil;
-import org.apache.log4j.Logger;
-
-/**
- * This module is responsible for Gossiping information for the local endpoint. This abstraction
- * maintains the list of live and dead endpoints. Periodically i.e. every 1 second this module
- * chooses a random node and initiates a round of Gossip with it. A round of Gossip involves 3
- * rounds of messaging. For instance if node A wants to initiate a round of Gossip with node B
- * it starts off by sending node B a GossipDigestSynMessage. Node B on receipt of this message
- * sends node A a GossipDigestAckMessage. On receipt of this message node A sends node B a
- * GossipDigestAck2Message which completes a round of Gossip. This module as and when it hears one
- * of the three above mentioned messages updates the Failure Detector with the liveness information.
- *
- * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
- */
-
-public class Gossiper implements IFailureDetectionEventListener, IEndPointStateChangePublisher
-{
- private class GossipTimerTask extends TimerTask
- {
- public void run()
- {
- try
- {
- synchronized( Gossiper.instance() )
- {
- /* Update the local heartbeat counter. */
- endPointStateMap_.get(localEndPoint_).getHeartBeatState().updateHeartBeat();
- List<GossipDigest> gDigests = new ArrayList<GossipDigest>();
- Gossiper.instance().makeRandomGossipDigest(gDigests);
-
- if ( gDigests.size() > 0 )
- {
- Message message = makeGossipDigestSynMessage(gDigests);
- /* Gossip to some random live member */
- boolean bVal = doGossipToLiveMember(message);
-
- /* Gossip to some unreachable member with some probability to check if he is back up */
- doGossipToUnreachableMember(message);
-
- /* Gossip to the seed. */
- if ( !bVal )
- doGossipToSeed(message);
-
- if (logger_.isTraceEnabled())
- logger_.trace("Performing status check ...");
- doStatusCheck();
- }
- }
- }
- catch ( Throwable th )
- {
- logger_.info( LogUtil.throwableToString(th) );
- }
- }
- }
-
- final static int MAX_GOSSIP_PACKET_SIZE = 1428;
- /* GS - abbreviation for GOSSIPER_STAGE */
- final static String GOSSIP_STAGE = "GS";
- /* GSV - abbreviation for GOSSIP-DIGEST-SYN-VERB */
- final static String JOIN_VERB_HANDLER = "JVH";
- /* GSV - abbreviation for GOSSIP-DIGEST-SYN-VERB */
- final static String GOSSIP_DIGEST_SYN_VERB = "GSV";
- /* GAV - abbreviation for GOSSIP-DIGEST-ACK-VERB */
- final static String GOSSIP_DIGEST_ACK_VERB = "GAV";
- /* GA2V - abbreviation for GOSSIP-DIGEST-ACK2-VERB */
- final static String GOSSIP_DIGEST_ACK2_VERB = "GA2V";
- final static int intervalInMillis_ = 1000;
- private static Logger logger_ = Logger.getLogger(Gossiper.class);
- static Gossiper gossiper_;
-
- public synchronized static Gossiper instance()
- {
- if ( gossiper_ == null )
- {
- gossiper_ = new Gossiper();
- }
- return gossiper_;
- }
-
- private Timer gossipTimer_ = new Timer(false);
- private EndPoint localEndPoint_;
- private long aVeryLongTime_;
- private Random random_ = new Random();
- /* round robin index through live endpoint set */
- private int rrIndex_ = 0;
-
- /* subscribers for interest in EndPointState change */
- private List<IEndPointStateChangeSubscriber> subscribers_ = new ArrayList<IEndPointStateChangeSubscriber>();
-
- /* live member set */
- private Set<EndPoint> liveEndpoints_ = new HashSet<EndPoint>();
-
- /* unreachable member set */
- private Set<EndPoint> unreachableEndpoints_ = new HashSet<EndPoint>();
-
- /* initial seeds for joining the cluster */
- private Set<EndPoint> seeds_ = new HashSet<EndPoint>();
-
- /* map where key is the endpoint and value is the state associated with the endpoint */
- Map<EndPoint, EndPointState> endPointStateMap_ = new Hashtable<EndPoint, EndPointState>();
-
- /* private CTOR */
- Gossiper()
- {
- aVeryLongTime_ = 259200 * 1000;
- /* register with the Failure Detector for receiving Failure detector events */
- FailureDetector.instance().registerFailureDetectionEventListener(this);
- /* register the verbs */
- MessagingService.getMessagingInstance().registerVerbHandlers(JOIN_VERB_HANDLER, new JoinVerbHandler());
- MessagingService.getMessagingInstance().registerVerbHandlers(GOSSIP_DIGEST_SYN_VERB, new GossipDigestSynVerbHandler());
- MessagingService.getMessagingInstance().registerVerbHandlers(GOSSIP_DIGEST_ACK_VERB, new GossipDigestAckVerbHandler());
- MessagingService.getMessagingInstance().registerVerbHandlers(GOSSIP_DIGEST_ACK2_VERB, new GossipDigestAck2VerbHandler());
- /* register the Gossip stage */
- StageManager.registerStage( Gossiper.GOSSIP_STAGE, new SingleThreadedStage("GMFD") );
- }
-
- public void register(IEndPointStateChangeSubscriber subscriber)
- {
- subscribers_.add(subscriber);
- }
-
- public void unregister(IEndPointStateChangeSubscriber subscriber)
- {
- subscribers_.remove(subscriber);
- }
-
- public Set<EndPoint> getAllMembers()
- {
- Set<EndPoint> allMbrs = new HashSet<EndPoint>();
- allMbrs.addAll(getLiveMembers());
- allMbrs.addAll(getUnreachableMembers());
- return allMbrs;
- }
-
- public Set<EndPoint> getLiveMembers()
- {
- Set<EndPoint> liveMbrs = new HashSet<EndPoint>(liveEndpoints_);
- liveMbrs.add( new EndPoint( localEndPoint_.getHost(), localEndPoint_.getPort() ) );
- return liveMbrs;
- }
-
- public Set<EndPoint> getUnreachableMembers()
- {
- return new HashSet<EndPoint>(unreachableEndpoints_);
- }
-
- /**
- * This method is used to forcibly remove a node from the membership
- * set. He is forgotten locally immediately.
- *
- * param@ ep the endpoint to be removed from membership.
- */
- public synchronized void removeFromMembership(EndPoint ep)
- {
- endPointStateMap_.remove(ep);
- liveEndpoints_.remove(ep);
- unreachableEndpoints_ .remove(ep);
- }
-
- /**
- * This method is part of IFailureDetectionEventListener interface. This is invoked
- * by the Failure Detector when it convicts an end point.
- *
- * param @ endpoint end point that is convicted.
- */
-
- public void convict(EndPoint endpoint)
- {
- EndPointState epState = endPointStateMap_.get(endpoint);
- if ( epState != null )
- {
- if ( !epState.isAlive() && epState.isAGossiper() )
- {
- /*
- * just to be sure - is invoked just to make sure that
- * it was called at least once.
- */
- if ( liveEndpoints_.contains(endpoint) )
- {
- logger_.info("EndPoint " + endpoint + " is now dead.");
- isAlive(endpoint, epState, false);
-
- /* Notify an endpoint is dead to interested parties. */
- EndPointState deltaState = new EndPointState(epState.getHeartBeatState());
- doNotifications(endpoint, deltaState);
- }
- epState.isAGossiper(false);
- }
- }
- }
-
- /**
- * This method is part of IFailureDetectionEventListener interface. This is invoked
- * by the Failure Detector when it suspects an end point.
- *
- * param @ endpoint end point that is suspected.
- */
- public void suspect(EndPoint endpoint)
- {
- EndPointState epState = endPointStateMap_.get(endpoint);
- if ( epState.isAlive() )
- {
- logger_.info("EndPoint " + endpoint + " is now dead.");
- isAlive(endpoint, epState, false);
-
- /* Notify an endpoint is dead to interested parties. */
- EndPointState deltaState = new EndPointState(epState.getHeartBeatState());
- doNotifications(endpoint, deltaState);
- }
- }
-
- int getMaxEndPointStateVersion(EndPointState epState)
- {
- List<Integer> versions = new ArrayList<Integer>();
- versions.add( epState.getHeartBeatState().getHeartBeatVersion() );
- Map<String, ApplicationState> appStateMap = epState.getApplicationState();
-
- Set<String> keys = appStateMap.keySet();
- for ( String key : keys )
- {
- int stateVersion = appStateMap.get(key).getStateVersion();
- versions.add( stateVersion );
- }
-
- /* sort to get the max version to build GossipDigest for this endpoint */
- Collections.sort(versions);
- int maxVersion = versions.get(versions.size() - 1);
- versions.clear();
- return maxVersion;
- }
-
- /**
- * Removes the endpoint from unreachable endpoint set
- *
- * @param endpoint endpoint to be removed from the current membership.
- */
- void evictFromMembership(EndPoint endpoint)
- {
- unreachableEndpoints_.remove(endpoint);
- }
-
- /* No locking required since it is called from a method that already has acquired a lock */
- @Deprecated
- void makeGossipDigest(List<GossipDigest> gDigests)
- {
- /* Add the local endpoint state */
- EndPointState epState = endPointStateMap_.get(localEndPoint_);
- int generation = epState.getHeartBeatState().getGeneration();
- int maxVersion = getMaxEndPointStateVersion(epState);
- gDigests.add( new GossipDigest(localEndPoint_, generation, maxVersion) );
-
- for ( EndPoint liveEndPoint : liveEndpoints_ )
- {
- epState = endPointStateMap_.get(liveEndPoint);
- if ( epState != null )
- {
- generation = epState.getHeartBeatState().getGeneration();
- maxVersion = getMaxEndPointStateVersion(epState);
- gDigests.add( new GossipDigest(liveEndPoint, generation, maxVersion) );
- }
- else
- {
- gDigests.add( new GossipDigest(liveEndPoint, 0, 0) );
- }
- }
- }
-
- /**
- * No locking required since it is called from a method that already
- * has acquired a lock. The gossip digest is built based on randomization
- * rather than just looping through the collection of live endpoints.
- *
- * @param gDigests list of Gossip Digests.
- */
- void makeRandomGossipDigest(List<GossipDigest> gDigests)
- {
- /* Add the local endpoint state */
- EndPointState epState = endPointStateMap_.get(localEndPoint_);
- int generation = epState.getHeartBeatState().getGeneration();
- int maxVersion = getMaxEndPointStateVersion(epState);
- gDigests.add( new GossipDigest(localEndPoint_, generation, maxVersion) );
-
- List<EndPoint> endpoints = new ArrayList<EndPoint>( liveEndpoints_ );
- Collections.shuffle(endpoints, random_);
- for ( EndPoint liveEndPoint : endpoints )
- {
- epState = endPointStateMap_.get(liveEndPoint);
- if ( epState != null )
- {
- generation = epState.getHeartBeatState().getGeneration();
- maxVersion = getMaxEndPointStateVersion(epState);
- gDigests.add( new GossipDigest(liveEndPoint, generation, maxVersion) );
- }
- else
- {
- gDigests.add( new GossipDigest(liveEndPoint, 0, 0) );
- }
- }
-
- /* FOR DEBUG ONLY - remove later */
- StringBuilder sb = new StringBuilder();
- for ( GossipDigest gDigest : gDigests )
- {
- sb.append(gDigest);
- sb.append(" ");
- }
- if (logger_.isTraceEnabled())
- logger_.trace("Gossip Digests are : " + sb.toString());
- }
-
- public int getCurrentGenerationNumber(EndPoint endpoint)
- {
- return endPointStateMap_.get(endpoint).getHeartBeatState().getGeneration();
- }
-
- Message makeGossipDigestSynMessage(List<GossipDigest> gDigests) throws IOException
- {
- GossipDigestSynMessage gDigestMessage = new GossipDigestSynMessage(DatabaseDescriptor.getClusterName(), gDigests);
- ByteArrayOutputStream bos = new ByteArrayOutputStream(Gossiper.MAX_GOSSIP_PACKET_SIZE);
- DataOutputStream dos = new DataOutputStream( bos );
- GossipDigestSynMessage.serializer().serialize(gDigestMessage, dos);
- Message message = new Message(localEndPoint_, Gossiper.GOSSIP_STAGE, GOSSIP_DIGEST_SYN_VERB, bos.toByteArray());
- return message;
- }
-
- Message makeGossipDigestAckMessage(GossipDigestAckMessage gDigestAckMessage) throws IOException
- {
- ByteArrayOutputStream bos = new ByteArrayOutputStream(Gossiper.MAX_GOSSIP_PACKET_SIZE);
- DataOutputStream dos = new DataOutputStream(bos);
- GossipDigestAckMessage.serializer().serialize(gDigestAckMessage, dos);
- if (logger_.isTraceEnabled())
- logger_.trace("@@@@ Size of GossipDigestAckMessage is " + bos.toByteArray().length);
- Message message = new Message(localEndPoint_, Gossiper.GOSSIP_STAGE, GOSSIP_DIGEST_ACK_VERB, bos.toByteArray());
- return message;
- }
-
- Message makeGossipDigestAck2Message(GossipDigestAck2Message gDigestAck2Message) throws IOException
- {
- ByteArrayOutputStream bos = new ByteArrayOutputStream(Gossiper.MAX_GOSSIP_PACKET_SIZE);
- DataOutputStream dos = new DataOutputStream(bos);
- GossipDigestAck2Message.serializer().serialize(gDigestAck2Message, dos);
- Message message = new Message(localEndPoint_, Gossiper.GOSSIP_STAGE, GOSSIP_DIGEST_ACK2_VERB, bos.toByteArray());
- return message;
- }
-
- boolean sendGossipToLiveNode(Message message)
- {
- int size = liveEndpoints_.size();
- List<EndPoint> eps = new ArrayList<EndPoint>(liveEndpoints_);
-
- if ( rrIndex_ >= size )
- {
- rrIndex_ = -1;
- }
-
- EndPoint to = eps.get(++rrIndex_);
- if (logger_.isTraceEnabled())
- logger_.trace("Sending a GossipDigestSynMessage to " + to + " ...");
- MessagingService.getMessagingInstance().sendUdpOneWay(message, to);
- return seeds_.contains(to);
- }
-
- /**
- * Returns true if the chosen target was also a seed. False otherwise
- *
- * @param message message to sent
- * @param epSet a set of endpoint from which a random endpoint is chosen.
- * @return true if the chosen endpoint is also a seed.
- */
- boolean sendGossip(Message message, Set<EndPoint> epSet)
- {
- int size = epSet.size();
- /* Generate a random number from 0 -> size */
- List<EndPoint> liveEndPoints = new ArrayList<EndPoint>(epSet);
- int index = (size == 1) ? 0 : random_.nextInt(size);
- EndPoint to = liveEndPoints.get(index);
- if (logger_.isTraceEnabled())
- logger_.trace("Sending a GossipDigestSynMessage to " + to + " ...");
- MessagingService.getMessagingInstance().sendUdpOneWay(message, to);
- return seeds_.contains(to);
- }
-
- /* Sends a Gossip message to a live member and returns a reference to the member */
- boolean doGossipToLiveMember(Message message)
- {
- int size = liveEndpoints_.size();
- if ( size == 0 )
- return false;
- // return sendGossipToLiveNode(message);
- /* Use this for a cluster size >= 30 */
- return sendGossip(message, liveEndpoints_);
- }
-
- /* Sends a Gossip message to an unreachable member */
- void doGossipToUnreachableMember(Message message)
- {
- double liveEndPoints = liveEndpoints_.size();
- double unreachableEndPoints = unreachableEndpoints_.size();
- if ( unreachableEndPoints > 0 )
- {
- /* based on some probability */
- double prob = unreachableEndPoints / (liveEndPoints + 1);
- double randDbl = random_.nextDouble();
- if ( randDbl < prob )
- sendGossip(message, unreachableEndpoints_);
- }
- }
-
- /* Gossip to a seed for facilitating partition healing */
- void doGossipToSeed(Message message)
- {
- int size = seeds_.size();
- if ( size > 0 )
- {
- if ( size == 1 && seeds_.contains(localEndPoint_) )
- {
- return;
- }
-
- if ( liveEndpoints_.size() == 0 )
- {
- sendGossip(message, seeds_);
- }
- else
- {
- /* Gossip with the seed with some probability. */
- double probability = seeds_.size() / ( liveEndpoints_.size() + unreachableEndpoints_.size() );
- double randDbl = random_.nextDouble();
- if ( randDbl <= probability )
- sendGossip(message, seeds_);
- }
- }
- }
-
- void doStatusCheck()
- {
- Set<EndPoint> eps = endPointStateMap_.keySet();
-
- for ( EndPoint endpoint : eps )
- {
- if ( endpoint.equals(localEndPoint_) )
- continue;
-
- FailureDetector.instance().interpret(endpoint);
- EndPointState epState = endPointStateMap_.get(endpoint);
- if ( epState != null )
- {
- long duration = System.currentTimeMillis() - epState.getUpdateTimestamp();
- if ( !epState.isAlive() && (duration > aVeryLongTime_) )
- {
- evictFromMembership(endpoint);
- }
- }
- }
- }
-
- EndPointState getEndPointStateForEndPoint(EndPoint ep)
- {
- return endPointStateMap_.get(ep);
- }
-
- synchronized EndPointState getStateForVersionBiggerThan(EndPoint forEndpoint, int version)
- {
- EndPointState epState = endPointStateMap_.get(forEndpoint);
- EndPointState reqdEndPointState = null;
-
- if ( epState != null )
- {
- /*
- * Here we try to include the Heart Beat state only if it is
- * greater than the version passed in. It might happen that
- * the heart beat version maybe lesser than the version passed
- * in and some application state has a version that is greater
- * than the version passed in. In this case we also send the old
- * heart beat and throw it away on the receiver if it is redundant.
- */
- int localHbVersion = epState.getHeartBeatState().getHeartBeatVersion();
- if ( localHbVersion > version )
- {
- reqdEndPointState = new EndPointState(epState.getHeartBeatState());
- }
- Map<String, ApplicationState> appStateMap = epState.getApplicationState();
- /* Accumulate all application states whose versions are greater than "version" variable */
- Set<String> keys = appStateMap.keySet();
- for ( String key : keys )
- {
- ApplicationState appState = appStateMap.get(key);
- if ( appState.getStateVersion() > version )
- {
- if ( reqdEndPointState == null )
- {
- reqdEndPointState = new EndPointState(epState.getHeartBeatState());
- }
- reqdEndPointState.addApplicationState(key, appState);
- }
- }
- }
- return reqdEndPointState;
- }
-
- /*
- * This method is called only from the JoinVerbHandler. This happens
- * when a new node coming up multicasts the JoinMessage. Here we need
- * to add the endPoint to the list of live endpoints.
- */
- synchronized void join(EndPoint from)
- {
- if ( !from.equals( localEndPoint_ ) )
- {
- /* Mark this endpoint as "live" */
- liveEndpoints_.add(from);
- unreachableEndpoints_.remove(from);
- }
- }
-
- void notifyFailureDetector(List<GossipDigest> gDigests)
- {
- IFailureDetector fd = FailureDetector.instance();
- for ( GossipDigest gDigest : gDigests )
- {
- EndPointState localEndPointState = endPointStateMap_.get(gDigest.endPoint_);
- /*
- * If the local endpoint state exists then report to the FD only
- * if the versions workout.
- */
- if ( localEndPointState != null )
- {
- int localGeneration = endPointStateMap_.get(gDigest.endPoint_).getHeartBeatState().generation_;
- int remoteGeneration = gDigest.generation_;
- if ( remoteGeneration > localGeneration )
- {
- fd.report(gDigest.endPoint_);
- continue;
- }
-
- if ( remoteGeneration == localGeneration )
- {
- int localVersion = getMaxEndPointStateVersion(localEndPointState);
- //int localVersion = endPointStateMap_.get(gDigest.endPoint_).getHeartBeatState().getHeartBeatVersion();
- int remoteVersion = gDigest.maxVersion_;
- if ( remoteVersion > localVersion )
- {
- fd.report(gDigest.endPoint_);
- }
- }
- }
- }
- }
-
- void notifyFailureDetector(Map<EndPoint, EndPointState> remoteEpStateMap)
- {
- IFailureDetector fd = FailureDetector.instance();
- Set<EndPoint> endpoints = remoteEpStateMap.keySet();
- for ( EndPoint endpoint : endpoints )
- {
- EndPointState remoteEndPointState = remoteEpStateMap.get(endpoint);
- EndPointState localEndPointState = endPointStateMap_.get(endpoint);
- /*
- * If the local endpoint state exists then report to the FD only
- * if the versions workout.
- */
- if ( localEndPointState != null )
- {
- int localGeneration = localEndPointState.getHeartBeatState().generation_;
- int remoteGeneration = remoteEndPointState.getHeartBeatState().generation_;
- if ( remoteGeneration > localGeneration )
- {
- fd.report(endpoint);
- continue;
- }
-
- if ( remoteGeneration == localGeneration )
- {
- int localVersion = getMaxEndPointStateVersion(localEndPointState);
- //int localVersion = localEndPointState.getHeartBeatState().getHeartBeatVersion();
- int remoteVersion = remoteEndPointState.getHeartBeatState().getHeartBeatVersion();
- if ( remoteVersion > localVersion )
- {
- fd.report(endpoint);
- }
- }
- }
- }
- }
-
- void markAlive(EndPoint addr, EndPointState localState)
- {
- if (logger_.isTraceEnabled())
- logger_.trace("marking as alive " + addr);
- if ( !localState.isAlive() )
- {
- isAlive(addr, localState, true);
- logger_.info("EndPoint " + addr + " is now UP");
- }
- }
-
- private void handleNewJoin(EndPoint ep, EndPointState epState)
- {
- logger_.info("Node " + ep + " has now joined.");
- /* Mark this endpoint as "live" */
- endPointStateMap_.put(ep, epState);
- isAlive(ep, epState, true);
- /* Notify interested parties about endpoint state change */
- doNotifications(ep, epState);
- }
-
- synchronized void applyStateLocally(Map<EndPoint, EndPointState> epStateMap)
- {
- Set<EndPoint> eps = epStateMap.keySet();
- for( EndPoint ep : eps )
- {
- if ( ep.equals( localEndPoint_ ) )
- continue;
-
- EndPointState localEpStatePtr = endPointStateMap_.get(ep);
- EndPointState remoteState = epStateMap.get(ep);
- /*
- If state does not exist just add it. If it does then add it only if the version
- of the remote copy is greater than the local copy.
- */
- if ( localEpStatePtr != null )
- {
- int localGeneration = localEpStatePtr.getHeartBeatState().getGeneration();
- int remoteGeneration = remoteState.getHeartBeatState().getGeneration();
-
- if (remoteGeneration > localGeneration)
- {
- handleNewJoin(ep, remoteState);
- }
- else if ( remoteGeneration == localGeneration )
- {
- /* manage the membership state */
- int localMaxVersion = getMaxEndPointStateVersion(localEpStatePtr);
- int remoteMaxVersion = getMaxEndPointStateVersion(remoteState);
- if ( remoteMaxVersion > localMaxVersion )
- {
- markAlive(ep, localEpStatePtr);
- applyHeartBeatStateLocally(ep, localEpStatePtr, remoteState);
- /* apply ApplicationState */
- applyApplicationStateLocally(ep, localEpStatePtr, remoteState);
- }
- }
- }
- else
- {
- handleNewJoin(ep, remoteState);
- }
- }
- }
-
- void applyHeartBeatStateLocally(EndPoint addr, EndPointState localState, EndPointState remoteState)
- {
- HeartBeatState localHbState = localState.getHeartBeatState();
- HeartBeatState remoteHbState = remoteState.getHeartBeatState();
-
- if ( remoteHbState.getGeneration() > localHbState.getGeneration() )
- {
- markAlive(addr, localState);
- localState.setHeartBeatState(remoteHbState);
- }
- if ( localHbState.getGeneration() == remoteHbState.getGeneration() )
- {
- if ( remoteHbState.getHeartBeatVersion() > localHbState.getHeartBeatVersion() )
- {
- int oldVersion = localHbState.getHeartBeatVersion();
- localState.setHeartBeatState(remoteHbState);
- if (logger_.isTraceEnabled())
- logger_.trace("Updating heartbeat state version to " + localState.getHeartBeatState().getHeartBeatVersion() + " from " + oldVersion + " for " + addr + " ...");
- }
- }
- }
-
- void applyApplicationStateLocally(EndPoint addr, EndPointState localStatePtr, EndPointState remoteStatePtr)
- {
- Map<String, ApplicationState> localAppStateMap = localStatePtr.getApplicationState();
- Map<String, ApplicationState> remoteAppStateMap = remoteStatePtr.getApplicationState();
-
- Set<String> remoteKeys = remoteAppStateMap.keySet();
- for ( String remoteKey : remoteKeys )
- {
- ApplicationState remoteAppState = remoteAppStateMap.get(remoteKey);
- ApplicationState localAppState = localAppStateMap.get(remoteKey);
-
- /* If state doesn't exist locally for this key then just apply it */
- if ( localAppState == null )
- {
- localStatePtr.addApplicationState(remoteKey, remoteAppState);
- /* notify interested parties of endpoint state change */
- EndPointState deltaState = new EndPointState(localStatePtr.getHeartBeatState());
- deltaState.addApplicationState(remoteKey, remoteAppState);
- doNotifications(addr, deltaState);
- continue;
- }
-
- int remoteGeneration = remoteStatePtr.getHeartBeatState().getGeneration();
- int localGeneration = localStatePtr.getHeartBeatState().getGeneration();
-
- /* If the remoteGeneration is greater than localGeneration then apply state blindly */
- if ( remoteGeneration > localGeneration )
- {
- localStatePtr.addApplicationState(remoteKey, remoteAppState);
- /* notify interested parties of endpoint state change */
- EndPointState deltaState = new EndPointState(localStatePtr.getHeartBeatState());
- deltaState.addApplicationState(remoteKey, remoteAppState);
- doNotifications(addr, deltaState);
- continue;
- }
-
- /* If the generations are the same then apply state if the remote version is greater than local version. */
- if ( remoteGeneration == localGeneration )
- {
- int remoteVersion = remoteAppState.getStateVersion();
- int localVersion = localAppState.getStateVersion();
-
- if ( remoteVersion > localVersion )
- {
- localStatePtr.addApplicationState(remoteKey, remoteAppState);
- /* notify interested parties of endpoint state change */
- EndPointState deltaState = new EndPointState(localStatePtr.getHeartBeatState());
- deltaState.addApplicationState(remoteKey, remoteAppState);
- doNotifications(addr, deltaState);
- }
- }
- }
- }
-
- void doNotifications(EndPoint addr, EndPointState epState)
- {
- for ( IEndPointStateChangeSubscriber subscriber : subscribers_ )
- {
- subscriber.onChange(addr, epState);
- }
- }
-
- synchronized void isAlive(EndPoint addr, EndPointState epState, boolean value)
- {
- epState.isAlive(value);
- if ( value )
- {
- liveEndpoints_.add(addr);
- unreachableEndpoints_.remove(addr);
- }
- else
- {
- liveEndpoints_.remove(addr);
- unreachableEndpoints_.add(addr);
- }
- if ( epState.isAGossiper() )
- return;
- epState.isAGossiper(true);
- }
-
- /* These are helper methods used from GossipDigestSynVerbHandler */
- Map<EndPoint, GossipDigest> getEndPointGossipDigestMap(List<GossipDigest> gDigestList)
- {
- Map<EndPoint, GossipDigest> epMap = new HashMap<EndPoint, GossipDigest>();
- for( GossipDigest gDigest : gDigestList )
- {
- epMap.put( gDigest.getEndPoint(), gDigest );
- }
- return epMap;
- }
-
- /* This is a helper method to get all EndPoints from a list of GossipDigests */
- EndPoint[] getEndPointsFromGossipDigest(List<GossipDigest> gDigestList)
- {
- Set<EndPoint> set = new HashSet<EndPoint>();
- for ( GossipDigest gDigest : gDigestList )
- {
- set.add( gDigest.getEndPoint() );
- }
- return set.toArray( new EndPoint[0] );
- }
-
- /* Request all the state for the endpoint in the gDigest */
- void requestAll(GossipDigest gDigest, List<GossipDigest> deltaGossipDigestList, int remoteGeneration)
- {
- /* We are here since we have no data for this endpoint locally so request everthing. */
- deltaGossipDigestList.add( new GossipDigest(gDigest.getEndPoint(), remoteGeneration, 0) );
- }
-
- /* Send all the data with version greater than maxRemoteVersion */
- void sendAll(GossipDigest gDigest, Map<EndPoint, EndPointState> deltaEpStateMap, int maxRemoteVersion)
- {
- EndPointState localEpStatePtr = getStateForVersionBiggerThan(gDigest.getEndPoint(), maxRemoteVersion) ;
- if ( localEpStatePtr != null )
- deltaEpStateMap.put(gDigest.getEndPoint(), localEpStatePtr);
- }
-
- /*
- This method is used to figure the state that the Gossiper has but Gossipee doesn't. The delta digests
- and the delta state are built up.
- */
- synchronized void examineGossiper(List<GossipDigest> gDigestList, List<GossipDigest> deltaGossipDigestList, Map<EndPoint, EndPointState> deltaEpStateMap)
- {
- for ( GossipDigest gDigest : gDigestList )
- {
- int remoteGeneration = gDigest.getGeneration();
- int maxRemoteVersion = gDigest.getMaxVersion();
- /* Get state associated with the end point in digest */
- EndPointState epStatePtr = endPointStateMap_.get(gDigest.getEndPoint());
- /*
- Here we need to fire a GossipDigestAckMessage. If we have some data associated with this endpoint locally
- then we follow the "if" path of the logic. If we have absolutely nothing for this endpoint we need to
- request all the data for this endpoint.
- */
- if ( epStatePtr != null )
- {
- int localGeneration = epStatePtr.getHeartBeatState().getGeneration();
- /* get the max version of all keys in the state associated with this endpoint */
- int maxLocalVersion = getMaxEndPointStateVersion(epStatePtr);
- if ( remoteGeneration == localGeneration && maxRemoteVersion == maxLocalVersion )
- continue;
-
- if ( remoteGeneration > localGeneration )
- {
- /* we request everything from the gossiper */
- requestAll(gDigest, deltaGossipDigestList, remoteGeneration);
- }
- if ( remoteGeneration < localGeneration )
- {
- /* send all data with generation = localgeneration and version > 0 */
- sendAll(gDigest, deltaEpStateMap, 0);
- }
- if ( remoteGeneration == localGeneration )
- {
- /*
- If the max remote version is greater then we request the remote endpoint send us all the data
- for this endpoint with version greater than the max version number we have locally for this
- endpoint.
- If the max remote version is lesser, then we send all the data we have locally for this endpoint
- with version greater than the max remote version.
- */
- if ( maxRemoteVersion > maxLocalVersion )
- {
- deltaGossipDigestList.add( new GossipDigest(gDigest.getEndPoint(), remoteGeneration, maxLocalVersion) );
- }
- if ( maxRemoteVersion < maxLocalVersion )
- {
- /* send all data with generation = localgeneration and version > maxRemoteVersion */
- sendAll(gDigest, deltaEpStateMap, maxRemoteVersion);
- }
- }
- }
- else
- {
- /* We are here since we have no data for this endpoint locally so request everything. */
- requestAll(gDigest, deltaGossipDigestList, remoteGeneration);
- }
- }
- }
-
- public void start(EndPoint localEndPoint, int generationNbr) throws IOException
- {
- localEndPoint_ = localEndPoint;
- /* Get the seeds from the config and initialize them. */
- Set<String> seedHosts = DatabaseDescriptor.getSeeds();
- for( String seedHost : seedHosts )
- {
- EndPoint seed = new EndPoint(InetAddress.getByName(seedHost).getHostAddress(),
- DatabaseDescriptor.getControlPort());
- if ( seed.equals(localEndPoint) )
- continue;
- seeds_.add(seed);
- }
-
- /* initialize the heartbeat state for this localEndPoint */
- EndPointState localState = endPointStateMap_.get(localEndPoint_);
- if ( localState == null )
- {
- HeartBeatState hbState = new HeartBeatState(generationNbr, 0);
- localState = new EndPointState(hbState);
- localState.isAlive(true);
- localState.isAGossiper(true);
- endPointStateMap_.put(localEndPoint_, localState);
- }
-
- /* starts a timer thread */
- gossipTimer_.schedule( new GossipTimerTask(), Gossiper.intervalInMillis_, Gossiper.intervalInMillis_);
- }
-
- public synchronized void addApplicationState(String key, ApplicationState appState)
- {
- EndPointState epState = endPointStateMap_.get(localEndPoint_);
- if ( epState != null )
- {
- epState.addApplicationState(key, appState);
- }
- }
-
- public void stop()
- {
- gossipTimer_.cancel();
- }
-}
-
-class JoinVerbHandler implements IVerbHandler
-{
- private static Logger logger_ = Logger.getLogger( JoinVerbHandler.class);
-
- public void doVerb(Message message)
- {
- EndPoint from = message.getFrom();
- if (logger_.isDebugEnabled())
- logger_.debug("Received a JoinMessage from " + from);
-
- byte[] bytes = message.getMessageBody();
- DataInputStream dis = new DataInputStream( new ByteArrayInputStream(bytes) );
-
- JoinMessage joinMessage = null;
- try
- {
- joinMessage = JoinMessage.serializer().deserialize(dis);
- }
- catch (IOException e)
- {
- throw new RuntimeException(e);
- }
- if ( joinMessage.clusterId_.equals( DatabaseDescriptor.getClusterName() ) )
- {
- Gossiper.instance().join(from);
- }
- }
-}
-
-class GossipDigestSynVerbHandler implements IVerbHandler
-{
- private static Logger logger_ = Logger.getLogger( GossipDigestSynVerbHandler.class);
-
- public void doVerb(Message message)
- {
- EndPoint from = message.getFrom();
- if (logger_.isTraceEnabled())
- logger_.trace("Received a GossipDigestSynMessage from " + from);
-
- byte[] bytes = message.getMessageBody();
- DataInputStream dis = new DataInputStream( new ByteArrayInputStream(bytes) );
-
- try
- {
- GossipDigestSynMessage gDigestMessage = GossipDigestSynMessage.serializer().deserialize(dis);
- /* If the message is from a different cluster throw it away. */
- if ( !gDigestMessage.clusterId_.equals(DatabaseDescriptor.getClusterName()) )
- return;
-
- List<GossipDigest> gDigestList = gDigestMessage.getGossipDigests();
- /* Notify the Failure Detector */
- Gossiper.instance().notifyFailureDetector(gDigestList);
-
- doSort(gDigestList);
-
- List<GossipDigest> deltaGossipDigestList = new ArrayList<GossipDigest>();
- Map<EndPoint, EndPointState> deltaEpStateMap = new HashMap<EndPoint, EndPointState>();
- Gossiper.instance().examineGossiper(gDigestList, deltaGossipDigestList, deltaEpStateMap);
-
- GossipDigestAckMessage gDigestAck = new GossipDigestAckMessage(deltaGossipDigestList, deltaEpStateMap);
- Message gDigestAckMessage = Gossiper.instance().makeGossipDigestAckMessage(gDigestAck);
- if (logger_.isTraceEnabled())
- logger_.trace("Sending a GossipDigestAckMessage to " + from);
- MessagingService.getMessagingInstance().sendUdpOneWay(gDigestAckMessage, from);
- }
- catch (IOException e)
- {
- throw new RuntimeException(e);
- }
- }
-
- /*
- * First construct a map whose key is the endpoint in the GossipDigest and the value is the
- * GossipDigest itself. Then build a list of version differences i.e difference between the
- * version in the GossipDigest and the version in the local state for a given EndPoint.
- * Sort this list. Now loop through the sorted list and retrieve the GossipDigest corresponding
- * to the endpoint from the map that was initially constructed.
- */
- private void doSort(List<GossipDigest> gDigestList)
- {
- /* Construct a map of endpoint to GossipDigest. */
- Map<EndPoint, GossipDigest> epToDigestMap = new HashMap<EndPoint, GossipDigest>();
- for ( GossipDigest gDigest : gDigestList )
- {
- epToDigestMap.put(gDigest.getEndPoint(), gDigest);
- }
-
- /*
- * These digests have their maxVersion set to the difference of the version
- * of the local EndPointState and the version found in the GossipDigest.
- */
- List<GossipDigest> diffDigests = new ArrayList<GossipDigest>();
- for ( GossipDigest gDigest : gDigestList )
- {
- EndPoint ep = gDigest.getEndPoint();
- EndPointState epState = Gossiper.instance().getEndPointStateForEndPoint(ep);
- int version = (epState != null) ? Gossiper.instance().getMaxEndPointStateVersion( epState ) : 0;
- int diffVersion = Math.abs(version - gDigest.getMaxVersion() );
- diffDigests.add( new GossipDigest(ep, gDigest.getGeneration(), diffVersion) );
- }
-
- gDigestList.clear();
- Collections.sort(diffDigests);
- int size = diffDigests.size();
- /*
- * Report the digests in descending order. This takes care of the endpoints
- * that are far behind w.r.t this local endpoint
- */
- for ( int i = size - 1; i >= 0; --i )
- {
- gDigestList.add( epToDigestMap.get(diffDigests.get(i).getEndPoint()) );
- }
- }
-}
-
-class GossipDigestAckVerbHandler implements IVerbHandler
-{
- private static Logger logger_ = Logger.getLogger(GossipDigestAckVerbHandler.class);
-
- public void doVerb(Message message)
- {
- EndPoint from = message.getFrom();
- if (logger_.isTraceEnabled())
- logger_.trace("Received a GossipDigestAckMessage from " + from);
-
- byte[] bytes = message.getMessageBody();
- DataInputStream dis = new DataInputStream( new ByteArrayInputStream(bytes) );
-
- try
- {
- GossipDigestAckMessage gDigestAckMessage = GossipDigestAckMessage.serializer().deserialize(dis);
- List<GossipDigest> gDigestList = gDigestAckMessage.getGossipDigestList();
- Map<EndPoint, EndPointState> epStateMap = gDigestAckMessage.getEndPointStateMap();
-
- if ( epStateMap.size() > 0 )
- {
- /* Notify the Failure Detector */
- Gossiper.instance().notifyFailureDetector(epStateMap);
- Gossiper.instance().applyStateLocally(epStateMap);
- }
-
- /* Get the state required to send to this gossipee - construct GossipDigestAck2Message */
- Map<EndPoint, EndPointState> deltaEpStateMap = new HashMap<EndPoint, EndPointState>();
- for( GossipDigest gDigest : gDigestList )
- {
- EndPoint addr = gDigest.getEndPoint();
- EndPointState localEpStatePtr = Gossiper.instance().getStateForVersionBiggerThan(addr, gDigest.getMaxVersion());
- if ( localEpStatePtr != null )
- deltaEpStateMap.put(addr, localEpStatePtr);
- }
-
- GossipDigestAck2Message gDigestAck2 = new GossipDigestAck2Message(deltaEpStateMap);
- Message gDigestAck2Message = Gossiper.instance().makeGossipDigestAck2Message(gDigestAck2);
- if (logger_.isTraceEnabled())
- logger_.trace("Sending a GossipDigestAck2Message to " + from);
- MessagingService.getMessagingInstance().sendUdpOneWay(gDigestAck2Message, from);
- }
- catch ( IOException e )
- {
- throw new RuntimeException(e);
- }
- }
-}
-
-class GossipDigestAck2VerbHandler implements IVerbHandler
-{
- private static Logger logger_ = Logger.getLogger(GossipDigestAck2VerbHandler.class);
-
- public void doVerb(Message message)
- {
- EndPoint from = message.getFrom();
- if (logger_.isTraceEnabled())
- logger_.trace("Received a GossipDigestAck2Message from " + from);
-
- byte[] bytes = message.getMessageBody();
- DataInputStream dis = new DataInputStream( new ByteArrayInputStream(bytes) );
- GossipDigestAck2Message gDigestAck2Message = null;
- try
- {
- gDigestAck2Message = GossipDigestAck2Message.serializer().deserialize(dis);
- }
- catch (IOException e)
- {
- throw new RuntimeException(e);
- }
- Map<EndPoint, EndPointState> remoteEpStateMap = gDigestAck2Message.getEndPointStateMap();
- /* Notify the Failure Detector */
- Gossiper.instance().notifyFailureDetector(remoteEpStateMap);
- Gossiper.instance().applyStateLocally(remoteEpStateMap);
- }
-}
-
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.gms;
+
+import java.io.*;
+import java.util.*;
+import java.net.InetAddress;
+
+import org.apache.cassandra.concurrent.SingleThreadedStage;
+import org.apache.cassandra.concurrent.StageManager;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.net.EndPoint;
+import org.apache.cassandra.net.IVerbHandler;
+import org.apache.cassandra.net.Message;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.utils.LogUtil;
+import org.apache.log4j.Logger;
+
+/**
+ * This module is responsible for Gossiping information for the local endpoint. This abstraction
+ * maintains the list of live and dead endpoints. Periodically i.e. every 1 second this module
+ * chooses a random node and initiates a round of Gossip with it. A round of Gossip involves 3
+ * rounds of messaging. For instance if node A wants to initiate a round of Gossip with node B
+ * it starts off by sending node B a GossipDigestSynMessage. Node B on receipt of this message
+ * sends node A a GossipDigestAckMessage. On receipt of this message node A sends node B a
+ * GossipDigestAck2Message which completes a round of Gossip. This module as and when it hears one
+ * of the three above mentioned messages updates the Failure Detector with the liveness information.
+ *
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+
+public class Gossiper implements IFailureDetectionEventListener, IEndPointStateChangePublisher
+{
+ private class GossipTimerTask extends TimerTask
+ {
+ public void run()
+ {
+ try
+ {
+ synchronized( Gossiper.instance() )
+ {
+ /* Update the local heartbeat counter. */
+ endPointStateMap_.get(localEndPoint_).getHeartBeatState().updateHeartBeat();
+ List<GossipDigest> gDigests = new ArrayList<GossipDigest>();
+ Gossiper.instance().makeRandomGossipDigest(gDigests);
+
+ if ( gDigests.size() > 0 )
+ {
+ Message message = makeGossipDigestSynMessage(gDigests);
+ /* Gossip to some random live member */
+ boolean bVal = doGossipToLiveMember(message);
+
+ /* Gossip to some unreachable member with some probability to check if he is back up */
+ doGossipToUnreachableMember(message);
+
+ /* Gossip to the seed. */
+ if ( !bVal )
+ doGossipToSeed(message);
+
+ if (logger_.isTraceEnabled())
+ logger_.trace("Performing status check ...");
+ doStatusCheck();
+ }
+ }
+ }
+ catch ( Throwable th )
+ {
+ logger_.info( LogUtil.throwableToString(th) );
+ }
+ }
+ }
+
+ final static int MAX_GOSSIP_PACKET_SIZE = 1428;
+ /* GS - abbreviation for GOSSIPER_STAGE */
+ final static String GOSSIP_STAGE = "GS";
+ /* GSV - abbreviation for GOSSIP-DIGEST-SYN-VERB */
+ final static String JOIN_VERB_HANDLER = "JVH";
+ /* GSV - abbreviation for GOSSIP-DIGEST-SYN-VERB */
+ final static String GOSSIP_DIGEST_SYN_VERB = "GSV";
+ /* GAV - abbreviation for GOSSIP-DIGEST-ACK-VERB */
+ final static String GOSSIP_DIGEST_ACK_VERB = "GAV";
+ /* GA2V - abbreviation for GOSSIP-DIGEST-ACK2-VERB */
+ final static String GOSSIP_DIGEST_ACK2_VERB = "GA2V";
+ final static int intervalInMillis_ = 1000;
+ private static Logger logger_ = Logger.getLogger(Gossiper.class);
+ static Gossiper gossiper_;
+
+ public synchronized static Gossiper instance()
+ {
+ if ( gossiper_ == null )
+ {
+ gossiper_ = new Gossiper();
+ }
+ return gossiper_;
+ }
+
+ private Timer gossipTimer_ = new Timer(false);
+ private EndPoint localEndPoint_;
+ private long aVeryLongTime_;
+ private Random random_ = new Random();
+ /* round robin index through live endpoint set */
+ private int rrIndex_ = 0;
+
+ /* subscribers for interest in EndPointState change */
+ private List<IEndPointStateChangeSubscriber> subscribers_ = new ArrayList<IEndPointStateChangeSubscriber>();
+
+ /* live member set */
+ private Set<EndPoint> liveEndpoints_ = new HashSet<EndPoint>();
+
+ /* unreachable member set */
+ private Set<EndPoint> unreachableEndpoints_ = new HashSet<EndPoint>();
+
+ /* initial seeds for joining the cluster */
+ private Set<EndPoint> seeds_ = new HashSet<EndPoint>();
+
+ /* map where key is the endpoint and value is the state associated with the endpoint */
+ Map<EndPoint, EndPointState> endPointStateMap_ = new Hashtable<EndPoint, EndPointState>();
+
+ /* private CTOR */
+ Gossiper()
+ {
+ aVeryLongTime_ = 259200 * 1000;
+ /* register with the Failure Detector for receiving Failure detector events */
+ FailureDetector.instance().registerFailureDetectionEventListener(this);
+ /* register the verbs */
+ MessagingService.getMessagingInstance().registerVerbHandlers(JOIN_VERB_HANDLER, new JoinVerbHandler());
+ MessagingService.getMessagingInstance().registerVerbHandlers(GOSSIP_DIGEST_SYN_VERB, new GossipDigestSynVerbHandler());
+ MessagingService.getMessagingInstance().registerVerbHandlers(GOSSIP_DIGEST_ACK_VERB, new GossipDigestAckVerbHandler());
+ MessagingService.getMessagingInstance().registerVerbHandlers(GOSSIP_DIGEST_ACK2_VERB, new GossipDigestAck2VerbHandler());
+ /* register the Gossip stage */
+ StageManager.registerStage( Gossiper.GOSSIP_STAGE, new SingleThreadedStage("GMFD") );
+ }
+
+ public void register(IEndPointStateChangeSubscriber subscriber)
+ {
+ subscribers_.add(subscriber);
+ }
+
+ public void unregister(IEndPointStateChangeSubscriber subscriber)
+ {
+ subscribers_.remove(subscriber);
+ }
+
+ public Set<EndPoint> getAllMembers()
+ {
+ Set<EndPoint> allMbrs = new HashSet<EndPoint>();
+ allMbrs.addAll(getLiveMembers());
+ allMbrs.addAll(getUnreachableMembers());
+ return allMbrs;
+ }
+
+ public Set<EndPoint> getLiveMembers()
+ {
+ Set<EndPoint> liveMbrs = new HashSet<EndPoint>(liveEndpoints_);
+ liveMbrs.add( new EndPoint( localEndPoint_.getHost(), localEndPoint_.getPort() ) );
+ return liveMbrs;
+ }
+
+ public Set<EndPoint> getUnreachableMembers()
+ {
+ return new HashSet<EndPoint>(unreachableEndpoints_);
+ }
+
+ /**
+ * This method is used to forcibly remove a node from the membership
+ * set. He is forgotten locally immediately.
+ *
+ * param@ ep the endpoint to be removed from membership.
+ */
+ public synchronized void removeFromMembership(EndPoint ep)
+ {
+ endPointStateMap_.remove(ep);
+ liveEndpoints_.remove(ep);
+ unreachableEndpoints_ .remove(ep);
+ }
+
+ /**
+ * This method is part of IFailureDetectionEventListener interface. This is invoked
+ * by the Failure Detector when it convicts an end point.
+ *
+ * param @ endpoint end point that is convicted.
+ */
+
+ public void convict(EndPoint endpoint)
+ {
+ EndPointState epState = endPointStateMap_.get(endpoint);
+ if ( epState != null )
+ {
+ if ( !epState.isAlive() && epState.isAGossiper() )
+ {
+ /*
+ * just to be sure - is invoked just to make sure that
+ * it was called at least once.
+ */
+ if ( liveEndpoints_.contains(endpoint) )
+ {
+ logger_.info("EndPoint " + endpoint + " is now dead.");
+ isAlive(endpoint, epState, false);
+
+ /* Notify an endpoint is dead to interested parties. */
+ EndPointState deltaState = new EndPointState(epState.getHeartBeatState());
+ doNotifications(endpoint, deltaState);
+ }
+ epState.isAGossiper(false);
+ }
+ }
+ }
+
+ /**
+ * This method is part of IFailureDetectionEventListener interface. This is invoked
+ * by the Failure Detector when it suspects an end point.
+ *
+ * param @ endpoint end point that is suspected.
+ */
+ public void suspect(EndPoint endpoint)
+ {
+ EndPointState epState = endPointStateMap_.get(endpoint);
+ if ( epState.isAlive() )
+ {
+ logger_.info("EndPoint " + endpoint + " is now dead.");
+ isAlive(endpoint, epState, false);
+
+ /* Notify an endpoint is dead to interested parties. */
+ EndPointState deltaState = new EndPointState(epState.getHeartBeatState());
+ doNotifications(endpoint, deltaState);
+ }
+ }
+
+ int getMaxEndPointStateVersion(EndPointState epState)
+ {
+ List<Integer> versions = new ArrayList<Integer>();
+ versions.add( epState.getHeartBeatState().getHeartBeatVersion() );
+ Map<String, ApplicationState> appStateMap = epState.getApplicationState();
+
+ Set<String> keys = appStateMap.keySet();
+ for ( String key : keys )
+ {
+ int stateVersion = appStateMap.get(key).getStateVersion();
+ versions.add( stateVersion );
+ }
+
+ /* sort to get the max version to build GossipDigest for this endpoint */
+ Collections.sort(versions);
+ int maxVersion = versions.get(versions.size() - 1);
+ versions.clear();
+ return maxVersion;
+ }
+
+ /**
+ * Removes the endpoint from unreachable endpoint set
+ *
+ * @param endpoint endpoint to be removed from the current membership.
+ */
+ void evictFromMembership(EndPoint endpoint)
+ {
+ unreachableEndpoints_.remove(endpoint);
+ }
+
+ /* No locking required since it is called from a method that already has acquired a lock */
+ @Deprecated
+ void makeGossipDigest(List<GossipDigest> gDigests)
+ {
+ /* Add the local endpoint state */
+ EndPointState epState = endPointStateMap_.get(localEndPoint_);
+ int generation = epState.getHeartBeatState().getGeneration();
+ int maxVersion = getMaxEndPointStateVersion(epState);
+ gDigests.add( new GossipDigest(localEndPoint_, generation, maxVersion) );
+
+ for ( EndPoint liveEndPoint : liveEndpoints_ )
+ {
+ epState = endPointStateMap_.get(liveEndPoint);
+ if ( epState != null )
+ {
+ generation = epState.getHeartBeatState().getGeneration();
+ maxVersion = getMaxEndPointStateVersion(epState);
+ gDigests.add( new GossipDigest(liveEndPoint, generation, maxVersion) );
+ }
+ else
+ {
+ gDigests.add( new GossipDigest(liveEndPoint, 0, 0) );
+ }
+ }
+ }
+
+ /**
+ * No locking required since it is called from a method that already
+ * has acquired a lock. The gossip digest is built based on randomization
+ * rather than just looping through the collection of live endpoints.
+ *
+ * @param gDigests list of Gossip Digests.
+ */
+ void makeRandomGossipDigest(List<GossipDigest> gDigests)
+ {
+ /* Add the local endpoint state */
+ EndPointState epState = endPointStateMap_.get(localEndPoint_);
+ int generation = epState.getHeartBeatState().getGeneration();
+ int maxVersion = getMaxEndPointStateVersion(epState);
+ gDigests.add( new GossipDigest(localEndPoint_, generation, maxVersion) );
+
+ List<EndPoint> endpoints = new ArrayList<EndPoint>( liveEndpoints_ );
+ Collections.shuffle(endpoints, random_);
+ for ( EndPoint liveEndPoint : endpoints )
+ {
+ epState = endPointStateMap_.get(liveEndPoint);
+ if ( epState != null )
+ {
+ generation = epState.getHeartBeatState().getGeneration();
+ maxVersion = getMaxEndPointStateVersion(epState);
+ gDigests.add( new GossipDigest(liveEndPoint, generation, maxVersion) );
+ }
+ else
+ {
+ gDigests.add( new GossipDigest(liveEndPoint, 0, 0) );
+ }
+ }
+
+ /* FOR DEBUG ONLY - remove later */
+ StringBuilder sb = new StringBuilder();
+ for ( GossipDigest gDigest : gDigests )
+ {
+ sb.append(gDigest);
+ sb.append(" ");
+ }
+ if (logger_.isTraceEnabled())
+ logger_.trace("Gossip Digests are : " + sb.toString());
+ }
+
+ public int getCurrentGenerationNumber(EndPoint endpoint)
+ {
+ return endPointStateMap_.get(endpoint).getHeartBeatState().getGeneration();
+ }
+
+ Message makeGossipDigestSynMessage(List<GossipDigest> gDigests) throws IOException
+ {
+ GossipDigestSynMessage gDigestMessage = new GossipDigestSynMessage(DatabaseDescriptor.getClusterName(), gDigests);
+ ByteArrayOutputStream bos = new ByteArrayOutputStream(Gossiper.MAX_GOSSIP_PACKET_SIZE);
+ DataOutputStream dos = new DataOutputStream( bos );
+ GossipDigestSynMessage.serializer().serialize(gDigestMessage, dos);
+ Message message = new Message(localEndPoint_, Gossiper.GOSSIP_STAGE, GOSSIP_DIGEST_SYN_VERB, bos.toByteArray());
+ return message;
+ }
+
+ Message makeGossipDigestAckMessage(GossipDigestAckMessage gDigestAckMessage) throws IOException
+ {
+ ByteArrayOutputStream bos = new ByteArrayOutputStream(Gossiper.MAX_GOSSIP_PACKET_SIZE);
+ DataOutputStream dos = new DataOutputStream(bos);
+ GossipDigestAckMessage.serializer().serialize(gDigestAckMessage, dos);
+ if (logger_.isTraceEnabled())
+ logger_.trace("@@@@ Size of GossipDigestAckMessage is " + bos.toByteArray().length);
+ Message message = new Message(localEndPoint_, Gossiper.GOSSIP_STAGE, GOSSIP_DIGEST_ACK_VERB, bos.toByteArray());
+ return message;
+ }
+
+ Message makeGossipDigestAck2Message(GossipDigestAck2Message gDigestAck2Message) throws IOException
+ {
+ ByteArrayOutputStream bos = new ByteArrayOutputStream(Gossiper.MAX_GOSSIP_PACKET_SIZE);
+ DataOutputStream dos = new DataOutputStream(bos);
+ GossipDigestAck2Message.serializer().serialize(gDigestAck2Message, dos);
+ Message message = new Message(localEndPoint_, Gossiper.GOSSIP_STAGE, GOSSIP_DIGEST_ACK2_VERB, bos.toByteArray());
+ return message;
+ }
+
+ boolean sendGossipToLiveNode(Message message)
+ {
+ int size = liveEndpoints_.size();
+ List<EndPoint> eps = new ArrayList<EndPoint>(liveEndpoints_);
+
+ if ( rrIndex_ >= size )
+ {
+ rrIndex_ = -1;
+ }
+
+ EndPoint to = eps.get(++rrIndex_);
+ if (logger_.isTraceEnabled())
+ logger_.trace("Sending a GossipDigestSynMessage to " + to + " ...");
+ MessagingService.getMessagingInstance().sendUdpOneWay(message, to);
+ return seeds_.contains(to);
+ }
+
+ /**
+ * Returns true if the chosen target was also a seed. False otherwise
+ *
+ * @param message message to sent
+ * @param epSet a set of endpoint from which a random endpoint is chosen.
+ * @return true if the chosen endpoint is also a seed.
+ */
+ boolean sendGossip(Message message, Set<EndPoint> epSet)
+ {
+ int size = epSet.size();
+ /* Generate a random number from 0 -> size */
+ List<EndPoint> liveEndPoints = new ArrayList<EndPoint>(epSet);
+ int index = (size == 1) ? 0 : random_.nextInt(size);
+ EndPoint to = liveEndPoints.get(index);
+ if (logger_.isTraceEnabled())
+ logger_.trace("Sending a GossipDigestSynMessage to " + to + " ...");
+ MessagingService.getMessagingInstance().sendUdpOneWay(message, to);
+ return seeds_.contains(to);
+ }
+
+ /* Sends a Gossip message to a live member and returns a reference to the member */
+ boolean doGossipToLiveMember(Message message)
+ {
+ int size = liveEndpoints_.size();
+ if ( size == 0 )
+ return false;
+ // return sendGossipToLiveNode(message);
+ /* Use this for a cluster size >= 30 */
+ return sendGossip(message, liveEndpoints_);
+ }
+
+ /* Sends a Gossip message to an unreachable member */
+ void doGossipToUnreachableMember(Message message)
+ {
+ double liveEndPoints = liveEndpoints_.size();
+ double unreachableEndPoints = unreachableEndpoints_.size();
+ if ( unreachableEndPoints > 0 )
+ {
+ /* based on some probability */
+ double prob = unreachableEndPoints / (liveEndPoints + 1);
+ double randDbl = random_.nextDouble();
+ if ( randDbl < prob )
+ sendGossip(message, unreachableEndpoints_);
+ }
+ }
+
+ /* Gossip to a seed for facilitating partition healing */
+ void doGossipToSeed(Message message)
+ {
+ int size = seeds_.size();
+ if ( size > 0 )
+ {
+ if ( size == 1 && seeds_.contains(localEndPoint_) )
+ {
+ return;
+ }
+
+ if ( liveEndpoints_.size() == 0 )
+ {
+ sendGossip(message, seeds_);
+ }
+ else
+ {
+ /* Gossip with the seed with some probability. */
+ double probability = seeds_.size() / ( liveEndpoints_.size() + unreachableEndpoints_.size() );
+ double randDbl = random_.nextDouble();
+ if ( randDbl <= probability )
+ sendGossip(message, seeds_);
+ }
+ }
+ }
+
+ void doStatusCheck()
+ {
+ Set<EndPoint> eps = endPointStateMap_.keySet();
+
+ for ( EndPoint endpoint : eps )
+ {
+ if ( endpoint.equals(localEndPoint_) )
+ continue;
+
+ FailureDetector.instance().interpret(endpoint);
+ EndPointState epState = endPointStateMap_.get(endpoint);
+ if ( epState != null )
+ {
+ long duration = System.currentTimeMillis() - epState.getUpdateTimestamp();
+ if ( !epState.isAlive() && (duration > aVeryLongTime_) )
+ {
+ evictFromMembership(endpoint);
+ }
+ }
+ }
+ }
+
+ EndPointState getEndPointStateForEndPoint(EndPoint ep)
+ {
+ return endPointStateMap_.get(ep);
+ }
+
+ synchronized EndPointState getStateForVersionBiggerThan(EndPoint forEndpoint, int version)
+ {
+ EndPointState epState = endPointStateMap_.get(forEndpoint);
+ EndPointState reqdEndPointState = null;
+
+ if ( epState != null )
+ {
+ /*
+ * Here we try to include the Heart Beat state only if it is
+ * greater than the version passed in. It might happen that
+ * the heart beat version maybe lesser than the version passed
+ * in and some application state has a version that is greater
+ * than the version passed in. In this case we also send the old
+ * heart beat and throw it away on the receiver if it is redundant.
+ */
+ int localHbVersion = epState.getHeartBeatState().getHeartBeatVersion();
+ if ( localHbVersion > version )
+ {
+ reqdEndPointState = new EndPointState(epState.getHeartBeatState());
+ }
+ Map<String, ApplicationState> appStateMap = epState.getApplicationState();
+ /* Accumulate all application states whose versions are greater than "version" variable */
+ Set<String> keys = appStateMap.keySet();
+ for ( String key : keys )
+ {
+ ApplicationState appState = appStateMap.get(key);
+ if ( appState.getStateVersion() > version )
+ {
+ if ( reqdEndPointState == null )
+ {
+ reqdEndPointState = new EndPointState(epState.getHeartBeatState());
+ }
+ reqdEndPointState.addApplicationState(key, appState);
+ }
+ }
+ }
+ return reqdEndPointState;
+ }
+
+ /*
+ * This method is called only from the JoinVerbHandler. This happens
+ * when a new node coming up multicasts the JoinMessage. Here we need
+ * to add the endPoint to the list of live endpoints.
+ */
+ synchronized void join(EndPoint from)
+ {
+ if ( !from.equals( localEndPoint_ ) )
+ {
+ /* Mark this endpoint as "live" */
+ liveEndpoints_.add(from);
+ unreachableEndpoints_.remove(from);
+ }
+ }
+
+ void notifyFailureDetector(List<GossipDigest> gDigests)
+ {
+ IFailureDetector fd = FailureDetector.instance();
+ for ( GossipDigest gDigest : gDigests )
+ {
+ EndPointState localEndPointState = endPointStateMap_.get(gDigest.endPoint_);
+ /*
+ * If the local endpoint state exists then report to the FD only
+ * if the versions workout.
+ */
+ if ( localEndPointState != null )
+ {
+ int localGeneration = endPointStateMap_.get(gDigest.endPoint_).getHeartBeatState().generation_;
+ int remoteGeneration = gDigest.generation_;
+ if ( remoteGeneration > localGeneration )
+ {
+ fd.report(gDigest.endPoint_);
+ continue;
+ }
+
+ if ( remoteGeneration == localGeneration )
+ {
+ int localVersion = getMaxEndPointStateVersion(localEndPointState);
+ //int localVersion = endPointStateMap_.get(gDigest.endPoint_).getHeartBeatState().getHeartBeatVersion();
+ int remoteVersion = gDigest.maxVersion_;
+ if ( remoteVersion > localVersion )
+ {
+ fd.report(gDigest.endPoint_);
+ }
+ }
+ }
+ }
+ }
+
+ void notifyFailureDetector(Map<EndPoint, EndPointState> remoteEpStateMap)
+ {
+ IFailureDetector fd = FailureDetector.instance();
+ Set<EndPoint> endpoints = remoteEpStateMap.keySet();
+ for ( EndPoint endpoint : endpoints )
+ {
+ EndPointState remoteEndPointState = remoteEpStateMap.get(endpoint);
+ EndPointState localEndPointState = endPointStateMap_.get(endpoint);
+ /*
+ * If the local endpoint state exists then report to the FD only
+ * if the versions workout.
+ */
+ if ( localEndPointState != null )
+ {
+ int localGeneration = localEndPointState.getHeartBeatState().generation_;
+ int remoteGeneration = remoteEndPointState.getHeartBeatState().generation_;
+ if ( remoteGeneration > localGeneration )
+ {
+ fd.report(endpoint);
+ continue;
+ }
+
+ if ( remoteGeneration == localGeneration )
+ {
+ int localVersion = getMaxEndPointStateVersion(localEndPointState);
+ //int localVersion = localEndPointState.getHeartBeatState().getHeartBeatVersion();
+ int remoteVersion = remoteEndPointState.getHeartBeatState().getHeartBeatVersion();
+ if ( remoteVersion > localVersion )
+ {
+ fd.report(endpoint);
+ }
+ }
+ }
+ }
+ }
+
+ void markAlive(EndPoint addr, EndPointState localState)
+ {
+ if (logger_.isTraceEnabled())
+ logger_.trace("marking as alive " + addr);
+ if ( !localState.isAlive() )
+ {
+ isAlive(addr, localState, true);
+ logger_.info("EndPoint " + addr + " is now UP");
+ }
+ }
+
+ private void handleNewJoin(EndPoint ep, EndPointState epState)
+ {
+ logger_.info("Node " + ep + " has now joined.");
+ /* Mark this endpoint as "live" */
+ endPointStateMap_.put(ep, epState);
+ isAlive(ep, epState, true);
+ /* Notify interested parties about endpoint state change */
+ doNotifications(ep, epState);
+ }
+
+ synchronized void applyStateLocally(Map<EndPoint, EndPointState> epStateMap)
+ {
+ Set<EndPoint> eps = epStateMap.keySet();
+ for( EndPoint ep : eps )
+ {
+ if ( ep.equals( localEndPoint_ ) )
+ continue;
+
+ EndPointState localEpStatePtr = endPointStateMap_.get(ep);
+ EndPointState remoteState = epStateMap.get(ep);
+ /*
+ If state does not exist just add it. If it does then add it only if the version
+ of the remote copy is greater than the local copy.
+ */
+ if ( localEpStatePtr != null )
+ {
+ int localGeneration = localEpStatePtr.getHeartBeatState().getGeneration();
+ int remoteGeneration = remoteState.getHeartBeatState().getGeneration();
+
+ if (remoteGeneration > localGeneration)
+ {
+ handleNewJoin(ep, remoteState);
+ }
+ else if ( remoteGeneration == localGeneration )
+ {
+ /* manage the membership state */
+ int localMaxVersion = getMaxEndPointStateVersion(localEpStatePtr);
+ int remoteMaxVersion = getMaxEndPointStateVersion(remoteState);
+ if ( remoteMaxVersion > localMaxVersion )
+ {
+ markAlive(ep, localEpStatePtr);
+ applyHeartBeatStateLocally(ep, localEpStatePtr, remoteState);
+ /* apply ApplicationState */
+ applyApplicationStateLocally(ep, localEpStatePtr, remoteState);
+ }
+ }
+ }
+ else
+ {
+ handleNewJoin(ep, remoteState);
+ }
+ }
+ }
+
+ void applyHeartBeatStateLocally(EndPoint addr, EndPointState localState, EndPointState remoteState)
+ {
+ HeartBeatState localHbState = localState.getHeartBeatState();
+ HeartBeatState remoteHbState = remoteState.getHeartBeatState();
+
+ if ( remoteHbState.getGeneration() > localHbState.getGeneration() )
+ {
+ markAlive(addr, localState);
+ localState.setHeartBeatState(remoteHbState);
+ }
+ if ( localHbState.getGeneration() == remoteHbState.getGeneration() )
+ {
+ if ( remoteHbState.getHeartBeatVersion() > localHbState.getHeartBeatVersion() )
+ {
+ int oldVersion = localHbState.getHeartBeatVersion();
+ localState.setHeartBeatState(remoteHbState);
+ if (logger_.isTraceEnabled())
+ logger_.trace("Updating heartbeat state version to " + localState.getHeartBeatState().getHeartBeatVersion() + " from " + oldVersion + " for " + addr + " ...");
+ }
+ }
+ }
+
+ void applyApplicationStateLocally(EndPoint addr, EndPointState localStatePtr, EndPointState remoteStatePtr)
+ {
+ Map<String, ApplicationState> localAppStateMap = localStatePtr.getApplicationState();
+ Map<String, ApplicationState> remoteAppStateMap = remoteStatePtr.getApplicationState();
+
+ Set<String> remoteKeys = remoteAppStateMap.keySet();
+ for ( String remoteKey : remoteKeys )
+ {
+ ApplicationState remoteAppState = remoteAppStateMap.get(remoteKey);
+ ApplicationState localAppState = localAppStateMap.get(remoteKey);
+
+ /* If state doesn't exist locally for this key then just apply it */
+ if ( localAppState == null )
+ {
+ localStatePtr.addApplicationState(remoteKey, remoteAppState);
+ /* notify interested parties of endpoint state change */
+ EndPointState deltaState = new EndPointState(localStatePtr.getHeartBeatState());
+ deltaState.addApplicationState(remoteKey, remoteAppState);
+ doNotifications(addr, deltaState);
+ continue;
+ }
+
+ int remoteGeneration = remoteStatePtr.getHeartBeatState().getGeneration();
+ int localGeneration = localStatePtr.getHeartBeatState().getGeneration();
+
+ /* If the remoteGeneration is greater than localGeneration then apply state blindly */
+ if ( remoteGeneration > localGeneration )
+ {
+ localStatePtr.addApplicationState(remoteKey, remoteAppState);
+ /* notify interested parties of endpoint state change */
+ EndPointState deltaState = new EndPointState(localStatePtr.getHeartBeatState());
+ deltaState.addApplicationState(remoteKey, remoteAppState);
+ doNotifications(addr, deltaState);
+ continue;
+ }
+
+ /* If the generations are the same then apply state if the remote version is greater than local version. */
+ if ( remoteGeneration == localGeneration )
+ {
+ int remoteVersion = remoteAppState.getStateVersion();
+ int localVersion = localAppState.getStateVersion();
+
+ if ( remoteVersion > localVersion )
+ {
+ localStatePtr.addApplicationState(remoteKey, remoteAppState);
+ /* notify interested parties of endpoint state change */
+ EndPointState deltaState = new EndPointState(localStatePtr.getHeartBeatState());
+ deltaState.addApplicationState(remoteKey, remoteAppState);
+ doNotifications(addr, deltaState);
+ }
+ }
+ }
+ }
+
+ void doNotifications(EndPoint addr, EndPointState epState)
+ {
+ for ( IEndPointStateChangeSubscriber subscriber : subscribers_ )
+ {
+ subscriber.onChange(addr, epState);
+ }
+ }
+
+ synchronized void isAlive(EndPoint addr, EndPointState epState, boolean value)
+ {
+ epState.isAlive(value);
+ if ( value )
+ {
+ liveEndpoints_.add(addr);
+ unreachableEndpoints_.remove(addr);
+ }
+ else
+ {
+ liveEndpoints_.remove(addr);
+ unreachableEndpoints_.add(addr);
+ }
+ if ( epState.isAGossiper() )
+ return;
+ epState.isAGossiper(true);
+ }
+
+ /* These are helper methods used from GossipDigestSynVerbHandler */
+ Map<EndPoint, GossipDigest> getEndPointGossipDigestMap(List<GossipDigest> gDigestList)
+ {
+ Map<EndPoint, GossipDigest> epMap = new HashMap<EndPoint, GossipDigest>();
+ for( GossipDigest gDigest : gDigestList )
+ {
+ epMap.put( gDigest.getEndPoint(), gDigest );
+ }
+ return epMap;
+ }
+
+ /* This is a helper method to get all EndPoints from a list of GossipDigests */
+ EndPoint[] getEndPointsFromGossipDigest(List<GossipDigest> gDigestList)
+ {
+ Set<EndPoint> set = new HashSet<EndPoint>();
+ for ( GossipDigest gDigest : gDigestList )
+ {
+ set.add( gDigest.getEndPoint() );
+ }
+ return set.toArray( new EndPoint[0] );
+ }
+
+ /* Request all the state for the endpoint in the gDigest */
+ void requestAll(GossipDigest gDigest, List<GossipDigest> deltaGossipDigestList, int remoteGeneration)
+ {
+ /* We are here since we have no data for this endpoint locally so request everthing. */
+ deltaGossipDigestList.add( new GossipDigest(gDigest.getEndPoint(), remoteGeneration, 0) );
+ }
+
+ /* Send all the data with version greater than maxRemoteVersion */
+ void sendAll(GossipDigest gDigest, Map<EndPoint, EndPointState> deltaEpStateMap, int maxRemoteVersion)
+ {
+ EndPointState localEpStatePtr = getStateForVersionBiggerThan(gDigest.getEndPoint(), maxRemoteVersion) ;
+ if ( localEpStatePtr != null )
+ deltaEpStateMap.put(gDigest.getEndPoint(), localEpStatePtr);
+ }
+
+ /*
+ This method is used to figure the state that the Gossiper has but Gossipee doesn't. The delta digests
+ and the delta state are built up.
+ */
+ synchronized void examineGossiper(List<GossipDigest> gDigestList, List<GossipDigest> deltaGossipDigestList, Map<EndPoint, EndPointState> deltaEpStateMap)
+ {
+ for ( GossipDigest gDigest : gDigestList )
+ {
+ int remoteGeneration = gDigest.getGeneration();
+ int maxRemoteVersion = gDigest.getMaxVersion();
+ /* Get state associated with the end point in digest */
+ EndPointState epStatePtr = endPointStateMap_.get(gDigest.getEndPoint());
+ /*
+ Here we need to fire a GossipDigestAckMessage. If we have some data associated with this endpoint locally
+ then we follow the "if" path of the logic. If we have absolutely nothing for this endpoint we need to
+ request all the data for this endpoint.
+ */
+ if ( epStatePtr != null )
+ {
+ int localGeneration = epStatePtr.getHeartBeatState().getGeneration();
+ /* get the max version of all keys in the state associated with this endpoint */
+ int maxLocalVersion = getMaxEndPointStateVersion(epStatePtr);
+ if ( remoteGeneration == localGeneration && maxRemoteVersion == maxLocalVersion )
+ continue;
+
+ if ( remoteGeneration > localGeneration )
+ {
+ /* we request everything from the gossiper */
+ requestAll(gDigest, deltaGossipDigestList, remoteGeneration);
+ }
+ if ( remoteGeneration < localGeneration )
+ {
+ /* send all data with generation = localgeneration and version > 0 */
+ sendAll(gDigest, deltaEpStateMap, 0);
+ }
+ if ( remoteGeneration == localGeneration )
+ {
+ /*
+ If the max remote version is greater then we request the remote endpoint send us all the data
+ for this endpoint with version greater than the max version number we have locally for this
+ endpoint.
+ If the max remote version is lesser, then we send all the data we have locally for this endpoint
+ with version greater than the max remote version.
+ */
+ if ( maxRemoteVersion > maxLocalVersion )
+ {
+ deltaGossipDigestList.add( new GossipDigest(gDigest.getEndPoint(), remoteGeneration, maxLocalVersion) );
+ }
+ if ( maxRemoteVersion < maxLocalVersion )
+ {
+ /* send all data with generation = localgeneration and version > maxRemoteVersion */
+ sendAll(gDigest, deltaEpStateMap, maxRemoteVersion);
+ }
+ }
+ }
+ else
+ {
+ /* We are here since we have no data for this endpoint locally so request everything. */
+ requestAll(gDigest, deltaGossipDigestList, remoteGeneration);
+ }
+ }
+ }
+
+ public void start(EndPoint localEndPoint, int generationNbr) throws IOException
+ {
+ localEndPoint_ = localEndPoint;
+ /* Get the seeds from the config and initialize them. */
+ Set<String> seedHosts = DatabaseDescriptor.getSeeds();
+ for( String seedHost : seedHosts )
+ {
+ EndPoint seed = new EndPoint(InetAddress.getByName(seedHost).getHostAddress(),
+ DatabaseDescriptor.getControlPort());
+ if ( seed.equals(localEndPoint) )
+ continue;
+ seeds_.add(seed);
+ }
+
+ /* initialize the heartbeat state for this localEndPoint */
+ EndPointState localState = endPointStateMap_.get(localEndPoint_);
+ if ( localState == null )
+ {
+ HeartBeatState hbState = new HeartBeatState(generationNbr, 0);
+ localState = new EndPointState(hbState);
+ localState.isAlive(true);
+ localState.isAGossiper(true);
+ endPointStateMap_.put(localEndPoint_, localState);
+ }
+
+ /* starts a timer thread */
+ gossipTimer_.schedule( new GossipTimerTask(), Gossiper.intervalInMillis_, Gossiper.intervalInMillis_);
+ }
+
+ public synchronized void addApplicationState(String key, ApplicationState appState)
+ {
+ EndPointState epState = endPointStateMap_.get(localEndPoint_);
+ if ( epState != null )
+ {
+ epState.addApplicationState(key, appState);
+ }
+ }
+
+ public void stop()
+ {
+ gossipTimer_.cancel();
+ }
+}
+
+class JoinVerbHandler implements IVerbHandler
+{
+ private static Logger logger_ = Logger.getLogger( JoinVerbHandler.class);
+
+ public void doVerb(Message message)
+ {
+ EndPoint from = message.getFrom();
+ if (logger_.isDebugEnabled())
+ logger_.debug("Received a JoinMessage from " + from);
+
+ byte[] bytes = message.getMessageBody();
+ DataInputStream dis = new DataInputStream( new ByteArrayInputStream(bytes) );
+
+ JoinMessage joinMessage = null;
+ try
+ {
+ joinMessage = JoinMessage.serializer().deserialize(dis);
+ }
+ catch (IOException e)
+ {
+ throw new RuntimeException(e);
+ }
+ if ( joinMessage.clusterId_.equals( DatabaseDescriptor.getClusterName() ) )
+ {
+ Gossiper.instance().join(from);
+ }
+ }
+}
+
+class GossipDigestSynVerbHandler implements IVerbHandler
+{
+ private static Logger logger_ = Logger.getLogger( GossipDigestSynVerbHandler.class);
+
+ public void doVerb(Message message)
+ {
+ EndPoint from = message.getFrom();
+ if (logger_.isTraceEnabled())
+ logger_.trace("Received a GossipDigestSynMessage from " + from);
+
+ byte[] bytes = message.getMessageBody();
+ DataInputStream dis = new DataInputStream( new ByteArrayInputStream(bytes) );
+
+ try
+ {
+ GossipDigestSynMessage gDigestMessage = GossipDigestSynMessage.serializer().deserialize(dis);
+ /* If the message is from a different cluster throw it away. */
+ if ( !gDigestMessage.clusterId_.equals(DatabaseDescriptor.getClusterName()) )
+ return;
+
+ List<GossipDigest> gDigestList = gDigestMessage.getGossipDigests();
+ /* Notify the Failure Detector */
+ Gossiper.instance().notifyFailureDetector(gDigestList);
+
+ doSort(gDigestList);
+
+ List<GossipDigest> deltaGossipDigestList = new ArrayList<GossipDigest>();
+ Map<EndPoint, EndPointState> deltaEpStateMap = new HashMap<EndPoint, EndPointState>();
+ Gossiper.instance().examineGossiper(gDigestList, deltaGossipDigestList, deltaEpStateMap);
+
+ GossipDigestAckMessage gDigestAck = new GossipDigestAckMessage(deltaGossipDigestList, deltaEpStateMap);
+ Message gDigestAckMessage = Gossiper.instance().makeGossipDigestAckMessage(gDigestAck);
+ if (logger_.isTraceEnabled())
+ logger_.trace("Sending a GossipDigestAckMessage to " + from);
+ MessagingService.getMessagingInstance().sendUdpOneWay(gDigestAckMessage, from);
+ }
+ catch (IOException e)
+ {
+ throw new RuntimeException(e);
+ }
+ }
+
+ /*
+ * First construct a map whose key is the endpoint in the GossipDigest and the value is the
+ * GossipDigest itself. Then build a list of version differences i.e difference between the
[... 119 lines stripped ...]