You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by bs...@apache.org on 2015/08/21 23:22:52 UTC
[28/51] [partial] incubator-geode git commit: GEODE-77 removing the
old jgroups subproject
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/8b2ea77d/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/protocols/FD_PID.java
----------------------------------------------------------------------
diff --git a/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/protocols/FD_PID.java b/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/protocols/FD_PID.java
deleted file mode 100644
index 4982720..0000000
--- a/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/protocols/FD_PID.java
+++ /dev/null
@@ -1,640 +0,0 @@
-/** Notice of modification as required by the LGPL
- * This file was modified by Gemstone Systems Inc. on
- * $Date$
- **/
-// $Id: FD_PID.java,v 1.8 2005/08/11 12:43:47 belaban Exp $
-
-package com.gemstone.org.jgroups.protocols;
-
-
-import com.gemstone.org.jgroups.*;
-import com.gemstone.org.jgroups.stack.IpAddress;
-import com.gemstone.org.jgroups.stack.Protocol;
-import com.gemstone.org.jgroups.util.ExternalStrings;
-import com.gemstone.org.jgroups.util.Promise;
-import com.gemstone.org.jgroups.util.TimeScheduler;
-import com.gemstone.org.jgroups.util.Util;
-
-import java.io.IOException;
-import java.io.ObjectInput;
-import java.io.ObjectOutput;
-import java.net.InetAddress;
-import java.util.Enumeration;
-import java.util.Hashtable;
-import java.util.Properties;
-import java.util.Vector;
-
-
-/**
- * Process-ID based FD protocol. The existence of a process will be tested
- * via the process ID instead of message based pinging. In order to probe a process' existence, the application (or
- * some other protocol layer) has to send down a SET_PID event for the member. The addresses of all members will
- * be associated with their respective PIDs. The PID will be used to probe for the existence of that process.<p>
- * A cache of Addresses and PIDs is maintained in each member, which is adjusted upon reception of view changes.
- * The population of the addr:pid cache is as follows:<br>
- * When a new member joins, it requests the PID cache from the coordinator. Then it broadcasts its own addr:pid
- * association, so all members can update their cache. When a member P is to be pinged by Q, and Q doesn't have
- * P'd PID, Q will broadcast a WHO_HAS_PID message, to which all members who have that entry will respond. The
- * latter case should actually never happen because all members should always have consistent caches. However,
- * it is left in the code as a second line of defense.<p>
- * Note that
- * <em>1. The SET_PID has to be sent down after connecting to a channel !</em><p>
- * <em>2. Note that if a process is shunned and subsequently reconnects, the SET_PID event has to be resent !</em><p>
- * <em>3. This protocol only works for groups whose members are on the same host </em>. 'Host' actually means the
- * same IP address (e.g. for multi-homed systems).
- */
-public class FD_PID extends Protocol {
- Address ping_dest=null; // address of the member we monitor
- int ping_pid=0; // PID of the member we monitor
- Address local_addr=null; // our own address
- int local_pid=0; // PID of this process
- long timeout=3000; // msecs to wait for an are-you-alive msg
- long get_pids_timeout=3000; // msecs to wait for the PID cache from the coordinator
- static/*GemStoneAddition*/ final long get_pids_retry_timeout=500; // msecs to wait until we retry fetching the cache from the coord
- int num_tries=3; // attempts the coord is solicited for PID cache until we give up
- final Vector members=new Vector(); // list of group members (updated on VIEW_CHANGE)
- final Hashtable pids=new Hashtable(); // keys=Addresses, vals=Integer (PIDs)
- boolean own_pid_sent=false; // has own PID been broadcast yet ?
- final Vector pingable_mbrs=new Vector(); // mbrs from which we select ping_dest. possible subset of 'members'
- final Promise get_pids_promise=new Promise(); // used for rendezvous on GET_PIDS and GET_PIDS_RSP
- boolean got_cache_from_coord=false; // was cache already fetched ?
- TimeScheduler timer=null; // timer for recurring task of liveness pinging
- Monitor monitor=null; // object that performs the actual monitoring
-
-
- @Override // GemStoneAddition
- public String getName() {
- return "FD_PID";
- }
-
-
- @Override // GemStoneAddition
- public boolean setProperties(Properties props) {
- String str;
-
- super.setProperties(props);
- str=props.getProperty("timeout");
- if(str != null) {
- timeout=Long.parseLong(str);
- props.remove("timeout");
- }
-
- str=props.getProperty("get_pids_timeout");
- if(str != null) {
- get_pids_timeout=Long.parseLong(str);
- props.remove("get_pids_timeout");
- }
-
- str=props.getProperty("num_tries");
- if(str != null) {
- num_tries=Integer.parseInt(str);
- props.remove("num_tries");
- }
-
- if(props.size() > 0) {
- log.error(ExternalStrings.FD_PID_FD_PIDSETPROPERTIES_THE_FOLLOWING_PROPERTIES_ARE_NOT_RECOGNIZED__0, props);
-
- return false;
- }
- return true;
- }
-
-
- @Override // GemStoneAddition
- public void start() throws Exception {
- if(stack != null && stack.timer != null)
- timer=stack.timer;
- else {
- if(warn) log.warn("TimeScheduler in protocol stack is null (or protocol stack is null)");
- return;
- }
-
- if(monitor != null && monitor.started == false) {
- monitor=null;
- }
- if(monitor == null) {
- monitor=new Monitor();
- timer.add(monitor, true); // fixed-rate scheduling
- }
- }
-
- @Override // GemStoneAddition
- public void stop() {
- if(monitor != null) {
- monitor.stop();
- monitor=null;
- }
- }
-
-
- @Override // GemStoneAddition
- public void up(Event evt) {
- Message msg;
- FdHeader hdr=null;
- Object tmphdr;
-
- switch(evt.getType()) {
-
- case Event.SET_LOCAL_ADDRESS:
- local_addr=(Address)evt.getArg();
- break;
-
- case Event.MSG:
- msg=(Message)evt.getArg();
- tmphdr=msg.getHeader(getName());
- if(tmphdr == null || !(tmphdr instanceof FdHeader))
- break; // message did not originate from FD_PID layer, just pass up
-
- hdr=(FdHeader)msg.removeHeader(getName());
-
- switch(hdr.type) {
-
- case FdHeader.SUSPECT:
- if(hdr.mbr != null) {
-
- if(log.isInfoEnabled()) log.info(ExternalStrings.FD_PID_SUSPECT_HDR__0, hdr);
- passUp(new Event(Event.SUSPECT, new SuspectMember(msg.getSrc(), hdr.mbr))); // GemStoneAddition SuspectMember struct
- passDown(new Event(Event.SUSPECT, new SuspectMember(msg.getSrc(), hdr.mbr)));
- }
- break;
-
- // If I have the PID for the address 'hdr.mbr', return it. Otherwise look it up in my cache and return it
- case FdHeader.WHO_HAS_PID:
- if(local_addr != null && local_addr.equals(msg.getSrc()))
- return; // don't reply to WHO_HAS bcasts sent by me !
-
- if(hdr.mbr == null) {
- if(log.isErrorEnabled()) log.error(ExternalStrings.FD_PID_WHO_HAS_PID_HDRMBR_IS_NULL);
- return;
- }
-
- // 1. Try my own address, maybe it's me whose PID is wanted
- if(local_addr != null && local_addr.equals(hdr.mbr) && local_pid > 0) {
- sendIHavePidMessage(msg.getSrc(), hdr.mbr, local_pid); // unicast message to msg.getSrc()
- return;
- }
-
- // 2. If I don't have it, maybe it is in the cache
- if(pids.containsKey(hdr.mbr))
- sendIHavePidMessage(msg.getSrc(), hdr.mbr, ((Integer)pids.get(hdr.mbr)).intValue()); // ucast msg
- break;
-
-
- // Update the cache with the add:pid entry (if on the same host)
- case FdHeader.I_HAVE_PID:
-
- if(log.isInfoEnabled()) log.info(ExternalStrings.FD_PID_IHAVE_PID__0____1, new Object[] {hdr.mbr, Integer.valueOf(hdr.pid)});
-
- if(hdr.mbr == null || hdr.pid <= 0) {
- if(log.isErrorEnabled()) log.error(ExternalStrings.FD_PID_I_HAVE_PID_HDRMBR_IS_NULL_OR_HDRPID__0);
- return;
- }
-
- if(!sameHost(local_addr, hdr.mbr)) {
- if(log.isErrorEnabled())
- log.error(hdr.mbr + " is not on the same host as I (" +
- local_addr + ", discarding I_HAVE_PID event");
- return;
- }
-
- // if(!pids.containsKey(hdr.mbr))
- pids.put(hdr.mbr, Integer.valueOf(hdr.pid)); // update the cache
-
- if(log.isInfoEnabled()) log.info(ExternalStrings.FD_PID__0__THE_CACHE_IS__1, new Object[] {local_addr, pids});
-
- if(ping_pid <= 0 && ping_dest != null && pids.containsKey(ping_dest)) {
- ping_pid=((Integer)pids.get(ping_dest)).intValue();
- try {
- start();
- }
- catch(Exception ex) {
- if(warn) log.warn("exception when calling start(): " + ex);
- }
- }
- break;
-
- // Return the cache to the sender of this message
- case FdHeader.GET_PIDS:
- if(hdr.mbr == null) {
-
- if(log.isErrorEnabled()) log.error(ExternalStrings.FD_PID_GET_PIDS_HDRMBR__NULL);
- return;
- }
- hdr=new FdHeader(FdHeader.GET_PIDS_RSP);
- hdr.pids=(Hashtable)pids.clone();
- msg=new Message(hdr.mbr, null, null);
- msg.putHeader(getName(), hdr);
- passDown(new Event(Event.MSG, msg));
- break;
-
- case FdHeader.GET_PIDS_RSP:
- if(hdr.pids == null) {
-
- if(log.isErrorEnabled()) log.error(ExternalStrings.FD_PID_GET_PIDS_RSP_CACHE_IS_NULL);
- return;
- }
- get_pids_promise.setResult(hdr.pids);
- break;
- }
- return;
- }
-
- passUp(evt); // pass up to the layer above us
- }
-
-
- @Override // GemStoneAddition
- public void down(Event evt) {
- Integer pid;
- Address mbr, tmp_ping_dest;
- View v;
-
-
- switch(evt.getType()) {
-
- case Event.SET_PID:
- // 1. Set the PID for local_addr
- pid=(Integer)evt.getArg();
- if(pid == null) {
- if(log.isErrorEnabled()) log.error(ExternalStrings.FD_PID_SET_PID_DID_NOT_CONTAIN_A_PID_);
- return;
- }
- local_pid=pid.intValue();
-
- if(log.isInfoEnabled()) log.info(ExternalStrings.FD_PID_LOCAL_PID_0, local_pid);
- break;
-
- case Event.VIEW_CHANGE:
- synchronized(this) {
- v=(View)evt.getArg();
- members.removeAllElements();
- members.addAll(v.getMembers());
- pingable_mbrs.removeAllElements();
- pingable_mbrs.addAll(members);
- passDown(evt);
-
-
- // 1. Get the addr:pid cache from the coordinator (only if not already fetched)
- if(!got_cache_from_coord) {
- getPidsFromCoordinator();
- got_cache_from_coord=true;
- }
-
-
- // 2. Broadcast my own addr:pid to all members so they can update their cache
- if(!own_pid_sent) {
- if(local_pid > 0) {
- sendIHavePidMessage(null, // send to all members
- local_addr,
- local_pid);
- own_pid_sent=true;
- }
- else
- if(warn) log.warn("[VIEW_CHANGE]: local_pid == 0");
- }
-
- // 3. Remove all entries in 'pids' which are not in the new membership
-// if(members != null) GemStoneAddition (cannot be null))
- {
- for(Enumeration e=pids.keys(); e.hasMoreElements();) {
- mbr=(Address)e.nextElement();
- if(!members.contains(mbr))
- pids.remove(mbr);
- }
- }
- tmp_ping_dest=determinePingDest();
- ping_pid=0;
- if(tmp_ping_dest == null) {
- stop();
- ping_dest=null;
- }
- else {
- ping_dest=tmp_ping_dest;
- try {
- start();
- }
- catch(Exception ex) {
- if(warn) log.warn("exception when calling start(): " + ex);
- }
- }
- }
- break;
-
- default:
- passDown(evt);
- break;
- }
- }
-
-
-
-
-
-
-
- /* ----------------------------------- Private Methods -------------------------------------- */
-
-
-
- /**
- * Determines coordinator C. If C is null and we are the first member, return. Else loop: send GET_PIDS message
- * to coordinator and wait for GET_PIDS_RSP response. Loop until valid response has been received.
- */
- void getPidsFromCoordinator() {
- Address coord;
- int attempts=num_tries;
- Message msg;
- FdHeader hdr;
- Hashtable result;
-
- get_pids_promise.reset();
- while(attempts > 0) {
- if((coord=determineCoordinator()) != null) {
- if(coord.equals(local_addr)) { // we are the first member --> empty cache
-
- if(log.isInfoEnabled()) log.info(ExternalStrings.FD_PID_FIRST_MEMBER_CACHE_IS_EMPTY);
- return;
- }
- hdr=new FdHeader(FdHeader.GET_PIDS);
- hdr.mbr=local_addr;
- msg=new Message(coord, null, null);
- msg.putHeader(getName(), hdr);
- passDown(new Event(Event.MSG, msg));
- result=(Hashtable)get_pids_promise.getResult(get_pids_timeout);
- if(result != null) {
- pids.putAll(result); // replace all entries (there should be none !) in pids with the new values
-
- if(log.isInfoEnabled())
- log.info("got cache from " +
- coord + ": cache is " + pids);
- return;
- }
- else {
-
- if(log.isErrorEnabled()) log.error(ExternalStrings.FD_PID_RECEIVED_NULL_CACHE_RETRYING);
- }
- }
-
- try { // GemStoneAddition
- Util.sleep(get_pids_retry_timeout);
- }
- catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- // Exit loop, treat as failure
- break;
- }
- --attempts;
- }
- }
-
-
- void broadcastSuspectMessage(Address suspected_mbr) {
- Message suspect_msg;
- FdHeader hdr;
-
-
- if(log.isInfoEnabled())
- log.info("suspecting " + suspected_mbr +
- " (own address is " + local_addr + ')');
-
- hdr=new FdHeader(FdHeader.SUSPECT);
- hdr.mbr=suspected_mbr;
- suspect_msg=new Message(); // mcast SUSPECT to all members
- suspect_msg.putHeader(getName(), hdr);
- passDown(new Event(Event.MSG, suspect_msg));
- }
-
-
- void broadcastWhoHasPidMessage(Address mbr) {
- Message msg;
- FdHeader hdr;
-
- if(local_addr != null && mbr != null)
- if(log.isInfoEnabled()) log.info(ExternalStrings.FD_PID__0__WHOHAS__1, new Object[] {local_addr, mbr});
-
- msg=new Message(); // bcast msg
- hdr=new FdHeader(FdHeader.WHO_HAS_PID);
- hdr.mbr=mbr;
- msg.putHeader(getName(), hdr);
- passDown(new Event(Event.MSG, msg));
- }
-
-
- /**
- * Sends or broadcasts a I_HAVE_PID response. If 'dst' is null, the reponse will be broadcast, otherwise
- * it will be unicast back to the requester
- */
- void sendIHavePidMessage(Address dst, Address mbr, int pid) {
- Message msg=new Message(dst, null, null);
- FdHeader hdr=new FdHeader(FdHeader.I_HAVE_PID);
- hdr.mbr=mbr;
- hdr.pid=pid;
- msg.putHeader(getName(), hdr);
- passDown(new Event(Event.MSG, msg));
- }
-
-
- /**
- * Set ping_dest and ping_pid. If ping_pid is not known, broadcast a WHO_HAS_PID message.
- */
- Address determinePingDest() {
- Address tmp;
-
- if(pingable_mbrs == null || pingable_mbrs.size() < 2 || local_addr == null)
- return null;
- for(int i=0; i < pingable_mbrs.size(); i++) {
- tmp=(Address)pingable_mbrs.elementAt(i);
- if(local_addr.equals(tmp)) {
- if(i + 1 >= pingable_mbrs.size())
- return (Address)pingable_mbrs.elementAt(0);
- else
- return (Address)pingable_mbrs.elementAt(i + 1);
- }
- }
- return null;
- }
-
-
- Address determineCoordinator() {
- return members.size() > 0 ? (Address)members.elementAt(0) : null;
- }
-
-
- /**
- * Checks whether 2 Addresses are on the same host
- */
- boolean sameHost(Address one, Address two) {
- InetAddress a, b;
- String host_a, host_b;
-
- if(one == null || two == null) return false;
- if(!(one instanceof IpAddress) || !(two instanceof IpAddress)) {
- if(log.isErrorEnabled()) log.error(ExternalStrings.FD_PID_ADDRESSES_HAVE_TO_BE_OF_TYPE_IPADDRESS_TO_BE_COMPARED);
- return false;
- }
-
- a=((IpAddress)one).getIpAddress();
- b=((IpAddress)two).getIpAddress();
- if(a == null || b == null) return false;
- host_a=a.getHostAddress();
- host_b=b.getHostAddress();
- return host_a.equals(host_b);
- }
-
-
-
- /* ------------------------------- End of Private Methods ------------------------------------ */
-
-
- public static class FdHeader extends Header {
- static final int SUSPECT=10;
- static final int WHO_HAS_PID=11;
- static final int I_HAVE_PID=12;
- static final int GET_PIDS=13; // sent by joining member to coordinator
- static final int GET_PIDS_RSP=14; // sent by coordinator to joining member in response to GET_PIDS
-
-
- int type=SUSPECT;
- Address mbr=null; // set on SUSPECT (suspected mbr), WHO_HAS_PID (requested mbr), I_HAVE_PID
- int pid=0; // set on I_HAVE_PID
- Hashtable pids=null; // set on GET_PIDS_RSP
-
-
- public FdHeader() {
- } // used for externalization
-
- FdHeader(int type) {
- this.type=type;
- }
-
-
- @Override // GemStoneAddition
- public String toString() {
- StringBuffer sb=new StringBuffer();
- sb.append(type2String(type));
- if(mbr != null)
- sb.append(", mbr=" + mbr);
- if(pid > 0)
- sb.append(", pid=" + pid);
- if(pids != null)
- sb.append(", pids=" + pids);
- return sb.toString();
- }
-
-
- public static String type2String(int type) {
- switch(type) {
- case SUSPECT:
- return "SUSPECT";
- case WHO_HAS_PID:
- return "WHO_HAS_PID";
- case I_HAVE_PID:
- return "I_HAVE_PID";
- case GET_PIDS:
- return "GET_PIDS";
- case GET_PIDS_RSP:
- return "GET_PIDS_RSP";
- default:
- return "unknown type (" + type + ')';
- }
- }
-
- public void writeExternal(ObjectOutput out) throws IOException {
- out.writeInt(type);
- out.writeObject(mbr);
- out.writeInt(pid);
- out.writeObject(pids);
- }
-
-
- public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
- type=in.readInt();
- mbr=(Address)in.readObject();
- pid=in.readInt();
- pids=(Hashtable)in.readObject();
- }
-
- }
-
-
- /**
- * An instance of this class will be added to the TimeScheduler to be scheduled to be run every timeout
- * msecs. When there is no ping_dest (e.g. only 1 member in the group), this task will be cancelled in
- * TimeScheduler (and re-scheduled if ping_dest becomes available later).
- */
- protected/*GemStoneAddition*/ class Monitor implements TimeScheduler.Task {
- boolean started=true;
-
-
- void stop() {
- started=false;
- }
-
-
- /* -------------------------------------- TimeScheduler.Task Interface -------------------------------- */
-
- public boolean cancelled() {
- return !started;
- }
-
-
- public long nextInterval() {
- return timeout;
- }
-
-
- /**
- * Periodically probe for the destination process identified by ping_dest/ping_pid. Suspect the ping_dest
- * member if /prop/<ping_pid> process does not exist.
- */
- public void run() {
- if(ping_dest == null) {
- if(warn) log.warn("ping_dest is null, skipping ping");
- return;
- }
-
-
- if(log.isInfoEnabled())
- log.info("ping_dest=" + ping_dest + ", ping_pid=" + ping_pid +
- ", cache=" + pids);
-
- // If the PID for ping_dest is not known, send a broadcast to solicit it
- if(ping_pid <= 0) {
- if(ping_dest != null && pids.containsKey(ping_dest)) {
- ping_pid=((Integer)pids.get(ping_dest)).intValue();
-
- if(log.isInfoEnabled())
- log.info("found PID for " +
- ping_dest + " in cache (pid=" + ping_pid + ')');
- }
- else {
-
- if(log.isErrorEnabled())
- log.error("PID for " + ping_dest + " not known" +
- ", cache is " + pids);
- broadcastWhoHasPidMessage(ping_dest);
- return;
- }
- }
-
- if(!Util.fileExists("/proc/" + ping_pid)) {
-
- if(log.isInfoEnabled()) log.info(ExternalStrings.FD_PID_PROCESS__0__DOES_NOT_EXIST, ping_pid);
- broadcastSuspectMessage(ping_dest);
- pingable_mbrs.removeElement(ping_dest);
- ping_dest=determinePingDest();
- if(ping_dest == null)
- stop();
- ping_pid=0;
- }
- else {
-
- if(log.isInfoEnabled()) log.info(ExternalStrings.FD_PID_0__IS_ALIVE, ping_dest);
- }
- }
-
- /* ---------------------------------- End of TimeScheduler.Task Interface ---------------------------- */
-
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/8b2ea77d/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/protocols/FD_PROB.java
----------------------------------------------------------------------
diff --git a/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/protocols/FD_PROB.java b/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/protocols/FD_PROB.java
deleted file mode 100644
index b349a54..0000000
--- a/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/protocols/FD_PROB.java
+++ /dev/null
@@ -1,628 +0,0 @@
-/** Notice of modification as required by the LGPL
- * This file was modified by Gemstone Systems Inc. on
- * $Date$
- **/
-// $Id: FD_PROB.java,v 1.9 2005/08/11 12:43:47 belaban Exp $
-
-package com.gemstone.org.jgroups.protocols;
-
-
-import com.gemstone.org.jgroups.*;
-import com.gemstone.org.jgroups.stack.Protocol;
-import com.gemstone.org.jgroups.util.ExternalStrings;
-import com.gemstone.org.jgroups.util.Util;
-
-import java.io.IOException;
-import java.io.ObjectInput;
-import java.io.ObjectOutput;
-import java.util.Enumeration;
-import java.util.Hashtable;
-import java.util.Properties;
-import java.util.Vector;
-
-
-/**
- * Probabilistic failure detection protocol based on "A Gossip-Style Failure Detection Service"
- * by Renesse, Minsky and Hayden.<p>
- * Each member maintains a list of all other members: for each member P, 2 data are maintained, a heartbeat
- * counter and the time of the last increment of the counter. Each member periodically sends its own heartbeat
- * counter list to a randomly chosen member Q. Q updates its own heartbeat counter list and the associated
- * time (if counter was incremented). Each member periodically increments its own counter. If, when sending
- * its heartbeat counter list, a member P detects that another member Q's heartbeat counter was not incremented
- * for timeout seconds, Q will be suspected.<p>
- * This protocol can be used both with a PBCAST *and* regular stacks.
- * @author Bela Ban 1999
- * @version $Revision: 1.9 $
- */
-public class FD_PROB extends Protocol implements Runnable {
- Address local_addr=null;
- Thread hb=null; // GemStoneAddition synchronize on this to access
- long timeout=3000; // before a member with a non updated timestamp is suspected
- long gossip_interval=1000;
- Vector members=null;
- final Hashtable counters=new Hashtable(); // keys=Addresses, vals=FdEntries
- final Hashtable invalid_pingers=new Hashtable(); // keys=Address, vals=Integer (number of pings from suspected mbrs)
- int max_tries=2; // number of times to send a are-you-alive msg (tot time= max_tries*timeout)
-
-
- @Override // GemStoneAddition
- public String getName() {
- return "FD_PROB";
- }
-
-
- @Override // GemStoneAddition
- public boolean setProperties(Properties props) {
- String str;
-
- super.setProperties(props);
- str=props.getProperty("timeout");
- if(str != null) {
- timeout=Long.parseLong(str);
- props.remove("timeout");
- }
-
- str=props.getProperty("gossip_interval");
- if(str != null) {
- gossip_interval=Long.parseLong(str);
- props.remove("gossip_interval");
- }
-
- str=props.getProperty("max_tries");
- if(str != null) {
- max_tries=Integer.parseInt(str);
- props.remove("max_tries");
- }
-
- if(props.size() > 0) {
- log.error(ExternalStrings.FD_PROB_FD_PROBSETPROPERTIES_THE_FOLLOWING_PROPERTIES_ARE_NOT_RECOGNIZED__0, props);
-
- return false;
- }
- return true;
- }
-
-
- @Override // GemStoneAddition
- public void start() throws Exception {
- synchronized (this) { // GemStoneAddition
- if(hb == null) {
- hb=new Thread(this, "FD_PROB.HeartbeatThread");
- hb.setDaemon(true);
- hb.start();
- }
- }
- }
-
-
- @Override // GemStoneAddition
- public void stop() {
- Thread tmp=null;
- synchronized (this) { // GemStoneAddition
- tmp = hb;
- hb = null;
- }
- if(tmp != null && tmp.isAlive()) {
-// tmp=hb;
-// hb=null; GemStoneAddition
- tmp.interrupt();
- try {
- tmp.join(timeout);
- }
- catch(InterruptedException ex) {
- Thread.currentThread().interrupt(); // GemStoneAddition
- // just propagate to caller
- }
- }
- hb=null;
- }
-
-
- @Override // GemStoneAddition
- public void up(Event evt) {
- Message msg;
-// Address hb_sender; GemStoneAddition
- FdHeader hdr=null;
- Object obj;
-
- switch(evt.getType()) {
-
- case Event.SET_LOCAL_ADDRESS:
- local_addr=(Address) evt.getArg();
- break;
-
- case Event.MSG:
- msg=(Message) evt.getArg();
- obj=msg.getHeader(getName());
- if(obj == null || !(obj instanceof FdHeader)) {
- updateCounter(msg.getSrc()); // got a msg from this guy, reset its time (we heard from it now)
- break;
- }
-
- hdr=(FdHeader) msg.removeHeader(getName());
- switch(hdr.type) {
- case FdHeader.HEARTBEAT: // heartbeat request; send heartbeat ack
- if(checkPingerValidity(msg.getSrc()) == false) // false == sender of heartbeat is not a member
- return;
-
- // 2. Update my own array of counters
-
- if(log.isInfoEnabled()) log.info(ExternalStrings.FD_PROB__HEARTBEAT_FROM__0, msg.getSrc());
- updateCounters(hdr);
- return; // don't pass up !
- case FdHeader.NOT_MEMBER:
- if(warn) log.warn("NOT_MEMBER: I'm being shunned; exiting");
- passUp(new Event(Event.EXIT));
- return;
- default:
- if(warn) log.warn("FdHeader type " + hdr.type + " not known");
- return;
- }
- }
- passUp(evt); // pass up to the layer above us
- }
-
-
- @Override // GemStoneAddition
- public void down(Event evt) {
-// Message msg; GemStoneAddition
- int num_mbrs;
- Vector excluded_mbrs;
- FdEntry entry;
- Address mbr;
-
- switch(evt.getType()) {
-
- // Start heartbeat thread when we have more than 1 member; stop it when membership drops below 2
- case Event.VIEW_CHANGE:
- passDown(evt);
- synchronized(this) {
- View v=(View) evt.getArg();
-
- // mark excluded members
- excluded_mbrs=computeExcludedMembers(members, v.getMembers());
- if(excluded_mbrs != null && excluded_mbrs.size() > 0) {
- for(int i=0; i < excluded_mbrs.size(); i++) {
- mbr=(Address) excluded_mbrs.elementAt(i);
- entry=(FdEntry) counters.get(mbr);
- if(entry != null)
- entry.setExcluded(true);
- }
- }
-
- members=v.getMembers(); // GemStoneAddition (cannot be null) v != null ? v.getMembers() : null;
- if(members != null) {
- num_mbrs=members.size();
- if(num_mbrs >= 2) {
- if(hb == null) {
- try {
- start();
- }
- catch(Exception ex) {
- if(warn) log.warn("exception when calling start(): " + ex);
- }
- }
- }
- else
- stop();
- }
- }
- break;
-
- default:
- passDown(evt);
- break;
- }
- }
-
-
- /**
- Loop while more than 1 member available. Choose a member randomly (not myself !) and send a
- heartbeat. Wait for ack. If ack not received withing timeout, mcast SUSPECT message.
- */
- public void run() {
- Message hb_msg;
- FdHeader hdr;
- Address hb_dest, key;
- FdEntry entry;
- long curr_time, diff;
-
-
-
- if(log.isInfoEnabled()) log.info(ExternalStrings.FD_PROB_HEARTBEAT_THREAD_WAS_STARTED);
-
- for (;;) { // GemStoneAddition remove coding anti-pattern
- if (Thread.currentThread().isInterrupted()) break; // GemStoneAddition -- for safety
-
- // 1. Get a random member P (excluding ourself)
- hb_dest=getHeartbeatDest();
- if(hb_dest == null) {
- if(warn) log.warn("hb_dest is null");
- try { // GemStoneAddition
- Util.sleep(gossip_interval);
- }
- catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- break; // exit the loop (and the thread)
- }
- continue;
- }
-
-
- // 2. Increment own counter
- entry=(FdEntry) counters.get(local_addr);
- if(entry == null) {
- entry=new FdEntry();
- counters.put(local_addr, entry);
- }
- entry.incrementCounter();
-
-
- // 3. Send heartbeat to P
- hdr=createHeader();
- if(hdr == null)
- if(warn) log.warn("header could not be created. Heartbeat will not be sent");
- else {
- hb_msg=new Message(hb_dest, null, null);
- hb_msg.putHeader(getName(), hdr);
-
- if(log.isInfoEnabled()) log.info(ExternalStrings.FD_PROB__HEARTBEAT_TO__0, hb_dest);
- passDown(new Event(Event.MSG, hb_msg));
- }
-
-
- if(log.isInfoEnabled()) log.info(ExternalStrings.FD_PROB_OWN_COUNTERS_ARE__0, printCounters());
-
-
- // 4. Suspect members from which we haven't heard for timeout msecs
- for(Enumeration e=counters.keys(); e.hasMoreElements();) {
- curr_time=System.currentTimeMillis();
- key=(Address) e.nextElement();
- entry=(FdEntry) counters.get(key);
-
- if(entry.getTimestamp() > 0 && (diff=curr_time - entry.getTimestamp()) >= timeout) {
- if(entry.excluded()) {
- if(diff >= 2 * timeout) { // remove members marked as 'excluded' after 2*timeout msecs
- counters.remove(key);
- if(log.isInfoEnabled()) log.info(ExternalStrings.FD_PROB_REMOVED__0, key);
- }
- }
- else {
- if(log.isInfoEnabled()) log.info(ExternalStrings.FD_PROB_SUSPECTING__0, key);
- passUp(new Event(Event.SUSPECT, new SuspectMember(this.local_addr, key))); // GemStoneAddition suspectmember struct
- }
- }
- }
- try { // GemStoneAddition
- Util.sleep(gossip_interval);
- }
- catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- break; // exit the loop (and the thread)
- }
- } // end while
-
-
- if(log.isInfoEnabled()) log.info(ExternalStrings.FD_PROB_HEARTBEAT_THREAD_WAS_STOPPED);
- }
-
-
-
-
-
-
-
- /* -------------------------------- Private Methods ------------------------------- */
-
- Address getHeartbeatDest() {
- Address retval=null;
- int r, size;
- Vector members_copy;
-
- if(members == null || members.size() < 2 || local_addr == null)
- return null;
- members_copy=(Vector) members.clone();
- members_copy.removeElement(local_addr); // don't select myself as heartbeat destination
- size=members_copy.size();
- r=((int) (Math.random() * (size + 1))) % size;
- retval=(Address) members_copy.elementAt(r);
- return retval;
- }
-
-
- /** Create a header containing the counters for all members */
- FdHeader createHeader() {
- int num_mbrs=counters.size(), index=0;
- FdHeader ret=null;
- Address key;
- FdEntry entry;
-
- if(num_mbrs <= 0)
- return null;
- ret=new FdHeader(FdHeader.HEARTBEAT, num_mbrs);
- for(Enumeration e=counters.keys(); e.hasMoreElements();) {
- key=(Address) e.nextElement();
- entry=(FdEntry) counters.get(key);
- if(entry.excluded())
- continue;
- if(index >= ret.members.length) {
- if(warn) log.warn("index " + index + " is out of bounds (" +
- ret.members.length + ')');
- break;
- }
- ret.members[index]=key;
- ret.counters[index]=entry.getCounter();
- index++;
- }
- return ret;
- }
-
-
- /** Set my own counters values to max(own-counter, counter) */
- void updateCounters(FdHeader hdr) {
- Address key;
-// long counter; GemStoneAddition
- FdEntry entry;
-
- if(hdr == null || hdr.members == null || hdr.counters == null) {
- if(warn) log.warn("hdr is null or contains no counters");
- return;
- }
-
- for(int i=0; i < hdr.members.length; i++) {
- key=hdr.members[i];
- if(key == null) continue;
- entry=(FdEntry) counters.get(key);
- if(entry == null) {
- entry=new FdEntry(hdr.counters[i]);
- counters.put(key, entry);
- continue;
- }
-
- if(entry.excluded())
- continue;
-
- // only update counter (and adjust timestamp) if new counter is greater then old one
- entry.setCounter(Math.max(entry.getCounter(), hdr.counters[i]));
- }
- }
-
-
- /** Resets the counter for mbr */
- void updateCounter(Address mbr) {
- FdEntry entry;
-
- if(mbr == null) return;
- entry=(FdEntry) counters.get(mbr);
- if(entry != null)
- entry.setTimestamp();
- }
-
-
- String printCounters() {
- StringBuffer sb=new StringBuffer();
- Address mbr;
- FdEntry entry;
-
- for(Enumeration e=counters.keys(); e.hasMoreElements();) {
- mbr=(Address) e.nextElement();
- entry=(FdEntry) counters.get(mbr);
- sb.append("\n" + mbr + ": " + entry._toString());
- }
- return sb.toString();
- }
-
-
- static Vector computeExcludedMembers(Vector old_mbrship, Vector new_mbrship) {
- Vector ret=new Vector();
- if(old_mbrship == null || new_mbrship == null) return ret;
- for(int i=0; i < old_mbrship.size(); i++)
- if(!new_mbrship.contains(old_mbrship.elementAt(i)))
- ret.addElement(old_mbrship.elementAt(i));
- return ret;
- }
-
-
- /** If hb_sender is not a member, send a SUSPECT to sender (after n pings received) */
- boolean checkPingerValidity(Object hb_sender) {
- int num_pings=0;
- Message shun_msg;
- Header hdr;
-
- if(hb_sender != null && members != null && !members.contains(hb_sender)) {
- if(invalid_pingers.containsKey(hb_sender)) {
- num_pings=((Integer) invalid_pingers.get(hb_sender)).intValue();
- if(num_pings >= max_tries) {
- if(log.isErrorEnabled()) log.error("sender " + hb_sender +
- " is not member in " + members + " ! Telling it to leave group");
- shun_msg=new Message((Address) hb_sender, null, null);
- hdr=new FdHeader(FdHeader.NOT_MEMBER);
- shun_msg.putHeader(getName(), hdr);
- passDown(new Event(Event.MSG, shun_msg));
- invalid_pingers.remove(hb_sender);
- }
- else {
- num_pings++;
- invalid_pingers.put(hb_sender, Integer.valueOf(num_pings));
- }
- }
- else {
- num_pings++;
- invalid_pingers.put(hb_sender, Integer.valueOf(num_pings));
- }
- return false;
- }
- else
- return true;
- }
-
-
- /* ----------------------------- End of Private Methods --------------------------- */
-
-
-
-
-
-
- public static class FdHeader extends Header {
- static final int HEARTBEAT=1; // sent periodically to a random member
- static final int NOT_MEMBER=2; // sent to the sender, when it is not a member anymore (shunned)
-
-
- int type=HEARTBEAT;
- Address[] members=null;
- long[] counters=null; // correlates with 'members' (same indexes)
-
-
- public FdHeader() {
- } // used for externalization
-
- FdHeader(int type) {
- this.type=type;
- }
-
- FdHeader(int type, int num_elements) {
- this(type);
- members=new Address[num_elements];
- counters=new long[num_elements];
- }
-
-
- @Override // GemStoneAddition
- public String toString() {
- switch(type) {
- case HEARTBEAT:
- return "[FD_PROB: HEARTBEAT]";
- case NOT_MEMBER:
- return "[FD_PROB: NOT_MEMBER]";
- default:
- return "[FD_PROB: unknown type (" + type + ")]";
- }
- }
-
- public String printDetails() {
- StringBuffer sb=new StringBuffer();
- Address mbr;
-// long c; GemStoneAddition
-
- if(members != null && counters != null)
- for(int i=0; i < members.length; i++) {
- mbr=members[i];
- if(mbr == null)
- sb.append("\n<null>");
- else
- sb.append("\n" + mbr);
- sb.append(": " + counters[i]);
- }
- return sb.toString();
- }
-
-
- public void writeExternal(ObjectOutput out) throws IOException {
- out.writeInt(type);
-
- if(members != null) {
- out.writeInt(members.length);
- out.writeObject(members);
- }
- else
- out.writeInt(0);
-
- if(counters != null) {
- out.writeInt(counters.length);
- for(int i=0; i < counters.length; i++)
- out.writeLong(counters[i]);
- }
- else
- out.writeInt(0);
- }
-
-
- public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
- int num;
- type=in.readInt();
-
- num=in.readInt();
- if(num == 0)
- members=null;
- else {
- members=(Address[]) in.readObject();
- }
-
- num=in.readInt();
- if(num == 0)
- counters=null;
- else {
- counters=new long[num];
- for(int i=0; i < counters.length; i++)
- counters[i]=in.readLong();
- }
- }
-
-
- }
-
-
- private static class FdEntry {
- private long counter=0; // heartbeat counter
- private long timestamp=0; // last time the counter was incremented
- private boolean excluded=false; // set to true if member was excluded from group
-
-
- FdEntry() {
-
- }
-
- FdEntry(long counter) {
- this.counter=counter;
- timestamp=System.currentTimeMillis();
- }
-
-
- long getCounter() {
- return counter;
- }
-
- long getTimestamp() {
- return timestamp;
- }
-
- boolean excluded() {
- return excluded;
- }
-
-
- synchronized void setCounter(long new_counter) {
- if(new_counter > counter) { // only set time if counter was incremented
- timestamp=System.currentTimeMillis();
- counter=new_counter;
- }
- }
-
- synchronized void incrementCounter() {
- counter++;
- timestamp=System.currentTimeMillis();
- }
-
- synchronized void setTimestamp() {
- timestamp=System.currentTimeMillis();
- }
-
- synchronized void setExcluded(boolean flag) {
- excluded=flag;
- }
-
-
- @Override // GemStoneAddition
- public String toString() {
- return "counter=" + counter + ", timestamp=" + timestamp + ", excluded=" + excluded;
- }
-
- public String _toString() {
- return "counter=" + counter + ", age=" + (System.currentTimeMillis() - timestamp) +
- ", excluded=" + excluded;
- }
- }
-
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/8b2ea77d/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/protocols/FD_SIMPLE.java
----------------------------------------------------------------------
diff --git a/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/protocols/FD_SIMPLE.java b/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/protocols/FD_SIMPLE.java
deleted file mode 100644
index 2606f67..0000000
--- a/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/protocols/FD_SIMPLE.java
+++ /dev/null
@@ -1,357 +0,0 @@
-/** Notice of modification as required by the LGPL
- * This file was modified by Gemstone Systems Inc. on
- * $Date$
- **/
-// $Id: FD_SIMPLE.java,v 1.9 2005/08/11 12:43:47 belaban Exp $
-
-package com.gemstone.org.jgroups.protocols;
-
-
-import com.gemstone.org.jgroups.*;
-import com.gemstone.org.jgroups.stack.Protocol;
-import com.gemstone.org.jgroups.util.ExternalStrings;
-import com.gemstone.org.jgroups.util.Promise;
-import com.gemstone.org.jgroups.util.TimeScheduler;
-
-import java.io.IOException;
-import java.io.ObjectInput;
-import java.io.ObjectOutput;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.Properties;
-import java.util.Vector;
-
-
-/**
- * Simple failure detection protocol. Periodically sends a are-you-alive message to a randomly chosen member
- * (excluding itself) and waits for a response. If a response has not been received within timeout msecs, a counter
- * associated with that member will be incremented. If the counter exceeds max_missed_hbs, that member will be
- * suspected. When a message or a heartbeat are received, the counter is reset to 0.
- *
- * @author Bela Ban Aug 2002
- * @version $Revision: 1.9 $
- */
-public class FD_SIMPLE extends Protocol {
- Address local_addr=null;
- TimeScheduler timer=null;
- HeartbeatTask task=null;
- long interval=3000; // interval in msecs between are-you-alive messages
- long timeout=3000; // time (in msecs) to wait for a response to are-you-alive
- final Vector members=new Vector();
- final HashMap counters=new HashMap(); // keys=Addresses, vals=Integer (count)
- int max_missed_hbs=5; // max number of missed responses until a member is suspected
- static final String name="FD_SIMPLE";
-
-
- @Override // GemStoneAddition
- public String getName() {
- return "FD_SIMPLE";
- }
-
- @Override // GemStoneAddition
- public void init() throws Exception {
- timer=stack.timer;
- }
-
- @Override // GemStoneAddition
- public boolean setProperties(Properties props) {
- String str;
-
- super.setProperties(props);
- str=props.getProperty("timeout");
- if(str != null) {
- timeout=Long.parseLong(str);
- props.remove("timeout");
- }
-
- str=props.getProperty("interval");
- if(str != null) {
- interval=Long.parseLong(str);
- props.remove("interval");
- }
-
- str=props.getProperty("max_missed_hbs");
- if(str != null) {
- max_missed_hbs=Integer.parseInt(str);
- props.remove("max_missed_hbs");
- }
-
- if(props.size() > 0) {
- log.error(ExternalStrings.FD_SIMPLE_FD_SIMPLESETPROPERTIES_THE_FOLLOWING_PROPERTIES_ARE_NOT_RECOGNIZED__0, props);
-
- return false;
- }
- return true;
- }
-
-
- @Override // GemStoneAddition
- public void stop() {
- if(task != null) {
- task.stop();
- task=null;
- }
- }
-
-
- @Override // GemStoneAddition
- public void up(Event evt) {
- Message msg, rsp;
- Address sender;
- FdHeader hdr=null;
-// Object obj; GemStoneAddition
- boolean counter_reset=false;
-
- switch(evt.getType()) {
-
- case Event.SET_LOCAL_ADDRESS:
- local_addr=(Address)evt.getArg();
- break;
-
- case Event.MSG:
- msg=(Message)evt.getArg();
- sender=msg.getSrc();
- resetCounter(sender);
- counter_reset=true;
-
- hdr=(FdHeader)msg.removeHeader(name);
- if(hdr == null)
- break;
-
- switch(hdr.type) {
- case FdHeader.ARE_YOU_ALIVE: // are-you-alive request, send i-am-alive response
- rsp=new Message(sender, null, null);
- rsp.putHeader(name, new FdHeader(FdHeader.I_AM_ALIVE));
- passDown(new Event(Event.MSG, rsp));
- return; // don't pass up further
-
- case FdHeader.I_AM_ALIVE:
- if(log.isInfoEnabled()) log.info(ExternalStrings.FD_SIMPLE_RECEIVED_I_AM_ALIVE_RESPONSE_FROM__0, sender);
- if(task != null)
- task.receivedHeartbeatResponse(sender);
- if(!counter_reset)
- resetCounter(sender);
- return;
-
- default:
- if(warn) log.warn("FdHeader type " + hdr.type + " not known");
- return;
- }
- }
-
- passUp(evt); // pass up to the layer above us
- }
-
-
- @Override // GemStoneAddition
- public void down(Event evt) {
-// Message msg; GemStoneAddition
-// int num_mbrs; GemStoneAddition
-// Address mbr; GemStoneAddition
- View new_view;
- Address key;
-
- switch(evt.getType()) {
-
- // Start heartbeat thread when we have more than 1 member; stop it when membership drops below 2
- case Event.VIEW_CHANGE:
- new_view=(View)evt.getArg();
- members.clear();
- members.addAll(new_view.getMembers());
- if(new_view.size() > 1) {
- if(task == null) {
- task=new HeartbeatTask();
- if(log.isInfoEnabled()) log.info(ExternalStrings.FD_SIMPLE_STARTING_HEARTBEAT_TASK);
- timer.add(task, true);
- }
- }
- else {
- if(task != null) {
- if(log.isInfoEnabled()) log.info(ExternalStrings.FD_SIMPLE_STOPPING_HEARTBEAT_TASK);
- task.stop(); // will be removed from TimeScheduler
- task=null;
- }
- }
-
- // remove all keys from 'counters' which are not in this new view
- for(Iterator it=counters.keySet().iterator(); it.hasNext();) {
- key=(Address)it.next();
- if(!members.contains(key)) {
-
- if(log.isInfoEnabled()) log.info(ExternalStrings.FD_SIMPLE_REMOVING__0__FROM_COUNTERS, key);
- it.remove();
- }
- }
- }
-
- passDown(evt);
- }
-
-
-
-
-
-
-
-
- /* -------------------------------- Private Methods ------------------------------- */
-
- Address getHeartbeatDest() {
- Address retval=null;
- int r, size;
- Vector members_copy;
-
- if(members == null || members.size() < 2 || local_addr == null)
- return null;
- members_copy=(Vector)members.clone();
- members_copy.removeElement(local_addr); // don't select myself as heartbeat destination
- size=members_copy.size();
- r=((int)(Math.random() * (size + 1))) % size;
- retval=(Address)members_copy.elementAt(r);
- return retval;
- }
-
-
- int incrementCounter(Address mbr) {
- Integer cnt;
- int ret=0;
-
- if(mbr == null) return ret;
- synchronized(counters) {
- cnt=(Integer)counters.get(mbr);
- if(cnt == null) {
- cnt=Integer.valueOf(0);
- counters.put(mbr, cnt);
- }
- else {
- ret=cnt.intValue() + 1;
- counters.put(mbr, Integer.valueOf(ret));
- }
- return ret;
- }
- }
-
-
- void resetCounter(Address mbr) {
- if(mbr == null) return;
-
- synchronized(counters) {
- counters.put(mbr, Integer.valueOf(0));
- }
- }
-
-
- String printCounters() {
- StringBuffer sb=new StringBuffer();
- Address key;
-
- for(Iterator it=counters.keySet().iterator(); it.hasNext();) {
- key=(Address)it.next();
- sb.append(key).append(": ").append(counters.get(key)).append('\n');
- }
- return sb.toString();
- }
-
- /* ----------------------------- End of Private Methods --------------------------- */
-
-
-
-
-
-
- public static class FdHeader extends Header {
- static final int ARE_YOU_ALIVE=1; // sent periodically to a random member
- static final int I_AM_ALIVE=2; // response to above message
-
-
- int type=ARE_YOU_ALIVE;
-
- public FdHeader() {
- } // used for externalization
-
- FdHeader(int type) {
- this.type=type;
- }
-
-
- @Override // GemStoneAddition
- public String toString() {
- switch(type) {
- case ARE_YOU_ALIVE:
- return "[FD_SIMPLE: ARE_YOU_ALIVE]";
- case I_AM_ALIVE:
- return "[FD_SIMPLE: I_AM_ALIVE]";
- default:
- return "[FD_SIMPLE: unknown type (" + type + ")]";
- }
- }
-
-
- public void writeExternal(ObjectOutput out) throws IOException {
- out.writeInt(type);
- }
-
-
- public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
- type=in.readInt();
- }
-
-
- }
-
-
- class HeartbeatTask implements TimeScheduler.Task {
- boolean stopped=false;
- final Promise promise=new Promise();
- Address dest=null;
-
- void stop() {
- stopped=true;
- }
-
- public boolean cancelled() {
- return stopped;
- }
-
- public long nextInterval() {
- return interval;
- }
-
- public void receivedHeartbeatResponse(Address from) {
- if(from != null && dest != null && from.equals(dest))
- promise.setResult(from);
- }
-
- public void run() {
- Message msg;
- int num_missed_hbs=0;
-
- dest=getHeartbeatDest();
- if(dest == null) {
- if(warn) log.warn("heartbeat destination was null, will not send ARE_YOU_ALIVE message");
- return;
- }
-
- if(log.isInfoEnabled())
- log.info("sending ARE_YOU_ALIVE message to " + dest +
- ", counters are\n" + printCounters());
-
- promise.reset();
- msg=new Message(dest, null, null);
- msg.putHeader(name, new FdHeader(FdHeader.ARE_YOU_ALIVE));
- passDown(new Event(Event.MSG, msg));
-
- promise.getResult(timeout);
- num_missed_hbs=incrementCounter(dest);
- if(num_missed_hbs >= max_missed_hbs) {
-
- if(log.isInfoEnabled())
- log.info("missed " + num_missed_hbs + " from " + dest +
- ", suspecting member");
- passUp(new Event(Event.SUSPECT, new SuspectMember(local_addr, dest))); // GemStoneAddition SuspectMember struct
- }
- }
- }
-
-
-}