You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by pm...@apache.org on 2009/03/02 07:12:49 UTC
svn commit: r749205 [12/16] - in
/incubator/cassandra/src/org/apache/cassandra: analytics/ cli/ concurrent/
config/ continuations/ cql/ cql/common/ cql/compiler/ cql/compiler/common/
cql/compiler/parse/ cql/compiler/sem/ cql/driver/ cql/execution/ dht/...
Added: incubator/cassandra/src/org/apache/cassandra/gms/Gossiper.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/src/org/apache/cassandra/gms/Gossiper.java?rev=749205&view=auto
==============================================================================
--- incubator/cassandra/src/org/apache/cassandra/gms/Gossiper.java (added)
+++ incubator/cassandra/src/org/apache/cassandra/gms/Gossiper.java Mon Mar 2 06:12:46 2009
@@ -0,0 +1,1129 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.gms;
+
+import java.io.*;
+import java.util.*;
+
+import org.apache.cassandra.concurrent.SingleThreadedStage;
+import org.apache.cassandra.concurrent.StageManager;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.net.EndPoint;
+import org.apache.cassandra.net.IVerbHandler;
+import org.apache.cassandra.net.Message;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.service.IComponentShutdown;
+import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.utils.LogUtil;
+import org.apache.log4j.Logger;
+import org.apache.cassandra.utils.*;
+import org.apache.cassandra.net.*;
+
+/**
+ * This module is responsible for Gossiping information for the local endpoint. This abstraction
+ * maintains the list of live and dead endpoints. Periodically i.e. every 1 second this module
+ * chooses a random node and initiates a round of Gossip with it. A round of Gossip involves 3
+ * rounds of messaging. For instance if node A wants to initiate a round of Gossip with node B
+ * it starts off by sending node B a GossipDigestSynMessage. Node B on receipt of this message
+ * sends node A a GossipDigestAckMessage. On receipt of this message node A sends node B a
+ * GossipDigestAck2Message which completes a round of Gossip. This module as and when it hears one
+ * of the three above mentioned messages updates the Failure Detector with the liveness information.
+ *
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+
+public class Gossiper implements IFailureDetectionEventListener, IEndPointStateChangePublisher, IComponentShutdown
+{
+ private class GossipTimerTask extends TimerTask
+ {
+ public void run()
+ {
+ try
+ {
+ synchronized( Gossiper.instance() )
+ {
+ /* Update the local heartbeat counter. */
+ endPointStateMap_.get(localEndPoint_).getHeartBeatState().updateHeartBeat();
+ List<GossipDigest> gDigests = new ArrayList<GossipDigest>();
+ Gossiper.instance().makeRandomGossipDigest(gDigests);
+
+ if ( gDigests.size() > 0 )
+ {
+ Message message = makeGossipDigestSynMessage(gDigests);
+ /* Gossip to some random live member */
+ boolean bVal = doGossipToLiveMember(message);
+
+ /* Gossip to some unreachable member with some probability to check if he is back up */
+ doGossipToUnreachableMember(message);
+
+ /* Gossip to the seed. */
+ if ( !bVal )
+ doGossipToSeed(message);
+
+ logger_.debug("Performing status check ...");
+ doStatusCheck();
+ }
+ }
+ }
+ catch ( Throwable th )
+ {
+ logger_.info( LogUtil.throwableToString(th) );
+ }
+ }
+ }
+
+ final static int MAX_GOSSIP_PACKET_SIZE = 1428;
+ /* GS - abbreviation for GOSSIPER_STAGE */
+ final static String GOSSIP_STAGE = "GS";
+ /* GSV - abbreviation for GOSSIP-DIGEST-SYN-VERB */
+ final static String JOIN_VERB_HANDLER = "JVH";
+ /* GSV - abbreviation for GOSSIP-DIGEST-SYN-VERB */
+ final static String GOSSIP_DIGEST_SYN_VERB = "GSV";
+ /* GAV - abbreviation for GOSSIP-DIGEST-ACK-VERB */
+ final static String GOSSIP_DIGEST_ACK_VERB = "GAV";
+ /* GA2V - abbreviation for GOSSIP-DIGEST-ACK2-VERB */
+ final static String GOSSIP_DIGEST_ACK2_VERB = "GA2V";
+ private final static int intervalInMillis_ = 1000;
+ private static Logger logger_ = Logger.getLogger(Gossiper.class);
+ static Gossiper gossiper_;
+
+ public synchronized static Gossiper instance()
+ {
+ if ( gossiper_ == null )
+ {
+ gossiper_ = new Gossiper();
+ }
+ return gossiper_;
+ }
+
+ private Timer gossipTimer_ = new Timer(false);
+ private EndPoint localEndPoint_;
+ private long aVeryLongTime_;
+ private Random random_ = new Random();
+ /* index used previously */
+ private int prevIndex_ = 0;
+ /* round robin index through live endpoint set */
+ private int rrIndex_ = 0;
+
+ /* subscribers for interest in EndPointState change */
+ private List<IEndPointStateChangeSubscriber> subscribers_ = new ArrayList<IEndPointStateChangeSubscriber>();
+
+ /* live member set */
+ private Set<EndPoint> liveEndpoints_ = new HashSet<EndPoint>();
+
+ /* unreachable member set */
+ private Set<EndPoint> unreachableEndpoints_ = new HashSet<EndPoint>();
+
+ /* initial seeds for joining the cluster */
+ private Set<EndPoint> seeds_ = new HashSet<EndPoint>();
+
+ /* map where key is the endpoint and value is the state associated with the endpoint */
+ Map<EndPoint, EndPointState> endPointStateMap_ = new Hashtable<EndPoint, EndPointState>();
+
+ /* private CTOR */
+ Gossiper()
+ {
+ aVeryLongTime_ = 259200 * 1000;
+ /* register with the Failure Detector for receiving Failure detector events */
+ FailureDetector.instance().registerFailureDetectionEventListener(this);
+ /* register the verbs */
+ MessagingService.getMessagingInstance().registerVerbHandlers(JOIN_VERB_HANDLER, new JoinVerbHandler());
+ MessagingService.getMessagingInstance().registerVerbHandlers(GOSSIP_DIGEST_SYN_VERB, new GossipDigestSynVerbHandler());
+ MessagingService.getMessagingInstance().registerVerbHandlers(GOSSIP_DIGEST_ACK_VERB, new GossipDigestAckVerbHandler());
+ MessagingService.getMessagingInstance().registerVerbHandlers(GOSSIP_DIGEST_ACK2_VERB, new GossipDigestAck2VerbHandler());
+ /* register the Gossip stage */
+ StageManager.registerStage( Gossiper.GOSSIP_STAGE, new SingleThreadedStage("GMFD") );
+ /* register with Storage Service for shutdown */
+ StorageService.instance().registerComponentForShutdown(this);
+ }
+
+ public void register(IEndPointStateChangeSubscriber subscriber)
+ {
+ subscribers_.add(subscriber);
+ }
+
+ public void unregister(IEndPointStateChangeSubscriber subscriber)
+ {
+ subscribers_.remove(subscriber);
+ }
+
+ public Set<EndPoint> getAllMembers()
+ {
+ Set<EndPoint> allMbrs = new HashSet<EndPoint>();
+ allMbrs.addAll(getLiveMembers());
+ allMbrs.addAll(getUnreachableMembers());
+ return allMbrs;
+ }
+
+ public Set<EndPoint> getLiveMembers()
+ {
+ Set<EndPoint> liveMbrs = new HashSet<EndPoint>(liveEndpoints_);
+ liveMbrs.add( new EndPoint( localEndPoint_.getHost(), localEndPoint_.getPort() ) );
+ return liveMbrs;
+ }
+
+ public Set<EndPoint> getUnreachableMembers()
+ {
+ return new HashSet<EndPoint>(unreachableEndpoints_);
+ }
+
+ /**
+ * This method is used to forcibly remove a node from the membership
+ * set. He is forgotten locally immediately.
+ *
+ * param@ ep the endpoint to be removed from membership.
+ */
+ public synchronized void removeFromMembership(EndPoint ep)
+ {
+ endPointStateMap_.remove(ep);
+ liveEndpoints_.remove(ep);
+ unreachableEndpoints_ .remove(ep);
+ }
+
+ /**
+ * This method is part of IFailureDetectionEventListener interface. This is invoked
+ * by the Failure Detector when it convicts an end point.
+ *
+ * param @ endpoint end point that is convicted.
+ */
+
+ public void convict(EndPoint endpoint)
+ {
+ EndPointState epState = endPointStateMap_.get(endpoint);
+ if ( epState != null )
+ {
+ if ( !epState.isAlive() && epState.isAGossiper() )
+ {
+ /*
+ * just to be sure - is invoked just to make sure that
+ * it was called atleast once.
+ */
+ if ( liveEndpoints_.contains(endpoint) )
+ {
+ logger_.info("EndPoint " + endpoint + " is now dead.");
+ isAlive(endpoint, epState, false);
+
+ /* Notify an endpoint is dead to interested parties. */
+ EndPointState deltaState = new EndPointState(epState.getHeartBeatState());
+ doNotifications(endpoint, deltaState);
+ }
+ epState.isAGossiper(false);
+ }
+ }
+ }
+
+ /**
+ * This method is part of IFailureDetectionEventListener interface. This is invoked
+ * by the Failure Detector when it suspects an end point.
+ *
+ * param @ endpoint end point that is suspected.
+ */
+ public void suspect(EndPoint endpoint)
+ {
+ EndPointState epState = endPointStateMap_.get(endpoint);
+ if ( epState.isAlive() )
+ {
+ logger_.info("EndPoint " + endpoint + " is now dead.");
+ isAlive(endpoint, epState, false);
+
+ /* Notify an endpoint is dead to interested parties. */
+ EndPointState deltaState = new EndPointState(epState.getHeartBeatState());
+ doNotifications(endpoint, deltaState);
+ }
+ }
+
+ int getMaxEndPointStateVersion(EndPointState epState)
+ {
+ List<Integer> versions = new ArrayList<Integer>();
+ versions.add( epState.getHeartBeatState().getHeartBeatVersion() );
+ Map<String, ApplicationState> appStateMap = epState.getApplicationState();
+
+ Set<String> keys = appStateMap.keySet();
+ for ( String key : keys )
+ {
+ int stateVersion = appStateMap.get(key).getStateVersion();
+ versions.add( stateVersion );
+ }
+
+ /* sort to get the max version to build GossipDigest for this endpoint */
+ Collections.sort(versions);
+ int maxVersion = versions.get(versions.size() - 1);
+ versions.clear();
+ return maxVersion;
+ }
+
+ /**
+ * Removes the endpoint from unreachable endpoint set
+ *
+ * @param endpoint endpoint to be removed from the current membership.
+ */
+ void evictFromMembership(EndPoint endpoint)
+ {
+ unreachableEndpoints_.remove(endpoint);
+ }
+
+ /* No locking required since it is called from a method that already has acquired a lock */
+ @Deprecated
+ void makeGossipDigest(List<GossipDigest> gDigests)
+ {
+ /* Add the local endpoint state */
+ EndPointState epState = endPointStateMap_.get(localEndPoint_);
+ int generation = epState.getHeartBeatState().getGeneration();
+ int maxVersion = getMaxEndPointStateVersion(epState);
+ gDigests.add( new GossipDigest(localEndPoint_, generation, maxVersion) );
+
+ for ( EndPoint liveEndPoint : liveEndpoints_ )
+ {
+ epState = endPointStateMap_.get(liveEndPoint);
+ if ( epState != null )
+ {
+ generation = epState.getHeartBeatState().getGeneration();
+ maxVersion = getMaxEndPointStateVersion(epState);
+ gDigests.add( new GossipDigest(liveEndPoint, generation, maxVersion) );
+ }
+ else
+ {
+ gDigests.add( new GossipDigest(liveEndPoint, 0, 0) );
+ }
+ }
+ }
+
+ /**
+ * No locking required since it is called from a method that already
+ * has acquired a lock. The gossip digest is built based on randomization
+ * rather than just looping through the collection of live endpoints.
+ *
+ * @param gDigests list of Gossip Digests.
+ */
+ void makeRandomGossipDigest(List<GossipDigest> gDigests)
+ {
+ /* Add the local endpoint state */
+ EndPointState epState = endPointStateMap_.get(localEndPoint_);
+ int generation = epState.getHeartBeatState().getGeneration();
+ int maxVersion = getMaxEndPointStateVersion(epState);
+ gDigests.add( new GossipDigest(localEndPoint_, generation, maxVersion) );
+
+ int size = liveEndpoints_.size();
+ List<EndPoint> endpoints = new ArrayList<EndPoint>( liveEndpoints_ );
+ Collections.shuffle(endpoints, random_);
+ for ( EndPoint liveEndPoint : endpoints )
+ {
+ epState = endPointStateMap_.get(liveEndPoint);
+ if ( epState != null )
+ {
+ generation = epState.getHeartBeatState().getGeneration();
+ maxVersion = getMaxEndPointStateVersion(epState);
+ gDigests.add( new GossipDigest(liveEndPoint, generation, maxVersion) );
+ }
+ else
+ {
+ gDigests.add( new GossipDigest(liveEndPoint, 0, 0) );
+ }
+ }
+
+ /* FOR DEBUG ONLY - remove later */
+ StringBuilder sb = new StringBuilder();
+ for ( GossipDigest gDigest : gDigests )
+ {
+ sb.append(gDigest);
+ sb.append(" ");
+ }
+ logger_.debug("Gossip Digests are : " + sb.toString());
+ }
+
+ public int getCurrentGenerationNumber(EndPoint endpoint)
+ {
+ return endPointStateMap_.get(endpoint).getHeartBeatState().getGeneration();
+ }
+
+ Message makeGossipDigestSynMessage(List<GossipDigest> gDigests) throws IOException
+ {
+ GossipDigestSynMessage gDigestMessage = new GossipDigestSynMessage(DatabaseDescriptor.getClusterName(), gDigests);
+ ByteArrayOutputStream bos = new ByteArrayOutputStream(Gossiper.MAX_GOSSIP_PACKET_SIZE);
+ DataOutputStream dos = new DataOutputStream( bos );
+ GossipDigestSynMessage.serializer().serialize(gDigestMessage, dos);
+ Message message = new Message(localEndPoint_, Gossiper.GOSSIP_STAGE, GOSSIP_DIGEST_SYN_VERB, new Object[]{bos.toByteArray()});
+ return message;
+ }
+
+ Message makeGossipDigestAckMessage(GossipDigestAckMessage gDigestAckMessage) throws IOException
+ {
+ ByteArrayOutputStream bos = new ByteArrayOutputStream(Gossiper.MAX_GOSSIP_PACKET_SIZE);
+ DataOutputStream dos = new DataOutputStream(bos);
+ GossipDigestAckMessage.serializer().serialize(gDigestAckMessage, dos);
+ logger_.debug("@@@@ Size of GossipDigestAckMessage is " + bos.toByteArray().length);
+ Message message = new Message(localEndPoint_, Gossiper.GOSSIP_STAGE, GOSSIP_DIGEST_ACK_VERB, new Object[]{bos.toByteArray()});
+ return message;
+ }
+
+ Message makeGossipDigestAck2Message(GossipDigestAck2Message gDigestAck2Message) throws IOException
+ {
+ ByteArrayOutputStream bos = new ByteArrayOutputStream(Gossiper.MAX_GOSSIP_PACKET_SIZE);
+ DataOutputStream dos = new DataOutputStream(bos);
+ GossipDigestAck2Message.serializer().serialize(gDigestAck2Message, dos);
+ Message message = new Message(localEndPoint_, Gossiper.GOSSIP_STAGE, GOSSIP_DIGEST_ACK2_VERB, new Object[]{bos.toByteArray()});
+ return message;
+ }
+
+ boolean sendGossipToLiveNode(Message message)
+ {
+ int size = liveEndpoints_.size();
+ List<EndPoint> eps = new ArrayList<EndPoint>(liveEndpoints_);
+
+ if ( rrIndex_ >= size )
+ {
+ rrIndex_ = -1;
+ }
+
+ EndPoint to = eps.get(++rrIndex_);
+ logger_.info("Sending a GossipDigestSynMessage to " + to + " ...");
+ MessagingService.getMessagingInstance().sendUdpOneWay(message, to);
+ return seeds_.contains(to);
+ }
+
+ /**
+ * Returns true if the chosen target was also a seed. False otherwise
+ *
+ * @param message message to sent
+ * @param epSet a set of endpoint from which a random endpoint is chosen.
+ * @return true if the chosen endpoint is also a seed.
+ */
+ boolean sendGossip(Message message, Set<EndPoint> epSet)
+ {
+ int size = epSet.size();
+ /* Generate a random number from 0 -> size */
+ List<EndPoint> liveEndPoints = new ArrayList<EndPoint>(epSet);
+ int index = (size == 1) ? 0 : random_.nextInt(size);
+ EndPoint to = liveEndPoints.get(index);
+ logger_.info("Sending a GossipDigestSynMessage to " + to + " ...");
+ MessagingService.getMessagingInstance().sendUdpOneWay(message, to);
+ return seeds_.contains(to);
+ }
+
+ /* Sends a Gossip message to a live member and returns a reference to the member */
+ boolean doGossipToLiveMember(Message message)
+ {
+ int size = liveEndpoints_.size();
+ if ( size == 0 )
+ return false;
+ // return sendGossipToLiveNode(message);
+ /* Use this for a cluster size >= 30 */
+ return sendGossip(message, liveEndpoints_);
+ }
+
+ /* Sends a Gossip message to an unreachable member */
+ void doGossipToUnreachableMember(Message message)
+ {
+ double liveEndPoints = liveEndpoints_.size();
+ double unreachableEndPoints = unreachableEndpoints_.size();
+ if ( unreachableEndPoints > 0 )
+ {
+ /* based on some probability */
+ double prob = unreachableEndPoints / (liveEndPoints + 1);
+ double randDbl = random_.nextDouble();
+ if ( randDbl < prob )
+ sendGossip(message, unreachableEndpoints_);
+ }
+ }
+
+ /* Gossip to a seed for facilitating partition healing */
+ void doGossipToSeed(Message message)
+ {
+ int size = seeds_.size();
+ if ( size > 0 )
+ {
+ if ( size == 1 && seeds_.contains(localEndPoint_) )
+ {
+ return;
+ }
+
+ if ( liveEndpoints_.size() == 0 )
+ {
+ sendGossip(message, seeds_);
+ }
+ else
+ {
+ /* Gossip with the seed with some probability. */
+ double probability = seeds_.size() / ( liveEndpoints_.size() + unreachableEndpoints_.size() );
+ double randDbl = random_.nextDouble();
+ if ( randDbl <= probability )
+ sendGossip(message, seeds_);
+ }
+ }
+ }
+
+ void doStatusCheck()
+ {
+ long now = System.currentTimeMillis();
+ Set<EndPoint> eps = endPointStateMap_.keySet();
+
+ for ( EndPoint endpoint : eps )
+ {
+ if ( endpoint.equals(localEndPoint_) )
+ continue;
+
+ FailureDetector.instance().intepret(endpoint);
+ EndPointState epState = endPointStateMap_.get(endpoint);
+ if ( epState != null )
+ {
+ long l = now - epState.getUpdateTimestamp();
+ long duration = now - l;
+ if ( !epState.isAlive() && (duration > aVeryLongTime_) )
+ {
+ evictFromMembership(endpoint);
+ }
+ }
+ }
+ }
+
+ EndPointState getEndPointStateForEndPoint(EndPoint ep)
+ {
+ return endPointStateMap_.get(ep);
+ }
+
+ synchronized EndPointState getStateForVersionBiggerThan(EndPoint forEndpoint, int version)
+ {
+ EndPointState epState = endPointStateMap_.get(forEndpoint);
+ EndPointState reqdEndPointState = null;
+
+ if ( epState != null )
+ {
+ /*
+ * Here we try to include the Heart Beat state only if it is
+ * greater than the version passed in. It might happen that
+ * the heart beat version maybe lesser than the version passed
+ * in and some application state has a version that is greater
+ * than the version passed in. In this case we also send the old
+ * heart beat and throw it away on the receiver if it is redundant.
+ */
+ int localHbVersion = epState.getHeartBeatState().getHeartBeatVersion();
+ if ( localHbVersion > version )
+ {
+ reqdEndPointState = new EndPointState(epState.getHeartBeatState());
+ }
+ Map<String, ApplicationState> appStateMap = epState.getApplicationState();
+ /* Accumulate all application states whose versions are greater than "version" variable */
+ Set<String> keys = appStateMap.keySet();
+ for ( String key : keys )
+ {
+ ApplicationState appState = appStateMap.get(key);
+ if ( appState.getStateVersion() > version )
+ {
+ if ( reqdEndPointState == null )
+ {
+ reqdEndPointState = new EndPointState(epState.getHeartBeatState());
+ }
+ reqdEndPointState.addApplicationState(key, appState);
+ }
+ }
+ }
+ return reqdEndPointState;
+ }
+
+ /*
+ * This method is called only from the JoinVerbHandler. This happens
+ * when a new node coming up multicasts the JoinMessage. Here we need
+ * to add the endPoint to the list of live endpoints.
+ */
+ synchronized void join(EndPoint from)
+ {
+ if ( !from.equals( localEndPoint_ ) )
+ {
+ /* Mark this endpoint as "live" */
+ liveEndpoints_.add(from);
+ unreachableEndpoints_.remove(from);
+ }
+ }
+
+ void notifyFailureDetector(List<GossipDigest> gDigests)
+ {
+ IFailureDetector fd = FailureDetector.instance();
+ for ( GossipDigest gDigest : gDigests )
+ {
+ EndPointState localEndPointState = endPointStateMap_.get(gDigest.endPoint_);
+ /*
+ * If the local endpoint state exists then report to the FD only
+ * if the versions workout.
+ */
+ if ( localEndPointState != null )
+ {
+ int localGeneration = endPointStateMap_.get(gDigest.endPoint_).getHeartBeatState().generation_;
+ int remoteGeneration = gDigest.generation_;
+ if ( remoteGeneration > localGeneration )
+ {
+ logger_.debug("Reporting " + gDigest.endPoint_ + " to the FD.");
+ fd.report(gDigest.endPoint_);
+ continue;
+ }
+
+ if ( remoteGeneration == localGeneration )
+ {
+ int localVersion = getMaxEndPointStateVersion(localEndPointState);
+ //int localVersion = endPointStateMap_.get(gDigest.endPoint_).getHeartBeatState().getHeartBeatVersion();
+ int remoteVersion = gDigest.maxVersion_;
+ if ( remoteVersion > localVersion )
+ {
+ logger_.debug("Reporting " + gDigest.endPoint_ + " to the FD.");
+ fd.report(gDigest.endPoint_);
+ }
+ }
+ }
+ }
+ }
+
+ void notifyFailureDetector(Map<EndPoint, EndPointState> remoteEpStateMap)
+ {
+ IFailureDetector fd = FailureDetector.instance();
+ Set<EndPoint> endpoints = remoteEpStateMap.keySet();
+ for ( EndPoint endpoint : endpoints )
+ {
+ EndPointState remoteEndPointState = remoteEpStateMap.get(endpoint);
+ EndPointState localEndPointState = endPointStateMap_.get(endpoint);
+ /*
+ * If the local endpoint state exists then report to the FD only
+ * if the versions workout.
+ */
+ if ( localEndPointState != null )
+ {
+ int localGeneration = localEndPointState.getHeartBeatState().generation_;
+ int remoteGeneration = remoteEndPointState.getHeartBeatState().generation_;
+ if ( remoteGeneration > localGeneration )
+ {
+ logger_.debug("Reporting " + endpoint + " to the FD.");
+ fd.report(endpoint);
+ continue;
+ }
+
+ if ( remoteGeneration == localGeneration )
+ {
+ int localVersion = getMaxEndPointStateVersion(localEndPointState);
+ //int localVersion = localEndPointState.getHeartBeatState().getHeartBeatVersion();
+ int remoteVersion = remoteEndPointState.getHeartBeatState().getHeartBeatVersion();
+ if ( remoteVersion > localVersion )
+ {
+ logger_.debug("Reporting " + endpoint + " to the FD.");
+ fd.report(endpoint);
+ }
+ }
+ }
+ }
+ }
+
+ void resusitate(EndPoint addr, EndPointState localState)
+ {
+ logger_.debug("Attempting to resusitate " + addr);
+ if ( !localState.isAlive() )
+ {
+ isAlive(addr, localState, true);
+ logger_.info("EndPoint " + addr + " is now UP");
+ }
+ }
+
+ private void handleNewJoin(EndPoint ep, EndPointState epState)
+ {
+ logger_.info("Node " + ep + " has now joined.");
+ /* Mark this endpoint as "live" */
+ endPointStateMap_.put(ep, epState);
+ isAlive(ep, epState, true);
+ /* Notify interested parties about endpoint state change */
+ doNotifications(ep, epState);
+ }
+
+ synchronized void applyStateLocally(Map<EndPoint, EndPointState> epStateMap)
+ {
+ Set<EndPoint> eps = epStateMap.keySet();
+ for( EndPoint ep : eps )
+ {
+ if ( ep.equals( localEndPoint_ ) )
+ continue;
+
+ EndPointState localEpStatePtr = endPointStateMap_.get(ep);
+ EndPointState remoteState = epStateMap.get(ep);
+ /*
+ If state does not exist just add it. If it does then add it only if the version
+ of the remote copy is greater than the local copy.
+ */
+ if ( localEpStatePtr != null )
+ {
+ int localGeneration = localEpStatePtr.getHeartBeatState().getGeneration();
+ int remoteGeneration = remoteState.getHeartBeatState().getGeneration();
+
+ if (remoteGeneration > localGeneration)
+ {
+ handleNewJoin(ep, remoteState);
+ }
+ else if ( remoteGeneration == localGeneration )
+ {
+ /* manage the membership state */
+ int localMaxVersion = getMaxEndPointStateVersion(localEpStatePtr);
+ int remoteMaxVersion = getMaxEndPointStateVersion(remoteState);
+ if ( remoteMaxVersion > localMaxVersion )
+ {
+ resusitate(ep, localEpStatePtr);
+ applyHeartBeatStateLocally(ep, localEpStatePtr, remoteState);
+ /* apply ApplicationState */
+ applyApplicationStateLocally(ep, localEpStatePtr, remoteState);
+ }
+ }
+ }
+ else
+ {
+ handleNewJoin(ep, remoteState);
+ }
+ }
+ }
+
+ void applyHeartBeatStateLocally(EndPoint addr, EndPointState localState, EndPointState remoteState)
+ {
+ HeartBeatState localHbState = localState.getHeartBeatState();
+ HeartBeatState remoteHbState = remoteState.getHeartBeatState();
+
+ if ( remoteHbState.getGeneration() > localHbState.getGeneration() )
+ {
+ resusitate(addr, localState);
+ localState.setHeartBeatState(remoteHbState);
+ }
+ if ( localHbState.getGeneration() == remoteHbState.getGeneration() )
+ {
+ if ( remoteHbState.getHeartBeatVersion() > localHbState.getHeartBeatVersion() )
+ {
+ int oldVersion = localHbState.getHeartBeatVersion();
+ localState.setHeartBeatState(remoteHbState);
+ logger_.debug("Updating heartbeat state version to " + localState.getHeartBeatState().getHeartBeatVersion() + " from " + oldVersion + " for " + addr + " ...");
+ }
+ }
+ }
+
+ void applyApplicationStateLocally(EndPoint addr, EndPointState localStatePtr, EndPointState remoteStatePtr)
+ {
+ Map<String, ApplicationState> localAppStateMap = localStatePtr.getApplicationState();
+ Map<String, ApplicationState> remoteAppStateMap = remoteStatePtr.getApplicationState();
+
+ Set<String> remoteKeys = remoteAppStateMap.keySet();
+ for ( String remoteKey : remoteKeys )
+ {
+ ApplicationState remoteAppState = remoteAppStateMap.get(remoteKey);
+ ApplicationState localAppState = localAppStateMap.get(remoteKey);
+
+ /* If state doesn't exist locally for this key then just apply it */
+ if ( localAppState == null )
+ {
+ localStatePtr.addApplicationState(remoteKey, remoteAppState);
+ /* notify interested parties of endpoint state change */
+ EndPointState deltaState = new EndPointState(localStatePtr.getHeartBeatState());
+ deltaState.addApplicationState(remoteKey, remoteAppState);
+ doNotifications(addr, deltaState);
+ continue;
+ }
+
+ int remoteGeneration = remoteStatePtr.getHeartBeatState().getGeneration();
+ int localGeneration = localStatePtr.getHeartBeatState().getGeneration();
+
+ /* If the remoteGeneration is greater than localGeneration then apply state blindly */
+ if ( remoteGeneration > localGeneration )
+ {
+ localStatePtr.addApplicationState(remoteKey, remoteAppState);
+ /* notify interested parties of endpoint state change */
+ EndPointState deltaState = new EndPointState(localStatePtr.getHeartBeatState());
+ deltaState.addApplicationState(remoteKey, remoteAppState);
+ doNotifications(addr, deltaState);
+ continue;
+ }
+
+ /* If the generations are the same then apply state if the remote version is greater than local version. */
+ if ( remoteGeneration == localGeneration )
+ {
+ int remoteVersion = remoteAppState.getStateVersion();
+ int localVersion = localAppState.getStateVersion();
+
+ if ( remoteVersion > localVersion )
+ {
+ localStatePtr.addApplicationState(remoteKey, remoteAppState);
+ /* notify interested parties of endpoint state change */
+ EndPointState deltaState = new EndPointState(localStatePtr.getHeartBeatState());
+ deltaState.addApplicationState(remoteKey, remoteAppState);
+ doNotifications(addr, deltaState);
+ }
+ }
+ }
+ }
+
+ void doNotifications(EndPoint addr, EndPointState epState)
+ {
+ for ( IEndPointStateChangeSubscriber subscriber : subscribers_ )
+ {
+ subscriber.onChange(addr, epState);
+ }
+ }
+
+ synchronized void isAlive(EndPoint addr, EndPointState epState, boolean value)
+ {
+ epState.isAlive(value);
+ if ( value )
+ {
+ liveEndpoints_.add(addr);
+ unreachableEndpoints_.remove(addr);
+ }
+ else
+ {
+ liveEndpoints_.remove(addr);
+ unreachableEndpoints_.add(addr);
+ }
+ if ( epState.isAGossiper() )
+ return;
+ epState.isAGossiper(true);
+ }
+
+ /* These are helper methods used from GossipDigestSynVerbHandler */
+ Map<EndPoint, GossipDigest> getEndPointGossipDigestMap(List<GossipDigest> gDigestList)
+ {
+ Map<EndPoint, GossipDigest> epMap = new HashMap<EndPoint, GossipDigest>();
+ for( GossipDigest gDigest : gDigestList )
+ {
+ epMap.put( gDigest.getEndPoint(), gDigest );
+ }
+ return epMap;
+ }
+
+ /* This is a helper method to get all EndPoints from a list of GossipDigests */
+ EndPoint[] getEndPointsFromGossipDigest(List<GossipDigest> gDigestList)
+ {
+ Set<EndPoint> set = new HashSet<EndPoint>();
+ for ( GossipDigest gDigest : gDigestList )
+ {
+ set.add( gDigest.getEndPoint() );
+ }
+ return set.toArray( new EndPoint[0] );
+ }
+
+ /* Request all the state for the endpoint in the gDigest */
+ void requestAll(GossipDigest gDigest, List<GossipDigest> deltaGossipDigestList, int remoteGeneration)
+ {
+ /* We are here since we have no data for this endpoint locally so request everthing. */
+ deltaGossipDigestList.add( new GossipDigest(gDigest.getEndPoint(), remoteGeneration, 0) );
+ }
+
+ /* Send all the data with version greater than maxRemoteVersion */
+ void sendAll(GossipDigest gDigest, Map<EndPoint, EndPointState> deltaEpStateMap, int maxRemoteVersion)
+ {
+ EndPointState localEpStatePtr = getStateForVersionBiggerThan(gDigest.getEndPoint(), maxRemoteVersion) ;
+ if ( localEpStatePtr != null )
+ deltaEpStateMap.put(gDigest.getEndPoint(), localEpStatePtr);
+ }
+
+ /*
+ This method is used to figure the state that the Gossiper has but Gossipee doesn't. The delta digests
+ and the delta state are built up.
+ */
+ synchronized void examineGossiper(List<GossipDigest> gDigestList, List<GossipDigest> deltaGossipDigestList, Map<EndPoint, EndPointState> deltaEpStateMap)
+ {
+ for ( GossipDigest gDigest : gDigestList )
+ {
+ int remoteGeneration = gDigest.getGeneration();
+ int maxRemoteVersion = gDigest.getMaxVersion();
+ /* Get state associated with the end point in digest */
+ EndPointState epStatePtr = endPointStateMap_.get(gDigest.getEndPoint());
+ /*
+ Here we need to fire a GossipDigestAckMessage. If we have some data associated with this endpoint locally
+ then we follow the "if" path of the logic. If we have absolutely nothing for this endpoint we need to
+ request all the data for this endpoint.
+ */
+ if ( epStatePtr != null )
+ {
+ int localGeneration = epStatePtr.getHeartBeatState().getGeneration();
+ /* get the max version of all keys in the state associated with this endpoint */
+ int maxLocalVersion = getMaxEndPointStateVersion(epStatePtr);
+ if ( remoteGeneration == localGeneration && maxRemoteVersion == maxLocalVersion )
+ continue;
+
+ if ( remoteGeneration > localGeneration )
+ {
+ /* we request everything from the gossiper */
+ requestAll(gDigest, deltaGossipDigestList, remoteGeneration);
+ }
+ if ( remoteGeneration < localGeneration )
+ {
+ /* send all data with generation = localgeneration and version > 0 */
+ sendAll(gDigest, deltaEpStateMap, 0);
+ }
+ if ( remoteGeneration == localGeneration )
+ {
+ /*
+ If the max remote version is greater then we request the remote endpoint send us all the data
+ for this endpoint with version greater than the max version number we have locally for this
+ endpoint.
+ If the max remote version is lesser, then we send all the data we have locally for this endpoint
+ with version greater than the max remote version.
+ */
+ if ( maxRemoteVersion > maxLocalVersion )
+ {
+ deltaGossipDigestList.add( new GossipDigest(gDigest.getEndPoint(), remoteGeneration, maxLocalVersion) );
+ }
+ if ( maxRemoteVersion < maxLocalVersion )
+ {
+ /* send all data with generation = localgeneration and version > maxRemoteVersion */
+ sendAll(gDigest, deltaEpStateMap, maxRemoteVersion);
+ }
+ }
+ }
+ else
+ {
+ /* We are here since we have no data for this endpoint locally so request everthing. */
+ requestAll(gDigest, deltaGossipDigestList, remoteGeneration);
+ }
+ }
+ }
+
+ public void start(EndPoint localEndPoint, int generationNbr) throws IOException
+ {
+ localEndPoint_ = localEndPoint;
+ /* Get the seeds from the config and initialize them. */
+ Set<String> seedHosts = DatabaseDescriptor.getSeeds();
+ for( String seedHost : seedHosts )
+ {
+ EndPoint seed = new EndPoint(seedHost, DatabaseDescriptor.getControlPort());
+ if ( seed.equals(localEndPoint) )
+ continue;
+ seeds_.add(seed);
+ }
+
+ /* initialize the heartbeat state for this localEndPoint */
+ EndPointState localState = endPointStateMap_.get(localEndPoint_);
+ if ( localState == null )
+ {
+ HeartBeatState hbState = new HeartBeatState(generationNbr, 0);
+ localState = new EndPointState(hbState);
+ localState.isAlive(true);
+ localState.isAGossiper(true);
+ endPointStateMap_.put(localEndPoint_, localState);
+ }
+
+ /* starts a timer thread */
+ gossipTimer_.schedule( new GossipTimerTask(), Gossiper.intervalInMillis_, Gossiper.intervalInMillis_);
+ }
+
+ public void shutdown()
+ {
+ /* This prevents this guy from responding to Gossip messages */
+ MessagingService.getMessagingInstance().deregisterVerbHandlers(GOSSIP_DIGEST_SYN_VERB);
+ MessagingService.getMessagingInstance().deregisterVerbHandlers(GOSSIP_DIGEST_ACK_VERB);
+ MessagingService.getMessagingInstance().deregisterVerbHandlers(GOSSIP_DIGEST_ACK2_VERB);
+ /* This prevents this guy from Gossiping */
+ gossipTimer_.cancel();
+ }
+
+ public synchronized void addApplicationState(String key, ApplicationState appState)
+ {
+ EndPointState epState = endPointStateMap_.get(localEndPoint_);
+ if ( epState != null )
+ {
+ epState.addApplicationState(key, appState);
+ }
+ }
+
+ public void stop()
+ {
+ gossipTimer_.cancel();
+ }
+}
+
+class JoinVerbHandler implements IVerbHandler
+{
+ private static Logger logger_ = Logger.getLogger( JoinVerbHandler.class);
+
+ public void doVerb(Message message)
+ {
+ EndPoint from = message.getFrom();
+ logger_.debug("Received a JoinMessage from " + from);
+
+ byte[] bytes = (byte[])message.getMessageBody()[0];
+ DataInputStream dis = new DataInputStream( new ByteArrayInputStream(bytes) );
+
+ try
+ {
+ JoinMessage joinMessage = JoinMessage.serializer().deserialize(dis);
+ if ( joinMessage.clusterId_.equals( DatabaseDescriptor.getClusterName() ) )
+ {
+ Gossiper.instance().join(from);
+ }
+ }
+ catch ( IOException ex )
+ {
+ logger_.info( LogUtil.throwableToString(ex) );
+ }
+ }
+}
+
+class GossipDigestSynVerbHandler implements IVerbHandler
+{
+ private static Logger logger_ = Logger.getLogger( GossipDigestSynVerbHandler.class);
+
+ public void doVerb(Message message)
+ {
+ EndPoint from = message.getFrom();
+ logger_.info("Received a GossipDigestSynMessage from " + from);
+
+ byte[] bytes = (byte[])message.getMessageBody()[0];
+ DataInputStream dis = new DataInputStream( new ByteArrayInputStream(bytes) );
+
+ try
+ {
+ GossipDigestSynMessage gDigestMessage = GossipDigestSynMessage.serializer().deserialize(dis);
+ /* If the message is from a different cluster throw it away. */
+ if ( !gDigestMessage.clusterId_.equals(DatabaseDescriptor.getClusterName()) )
+ return;
+
+ List<GossipDigest> gDigestList = gDigestMessage.getGossipDigests();
+ /* Notify the Failure Detector */
+ Gossiper.instance().notifyFailureDetector(gDigestList);
+
+ doSort(gDigestList);
+
+ List<GossipDigest> deltaGossipDigestList = new ArrayList<GossipDigest>();
+ Map<EndPoint, EndPointState> deltaEpStateMap = new HashMap<EndPoint, EndPointState>();
+ Gossiper.instance().examineGossiper(gDigestList, deltaGossipDigestList, deltaEpStateMap);
+
+ GossipDigestAckMessage gDigestAck = new GossipDigestAckMessage(deltaGossipDigestList, deltaEpStateMap);
+ Message gDigestAckMessage = Gossiper.instance().makeGossipDigestAckMessage(gDigestAck);
+ logger_.info("Sending a GossipDigestAckMessage to " + from);
+ MessagingService.getMessagingInstance().sendUdpOneWay(gDigestAckMessage, from);
+ }
+ catch (IOException e)
+ {
+ logger_.info( LogUtil.throwableToString(e) );
+ }
+ }
+
+ /*
+ * First construct a map whose key is the endpoint in the GossipDigest and the value is the
+ * GossipDigest itself. Then build a list of version differences i.e difference between the
+ * version in the GossipDigest and the version in the local state for a given EndPoint.
+ * Sort this list. Now loop through the sorted list and retrieve the GossipDigest corresponding
+ * to the endpoint from the map that was initially constructed.
+ */
+ private void doSort(List<GossipDigest> gDigestList)
+ {
+ /* Construct a map of endpoint to GossipDigest. */
+ Map<EndPoint, GossipDigest> epToDigestMap = new HashMap<EndPoint, GossipDigest>();
+ for ( GossipDigest gDigest : gDigestList )
+ {
+ epToDigestMap.put(gDigest.getEndPoint(), gDigest);
+ }
+
+ /*
+ * These digests have their maxVersion set to the difference of the version
+ * of the local EndPointState and the version found in the GossipDigest.
+ */
+ List<GossipDigest> diffDigests = new ArrayList<GossipDigest>();
+ for ( GossipDigest gDigest : gDigestList )
+ {
+ EndPoint ep = gDigest.getEndPoint();
+ EndPointState epState = Gossiper.instance().getEndPointStateForEndPoint(ep);
+ int version = (epState != null) ? Gossiper.instance().getMaxEndPointStateVersion( epState ) : 0;
+ int diffVersion = Math.abs(version - gDigest.getMaxVersion() );
+ diffDigests.add( new GossipDigest(ep, gDigest.getGeneration(), diffVersion) );
+ }
+
+ gDigestList.clear();
+ Collections.sort(diffDigests);
+ int size = diffDigests.size();
+ /*
+ * Report the digests in descending order. This takes care of the endpoints
+ * that are far behind w.r.t this local endpoint
+ */
+ for ( int i = size - 1; i >= 0; --i )
+ {
+ gDigestList.add( epToDigestMap.get(diffDigests.get(i).getEndPoint()) );
+ }
+ }
+}
+
+class GossipDigestAckVerbHandler implements IVerbHandler
+{
+ private static Logger logger_ = Logger.getLogger(GossipDigestAckVerbHandler.class);
+
+ public void doVerb(Message message)
+ {
+ EndPoint from = message.getFrom();
+ logger_.info("Received a GossipDigestAckMessage from " + from);
+
+ byte[] bytes = (byte[])message.getMessageBody()[0];
+ DataInputStream dis = new DataInputStream( new ByteArrayInputStream(bytes) );
+
+ try
+ {
+ GossipDigestAckMessage gDigestAckMessage = GossipDigestAckMessage.serializer().deserialize(dis);
+ List<GossipDigest> gDigestList = gDigestAckMessage.getGossipDigestList();
+ Map<EndPoint, EndPointState> epStateMap = gDigestAckMessage.getEndPointStateMap();
+
+ if ( epStateMap.size() > 0 )
+ {
+ /* Notify the Failure Detector */
+ Gossiper.instance().notifyFailureDetector(epStateMap);
+ Gossiper.instance().applyStateLocally(epStateMap);
+ }
+
+ /* Get the state required to send to this gossipee - construct GossipDigestAck2Message */
+ Map<EndPoint, EndPointState> deltaEpStateMap = new HashMap<EndPoint, EndPointState>();
+ for( GossipDigest gDigest : gDigestList )
+ {
+ EndPoint addr = gDigest.getEndPoint();
+ EndPointState localEpStatePtr = Gossiper.instance().getStateForVersionBiggerThan(addr, gDigest.getMaxVersion());
+ if ( localEpStatePtr != null )
+ deltaEpStateMap.put(addr, localEpStatePtr);
+ }
+
+ GossipDigestAck2Message gDigestAck2 = new GossipDigestAck2Message(deltaEpStateMap);
+ Message gDigestAck2Message = Gossiper.instance().makeGossipDigestAck2Message(gDigestAck2);
+ logger_.info("Sending a GossipDigestAck2Message to " + from);
+ MessagingService.getMessagingInstance().sendUdpOneWay(gDigestAck2Message, from);
+ }
+ catch ( IOException e )
+ {
+ logger_.info( LogUtil.throwableToString(e) );
+ }
+ }
+}
+
+class GossipDigestAck2VerbHandler implements IVerbHandler
+{
+ private static Logger logger_ = Logger.getLogger(GossipDigestAck2VerbHandler.class);
+
+ public void doVerb(Message message)
+ {
+ EndPoint from = message.getFrom();
+ logger_.info("Received a GossipDigestAck2Message from " + from);
+
+ byte[] bytes = (byte[])message.getMessageBody()[0];
+ DataInputStream dis = new DataInputStream( new ByteArrayInputStream(bytes) );
+ try
+ {
+ GossipDigestAck2Message gDigestAck2Message = GossipDigestAck2Message.serializer().deserialize(dis);
+ Map<EndPoint, EndPointState> remoteEpStateMap = gDigestAck2Message.getEndPointStateMap();
+ /* Notify the Failure Detector */
+ Gossiper.instance().notifyFailureDetector(remoteEpStateMap);
+ Gossiper.instance().applyStateLocally(remoteEpStateMap);
+ }
+ catch ( IOException e )
+ {
+ logger_.info( LogUtil.throwableToString(e) );
+ }
+ }
+}
+
Added: incubator/cassandra/src/org/apache/cassandra/gms/HeartBeatState.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/src/org/apache/cassandra/gms/HeartBeatState.java?rev=749205&view=auto
==============================================================================
--- incubator/cassandra/src/org/apache/cassandra/gms/HeartBeatState.java (added)
+++ incubator/cassandra/src/org/apache/cassandra/gms/HeartBeatState.java Mon Mar 2 06:12:46 2009
@@ -0,0 +1,109 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.gms;
+
+import java.util.concurrent.atomic.AtomicInteger;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+
+import org.apache.cassandra.io.ICompactSerializer;
+
+
+/**
+ * HeartBeat State associated with any given endpoint.
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+
+class HeartBeatState
+{
+ private static ICompactSerializer<HeartBeatState> serializer_;
+
+ static
+ {
+ serializer_ = new HeartBeatStateSerializer();
+ }
+
+ int generation_;
+ AtomicInteger heartbeat_;
+ int version_;
+
+ HeartBeatState()
+ {
+ }
+
+ HeartBeatState(int generation, int heartbeat)
+ {
+ this(generation, heartbeat, 0);
+ }
+
+ HeartBeatState(int generation, int heartbeat, int version)
+ {
+ generation_ = generation;
+ heartbeat_ = new AtomicInteger(heartbeat);
+ version_ = version;
+ }
+
+ public static ICompactSerializer<HeartBeatState> serializer()
+ {
+ return serializer_;
+ }
+
+ int getGeneration()
+ {
+ return generation_;
+ }
+
+ void updateGeneration()
+ {
+ ++generation_;
+ version_ = VersionGenerator.getNextVersion();
+ }
+
+ int getHeartBeat()
+ {
+ return heartbeat_.get();
+ }
+
+ void updateHeartBeat()
+ {
+ heartbeat_.incrementAndGet();
+ version_ = VersionGenerator.getNextVersion();
+ }
+
+ int getHeartBeatVersion()
+ {
+ return version_;
+ }
+};
+
+class HeartBeatStateSerializer implements ICompactSerializer<HeartBeatState>
+{
+ public void serialize(HeartBeatState hbState, DataOutputStream dos) throws IOException
+ {
+ dos.writeInt(hbState.generation_);
+ dos.writeInt(hbState.heartbeat_.get());
+ dos.writeInt(hbState.version_);
+ }
+
+ public HeartBeatState deserialize(DataInputStream dis) throws IOException
+ {
+ return new HeartBeatState(dis.readInt(), dis.readInt(), dis.readInt());
+ }
+}
Added: incubator/cassandra/src/org/apache/cassandra/gms/IEndPointStateChangePublisher.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/src/org/apache/cassandra/gms/IEndPointStateChangePublisher.java?rev=749205&view=auto
==============================================================================
--- incubator/cassandra/src/org/apache/cassandra/gms/IEndPointStateChangePublisher.java (added)
+++ incubator/cassandra/src/org/apache/cassandra/gms/IEndPointStateChangePublisher.java Mon Mar 2 06:12:46 2009
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.gms;
+
+/**
+ * This is implemented by the Gossiper module to publish change events to interested parties.
+ * Interested parties register/unregister interest by invoking the methods of this interface.
+ *
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+
+public interface IEndPointStateChangePublisher
+{
+ /**
+ * Register for interesting state changes.
+ * @param subcriber module which implements the IEndPointStateChangeSubscriber
+ */
+ public void register(IEndPointStateChangeSubscriber subcriber);
+
+ /**
+ * Unregister interest for state changes.
+ * @param subcriber module which implements the IEndPointStateChangeSubscriber
+ */
+ public void unregister(IEndPointStateChangeSubscriber subcriber);
+}
Added: incubator/cassandra/src/org/apache/cassandra/gms/IEndPointStateChangeSubscriber.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/src/org/apache/cassandra/gms/IEndPointStateChangeSubscriber.java?rev=749205&view=auto
==============================================================================
--- incubator/cassandra/src/org/apache/cassandra/gms/IEndPointStateChangeSubscriber.java (added)
+++ incubator/cassandra/src/org/apache/cassandra/gms/IEndPointStateChangeSubscriber.java Mon Mar 2 06:12:46 2009
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.gms;
+
+import org.apache.cassandra.net.EndPoint;
+
+/**
+ * This is called by an instance of the IEndPointStateChangePublisher to notify
+ * interested parties about changes in the the state associated with any endpoint.
+ * For instance if node A figures there is a changes in state for an endpoint B
+ * it notifies all interested parties of this change. It is upto to the registered
+ * instance to decide what he does with this change. Not all modules maybe interested
+ * in all state changes.
+ *
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+
+public interface IEndPointStateChangeSubscriber
+{
+ /**
+ * Use to inform interested parties about the change in the state
+ * for specified endpoint
+ *
+ * @param endpoint endpoint for which the state change occured.
+ * @param epState state that actually changed for the above endpoint.
+ */
+ public void onChange(EndPoint endpoint, EndPointState epState);
+}
Added: incubator/cassandra/src/org/apache/cassandra/gms/IFailureDetectionEventListener.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/src/org/apache/cassandra/gms/IFailureDetectionEventListener.java?rev=749205&view=auto
==============================================================================
--- incubator/cassandra/src/org/apache/cassandra/gms/IFailureDetectionEventListener.java (added)
+++ incubator/cassandra/src/org/apache/cassandra/gms/IFailureDetectionEventListener.java Mon Mar 2 06:12:46 2009
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.gms;
+
+import org.apache.cassandra.net.EndPoint;
+
+/**
+ * Implemented by the Gossiper to either convict/suspect an endpoint
+ * based on the PHI calculated by the Failure Detector on the inter-arrival
+ * times of the heart beats.
+ *
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+
+public interface IFailureDetectionEventListener
+{
+ /**
+ * Convict the specified endpoint.
+ * @param ep endpoint to be convicted
+ */
+ public void convict(EndPoint ep);
+
+ /**
+ * Suspect the specified endpoint.
+ * @param ep endpoint to be suspected.
+ */
+ public void suspect(EndPoint ep);
+}
Added: incubator/cassandra/src/org/apache/cassandra/gms/IFailureDetector.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/src/org/apache/cassandra/gms/IFailureDetector.java?rev=749205&view=auto
==============================================================================
--- incubator/cassandra/src/org/apache/cassandra/gms/IFailureDetector.java (added)
+++ incubator/cassandra/src/org/apache/cassandra/gms/IFailureDetector.java Mon Mar 2 06:12:46 2009
@@ -0,0 +1,72 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.gms;
+
+import org.apache.cassandra.net.EndPoint;
+
+/**
+ * An interface that provides an application with the ability
+ * to query liveness information of a node in the cluster. It
+ * also exposes methods which help an application register callbacks
+ * for notifications of liveness information of nodes.
+ *
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+
+public interface IFailureDetector
+{
+ /**
+ * Failure Detector's knowledge of whether a node is up or
+ * down.
+ *
+ * @param ep endpoint in question.
+ * @return true if UP and false if DOWN.
+ */
+ public boolean isAlive(EndPoint ep);
+
+ /**
+ * This method is invoked by any entity wanting to interrogate the status of an endpoint.
+ * In our case it would be the Gossiper. The Failure Detector will then calculate Phi and
+ * deem an endpoint as suspicious or alive as explained in the Hayashibara paper.
+ *
+ * param ep endpoint for which we interpret the inter arrival times.
+ */
+ public void intepret(EndPoint ep);
+
+ /**
+ * This method is invoked by the receiver of the heartbeat. In our case it would be
+ * the Gossiper. Gossiper inform the Failure Detector on receipt of a heartbeat. The
+ * FailureDetector will then sample the arrival time as explained in the paper.
+ *
+ * param ep endpoint being reported.
+ */
+ public void report(EndPoint ep);
+
+ /**
+ * Register interest for Failure Detector events.
+ * @param listener implementation of an application provided IFailureDetectionEventListener
+ */
+ public void registerFailureDetectionEventListener(IFailureDetectionEventListener listener);
+
+ /**
+ * Un-register interest for Failure Detector events.
+ * @param listener implementation of an application provided IFailureDetectionEventListener
+ */
+ public void unregisterFailureDetectionEventListener(IFailureDetectionEventListener listener);
+}
Added: incubator/cassandra/src/org/apache/cassandra/gms/IFailureNotification.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/src/org/apache/cassandra/gms/IFailureNotification.java?rev=749205&view=auto
==============================================================================
--- incubator/cassandra/src/org/apache/cassandra/gms/IFailureNotification.java (added)
+++ incubator/cassandra/src/org/apache/cassandra/gms/IFailureNotification.java Mon Mar 2 06:12:46 2009
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.gms;
+
+import org.apache.cassandra.net.EndPoint;
+
+/**
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+
+public interface IFailureNotification
+{
+ public void suspect(EndPoint ep);
+ public void revive(EndPoint ep);
+}
Added: incubator/cassandra/src/org/apache/cassandra/gms/JoinMessage.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/src/org/apache/cassandra/gms/JoinMessage.java?rev=749205&view=auto
==============================================================================
--- incubator/cassandra/src/org/apache/cassandra/gms/JoinMessage.java (added)
+++ incubator/cassandra/src/org/apache/cassandra/gms/JoinMessage.java Mon Mar 2 06:12:46 2009
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.gms;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.cassandra.io.ICompactSerializer;
+
+
+/**
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+
+class JoinMessage
+{
+ private static ICompactSerializer<JoinMessage> serializer_;
+ static
+ {
+ serializer_ = new JoinMessageSerializer();
+ }
+
+ static ICompactSerializer<JoinMessage> serializer()
+ {
+ return serializer_;
+ }
+
+ String clusterId_;
+
+ JoinMessage(String clusterId)
+ {
+ clusterId_ = clusterId;
+ }
+}
+
+class JoinMessageSerializer implements ICompactSerializer<JoinMessage>
+{
+ public void serialize(JoinMessage joinMessage, DataOutputStream dos) throws IOException
+ {
+ dos.writeUTF(joinMessage.clusterId_);
+ }
+
+ public JoinMessage deserialize(DataInputStream dis) throws IOException
+ {
+ String clusterId = dis.readUTF();
+ return new JoinMessage(clusterId);
+ }
+}
Added: incubator/cassandra/src/org/apache/cassandra/gms/PureRandom.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/src/org/apache/cassandra/gms/PureRandom.java?rev=749205&view=auto
==============================================================================
--- incubator/cassandra/src/org/apache/cassandra/gms/PureRandom.java (added)
+++ incubator/cassandra/src/org/apache/cassandra/gms/PureRandom.java Mon Mar 2 06:12:46 2009
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.gms;
+
+import java.util.Random;
+
+import org.apache.cassandra.utils.BitSet;
+
+
+
+/**
+ * Implementation of a PureRandomNumber generator. Use this class cautiously. Not
+ * for general purpose use. Currently this is used by the Gossiper to choose a random
+ * endpoint to Gossip to.
+ *
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+
+class PureRandom extends Random
+{
+ private BitSet bs_ = new BitSet();
+ private int lastUb_;
+
+ PureRandom()
+ {
+ super();
+ }
+
+ public int nextInt(int ub)
+ {
+ if (ub <= 0)
+ throw new IllegalArgumentException("ub must be positive");
+
+ if ( lastUb_ != ub )
+ {
+ bs_.clear();
+ lastUb_ = ub;
+ }
+ else if(bs_.cardinality() == ub)
+ {
+ bs_.clear();
+ }
+
+ int value = super.nextInt(ub);
+ while ( bs_.get(value) )
+ {
+ value = super.nextInt(ub);
+ }
+ bs_.set(value);
+ return value;
+ }
+
+ public static void main(String[] args) throws Throwable
+ {
+ Random pr = new PureRandom();
+ int ubs[] = new int[] { 2, 3, 1, 10, 5, 0};
+
+ for (int ub : ubs)
+ {
+ System.out.println("UB: " + String.valueOf(ub));
+ for (int j = 0; j < 10; j++)
+ {
+ int junk = pr.nextInt(ub);
+ // Do something with junk so JVM doesn't optimize away
+ System.out.println(junk);
+ }
+ }
+ }
+}
Added: incubator/cassandra/src/org/apache/cassandra/gms/VersionGenerator.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/src/org/apache/cassandra/gms/VersionGenerator.java?rev=749205&view=auto
==============================================================================
--- incubator/cassandra/src/org/apache/cassandra/gms/VersionGenerator.java (added)
+++ incubator/cassandra/src/org/apache/cassandra/gms/VersionGenerator.java Mon Mar 2 06:12:46 2009
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.gms;
+
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * A unique version number generator for any state that is generated by the
+ * local node.
+ * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
+ */
+
+public class VersionGenerator
+{
+ private static AtomicInteger version_ = new AtomicInteger(0);
+
+ public static int getNextVersion()
+ {
+ return version_.incrementAndGet();
+ }
+}