You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by bs...@apache.org on 2015/08/21 23:23:09 UTC
[45/51] [partial] incubator-geode git commit: GEODE-77 removing the
old jgroups subproject
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/8b2ea77d/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/blocks/GroupRequest.java
----------------------------------------------------------------------
diff --git a/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/blocks/GroupRequest.java b/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/blocks/GroupRequest.java
deleted file mode 100644
index ef0bb87..0000000
--- a/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/blocks/GroupRequest.java
+++ /dev/null
@@ -1,636 +0,0 @@
-/** Notice of modification as required by the LGPL
- * This file was modified by Gemstone Systems Inc. on
- * $Date$
- **/
-// $Id: GroupRequest.java,v 1.16 2005/08/27 14:03:17 belaban Exp $
-
-package com.gemstone.org.jgroups.blocks;
-
-
-import com.gemstone.org.jgroups.util.GemFireTracer;
-import com.gemstone.org.jgroups.Address;
-import com.gemstone.org.jgroups.Message;
-import com.gemstone.org.jgroups.Transport;
-import com.gemstone.org.jgroups.View;
-import com.gemstone.org.jgroups.util.Command;
-import com.gemstone.org.jgroups.util.ExternalStrings;
-import com.gemstone.org.jgroups.util.Rsp;
-import com.gemstone.org.jgroups.util.RspList;
-
-import java.util.*;
-
-
-
-
-
-/**
- * Sends a message to all members of the group and waits for all responses (or timeout). Returns a
- * boolean value (success or failure). Results (if any) can be retrieved when done.<p>
- * The supported transport to send requests is currently either a RequestCorrelator or a generic
- * Transport. One of them has to be given in the constructor. It will then be used to send a
- * request. When a message is received by either one, the receiveResponse() of this class has to
- * be called (this class does not actively receive requests/responses itself). Also, when a view change
- * or suspicion is received, the methods viewChange() or suspect() of this class have to be called.<p>
- * When started, an array of responses, correlating to the membership, is created. Each response
- * is added to the corresponding field in the array. When all fields have been set, the algorithm
- * terminates.
- * This algorithm can optionally use a suspicion service (failure detector) to detect (and
- * exclude from the membership) fauly members. If no suspicion service is available, timeouts
- * can be used instead (see <code>execute()</code>). When done, a list of suspected members
- * can be retrieved.<p>
- * Because a channel might deliver requests, and responses to <em>different</em> requests, the
- * <code>GroupRequest</code> class cannot itself receive and process requests/responses from the
- * channel. A mechanism outside this class has to do this; it has to determine what the responses
- * are for the message sent by the <code>execute()</code> method and call <code>receiveResponse()</code>
- * to do so.<p>
- * <b>Requirements</b>: lossless delivery, e.g. acknowledgment-based message confirmation.
- * @author Bela Ban
- * @version $Revision: 1.16 $
- */
-public class GroupRequest implements RspCollector, Command {
- /** return only first response */
- public static final int GET_FIRST=1;
-
- /** return all responses */
- public static final int GET_ALL=2;
-
- /** return majority (of all non-faulty members) */
- public static final int GET_MAJORITY=3;
-
- /** return majority (of all members, may block) */
- public static final int GET_ABS_MAJORITY=4;
-
- /** return n responses (may block) */
- public static final int GET_N=5;
-
- /** return no response (async call) */
- public static final int GET_NONE=6;
-
- private Address caller=null;
-
- /** Map key=Address, value=Rsp. Maps requests and responses */
- private final Map requests=new HashMap();
-
-
- /** bounded queue of suspected members */
- private final Vector suspects=new Vector();
-
- /** list of members, changed by viewChange() */
- private final Collection members=new TreeSet();
-
- /** keep suspects vector bounded */
- private final int max_suspects=40;
- protected Message request_msg=null;
- protected RequestCorrelator corr=null; // either use RequestCorrelator or ...
- protected Transport transport=null; // Transport (one of them has to be non-null)
-
- protected int rsp_mode=GET_ALL;
- protected boolean done=false;
- protected long timeout=0;
- protected int expected_mbrs=0;
-
- private static final GemFireTracer log=GemFireTracer.getLog(GroupRequest.class);
-
- /** to generate unique request IDs (see getRequestId()) */
- private static long last_req_id=1;
-
- private long req_id=-1; // request ID for this request
-
-
- /**
- @param m The message to be sent
- @param corr The request correlator to be used. A request correlator sends requests tagged with
- a unique ID and notifies the sender when matching responses are received. The
- reason <code>GroupRequest</code> uses it instead of a <code>Transport</code> is
- that multiple requests/responses might be sent/received concurrently.
- @param members The initial membership. This value reflects the membership to which the request
- is sent (and from which potential responses are expected). Is reset by reset().
- @param rsp_mode How many responses are expected. Can be
- <ol>
- <li><code>GET_ALL</code>: wait for all responses from non-suspected members.
- A suspicion service might warn
- us when a member from which a response is outstanding has crashed, so it can
- be excluded from the responses. If no suspision service is available, a
- timeout can be used (a value of 0 means wait forever). <em>If a timeout of
- 0 is used, no suspicion service is available and a member from which we
- expect a response has crashed, this methods blocks forever !</em>.
- <li><code>GET_FIRST</code>: wait for the first available response.
- <li><code>GET_MAJORITY</code>: wait for the majority of all responses. The
- majority is re-computed when a member is suspected.
- <li><code>GET_ABS_MAJORITY</code>: wait for the majority of
- <em>all</em> members.
- This includes failed members, so it may block if no timeout is specified.
- <li><code>GET_N</CODE>: wait for N members.
- Return if n is >= membership+suspects.
- <li><code>GET_NONE</code>: don't wait for any response. Essentially send an
- asynchronous message to the group members.
- </ol>
- */
- public GroupRequest(Message m, RequestCorrelator corr, Vector members, int rsp_mode) {
- request_msg=m;
- this.corr=corr;
- this.rsp_mode=rsp_mode;
- reset(members);
- // suspects.removeAllElements(); // bela Aug 23 2002: made suspects bounded
- }
-
-
- /**
- @param timeout Time to wait for responses (ms). A value of <= 0 means wait indefinitely
- (e.g. if a suspicion service is available; timeouts are not needed).
- */
- public GroupRequest(Message m, RequestCorrelator corr, Vector members, int rsp_mode,
- long timeout, int expected_mbrs) {
- this(m, corr, members, rsp_mode);
- if(timeout > 0)
- this.timeout=timeout;
- this.expected_mbrs=expected_mbrs;
- }
-
-
- public GroupRequest(Message m, Transport transport, Vector members, int rsp_mode) {
- request_msg=m;
- this.transport=transport;
- this.rsp_mode=rsp_mode;
- reset(members);
- // suspects.removeAllElements(); // bela Aug 23 2002: make suspects bounded
- }
-
-
- /**
- * @param timeout Time to wait for responses (ms). A value of <= 0 means wait indefinitely
- * (e.g. if a suspicion service is available; timeouts are not needed).
- */
- public GroupRequest(Message m, Transport transport, Vector members,
- int rsp_mode, long timeout, int expected_mbrs) {
- this(m, transport, members, rsp_mode);
- if(timeout > 0)
- this.timeout=timeout;
- this.expected_mbrs=expected_mbrs;
- }
-
- public Address getCaller() {
- return caller;
- }
-
- public void setCaller(Address caller) {
- this.caller=caller;
- }
-
- /**
- * Sends the message. Returns when n responses have been received, or a
- * timeout has occurred. <em>n</em> can be the first response, all
- * responses, or a majority of the responses.
- */
- public boolean execute() {
- boolean retval;
- if(corr == null && transport == null) {
- if(log.isErrorEnabled()) log.error(ExternalStrings.GroupRequest_BOTH_CORR_AND_TRANSPORT_ARE_NULL_CANNOT_SEND_GROUP_REQUEST);
- return false;
- }
- done=false;
- retval=doExecute(timeout);
- if(retval == false && log.isTraceEnabled())
- log.trace("call did not execute correctly, request is " + toString());
- done=true;
- return retval;
- }
-
-
-
- /**
- * This method sets the <code>membership</code> variable to the value of
- * <code>members</code>. It requires that the caller already hold the
- * <code>rsp_mutex</code> lock.
- * @param mbrs The new list of members
- */
- public void reset(Vector mbrs) {
- if(mbrs != null) {
- Address mbr;
- synchronized(requests) {
- requests.clear();
- for(int i=0; i < mbrs.size(); i++) {
- mbr=(Address)mbrs.elementAt(i);
- requests.put(mbr, new Rsp(mbr));
- }
- }
- // maintain local membership
- synchronized(this.members) {
- this.members.clear();
- this.members.addAll(mbrs);
- }
- }
- else {
- synchronized(requests) {
- Rsp rsp;
- for(Iterator it=requests.values().iterator(); it.hasNext();) {
- rsp=(Rsp)it.next();
- rsp.setReceived(false);
- rsp.setValue(null);
- }
- }
- }
- }
-
-
- /* ---------------------- Interface RspCollector -------------------------- */
- /**
- * <b>Callback</b> (called by RequestCorrelator or Transport).
- * Adds a response to the response table. When all responses have been received,
- * <code>execute()</code> returns.
- */
- public void receiveResponse(Message m) {
- Address sender=m.getSrc();
- Object val=null;
- if(done) {
- if(log.isWarnEnabled()) log.warn("command is done; cannot add response !");
- return;
- }
- if(suspects != null && suspects.size() > 0 && suspects.contains(sender)) {
- if(log.isWarnEnabled()) log.warn("received response from suspected member " + sender + "; discarding");
- return;
- }
- if(m.getLength() > 0) {
- try {
- val=m.getObject();
- }
- catch(Exception e) {
- if(log.isErrorEnabled()) log.error(ExternalStrings.GroupRequest_EXCEPTION_0, e);
- }
- }
-
- synchronized(requests) {
- Rsp rsp=(Rsp)requests.get(sender);
- if(rsp != null) {
- if(rsp.wasReceived() == false) {
- rsp.setValue(val);
- rsp.setReceived(true);
- if(log.isTraceEnabled())
- log.trace(new StringBuffer("received response for request ").append(req_id).append(", sender=").
- append(sender).append(", val=").append(val));
- requests.notifyAll(); // wakes up execute()
- }
- }
- }
- }
-
-
- /**
- * <b>Callback</b> (called by RequestCorrelator or Transport).
- * Report to <code>GroupRequest</code> that a member is reported as faulty (suspected).
- * This method would probably be called when getting a suspect message from a failure detector
- * (where available). It is used to exclude faulty members from the response list.
- */
- public void suspect(Address suspected_member) {
- Rsp rsp;
-
- if(suspected_member == null)
- return;
-
- addSuspect(suspected_member);
-
- synchronized(requests) {
- rsp=(Rsp)requests.get(suspected_member);
- if(rsp != null) {
- rsp.setSuspected(true);
- rsp.setValue(null);
- requests.notifyAll();
- }
- }
- }
-
-
- /**
- * Any member of 'membership' that is not in the new view is flagged as
- * SUSPECTED. Any member in the new view that is <em>not</em> in the
- * membership (ie, the set of responses expected for the current RPC) will
- * <em>not</em> be added to it. If we did this we might run into the
- * following problem:
- * <ul>
- * <li>Membership is {A,B}
- * <li>A sends a synchronous group RPC (which sleeps for 60 secs in the
- * invocation handler)
- * <li>C joins while A waits for responses from A and B
- * <li>If this would generate a new view {A,B,C} and if this expanded the
- * response set to {A,B,C}, A would wait forever on C's response because C
- * never received the request in the first place, therefore won't send a
- * response.
- * </ul>
- */
- public void viewChange(View new_view) {
- Address mbr;
- Vector mbrs=new_view != null? new_view.getMembers() : null;
- if(requests == null || requests.size() == 0 || mbrs == null)
- return;
-
- synchronized(this.members) {
- this.members.clear();
- this.members.addAll(mbrs);
- }
-
- Map.Entry entry;
- Rsp rsp;
- boolean modified=false;
- synchronized(requests) {
- for(Iterator it=requests.entrySet().iterator(); it.hasNext();) {
- entry=(Map.Entry)it.next();
- mbr=(Address)entry.getKey();
- if(!mbrs.contains(mbr)) {
- addSuspect(mbr);
- rsp=(Rsp)entry.getValue();
- rsp.setValue(null);
- rsp.setSuspected(true);
- modified=true;
- }
- }
- if(modified)
- requests.notifyAll();
- }
- }
-
-
- /* -------------------- End of Interface RspCollector ----------------------------------- */
-
-
-
- /** Returns the results as a RspList */
- public RspList getResults() {
- synchronized(requests) {
- Collection rsps=requests.values();
- RspList retval=new RspList(rsps);
- return retval;
- }
- }
-
-
- @Override // GemStoneAddition
- public String toString() {
- StringBuffer ret=new StringBuffer(128);
- ret.append("[GroupRequest:\n");
- ret.append("req_id=").append(req_id).append('\n');
- if(caller != null)
- ret.append("caller=").append(caller).append("\n");
-
- Map.Entry entry;
- Address mbr;
- Rsp rsp;
- synchronized(requests) {
- for(Iterator it=requests.entrySet().iterator(); it.hasNext();) {
- entry=(Map.Entry)it.next();
- mbr=(Address)entry.getKey();
- rsp=(Rsp)entry.getValue();
- ret.append(mbr).append(": ").append(rsp).append("\n");
- }
- }
- if(suspects.size() > 0)
- ret.append("\nsuspects: ").append(suspects);
- ret.append("\nrequest_msg: ").append(request_msg);
- ret.append("\nrsp_mode: ").append(modeToString(rsp_mode));
- ret.append("\ndone: ").append(done);
- ret.append("\ntimeout: ").append(timeout);
- ret.append("\nexpected_mbrs: ").append(expected_mbrs);
- ret.append("\n]");
- return ret.toString();
- }
-
-
- public int getNumSuspects() {
- return suspects.size();
- }
-
-
- public Vector getSuspects() {
- return suspects;
- }
-
-
- public boolean isDone() {
- return done;
- }
-
-
-
- /* --------------------------------- Private Methods -------------------------------------*/
-
- private int determineMajority(int i) {
- return i < 2? i : (i / 2) + 1;
- }
-
- /** Generates a new unique request ID */
- private static synchronized long getRequestId() {
- long result=System.currentTimeMillis();
- if(result <= last_req_id) {
- result=last_req_id + 1;
- }
- last_req_id=result;
- return result;
- }
-
- /** This method runs with rsp_mutex locked (called by <code>execute()</code>). */
- private boolean doExecute(long timeout) {
- long start_time=0;
- Address suspect;
- req_id=getRequestId();
- reset(null); // clear 'responses' array
-
- synchronized(requests) {
- for(int i=0; i < suspects.size(); i++) { // mark all suspects in 'received' array
- suspect=(Address)suspects.elementAt(i);
- Rsp rsp=(Rsp)requests.get(suspect);
- if(rsp != null) {
- rsp.setSuspected(true);
- break; // we can break here because we ensure there are no duplicate members
- }
- }
- }
-
- try {
- if(log.isTraceEnabled()) log.trace(new StringBuffer("sending request (id=").append(req_id).append(')'));
- if(corr != null) {
- java.util.List tmp=new Vector(members);
- corr.sendRequest(req_id, tmp, request_msg, rsp_mode == GET_NONE? null : this);
- }
- else {
- transport.send(request_msg);
- }
- }
- catch(Throwable e) {
- log.error(ExternalStrings.GroupRequest_EXCEPTION_0, e);
- if(corr != null) {
- corr.done(req_id);
- }
- return false;
- }
-
- synchronized(requests) {
- if(timeout <= 0) {
- while(true) { /* Wait for responses: */
- adjustMembership(); // may not be necessary, just to make sure...
- if(responsesComplete()) {
- if(corr != null) {
- corr.done(req_id);
- }
- if(log.isTraceEnabled()) {
- log.trace("received all responses: " + toString());
- }
- return true;
- }
- try {
- requests.wait();
- }
- catch(InterruptedException e) { // GemStoneAddition
- Thread.currentThread().interrupt();
- return false; // treat as timeout
- }
- }
- }
- else {
- start_time=System.currentTimeMillis();
- while(timeout > 0) { /* Wait for responses: */
- if(responsesComplete()) {
- if(corr != null)
- corr.done(req_id);
- if(log.isTraceEnabled()) log.trace("received all responses: " + toString());
- return true;
- }
- timeout=timeout - (System.currentTimeMillis() - start_time);
- if(timeout > 0) {
- try {
- requests.wait(timeout);
- }
- catch(InterruptedException e) { // GemStoneAddition
- Thread.currentThread().interrupt();
- break; // treat as timeout
- }
- }
- }
- if(corr != null) {
- corr.done(req_id);
- }
- return false;
- }
- }
- }
-
- private boolean responsesComplete() {
- int num_received=0, num_not_received=0, num_suspected=0, num_total=requests.size();
- int majority=determineMajority(num_total);
-
- Rsp rsp;
- for(Iterator it=requests.values().iterator(); it.hasNext();) {
- rsp=(Rsp)it.next();
- if(rsp.wasReceived()) {
- num_received++;
- }
- else {
- if(rsp.wasSuspected()) {
- num_suspected++;
- }
- else {
- num_not_received++;
- }
- }
- }
-
- switch(rsp_mode) {
- case GET_FIRST:
- if(num_received > 0)
- return true;
- if(num_suspected >= num_total)
- // e.g. 2 members, and both suspected
- return true;
- break;
- case GET_ALL:
- if(num_not_received > 0)
- return false;
- return true;
- case GET_MAJORITY:
- if(num_received + num_suspected >= majority)
- return true;
- break;
- case GET_ABS_MAJORITY:
- if(num_received >= majority)
- return true;
- break;
- case GET_N:
- if(expected_mbrs >= num_total) {
- rsp_mode=GET_ALL;
- return responsesComplete();
- }
- if(num_received >= expected_mbrs) {
- return true;
- }
- if(num_received + num_not_received < expected_mbrs) {
- if(num_received + num_suspected >= expected_mbrs) {
- return true;
- }
- return false;
- }
- return false;
- case GET_NONE:
- return true;
- default :
- if(log.isErrorEnabled()) log.error(ExternalStrings.GroupRequest_RSP_MODE__0__UNKNOWN_, rsp_mode);
- break;
- }
- return false;
- }
-
-
-
-
-
- /**
- * Adjusts the 'received' array in the following way:
- * <ul>
- * <li>if a member P in 'membership' is not in 'members', P's entry in the 'received' array
- * will be marked as SUSPECTED
- * <li>if P is 'suspected_mbr', then P's entry in the 'received' array will be marked
- * as SUSPECTED
- * </ul>
- * This call requires exclusive access to rsp_mutex (called by getResponses() which has
- * a the rsp_mutex locked, so this should not be a problem).
- */
- private void adjustMembership() {
- if(requests.size() == 0)
- return;
-
- Map.Entry entry;
- Address mbr;
- Rsp rsp;
- for(Iterator it=requests.entrySet().iterator(); it.hasNext();) {
- entry=(Map.Entry)it.next();
- mbr=(Address)entry.getKey();
- if((!this.members.contains(mbr)) || suspects.contains(mbr)) {
- addSuspect(mbr);
- rsp=(Rsp)entry.getValue();
- rsp.setValue(null);
- rsp.setSuspected(true);
- }
- }
- }
-
- /**
- * Adds a member to the 'suspects' list. Removes oldest elements from 'suspects' list
- * to keep the list bounded ('max_suspects' number of elements)
- */
- private void addSuspect(Address suspected_mbr) {
- if(!suspects.contains(suspected_mbr)) {
- suspects.addElement(suspected_mbr);
- while(suspects.size() >= max_suspects && suspects.size() > 0)
- suspects.remove(0); // keeps queue bounded
- }
- }
-
- private String modeToString(int m) {
- switch(m) {
- case GET_FIRST: return "GET_FIRST";
- case GET_ALL: return "GET_ALL";
- case GET_MAJORITY: return "GET_MAJORITY";
- case GET_ABS_MAJORITY: return "GET_ABS_MAJORITY";
- case GET_N: return "GET_N";
- case GET_NONE: return "GET_NONE";
- default: return "<unknown> (" + m + ")";
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/8b2ea77d/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/blocks/GroupRequest.java.old
----------------------------------------------------------------------
diff --git a/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/blocks/GroupRequest.java.old b/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/blocks/GroupRequest.java.old
deleted file mode 100644
index 160b9fa..0000000
--- a/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/blocks/GroupRequest.java.old
+++ /dev/null
@@ -1,641 +0,0 @@
-// $Id: GroupRequest.java,v 1.13 2005/07/22 15:53:37 belaban Exp $
-
-package org.jgroups.blocks;
-
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.jgroups.Address;
-import org.jgroups.Message;
-import org.jgroups.Transport;
-import org.jgroups.View;
-import org.jgroups.util.Command;
-import org.jgroups.util.RspList;
-
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Vector;
-
-
-
-
-
-/**
- * Sends a message to all members of the group and waits for all responses (or timeout). Returns a
- * boolean value (success or failure). Results (if any) can be retrieved when done.<p>
- * The supported transport to send requests is currently either a RequestCorrelator or a generic
- * Transport. One of them has to be given in the constructor. It will then be used to send a
- * request. When a message is received by either one, the receiveResponse() of this class has to
- * be called (this class does not actively receive requests/responses itself). Also, when a view change
- * or suspicion is received, the methods viewChange() or suspect() of this class have to be called.<p>
- * When started, an array of responses, correlating to the membership, is created. Each response
- * is added to the corresponding field in the array. When all fields have been set, the algorithm
- * terminates.
- * This algorithm can optionally use a suspicion service (failure detector) to detect (and
- * exclude from the membership) fauly members. If no suspicion service is available, timeouts
- * can be used instead (see <code>execute()</code>). When done, a list of suspected members
- * can be retrieved.<p>
- * Because a channel might deliver requests, and responses to <em>different</em> requests, the
- * <code>GroupRequest</code> class cannot itself receive and process requests/responses from the
- * channel. A mechanism outside this class has to do this; it has to determine what the responses
- * are for the message sent by the <code>execute()</code> method and call <code>receiveResponse()</code>
- * to do so.<p>
- * <b>Requirements</b>: lossless delivery, e.g. acknowledgment-based message confirmation.
- * @author Bela Ban
- * @version $Revision: 1.13 $
- */
-public class GroupRequest implements RspCollector, Command {
- /** return only first response */
- public static final int GET_FIRST=1;
-
- /** return all responses */
- public static final int GET_ALL=2;
-
- /** return majority (of all non-faulty members) */
- public static final int GET_MAJORITY=3;
-
- /** return majority (of all members, may block) */
- public static final int GET_ABS_MAJORITY=4;
-
- /** return n responses (may block) */
- public static final int GET_N=5;
-
- /** return no response (async call) */
- public static final int GET_NONE=6;
-
- private static final short NOT_RECEIVED=0;
- private static final short RECEIVED=1;
- private static final short SUSPECTED=2;
-
- /** Map<Address, Rsp>. Maps requests and responses */
- private final Map requests=new HashMap();
-
-
- private Address membership[]=null; // current membership
- private Object responses[]=null; // responses corresponding to membership
- private short received[]=null; // status of response for each mbr (see above)
-
- /** bounded queue of suspected members */
- private final Vector suspects=new Vector();
-
- /** list of members, changed by viewChange() */
- private final Vector members=new Vector();
-
- /** keep suspects vector bounded */
- private final int max_suspects=40;
- protected Message request_msg=null;
- protected RequestCorrelator corr=null; // either use RequestCorrelator or ...
- protected Transport transport=null; // Transport (one of them has to be non-null)
-
- protected int rsp_mode=GET_ALL;
- protected boolean done=false;
- protected final Object rsp_mutex=new Object();
- protected long timeout=0;
- protected int expected_mbrs=0;
-
- private static final Log log=LogFactory.getLog(GroupRequest.class);
-
- /** to generate unique request IDs (see getRequestId()) */
- private static long last_req_id=1;
-
- private long req_id=-1; // request ID for this request
-
-
- /**
- @param m The message to be sent
- @param corr The request correlator to be used. A request correlator sends requests tagged with
- a unique ID and notifies the sender when matching responses are received. The
- reason <code>GroupRequest</code> uses it instead of a <code>Transport</code> is
- that multiple requests/responses might be sent/received concurrently.
- @param members The initial membership. This value reflects the membership to which the request
- is sent (and from which potential responses are expected). Is reset by reset().
- @param rsp_mode How many responses are expected. Can be
- <ol>
- <li><code>GET_ALL</code>: wait for all responses from non-suspected members.
- A suspicion service might warn
- us when a member from which a response is outstanding has crashed, so it can
- be excluded from the responses. If no suspision service is available, a
- timeout can be used (a value of 0 means wait forever). <em>If a timeout of
- 0 is used, no suspicion service is available and a member from which we
- expect a response has crashed, this methods blocks forever !</em>.
- <li><code>GET_FIRST</code>: wait for the first available response.
- <li><code>GET_MAJORITY</code>: wait for the majority of all responses. The
- majority is re-computed when a member is suspected.
- <li><code>GET_ABS_MAJORITY</code>: wait for the majority of
- <em>all</em> members.
- This includes failed members, so it may block if no timeout is specified.
- <li><code>GET_N</CODE>: wait for N members.
- Return if n is >= membership+suspects.
- <li><code>GET_NONE</code>: don't wait for any response. Essentially send an
- asynchronous message to the group members.
- </ol>
- */
- public GroupRequest(Message m, RequestCorrelator corr, Vector members, int rsp_mode) {
- request_msg=m;
- this.corr=corr;
- this.rsp_mode=rsp_mode;
- reset(members);
- // suspects.removeAllElements(); // bela Aug 23 2002: made suspects bounded
- }
-
-
- /**
- @param timeout Time to wait for responses (ms). A value of <= 0 means wait indefinitely
- (e.g. if a suspicion service is available; timeouts are not needed).
- */
- public GroupRequest(Message m, RequestCorrelator corr, Vector members, int rsp_mode,
- long timeout, int expected_mbrs) {
- this(m, corr, members, rsp_mode);
- if(timeout > 0)
- this.timeout=timeout;
- this.expected_mbrs=expected_mbrs;
- }
-
-
- public GroupRequest(Message m, Transport transport, Vector members, int rsp_mode) {
- request_msg=m;
- this.transport=transport;
- this.rsp_mode=rsp_mode;
- reset(members);
- // suspects.removeAllElements(); // bela Aug 23 2002: make suspects bounded
- }
-
-
- /**
- * @param timeout Time to wait for responses (ms). A value of <= 0 means wait indefinitely
- * (e.g. if a suspicion service is available; timeouts are not needed).
- */
- public GroupRequest(Message m, Transport transport, Vector members,
- int rsp_mode, long timeout, int expected_mbrs) {
- this(m, transport, members, rsp_mode);
- if(timeout > 0)
- this.timeout=timeout;
- this.expected_mbrs=expected_mbrs;
- }
-
-
- /**
- * Sends the message. Returns when n responses have been received, or a
- * timeout has occurred. <em>n</em> can be the first response, all
- * responses, or a majority of the responses.
- */
- public boolean execute() {
- boolean retval;
- if(corr == null && transport == null) {
- if(log.isErrorEnabled()) log.error("both corr and transport are null, cannot send group request");
- return false;
- }
- synchronized(rsp_mutex) {
- done=false;
- retval=doExecute(timeout);
- if(retval == false && log.isTraceEnabled())
- log.trace("call did not execute correctly, request is " + toString());
- done=true;
- return retval;
- }
- }
-
-
- /**
- * Resets the group request, so it can be reused for another execution.
- */
-// public void reset(Message m, int mode, long timeout) {
-// synchronized(rsp_mutex) {
-// done=false;
-// request_msg=m;
-// rsp_mode=mode;
-// this.timeout=timeout;
-// rsp_mutex.notifyAll();
-// }
-// }
-//
-//
-// public void reset(Message m, final Vector members, int rsp_mode, long timeout, int expected_rsps) {
-// synchronized(rsp_mutex) {
-// reset(m, rsp_mode, timeout);
-// reset(members);
-// // suspects.removeAllElements(); // bela Aug 23 2002: made suspects bounded
-// this.expected_mbrs=expected_rsps;
-// rsp_mutex.notifyAll();
-// }
-// }
-
- /**
- * This method sets the <code>membership</code> variable to the value of
- * <code>members</code>. It requires that the caller already hold the
- * <code>rsp_mutex</code> lock.
- * @param mbrs The new list of members
- */
- public void reset(Vector mbrs) {
- if(mbrs != null) {
- int size=mbrs.size();
- membership=new Address[size];
- responses=new Object[size];
- received=new short[size];
- for(int i=0; i < size; i++) {
- membership[i]=(Address)mbrs.elementAt(i);
- responses[i]=null;
- received[i]=NOT_RECEIVED;
- }
- // maintain local membership
- this.members.clear();
- this.members.addAll(mbrs);
- }
- else {
- if(membership != null) {
- for(int i=0; i < membership.length; i++) {
- responses[i]=null;
- received[i]=NOT_RECEIVED;
- }
- }
- }
- }
-
-
- /* ---------------------- Interface RspCollector -------------------------- */
- /**
- * <b>Callback</b> (called by RequestCorrelator or Transport).
- * Adds a response to the response table. When all responses have been received,
- * <code>execute()</code> returns.
- */
- public void receiveResponse(Message m) {
- Address sender=m.getSrc(), mbr;
- Object val=null;
- if(done) {
- if(log.isWarnEnabled()) log.warn("command is done; cannot add response !");
- return;
- }
- if(suspects != null && suspects.size() > 0 && suspects.contains(sender)) {
- if(log.isWarnEnabled()) log.warn("received response from suspected member " + sender + "; discarding");
- return;
- }
- if(m.getLength() > 0) {
- try {
- val=m.getObject();
- }
- catch(Exception e) {
- if(log.isErrorEnabled()) log.error("exception=" + e);
- }
- }
- synchronized(rsp_mutex) {
- for(int i=0; i < membership.length; i++) {
- mbr=membership[i];
- if(mbr.equals(sender)) {
- if(received[i] == NOT_RECEIVED) {
- responses[i]=val;
- received[i]=RECEIVED;
- if(log.isTraceEnabled())
- log.trace("received response for request " + req_id + ", sender=" + sender + ", val=" + val);
- rsp_mutex.notifyAll(); // wakes up execute()
- break;
- }
- }
- }
- }
- }
-
-
- /**
- * <b>Callback</b> (called by RequestCorrelator or Transport).
- * Report to <code>GroupRequest</code> that a member is reported as faulty (suspected).
- * This method would probably be called when getting a suspect message from a failure detector
- * (where available). It is used to exclude faulty members from the response list.
- */
- public void suspect(Address suspected_member) {
- Address mbr;
- synchronized(rsp_mutex) { // modify 'suspects' and 'responses' array
- for(int i=0; i < membership.length; i++) {
- mbr=membership[i];
- if(mbr.equals(suspected_member)) {
- addSuspect(suspected_member);
- responses[i]=null;
- received[i]=SUSPECTED;
- rsp_mutex.notifyAll();
- break;
- }
- }
- }
- }
-
-
- /**
- * Any member of 'membership' that is not in the new view is flagged as
- * SUSPECTED. Any member in the new view that is <em>not</em> in the
- * membership (ie, the set of responses expected for the current RPC) will
- * <em>not</em> be added to it. If we did this we might run into the
- * following problem:
- * <ul>
- * <li>Membership is {A,B}
- * <li>A sends a synchronous group RPC (which sleeps for 60 secs in the
- * invocation handler)
- * <li>C joins while A waits for responses from A and B
- * <li>If this would generate a new view {A,B,C} and if this expanded the
- * response set to {A,B,C}, A would wait forever on C's response because C
- * never received the request in the first place, therefore won't send a
- * response.
- * </ul>
- */
- public void viewChange(View new_view) {
- Address mbr;
- Vector mbrs=new_view != null? new_view.getMembers() : null;
- if(membership == null || membership.length == 0 || mbrs == null)
- return;
-
- synchronized(rsp_mutex) {
- this.members.clear();
- this.members.addAll(mbrs);
- for(int i=0; i < membership.length; i++) {
- mbr=membership[i];
- if(!mbrs.contains(mbr)) {
- addSuspect(mbr);
- responses[i]=null;
- received[i]=SUSPECTED;
- }
- }
- rsp_mutex.notifyAll();
- }
- }
-
-
- /* -------------------- End of Interface RspCollector ----------------------------------- */
-
-
-
- /** Returns the results as a RspList */
- public RspList getResults() {
- RspList retval=new RspList();
- Address sender;
- synchronized(rsp_mutex) {
- for(int i=0; i < membership.length; i++) {
- sender=membership[i];
- switch(received[i]) {
- case SUSPECTED:
- retval.addSuspect(sender);
- break;
- case RECEIVED:
- retval.addRsp(sender, responses[i]);
- break;
- case NOT_RECEIVED:
- retval.addNotReceived(sender);
- break;
- }
- }
- return retval;
- }
- }
-
-
- public String toString() {
- StringBuffer ret=new StringBuffer();
- ret.append("[GroupRequest:\n");
- ret.append("req_id=").append(req_id).append('\n');
- ret.append("members: ");
- for(int i=0; i < membership.length; i++)
- ret.append(membership[i] + " ");
- ret.append("\nresponses: ");
- for(int i=0; i < responses.length; i++)
- ret.append(responses[i] + " ");
- ret.append("\nreceived: ");
- for(int i=0; i < received.length; i++)
- ret.append(receivedToString(received[i]) + " ");
- if(suspects.size() > 0)
- ret.append("\nsuspects: ").append(suspects);
- ret.append("\nrequest_msg: ").append(request_msg);
- ret.append("\nrsp_mode: ").append(rsp_mode);
- ret.append("\ndone: ").append(done);
- ret.append("\ntimeout: ").append(timeout);
- ret.append("\nexpected_mbrs: ").append(expected_mbrs);
- ret.append("\n]");
- return ret.toString();
- }
-
-
- public int getNumSuspects() {
- return suspects.size();
- }
-
-
- public Vector getSuspects() {
- return suspects;
- }
-
-
- public boolean isDone() {
- return done;
- }
-
-
-
- /* --------------------------------- Private Methods -------------------------------------*/
-
- private int determineMajority(int i) {
- return i < 2? i : (i / 2) + 1;
- }
-
- /** Generates a new unique request ID */
- private static synchronized long getRequestId() {
- long result=System.currentTimeMillis();
- if(result <= last_req_id) {
- result=last_req_id + 1;
- }
- last_req_id=result;
- return result;
- }
-
- /** This method runs with rsp_mutex locked (called by <code>execute()</code>). */
- private boolean doExecute(long timeout) {
- long start_time=0;
- Address mbr, suspect;
- req_id=getRequestId();
- reset(null); // clear 'responses' array
- if(suspects != null) { // mark all suspects in 'received' array
- for(int i=0; i < suspects.size(); i++) {
- suspect=(Address)suspects.elementAt(i);
- for(int j=0; j < membership.length; j++) {
- mbr=membership[j];
- if(mbr.equals(suspect)) {
- received[j]=SUSPECTED;
- break; // we can break here because we ensure there are no duplicate members
- }
- }
- }
- }
-
- try {
- if(log.isTraceEnabled()) log.trace(new StringBuffer("sending request (id=").append(req_id).append(')'));
- if(corr != null) {
- java.util.List tmp=members != null? members : null;
- corr.sendRequest(req_id, tmp, request_msg, rsp_mode == GET_NONE? null : this);
- }
- else {
- transport.send(request_msg);
- }
- }
- catch(Throwable e) {
- log.error("exception=" + e);
- if(corr != null) {
- corr.done(req_id);
- }
- return false;
- }
-
- if(timeout <= 0) {
- while(true) { /* Wait for responses: */
- adjustMembership(); // may not be necessary, just to make sure...
- if(getResponses()) {
- if(corr != null) {
- corr.done(req_id);
- }
- if(log.isTraceEnabled()) {
- log.trace("received all responses: " + toString());
- }
- return true;
- }
- try {
- rsp_mutex.wait();
- }
- catch(Exception e) {
- }
- }
- }
- else {
- start_time=System.currentTimeMillis();
- while(timeout > 0) { /* Wait for responses: */
- if(getResponses()) {
- if(corr != null)
- corr.done(req_id);
- if(log.isTraceEnabled()) log.trace("received all responses: " + toString());
- return true;
- }
- timeout=timeout - (System.currentTimeMillis() - start_time);
- if(timeout > 0) {
- try {
- rsp_mutex.wait(timeout);
- }
- catch(Exception e) {
- }
- }
- }
- if(corr != null) {
- corr.done(req_id);
- }
- return false;
- }
- }
-
- private boolean getResponses() {
- int num_not_received=getNum(NOT_RECEIVED);
- int num_received=getNum(RECEIVED);
- int num_suspected=getNum(SUSPECTED);
- int num_total=membership.length;
- int majority=determineMajority(num_total);
- switch(rsp_mode) {
- case GET_FIRST:
- if(num_received > 0)
- return true;
- if(num_suspected >= num_total)
- // e.g. 2 members, and both suspected
- return true;
- break;
- case GET_ALL:
- if(num_not_received > 0)
- return false;
- return true;
- case GET_MAJORITY:
- if(num_received + num_suspected >= majority)
- return true;
- break;
- case GET_ABS_MAJORITY:
- if(num_received >= majority)
- return true;
- break;
- case GET_N:
- if(expected_mbrs >= num_total) {
- rsp_mode=GET_ALL;
- return getResponses();
- }
- if(num_received >= expected_mbrs) {
- return true;
- }
- if(num_received + num_not_received < expected_mbrs) {
- if(num_received + num_suspected >= expected_mbrs) {
- return true;
- }
- return false;
- }
- return false;
- case GET_NONE:
- return true;
- default :
- if(log.isErrorEnabled()) log.error("rsp_mode " + rsp_mode + " unknown !");
- break;
- }
- return false;
- }
-
- /** Return number of elements of a certain type in array 'received'. Type can be RECEIVED,
- NOT_RECEIVED or SUSPECTED */
- private int getNum(int type) {
- int retval=0;
- for(int i=0; i < received.length; i++)
- if(received[i] == type)
- retval++;
- return retval;
- }
-
-
-
- private String receivedToString(int r) {
- switch(r) {
- case RECEIVED:
- return "RECEIVED";
- case NOT_RECEIVED:
- return "NOR_RECEIVED";
- case SUSPECTED:
- return "SUSPECTED";
- default:
- return "n/a";
- }
- }
-
-
- /**
- * Adjusts the 'received' array in the following way:
- * <ul>
- * <li>if a member P in 'membership' is not in 'members', P's entry in the 'received' array
- * will be marked as SUSPECTED
- * <li>if P is 'suspected_mbr', then P's entry in the 'received' array will be marked
- * as SUSPECTED
- * </ul>
- * This call requires exclusive access to rsp_mutex (called by getResponses() which has
- * a the rsp_mutex locked, so this should not be a problem).
- */
- private void adjustMembership() {
- Address mbr;
- if(membership == null || membership.length == 0) {
- // if(log.isWarnEnabled()) log.warn("GroupRequest.adjustMembership()", "membership is null");
- return;
- }
- for(int i=0; i < membership.length; i++) {
- mbr=membership[i];
- if((this.members != null && !this.members.contains(mbr))
- || suspects.contains(mbr)) {
- addSuspect(mbr);
- responses[i]=null;
- received[i]=SUSPECTED;
- }
- }
- }
-
- /**
- * Adds a member to the 'suspects' list. Removes oldest elements from 'suspects' list
- * to keep the list bounded ('max_suspects' number of elements)
- */
- private void addSuspect(Address suspected_mbr) {
- if(!suspects.contains(suspected_mbr)) {
- suspects.addElement(suspected_mbr);
- while(suspects.size() >= max_suspects && suspects.size() > 0)
- suspects.remove(0); // keeps queue bounded
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/8b2ea77d/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/blocks/Link.java
----------------------------------------------------------------------
diff --git a/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/blocks/Link.java b/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/blocks/Link.java
deleted file mode 100644
index b89b00c..0000000
--- a/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/blocks/Link.java
+++ /dev/null
@@ -1,680 +0,0 @@
-/** Notice of modification as required by the LGPL
- * This file was modified by Gemstone Systems Inc. on
- * $Date$
- **/
-// $Id: Link.java,v 1.7 2005/05/30 16:14:33 belaban Exp $
-
-package com.gemstone.org.jgroups.blocks;
-
-import com.gemstone.org.jgroups.util.GemFireTracer;
-import com.gemstone.org.jgroups.util.ExternalStrings;
-import com.gemstone.org.jgroups.util.TimedWriter;
-import com.gemstone.org.jgroups.util.Util;
-
-import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
-
-import java.io.*;
-import java.net.InetAddress;
-import java.net.ServerSocket;
-import java.net.Socket;
-
-
-
-
-
-/**
- * Implements a physical link between 2 parties (point-to-point connection). For incoming traffic,
- * a server socket is created (bound to a given local address and port). The receiver thread does the
- * following: it accepts a new connection from the server socket and (on the same thread) reads messages
- * until the connection breaks. Then it goes back to accept(). This is done in 2 nested while-loops.
- * The outgoing connection is established when started. If this fails, the link is marked as not established.
- * This means that there is not outgoing socket.<br>
- * A heartbeat will be exchanged between the 2 peers periodically as long as the connection is established
- * (outgoing socket is okay). When the connection breaks, the heartbeat will stop and a connection establisher
- * thread will be started. It periodically tries to re-establish connection to the peer. When this happens
- * it will stop and the heartbeat thread will resume.<br>
- * For details see Link.txt
- * @author Bela Ban, June 2000
- */
-@SuppressFBWarnings(value="DE_MIGHT_IGNORE",justification="GemFire does not use this class")
-public class Link implements Runnable {
- String local_addr=null, remote_addr=null;
- InetAddress local=null, remote=null;
- int local_port=0, remote_port=0;
- ServerSocket srv_sock=null;
- Socket outgoing=null; // traffic to peer
- Socket incoming=null; // traffic from peer
- DataOutputStream outstream=null;
- DataInputStream instream=null;
- boolean established=false; // (incoming and outgoing) connections to peer are up and running
-// boolean stop=false; GemStoneAddition
- boolean trace=false;
-
- // GemStoneAddition access receiver_thread while synchronized on this
- Thread receiver_thread=null;
- static/*GemStoneAddition*/ final long receiver_thread_join_timeout=2000;
- Receiver receiver=null;
- static final int HB_PACKET=-99;
- Heartbeat hb=null;
- long timeout=10000; // if no heartbeat was received for timeout msecs, assume peer is dead
- long hb_interval=3000; // send a heartbeat every n msecs
- final Object outgoing_mutex=new Object(); // sync on creation and closing of outgoing socket
- TimedWriter writer=null;
- GemFireTracer log=GemFireTracer.getLog(getClass());
-
-
- public interface Receiver {
- void receive(byte[] msg);
- void linkDown(InetAddress local, int local_port, InetAddress remote, int remote_port);
- void linkUp(InetAddress local, int local_port, InetAddress remote, int remote_port);
- void missedHeartbeat(InetAddress local, int local_port, InetAddress remote, int remote_port, int num_hbs);
- void receivedHeartbeatAgain(InetAddress local, int local_port, InetAddress remote, int remote_port);
- }
-
-
-
- public Link(String local_addr, int local_port, String remote_addr, int remote_port) {
- this.local_addr=local_addr; this.local_port=local_port;
- this.remote_addr=remote_addr; this.remote_port=remote_port;
- hb=new Heartbeat(timeout, hb_interval);
- }
-
-
- public Link(String local_addr, int local_port, String remote_addr, int remote_port, Receiver r) {
- this(local_addr, local_port, remote_addr, remote_port);
- setReceiver(r);
- }
-
-
-
- public Link(String local_addr, int local_port, String remote_addr, int remote_port,
- long timeout, long hb_interval, Receiver r) {
- this.local_addr=local_addr; this.local_port=local_port;
- this.remote_addr=remote_addr; this.remote_port=remote_port;
- this.timeout=timeout; this.hb_interval=hb_interval;
- hb=new Heartbeat(timeout, hb_interval);
- setReceiver(r);
- }
-
-
- public void setTrace(boolean t) {trace=t;}
- public void setReceiver(Receiver r) {receiver=r;}
- public boolean established() {return established;}
- public InetAddress getLocalAddress() {return local;}
- public InetAddress getRemoteAddress() {return remote;}
- public int getLocalPort() {return local_port;}
- public int getRemotePort() {return remote_port;}
-
-
-
-
-
- public void start() throws Exception {
- local=InetAddress.getByName(local_addr);
- remote=InetAddress.getByName(remote_addr);
- srv_sock=new ServerSocket(local_port, 1, local);
- createOutgoingConnection(hb_interval); // connection to peer established, sets established=true
- startReceiverThread(); // start reading from incoming socket
- hb.start(); // starts heartbeat (conn establisher is not yet started)
- }
-
-
-
- public void stop() {
- stopReceiverThread();
- hb.stop();
- try {srv_sock.close();} catch(Exception e) {}
- established=false;
- }
-
-
-
-
-
- /** Tries to send buffer across out socket. Tries to establish connection if not yet connected. */
- public boolean send(byte[] buf) {
- if(buf == null || buf.length == 0) {
- if(trace) System.err.println("Link.send(): buffer is null or does not contain any data !");
- return false;
- }
- if(!established) { // will be set by ConnectionEstablisher when connection has been set up
- if(trace) log.error(ExternalStrings.Link_LINKSEND_CONNECTION_NOT_ESTABLISHED_DISCARDING_MESSAGE);
- return false;
- }
-
- try {
- outstream.writeInt(buf.length); // synchronized anyway
- outstream.write(buf); // synchronized anyway, we don't need to sync on outstream
- return true;
- }
- catch(Exception ex) { // either IOException or EOFException (subclass if IOException)
- if(trace) log.error(ExternalStrings.Link_LINKSEND1_SENDING_FAILED_RETRYING);
- return retry(buf);
- }
- }
-
-
- boolean retry(byte[] buf) {
- closeOutgoingConnection(); // there something wrong, close connection
- if(!createOutgoingConnection()) { // ... and re-open. if this fails,
- closeOutgoingConnection(); // just abort and return failure to caller
- return false;
- }
- else {
- try {
- outstream.writeInt(buf.length);
- outstream.write(buf);
- return true;
- }
- catch(Exception e) {
- if(trace) System.out.println("Link.send2(): failed, closing connection");
- closeOutgoingConnection();
- return false;
- }
- }
- }
-
-
-
-
- /** Receiver thread main loop. Accept a connection and then read on it until the connection
- breaks. Only then is the next connection handled. The reason is that there is only supposed
- to be 1 connection to this server socket at the same time.
- */
- public void run() {
- int num_bytes;
- byte[] buf;
- InetAddress peer=null;
- int peer_port=0;
-
- for (;;) { // GemStoneAddition remove coding anti-pattern while(!stop)
- if (Thread.currentThread().isInterrupted()) break; // GemStoneAddition
- try {
- if(trace) System.out.println("-- WAITING for ACCEPT");
- incoming=srv_sock.accept();
- instream=new DataInputStream(incoming.getInputStream());
- peer=incoming.getInetAddress();
- peer_port=incoming.getPort();
-
-
- if(trace) System.out.println("-- ACCEPT: incoming is " + printSocket(incoming));
-
-
- /** This piece of code would only accept connections from the peer address defined above. */
- if(remote.equals(incoming.getInetAddress())) {
- if(trace)
- System.out.println("Link.run(): accepted connection from " + peer + ':' + peer_port);
- }
- else {
- if(trace)
- log.error("Link.run(): rejected connection request from " +
- peer + ':' + peer_port + ". Address not specified as peer in link !");
- closeIncomingConnection(); // only close incoming connection
- continue;
- }
-
- // now try to create outgoing connection
- if(!established) {
- createOutgoingConnection();
- }
-
- for (;;) { // GemStoneAddition remove coding anti-pattern while(!stop)
- if (Thread.currentThread().isInterrupted()) break; // GemStoneAddition
- try {
- num_bytes=instream.readInt();
- if(num_bytes == HB_PACKET) {
- hb.receivedHeartbeat();
- continue;
- }
-
- buf=new byte[num_bytes];
- instream.readFully(buf, 0, buf.length);
- hb.receivedMessage(); // equivalent to heartbeat response (HB_PACKET)
- if(receiver != null)
- receiver.receive(buf);
- }
- catch(Exception ex) { // IOException, EOFException, SocketException
- closeIncomingConnection(); // close incoming when read() fails
- break;
- }
- }
- }
- catch(IOException io_ex) {
-// receiver_thread=null; GemStoneAddition
- break;
- }
- catch(Exception e) {
- }
- }
- }
-
-
-
- @Override // GemStoneAddition
- public String toString() {
- StringBuffer ret=new StringBuffer();
- ret.append("Link <" + local_addr + ':' + local_port + " --> " +
- remote_addr + ':' + remote_port + '>');
- ret.append(established? " (established)" : " (not established)");
- return ret.toString();
- }
-
-
- @Override // GemStoneAddition
- public boolean equals(Object other) {
- Link o;
-
- if(other == null)
- return false;
- if(!(other instanceof Link))
- return false;
- o=(Link)other;
- if(local_addr.equals(o.local_addr) && remote_addr.equals(o.remote_addr) &&
- local_port == o.local_port && remote_port == o.remote_port)
- return true;
- else
- return false;
- }
-
-
- @Override // GemStoneAddition
- public int hashCode() {
- return local_addr.hashCode() + remote_addr.hashCode() + local_port + remote_port;
- }
-
-
- void startReceiverThread() {
- synchronized (this) { // GemStoneAddition
- stopReceiverThread();
- receiver_thread=new Thread(this, "Link.ReceiverThreadThread");
- receiver_thread.setDaemon(true);
- receiver_thread.start();
- }
- }
-
- synchronized /* GemStoneAddition */ void stopReceiverThread() {
- Thread t = receiver_thread; // GemStoneAddition
- if(t != null && t.isAlive()) {
-// stop=true; GemStoneAddition
- closeIncomingConnection();
- t.interrupt();
- try {t.join(receiver_thread_join_timeout);} catch(InterruptedException e) {Thread.currentThread().interrupt(); } // GemStoneAddition
-// stop=false; GemStoneAddition
- }
- receiver_thread=null;
- }
-
-
-
-
- /** Tries to create an outgoing connection. If successful, the heartbeat is started. Does <em>not</em>
- stop the connection establisher ! The reason is that this method is going to be called by the
- connection establisher as well, therefore it would kill itself ! */
- boolean createOutgoingConnection() {
- synchronized(outgoing_mutex) { // serialize access with ConnectionEstablisher
- if(established) {
- return true;
- }
- try {
- // create a socket to remote:remote_port, bind to local address (choose any local port);
- outgoing=new Socket(remote, remote_port, local, 0); // 0 means choose any local port
- outgoing.setSoLinger(true, 1); // 1 second // +++ ? needed ? it is off by default !
- outstream=new DataOutputStream(outgoing.getOutputStream());
- if(receiver != null) receiver.linkUp(local, local_port, remote, remote_port);
- established=true;
-
- if(trace) System.out.println("-- CREATE: outgoing is " + printSocket(outgoing));
-
- return true;
- }
- catch(Exception e) {
- established=false;
- return false;
- }
- }
- }
-
-
-
-
- /**
- Tries to create an outgoing connection. If successful, the heartbeat is started. Does <em>not</em>
- stop the connection establisher ! The reason is that this method is going to be called by the
- connection establisher as well, therefore it would kill itself !
- */
- boolean createOutgoingConnection(long timeout) {
- synchronized(outgoing_mutex) { // serialize access with ConnectionEstablisher
- if(established) {
- return true;
- }
- try {
- if(writer == null) writer=new TimedWriter();
-
- // create a socket to remote:remote_port, bind to local address (choose any local port);
- // outgoing=new Socket(remote, remote_port, local, 0); // 0 means choose any local port
- outgoing=writer.createSocket(local, remote, remote_port, timeout);
- outgoing.setSoLinger(true, 1); // 1 second // +++ ? needed ? it is off by default !
- outstream=new DataOutputStream(outgoing.getOutputStream());
- if(receiver != null) receiver.linkUp(local, local_port, remote, remote_port);
- established=true;
- if(trace) System.out.println("-- CREATE: outgoing is " + printSocket(outgoing));
- return true;
- }
- catch(Exception e) {
- established=false;
- return false;
- }
- }
- }
-
-
-
-
- /** Closes the outgoing connection */
- void closeOutgoingConnection() {
- synchronized(outgoing_mutex) {
- if(!established) {
- return;
- }
- if(outstream != null) {
-
- if(trace) System.out.println("-- CLOSE: outgoing is " + printSocket(outgoing));
-
- try {
- outstream.close(); // flush data before socket is closed
- }
- catch(Exception e) {}
- outstream=null;
- }
- if(outgoing != null) {
- try {
- outgoing.close();
- }
- catch(Exception e) {}
- outgoing=null;
- }
- established=false;
- if(receiver != null) receiver.linkDown(local, local_port, remote, remote_port);
- }
- }
-
-
- /** When the heartbeat thread detects that the peer 'hangs' (not detected by incoming.read()),
- then it closes the outgoing *and* incoming socket. The latter needs to be done,
- so that we can return to accept() and await a new client connection request. */
- synchronized void closeIncomingConnection() {
- if(instream != null) {
-
- if(trace) System.out.println("-- CLOSE: incoming is " + printSocket(incoming));
-
- try {instream.close();} catch(Exception e) {}
- instream=null;
- }
- if(incoming != null) {
- try {incoming.close();} catch(Exception e) {}
- incoming=null;
- }
- }
-
-
-
- /** Close outgoing and incoming sockets. */
- synchronized void closeConnections() {
-
- // 1. Closes the outgoing connection. Then the connection establisher is started. The heartbeat
- // thread cannot be stopped in here, because this method is called by it !
- closeOutgoingConnection();
-
-
- // 2. When the heartbeat thread detects that the peer 'hangs' (not detected by incoming.read()),
- // then it closes the outgoing *and* incoming socket. The latter needs to be done,
- // so that we can return to accept() and await a new client connection request.
- closeIncomingConnection();
- }
-
-
-
-
- String printSocket(Socket s) {
- if(s == null) return "<null>";
- StringBuffer ret=new StringBuffer();
- ret.append(s.getLocalAddress().getHostName());
- ret.append(':');
- ret.append(s.getLocalPort());
- ret.append(" --> ");
- ret.append(s.getInetAddress().getHostName());
- ret.append(':');
- ret.append(s.getPort());
- return ret.toString();
- }
-
-
-
-
-
-
-
-
-
-
- /**
- Sends heartbeats across the link as long as we are connected (established=true). Uses a TimedWriter
- for both sending and responding to heartbeats. The reason is that a write() might hang if the
- peer has not closed its end, but the connection hangs (e.g. network partition, peer was stop-a'ed,
- ctrl-z of peer or peer's NIC was unplumbed) and the writer buffer is filled to capacity. This way,
- we don't hang sending timeouts.
- */
- class Heartbeat implements Runnable {
- Thread thread=null;
- long hb_timeout=10000; // time to wait for heartbeats from peer, if not received -> boom !
- long interval=3000; // {send a heartbeat | try to create connection} every 3 secs
-// volatile boolean stop_hb=false; GemStoneAddition remove volatile anti-pattern
- long last_hb=System.currentTimeMillis();
- boolean missed_hb=false;
- final TimedWriter timed_writer=new TimedWriter();
-
-
-
- public Heartbeat(long timeout, long hb_interval) {
- this.hb_timeout=timeout;
- this.interval=hb_interval;
- }
-
-
- public synchronized void start() {
- stop();
-// stop_hb=false; GemStoneAddition
- missed_hb=false;
- last_hb=System.currentTimeMillis();
- thread=new Thread(this, "HeartbeatThread");
- thread.setDaemon(true);
- thread.start();
- }
-
-
- public synchronized void interrupt() {
- thread.interrupt();
- }
-
-
- public synchronized void stop() {
- if(thread != null && thread.isAlive()) {
-// stop_hb=true; GemStoneAddition
- missed_hb=false;
- thread.interrupt();
- try {thread.join(hb_timeout+1000);} catch(InterruptedException e) {Thread.currentThread().interrupt();} // GemStoneAddition
- thread=null;
- }
- }
-
-
-
- /**
- When we receive a message from the peer, this means the peer is alive. Therefore we
- update the time of the last heartbeat.
- */
- public void receivedMessage() {
- last_hb=System.currentTimeMillis();
- if(missed_hb) {
- if(receiver != null) receiver.receivedHeartbeatAgain(local, local_port, remote, remote_port);
- missed_hb=false;
- }
- }
-
-
- /** Callback, called by the Link whenever it encounters a heartbeat (HB_PACKET) */
- public void receivedHeartbeat() {
- last_hb=System.currentTimeMillis();
- if(missed_hb) {
- if(receiver != null) receiver.receivedHeartbeatAgain(local, local_port, remote, remote_port);
- missed_hb=false;
- }
- }
-
-
- /**
- Sends heartbeats when connection is established. Tries to establish connection when not established.
- Switches between 'established' and 'not established' roles.
- */
- public void run() {
- long diff=0, curr_time=0, num_missed_hbs=0;
-
- if(trace) System.out.println("heartbeat to " + remote + ':' + remote_port + " started");
- for (;;) { // GemStoneAddition - remove volatile anti-pattern while(!stop_hb)
-
- if(established) { // send heartbeats
-
- // 1. Send heartbeat (use timed write)
- if(outstream != null) {
- try {
- timed_writer.write(outstream, HB_PACKET, 1500);
- Thread.sleep(interval);
- }
- catch (InterruptedException e) { // GemStoneAddition
- break; // exit loop and thread. No need to set interrupt bit.
- }
- catch(Exception io_ex) { // IOException and TimedWriter.Timeout
- closeOutgoingConnection(); // sets established to false
- continue;
- }
- }
- else {
- established=false;
- continue;
- }
-
- // 2. If time of last HB received > timeout --> close connection
- curr_time=System.currentTimeMillis();
- diff=curr_time - last_hb;
-
- if(curr_time - last_hb > interval) {
- num_missed_hbs=(curr_time - last_hb) / interval;
- if(receiver != null)
- receiver.missedHeartbeat(local, local_port, remote, remote_port, (int)num_missed_hbs);
- missed_hb=true;
- }
-
- if(diff >= hb_timeout) {
- if(trace) System.out.println("###### Link.Heartbeat.run(): no heartbeat receveived for " +
- diff + " msecs. Closing connections. #####");
- closeConnections(); // close both incoming *and* outgoing connections
- }
- }
- else { // try to establish connection
- synchronized(outgoing_mutex) { // serialize access with createOutgoingConnection()
- if(established) {
- continue;
- }
- try {
- outgoing=timed_writer.createSocket(local, remote, remote_port, interval);
- outstream=new DataOutputStream(outgoing.getOutputStream());
- if(receiver != null) receiver.linkUp(local, local_port, remote, remote_port);
- established=true;
- if(trace) System.out.println("-- CREATE (CE): " + printSocket(outgoing));
- continue;
- }
- catch(InterruptedException interrupted_ex) {
- break; // GemStoneAddition. Exit loop and thread
- }
- catch(Exception ex) { // IOException, TimedWriter.Timeout
- try { // GemStoneAddition
- Util.sleep(interval); // returns when done or interrupted
- }
- catch (InterruptedException e) {
- break; // exit loop and thread. No need to reset interrupt.
- }
- }
- }
- }
- }
- if(trace) System.out.println("heartbeat to " + remote + ':' + remote_port + " stopped");
- thread=null;
- }
- }
-
-
-
-
-
-
-
-
- protected/*GemStoneAddition*/ static class MyReceiver implements Link.Receiver {
-
- public void receive(byte[] msg) {
- System.out.println("<-- " + new String(msg));
- }
-
- public void linkDown(InetAddress l, int lp, InetAddress r, int rp) {
- System.out.println("** linkDown(): " + r + ':' + rp);
- }
-
- public void linkUp(InetAddress l, int lp, InetAddress r, int rp) {
- System.out.println("** linkUp(): " + r + ':' + rp);
- }
-
- public void missedHeartbeat(InetAddress l, int lp, InetAddress r, int rp, int num) {
- System.out.println("** missedHeartbeat(): " + r + ':' + rp);
- }
-
- public void receivedHeartbeatAgain(InetAddress l, int lp, InetAddress r, int rp) {
- System.out.println("** receivedHeartbeatAgain(): " + r + ':' + rp);
- }
- }
-
-
-
-// public static void main(String[] args) {
-// String local, remote;
-// int local_port, remote_port;
-//
-//
-// if(args.length != 4) {
-// System.err.println("\nLink <local host> <local port> <remote host> <remote port>\n");
-// return;
-// }
-// local=args[0];
-// remote=args[2];
-// local_port=Integer.parseInt(args[1]);
-// remote_port=Integer.parseInt(args[3]);
-//
-// Link l=new Link(local, local_port, remote, remote_port, new MyReceiver());
-//
-// try {
-// l.start();
-// System.out.println(l);
-//
-// BufferedReader in= new BufferedReader(new InputStreamReader(System.in));
-// while(true) {
-// System.out.print("> "); System.out.flush();
-// String line=in.readLine();
-// l.send(line.getBytes());
-// }
-// }
-// catch(Exception e) {
-// System.err.println(e);
-// }
-// }
-}
-
-
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/8b2ea77d/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/blocks/Link.txt
----------------------------------------------------------------------
diff --git a/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/blocks/Link.txt b/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/blocks/Link.txt
deleted file mode 100644
index 1dc53f0..0000000
--- a/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/blocks/Link.txt
+++ /dev/null
@@ -1,48 +0,0 @@
-
-
-Incoming connection
--------------------
-
-- created by accept()
-
-- closed upon read() failure on incoming (IOException, EOFException)
-
-- closed by missing heartbeats (Heartbeat)
-
-- closed when peer with address different from expected connects
- (connection rejected)
-
-
-Outgoing connection
--------------------
-
-- created when Link is started
-
-- created after creation of incoming (upon accept()). not created if
- already exists
-
-- created by ConnectionEstablisher. not created if already exists
-
-- closed upon write() failure on outgoing
-
-- closed by missing heartbeats (Heartbeat)
-
-
-Heartbeat
----------
-
-- monitors outgoing by periodically writing to it (heartbeats)
-
-- started upon creation of outgoing
-
-- stopped upon closing of outgoing (write-failure, heartbeat failure)
-
-
-ConnectionEstablisher
----------------------
-
-- attempts to periodically create outgoing connection (when down)
-
-- started when outgoing goes (is) down
-
-- stopped when outgoing is up
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/8b2ea77d/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/blocks/LockManager.java
----------------------------------------------------------------------
diff --git a/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/blocks/LockManager.java b/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/blocks/LockManager.java
deleted file mode 100644
index 5b4f6a0..0000000
--- a/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/blocks/LockManager.java
+++ /dev/null
@@ -1,87 +0,0 @@
-/** Notice of modification as required by the LGPL
- * This file was modified by Gemstone Systems Inc. on
- * $Date$
- **/
-package com.gemstone.org.jgroups.blocks;
-
-import com.gemstone.org.jgroups.ChannelException;
-
-/**
- * <code>LockManager</code> represents generic lock manager that allows
- * obtaining and releasing locks on objects.
- *
- * @author Roman Rokytskyy (rrokytskyy@acm.org)
- * @author Robert Schaffar-Taurok (robert@fusion.at)
- * @version $Id: LockManager.java,v 1.2 2005/06/08 15:56:54 publicnmi Exp $
- */
-public interface LockManager {
-
- /**
- * Obtain lock on <code>obj</code> for specified <code>owner</code>.
- * Implementation should try to obtain lock few times within the
- * specified timeout.
- *
- * @param obj obj to lock, usually not full object but object's ID.
- * @param owner object identifying entity that will own the lock.
- * @param timeout maximum time that we grant to obtain a lock.
- *
- * @throws LockNotGrantedException if lock is not granted within
- * specified period.
- *
- * @throws ClassCastException if <code>obj</code> and/or
- * <code>owner</code> is not of type that implementation expects to get
- * (for example, when distributed lock manager obtains non-serializable
- * <code>obj</code> or <code>owner</code>).
- *
- * @throws ChannelException if something bad happened to communication
- * channel.
- */
- void lock(Object obj, Object owner, int timeout)
- throws LockNotGrantedException, ClassCastException, ChannelException;
-
- /**
- * Release lock on <code>obj</code> owned by specified <code>owner</code>.
- *
- * since 2.2.9 this method is only a wrapper for
- * unlock(Object lockId, Object owner, boolean releaseMultiLocked).
- * Use that with releaseMultiLocked set to true if you want to be able to
- * release multiple locked locks (for example after a merge)
- *
- * @param obj obj to lock, usually not full object but object's ID.
- * @param owner object identifying entity that will own the lock.
- *
- * @throws ClassCastException if <code>obj</code> and/or
- * <code>owner</code> is not of type that implementation expects to get
- * (for example, when distributed lock manager obtains non-serializable
- * <code>obj</code> or <code>owner</code>).
- *
- * @throws ChannelException if something bad happened to communication
- * channel.
- */
- void unlock(Object obj, Object owner)
- throws LockNotReleasedException, ClassCastException, ChannelException;
-
- /**
- * Release lock on <code>obj</code> owned by specified <code>owner</code>.
- *
- * @param obj obj to lock, usually not full object but object's ID.
- * @param owner object identifying entity that will own the lock.
- * @param releaseMultiLocked force unlocking of the lock if the local
- * lockManager owns the lock even if another lockManager owns the same lock
- *
- * @throws ClassCastException if <code>obj</code> and/or
- * <code>owner</code> is not of type that implementation expects to get
- * (for example, when distributed lock manager obtains non-serializable
- * <code>obj</code> or <code>owner</code>).
- *
- * @throws ChannelException if something bad happened to communication
- * channel.
- *
- * @throws LockMultiLockedException if the lock was unlocked, but another
- * node already held the lock
- */
- void unlock(Object obj, Object owner, boolean releaseMultiLocked)
- throws LockNotReleasedException, ClassCastException, ChannelException, LockMultiLockedException;
-
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/8b2ea77d/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/blocks/LockMultiLockedException.java
----------------------------------------------------------------------
diff --git a/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/blocks/LockMultiLockedException.java b/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/blocks/LockMultiLockedException.java
deleted file mode 100644
index a7c9ef8..0000000
--- a/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/blocks/LockMultiLockedException.java
+++ /dev/null
@@ -1,26 +0,0 @@
-/** Notice of modification as required by the LGPL
- * This file was modified by Gemstone Systems Inc. on
- * $Date$
- **/
-package com.gemstone.org.jgroups.blocks;
-
-
-/**
- * Thrown by the {@link com.gemstone.org.jgroups.blocks.DistributedLockManager#unlock(Object, Object, boolean)} method if a lock is only locally released, because it is locked
- * by multiple DistributedLockManagers. This can happen after a merge for example.
- *
- * @author Robert Schaffar-Taurok (robert@fusion.at)
- * @version $Id: LockMultiLockedException.java,v 1.2 2005/07/17 11:36:40 chrislott Exp $
- */
-public class LockMultiLockedException extends Exception {
-private static final long serialVersionUID = 131820252305444362L;
-
- public LockMultiLockedException() {
- super();
- }
-
- public LockMultiLockedException(String s) {
- super(s);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/8b2ea77d/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/blocks/LockNotGrantedException.java
----------------------------------------------------------------------
diff --git a/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/blocks/LockNotGrantedException.java b/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/blocks/LockNotGrantedException.java
deleted file mode 100644
index 5352967..0000000
--- a/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/blocks/LockNotGrantedException.java
+++ /dev/null
@@ -1,24 +0,0 @@
-/** Notice of modification as required by the LGPL
- * This file was modified by Gemstone Systems Inc. on
- * $Date$
- **/
-package com.gemstone.org.jgroups.blocks;
-
-/**
- * This exception indicated that lock manager refused to give a lock on
- * some resource.
- *
- * @author Roman Rokytskyy (rrokytskyy@acm.org)
- */
-public class LockNotGrantedException extends Exception {
-private static final long serialVersionUID = 2196485072005850175L;
-
- public LockNotGrantedException() {
- super();
- }
-
- public LockNotGrantedException(String s) {
- super(s);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/8b2ea77d/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/blocks/LockNotReleasedException.java
----------------------------------------------------------------------
diff --git a/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/blocks/LockNotReleasedException.java b/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/blocks/LockNotReleasedException.java
deleted file mode 100644
index 63ed609..0000000
--- a/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/blocks/LockNotReleasedException.java
+++ /dev/null
@@ -1,24 +0,0 @@
-/** Notice of modification as required by the LGPL
- * This file was modified by Gemstone Systems Inc. on
- * $Date$
- **/
-package com.gemstone.org.jgroups.blocks;
-
-/**
- * This exception indicated that lock manager refused to release a lock on
- * some resource.
- *
- * @author Roman Rokytskyy (rrokytskyy@acm.org)
- */
-public class LockNotReleasedException extends Exception {
-private static final long serialVersionUID = -9174045093160454258L;
-
- public LockNotReleasedException() {
- super();
- }
-
- public LockNotReleasedException(String s) {
- super(s);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/8b2ea77d/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/blocks/LockingException.java
----------------------------------------------------------------------
diff --git a/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/blocks/LockingException.java b/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/blocks/LockingException.java
deleted file mode 100644
index 8798d87..0000000
--- a/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/blocks/LockingException.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/** Notice of modification as required by the LGPL
- * This file was modified by Gemstone Systems Inc. on
- * $Date$
- **/
-// $Id: LockingException.java,v 1.1.1.1 2003/09/09 01:24:08 belaban Exp $
-
-
-package com.gemstone.org.jgroups.blocks;
-
-import java.util.Map;
-
-
-public class LockingException extends Exception {
-private static final long serialVersionUID = 591671983577853564L;
- Map failed_lockers=null; // list of members who failed acquiring locks (keys=Address, values=exception string)
-
- public LockingException(String msg) {
- super(msg);
- }
-
- public LockingException(Map m) {
- super("LockingException");
- failed_lockers=m;
- }
-
-
- @Override // GemStoneAddition
- public String toString() {
- StringBuffer sb=new StringBuffer();
-
- sb.append(super.toString());
-
- if(failed_lockers != null && failed_lockers.size() > 0)
- sb.append(" (failed members: ").append(failed_lockers);
- return sb.toString();
- }
-
-}