You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by bs...@apache.org on 2015/08/21 23:22:50 UTC
[26/51] [partial] incubator-geode git commit: GEODE-77 removing the
old jgroups subproject
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/8b2ea77d/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/protocols/FD_SOCK.java.new
----------------------------------------------------------------------
diff --git a/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/protocols/FD_SOCK.java.new b/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/protocols/FD_SOCK.java.new
deleted file mode 100644
index efdcf56..0000000
--- a/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/protocols/FD_SOCK.java.new
+++ /dev/null
@@ -1,1153 +0,0 @@
-// $Id: FD_SOCK.java,v 1.29 2005/08/11 12:43:47 belaban Exp $
-
-package org.jgroups.protocols;
-
-import org.jgroups.*;
-import org.jgroups.stack.IpAddress;
-import org.jgroups.stack.Protocol;
-import org.jgroups.util.*;
-
-import java.io.*;
-import java.net.ServerSocket;
-import java.net.Socket;
-import java.net.InetAddress;
-import java.net.UnknownHostException;
-import java.util.*;
-import java.util.List;
-
-
-/**
- * Failure detection protocol based on sockets. Failure detection is ring-based. Each member creates a
- * server socket and announces its address together with the server socket's address in a multicast. A
- * pinger thread will be started when the membership goes above 1 and will be stopped when it drops below
- * 2. The pinger thread connects to its neighbor on the right and waits until the socket is closed. When
- * the socket is closed by the monitored peer in an abnormal fashion (IOException), the neighbor will be
- * suspected.<p> The main feature of this protocol is that no ping messages need to be exchanged between
- * any 2 peers, and failure detection relies entirely on TCP sockets. The advantage is that no activity
- * will take place between 2 peers as long as they are alive (i.e. have their server sockets open).
- * The disadvantage is that hung servers or crashed routers will not cause sockets to be closed, therefore
- * they won't be detected.
- * The FD_SOCK protocol will work for groups where members are on different hosts<p>
- * The costs involved are 2 additional threads: one that
- * monitors the client side of the socket connection (to monitor a peer) and another one that manages the
- * server socket. However, those threads will be idle as long as both peers are running.
- * @author Bela Ban May 29 2001
- */
-public class FD_SOCK extends Protocol implements Runnable {
- long get_cache_timeout=3000; // msecs to wait for the socket cache from the coordinator
- final long get_cache_retry_timeout=500; // msecs to wait until we retry getting the cache from coord
- long suspect_msg_interval=5000; // (BroadcastTask): mcast SUSPECT every 5000 msecs
- int num_tries=3; // attempts coord is solicited for socket cache until we give up
- final Vector members=new Vector(11); // list of group members (updated on VIEW_CHANGE)
- boolean srv_sock_sent=false; // has own socket been broadcast yet ?
- final Vector pingable_mbrs=new Vector(11); // mbrs from which we select ping_dest. may be subset of 'members'
- final Promise get_cache_promise=new Promise(); // used for rendezvous on GET_CACHE and GET_CACHE_RSP
- boolean got_cache_from_coord=false; // was cache already fetched ?
- Address local_addr=null; // our own address
- ServerSocket srv_sock=null; // server socket to which another member connects to monitor me
- InetAddress srv_sock_bind_addr=null; // the NIC on which the ServerSocket should listen
- ServerSocketHandler srv_sock_handler=null; // accepts new connections on srv_sock
- IpAddress srv_sock_addr=null; // pair of server_socket:port
- Address ping_dest=null; // address of the member we monitor
- Socket ping_sock=null; // socket to the member we monitor
- InputStream ping_input=null; // input stream of the socket to the member we monitor
- Thread pinger_thread=null; // listens on ping_sock, suspects member if socket is closed
- final Hashtable cache=new Hashtable(11); // keys=Addresses, vals=IpAddresses (socket:port)
- boolean stopping=false;
-
- /** Start port for server socket (uses first available port starting at start_port). A value of 0 (default)
- * picks a random port */
- int start_port=0;
- final Promise ping_addr_promise=new Promise(); // to fetch the ping_addr for ping_dest
- final Object sock_mutex=new Object(); // for access to ping_sock, ping_input
- TimeScheduler timer=null;
- final BroadcastTask bcast_task=new BroadcastTask(); // to transmit SUSPECT message (until view change)
- boolean regular_sock_close=false; // used by interruptPingerThread() when new ping_dest is computed
- int num_suspect_events=0;
- private static final int NORMAL_TEMINATION=9;
- private static final int ABNORMAL_TEMINATION=-1;
- private static final String name="FD_SOCK";
-
- BoundedList suspect_history=new BoundedList(20);
-
-
- public String getName() {
- return name;
- }
-
- public String getLocalAddress() {return local_addr != null? local_addr.toString() : "null";}
- public String getMembers() {return members != null? members.toString() : "null";}
- public String getPingableMembers() {return pingable_mbrs != null? pingable_mbrs.toString() : "null";}
- public String getPingDest() {return ping_dest != null? ping_dest.toString() : "null";}
- public int getNumSuspectEventsGenerated() {return num_suspect_events;}
- public String printSuspectHistory() {
- StringBuffer sb=new StringBuffer();
- for(Enumeration en=suspect_history.elements(); en.hasMoreElements();) {
- sb.append(new Date()).append(": ").append(en.nextElement()).append("\n");
- }
- return sb.toString();
- }
-
- public boolean setProperties(Properties props) {
- String str, tmp=null;
-
- super.setProperties(props);
- str=props.getProperty("get_cache_timeout");
- if(str != null) {
- get_cache_timeout=Long.parseLong(str);
- props.remove("get_cache_timeout");
- }
-
- str=props.getProperty("suspect_msg_interval");
- if(str != null) {
- suspect_msg_interval=Long.parseLong(str);
- props.remove("suspect_msg_interval");
- }
-
- str=props.getProperty("num_tries");
- if(str != null) {
- num_tries=Integer.parseInt(str);
- props.remove("num_tries");
- }
-
- str=props.getProperty("start_port");
- if(str != null) {
- start_port=Integer.parseInt(str);
- props.remove("start_port");
- }
-
-
- // PropertyPermission not granted if running in an untrusted environment with JNLP.
- try {tmp=System.getProperty("bind.address");} catch (SecurityException ex){}
- if(tmp != null)
- str=tmp;
- else
- str=props.getProperty("srv_sock_bind_addr");
- if(str != null) {
- try {
- srv_sock_bind_addr=InetAddress.getByName(str);
- }
- catch(UnknownHostException e) {
- log.error("srv_sock_bind_addr " + str + " is invalid", e);
- return false;
- }
- props.remove("srv_sock_bind_addr");
- }
-
- if(props.size() > 0) {
- log.error("FD_SOCK.setProperties(): the following properties are not recognized: " + props);
-
- return false;
- }
- return true;
- }
-
-
- public void init() throws Exception {
- stopping=false;
- srv_sock_handler=new ServerSocketHandler();
- timer=stack != null ? stack.timer : null;
- if(timer == null)
- throw new Exception("FD_SOCK.init(): timer == null");
- }
-
- public void start() throws Exception {
- super.start();
- stopping=false;
- }
-
-
- public void stop() {
- stopping=true;
- bcast_task.removeAll();
- stopPingerThread();
- stopServerSocket();
- }
-
- public void resetStats() {
- super.resetStats();
- num_suspect_events=0;
- suspect_history.removeAll();
- }
-
-
- public void up(Event evt) {
- Message msg;
- FdHeader hdr;
-
- switch(evt.getType()) {
-
- case Event.SET_LOCAL_ADDRESS:
- local_addr=(Address) evt.getArg();
- break;
-
- case Event.MSG:
- msg=(Message) evt.getArg();
- hdr=(FdHeader) msg.removeHeader(name);
- if(hdr == null)
- break; // message did not originate from FD_SOCK layer, just pass up
-
- switch(hdr.type) {
-
- case FdHeader.SUSPECT:
- if(hdr.mbrs != null) {
- if(log.isDebugEnabled()) log.debug("[SUSPECT] hdr=" + hdr);
- for(int i=0; i < hdr.mbrs.size(); i++) {
- passUp(new Event(Event.SUSPECT, hdr.mbrs.elementAt(i)));
- passDown(new Event(Event.SUSPECT, hdr.mbrs.elementAt(i)));
- }
- }
- else
- if(warn) log.warn("[SUSPECT]: hdr.mbrs == null");
- break;
-
- // If I have the sock for 'hdr.mbr', return it. Otherwise look it up in my cache and return it
- case FdHeader.WHO_HAS_SOCK:
- if(local_addr != null && local_addr.equals(msg.getSrc()))
- return; // don't reply to WHO_HAS bcasts sent by me !
-
- if(hdr.mbr == null) {
- if(log.isErrorEnabled()) log.error("hdr.mbr is null");
- return;
- }
-
- if(trace) log.trace("who-has-sock " + hdr.mbr);
-
- // 1. Try my own address, maybe it's me whose socket is wanted
- if(local_addr != null && local_addr.equals(hdr.mbr) && srv_sock_addr != null) {
- sendIHaveSockMessage(msg.getSrc(), local_addr, srv_sock_addr); // unicast message to msg.getSrc()
- return;
- }
-
- // 2. If I don't have it, maybe it is in the cache
- if(cache.containsKey(hdr.mbr))
- sendIHaveSockMessage(msg.getSrc(), hdr.mbr, (IpAddress) cache.get(hdr.mbr)); // ucast msg
- break;
-
-
- // Update the cache with the addr:sock_addr entry (if on the same host)
- case FdHeader.I_HAVE_SOCK:
- if(hdr.mbr == null || hdr.sock_addr == null) {
- if(log.isErrorEnabled()) log.error("[I_HAVE_SOCK]: hdr.mbr is null or hdr.sock_addr == null");
- return;
- }
-
- // if(!cache.containsKey(hdr.mbr))
- cache.put(hdr.mbr, hdr.sock_addr); // update the cache
- if(trace) log.trace("i-have-sock: " + hdr.mbr + " --> " +
- hdr.sock_addr + " (cache is " + cache + ')');
-
- if(ping_dest != null && hdr.mbr.equals(ping_dest))
- ping_addr_promise.setResult(hdr.sock_addr);
- break;
-
- // Return the cache to the sender of this message
- case FdHeader.GET_CACHE:
- if(hdr.mbr == null) {
- if(log.isErrorEnabled()) log.error("(GET_CACHE): hdr.mbr == null");
- return;
- }
- hdr=new FdHeader(FdHeader.GET_CACHE_RSP);
- hdr.cachedAddrs=(Hashtable) cache.clone();
- msg=new Message(hdr.mbr, null, null);
- msg.putHeader(name, hdr);
- passDown(new Event(Event.MSG, msg));
- break;
-
- case FdHeader.GET_CACHE_RSP:
- if(hdr.cachedAddrs == null) {
- if(log.isErrorEnabled()) log.error("(GET_CACHE_RSP): cache is null");
- return;
- }
- get_cache_promise.setResult(hdr.cachedAddrs);
- break;
- }
- return;
- }
-
- passUp(evt); // pass up to the layer above us
- }
-
-
- public void down(Event evt) {
- Address mbr, tmp_ping_dest;
- View v;
-
- switch(evt.getType()) {
-
- case Event.UNSUSPECT:
- bcast_task.removeSuspectedMember((Address)evt.getArg());
- break;
-
- case Event.CONNECT:
- passDown(evt);
- srv_sock=Util.createServerSocket(srv_sock_bind_addr, start_port); // grab a random unused port above 10000
- srv_sock_addr=new IpAddress(srv_sock_bind_addr, srv_sock.getLocalPort());
- startServerSocket();
- break;
-
-
- case Event.VIEW_CHANGE:
- synchronized(this) {
- v=(View) evt.getArg();
- members.removeAllElements();
- members.addAll(v.getMembers());
- bcast_task.adjustSuspectedMembers(members);
- pingable_mbrs.removeAllElements();
- pingable_mbrs.addAll(members);
- passDown(evt);
- if(log.isDebugEnabled()) log.debug("VIEW_CHANGE received: " + members);
- System.out.println("***** NEW VIEW = " + v + " (local=" + local_addr + ")");
-
- // 1. Get the addr:pid cache from the coordinator (only if not already fetched)
-// if(!got_cache_from_coord) {
-// getCacheFromCoordinator();
-// got_cache_from_coord=true;
-// }
-
-
- // 2. Broadcast my own addr:sock to all members so they can update their cache
- if(!srv_sock_sent) {
- if(srv_sock_addr != null) {
- sendIHaveSockMessage(null, // send to all members
- local_addr,
- srv_sock_addr);
- srv_sock_sent=true;
- }
- else
- if(warn) log.warn("(VIEW_CHANGE): srv_sock_addr == null");
- }
-
- // 3. Remove all entries in 'cache' which are not in the new membership
- for(Enumeration e=cache.keys(); e.hasMoreElements();) {
- mbr=(Address) e.nextElement();
- if(!members.contains(mbr))
- cache.remove(mbr);
- }
-
- if(members.size() > 1) {
- if(pinger_thread != null && pinger_thread.isAlive()) {
- tmp_ping_dest=determinePingDest();
- if(ping_dest != null && tmp_ping_dest != null && !ping_dest.equals(tmp_ping_dest)) {
- interruptPingerThread(); // allows the thread to use the new socket
- }
- }
- else
- startPingerThread(); // only starts if not yet running
- }
- else {
- ping_dest=null;
- stopPingerThread();
- }
- }
- break;
-
- default:
- passDown(evt);
- break;
- }
- }
-
-
- /**
- * Runs as long as there are 2 members and more. Determines the member to be monitored and fetches its
- * server socket address (if n/a, sends a message to obtain it). The creates a client socket and listens on
- * it until the connection breaks. If it breaks, emits a SUSPECT message. It the connection is closed regularly,
- * nothing happens. In both cases, a new member to be monitored will be chosen and monitoring continues (unless
- * there are fewer than 2 members).
- */
- public void run() {
- Address tmp_ping_dest;
- IpAddress ping_addr;
- int max_fetch_tries=10; // number of times a socket address is to be requested before giving up
-
- if(trace) log.trace("pinger_thread started"); // +++ remove
-
- // 1. Get the addr:pid cache from the coordinator (only if not already fetched)
- if(!got_cache_from_coord) {
- getCacheFromCoordinator();
- got_cache_from_coord=true;
- }
-
- while(pinger_thread != null && Thread.currentThread().equals(pinger_thread) && !stopping) {
- tmp_ping_dest=determinePingDest(); // gets the neighbor to our right
- if(log.isDebugEnabled())
- log.debug("determinePingDest()=" + tmp_ping_dest + ", pingable_mbrs=" + pingable_mbrs);
- if(tmp_ping_dest == null) {
- ping_dest=null;
- pinger_thread=null;
- break;
- }
- ping_dest=tmp_ping_dest;
- ping_addr=fetchPingAddress(ping_dest);
- if(ping_addr == null) {
- if(log.isErrorEnabled()) log.error("socket address for " + ping_dest + " could not be fetched, retrying");
- if(--max_fetch_tries <= 0)
- break;
- if(!stopping)
- Util.sleep(2000);
- continue;
- }
-
- if(!setupPingSocket(ping_addr)) {
- // covers use cases #7 and #8 in GmsTests.txt
- if(log.isDebugEnabled()) log.debug("could not create socket to " + ping_dest + "; suspecting " + ping_dest);
- broadcastSuspectMessage(ping_dest);
- pingable_mbrs.removeElement(ping_dest);
- continue;
- }
-
- if(log.isTraceEnabled())
- log.trace("ping_dest=" + ping_dest + ", ping_sock=" + ping_sock + ", cache=" + cache);
-
-
- System.out.println("#### PING_DEST: " + ping_dest + " (self=" + local_addr + ")");
-
- // at this point ping_input must be non-null, otherwise setupPingSocket() would have thrown an exception
- try {
- if(ping_input != null) {
- int c=ping_input.read();
- switch(c) {
- case NORMAL_TEMINATION:
- if(log.isDebugEnabled())
- log.debug("peer closed socket normally");
- pinger_thread=null;
- break;
- case ABNORMAL_TEMINATION:
- handleSocketClose(null);
- break;
- default:
- break;
- }
- }
- }
- catch(IOException ex) { // we got here when the peer closed the socket --> suspect peer and then continue
- handleSocketClose(ex);
- }
- catch(Throwable catch_all_the_rest) {
- log.error("exception", catch_all_the_rest);
- }
- }
- if(log.isTraceEnabled()) log.trace("pinger thread terminated");
- pinger_thread=null;
- }
-
-
-
-
- /* ----------------------------------- Private Methods -------------------------------------- */
-
-
- void handleSocketClose(Exception ex) {
- teardownPingSocket(); // make sure we have no leftovers
- if(!regular_sock_close) { // only suspect if socket was not closed regularly (by interruptPingerThread())
- if(log.isDebugEnabled())
- log.debug("peer " + ping_dest + " closed socket (" + (ex != null ? ex.getClass().getName() : "eof") + ')');
- broadcastSuspectMessage(ping_dest);
- pingable_mbrs.removeElement(ping_dest);
- }
- else {
- if(log.isDebugEnabled()) log.debug("socket to " + ping_dest + " was reset");
- regular_sock_close=false;
- }
- }
-
-
- void startPingerThread() {
- if(pinger_thread == null || !pinger_thread.isAlive()) {
- System.out.println("**** starting pinger thread");
- pinger_thread=new Thread(this, "FD_SOCK Ping thread");
- pinger_thread.setDaemon(true);
- pinger_thread.start();
- }
- }
-
-
- void stopPingerThread() {
- stopping=true;
- if(pinger_thread != null && pinger_thread.isAlive()) {
- System.out.println("**** stopping pinger thread");
- pinger_thread=null;
- regular_sock_close=true;
- teardownPingSocket();
- ping_addr_promise.reset();
- get_cache_promise.reset();
- }
- pinger_thread=null;
- }
-
-
- /**
- * Interrupts the pinger thread. The Thread.interrupt() method doesn't seem to work under Linux with JDK 1.3.1
- * (JDK 1.2.2 had no problems here), therefore we close the socket (setSoLinger has to be set !) if we are
- * running under Linux. This should be tested under Windows. (Solaris 8 and JDK 1.3.1 definitely works).<p>
- * Oct 29 2001 (bela): completely removed Thread.interrupt(), but used socket close on all OSs. This makes this
- * code portable and we don't have to check for OSs.
- * @see org.jgroups.tests.InterruptTest to determine whether Thread.interrupt() works for InputStream.read().
- */
- void interruptPingerThread() {
- if(pinger_thread != null && pinger_thread.isAlive()) {
- regular_sock_close=true;
- teardownPingSocket(); // will wake up the pinger thread. less elegant than Thread.interrupt(), but does the job
- }
- }
-
- void startServerSocket() {
- if(srv_sock_handler != null)
- srv_sock_handler.start(); // won't start if already running
- }
-
- void stopServerSocket() {
- if(srv_sock_handler != null)
- srv_sock_handler.stop();
- }
-
-
- /**
- * Creates a socket to <code>dest</code>, and assigns it to ping_sock. Also assigns ping_input
- */
- boolean setupPingSocket(IpAddress dest) {
- synchronized(sock_mutex) {
- if(dest == null) {
- if(log.isErrorEnabled()) log.error("destination address is null");
- return false;
- }
- try {
- ping_sock=new Socket(dest.getIpAddress(), dest.getPort());
- ping_sock.setSoLinger(true, 1);
- ping_input=ping_sock.getInputStream();
- return true;
- }
- catch(Throwable ex) {
- return false;
- }
- }
- }
-
-
- void teardownPingSocket() {
- synchronized(sock_mutex) {
- if(ping_sock != null) {
- try {
- ping_sock.shutdownInput();
- ping_sock.close();
- }
- catch(Exception ex) {
- }
- ping_sock=null;
- }
- if(ping_input != null) {
- try {
- ping_input.close();
- }
- catch(Exception ex) {
- }
- ping_input=null;
- }
- }
- }
-
-
- /**
- * Determines coordinator C. If C is null and we are the first member, return. Else loop: send GET_CACHE message
- * to coordinator and wait for GET_CACHE_RSP response. Loop until valid response has been received.
- */
- private void getCacheFromCoordinator() {
- Address coord;
- int attempts=num_tries;
- Message msg;
- FdHeader hdr;
- Hashtable result;
-
- get_cache_promise.reset();
- while(attempts > 0 && !stopping) {
- if((coord=determineCoordinator()) != null) {
- if(coord.equals(local_addr)) { // we are the first member --> empty cache
- if(log.isDebugEnabled()) log.debug("first member; cache is empty");
- return;
- }
- hdr=new FdHeader(FdHeader.GET_CACHE);
- hdr.mbr=local_addr;
- msg=new Message(coord, null, null);
- msg.putHeader(name, hdr);
- passDown(new Event(Event.MSG, msg));
- result=(Hashtable) get_cache_promise.getResult(get_cache_timeout);
- if(result != null) {
- cache.putAll(result); // replace all entries (there should be none !) in cache with the new values
- if(trace) log.trace("got cache from " + coord + ": cache is " + cache);
- return;
- }
- else {
- if(log.isErrorEnabled())
- log.error("received null cache; retrying (stopping=" + stopping + "), " +
- "members=" + members + ", pingable_members=" + pingable_mbrs);
- }
- }
-
- Util.sleep(get_cache_retry_timeout);
- --attempts;
- }
- }
-
-
- /**
- * Sends a SUSPECT message to all group members. Only the coordinator (or the next member in line if the coord
- * itself is suspected) will react to this message by installing a new view. To overcome the unreliability
- * of the SUSPECT message (it may be lost because we are not above any retransmission layer), the following scheme
- * is used: after sending the SUSPECT message, it is also added to the broadcast task, which will periodically
- * re-send the SUSPECT until a view is received in which the suspected process is not a member anymore. The reason is
- * that - at one point - either the coordinator or another participant taking over for a crashed coordinator, will
- * react to the SUSPECT message and issue a new view, at which point the broadcast task stops.
- */
- void broadcastSuspectMessage(Address suspected_mbr) {
- Message suspect_msg;
- FdHeader hdr;
-
- if(suspected_mbr == null) return;
-
- if(trace) log.trace("suspecting " + suspected_mbr + " (own address is " + local_addr + ')');
-
- // 1. Send a SUSPECT message right away; the broadcast task will take some time to send it (sleeps first)
- hdr=new FdHeader(FdHeader.SUSPECT);
- hdr.mbrs=new Vector(1);
- hdr.mbrs.addElement(suspected_mbr);
- suspect_msg=new Message();
- suspect_msg.putHeader(name, hdr);
- passDown(new Event(Event.MSG, suspect_msg));
-
- // 2. Add to broadcast task and start latter (if not yet running). The task will end when
- // suspected members are removed from the membership
- bcast_task.addSuspectedMember(suspected_mbr);
- if(stats) {
- num_suspect_events++;
- suspect_history.add(suspected_mbr);
- }
- }
-
-
- void broadcastWhoHasSockMessage(Address mbr) {
- Message msg;
- FdHeader hdr;
-
- if(local_addr != null && mbr != null)
- if(log.isDebugEnabled()) log.debug("[" + local_addr + "]: who-has " + mbr);
-
- msg=new Message(); // bcast msg
- hdr=new FdHeader(FdHeader.WHO_HAS_SOCK);
- hdr.mbr=mbr;
- msg.putHeader(name, hdr);
- passDown(new Event(Event.MSG, msg));
- }
-
-
- /**
- Sends or broadcasts a I_HAVE_SOCK response. If 'dst' is null, the reponse will be broadcast, otherwise
- it will be unicast back to the requester
- */
- void sendIHaveSockMessage(Address dst, Address mbr, IpAddress addr) {
- Message msg=new Message(dst, null, null);
- FdHeader hdr=new FdHeader(FdHeader.I_HAVE_SOCK);
- hdr.mbr=mbr;
- hdr.sock_addr=addr;
- msg.putHeader(name, hdr);
-
- if(trace) // +++ remove
- log.trace("hdr=" + hdr);
-
- passDown(new Event(Event.MSG, msg));
- }
-
-
- /**
- Attempts to obtain the ping_addr first from the cache, then by unicasting q request to <code>mbr</code>,
- then by multicasting a request to all members.
- */
- private IpAddress fetchPingAddress(Address mbr) {
- IpAddress ret;
- Message ping_addr_req;
- FdHeader hdr;
-
- if(mbr == null) {
- if(log.isErrorEnabled()) log.error("mbr == null");
- return null;
- }
- // 1. Try to get from cache. Add a little delay so that joining mbrs can send their socket address before
- // we ask them to do so
- ret=(IpAddress)cache.get(mbr);
- if(ret != null)
- return ret;
-
- Util.sleep(300);
- if((ret=(IpAddress)cache.get(mbr)) != null)
- return ret;
-
- // 2. Try to get from mbr
- if(stopping)
- return null;
- ping_addr_promise.reset();
- ping_addr_req=new Message(mbr, null, null); // unicast
- hdr=new FdHeader(FdHeader.WHO_HAS_SOCK);
- hdr.mbr=mbr;
- ping_addr_req.putHeader(name, hdr);
- passDown(new Event(Event.MSG, ping_addr_req));
- ret=(IpAddress) ping_addr_promise.getResult(3000);
- if(ret != null)
- return ret;
-
- // 3. Try to get from all members
- if(stopping)
- return null;
- ping_addr_req=new Message(null, null, null); // multicast
- hdr=new FdHeader(FdHeader.WHO_HAS_SOCK);
- hdr.mbr=mbr;
- ping_addr_req.putHeader(name, hdr);
- passDown(new Event(Event.MSG, ping_addr_req));
- ret=(IpAddress) ping_addr_promise.getResult(3000);
- return ret;
- }
-
-
- Address determinePingDest() {
- Address tmp;
-
- if(pingable_mbrs == null || pingable_mbrs.size() < 2 || local_addr == null)
- return null;
- for(int i=0; i < pingable_mbrs.size(); i++) {
- tmp=(Address) pingable_mbrs.elementAt(i);
- if(local_addr.equals(tmp)) {
- if(i + 1 >= pingable_mbrs.size())
- return (Address) pingable_mbrs.elementAt(0);
- else
- return (Address) pingable_mbrs.elementAt(i + 1);
- }
- }
- return null;
- }
-
-
- Address determineCoordinator() {
- return members.size() > 0 ? (Address) members.elementAt(0) : null;
- }
-
-
-
-
-
- /* ------------------------------- End of Private Methods ------------------------------------ */
-
-
- public static class FdHeader extends Header implements Streamable {
- public static final byte SUSPECT=10;
- public static final byte WHO_HAS_SOCK=11;
- public static final byte I_HAVE_SOCK=12;
- public static final byte GET_CACHE=13; // sent by joining member to coordinator
- public static final byte GET_CACHE_RSP=14; // sent by coordinator to joining member in response to GET_CACHE
-
-
- byte type=SUSPECT;
- Address mbr=null; // set on WHO_HAS_SOCK (requested mbr), I_HAVE_SOCK
- IpAddress sock_addr; // set on I_HAVE_SOCK
-
- // Hashtable<Address,IpAddress>
- Hashtable cachedAddrs=null; // set on GET_CACHE_RSP
- Vector mbrs=null; // set on SUSPECT (list of suspected members)
-
-
- public FdHeader() {
- } // used for externalization
-
- public FdHeader(byte type) {
- this.type=type;
- }
-
- public FdHeader(byte type, Address mbr) {
- this.type=type;
- this.mbr=mbr;
- }
-
- public FdHeader(byte type, Vector mbrs) {
- this.type=type;
- this.mbrs=mbrs;
- }
-
- public FdHeader(byte type, Hashtable cachedAddrs) {
- this.type=type;
- this.cachedAddrs=cachedAddrs;
- }
-
-
- public String toString() {
- StringBuffer sb=new StringBuffer();
- sb.append(type2String(type));
- if(mbr != null)
- sb.append(", mbr=").append(mbr);
- if(sock_addr != null)
- sb.append(", sock_addr=").append(sock_addr);
- if(cachedAddrs != null)
- sb.append(", cache=").append(cachedAddrs);
- if(mbrs != null)
- sb.append(", mbrs=").append(mbrs);
- return sb.toString();
- }
-
-
- public static String type2String(byte type) {
- switch(type) {
- case SUSPECT:
- return "SUSPECT";
- case WHO_HAS_SOCK:
- return "WHO_HAS_SOCK";
- case I_HAVE_SOCK:
- return "I_HAVE_SOCK";
- case GET_CACHE:
- return "GET_CACHE";
- case GET_CACHE_RSP:
- return "GET_CACHE_RSP";
- default:
- return "unknown type (" + type + ')';
- }
- }
-
- public void writeExternal(ObjectOutput out) throws IOException {
- out.writeByte(type);
- out.writeObject(mbr);
- out.writeObject(sock_addr);
- out.writeObject(cachedAddrs);
- out.writeObject(mbrs);
- }
-
-
- public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
- type=in.readByte();
- mbr=(Address) in.readObject();
- sock_addr=(IpAddress) in.readObject();
- cachedAddrs=(Hashtable) in.readObject();
- mbrs=(Vector) in.readObject();
- }
-
- public long size() {
- long retval=Global.BYTE_SIZE; // type
- retval+=Util.size(mbr);
- retval+=Util.size(sock_addr);
-
- retval+=Global.INT_SIZE; // cachedAddrs size
- Map.Entry entry;
- Address key;
- IpAddress val;
- if(cachedAddrs != null) {
- for(Iterator it=cachedAddrs.entrySet().iterator(); it.hasNext();) {
- entry=(Map.Entry)it.next();
- if((key=(Address)entry.getKey()) != null)
- retval+=Util.size(key);
- retval+=Global.BYTE_SIZE; // presence for val
- if((val=(IpAddress)entry.getValue()) != null)
- retval+=val.size();
- }
- }
-
- retval+=Global.INT_SIZE; // mbrs size
- if(mbrs != null) {
- for(int i=0; i < mbrs.size(); i++) {
- retval+=Util.size((Address)mbrs.elementAt(i));
- }
- }
-
- return retval;
- }
-
- public void writeTo(DataOutputStream out) throws IOException {
- int size;
- out.writeByte(type);
- Util.writeAddress(mbr, out);
- Util.writeStreamable(sock_addr, out);
- size=cachedAddrs != null? cachedAddrs.size() : 0;
- out.writeInt(size);
- if(size > 0) {
- for(Iterator it=cachedAddrs.entrySet().iterator(); it.hasNext();) {
- Map.Entry entry=(Map.Entry)it.next();
- Address key=(Address)entry.getKey();
- IpAddress val=(IpAddress)entry.getValue();
- Util.writeAddress(key, out);
- Util.writeStreamable(val, out);
- }
- }
- size=mbrs != null? mbrs.size() : 0;
- out.writeInt(size);
- if(size > 0) {
- for(Iterator it=mbrs.iterator(); it.hasNext();) {
- Address address=(Address)it.next();
- Util.writeAddress(address, out);
- }
- }
- }
-
- public void readFrom(DataInputStream in) throws IOException, IllegalAccessException, InstantiationException {
- int size;
- type=in.readByte();
- mbr=Util.readAddress(in);
- sock_addr=(IpAddress)Util.readStreamable(IpAddress.class, in);
- size=in.readInt();
- if(size > 0) {
- if(cachedAddrs == null)
- cachedAddrs=new Hashtable();
- for(int i=0; i < size; i++) {
- Address key=Util.readAddress(in);
- IpAddress val=(IpAddress)Util.readStreamable(IpAddress.class, in);
- cachedAddrs.put(key, val);
- }
- }
- size=in.readInt();
- if(size > 0) {
- if(mbrs == null)
- mbrs=new Vector();
- for(int i=0; i < size; i++) {
- Address addr=Util.readAddress(in);
- mbrs.add(addr);
- }
- }
- }
-
- }
-
-
- /**
- * Handles the server-side of a client-server socket connection. Waits until a client connects, and then loops
- * until that client closes the connection. Note that there is no new thread spawned for the listening on the
- * client socket, therefore there can only be 1 client connection at the same time. Subsequent clients attempting
- * to create a connection will be blocked until the first client closes its connection. This should not be a problem
- * as the ring nature of the FD_SOCK protocol always has only 1 client connect to its right-hand-side neighbor.
- */
- private class ServerSocketHandler implements Runnable {
- Thread acceptor=null;
- /** List<ClientConnectionHandler> */
- final List clients=new ArrayList();
-
-
-
- ServerSocketHandler() {
- start();
- }
-
- void start() {
- if(acceptor == null) {
- acceptor=new Thread(this, "ServerSocket acceptor thread");
- acceptor.setDaemon(true);
- acceptor.start();
- }
- }
-
-
- void stop() {
- if(acceptor != null && acceptor.isAlive()) {
- try {
- srv_sock.close(); // this will terminate thread, peer will receive SocketException (socket close)
- }
- catch(Exception ex) {
- }
- }
- synchronized(clients) {
- for(Iterator it=clients.iterator(); it.hasNext();) {
- ClientConnectionHandler handler=(ClientConnectionHandler)it.next();
- handler.stopThread();
- }
- clients.clear();
- }
- acceptor=null;
- }
-
-
- /** Only accepts 1 client connection at a time (saving threads) */
- public void run() {
- Socket client_sock;
- while(acceptor != null && srv_sock != null) {
- try {
- if(trace) // +++ remove
- log.trace("waiting for client connections on " + srv_sock.getInetAddress() + ":" +
- srv_sock.getLocalPort());
- client_sock=srv_sock.accept();
- if(trace) // +++ remove
- log.trace("accepted connection from " + client_sock.getInetAddress() + ':' + client_sock.getPort());
- ClientConnectionHandler client_conn_handler=new ClientConnectionHandler(client_sock, clients);
- synchronized(clients) {
- clients.add(client_conn_handler);
- }
- client_conn_handler.start();
- }
- catch(IOException io_ex2) {
- break;
- }
- }
- acceptor=null;
- }
- }
-
-
-
- /** Handles a client connection; multiple client can connect at the same time */
- private static class ClientConnectionHandler extends Thread {
- Socket client_sock=null;
- InputStream in;
- final Object mutex=new Object();
- List clients=new ArrayList();
-
- ClientConnectionHandler(Socket client_sock, List clients) {
- setName("ClientConnectionHandler");
- setDaemon(true);
- this.client_sock=client_sock;
- this.clients.addAll(clients);
- }
-
- void stopThread() {
- synchronized(mutex) {
- if(client_sock != null) {
- try {
- OutputStream out=client_sock.getOutputStream();
- out.write(NORMAL_TEMINATION);
- }
- catch(Throwable t) {
- }
- }
- }
- closeClientSocket();
- }
-
- void closeClientSocket() {
- synchronized(mutex) {
- if(client_sock != null) {
- try {
- client_sock.close();
- }
- catch(Exception ex) {
- }
- client_sock=null;
- }
- }
- }
-
- public void run() {
- try {
- synchronized(mutex) {
- if(client_sock == null)
- return;
- in=client_sock.getInputStream();
- }
- while((in.read()) != -1) {
- }
- }
- catch(IOException io_ex1) {
- }
- finally {
- closeClientSocket();
- synchronized(clients) {
- clients.remove(this);
- }
- }
- }
- }
-
-
- /**
- * Task that periodically broadcasts a list of suspected members to the group. Goal is not to lose
- * a SUSPECT message: since these are bcast unreliably, they might get dropped. The BroadcastTask makes
- * sure they are retransmitted until a view has been received which doesn't contain the suspected members
- * any longer. Then the task terminates.
- */
- private class BroadcastTask implements TimeScheduler.Task {
- final Vector suspected_mbrs=new Vector(7);
- boolean stopped=false;
-
-
- /** Adds a suspected member. Starts the task if not yet running */
- public void addSuspectedMember(Address mbr) {
- if(mbr == null) return;
- if(!members.contains(mbr)) return;
- synchronized(suspected_mbrs) {
- if(!suspected_mbrs.contains(mbr)) {
- suspected_mbrs.addElement(mbr);
- if(log.isDebugEnabled()) log.debug("mbr=" + mbr + " (size=" + suspected_mbrs.size() + ')');
- }
- if(stopped && suspected_mbrs.size() > 0) {
- stopped=false;
- timer.add(this, true);
- }
- }
- }
-
-
- public void removeSuspectedMember(Address suspected_mbr) {
- if(suspected_mbr == null) return;
- if(log.isDebugEnabled()) log.debug("member is " + suspected_mbr);
- synchronized(suspected_mbrs) {
- suspected_mbrs.removeElement(suspected_mbr);
- if(suspected_mbrs.size() == 0)
- stopped=true;
- }
- }
-
-
- public void removeAll() {
- synchronized(suspected_mbrs) {
- suspected_mbrs.removeAllElements();
- stopped=true;
- }
- }
-
-
- /**
- * Removes all elements from suspected_mbrs that are <em>not</em> in the new membership
- */
- public void adjustSuspectedMembers(Vector new_mbrship) {
- Address suspected_mbr;
-
- if(new_mbrship == null || new_mbrship.size() == 0) return;
- synchronized(suspected_mbrs) {
- for(Iterator it=suspected_mbrs.iterator(); it.hasNext();) {
- suspected_mbr=(Address) it.next();
- if(!new_mbrship.contains(suspected_mbr)) {
- it.remove();
- if(log.isDebugEnabled())
- log.debug("removed " + suspected_mbr + " (size=" + suspected_mbrs.size() + ')');
- }
- }
- if(suspected_mbrs.size() == 0)
- stopped=true;
- }
- }
-
-
- public boolean cancelled() {
- return stopped;
- }
-
-
- public long nextInterval() {
- return suspect_msg_interval;
- }
-
-
- public void run() {
- Message suspect_msg;
- FdHeader hdr;
-
- if(log.isDebugEnabled())
- log.debug("broadcasting SUSPECT message (suspected_mbrs=" + suspected_mbrs + ") to group");
-
- synchronized(suspected_mbrs) {
- if(suspected_mbrs.size() == 0) {
- stopped=true;
- if(log.isDebugEnabled()) log.debug("task done (no suspected members)");
- return;
- }
-
- hdr=new FdHeader(FdHeader.SUSPECT);
- hdr.mbrs=(Vector) suspected_mbrs.clone();
- }
- suspect_msg=new Message(); // mcast SUSPECT to all members
- suspect_msg.putHeader(name, hdr);
- passDown(new Event(Event.MSG, suspect_msg));
- if(log.isDebugEnabled()) log.debug("task done");
- }
- }
-
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/8b2ea77d/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/protocols/FLOWCONTROL.java
----------------------------------------------------------------------
diff --git a/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/protocols/FLOWCONTROL.java b/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/protocols/FLOWCONTROL.java
deleted file mode 100644
index 304d8ab..0000000
--- a/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/protocols/FLOWCONTROL.java
+++ /dev/null
@@ -1,89 +0,0 @@
-/** Notice of modification as required by the LGPL
- * This file was modified by Gemstone Systems Inc. on
- * $Date$
- **/
-package com.gemstone.org.jgroups.protocols;
-
-
-import com.gemstone.org.jgroups.Address;
-import com.gemstone.org.jgroups.Event;
-import com.gemstone.org.jgroups.Message;
-import com.gemstone.org.jgroups.stack.Protocol;
-
-import java.util.Vector;
-
-
-
-
-/**
- * Title: Flow control layer
- * Description: This layer limits the number of sent messages without a receive of an own message to MAXSENTMSGS,
- * just put this layer above GMS and you will get a more
- * [NOTE(review): the sentence above is truncated in the original source.]
- * <p>
- * Mechanics, as implemented below: down() counts MSG events whose destination is null or the local
- * address and queues them once MAXSENTMSGS are outstanding; up() releases one queued event (or
- * decrements the counter) each time a message whose source is the local address comes back up.
- * NOTE(review): sentMsgs and queuedMsgs are read and written by both up() and down() without a
- * common lock; whether those run on different threads is not visible in this file - confirm.
- * Copyright: Copyright (c) 2000
- * Company: Computer Network Laboratory
- * @author Gianluca Collot
- * @version 1.0
- */
-public class FLOWCONTROL extends Protocol {
-
- final Vector queuedMsgs = new Vector(); // down Events held back by down() while the send budget is used up
- int sentMsgs = 0; // messages let through by down() whose own copy has not yet been seen in up()
- static final int MAXSENTMSGS = 1; // value sentMsgs must stay below for down() to forward a counted message
- Address myAddr; // local address, taken from Event.SET_LOCAL_ADDRESS in up()
-
-
- public FLOWCONTROL() {
- }
- @Override // GemStoneAddition
- public String getName() {
- return "FLOWCONTROL"; // protocol name as a fixed literal
- }
-
- /**
- * Checks if up messages are from myAddr and in that case sends down one queued message, or
- * decrements sentMsgs if there are no queued messages. Every event, whatever its type, is
- * passed up afterwards.
- *
- * @param evt the event travelling up the stack; SET_LOCAL_ADDRESS and MSG are inspected
- */
- @Override // GemStoneAddition
- public void up(Event evt) {
- Message msg;
- switch (evt.getType()) {
- case Event.SET_LOCAL_ADDRESS: myAddr = (Address) evt.getArg();
- break;
-
- case Event.MSG: msg = (Message) evt.getArg();
- if(log.isDebugEnabled()) log.debug("Message received");
- if (msg.getSrc().equals(myAddr)) { // NOTE(review): throws NullPointerException if getSrc() returns null - confirm it cannot
- if (queuedMsgs.size() > 0) {
- if(log.isDebugEnabled()) log.debug("Message from me received - Queue size was " + queuedMsgs.size());
- passDown((Event) queuedMsgs.remove(0)); // oldest queued event goes straight down; sentMsgs is left unchanged because it takes the freed slot
- } else {
- if(log.isDebugEnabled()) log.debug("Message from me received - No messages in queue");
- sentMsgs--;
- }
- }
- }
- passUp(evt);
- }
-
- /**
- * Checks if it can send the message, else puts the message in the queue.
- * Only MSG events whose destination is null or equals myAddr are counted; all other events
- * are passed down unchanged.
- *
- * @param evt the event travelling down the stack
- */
- @Override // GemStoneAddition
- public void down(Event evt) {
- Message msg;
- if (evt.getType()==Event.MSG) {
- msg = (Message) evt.getArg();
- if ((msg.getDest() == null) || (msg.getDest().equals(myAddr))) {
- if (sentMsgs < MAXSENTMSGS) {
- sentMsgs++;
- if(log.isDebugEnabled()) log.debug("Message " + sentMsgs + " sent");
- } else {
- queuedMsgs.add(evt); //queues message (we add the event to avoid creating a new event to send the message)
- return; // queued events are only released later by up()
- }
- }
- }
- passDown(evt);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/8b2ea77d/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/protocols/FLOW_CONTROL.java
----------------------------------------------------------------------
diff --git a/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/protocols/FLOW_CONTROL.java b/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/protocols/FLOW_CONTROL.java
deleted file mode 100644
index c16e9c6..0000000
--- a/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/protocols/FLOW_CONTROL.java
+++ /dev/null
@@ -1,317 +0,0 @@
-/** Notice of modification as required by the LGPL
- * This file was modified by Gemstone Systems Inc. on
- * $Date$
- **/
-// $Id: FLOW_CONTROL.java,v 1.10 2005/08/11 12:43:47 belaban Exp $
-
-package com.gemstone.org.jgroups.protocols;
-
-
-import com.gemstone.org.jgroups.Address;
-import com.gemstone.org.jgroups.Event;
-import com.gemstone.org.jgroups.Message;
-import com.gemstone.org.jgroups.blocks.GroupRequest;
-import com.gemstone.org.jgroups.stack.MessageProtocol;
-import com.gemstone.org.jgroups.util.ExternalStrings;
-import com.gemstone.org.jgroups.util.ReusableThread;
-import com.gemstone.org.jgroups.util.RspList;
-import com.gemstone.org.jgroups.util.Util;
-
-import java.io.Serializable;
-import java.util.HashMap;
-import java.util.Properties;
-
-
-// @todo Handle view changes (e.g., members {A,B,C}, blocked on C, and C crashes --> unblock).
-/**
- * FLOW_CONTROL provides end-end congestion control and flow control.
- * Attempts to maximize throughput by minimizing the
- * possible block times(Forward flow control). Initially, sender starts with a smaller
- * window size <code> W</code> and large expected RTT <code>grpRTT</code>. Sender also
- * keeps a margin in the window size. When the margin is hit, instead of waiting for the
- * window size to be exhausted, sender multicasts a FLOW_CONTROL info request message.
- * If the window size is exhausted before the responses are received, send will be blocked.
- * FCInfo(flow control info) from all the receivers is gathered at the sender, and current RTT
- * is computed. If the current RTT is greater than the estimated RTT, window size and margin are reduced,
- * otherwise they are increased.
- * <p>
- * Horizontal interaction is initiated by the sender with the other group members.
- * <p>
- * <em>Note: A reliable transport layer is required for this protocol to function properly.</em>
- * With little effort this can be made completely independent.
- * <p>
- * <br> Also block on down() instead of sending BLOCK_SEND.
- *
- * @author Ananda Bollu
- */
-
-public class FLOW_CONTROL extends MessageProtocol implements Runnable {
- private int _numMSGsSentThisPeriod=0;
- private static final String FLOW_CONTROL="FLOW_CONTROL";
- private final HashMap _rcvdMSGCounter=new HashMap();
-
- private int _windowSize=1000;
- private int _fwdMarginSize=200;
- private int _estimatedRTT=100000;
- private boolean waitingForResponse=false;
- private final ReusableThread _reusableThread;
- private double RTT_WEIGHT=0.125;
- private int _msgsSentAfterFCreq=0;
-// private final double TIME_OUT_FACTOR=0.25;//if resp not received from more than n*TIME_OUT_INCREMENT_FACTOR
-// private final double TIME_OUT_INCR_MULT=1.25;
- private double WINDOW_SIZE_REDUCTION=0.75;
- private double WINDOW_SIZE_EXPANSION=1.25;
- private boolean isBlockState=false;
-
- private int _windowsize_cap=1000000; //initial window size can not be more than 10^6 messages.
- /** Creates the protocol and the reusable thread that is later given this object as its task (see handleDownEvent()). */
- public FLOW_CONTROL() {
- _reusableThread=new ReusableThread(FLOW_CONTROL); // constructed with the "FLOW_CONTROL" name constant
- }
- /** Returns the protocol name, the constant string "FLOW_CONTROL". */
- @Override // GemStoneAddition
- public String getName() {
- return FLOW_CONTROL;
- }
-
- /**
- * If Event.MSG type is received count is incremented by one,
- * and message is passed to the down_prot. At some point,
- * based on the algorithm(FLOW_CONTROL protocol definition)
- * data collection sequence is started. This is done by each
- * member in SENDER role when _numMSGsSentThisPeriod hits the margin.
- * Before rsp arrives only _fwdMarginSize number of messages can be sent,
- * and then sender will be blocked.
- * <p>
- * Blocking is signalled by passing <code>Event.BLOCK_SEND</code> up the stack once
- * <code>_msgsSentAfterFCreq</code> reaches <code>_fwdMarginSize</code>; the only wait performed
- * here is the <code>_reusableThread.waitUntilDone()</code> call made before a new request is assigned.
- * NOTE(review): waitingForResponse, isBlockState and the counters are plain fields that
- * reqFCInfo() also resets; if reqFCInfo() runs on _reusableThread, as the Javadoc of run()
- * suggests, these accesses are unsynchronized - confirm.
- *
- * @param evt the event travelling down the stack; only <code>Event.MSG</code> is counted
- * @return always <code>true</code>
- */
- @Override // GemStoneAddition
- public boolean handleDownEvent(Event evt) {
- if(evt.getType() == Event.MSG) {
- _numMSGsSentThisPeriod++;
- if((_numMSGsSentThisPeriod > (_windowSize - _fwdMarginSize)) && !waitingForResponse) { // remaining window has dropped below the forward margin
- waitingForResponse=true;
- //wait for the previous request to return.before assigning a new task.
- _reusableThread.waitUntilDone();
- _reusableThread.assignTask(this); // this object is the task; its run() calls reqFCInfo()
- }
- if(waitingForResponse) {
- _msgsSentAfterFCreq++;
- if((_msgsSentAfterFCreq >= _fwdMarginSize) && !isBlockState) {
-
- if(log.isInfoEnabled()) log.info(ExternalStrings.FLOW_CONTROL_ACTION_BLOCK);
- log.error(ExternalStrings.FLOW_CONTROL_0_0_1, new Object[] {Long.valueOf(System.currentTimeMillis()), Integer.valueOf(_windowSize)}); // NOTE(review): logged at error level unconditionally, with the current time and window size as arguments - confirm the level is intended
- passUp(new Event(Event.BLOCK_SEND));
- isBlockState=true; // cleared again in reqFCInfo() when UNBLOCK_SEND is passed up
- }
- }
- }
- return true;
- }
-
- /**
- * If Event.MSG type is received message, number of received
- * messages from the sender is incremented. And the message is
- * passed up the stack.
- * <p>
- * Counters are kept per source address in <code>_rcvdMSGCounter</code>; an entry is created the
- * first time a source is seen. This method only updates the counter and returns; it does not
- * call passUp() itself.
- *
- * @param evt the event travelling up the stack; only <code>Event.MSG</code> is counted
- * @return always <code>true</code>
- */
- @Override // GemStoneAddition
- public boolean handleUpEvent(Event evt) {
- if(evt.getType() == Event.MSG) {
- Message msg=(Message)evt.getArg();
- Address src=msg.getSrc();
- FCInfo fcForSrc=(FCInfo)_rcvdMSGCounter.get(src);
- if(fcForSrc == null) { // first message from this source: start a new counter
- fcForSrc=new FCInfo();
- _rcvdMSGCounter.put(src, fcForSrc);
- }
- fcForSrc.increment(1);
-
- if(log.isInfoEnabled()) log.info(ExternalStrings.FLOW_CONTROL_MESSAGE__0__RECEIVED_FROM__1, new Object[] {Integer.valueOf(fcForSrc.getRcvdMSGCount()), src});
- }
- return true;
- }
-
- /**
- * Called when a request for this protocol layer is received.
- * Processes and return value is sent back in the reply.
- * FLOW_CONTROL protocol of all members gets this message(including sender?)
- * <p>
- * The reply is the number of messages counted so far for the requester in
- * <code>_rcvdMSGCounter</code>, boxed as a <code>Long</code>.
- * NOTE(review): entries are only created in handleUpEvent(); for a requester with no entry
- * <code>_rcvdMSGCounter.get(src)</code> returns null and this method throws NullPointerException - confirm
- * whether a request can arrive before any message from that source.
- *
- * @param req the request message; only its source address is used
- * @return Object containing FC information for sender with senderID.
- * <b>Callback</b>. Called when a request for this protocol layer is received.
- */
- @Override // GemStoneAddition
- public Object handle(Message req) {
- Address src=req.getSrc();
- Long resp=Long.valueOf(((FCInfo)_rcvdMSGCounter.get(src)).getRcvdMSGCount()); // int count widened to long before boxing
-
- if(log.isInfoEnabled()) log.info(ExternalStrings.FLOW_CONTROL_REQEST_CAME_FROM__0__PREPARED_RESPONSE__1, new Object[] {src, resp});
- return resp;
- }
-
- /**
- * FCInfo request must be submitted in a different thread.
- * handleDownEvent() can still be called to send messages
- * while waiting for FCInfo from receivers. usually takes
- * RTT.
- * <p>
- * Logs the current forward margin and then calls reqFCInfo(); the RspList that reqFCInfo()
- * returns is discarded here.
- */
- public void run() {
-
- if(log.isInfoEnabled()) log.info(ExternalStrings.FLOW_CONTROL__HIT_THE__FWDMARGIN_REMAINING_SIZE__0, _fwdMarginSize);
- reqFCInfo();
- }
-
- /**
- * Following parameters can be optionally supplied:
- * <ul>
- * <li>window size cap (<code>window_size_cap</code>) - <code>int</code> Limits the window size to a reasonable value.
- * <li>window size (<code>window_size</code>) - <code>int</code> these many number of messages are sent before a block could happen
- * <li>forward margin (<code>fwd_mrgn</code>) -<code>int</code> a request for flow control information is sent when remaining window size hits this margin
- * <li>RTT weight (<code>rttweight</code>) -<code>double</code> Max RTT in the group is calculated during each Flow control request. lower number assigns
- * higher weight to current RTT in estimating RTT.
- * <li>window size reduction factor (<code>reduction</code>) -<code>double</code> When current RTT is greater than estimated RTT current window size
- * is reduced by this multiple.
- * <li>window size expansion factor (<code>expansion</code>) -<code>double</code> When current RTT is less than estimated RTT window is incremented
- * by this multiple.
- * </ul>
- * Each recognized key is removed from <code>props</code> after it has been parsed. Values are parsed
- * with <code>Integer.parseInt</code> / <code>Double.parseDouble</code>, so a malformed value raises
- * <code>NumberFormatException</code>, which is not caught here.
- *
- * @param props the protocol properties; modified in place
- * @return <code>false</code> if any entry is left in <code>props</code> after the known keys were removed, otherwise <code>true</code>
- * @see com.gemstone.org.jgroups.stack.Protocol#setProperties(Properties)
- */
- @Override // GemStoneAddition
- public boolean setProperties(Properties props) {
- String str=null;
- String winsizekey="window_size";
- String fwdmrgnkey="fwd_mrgn";
- String rttweightkey="rttweight";
- String sizereductionkey="reduction";
- String sizeexpansionkey="expansion";
- String windowsizeCapKey="window_size_cap";
-
- super.setProperties(props); // return value of the superclass call is ignored
- str=props.getProperty(windowsizeCapKey); // the cap is read first so it can clamp window_size below
- if(str != null) {
- _windowsize_cap=Integer.parseInt(str);
- props.remove(windowsizeCapKey);
- }
- str=props.getProperty(winsizekey);
- if(str != null) {
- _windowSize=Integer.parseInt(str);
- if(_windowSize > _windowsize_cap)
- _windowSize=_windowsize_cap; // clamping happens only when window_size is supplied; the default window size is not re-checked against a new cap
- props.remove(winsizekey);
- }
-
- str=props.getProperty(fwdmrgnkey);
- if(str != null) {
- _fwdMarginSize=Integer.parseInt(str);
- props.remove(fwdmrgnkey);
- }
-
- str=props.getProperty(rttweightkey);
- if(str != null) {
- RTT_WEIGHT=Double.parseDouble(str);
- props.remove(rttweightkey);
- }
-
- str=props.getProperty(sizereductionkey);
- if(str != null) {
- WINDOW_SIZE_REDUCTION=Double.parseDouble(str);
- props.remove(sizereductionkey);
- }
-
- str=props.getProperty(sizeexpansionkey);
- if(str != null) {
- WINDOW_SIZE_EXPANSION=Double.parseDouble(str);
- props.remove(sizeexpansionkey);
- }
-
-
- if(props.size() > 0) { // anything still present was not consumed above (or, presumably, by the superclass)
- log.error(ExternalStrings.FLOW_CONTROL_FLOW_CONTROLSETPROPERTIES_THE_FOLLOWING_PROPERTIES_ARE_NOT_RECOGNIZED__0, props);
-
- return false;
- }
- return true;
-
- }
-
- /*-----------private stuff ------*/
- // Sends a flow-control request to the group, times the round trip, then adapts window size, margin and RTT estimate.
- private RspList reqFCInfo() {
- RspList rspList=null;
- long reqSentTime=0, rspRcvdTime=0;
- try {
- reqSentTime=System.currentTimeMillis();
- //alternatively use _estimatedRTT for timeout.(timeout is the right way, but need to
- //check the use cases.
- rspList=castMessage(null, new Message(null, null, Util.objectToByteBuffer(FLOW_CONTROL)),
- GroupRequest.GET_ALL, 0); // the final 0 is presumably a timeout meaning "wait without limit" - TODO confirm
- rspRcvdTime=System.currentTimeMillis();
- }
- catch(Exception ex) {
- ex.printStackTrace(); // NOTE(review): goes to stderr rather than the protocol's log
- }
-
- /*If NAKACK layer is present, if n+1 th message is FLOW_CONTROL Request, if responses are received
- that means all n messages sent earlier are received(?), ignore NAK_ACK.
- */
- //ANALYSE RESPONSES
-
- long currentRTT=rspRcvdTime - reqSentTime; // milliseconds. NOTE(review): after an exception rspRcvdTime is still 0, so this is negative and the "expand" branch below is taken - confirm intended
-
- if(currentRTT > _estimatedRTT) {
- _windowSize=(int)(_windowSize * WINDOW_SIZE_REDUCTION);
- _fwdMarginSize=(int)(_fwdMarginSize * WINDOW_SIZE_REDUCTION);
- }
- else {
- _windowSize=(int)(_windowSize * WINDOW_SIZE_EXPANSION);
- if(_windowSize > _windowsize_cap)
- _windowSize=_windowsize_cap;
- _fwdMarginSize=(int)(_fwdMarginSize * WINDOW_SIZE_EXPANSION); // unlike the window size, the margin is not capped here
- }
-
- _estimatedRTT=(int)((RTT_WEIGHT * currentRTT) + (1.0 - RTT_WEIGHT) * _estimatedRTT); // weighted average of the new sample and the previous estimate
-
- //reset for new FLOW_CONTROL request period.
- _numMSGsSentThisPeriod=0;
- waitingForResponse=false;
- _msgsSentAfterFCreq=0;
-
- if(isBlockState) { // handleDownEvent() passed BLOCK_SEND up earlier; undo it now
-
- if(warn) log.warn("ACTION UNBLOCK");
- passUp(new Event(Event.UNBLOCK_SEND));
- log.error(ExternalStrings.FLOW_CONTROL_1_0_1, new Object[] {Long.valueOf(System.currentTimeMillis()), Integer.valueOf(_windowSize)}); // NOTE(review): error level used for what looks like a routine status record - confirm
- isBlockState=false;
- }
-
-
- if(warn) log.warn("estimatedTimeout = " + _estimatedRTT);
- if(warn) log.warn("window size = " + _windowSize + " forward margin size = " + _fwdMarginSize);
-
- return rspList; // null when the request failed with an exception
- }
-
-
- /* use this instead of Integer. Mutable int counter; handleUpEvent() keeps one per sender in _rcvdMSGCounter. */
- private static class FCInfo implements Serializable {
- int _curValue; // running total; starts at 0 (Java default) and grows through increment()
- private static final long serialVersionUID = -8365016426836017979L;
-
- FCInfo() {
- }
-
- public void increment(int i) { // adds i to the running total
- _curValue+=i;
- }
-
- public int getRcvdMSGCount() { // current running total
- return _curValue;
- }
-
- @Override // GemStoneAddition
- public String toString() {
- return Integer.toString(_curValue); // decimal text of the counter
- }
- }
-
-
-}
-
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/8b2ea77d/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/protocols/FLUSH.java
----------------------------------------------------------------------
diff --git a/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/protocols/FLUSH.java b/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/protocols/FLUSH.java
deleted file mode 100644
index a5fa5ec..0000000
--- a/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/protocols/FLUSH.java
+++ /dev/null
@@ -1,466 +0,0 @@
-/** Notice of modification as required by the LGPL
- * This file was modified by Gemstone Systems Inc. on
- * $Date$
- **/
-// $Id: FLUSH.java,v 1.10 2005/08/11 12:43:47 belaban Exp $
-
-
-
-package com.gemstone.org.jgroups.protocols;
-
-
-
-import com.gemstone.org.jgroups.Address;
-import com.gemstone.org.jgroups.Event;
-import com.gemstone.org.jgroups.View;
-import com.gemstone.org.jgroups.blocks.GroupRequest;
-import com.gemstone.org.jgroups.blocks.MethodCall;
-import com.gemstone.org.jgroups.stack.RpcProtocol;
-import com.gemstone.org.jgroups.util.ExternalStrings;
-import com.gemstone.org.jgroups.util.List;
-import com.gemstone.org.jgroups.util.Rsp;
-import com.gemstone.org.jgroups.util.RspList;
-import com.gemstone.org.jgroups.util.Util;
-
-import java.util.Enumeration;
-import java.util.Properties;
-import java.util.Vector;
-
-
-
-
-/**
- The task of the FLUSH protocol is to flush all pending messages out of the system. This is
- done before a view change by stopping all senders and then agreeing on what messages
- should be delivered in the current view (before switching to the new view). A coordinator
- broadcasts a FLUSH message. The message contains an array of the highest sequence number for each member
- as seen by the coordinator so far. Each member responds with its highest sequence numbers seen so far (for
- each member): if its sequence number for a member P is higher than the one sent by the coordinator, it
- will append the messages apparently not received by the coordinator to its reply. The coordinator (when
- all replies have been received), computes for each member the lowest and highest sequence number and
- re-broadcasts messages accordingly (using ACKs rather than NAKs to ensure reliable delivery).<p> Example:
- <pre>
-
- FLUSH ---> (p=10, q=22, r=7)
-
- <-- (p=10, q=20, r=7) (did not receive 2 messages from q)
- <-- (p=12, q=23, r=7) (attached are messages p11, p12, and q23)
- <-- (p=10, q=22, r=8) (attached is message r8)
- ---------------------
- min: 11 21 8
- max: 12 23 8
- </pre>
-
- The coordinator now computes the range for each member and re-broadcasts messages
- p11, p12, q21, q22, q23 and r8.
- This is essentially the exclusive min and inclusive max of all replies. Note that messages p11, p12 and q23
- were not received by the coordinator itself before. They were only returned as result of the FLUSH replies
- and the coordinator now re-broadcasts them.
-
-*/
-public class FLUSH extends RpcProtocol {
- final Vector mbrs=new Vector();
- boolean is_server=false;
- final Object block_mutex=new Object();
- long block_timeout=5000;
- Address local_addr=null;
- boolean blocked=false; // BLOCK: true, VIEW_CHANGE: false
- final Object digest_mutex=new Object();
- long digest_timeout=2000; // time to wait for retrieval of unstable msgs
-
- final Object highest_delivered_mutex=new Object();
- long[] highest_delivered_msgs;
-
- Digest digest=null;
-
- final Object get_msgs_mutex=new Object();
- static/*GemStoneAddition*/ final long get_msgs_timeout=4000;
- List get_msgs=null;
-
-
- /** Returns the protocol name, the literal "FLUSH". */
- @Override // GemStoneAddition
- public String getName() {return "FLUSH";}
-
- /** Returns a new Vector holding the single event type this layer lists as provided to layers above: Event.FLUSH. */
- @Override // GemStoneAddition
- public Vector providedUpServices() {
- Vector retval=new Vector();
- retval.addElement(Integer.valueOf(Event.FLUSH));
- return retval;
- }
- /** Returns a new Vector of the three event types this layer passes down and expects a lower layer to answer. */
- @Override // GemStoneAddition
- public Vector requiredDownServices() {
- Vector retval=new Vector();
- retval.addElement(Integer.valueOf(Event.GET_MSGS_RECEIVED)); // NAKACK layer
- retval.addElement(Integer.valueOf(Event.GET_MSG_DIGEST)); // NAKACK layer
- retval.addElement(Integer.valueOf(Event.GET_MSGS)); // NAKACK layer
- return retval;
- }
-
- /** Starts the superclass, then enables deadlock detection on _corr; throws Exception when _corr is null. */
- @Override // GemStoneAddition
- public void start() throws Exception {
- super.start();
- if(_corr != null) { // _corr is not declared in this class; presumably created by super.start() - TODO confirm
- _corr.setDeadlockDetection(true);
- }
- else
- throw new Exception("FLUSH.start(): cannot set deadlock detection in corr, as it is null !");
- }
-
-
- /**
- Triggered by reception of FLUSH event from GMS layer (must be coordinator). Calls
- <code>HandleFlush</code> in all members and returns FLUSH_OK event.
- Steps, as implemented below: (1) fetch this member's highest delivered seqnos, (2) call
- <code>handleFlush</code> remotely on <code>dests</code> and wait for all replies, (3) merge the
- returned digests into per-member min/max seqnos and collect the attached messages, (4) if some
- member reported a lower seqno than this coordinator, fetch those messages from the lower layer
- via GET_MSGS, (5) report suspected members as failed.
- NOTE(review): the arrays are sized from <code>members</code>, which is not among the fields declared
- in this class (this class maintains <code>mbrs</code>); presumably it is inherited - confirm.
- @param dests A list of members to which the FLUSH is to be sent
- @return FlushRsp Contains result (true or false), list of unstable messages and list of members
- failed during the FLUSH.
- */
- private FlushRsp flush(Vector dests) {
- RspList rsp_list;
- FlushRsp retval=new FlushRsp();
- Digest digest; // local variable; hides the field of the same name inside this method
- long[] min, max;
- long[] lower[]; // i.e. long[][]
- List unstable_msgs=new List();
- boolean get_lower_msgs=false;
-
- highest_delivered_msgs=new long[members.size()];
- min=new long[members.size()];
- max=new long[members.size()];
-
-
- /* Determine the highest seqno (for each member) that was delivered to the application
- (i.e., consumed by the application). Stores result in array 'highest_delivered_msgs' */
- getHighestDeliveredSeqnos();
-
- for(int i=0; i < highest_delivered_msgs.length; i++)
- min[i]=max[i]=highest_delivered_msgs[i];
-
-
- /* Call the handleFlush() method of all existing members. The highest seqnos seen by the coord
- is the argument */
- if(log.isInfoEnabled()) log.info(ExternalStrings.FLUSH_CALLING_HANDLEFLUSH_0, dests);
- passDown(new Event(Event.SWITCH_OUT_OF_BAND)); // we need out-of-band control for FLUSH ...
- MethodCall call = new MethodCall("handleFlush", new Object[] {dests, highest_delivered_msgs.clone()},
- new String[] {Vector.class.getName(), long[].class.getName()});
- rsp_list=callRemoteMethods(dests, call, GroupRequest.GET_ALL, 0);
- if(log.isInfoEnabled()) log.info(ExternalStrings.FLUSH_FLUSH_DONE);
-
-
- /* Process all the responses (Digest): compute a range of messages (min and max seqno) for each
- member that has to be re-broadcast; FlushRsp contains those messages. They will be re-broadcast
- by the coordinator (in the GMS protocol). */
- for(int i=0; i < rsp_list.size(); i++) {
- Rsp rsp=(Rsp)rsp_list.elementAt(i);
- if(rsp.wasReceived()) {
- digest=(Digest)rsp.getValue();
- if(digest != null) { // handleFlush() returns null when a member ignores the flush
- for(int j=0; j < digest.highest_seqnos.length && j < min.length; j++) { // second bound guards against a reply longer than the local arrays
- min[j]=Math.min(min[j], digest.highest_seqnos[j]);
- max[j]=Math.max(max[j], digest.highest_seqnos[j]);
- }
- if(digest.msgs.size() > 0) {
- for(Enumeration e=digest.msgs.elements(); e.hasMoreElements();)
- unstable_msgs.add(e.nextElement());
- }
- }
- }
- } // end for-loop
-
-
-
- /* If any of the highest msgs of the flush replies were lower than the ones sent by this
- coordinator, we have to re-broadcast them. (This won't occur often)
- Compute the range between min and highest_delivered_msgs */
- lower=new long[min.length][]; // stores (for each mbr) the range of seqnos (e.g. 20 24): send msgs
- // 21, 22 and 23 and 24 (excluding lower and including upper range)
-
- for(int i=0; i < min.length; i++) {
- if(min[i] < highest_delivered_msgs[i]) { // will almost never be the case
- lower[i]=new long[2];
- lower[i][0]=min[i]; // lower boundary (excluding)
- lower[i][1]=highest_delivered_msgs[i]; // upper boundary (including)
- get_lower_msgs=true;
- }
- }
- if(get_lower_msgs) {
- get_msgs=null; // reset before the request; handleUpEvent() fills it on GET_MSGS_OK
- synchronized(get_msgs_mutex) {
- passDown(new Event(Event.GET_MSGS, lower));
- try {
- get_msgs_mutex.wait(get_msgs_timeout); // woken by handleUpEvent() on GET_MSGS_OK, or gives up after the timeout
- }
- catch(InterruptedException e) { // GemStoneAddition
- Thread.currentThread().interrupt();
- // There's no looping or anything in the rest of this
- // method, so we just propagate the bit and finish up...
- }
- }
- if(get_msgs != null) { // still null when no answer arrived before the wait ended
- for(Enumeration e=get_msgs.elements(); e.hasMoreElements();)
- unstable_msgs.add(e.nextElement());
- }
- }
- retval.unstable_msgs=unstable_msgs.getContents();
- if(rsp_list.numSuspectedMembers() > 0) {
- retval.result=false; // result is only ever written here; its initial value comes from FlushRsp, not visible in this file
- retval.failed_mbrs=rsp_list.getSuspectedMembers();
- }
-
- return retval;
- }
-
-
-
-
-
- /**
- Called by coordinator running the FLUSH protocol. Argument is an array of the highest seqnos as seen
- by the coordinator (for each member). <code>handleFlush()</code> checks for each member its
- own highest seqno seen for that member. If it is higher than the one seen by the coordinator,
- all higher messages are attached to the return value (a message digest).
- The comparison itself is not done in this method: it passes BLOCK up (once per view, guarded by
- <code>blocked</code>), waits up to <code>block_timeout</code> ms for BLOCK_OK, and then asks the lower
- layer for the digest through getMessageDigest().
- @param flush_dests The members to which this message is sent. Processes not in this list just
- ignore the handleFlush() message.
- @param highest_seqnos The highest sequence numbers (order corresponding to membership) as seen
- by coordinator.
- @return Digest An array of the highest seqnos for each member, as seen by this member. If this
- member's seqno for a member P is higher than the one in <code>highest_seqnos</code>,
- the missing messages are added to the message digest as well. This allows the
- coordinator to re-broadcast missing messages. Null when the flush is ignored
- (not yet a server, null/empty destinations, or this member not among them) or
- when no digest arrives within <code>digest_timeout</code> ms.
- */
- public synchronized Digest handleFlush(Vector flush_dests, long[] highest_seqnos) {
- digest=null; // clear any digest left over from an earlier call
-
- if(log.isInfoEnabled()) log.info("flush_dests=" + flush_dests +
- " , highest_seqnos=" + Util.array2String(highest_seqnos));
-
- if(!is_server) // don't handle the FLUSH if not yet joined to the group
- return digest;
-
- if(flush_dests == null) {
- if(warn) log.warn("flush dest is null, ignoring flush !");
- return digest;
- }
-
- if(flush_dests.size() == 0) {
- if(warn) log.warn("flush dest is empty, ignoring flush !");
- return digest;
- }
-
- if(!flush_dests.contains(local_addr)) {
-
- if(warn) log.warn("am not in the flush dests, ignoring flush");
- return digest;
- }
-
- // block sending of messages (only if not already blocked !)
- if(!blocked) {
- blocked=true; // reset to false by handleDownEvent() on VIEW_CHANGE
- synchronized(block_mutex) {
- passUp(new Event(Event.BLOCK));
- try {block_mutex.wait(block_timeout);} // notified by receiveDownEvent() when BLOCK_OK comes down
- catch(InterruptedException e) {
- Thread.currentThread().interrupt(); // GemStoneAddition
- // Just keep going, we've propagated the bit.
- }
- }
- }
-
- // asks NAKACK layer for unstable messages and saves result in 'digest'
- getMessageDigest(highest_seqnos);
- if(log.isInfoEnabled()) log.info(ExternalStrings.FLUSH_RETURNING_DIGEST___0, digest);
- return digest;
- }
-
-
-
-
-
-
- /** Requests the highest seqnos (for each member) seen so far from the lower layer (GET_MSGS_RECEIVED) and waits up to 4000 ms; nothing is returned, handleUpEvent() copies the answer into highest_delivered_msgs. */
- void getHighestDeliveredSeqnos() {
- synchronized(highest_delivered_mutex) {
- passDown(new Event(Event.GET_MSGS_RECEIVED));
- try {
- highest_delivered_mutex.wait(4000); // woken by handleUpEvent() on GET_MSGS_RECEIVED_OK; the timeout is a hard-coded literal
- }
- catch(InterruptedException e) {
- Thread.currentThread().interrupt(); // GemStoneAddition
- // Just propagate to caller
-// if(log.isDebugEnabled()) log.debug("exception is " + e);
- }
- }
- }
-
-
-
-
-
- /** Interacts with a lower layer to retrieve unstable messages (e.g. NAKACK): passes GET_MSG_DIGEST down and waits up to digest_timeout ms; handleUpEvent() stores the answer in the digest field. */
- void getMessageDigest(long[] highest_seqnos) {
- synchronized(digest_mutex) {
- passDown(new Event(Event.GET_MSG_DIGEST, highest_seqnos));
- try {
- digest_mutex.wait(digest_timeout); // woken by handleUpEvent() on GET_MSG_DIGEST_OK
- }
- catch(InterruptedException e) {
- Thread.currentThread().interrupt(); // GemStoneAddition
- // just propagate to caller
- }
- }
- }
-
-
-
-
-
-
-
- /**
- <b>Callback</b>. Called by superclass when event may be handled.<p>
- <b>Do not use <code>PassUp</code> in this method as the event is passed up
- by default by the superclass after this method returns !</b>
- Handles the replies to the requests this layer sends down: each reply is stored in the matching
- field and the thread waiting on the corresponding mutex is notified. GET_MSG_DIGEST_OK and
- GET_MSGS_RECEIVED_OK return false; GET_MSGS_OK falls through to <code>return true</code>.
- @return boolean Defaults to true. If false, event will not be passed up the stack.
- */
- @Override // GemStoneAddition
- public boolean handleUpEvent(Event evt) {
- switch(evt.getType()) {
-
- case Event.SET_LOCAL_ADDRESS:
- local_addr=(Address)evt.getArg();
- break;
-
- case Event.GET_MSG_DIGEST_OK:
- synchronized(digest_mutex) {
- digest=(Digest)evt.getArg();
- digest_mutex.notifyAll(); // wakes getMessageDigest()
- }
- return false; // don't pass further up
-
- case Event.GET_MSGS_RECEIVED_OK:
- long[] tmp=(long[])evt.getArg();
- if(tmp != null)
- System.arraycopy(tmp, 0, highest_delivered_msgs, 0, tmp.length); // NOTE(review): done outside the lock; throws if highest_delivered_msgs is still null (it is only assigned in flush()) or shorter than tmp - confirm neither can happen
- synchronized(highest_delivered_mutex) {
- highest_delivered_mutex.notifyAll(); // wakes getHighestDeliveredSeqnos()
- }
- return false; // don't pass up any further !
-
- case Event.GET_MSGS_OK:
- synchronized(get_msgs_mutex) {
- get_msgs=(List)evt.getArg();
- get_msgs_mutex.notifyAll(); // wakes the wait in flush()
- }
- break; // NOTE(review): unlike the two replies above, this one returns true - confirm that is intended
-
- }
- return true;
- }
-
-
- /**
- <b>Callback</b>. Called by superclass when event may be handled.<p>
- <b>Do not use <code>PassDown</code> in this method as the event is passed down
- by default by the superclass after this method returns !</b>
- FLUSH runs the flush and answers with FLUSH_OK passed up; BECOME_SERVER sets
- <code>is_server</code>; VIEW_CHANGE clears <code>blocked</code> and replaces the contents of
- <code>mbrs</code> with the members of the new view.
- @return boolean Defaults to true. If false, event will not be passed down the stack.
- */
- @Override // GemStoneAddition
- public boolean handleDownEvent(Event evt) {
- Vector dests;
- FlushRsp rsp;
-
- switch(evt.getType()) {
- case Event.FLUSH:
- dests=(Vector)evt.getArg();
- if(dests == null) dests=new Vector(); // a missing destination list is treated as an empty one
- rsp=flush(dests); // flush() waits for all remote replies before returning
- passUp(new Event(Event.FLUSH_OK, rsp));
- return false; // don't pass down
-
- case Event.BECOME_SERVER:
- is_server=true; // from now on handleFlush() no longer ignores flush requests
- break;
-
- case Event.VIEW_CHANGE:
- blocked=false; // allows handleFlush() to pass BLOCK up again for the next flush
-
- Vector tmp=((View)evt.getArg()).getMembers();
- if(tmp != null) {
- mbrs.removeAllElements();
- for(int i=0; i < tmp.size(); i++)
- mbrs.addElement(tmp.elementAt(i));
- }
- break;
- }
- return true;
- }
-
-
-
-
-
- /**
- The default handling adds the event to the down-queue where events are handled in order of
- addition by a thread. However, there exists a deadlock between the FLUSH and BLOCK_OK down
- events: when a FLUSH event is received, a BLOCK is sent up, which triggers a BLOCK_OK event
- to be sent down to be handled by the FLUSH layer. However, the FLUSH layer's thread is still
- processing the FLUSH down event and is therefore blocked, waiting for a BLOCK_OK event.
- Therefore, the BLOCK_OK event has to 'preempt' the FLUSH event processing. This is done by
- overriding this method: when a BLOCK_OK event is received, it is processed immediately
- (in parallel to the FLUSH event), which causes the FLUSH event processing to return.
- On BLOCK_OK this implementation first hands every event still in <code>down_queue</code> to
- <code>down()</code> and then notifies <code>block_mutex</code>; the BLOCK_OK event itself is not
- forwarded. All other events go to the superclass implementation.
- */
- @Override // GemStoneAddition
- public void receiveDownEvent(Event evt) {
- if(evt.getType() == Event.BLOCK_OK) { // priority handling, otherwise FLUSH would block !
- synchronized(down_queue) {
- Event event;
- try {
- while(down_queue.size() > 0) {
- event=(Event)down_queue.remove(10); // wait 10ms at most; queue is *not* empty !
- down(event);
- }
- }
- catch(Exception e) {} // NOTE(review): any exception from the queue or from down() is dropped silently and ends the drain early
- }
-
- synchronized(block_mutex) {
- block_mutex.notifyAll(); // releases the wait in handleFlush()
- }
- return;
- }
- super.receiveDownEvent(evt);
- }
-
-
- /** Reads the optional block_timeout and digest_timeout properties (parsed as long, removed once read); returns false when any property is left over. */
- @Override // GemStoneAddition
- public boolean setProperties(Properties props) {super.setProperties(props);
- String str;
-
- str=props.getProperty("block_timeout");
- if(str != null) {
- block_timeout=Long.parseLong(str); // used as the wait(...) argument in handleFlush(), i.e. milliseconds; a malformed value raises NumberFormatException, not caught here
- props.remove("block_timeout");
- }
-
- str=props.getProperty("digest_timeout");
- if(str != null) {
- digest_timeout=Long.parseLong(str); // used as the wait(...) argument in getMessageDigest(), i.e. milliseconds
- props.remove("digest_timeout");
- }
-
- if(props.size() > 0) { // anything still present was not recognized
- log.error(ExternalStrings.FLUSH_EXAMPLESETPROPERTIES_THESE_PROPERTIES_ARE_NOT_RECOGNIZED__0, props);
-
- return false;
- }
- return true;
- }
-
-
-
-}
-