Posted to commits@geode.apache.org by bs...@apache.org on 2015/08/21 23:22:50 UTC

[26/51] [partial] incubator-geode git commit: GEODE-77 removing the old jgroups subproject

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/8b2ea77d/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/protocols/FD_SOCK.java.new
----------------------------------------------------------------------
diff --git a/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/protocols/FD_SOCK.java.new b/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/protocols/FD_SOCK.java.new
deleted file mode 100644
index efdcf56..0000000
--- a/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/protocols/FD_SOCK.java.new
+++ /dev/null
@@ -1,1153 +0,0 @@
-// $Id: FD_SOCK.java,v 1.29 2005/08/11 12:43:47 belaban Exp $
-
-package org.jgroups.protocols;
-
-import org.jgroups.*;
-import org.jgroups.stack.IpAddress;
-import org.jgroups.stack.Protocol;
-import org.jgroups.util.*;
-
-import java.io.*;
-import java.net.ServerSocket;
-import java.net.Socket;
-import java.net.InetAddress;
-import java.net.UnknownHostException;
-import java.util.*;
-import java.util.List;
-
-
-/**
- * Failure detection protocol based on sockets. Failure detection is ring-based. Each member creates a
- * server socket and announces its address together with the server socket's address in a multicast. A
- * pinger thread will be started when the membership goes above 1 and will be stopped when it drops below
- * 2. The pinger thread connects to its neighbor on the right and waits until the socket is closed. When
- * the socket is closed by the monitored peer in an abnormal fashion (IOException), the neighbor will be
- * suspected.<p> The main feature of this protocol is that no ping messages need to be exchanged between
- * any 2 peers, and failure detection relies entirely on TCP sockets. The advantage is that no activity
- * will take place between 2 peers as long as they are alive (i.e. have their server sockets open).
- * The disadvantage is that hung servers or crashed routers will not cause sockets to be closed, therefore
- * they won't be detected.
- * The FD_SOCK protocol will work for groups whose members are on different hosts.<p>
- * The costs involved are 2 additional threads: one that
- * monitors the client side of the socket connection (to monitor a peer) and another one that manages the
- * server socket. However, those threads will be idle as long as both peers are running.
- * @author Bela Ban May 29 2001
- */
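
The mechanism above is easiest to see in isolation. A minimal sketch of the monitoring side, assuming a hypothetical suspect() callback (FD_SOCK itself broadcasts a SUSPECT message and moves on to the next neighbor):

    import java.io.IOException;
    import java.io.InputStream;
    import java.net.Socket;

    // Sketch only: monitor one neighbor by blocking on a socket read.
    class RingMonitor implements Runnable {
        private final Socket sock; // connected to the right-hand neighbor's server socket

        RingMonitor(Socket sock) { this.sock = sock; }

        public void run() {
            try {
                InputStream in = sock.getInputStream();
                int c = in.read();     // blocks for as long as the peer is alive
                if (c != 9)            // 9 == NORMAL_TEMINATION below
                    suspect();         // stream ended without the goodbye byte
            }
            catch (IOException ex) {
                suspect();             // abnormal close -> suspect the peer
            }
        }

        void suspect() { /* hypothetical: broadcast a SUSPECT message */ }
    }
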
-public class FD_SOCK extends Protocol implements Runnable {
-    long                get_cache_timeout=3000;            // msecs to wait for the socket cache from the coordinator
-    final long          get_cache_retry_timeout=500;       // msecs to wait until we retry getting the cache from coord
-    long                suspect_msg_interval=5000;         // (BroadcastTask): mcast SUSPECT every 5000 msecs
-    int                 num_tries=3;                       // number of attempts to solicit the socket cache from the coord before giving up
-    final Vector        members=new Vector(11);            // list of group members (updated on VIEW_CHANGE)
-    boolean             srv_sock_sent=false;               // has own socket been broadcast yet ?
-    final Vector        pingable_mbrs=new Vector(11);      // mbrs from which we select ping_dest. may be subset of 'members'
-    final Promise       get_cache_promise=new Promise();   // used for rendezvous on GET_CACHE and GET_CACHE_RSP
-    boolean             got_cache_from_coord=false;        // was cache already fetched ?
-    Address             local_addr=null;                   // our own address
-    ServerSocket        srv_sock=null;                     // server socket to which another member connects to monitor me
-    InetAddress         srv_sock_bind_addr=null;           // the NIC on which the ServerSocket should listen
-    ServerSocketHandler srv_sock_handler=null;             // accepts new connections on srv_sock
-    IpAddress           srv_sock_addr=null;                // pair of server_socket:port
-    Address             ping_dest=null;                    // address of the member we monitor
-    Socket              ping_sock=null;                    // socket to the member we monitor
-    InputStream         ping_input=null;                   // input stream of the socket to the member we monitor
-    Thread              pinger_thread=null;                // listens on ping_sock, suspects member if socket is closed
-    final Hashtable     cache=new Hashtable(11);           // keys=Addresses, vals=IpAddresses (socket:port)
-    boolean             stopping=false;
-
-    /** Start port for server socket (uses first available port starting at start_port). A value of 0 (default)
-     * picks a random port */
-    int                 start_port=0;
-    final Promise       ping_addr_promise=new Promise();   // to fetch the ping_addr for ping_dest
-    final Object        sock_mutex=new Object();           // for access to ping_sock, ping_input
-    TimeScheduler       timer=null;
-    final BroadcastTask bcast_task=new BroadcastTask();    // to transmit SUSPECT message (until view change)
-    boolean             regular_sock_close=false;         // used by interruptPingerThread() when new ping_dest is computed
-    int                 num_suspect_events=0;
-    private static final int NORMAL_TEMINATION=9;
-    private static final int ABNORMAL_TEMINATION=-1;
-    private static final String name="FD_SOCK";
-
-    BoundedList          suspect_history=new BoundedList(20);
-
-
-    public String getName() {
-        return name;
-    }
-
-    public String getLocalAddress() {return local_addr != null? local_addr.toString() : "null";}
-    public String getMembers() {return members != null? members.toString() : "null";}
-    public String getPingableMembers() {return pingable_mbrs != null? pingable_mbrs.toString() : "null";}
-    public String getPingDest() {return ping_dest != null? ping_dest.toString() : "null";}
-    public int getNumSuspectEventsGenerated() {return num_suspect_events;}
-    public String printSuspectHistory() {
-        StringBuffer sb=new StringBuffer();
-        for(Enumeration en=suspect_history.elements(); en.hasMoreElements();) {
-            sb.append(new Date()).append(": ").append(en.nextElement()).append("\n");
-        }
-        return sb.toString();
-    }
-
-    public boolean setProperties(Properties props) {
-        String str, tmp=null;
-
-        super.setProperties(props);
-        str=props.getProperty("get_cache_timeout");
-        if(str != null) {
-            get_cache_timeout=Long.parseLong(str);
-            props.remove("get_cache_timeout");
-        }
-
-        str=props.getProperty("suspect_msg_interval");
-        if(str != null) {
-            suspect_msg_interval=Long.parseLong(str);
-            props.remove("suspect_msg_interval");
-        }
-
-        str=props.getProperty("num_tries");
-        if(str != null) {
-            num_tries=Integer.parseInt(str);
-            props.remove("num_tries");
-        }
-
-        str=props.getProperty("start_port");
-        if(str != null) {
-            start_port=Integer.parseInt(str);
-            props.remove("start_port");
-        }
-
-
-        // PropertyPermission not granted if running in an untrusted environment with JNLP.
-        try {tmp=System.getProperty("bind.address");} catch (SecurityException ex){}
-        if(tmp != null)
-            str=tmp;
-        else
-            str=props.getProperty("srv_sock_bind_addr");
-        if(str != null) {
-            try {
-                srv_sock_bind_addr=InetAddress.getByName(str);
-            }
-            catch(UnknownHostException e) {
-                log.error("srv_sock_bind_addr " + str + " is invalid", e);
-                return false;
-            }
-            props.remove("srv_sock_bind_addr");
-        }
-
-        if(props.size() > 0) {
-            log.error("FD_SOCK.setProperties(): the following properties are not recognized: " + props);
-
-            return false;
-        }
-        return true;
-    }
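
For reference, these keys would typically be supplied through the protocol stack configuration string; an illustrative fragment with made-up values (not taken from any shipped configuration):

    UDP(mcast_port=45566):PING:FD_SOCK(get_cache_timeout=5000;suspect_msg_interval=5000;num_tries=3;start_port=9000):GMS

Note that srv_sock_bind_addr can also be overridden with the bind.address system property, as the code above shows.
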
-
-
-    public void init() throws Exception {
-        stopping=false;
-        srv_sock_handler=new ServerSocketHandler();
-        timer=stack != null ? stack.timer : null;
-        if(timer == null)
-            throw new Exception("FD_SOCK.init(): timer == null");
-    }
-
-    public void start() throws Exception {
-        super.start();
-        stopping=false;
-    }
-
-
-    public void stop() {
-        stopping=true;
-        bcast_task.removeAll();
-        stopPingerThread();
-        stopServerSocket();
-    }
-
-    public void resetStats() {
-        super.resetStats();
-        num_suspect_events=0;
-        suspect_history.removeAll();
-    }
-
-
-    public void up(Event evt) {
-        Message msg;
-        FdHeader hdr;
-
-        switch(evt.getType()) {
-
-        case Event.SET_LOCAL_ADDRESS:
-            local_addr=(Address) evt.getArg();
-            break;
-
-        case Event.MSG:
-            msg=(Message) evt.getArg();
-            hdr=(FdHeader) msg.removeHeader(name);
-            if(hdr == null)
-                break;  // message did not originate from FD_SOCK layer, just pass up
-
-            switch(hdr.type) {
-
-            case FdHeader.SUSPECT:
-                if(hdr.mbrs != null) {
-                    if(log.isDebugEnabled()) log.debug("[SUSPECT] hdr=" + hdr);
-                    for(int i=0; i < hdr.mbrs.size(); i++) {
-                        passUp(new Event(Event.SUSPECT, hdr.mbrs.elementAt(i)));
-                        passDown(new Event(Event.SUSPECT, hdr.mbrs.elementAt(i)));
-                    }
-                }
-                else
-                    if(warn) log.warn("[SUSPECT]: hdr.mbrs == null");
-                break;
-
-                // If I have the sock for 'hdr.mbr', return it. Otherwise look it up in my cache and return it
-            case FdHeader.WHO_HAS_SOCK:
-                if(local_addr != null && local_addr.equals(msg.getSrc()))
-                    return; // don't reply to WHO_HAS bcasts sent by me !
-
-                if(hdr.mbr == null) {
-                    if(log.isErrorEnabled()) log.error("hdr.mbr is null");
-                    return;
-                }
-
-                if(trace) log.trace("who-has-sock " + hdr.mbr);
-
-                // 1. Try my own address, maybe it's me whose socket is wanted
-                if(local_addr != null && local_addr.equals(hdr.mbr) && srv_sock_addr != null) {
-                    sendIHaveSockMessage(msg.getSrc(), local_addr, srv_sock_addr);  // unicast message to msg.getSrc()
-                    return;
-                }
-
-                // 2. If I don't have it, maybe it is in the cache
-                if(cache.containsKey(hdr.mbr))
-                    sendIHaveSockMessage(msg.getSrc(), hdr.mbr, (IpAddress) cache.get(hdr.mbr));  // ucast msg
-                break;
-
-
-                // Update the cache with the addr:sock_addr entry (if on the same host)
-            case FdHeader.I_HAVE_SOCK:
-                if(hdr.mbr == null || hdr.sock_addr == null) {
-                    if(log.isErrorEnabled()) log.error("[I_HAVE_SOCK]: hdr.mbr is null or hdr.sock_addr == null");
-                    return;
-                }
-
-                // if(!cache.containsKey(hdr.mbr))
-                cache.put(hdr.mbr, hdr.sock_addr); // update the cache
-                if(trace) log.trace("i-have-sock: " + hdr.mbr + " --> " +
-                                                   hdr.sock_addr + " (cache is " + cache + ')');
-
-                if(ping_dest != null && hdr.mbr.equals(ping_dest))
-                    ping_addr_promise.setResult(hdr.sock_addr);
-                break;
-
-                // Return the cache to the sender of this message
-            case FdHeader.GET_CACHE:
-                if(hdr.mbr == null) {
-                    if(log.isErrorEnabled()) log.error("(GET_CACHE): hdr.mbr == null");
-                    return;
-                }
-                hdr=new FdHeader(FdHeader.GET_CACHE_RSP);
-                hdr.cachedAddrs=(Hashtable) cache.clone();
-                msg=new Message(hdr.mbr, null, null);
-                msg.putHeader(name, hdr);
-                passDown(new Event(Event.MSG, msg));
-                break;
-
-            case FdHeader.GET_CACHE_RSP:
-                if(hdr.cachedAddrs == null) {
-                    if(log.isErrorEnabled()) log.error("(GET_CACHE_RSP): cache is null");
-                    return;
-                }
-                get_cache_promise.setResult(hdr.cachedAddrs);
-                break;
-            }
-            return;
-        }
-
-        passUp(evt);                                        // pass up to the layer above us
-    }
-
-
-    public void down(Event evt) {
-        Address mbr, tmp_ping_dest;
-        View v;
-
-        switch(evt.getType()) {
-
-            case Event.UNSUSPECT:
-                bcast_task.removeSuspectedMember((Address)evt.getArg());
-                break;
-
-            case Event.CONNECT:
-                passDown(evt);
-                srv_sock=Util.createServerSocket(srv_sock_bind_addr, start_port); // first free port >= start_port (random if start_port == 0)
-                srv_sock_addr=new IpAddress(srv_sock_bind_addr, srv_sock.getLocalPort());
-                startServerSocket();
-                break;
-
-
-            case Event.VIEW_CHANGE:
-                synchronized(this) {
-                    v=(View) evt.getArg();
-                    members.removeAllElements();
-                    members.addAll(v.getMembers());
-                    bcast_task.adjustSuspectedMembers(members);
-                    pingable_mbrs.removeAllElements();
-                    pingable_mbrs.addAll(members);
-                    passDown(evt);
-                    if(log.isDebugEnabled()) log.debug("VIEW_CHANGE received: " + members);
-                    System.out.println("***** NEW VIEW = " + v + " (local=" + local_addr + ")");
-
-                    // 1. Get the addr:pid cache from the coordinator (only if not already fetched)
-//                    if(!got_cache_from_coord) {
-//                        getCacheFromCoordinator();
-//                        got_cache_from_coord=true;
-//                    }
-
-
-                    // 2. Broadcast my own addr:sock to all members so they can update their cache
-                    if(!srv_sock_sent) {
-                        if(srv_sock_addr != null) {
-                            sendIHaveSockMessage(null, // send to all members
-                                    local_addr,
-                                    srv_sock_addr);
-                            srv_sock_sent=true;
-                        }
-                        else
-                            if(warn) log.warn("(VIEW_CHANGE): srv_sock_addr == null");
-                    }
-
-                    // 3. Remove all entries in 'cache' which are not in the new membership
-                    for(Enumeration e=cache.keys(); e.hasMoreElements();) {
-                        mbr=(Address) e.nextElement();
-                        if(!members.contains(mbr))
-                            cache.remove(mbr);
-                    }
-
-                    if(members.size() > 1) {
-                        if(pinger_thread != null && pinger_thread.isAlive()) {
-                            tmp_ping_dest=determinePingDest();
-                            if(ping_dest != null && tmp_ping_dest != null && !ping_dest.equals(tmp_ping_dest)) {
-                                interruptPingerThread(); // allows the thread to use the new socket
-                            }
-                        }
-                        else
-                            startPingerThread(); // only starts if not yet running
-                    }
-                    else {
-                        ping_dest=null;
-                        stopPingerThread();
-                    }
-                }
-                break;
-
-            default:
-                passDown(evt);
-                break;
-        }
-    }
-
-
-    /**
-     * Runs as long as there are 2 or more members. Determines the member to be monitored and fetches its
-     * server socket address (if not available, sends a message to obtain it). Then creates a client socket and listens on
-     * it until the connection breaks. If it breaks, emits a SUSPECT message. If the connection is closed regularly,
-     * nothing happens. In both cases, a new member to be monitored will be chosen and monitoring continues (unless
-     * there are fewer than 2 members).
-     */
-    public void run() {
-        Address tmp_ping_dest;
-        IpAddress ping_addr;
-        int max_fetch_tries=10;  // number of times a socket address is to be requested before giving up
-
-        if(trace) log.trace("pinger_thread started"); // +++ remove
-
-        // 1. Get the addr:pid cache from the coordinator (only if not already fetched)
-        if(!got_cache_from_coord) {
-            getCacheFromCoordinator();
-            got_cache_from_coord=true;
-        }
-
-        while(pinger_thread != null && Thread.currentThread().equals(pinger_thread) && !stopping) {
-            tmp_ping_dest=determinePingDest(); // gets the neighbor to our right
-            if(log.isDebugEnabled())
-                log.debug("determinePingDest()=" + tmp_ping_dest + ", pingable_mbrs=" + pingable_mbrs);
-            if(tmp_ping_dest == null) {
-                ping_dest=null;
-                pinger_thread=null;
-                break;
-            }
-            ping_dest=tmp_ping_dest;
-            ping_addr=fetchPingAddress(ping_dest);
-            if(ping_addr == null) {
-                if(log.isErrorEnabled()) log.error("socket address for " + ping_dest + " could not be fetched, retrying");
-                if(--max_fetch_tries <= 0)
-                    break;
-                if(!stopping)
-                    Util.sleep(2000);
-                continue;
-            }
-
-            if(!setupPingSocket(ping_addr)) {
-                // covers use cases #7 and #8 in GmsTests.txt
-                if(log.isDebugEnabled()) log.debug("could not create socket to " + ping_dest + "; suspecting " + ping_dest);
-                broadcastSuspectMessage(ping_dest);
-                pingable_mbrs.removeElement(ping_dest);
-                continue;
-            }
-
-            if(log.isTraceEnabled())
-                log.trace("ping_dest=" + ping_dest + ", ping_sock=" + ping_sock + ", cache=" + cache);
-
-
-            System.out.println("#### PING_DEST: " + ping_dest + " (self=" + local_addr + ")");
-
-            // at this point ping_input must be non-null, otherwise setupPingSocket() would have returned false
-            try {
-                if(ping_input != null) {
-                    int c=ping_input.read();
-                    switch(c) {
-                        case NORMAL_TEMINATION:
-                            if(log.isDebugEnabled())
-                                log.debug("peer closed socket normally");
-                            pinger_thread=null;
-                            break;
-                        case ABNORMAL_TEMINATION:
-                            handleSocketClose(null);
-                            break;
-                        default:
-                            break;
-                    }
-                }
-            }
-            catch(IOException ex) {  // we got here when the peer closed the socket --> suspect peer and then continue
-                handleSocketClose(ex);
-            }
-            catch(Throwable catch_all_the_rest) {
-                log.error("exception", catch_all_the_rest);
-            }
-        }
-        if(log.isTraceEnabled()) log.trace("pinger thread terminated");
-        pinger_thread=null;
-    }
-
-
-
-
-    /* ----------------------------------- Private Methods -------------------------------------- */
-
-
-    void handleSocketClose(Exception ex) {
-        teardownPingSocket();     // make sure we have no leftovers
-        if(!regular_sock_close) { // only suspect if socket was not closed regularly (by interruptPingerThread())
-            if(log.isDebugEnabled())
-                log.debug("peer " + ping_dest + " closed socket (" + (ex != null ? ex.getClass().getName() : "eof") + ')');
-            broadcastSuspectMessage(ping_dest);
-            pingable_mbrs.removeElement(ping_dest);
-        }
-        else {
-            if(log.isDebugEnabled()) log.debug("socket to " + ping_dest + " was reset");
-            regular_sock_close=false;
-        }
-    }
-
-
-    void startPingerThread() {
-        if(pinger_thread == null || !pinger_thread.isAlive()) {
-            System.out.println("**** starting pinger thread");
-            pinger_thread=new Thread(this, "FD_SOCK Ping thread");
-            pinger_thread.setDaemon(true);
-            pinger_thread.start();
-        }
-    }
-
-
-    void stopPingerThread() {
-        stopping=true;
-        if(pinger_thread != null && pinger_thread.isAlive()) {
-            System.out.println("**** stopping pinger thread");
-            pinger_thread=null;
-            regular_sock_close=true;
-            teardownPingSocket();
-            ping_addr_promise.reset();
-            get_cache_promise.reset();
-        }
-        pinger_thread=null;
-    }
-
-
-    /**
-     * Interrupts the pinger thread. The Thread.interrupt() method doesn't seem to work under Linux with JDK 1.3.1
-     * (JDK 1.2.2 had no problems here), therefore we close the socket (setSoLinger has to be set !) if we are
-     * running under Linux. This should be tested under Windows. (Solaris 8 and JDK 1.3.1 definitely works).<p>
-     * Oct 29 2001 (bela): completely removed Thread.interrupt(), but used socket close on all OSs. This makes this
-     * code portable and we don't have to check for OSs.
-     * @see org.jgroups.tests.InterruptTest to determine whether Thread.interrupt() works for InputStream.read().
-     */
-    void interruptPingerThread() {
-        if(pinger_thread != null && pinger_thread.isAlive()) {
-            regular_sock_close=true;
-            teardownPingSocket(); // will wake up the pinger thread. less elegant than Thread.interrupt(), but does the job
-        }
-    }
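
The close-to-interrupt trick is easy to demonstrate; a self-contained sketch (hypothetical class name, loopback sockets):

    import java.io.IOException;
    import java.net.ServerSocket;
    import java.net.Socket;

    // Sketch: closing a socket from another thread unblocks a pending read().
    public class SocketCloseUnblocksRead {
        public static void main(String[] args) throws Exception {
            ServerSocket srv = new ServerSocket(0);
            Socket client = new Socket("localhost", srv.getLocalPort());
            Socket accepted = srv.accept();
            client.setSoLinger(true, 1); // as setupPingSocket() does below

            Thread reader = new Thread(() -> {
                try {
                    client.getInputStream().read(); // blocks here...
                }
                catch (IOException expected) {
                    System.out.println("read() unblocked: " + expected);
                }
            });
            reader.start();
            Thread.sleep(500);
            client.close(); // ...until the socket is closed from this thread
            reader.join();
            accepted.close();
            srv.close();
        }
    }
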
-
-    void startServerSocket() {
-        if(srv_sock_handler != null)
-            srv_sock_handler.start(); // won't start if already running
-    }
-
-    void stopServerSocket() {
-        if(srv_sock_handler != null)
-            srv_sock_handler.stop();
-    }
-
-
-    /**
-     * Creates a socket to <code>dest</code>, and assigns it to ping_sock. Also assigns ping_input
-     */
-    boolean setupPingSocket(IpAddress dest) {
-        synchronized(sock_mutex) {
-            if(dest == null) {
-                if(log.isErrorEnabled()) log.error("destination address is null");
-                return false;
-            }
-            try {
-                ping_sock=new Socket(dest.getIpAddress(), dest.getPort());
-                ping_sock.setSoLinger(true, 1);
-                ping_input=ping_sock.getInputStream();
-                return true;
-            }
-            catch(Throwable ex) {
-                return false;
-            }
-        }
-    }
-
-
-    void teardownPingSocket() {
-        synchronized(sock_mutex) {
-            if(ping_sock != null) {
-                try {
-                    ping_sock.shutdownInput();
-                    ping_sock.close();
-                }
-                catch(Exception ex) {
-                }
-                ping_sock=null;
-            }
-            if(ping_input != null) {
-                try {
-                    ping_input.close();
-                }
-                catch(Exception ex) {
-                }
-                ping_input=null;
-            }
-        }
-    }
-
-
-    /**
-     * Determines coordinator C. If C is null or C is us (we are the first member), return. Else loop: send GET_CACHE message
-     * to coordinator and wait for GET_CACHE_RSP response. Loop until valid response has been received.
-     */
-    private void getCacheFromCoordinator() {
-        Address coord;
-        int attempts=num_tries;
-        Message msg;
-        FdHeader hdr;
-        Hashtable result;
-
-        get_cache_promise.reset();
-        while(attempts > 0 && !stopping) {
-            if((coord=determineCoordinator()) != null) {
-                if(coord.equals(local_addr)) { // we are the first member --> empty cache
-                    if(log.isDebugEnabled()) log.debug("first member; cache is empty");
-                    return;
-                }
-                hdr=new FdHeader(FdHeader.GET_CACHE);
-                hdr.mbr=local_addr;
-                msg=new Message(coord, null, null);
-                msg.putHeader(name, hdr);
-                passDown(new Event(Event.MSG, msg));
-                result=(Hashtable) get_cache_promise.getResult(get_cache_timeout);
-                if(result != null) {
-                    cache.putAll(result); // replace all entries (there should be none !) in cache with the new values
-                    if(trace) log.trace("got cache from " + coord + ": cache is " + cache);
-                    return;
-                }
-                else {
-                    if(log.isErrorEnabled())
-                        log.error("received null cache; retrying (stopping=" + stopping + "), " +
-                                "members=" + members + ", pingable_members=" + pingable_mbrs);
-                }
-            }
-
-            Util.sleep(get_cache_retry_timeout);
-            --attempts;
-        }
-    }
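
The rendezvous between the GET_CACHE request and its GET_CACHE_RSP relies on the Promise utility; a stripped-down sketch of the same one-shot pattern (the real org.jgroups.util.Promise is richer, so treat this as an approximation):

    // Sketch: block a requester until a result arrives or the timeout elapses.
    class SimplePromise {
        private Object result;

        synchronized void setResult(Object r) {
            result = r;
            notifyAll(); // wake the waiting requester
        }

        synchronized Object getResult(long timeoutMs) throws InterruptedException {
            long deadline = System.currentTimeMillis() + timeoutMs;
            while (result == null) {
                long remaining = deadline - System.currentTimeMillis();
                if (remaining <= 0)
                    break; // timed out: return null, as the callers above expect
                wait(remaining);
            }
            Object r = result;
            result = null; // reset for the next request
            return r;
        }
    }
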
-
-
-    /**
-     * Sends a SUSPECT message to all group members. Only the coordinator (or the next member in line if the coord
-     * itself is suspected) will react to this message by installing a new view. To overcome the unreliability
-     * of the SUSPECT message (it may be lost because we are not above any retransmission layer), the following scheme
-     * is used: after sending the SUSPECT message, it is also added to the broadcast task, which will periodically
-     * re-send the SUSPECT until a view is received in which the suspected process is not a member anymore. The reason is
-     * that, at some point, either the coordinator or another participant (taking over for a crashed coordinator) will
-     * react to the SUSPECT message and issue a new view, at which point the broadcast task stops.
-     */
-    void broadcastSuspectMessage(Address suspected_mbr) {
-        Message suspect_msg;
-        FdHeader hdr;
-
-        if(suspected_mbr == null) return;
-
-        if(trace) log.trace("suspecting " + suspected_mbr + " (own address is " + local_addr + ')');
-
-        // 1. Send a SUSPECT message right away; the broadcast task will take some time to send it (sleeps first)
-        hdr=new FdHeader(FdHeader.SUSPECT);
-        hdr.mbrs=new Vector(1);
-        hdr.mbrs.addElement(suspected_mbr);
-        suspect_msg=new Message();
-        suspect_msg.putHeader(name, hdr);
-        passDown(new Event(Event.MSG, suspect_msg));
-
-        // 2. Add to broadcast task and start latter (if not yet running). The task will end when
-        //    suspected members are removed from the membership
-        bcast_task.addSuspectedMember(suspected_mbr);
-        if(stats) {
-            num_suspect_events++;
-            suspect_history.add(suspected_mbr);
-        }
-    }
-
-
-    void broadcastWhoHasSockMessage(Address mbr) {
-        Message msg;
-        FdHeader hdr;
-
-        if(local_addr != null && mbr != null)
-            if(log.isDebugEnabled()) log.debug("[" + local_addr + "]: who-has " + mbr);
-
-        msg=new Message();  // bcast msg
-        hdr=new FdHeader(FdHeader.WHO_HAS_SOCK);
-        hdr.mbr=mbr;
-        msg.putHeader(name, hdr);
-        passDown(new Event(Event.MSG, msg));
-    }
-
-
-    /**
-     Sends or broadcasts an I_HAVE_SOCK response. If 'dst' is null, the response will be broadcast, otherwise
-     it will be unicast back to the requester.
-     */
-    void sendIHaveSockMessage(Address dst, Address mbr, IpAddress addr) {
-        Message msg=new Message(dst, null, null);
-        FdHeader hdr=new FdHeader(FdHeader.I_HAVE_SOCK);
-        hdr.mbr=mbr;
-        hdr.sock_addr=addr;
-        msg.putHeader(name, hdr);
-
-        if(trace) // +++ remove
-            log.trace("hdr=" + hdr);
-
-        passDown(new Event(Event.MSG, msg));
-    }
-
-
-    /**
-     Attempts to obtain the ping_addr first from the cache, then by unicasting a request to <code>mbr</code>,
-     then by multicasting a request to all members.
-     */
-    private IpAddress fetchPingAddress(Address mbr) {
-        IpAddress ret;
-        Message ping_addr_req;
-        FdHeader hdr;
-
-        if(mbr == null) {
-            if(log.isErrorEnabled()) log.error("mbr == null");
-            return null;
-        }
-        // 1. Try to get from cache. Add a little delay so that joining mbrs can send their socket address before
-        //    we ask them to do so
-        ret=(IpAddress)cache.get(mbr);
-        if(ret != null)
-            return ret;
-
-        Util.sleep(300);
-        if((ret=(IpAddress)cache.get(mbr)) != null)
-            return ret;
-
-        // 2. Try to get from mbr
-        if(stopping)
-            return null;
-        ping_addr_promise.reset();
-        ping_addr_req=new Message(mbr, null, null); // unicast
-        hdr=new FdHeader(FdHeader.WHO_HAS_SOCK);
-        hdr.mbr=mbr;
-        ping_addr_req.putHeader(name, hdr);
-        passDown(new Event(Event.MSG, ping_addr_req));
-        ret=(IpAddress) ping_addr_promise.getResult(3000);
-        if(ret != null)
-            return ret;
-
-        // 3. Try to get from all members
-        if(stopping)
-            return null;
-        ping_addr_req=new Message(null, null, null); // multicast
-        hdr=new FdHeader(FdHeader.WHO_HAS_SOCK);
-        hdr.mbr=mbr;
-        ping_addr_req.putHeader(name, hdr);
-        passDown(new Event(Event.MSG, ping_addr_req));
-        ret=(IpAddress) ping_addr_promise.getResult(3000);
-        return ret;
-    }
-
-
-    Address determinePingDest() {
-        Address tmp;
-
-        if(pingable_mbrs == null || pingable_mbrs.size() < 2 || local_addr == null)
-            return null;
-        for(int i=0; i < pingable_mbrs.size(); i++) {
-            tmp=(Address) pingable_mbrs.elementAt(i);
-            if(local_addr.equals(tmp)) {
-                if(i + 1 >= pingable_mbrs.size())
-                    return (Address) pingable_mbrs.elementAt(0);
-                else
-                    return (Address) pingable_mbrs.elementAt(i + 1);
-            }
-        }
-        return null;
-    }
-
-
-    Address determineCoordinator() {
-        return members.size() > 0 ? (Address) members.elementAt(0) : null;
-    }
-
-
-
-
-
-    /* ------------------------------- End of Private Methods ------------------------------------ */
-
-
-    public static class FdHeader extends Header implements Streamable {
-        public static final byte SUSPECT=10;
-        public static final byte WHO_HAS_SOCK=11;
-        public static final byte I_HAVE_SOCK=12;
-        public static final byte GET_CACHE=13; // sent by joining member to coordinator
-        public static final byte GET_CACHE_RSP=14; // sent by coordinator to joining member in response to GET_CACHE
-
-
-        byte      type=SUSPECT;
-        Address   mbr=null;           // set on WHO_HAS_SOCK (requested mbr), I_HAVE_SOCK
-        IpAddress sock_addr;          // set on I_HAVE_SOCK
-
-        // Hashtable<Address,IpAddress>
-        Hashtable cachedAddrs=null;   // set on GET_CACHE_RSP
-        Vector    mbrs=null;          // set on SUSPECT (list of suspected members)
-
-
-        public FdHeader() {
-        } // used for externalization
-
-        public FdHeader(byte type) {
-            this.type=type;
-        }
-
-        public FdHeader(byte type, Address mbr) {
-            this.type=type;
-            this.mbr=mbr;
-        }
-
-        public FdHeader(byte type, Vector mbrs) {
-            this.type=type;
-            this.mbrs=mbrs;
-        }
-
-        public FdHeader(byte type, Hashtable cachedAddrs) {
-            this.type=type;
-            this.cachedAddrs=cachedAddrs;
-        }
-
-
-        public String toString() {
-            StringBuffer sb=new StringBuffer();
-            sb.append(type2String(type));
-            if(mbr != null)
-                sb.append(", mbr=").append(mbr);
-            if(sock_addr != null)
-                sb.append(", sock_addr=").append(sock_addr);
-            if(cachedAddrs != null)
-                sb.append(", cache=").append(cachedAddrs);
-            if(mbrs != null)
-                sb.append(", mbrs=").append(mbrs);
-            return sb.toString();
-        }
-
-
-        public static String type2String(byte type) {
-            switch(type) {
-                case SUSPECT:
-                    return "SUSPECT";
-                case WHO_HAS_SOCK:
-                    return "WHO_HAS_SOCK";
-                case I_HAVE_SOCK:
-                    return "I_HAVE_SOCK";
-                case GET_CACHE:
-                    return "GET_CACHE";
-                case GET_CACHE_RSP:
-                    return "GET_CACHE_RSP";
-                default:
-                    return "unknown type (" + type + ')';
-            }
-        }
-
-        public void writeExternal(ObjectOutput out) throws IOException {
-            out.writeByte(type);
-            out.writeObject(mbr);
-            out.writeObject(sock_addr);
-            out.writeObject(cachedAddrs);
-            out.writeObject(mbrs);
-        }
-
-
-        public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
-            type=in.readByte();
-            mbr=(Address) in.readObject();
-            sock_addr=(IpAddress) in.readObject();
-            cachedAddrs=(Hashtable) in.readObject();
-            mbrs=(Vector) in.readObject();
-        }
-
-        public long size() {
-            long retval=Global.BYTE_SIZE; // type
-            retval+=Util.size(mbr);
-            retval+=Util.size(sock_addr);
-
-            retval+=Global.INT_SIZE; // cachedAddrs size
-            Map.Entry entry;
-            Address key;
-            IpAddress val;
-            if(cachedAddrs != null) {
-                for(Iterator it=cachedAddrs.entrySet().iterator(); it.hasNext();) {
-                    entry=(Map.Entry)it.next();
-                    if((key=(Address)entry.getKey()) != null)
-                        retval+=Util.size(key);
-                    retval+=Global.BYTE_SIZE; // presence for val
-                    if((val=(IpAddress)entry.getValue()) != null)
-                        retval+=val.size();
-                }
-            }
-
-            retval+=Global.INT_SIZE; // mbrs size
-            if(mbrs != null) {
-                for(int i=0; i < mbrs.size(); i++) {
-                    retval+=Util.size((Address)mbrs.elementAt(i));
-                }
-            }
-
-            return retval;
-        }
-
-        public void writeTo(DataOutputStream out) throws IOException {
-            int size;
-            out.writeByte(type);
-            Util.writeAddress(mbr, out);
-            Util.writeStreamable(sock_addr, out);
-            size=cachedAddrs != null? cachedAddrs.size() : 0;
-            out.writeInt(size);
-            if(size > 0) {
-                for(Iterator it=cachedAddrs.entrySet().iterator(); it.hasNext();) {
-                    Map.Entry entry=(Map.Entry)it.next();
-                    Address key=(Address)entry.getKey();
-                    IpAddress val=(IpAddress)entry.getValue();
-                    Util.writeAddress(key, out);
-                    Util.writeStreamable(val, out);
-                }
-            }
-            size=mbrs != null? mbrs.size() : 0;
-            out.writeInt(size);
-            if(size > 0) {
-                for(Iterator it=mbrs.iterator(); it.hasNext();) {
-                    Address address=(Address)it.next();
-                    Util.writeAddress(address, out);
-                }
-            }
-        }
-
-        public void readFrom(DataInputStream in) throws IOException, IllegalAccessException, InstantiationException {
-            int size;
-            type=in.readByte();
-            mbr=Util.readAddress(in);
-            sock_addr=(IpAddress)Util.readStreamable(IpAddress.class, in);
-            size=in.readInt();
-            if(size > 0) {
-                if(cachedAddrs == null)
-                    cachedAddrs=new Hashtable();
-                for(int i=0; i < size; i++) {
-                    Address key=Util.readAddress(in);
-                    IpAddress val=(IpAddress)Util.readStreamable(IpAddress.class, in);
-                    cachedAddrs.put(key, val);
-                }
-            }
-            size=in.readInt();
-            if(size > 0) {
-                if(mbrs == null)
-                    mbrs=new Vector();
-                for(int i=0; i < size; i++) {
-                    Address addr=Util.readAddress(in);
-                    mbrs.add(addr);
-                }
-            }
-        }
-
-    }
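
For illustration, the Streamable methods above can be exercised with a small round-trip harness (hypothetical, and it assumes Util.writeAddress/readAddress handle null members, which the size() accounting suggests):

    import java.io.*;

    // Sketch: serialize an FdHeader and read it back via writeTo()/readFrom().
    class FdHeaderRoundTrip {
        public static void main(String[] args) throws Exception {
            FD_SOCK.FdHeader out = new FD_SOCK.FdHeader(FD_SOCK.FdHeader.SUSPECT);
            ByteArrayOutputStream bos = new ByteArrayOutputStream();
            out.writeTo(new DataOutputStream(bos));

            FD_SOCK.FdHeader in = new FD_SOCK.FdHeader();
            in.readFrom(new DataInputStream(new ByteArrayInputStream(bos.toByteArray())));
            System.out.println("round-tripped: " + in); // expect "SUSPECT"
        }
    }
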
-
-
-    /**
-     * Handles the server side of the client-server socket connections. Waits for clients to connect and hands
-     * each accepted connection off to a ClientConnectionHandler thread, so more than 1 client can be connected
-     * at the same time. In practice the ring nature of the FD_SOCK protocol means that normally only 1 client
-     * (the left-hand-side neighbor) connects to this member's server socket.
-     */
-    private class ServerSocketHandler implements Runnable {
-        Thread acceptor=null;
-        /** List<ClientConnectionHandler> */
-        final List clients=new ArrayList();
-
-
-
-        ServerSocketHandler() {
-            start();
-        }
-
-        void start() {
-            if(acceptor == null) {
-                acceptor=new Thread(this, "ServerSocket acceptor thread");
-                acceptor.setDaemon(true);
-                acceptor.start();
-            }
-        }
-
-
-        void stop() {
-            if(acceptor != null && acceptor.isAlive()) {
-                try {
-                    srv_sock.close(); // this will terminate thread, peer will receive SocketException (socket close)
-                }
-                catch(Exception ex) {
-                }
-            }
-            synchronized(clients) {
-                for(Iterator it=clients.iterator(); it.hasNext();) {
-                    ClientConnectionHandler handler=(ClientConnectionHandler)it.next();
-                    handler.stopThread();
-                }
-                clients.clear();
-            }
-            acceptor=null;
-        }
-
-
-        /** Accepts client connections; each is handed to a ClientConnectionHandler thread */
-        public void run() {
-            Socket client_sock;
-            while(acceptor != null && srv_sock != null) {
-                try {
-                    if(trace) // +++ remove
-                        log.trace("waiting for client connections on " + srv_sock.getInetAddress() + ":" +
-                                  srv_sock.getLocalPort());
-                    client_sock=srv_sock.accept();
-                    if(trace) // +++ remove
-                        log.trace("accepted connection from " + client_sock.getInetAddress() + ':' + client_sock.getPort());
-                    ClientConnectionHandler client_conn_handler=new ClientConnectionHandler(client_sock, clients);
-                    synchronized(clients) {
-                        clients.add(client_conn_handler);
-                    }
-                    client_conn_handler.start();
-                }
-                catch(IOException io_ex2) {
-                    break;
-                }
-            }
-            acceptor=null;
-        }
-    }
-
-
-
-    /** Handles a client connection; multiple clients can connect at the same time */
-    private static class ClientConnectionHandler extends Thread {
-        Socket      client_sock=null;
-        InputStream in;
-        final Object mutex=new Object();
-        List clients=new ArrayList();
-
-        ClientConnectionHandler(Socket client_sock, List clients) {
-            setName("ClientConnectionHandler");
-            setDaemon(true);
-            this.client_sock=client_sock;
-            this.clients.addAll(clients);
-        }
-
-        void stopThread() {
-            synchronized(mutex) {
-                if(client_sock != null) {
-                    try {
-                        OutputStream out=client_sock.getOutputStream();
-                        out.write(NORMAL_TEMINATION);
-                    }
-                    catch(Throwable t) {
-                    }
-                }
-            }
-            closeClientSocket();
-        }
-
-        void closeClientSocket() {
-            synchronized(mutex) {
-                if(client_sock != null) {
-                    try {
-                        client_sock.close();
-                    }
-                    catch(Exception ex) {
-                    }
-                    client_sock=null;
-                }
-            }
-        }
-
-        public void run() {
-            try {
-                synchronized(mutex) {
-                    if(client_sock == null)
-                        return;
-                    in=client_sock.getInputStream();
-                }
-                while((in.read()) != -1) {
-                }
-            }
-            catch(IOException io_ex1) {
-            }
-            finally {
-                closeClientSocket();
-                synchronized(clients) {
-                    clients.remove(this);
-                }
-            }
-        }
-    }
-
-
-    /**
-     * Task that periodically broadcasts a list of suspected members to the group. The goal is not to lose
-     * a SUSPECT message: since these are bcast unreliably, they might get dropped. The BroadcastTask makes
-     * sure they are retransmitted until a view has been received which doesn't contain the suspected members
-     * any longer. Then the task terminates.
-     */
-    private class BroadcastTask implements TimeScheduler.Task {
-        final Vector suspected_mbrs=new Vector(7);
-        boolean stopped=false;
-
-
-        /** Adds a suspected member. Starts the task if not yet running */
-        public void addSuspectedMember(Address mbr) {
-            if(mbr == null) return;
-            if(!members.contains(mbr)) return;
-            synchronized(suspected_mbrs) {
-                if(!suspected_mbrs.contains(mbr)) {
-                    suspected_mbrs.addElement(mbr);
-                    if(log.isDebugEnabled()) log.debug("mbr=" + mbr + " (size=" + suspected_mbrs.size() + ')');
-                }
-                if(stopped && suspected_mbrs.size() > 0) {
-                    stopped=false;
-                    timer.add(this, true);
-                }
-            }
-        }
-
-
-        public void removeSuspectedMember(Address suspected_mbr) {
-            if(suspected_mbr == null) return;
-            if(log.isDebugEnabled()) log.debug("member is " + suspected_mbr);
-            synchronized(suspected_mbrs) {
-                suspected_mbrs.removeElement(suspected_mbr);
-                if(suspected_mbrs.size() == 0)
-                    stopped=true;
-            }
-        }
-
-
-        public void removeAll() {
-            synchronized(suspected_mbrs) {
-                suspected_mbrs.removeAllElements();
-                stopped=true;
-            }
-        }
-
-
-        /**
-         * Removes all elements from suspected_mbrs that are <em>not</em> in the new membership
-         */
-        public void adjustSuspectedMembers(Vector new_mbrship) {
-            Address suspected_mbr;
-
-            if(new_mbrship == null || new_mbrship.size() == 0) return;
-            synchronized(suspected_mbrs) {
-                for(Iterator it=suspected_mbrs.iterator(); it.hasNext();) {
-                    suspected_mbr=(Address) it.next();
-                    if(!new_mbrship.contains(suspected_mbr)) {
-                        it.remove();
-                        if(log.isDebugEnabled())
-                            log.debug("removed " + suspected_mbr + " (size=" + suspected_mbrs.size() + ')');
-                    }
-                }
-                if(suspected_mbrs.size() == 0)
-                    stopped=true;
-            }
-        }
-
-
-        public boolean cancelled() {
-            return stopped;
-        }
-
-
-        public long nextInterval() {
-            return suspect_msg_interval;
-        }
-
-
-        public void run() {
-            Message suspect_msg;
-            FdHeader hdr;
-
-            if(log.isDebugEnabled())
-                log.debug("broadcasting SUSPECT message (suspected_mbrs=" + suspected_mbrs + ") to group");
-
-            synchronized(suspected_mbrs) {
-                if(suspected_mbrs.size() == 0) {
-                    stopped=true;
-                    if(log.isDebugEnabled()) log.debug("task done (no suspected members)");
-                    return;
-                }
-
-                hdr=new FdHeader(FdHeader.SUSPECT);
-                hdr.mbrs=(Vector) suspected_mbrs.clone();
-            }
-            suspect_msg=new Message();       // mcast SUSPECT to all members
-            suspect_msg.putHeader(name, hdr);
-            passDown(new Event(Event.MSG, suspect_msg));
-            if(log.isDebugEnabled()) log.debug("task done");
-        }
-    }
-
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/8b2ea77d/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/protocols/FLOWCONTROL.java
----------------------------------------------------------------------
diff --git a/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/protocols/FLOWCONTROL.java b/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/protocols/FLOWCONTROL.java
deleted file mode 100644
index 304d8ab..0000000
--- a/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/protocols/FLOWCONTROL.java
+++ /dev/null
@@ -1,89 +0,0 @@
-/** Notice of modification as required by the LGPL
- *  This file was modified by Gemstone Systems Inc. on
- *  $Date$
- **/
-package com.gemstone.org.jgroups.protocols;
-
-
-import com.gemstone.org.jgroups.Address;
-import com.gemstone.org.jgroups.Event;
-import com.gemstone.org.jgroups.Message;
-import com.gemstone.org.jgroups.stack.Protocol;
-
-import java.util.Vector;
-
-
-
-
-/**
- * Title: Flow control layer
- * Description: This layer limits the number of messages sent before receiving one of one's own messages
- * back to MAXSENTMSGS; just put this layer above GMS and you will get a more balanced message flow.
- * Copyright:    Copyright (c) 2000
- * Company:      Computer Network Laboratory
- * @author Gianluca Collot
- * @version 1.0
- */
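
The invariant — at most MAXSENTMSGS of one's own messages in flight — can also be expressed with a counting semaphore; a sketch for comparison (FLOWCONTROL itself queues the event rather than blocking the caller, as the code below shows):

    import java.util.concurrent.Semaphore;

    // Sketch: cap the number of own messages in flight at MAXSENTMSGS.
    class SendWindow {
        private final Semaphore permits = new Semaphore(1); // MAXSENTMSGS == 1

        void beforeSend() throws InterruptedException {
            permits.acquire(); // blocks once the window is exhausted
        }

        void onOwnMessageReceivedBack() {
            permits.release(); // receiving our own message frees a slot
        }
    }
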
-public class FLOWCONTROL extends Protocol {
-
-  final Vector queuedMsgs = new Vector();
-  int sentMsgs = 0;
-  static final int MAXSENTMSGS = 1;
-  Address myAddr;
-
-
-    public FLOWCONTROL() {
-    }
-    @Override // GemStoneAddition  
-    public String getName() {
-	return "FLOWCONTROL";
-    }
-
-  /**
-   * Checks if up messages are from myAddr and, in that case, sends down queued messages or
-   * decrements sentMsgs if there are no queued messages
-   */
-    @Override // GemStoneAddition  
-    public void up(Event evt) {
-	Message msg;
-	switch (evt.getType()) {
-	case Event.SET_LOCAL_ADDRESS: myAddr = (Address) evt.getArg();
-	    break;
-
-	case Event.MSG:               msg = (Message) evt.getArg();
-	     if(log.isDebugEnabled()) log.debug("Message received");
-	    if (msg.getSrc().equals(myAddr)) {
-		if (queuedMsgs.size() > 0) {
-		     if(log.isDebugEnabled()) log.debug("Message from me received - Queue size was " + queuedMsgs.size());
-		    passDown((Event) queuedMsgs.remove(0));
-		} else {
-		     if(log.isDebugEnabled()) log.debug("Message from me received - No messages in queue");
-		    sentMsgs--;
-		}
-	    }
-	}
-	passUp(evt);
-    }
-
-  /**
-   * Checks if it can send the message, else puts the message in the queue.
-   */
-    @Override // GemStoneAddition  
-    public void down(Event evt) {
-	Message msg;
-	if (evt.getType()==Event.MSG) {
-	    msg = (Message) evt.getArg();
-	    if ((msg.getDest() == null) || (msg.getDest().equals(myAddr))) {
-		if (sentMsgs < MAXSENTMSGS) {
-		    sentMsgs++;
-		     if(log.isDebugEnabled()) log.debug("Message " + sentMsgs + " sent");
-		} else {
-		    queuedMsgs.add(evt); //queues message (we add the event to avoid creating a new event to send the message)
-		    return;
-		}
-	    }
-	}
-	passDown(evt);
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/8b2ea77d/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/protocols/FLOW_CONTROL.java
----------------------------------------------------------------------
diff --git a/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/protocols/FLOW_CONTROL.java b/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/protocols/FLOW_CONTROL.java
deleted file mode 100644
index c16e9c6..0000000
--- a/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/protocols/FLOW_CONTROL.java
+++ /dev/null
@@ -1,317 +0,0 @@
-/** Notice of modification as required by the LGPL
- *  This file was modified by Gemstone Systems Inc. on
- *  $Date$
- **/
-// $Id: FLOW_CONTROL.java,v 1.10 2005/08/11 12:43:47 belaban Exp $
-
-package com.gemstone.org.jgroups.protocols;
-
-
-import com.gemstone.org.jgroups.Address;
-import com.gemstone.org.jgroups.Event;
-import com.gemstone.org.jgroups.Message;
-import com.gemstone.org.jgroups.blocks.GroupRequest;
-import com.gemstone.org.jgroups.stack.MessageProtocol;
-import com.gemstone.org.jgroups.util.ExternalStrings;
-import com.gemstone.org.jgroups.util.ReusableThread;
-import com.gemstone.org.jgroups.util.RspList;
-import com.gemstone.org.jgroups.util.Util;
-
-import java.io.Serializable;
-import java.util.HashMap;
-import java.util.Properties;
-
-
-// @todo Handle view changes (e.g., members {A,B,C}, blocked on C, and C crashes --> unblock).
-/**
- * FLOW_CONTROL provides end-end congestion control and flow control.
- * Attempts to maximize throughput by minimizing the
- * possible block times (forward flow control). Initially, the sender starts with a small
- * window size <code>W</code> and a large expected RTT <code>grpRTT</code>. The sender also
- * keeps a margin in the window size. When the margin is hit, instead of waiting for the
- * window size to be exhausted, the sender multicasts a FLOW_CONTROL info request message.
- * If the window size is exhausted before the responses are received, the sender will be blocked.
- * FCInfo (flow control info) from all the receivers is gathered at the sender, and the current RTT
- * is computed. If the current RTT is greater than the estimated RTT, the window size and margin are reduced;
- * otherwise they are increased.
- * <p>
- * Horizontal interaction is initiated by the sender with the other group members.
- * <p>
- * <em>Note: A reliable transport layer is required for this protocol to function properly.</em>
- * With little effort this can be made completely independent.
- * <p>
- * <br> Also block on down() instead of sending BLOCK_SEND.
- *
- * @author Ananda Bollu
- */
-
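
The diff below is truncated before the response-analysis code, so the adjustment arithmetic can only be inferred from the javadoc; a hedged reconstruction using this class's field names and defaults (an assumption, not the original code):

    // Sketch: adaptive window sizing as described in the javadoc above.
    class WindowAdjuster {
        double estimatedRTT = 100000;         // _estimatedRTT default
        int windowSize = 1000;                // _windowSize default
        int fwdMargin = 200;                  // _fwdMarginSize default
        static final double RTT_WEIGHT = 0.125;
        static final double REDUCTION = 0.75; // WINDOW_SIZE_REDUCTION
        static final double EXPANSION = 1.25; // WINDOW_SIZE_EXPANSION

        void onFcResponse(long currentRTT) {
            if (currentRTT > estimatedRTT) {  // receivers lagging: shrink
                windowSize = (int) (windowSize * REDUCTION);
                fwdMargin  = (int) (fwdMargin  * REDUCTION);
            } else {                          // headroom available: grow
                windowSize = (int) (windowSize * EXPANSION);
                fwdMargin  = (int) (fwdMargin  * EXPANSION);
            }
            // exponentially-weighted moving average, like TCP's smoothed RTT
            estimatedRTT = (1 - RTT_WEIGHT) * estimatedRTT + RTT_WEIGHT * currentRTT;
        }
    }
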
-public class FLOW_CONTROL extends MessageProtocol implements Runnable {
-    private int _numMSGsSentThisPeriod=0;
-    private static final String FLOW_CONTROL="FLOW_CONTROL";
-    private final HashMap _rcvdMSGCounter=new HashMap();
-
-    private int _windowSize=1000;
-    private int _fwdMarginSize=200;
-    private int _estimatedRTT=100000;
-    private boolean waitingForResponse=false;
-    private final ReusableThread _reusableThread;
-    private double RTT_WEIGHT=0.125;
-    private int _msgsSentAfterFCreq=0;
-//    private final double TIME_OUT_FACTOR=0.25;//if resp not received from more than n*TIME_OUT_INCREMENT_FACTOR
-//    private final double TIME_OUT_INCR_MULT=1.25;
-    private double WINDOW_SIZE_REDUCTION=0.75;
-    private double WINDOW_SIZE_EXPANSION=1.25;
-    private boolean isBlockState=false;
-
-    private int _windowsize_cap=1000000; //initial window size can not be more than 10^6 messages.
-
-    public FLOW_CONTROL() {
-        _reusableThread=new ReusableThread(FLOW_CONTROL);
-    }
-
-    @Override // GemStoneAddition
-    public String getName() {
-        return FLOW_CONTROL;
-    }
-
-    /**
-     * If an Event.MSG type is received, the count is incremented by one,
-     * and the message is passed to the down_prot. At some point,
-     * based on the algorithm (FLOW_CONTROL protocol definition),
-     * a data collection sequence is started. This is done by each
-     * member in the SENDER role when _numMSGsSentThisPeriod hits the margin.
-     * Before the rsp arrives only _fwdMarginSize messages can be sent,
-     * and then the sender will be blocked.
-     */
-    @Override // GemStoneAddition
-    public boolean handleDownEvent(Event evt) {
-        if(evt.getType() == Event.MSG) {
-            _numMSGsSentThisPeriod++;
-            if((_numMSGsSentThisPeriod > (_windowSize - _fwdMarginSize)) && !waitingForResponse) {
-                waitingForResponse=true;
-                //wait for the previous request to return before assigning a new task.
-                _reusableThread.waitUntilDone();
-                _reusableThread.assignTask(this);
-            }
-            if(waitingForResponse) {
-                _msgsSentAfterFCreq++;
-                if((_msgsSentAfterFCreq >= _fwdMarginSize) && !isBlockState) {
-
-                    if(log.isInfoEnabled()) log.info(ExternalStrings.FLOW_CONTROL_ACTION_BLOCK);
-                    log.error(ExternalStrings.FLOW_CONTROL_0_0_1, new Object[] {Long.valueOf(System.currentTimeMillis()), Integer.valueOf(_windowSize)});
-                    passUp(new Event(Event.BLOCK_SEND));
-                    isBlockState=true;
-                }
-            }
-        }
-        return true;
-    }
-
-    /**
-     * If an Event.MSG type is received, the number of received
-     * messages from the sender is incremented, and the message is
-     * passed up the stack.
-     */
-    @Override // GemStoneAddition
-    public boolean handleUpEvent(Event evt) {
-        if(evt.getType() == Event.MSG) {
-            Message msg=(Message)evt.getArg();
-            Address src=msg.getSrc();
-            FCInfo fcForSrc=(FCInfo)_rcvdMSGCounter.get(src);
-            if(fcForSrc == null) {
-                fcForSrc=new FCInfo();
-                _rcvdMSGCounter.put(src, fcForSrc);
-            }
-            fcForSrc.increment(1);
-
-            if(log.isInfoEnabled()) log.info(ExternalStrings.FLOW_CONTROL_MESSAGE__0__RECEIVED_FROM__1, new Object[] {Integer.valueOf(fcForSrc.getRcvdMSGCount()), src});
-        }
-        return true;
-    }
-
-    /**
-     * Called when a request for this protocol layer is received.
-     * Processes the request; the return value is sent back in the reply.
-     * The FLOW_CONTROL protocol of all members gets this message (including the sender?).
-     *
-     * @return Object containing FC information for sender with senderID.
-     *         <b>Callback</b>. Called when a request for this protocol layer is received.
-     */
-    @Override // GemStoneAddition
-    public Object handle(Message req) {
-        Address src=req.getSrc();
-        Long resp=Long.valueOf(((FCInfo)_rcvdMSGCounter.get(src)).getRcvdMSGCount());
-
-        if(log.isInfoEnabled()) log.info(ExternalStrings.FLOW_CONTROL_REQEST_CAME_FROM__0__PREPARED_RESPONSE__1, new Object[] {src, resp});
-        return resp;
-    }
-
-    /**
-     * FCInfo request must be submitted in a different thread.
-     * handleDownEvent() can still be called to send messages
-     * while waiting for FCInfo from receivers. This usually takes
-     * one RTT.
-     */
-    public void run() {
-
-        if(log.isInfoEnabled()) log.info(ExternalStrings.FLOW_CONTROL__HIT_THE__FWDMARGIN_REMAINING_SIZE__0, _fwdMarginSize);
-        reqFCInfo();
-    }
-
-    /**
-     * The following parameters can be supplied optionally (an illustrative
-     * configuration string follows this method):
-     * <ul>
-     * <li>window size cap - <code>int</code> limits the window size to a reasonable value.
-     * <li>window size - <code>int</code> this many messages can be sent before sending may block.
-     * <li>forward margin - <code>int</code> a request for flow control information is sent when the remaining window size hits this margin.
-     * <li>RTT weight - <code>double</code> the max RTT in the group is measured during each flow control request; a higher value gives the current RTT more weight in the RTT estimate.
-     * <li>window size reduction factor - <code>double</code> when the current RTT is greater than the estimated RTT, the window size is multiplied by this factor.
-     * <li>window size expansion factor - <code>double</code> when the current RTT is less than the estimated RTT, the window size is multiplied by this factor.
-     * </ul>
-     *
-     * @see com.gemstone.org.jgroups.stack.Protocol#setProperties(Properties)
-     */
-    @Override // GemStoneAddition
-    public boolean setProperties(Properties props) {
-        String str=null;
-        String winsizekey="window_size";
-        String fwdmrgnkey="fwd_mrgn";
-        String rttweightkey="rttweight";
-        String sizereductionkey="reduction";
-        String sizeexpansionkey="expansion";
-        String windowsizeCapKey="window_size_cap";
-
-        super.setProperties(props);
-        str=props.getProperty(windowsizeCapKey);
-        if(str != null) {
-            _windowsize_cap=Integer.parseInt(str);
-            props.remove(windowsizeCapKey);
-        }
-        str=props.getProperty(winsizekey);
-        if(str != null) {
-            _windowSize=Integer.parseInt(str);
-            if(_windowSize > _windowsize_cap)
-                _windowSize=_windowsize_cap;
-            props.remove(winsizekey);
-        }
-
-        str=props.getProperty(fwdmrgnkey);
-        if(str != null) {
-            _fwdMarginSize=Integer.parseInt(str);
-            props.remove(fwdmrgnkey);
-        }
-
-        str=props.getProperty(rttweightkey);
-        if(str != null) {
-            RTT_WEIGHT=Double.parseDouble(str);
-            props.remove(rttweightkey);
-        }
-
-        str=props.getProperty(sizereductionkey);
-        if(str != null) {
-            WINDOW_SIZE_REDUCTION=Double.parseDouble(str);
-            props.remove(sizereductionkey);
-        }
-
-        str=props.getProperty(sizeexpansionkey);
-        if(str != null) {
-            WINDOW_SIZE_EXPANSION=Double.parseDouble(str);
-            props.remove(sizeexpansionkey);
-        }
-
-
-        if(props.size() > 0) {
-            log.error(ExternalStrings.FLOW_CONTROL_FLOW_CONTROLSETPROPERTIES_THE_FOLLOWING_PROPERTIES_ARE_NOT_RECOGNIZED__0, props);
-
-            return false;
-        }
-        return true;
-
-    }
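-
-    /* For illustration only (values are made up): with the property keys parsed
-       above, a protocol stack string configuring this layer could contain
-
-           FLOW_CONTROL(window_size=1000;fwd_mrgn=200;rttweight=0.125;
-                        reduction=0.75;expansion=1.25;window_size_cap=100000)
-    */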
-
-    /*-----------private stuff ------*/
-
-    private RspList reqFCInfo() {
-        RspList rspList=null;
-        long reqSentTime=0, rspRcvdTime=0;
-        try {
-            reqSentTime=System.currentTimeMillis();
-            // alternatively use _estimatedRTT for the timeout (a timeout is the
-            // right approach, but the use cases need checking)
-            rspList=castMessage(null, new Message(null, null, Util.objectToByteBuffer(FLOW_CONTROL)),
-                    GroupRequest.GET_ALL, 0);
-            rspRcvdTime=System.currentTimeMillis();
-        }
-        catch(Exception ex) {
-            ex.printStackTrace();
-        }
-
-        /* If a NAKACK layer is present and the (n+1)th message is the FLOW_CONTROL request, then
-           receiving the responses implies that all n messages sent earlier were received(?);
-           NAK_ACK can be ignored.
-        */
-        //ANALYSE RESPONSES
-
-        long currentRTT=rspRcvdTime - reqSentTime;
-
-        if(currentRTT > _estimatedRTT) {
-            _windowSize=(int)(_windowSize * WINDOW_SIZE_REDUCTION);
-            _fwdMarginSize=(int)(_fwdMarginSize * WINDOW_SIZE_REDUCTION);
-        }
-        else {
-            _windowSize=(int)(_windowSize * WINDOW_SIZE_EXPANSION);
-            if(_windowSize > _windowsize_cap)
-                _windowSize=_windowsize_cap;
-            _fwdMarginSize=(int)(_fwdMarginSize * WINDOW_SIZE_EXPANSION);
-        }
-
-        _estimatedRTT=(int)((RTT_WEIGHT * currentRTT) + (1.0 - RTT_WEIGHT) * _estimatedRTT);
-
-        //reset for new FLOW_CONTROL request period.
-        _numMSGsSentThisPeriod=0;
-        waitingForResponse=false;
-        _msgsSentAfterFCreq=0;
-
-        if(isBlockState) {
-
-            if(warn) log.warn("ACTION UNBLOCK");
-            passUp(new Event(Event.UNBLOCK_SEND));
-            log.error(ExternalStrings.FLOW_CONTROL_1_0_1, new Object[] {Long.valueOf(System.currentTimeMillis()), Integer.valueOf(_windowSize)});
-            isBlockState=false;
-        }
-
-
-        if(warn) log.warn("estimatedTimeout = " + _estimatedRTT);
-        if(warn) log.warn("window size = " + _windowSize + " forward margin size = " + _fwdMarginSize);
-
-        return rspList;
-    }
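-
-    /* The RTT estimate above is an exponentially weighted moving average. A
-       self-contained sketch of the same arithmetic (names and values are
-       illustrative):
-
-           static long ewma(double weight, long currentRTT, long estimatedRTT) {
-               return (long)(weight * currentRTT + (1.0 - weight) * estimatedRTT);
-           }
-
-       e.g. ewma(0.9, 120, 100) == 118; a weight near 1 tracks the latest
-       measurement, a weight near 0 preserves the history.
-    */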
-
-
-    /* Mutable received-message counter; used instead of an immutable Integer. */
-    private static class FCInfo implements Serializable {
-        int _curValue;
-        private static final long serialVersionUID = -8365016426836017979L;
-
-        FCInfo() {
-        }
-
-        public void increment(int i) {
-            _curValue+=i;
-        }
-
-        public int getRcvdMSGCount() {
-            return _curValue;
-        }
-
-        @Override // GemStoneAddition
-        public String toString() {
-            return Integer.toString(_curValue);
-        }
-    }
-
-
-}
-

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/8b2ea77d/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/protocols/FLUSH.java
----------------------------------------------------------------------
diff --git a/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/protocols/FLUSH.java b/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/protocols/FLUSH.java
deleted file mode 100644
index a5fa5ec..0000000
--- a/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/protocols/FLUSH.java
+++ /dev/null
@@ -1,466 +0,0 @@
-/** Notice of modification as required by the LGPL
- *  This file was modified by Gemstone Systems Inc. on
- *  $Date$
- **/
-// $Id: FLUSH.java,v 1.10 2005/08/11 12:43:47 belaban Exp $
-
-
-
-package com.gemstone.org.jgroups.protocols;
-
-
-
-import com.gemstone.org.jgroups.Address;
-import com.gemstone.org.jgroups.Event;
-import com.gemstone.org.jgroups.View;
-import com.gemstone.org.jgroups.blocks.GroupRequest;
-import com.gemstone.org.jgroups.blocks.MethodCall;
-import com.gemstone.org.jgroups.stack.RpcProtocol;
-import com.gemstone.org.jgroups.util.ExternalStrings;
-import com.gemstone.org.jgroups.util.List;
-import com.gemstone.org.jgroups.util.Rsp;
-import com.gemstone.org.jgroups.util.RspList;
-import com.gemstone.org.jgroups.util.Util;
-
-import java.util.Enumeration;
-import java.util.Properties;
-import java.util.Vector;
-
-
-
-
-/**
-   The task of the FLUSH protocol is to flush all pending messages out of the system. This is
-   done before a view change by stopping all senders and then agreeing on what messages
-   should be delivered in the current view (before switching to the new view). A coordinator
-   broadcasts a FLUSH message. The message contains an array of the highest sequence number for each member
-   as seen by the coordinator so far. Each member responds with its highest sequence numbers seen so far (for
-   each member): if its sequence number for a member P is higher than the one sent by the coordinator, it
-   will append the messages apparently not received by the coordinator to its reply. The coordinator (when
-   all replies have been received), computes for each member the lowest and highest sequence number and
-   re-broadcasts messages accordingly (using ACKs rather than NAKs to ensure reliable delivery).<p> Example:
-   <pre>
-
-   FLUSH  ---> (p=10, q=22, r=7)
-
-   <-- (p=10, q=20, r=7)    (did not receive 2 messages from q)
-   <-- (p=12, q=23, r=7)    (attached are messages p11, p12, and q23)
-   <-- (p=10, q=22, r=8)    (attached is message r8)
-   ---------------------
-   min:   11    21    8
-   max:   12    23    8
-   </pre>
-
-   The coordinator now computes the range for each member and re-broadcasts messages
-   p11, p12, q21, q22, q23 and r8.
-   This is essentially the exclusive min and inclusive max of all replies. Note that messages p11, p12 and q23
-   were not received by the coordinator itself before. They were only returned as result of the FLUSH replies
-   and the coordinator now re-broadcasts them.
-
-*/
-public class FLUSH extends RpcProtocol  {
-    final Vector   mbrs=new Vector();
-    boolean  is_server=false;
-    final Object   block_mutex=new Object();
-    long     block_timeout=5000;
-    Address  local_addr=null;
-    boolean  blocked=false;  // BLOCK: true, VIEW_CHANGE: false
-    final Object   digest_mutex=new Object();
-    long     digest_timeout=2000;   // time to wait for retrieval of unstable msgs
-
-    final Object   highest_delivered_mutex=new Object();
-    long[]   highest_delivered_msgs;
-
-    Digest   digest=null;
-
-    final Object   get_msgs_mutex=new Object();
-    static/*GemStoneAddition*/ final long     get_msgs_timeout=4000;
-    List     get_msgs=null;
-
-
-
-    @Override // GemStoneAddition
-    public String  getName() {return "FLUSH";}
-
-
-    @Override // GemStoneAddition
-    public Vector providedUpServices() {
-	Vector retval=new Vector();
-	retval.addElement(Integer.valueOf(Event.FLUSH));
-	return retval;
-    }
-
-    @Override // GemStoneAddition
-    public Vector requiredDownServices() {
-	Vector retval=new Vector();
-	retval.addElement(Integer.valueOf(Event.GET_MSGS_RECEIVED));  // NAKACK layer
-	retval.addElement(Integer.valueOf(Event.GET_MSG_DIGEST));     // NAKACK layer
-	retval.addElement(Integer.valueOf(Event.GET_MSGS));           // NAKACK layer
-	return retval;
-    }
-
-
-    @Override // GemStoneAddition
-    public void start() throws Exception {
-        super.start();
-        if(_corr != null) {
-            _corr.setDeadlockDetection(true);
-        }
-        else
-            throw new Exception("FLUSH.start(): cannot set deadlock detection in corr, as it is null !");
-    }
-
-
-    /**
-       Triggered by reception of FLUSH event from GMS layer (must be coordinator). Calls
-       <code>HandleFlush</code> in all members and returns FLUSH_OK event.
-       @param dests A list of members to which the FLUSH is to be sent
-       @return FlushRsp Contains the result (true or false), the list of unstable messages, and the
-	       list of members that failed during the FLUSH.
-     */
-    private FlushRsp flush(Vector dests) {
-	RspList     rsp_list;
-	FlushRsp    retval=new FlushRsp();
-	Digest      digest;
-	long[]      min, max;
-	long[]      lower[];
-	List        unstable_msgs=new List();
-	boolean     get_lower_msgs=false;
-
-	highest_delivered_msgs=new long[members.size()];
-	min=new long[members.size()];
-	max=new long[members.size()];
-
-
-	/* Determine the highest seqno (for each member) that was delivered to the application
-	   (i.e., consumed by the application). Stores result in array 'highest_delivered_msgs' */
-	getHighestDeliveredSeqnos();
-
-	for(int i=0; i < highest_delivered_msgs.length; i++)
-	    min[i]=max[i]=highest_delivered_msgs[i];
-
-
-	/* Call the handleFlush() method of all existing members. The highest seqnos seen by the coord
-	   are passed as the argument */
-	 if(log.isInfoEnabled()) log.info(ExternalStrings.FLUSH_CALLING_HANDLEFLUSH_0, dests);
-	passDown(new Event(Event.SWITCH_OUT_OF_BAND)); // we need out-of-band control for FLUSH ...
-	MethodCall call = new MethodCall("handleFlush", new Object[] {dests, highest_delivered_msgs.clone()}, 
-		new String[] {Vector.class.getName(), long[].class.getName()});
-	rsp_list=callRemoteMethods(dests, call, GroupRequest.GET_ALL, 0);
-	 if(log.isInfoEnabled()) log.info(ExternalStrings.FLUSH_FLUSH_DONE);
-
-
-	/* Process all the responses (Digest): compute a range of messages (min and max seqno) for each
-	   member that has to be re-broadcast; FlushRsp contains those messages. They will be re-broadcast
-	   by the coordinator (in the GMS protocol). */
-	for(int i=0; i < rsp_list.size(); i++) {
-	    Rsp rsp=(Rsp)rsp_list.elementAt(i);
-	    if(rsp.wasReceived()) {
-		digest=(Digest)rsp.getValue();
-		if(digest != null) {
-		    for(int j=0; j < digest.highest_seqnos.length && j < min.length; j++) {
-			min[j]=Math.min(min[j], digest.highest_seqnos[j]);
-			max[j]=Math.max(max[j], digest.highest_seqnos[j]);
-		    }
-		    if(digest.msgs.size() > 0) {
-			for(Enumeration e=digest.msgs.elements(); e.hasMoreElements();)
-			    unstable_msgs.add(e.nextElement());
-		    }
-		}
-	    }
-	} // end for-loop
-
-
-
-	/* If any of the highest seqnos in the flush replies were lower than the ones sent by this
-	   coordinator, the corresponding messages have to be re-broadcast (this won't occur often).
-	   Compute the range between min and highest_delivered_msgs */
-	lower=new long[min.length][]; // stores (for each mbr) the range of seqnos (e.g. 20 24): send msgs
-				      // 21, 22 and 23 and 24 (excluding lower and including upper range)
-
-	for(int i=0; i < min.length; i++) {
-	    if(min[i] < highest_delivered_msgs[i]) {    // will almost never be the case
-		lower[i]=new long[2];
-		lower[i][0]=min[i];                     // lower boundary (excluding)
-		lower[i][1]=highest_delivered_msgs[i];  // upper boundary (including)
-		get_lower_msgs=true;
-	    }
-	}
-	if(get_lower_msgs) {
-	    get_msgs=null;
-	    synchronized(get_msgs_mutex) {
-		passDown(new Event(Event.GET_MSGS, lower));
-		try {
-		    get_msgs_mutex.wait(get_msgs_timeout);
-		}
-		catch(InterruptedException e) { // GemStoneAddition
-                  Thread.currentThread().interrupt();
-                  // There's no looping or anything in the rest of this
-                  // method, so we just propagate the bit and finish up...
-                }
-	    }
-	    if(get_msgs != null) {
-		for(Enumeration e=get_msgs.elements(); e.hasMoreElements();)
-		    unstable_msgs.add(e.nextElement());
-	    }
-	}
-	retval.unstable_msgs=unstable_msgs.getContents();
-	if(rsp_list.numSuspectedMembers() > 0) {
-	    retval.result=false;
-	    retval.failed_mbrs=rsp_list.getSuspectedMembers();
-	}
-
-	return retval;
-    }
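-
-    /* The min/max bookkeeping above matches the worked example in the class
-       comment; a sketch of the range computation, using those values for
-       illustration (the helper name is made up):
-
-           // coordinator's own highest delivered seqnos: p=10, q=22, r=7
-           // replies: (10,20,7), (12,23,7), (10,22,8)
-           // => min = {10,20,7}, max = {12,23,8}
-           static long[][] rebroadcastRanges(long[] min, long[] max) {
-               long[][] r = new long[min.length][];
-               for (int i = 0; i < min.length; i++)
-                   if (min[i] < max[i])
-                       r[i] = new long[] {min[i], max[i]}; // (exclusive, inclusive]
-               return r;
-           }
-
-       which yields p:(10,12] -> p11,p12; q:(20,23] -> q21,q22,q23; r:(7,8] -> r8,
-       exactly the messages the coordinator re-broadcasts.
-    */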
-
-
-
-
-
-    /**
-       Called by coordinator running the FLUSH protocol. Argument is an array of the highest seqnos as seen
-       by the coordinator (for each member). <code>handleFlush()</code> checks for each member its
-       own highest seqno seen for that member. If it is higher than the one seen by the coordinator,
-       all higher messages are attached to the return value (a message digest).
-       @param flush_dests  The members to which this message is sent. Processes not in this list just
-			   ignore the handleFlush() message.
-       @param highest_seqnos The highest sequence numbers (order corresponding to membership) as seen
-			     by coordinator.
-       @return Digest An array of the highest seqnos for each member, as seen by this member. If this
-		      member's seqno for a member P is higher than the one in <code>highest_seqnos</code>,
-		      the missing messages are added to the message digest as well. This allows the
-		      coordinator to re-broadcast missing messages.
-     */
-    public synchronized Digest handleFlush(Vector flush_dests, long[] highest_seqnos) {
-	digest=null;
-
-	 if(log.isInfoEnabled()) log.info("flush_dests=" + flush_dests +
-				   " , highest_seqnos=" + Util.array2String(highest_seqnos));
-
-	if(!is_server) // don't handle the FLUSH if not yet joined to the group
-	    return digest;
-
-	if(flush_dests == null) {
-	     if(warn) log.warn("flush dest is null, ignoring flush !");
-	    return digest;
-	}
-
-	if(flush_dests.size() == 0) {
-	     if(warn) log.warn("flush dest is empty, ignoring flush !");
-	    return digest;
-	}
-
-	if(!flush_dests.contains(local_addr)) {
-
-		if(warn) log.warn("am not in the flush dests, ignoring flush");
-	    return digest;
-	}
-
-	// block sending of messages (only if not already blocked !)
-	if(!blocked) {
-	    blocked=true;
-	    synchronized(block_mutex) {
-		passUp(new Event(Event.BLOCK));
-		try {block_mutex.wait(block_timeout);}
-		catch(InterruptedException e) {
-		  Thread.currentThread().interrupt(); // GemStoneAddition
-                  // Just keep going, we've propagated the bit.
-                }
-	    }
-	}
-
-	// asks NAKACK layer for unstable messages and saves result in 'digest'
-	getMessageDigest(highest_seqnos);
-	 if(log.isInfoEnabled()) log.info(ExternalStrings.FLUSH_RETURNING_DIGEST___0, digest);
-	return digest;
-    }
-
-
-
-
-
-
-    /** Returns the highest seqnos (for each member) seen so far (using the NAKACK layer) */
-    void getHighestDeliveredSeqnos() {
-	synchronized(highest_delivered_mutex) {
-	    passDown(new Event(Event.GET_MSGS_RECEIVED));
-	    try {
-		highest_delivered_mutex.wait(4000);
-	    }
-	    catch(InterruptedException e) {
-              Thread.currentThread().interrupt(); // GemStoneAddition
-              // Just propagate to caller
-//		if(log.isDebugEnabled()) log.debug("exception is " + e);
-	    }
-	}
-    }
-
-
-
-
-
-    /** Interacts with a lower layer to retrieve unstable messages (e.g. NAKACK) */
-    void getMessageDigest(long[] highest_seqnos) {
-	synchronized(digest_mutex) {
-	    passDown(new Event(Event.GET_MSG_DIGEST, highest_seqnos));
-	    try {
-		digest_mutex.wait(digest_timeout);
-	    }
-	    catch(InterruptedException e) {
-	      Thread.currentThread().interrupt(); // GemStoneAddition
-              // just propagate to caller
-            }
-	}
-    }
-
-
-
-
-
-
-
-    /**
-       <b>Callback</b>. Called by superclass when event may be handled.<p>
-       <b>Do not use <code>PassUp</code> in this method as the event is passed up
-       by default by the superclass after this method returns !</b>
-       @return boolean Defaults to true. If false, event will not be passed up the stack.
-     */
-    @Override // GemStoneAddition
-    public boolean handleUpEvent(Event evt) {
-	switch(evt.getType()) {
-
-	case Event.SET_LOCAL_ADDRESS:
-	    local_addr=(Address)evt.getArg();
-	    break;
-
-	case Event.GET_MSG_DIGEST_OK:
-	    synchronized(digest_mutex) {
-		digest=(Digest)evt.getArg();
-		digest_mutex.notifyAll();
-	    }
-	    return false;  // don't pass further up
-
-	case Event.GET_MSGS_RECEIVED_OK:
-	    long[] tmp=(long[])evt.getArg();
-	    if(tmp != null)
-            System.arraycopy(tmp, 0, highest_delivered_msgs, 0, tmp.length);
-	    synchronized(highest_delivered_mutex) {
-		highest_delivered_mutex.notifyAll();
-	    }
-	    return false; // don't pass up any further !
-
-	case Event.GET_MSGS_OK:
-	    synchronized(get_msgs_mutex) {
-		get_msgs=(List)evt.getArg();
-		get_msgs_mutex.notifyAll();
-	    }
-	    break;
-
-	}
-	return true;
-    }
-
-
-    /**
-       <b>Callback</b>. Called by superclass when event may be handled.<p>
-       <b>Do not use <code>PassDown</code> in this method as the event is passed down
-       by default by the superclass after this method returns !</b>
-       @return boolean Defaults to true. If false, event will not be passed down the stack.
-    */
-    @Override // GemStoneAddition
-    public boolean handleDownEvent(Event evt) {
-	Vector    dests;
-	FlushRsp  rsp;
-
-	switch(evt.getType()) {
-	case Event.FLUSH:
-	    dests=(Vector)evt.getArg();
-	    if(dests == null) dests=new Vector();
-	    rsp=flush(dests);
-	    passUp(new Event(Event.FLUSH_OK, rsp));
-	    return false; // don't pass down
-
-	case Event.BECOME_SERVER:
-	    is_server=true;
-	    break;
-
-	case Event.VIEW_CHANGE:
-	    blocked=false;
-
-	    Vector tmp=((View)evt.getArg()).getMembers();
-	    if(tmp != null) {
-		mbrs.removeAllElements();
-		for(int i=0; i < tmp.size(); i++)
-		    mbrs.addElement(tmp.elementAt(i));
-	    }
-	    break;
-	}
-	return true;
-    }
-
-
-
-
-
-    /**
-       The default handling adds the event to the down-queue where events are handled in order of
-       addition by a thread. However, there exists a deadlock between the FLUSH and BLOCK_OK down
-       events: when a FLUSH event is received, a BLOCK is sent up, which triggers a BLOCK_OK event
-       to be sent down to be handled by the FLUSH layer. However, the FLUSH layer's thread is still
-       processing the FLUSH down event and is therefore blocked, waiting for a BLOCK_OK event.
-       Therefore, the BLOCK_OK event has to 'preempt' the FLUSH event processing. This is done by
-       overriding this method: when a BLOCK_OK event is received, it is processed immediately
-       (in parallel to the FLUSH event), which causes the FLUSH event processing to return.
-    */
-    @Override // GemStoneAddition
-    public void receiveDownEvent(Event evt) {
-	if(evt.getType() == Event.BLOCK_OK) { // priority handling, otherwise FLUSH would block !
-	    synchronized(down_queue) {
-		Event event;
-		try {
-		    while(down_queue.size() > 0) {
-			event=(Event)down_queue.remove(10); // wait 10ms at most; queue is *not* empty !
-			down(event);
-		    }
-		}
-		catch(Exception e) {}
-	    }
-
-	    synchronized(block_mutex) {
-		block_mutex.notifyAll();
-	    }
-	    return;
-	}
-	super.receiveDownEvent(evt);
-    }
-
-
-
-    @Override // GemStoneAddition
-    public boolean setProperties(Properties props) {
-	super.setProperties(props);
-	String     str;
-
-	str=props.getProperty("block_timeout");
-	if(str != null) {
-	    block_timeout=Long.parseLong(str);
-	    props.remove("block_timeout");
-	}
-
-	str=props.getProperty("digest_timeout");
-	if(str != null) {
-	    digest_timeout=Long.parseLong(str);
-	    props.remove("digest_timeout");
-	}
-
-	if(props.size() > 0) {
-	    log.error(ExternalStrings.FLUSH_EXAMPLESETPROPERTIES_THESE_PROPERTIES_ARE_NOT_RECOGNIZED__0, props);
-
-	    return false;
-	}
-	return true;
-    }
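-
-    /* For illustration only (made-up values), the corresponding stack
-       configuration fragment could read
-
-           FLUSH(block_timeout=5000;digest_timeout=2000)
-    */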
-
-
-
-}
-