You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by bs...@apache.org on 2015/08/21 23:22:29 UTC
[05/51] [partial] incubator-geode git commit: GEODE-77 removing the
old jgroups subproject
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/8b2ea77d/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/protocols/pbcast/STABLE.java.new
----------------------------------------------------------------------
diff --git a/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/protocols/pbcast/STABLE.java.new b/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/protocols/pbcast/STABLE.java.new
deleted file mode 100644
index 9eec23a..0000000
--- a/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/protocols/pbcast/STABLE.java.new
+++ /dev/null
@@ -1,890 +0,0 @@
-// $Id$
-
-package org.jgroups.protocols.pbcast;
-
-
-import org.jgroups.*;
-import org.jgroups.stack.Protocol;
-import org.jgroups.util.Streamable;
-import org.jgroups.util.TimeScheduler;
-import org.jgroups.util.Util;
-
-import java.io.*;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.Properties;
-import java.util.Vector;
-
-
-
-
-/**
- * Computes the broadcast messages that are stable, i.e. have been received by all members. Sends
- * STABLE events up the stack when this is the case. This allows NAKACK to garbage collect messages that
- * have been seen by all members.<p>
- * Works as follows: periodically we mcast our highest seqnos (seen for each member) to the group.
- * A stability vector, which maintains the highest seqno for each member and initially contains no data,
- * is updated when such a message is received. The entry for a member P is computed set to
- * min(entry[P], digest[P]). When messages from all members have been received, a stability
- * message is mcast, which causes all members to send a STABLE event up the stack (triggering garbage collection
- * in the NAKACK layer).<p>
- * The stable task now terminates after max_num_gossips if no messages or view changes have been sent or received
- * in the meantime. It will resume when messages are received. This effectively suspends sending superfluous
- * STABLE messages in the face of no activity.<br/>
- * New: when <code>max_bytes</code> is exceeded (unless disabled by setting it to 0),
- * a STABLE task will be started (unless it is already running).
- * @author Bela Ban
- */
-public class STABLE extends Protocol {
- Address local_addr=null;
- final Vector mbrs=new Vector();
- final Digest digest=new Digest(10); // keeps track of the highest seqnos from all members
- final Vector heard_from=new Vector(); // keeps track of who we already heard from (STABLE_GOSSIP msgs)
-
- /** Sends a STABLE gossip every 20 seconds on average. 0 disables gossipping of STABLE messages */
- long desired_avg_gossip=20000;
-
- /** delay before we send STABILITY msg (give others a change to send first). This should be set to a very
- * small number (> 0 !) if <code>max_bytes</code> is used */
- long stability_delay=6000;
- StabilitySendTask stability_task=null;
- final Object stability_mutex=new Object(); // to synchronize on stability_task
- StableTask stable_task=null; // bcasts periodic STABLE message (added to timer below)
- final Object stable_task_mutex=new Object(); // to sync on stable_task
- TimeScheduler timer=null; // to send periodic STABLE msgs (and STABILITY messages)
- int max_gossip_runs=3; // max. number of times the StableTask runs before terminating
- int num_gossip_runs=max_gossip_runs; // this number is decremented (max_gossip_runs doesn't change)
- static final String name="STABLE";
-
- /** Total amount of bytes from incoming messages (default = 0 = disabled). When exceeded, a STABLE
- * message will be broadcast and <code>num_bytes_received</code> reset to 0 . If this is > 0, then ideally
- * <code>stability_delay</code> should be set to a low number as well */
- long max_bytes=0;
-
- /** The total number of bytes received from unicast and multicast messages */
- long num_bytes_received=0;
-
- /** When true, don't take part in garbage collection protocol: neither send STABLE messages nor
- * handle STABILITY messages */
- boolean suspended=false;
-
- /** Max time we should hold off on message garbage collection. This is a second line of defense in case
- * we get a SUSPEND_STABLE, but forget to send a corresponding RESUME_STABLE (which should never happen !)
- * The consequence of a missing RESUME_STABLE would be that the group doesn't garbage collect stable
- * messages anymore, eventually, with a lot of traffic, every member would accumulate messages and run
- * out of memory !
- */
- // long max_suspend_time=600000;
-
- ResumeTask resume_task=null;
- final Object resume_task_mutex=new Object();
-
- /** Number of gossip messages */
- int num_gossips=0;
-
-
- public String getName() {
- return name;
- }
-
- public long getDesiredAverageGossip() {
- return desired_avg_gossip;
- }
-
- public void setDesiredAverageGossip(long gossip_interval) {
- desired_avg_gossip=gossip_interval;
- }
-
- public long getMaxBytes() {
- return max_bytes;
- }
-
- public void setMaxBytes(long max_bytes) {
- this.max_bytes=max_bytes;
- }
-
- public int getNumberOfGossipMessages() {return num_gossips;}
-
- public void resetStats() {
- super.resetStats();
- num_gossips=0;
- }
-
-
- public Vector requiredDownServices() {
- Vector retval=new Vector();
- retval.addElement(new Integer(Event.GET_DIGEST_STABLE)); // NAKACK layer
- return retval;
- }
-
- public boolean setProperties(Properties props) {
- String str;
-
- super.setProperties(props);
- str=props.getProperty("digest_timeout");
- if(str != null) {
- props.remove("digest_timeout");
- log.error("digest_timeout has been deprecated; it will be ignored");
- }
-
- str=props.getProperty("desired_avg_gossip");
- if(str != null) {
- desired_avg_gossip=Long.parseLong(str);
- props.remove("desired_avg_gossip");
- }
-
- str=props.getProperty("stability_delay");
- if(str != null) {
- stability_delay=Long.parseLong(str);
- props.remove("stability_delay");
- }
-
- str=props.getProperty("max_gossip_runs");
- if(str != null) {
- max_gossip_runs=Integer.parseInt(str);
- num_gossip_runs=max_gossip_runs;
- props.remove("max_gossip_runs");
- }
-
- str=props.getProperty("max_bytes");
- if(str != null) {
- max_bytes=Long.parseLong(str);
- props.remove("max_bytes");
- }
-
- str=props.getProperty("max_suspend_time");
- if(str != null) {
- log.error("max_suspend_time is not supported any longer; please remove it (ignoring it)");
- props.remove("max_suspend_time");
- }
-
- if(props.size() > 0) {
- log.error("these properties are not recognized: " + props);
-
- return false;
- }
- return true;
- }
-
-
- private void suspend(long timeout) {
- if(!suspended) {
- suspended=true;
- if(log.isDebugEnabled())
- log.debug("suspending message garbage collection");
- }
- startResumeTask(timeout); // will not start task if already running
- }
-
- private void resume() {
- suspended=false;
- if(log.isDebugEnabled())
- log.debug("resuming message garbage collection");
- stopResumeTask();
- }
-
- public void start() throws Exception {
- if(stack != null && stack.timer != null)
- timer=stack.timer;
- else
- throw new Exception("timer cannot be retrieved from protocol stack");
- initializeDigest();
- }
-
- public void stop() {
- stopStableTask();
- clearDigest();
- }
-
-
- public void up(Event evt) {
- Message msg;
- StableHeader hdr;
- Header obj;
- int type=evt.getType();
-
- switch(type) {
-
- case Event.MSG:
- msg=(Message)evt.getArg();
-
- if(max_bytes > 0) { // message counting is enabled
- long size=Math.max(msg.getLength(), 24);
- num_bytes_received+=size;
- if(num_bytes_received >= max_bytes) {
- if(log.isTraceEnabled()) {
- StringBuffer sb=new StringBuffer("max_bytes has been reached (max_bytes=");
- sb.append(max_bytes).append(", number of bytes received=");
- sb.append(num_bytes_received).append("): sending STABLE message");
- log.trace(sb.toString());
- }
- // asks the NAKACK protocol for the current digest, reply event is GET_DIGEST_STABLE_OK (arg=digest)
- passDown(new Event(Event.GET_DIGEST_STABLE));
- num_bytes_received=0;
- }
- }
-
- obj=msg.getHeader(name);
- if(obj == null || !(obj instanceof StableHeader))
- break;
- hdr=(StableHeader)msg.removeHeader(name);
- switch(hdr.type) {
- case StableHeader.STABLE_GOSSIP:
- handleStableGossip(msg.getSrc(), hdr.stableDigest);
- break;
- case StableHeader.STABILITY:
- handleStabilityMessage(hdr.stableDigest);
- break;
- default:
- if(log.isErrorEnabled()) log.error("StableHeader type " + hdr.type + " not known");
- }
- return; // don't pass STABLE or STABILITY messages up the stack
-
- case Event.VIEW_CHANGE:
- View view=(View)evt.getArg();
- handleViewChange(view);
- break;
-
- case Event.SET_LOCAL_ADDRESS:
- local_addr=(Address)evt.getArg();
- break;
-
- case Event.GET_DIGEST_STABLE_OK:
- Digest d=(Digest)evt.getArg();
- updateFromOwnDigest(d);
- sendStableMessage();
- break;
- }
-
- passUp(evt);
- if(desired_avg_gossip > 0) {
- if(type == Event.VIEW_CHANGE || type == Event.MSG) {
- startStableTask(); // only starts task if not yet running
- }
- }
- }
-
-
-
-
- public void down(Event evt) {
- int type=evt.getType();
-
- switch(evt.getType()) {
- case Event.VIEW_CHANGE:
- View v=(View)evt.getArg();
- handleViewChange(v);
- break;
-
- case Event.SUSPEND_STABLE:
- long timeout=0;
- Object t=evt.getArg();
- if(t != null && t instanceof Long)
- timeout=((Long)t).longValue();
- stopStableTask();
- suspend(timeout);
- break;
-
- case Event.RESUME_STABLE:
- resume();
- break;
- }
-
- if(desired_avg_gossip > 0) {
- if(type == Event.VIEW_CHANGE || type == Event.MSG)
- startStableTask(); // only starts task if not yet running
- }
-
- passDown(evt);
- }
-
-
- public void runMessageGarbageCollection() {
- sendStableMessage();
- }
-
-
-
- /* --------------------------------------- Private Methods ---------------------------------------- */
-
-
- private void handleViewChange(View v) {
- Vector tmp=v.getMembers();
- mbrs.clear();
- mbrs.addAll(tmp);
- resetHeardFromList(tmp);
- stopStableTask();
- }
-
-
- private void initializeDigest() {
- Address mbr;
- synchronized(digest) {
- for(int i=0; i < mbrs.size(); i++) {
- mbr=(Address)mbrs.get(i);
- if(!digest.contains(mbr))
- digest.add(mbr, -1, -1);
- }
- }
- }
-
- private void clearDigest() {
- synchronized(digest) {
- digest.clear();
- }
- }
-
- /**
- * Updates own digest (this.digest) with latest digest from NAKACK, received as with GET_DIGEST_STABLE_OK
- * @param latest_digest
- */
- private void updateFromOwnDigest(Digest latest_digest) {
- if(latest_digest == null || latest_digest.size() == 0)
- return;
-
- Map.Entry entry;
- org.jgroups.protocols.pbcast.Digest.Entry val;
- Address mbr;
-
- synchronized(digest) {
- for(Iterator it=latest_digest.senders.entrySet().iterator(); it.hasNext();) {
- entry=(Map.Entry)it.next();
- mbr=(Address)entry.getKey();
- val=(org.jgroups.protocols.pbcast.Digest.Entry)entry.getValue();
- if(!digest.contains(mbr)) {
- digest.add(mbr, val.low_seqno, val.high_seqno, val.high_seqno_seen);
- }
- else {
- digest.set(mbr, val.low_seqno, val.high_seqno, val.high_seqno_seen);
- }
- }
- }
- }
-
- /** Update my own digest from a digest received by somebody else. Returns the number of elements updated
- * Needs to be called with a lock on digest */
- private int updateFromOtherDigest(Digest d) {
- if(d == null || d.size() == 0)
- return 0;
-
- Address mbr;
- long highest_seqno, my_highest_seqno;
- long highest_seen_seqno, my_highest_seen_seqno;
- Map.Entry entry;
- org.jgroups.protocols.pbcast.Digest.Entry val;
- int num_elements_updated=0;
- for(Iterator it=d.senders.entrySet().iterator(); it.hasNext();) {
- entry=(Map.Entry)it.next();
- mbr=(Address)entry.getKey();
- if(!digest.contains(mbr)) {
- if(log.isTraceEnabled()) log.trace("sender " + mbr + " not found in my digest");
- continue;
- }
- val=(org.jgroups.protocols.pbcast.Digest.Entry)entry.getValue();
- highest_seqno=val.high_seqno;
- highest_seen_seqno=val.high_seqno_seen;
-
- // compute the minimum of the highest seqnos deliverable (for garbage collection)
- my_highest_seqno=digest.highSeqnoAt(mbr);
- digest.setHighSeqnoAt(mbr, Math.min(my_highest_seqno, highest_seqno));
-
- // compute the maximum of the highest seqnos seen (for retransmission of last missing message)
- my_highest_seen_seqno=digest.highSeqnoSeenAt(mbr);
- digest.setHighSeqnoSeenAt(mbr, Math.max(my_highest_seen_seqno, highest_seen_seqno));
-
- num_elements_updated++;
- }
- return num_elements_updated;
- }
-
-
- private void resetHeardFromList(Vector new_members) {
- if(new_members == null || new_members.size() == 0)
- return;
- synchronized(heard_from) {
- heard_from.clear();
- heard_from.addAll(new_members);
- heard_from.remove(local_addr); // I don't need to hear from myself
- }
- }
-
- private void removeFromHeardFromList(Address mbr) {
- heard_from.remove(mbr); // Vector is already synchronized for a single operation
- }
-
-
- void startStableTask() {
- num_gossip_runs=max_gossip_runs;
-
- // Here, double-checked locking works: we don't want to synchronize if the task already runs (which is the case
- // 99% of the time). If stable_task gets nulled after the condition check, we return anyways, but just miss
- // 1 cycle: on the next message or view, we will start the task
- if(stable_task != null)
- return;
- synchronized(stable_task_mutex) {
- if(stable_task != null && stable_task.running()) {
- return; // already running
- }
- stable_task=new StableTask();
- timer.add(stable_task, true); // fixed-rate scheduling
- }
- if(log.isTraceEnabled())
- log.trace("stable task started; num_gossip_runs=" + num_gossip_runs + ", max_gossip_runs=" + max_gossip_runs);
- }
-
-
- void stopStableTask() {
- // contrary to startStableTask(), we don't need double-checked locking here because this method is not
- // called frequently
- synchronized(stable_task_mutex) {
- if(stable_task != null) {
- stable_task.stop();
- stable_task=null;
- }
- }
- }
-
-
- void startResumeTask(long max_suspend_time) {
- max_suspend_time=(long)(max_suspend_time * 1.1); // little slack
-
- synchronized(resume_task_mutex) {
- if(resume_task != null && resume_task.running()) {
- return; // already running
- }
- else {
- resume_task=new ResumeTask(max_suspend_time);
- timer.add(resume_task, true); // fixed-rate scheduling
- }
- }
- if(log.isDebugEnabled())
- log.debug("resume task started, max_suspend_time=" + max_suspend_time);
- }
-
-
- void stopResumeTask() {
- synchronized(resume_task_mutex) {
- if(resume_task != null) {
- resume_task.stop();
- resume_task=null;
- }
- }
- }
-
-
- void startStabilityTask(Digest d, long delay) {
- synchronized(stability_mutex) {
- if(stability_task != null && stability_task.running()) {
- return; // already running
- }
- else {
- stability_task=new StabilitySendTask(d, delay);
- timer.add(stability_task, true); // fixed-rate scheduling
- }
- }
- }
-
-
- void stopStabilityTask() {
- synchronized(stability_mutex) {
- if(stability_task != null) {
- stability_task.stop();
- stability_task=null;
- }
- }
- }
-
-
- /**
- Digest d contains (a) the highest seqnos <em>deliverable</em> for each sender and (b) the highest seqnos
- <em>seen</em> for each member. (Difference: with 1,2,4,5, the highest seqno seen is 5, whereas the highest
- seqno deliverable is 2). The minimum of all highest seqnos deliverable will be taken to send a stability
- message, which results in garbage collection of messages lower than the ones in the stability vector. The
- maximum of all seqnos will be taken to trigger possible retransmission of last missing seqno (see DESIGN
- for details).
- */
- private void handleStableGossip(Address sender, Digest d) {
- if(d == null || sender == null) {
- if(log.isErrorEnabled()) log.error("digest or sender is null");
- return;
- }
-
- if(suspended) {
- if(log.isTraceEnabled()) {
- log.trace("STABLE message will not be handled as I'm suspended");
- }
- return;
- }
-
- if(local_addr.equals(sender))
- return; // don't need to update myself from myself
-
- if(log.isTraceEnabled())
- log.trace(new StringBuffer("received digest from ").append(sender).append(": ").append(d));
- if(!heard_from.contains(sender)) { // already received gossip from sender; discard it
- if(log.isTraceEnabled()) log.trace("already received gossip from " + sender);
- return;
- }
-
- // we won't handle the gossip d, if d's members don't match the membership in my own digest,
- // this is part of the fix for the NAKACK problem (bugs #943480 and #938584)
-// if(!this.digest.sameSenders(d)) {
-// if(log.isTraceEnabled()) {
-// log.trace("received digest from " + sender + " (digest=" + d + ") which does not match my own digest ("+
-// this.digest + "): ignoring digest and re-initializing own digest");
-// }
-// resetHeardFromList(mbrs);
-// return;
-// }
-
- Digest copy;
- synchronized(digest) {
- int num_elements_expected=digest.size(), num_elements_updated=updateFromOtherDigest(d);
- // we can only remove the sender from heard_from if *all* elements of my digest were updated
- if(num_elements_updated != num_elements_expected)
- return;
- copy=digest.copy();
- }
-
- removeFromHeardFromList(sender);
- if(heard_from.size() == 0) {
- if(log.isTraceEnabled()) log.trace("sending stability msg " + copy);
- sendStabilityMessage(copy);
- resetHeardFromList(mbrs);
- }
- }
-
-
- /**
- * Bcasts a STABLE message of the current digest to all members. Message contains highest seqnos of all members
- * seen by this member. Highest seqnos are retrieved from the NAKACK layer below.
- */
- private void sendStableMessage() {
- if(suspended) {
- if(log.isTraceEnabled())
- log.trace("will not send STABLE message as I'm suspended");
- return;
- }
-
- Digest d=null;
- Message msg=new Message(); // mcast message
- StableHeader hdr;
- d=this.digest.copy();
- if(d != null && d.size() > 0) {
- if(log.isTraceEnabled())
- log.trace(new StringBuffer("mcasting STABLE msg, digest=").append(d).append(" (num_gossip_runs=").
- append(num_gossip_runs).append(", max_gossip_runs=").append(max_gossip_runs).append(')'));
- hdr=new StableHeader(StableHeader.STABLE_GOSSIP, d);
- msg.putHeader(name, hdr);
- num_gossips++;
- passDown(new Event(Event.MSG, msg));
- }
- }
-
-
-
- /**
- Schedules a stability message to be mcast after a random number of milliseconds (range 1-5 secs).
- The reason for waiting a random amount of time is that, in the worst case, all members receive a
- STABLE_GOSSIP message from the last outstanding member at the same time and would therefore mcast the
- STABILITY message at the same time too. To avoid this, each member waits random N msecs. If, before N
- elapses, some other member sent the STABILITY message, we just cancel our own message. If, during
- waiting for N msecs to send STABILITY message S1, another STABILITY message S2 is to be sent, we just
- discard S2.
- @param tmp A copy of te stability digest, so we don't need to copy it again
- */
- void sendStabilityMessage(Digest tmp) {
- long delay;
-
- if(timer == null) {
- if(log.isErrorEnabled())
- log.error("timer is null, cannot schedule stability message to be sent");
- timer=stack != null ? stack.timer : null;
- return;
- }
-
- // give other members a chance to mcast STABILITY message. if we receive STABILITY by the end of
- // our random sleep, we will not send the STABILITY msg. this prevents that all mbrs mcast a
- // STABILITY msg at the same time
- delay=Util.random(stability_delay);
- startStabilityTask(tmp, delay);
- }
-
-
- void handleStabilityMessage(Digest d) {
- if(d == null) {
- if(log.isErrorEnabled()) log.error("stability vector is null");
- return;
- }
-
- if(suspended) {
- if(log.isDebugEnabled()) {
- log.debug("STABILITY message will not be handled as suspended=" + suspended);
- }
- return;
- }
-
- if(log.isDebugEnabled()) log.debug("stability vector is " + d.printHighSeqnos());
- stopStabilityTask();
-
- // we won't handle the gossip d, if d's members don't match the membership in my own digest,
- // this is part of the fix for the NAKACK problem (bugs #943480 and #938584)
- if(!this.digest.sameSenders(d)) {
- if(log.isDebugEnabled()) {
- log.debug("received digest (digest=" + d + ") which does not match my own digest ("+
- this.digest + "): ignoring digest and re-initializing own digest");
- }
- resetHeardFromList(mbrs);
- return;
- }
-
- // pass STABLE event down the stack, so NAKACK can garbage collect old messages
- passDown(new Event(Event.STABLE, d));
- }
-
-
-
- /* ------------------------------------End of Private Methods ------------------------------------- */
-
-
-
-
-
-
-
- public static class StableHeader extends Header implements Streamable {
- public static final int STABLE_GOSSIP=1;
- public static final int STABILITY=2;
-
- int type=0;
- // Digest digest=new Digest(); // used for both STABLE_GOSSIP and STABILITY message
- Digest stableDigest=null; // changed by Bela April 4 2004
-
- public StableHeader() {
- } // used for externalizable
-
-
- public StableHeader(int type, Digest digest) {
- this.type=type;
- this.stableDigest=digest;
- }
-
-
- static String type2String(int t) {
- switch(t) {
- case STABLE_GOSSIP:
- return "STABLE_GOSSIP";
- case STABILITY:
- return "STABILITY";
- default:
- return "<unknown>";
- }
- }
-
- public String toString() {
- StringBuffer sb=new StringBuffer();
- sb.append('[');
- sb.append(type2String(type));
- sb.append("]: digest is ");
- sb.append(stableDigest);
- return sb.toString();
- }
-
-
- public void writeExternal(ObjectOutput out) throws IOException {
- out.writeInt(type);
- if(stableDigest == null) {
- out.writeBoolean(false);
- return;
- }
- out.writeBoolean(true);
- stableDigest.writeExternal(out);
- }
-
-
- public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
- type=in.readInt();
- boolean digest_not_null=in.readBoolean();
- if(digest_not_null) {
- stableDigest=new Digest();
- stableDigest.readExternal(in);
- }
- }
-
- public long size() {
- long retval=Global.INT_SIZE + Global.BYTE_SIZE; // type + presence for digest
- if(stableDigest != null)
- retval+=stableDigest.serializedSize();
- return retval;
- }
-
- public void writeTo(DataOutputStream out) throws IOException {
- out.writeInt(type);
- Util.writeStreamable(stableDigest, out);
- }
-
- public void readFrom(DataInputStream in) throws IOException, IllegalAccessException, InstantiationException {
- type=in.readInt();
- stableDigest=(Digest)Util.readStreamable(Digest.class, in);
- }
-
-
- }
-
-
-
-
- /**
- Mcast periodic STABLE message. Interval between sends varies. Terminates after num_gossip_runs is 0.
- However, UP or DOWN messages will reset num_gossip_runs to max_gossip_runs. This has the effect that the
- stable_send task terminates only after a period of time within which no messages were either sent
- or received
- */
- private class StableTask implements TimeScheduler.Task {
- boolean stopped=false;
-
- public void stop() {
- stopped=true;
- }
-
- public boolean running() { // syntactic sugar
- return !stopped;
- }
-
- public boolean cancelled() {
- return stopped;
- }
-
- public long nextInterval() {
- long interval=computeSleepTime();
- if(interval <= 0)
- return 10000;
- else
- return interval;
- }
-
-
- public void run() {
- if(suspended) {
- if(log.isTraceEnabled())
- log.trace("stable task will not run as suspended=" + suspended);
- stopStableTask();
- return;
- }
-
- // asks the NAKACK protocol for the current digest, reply event is GET_DIGEST_STABLE_OK (arg=digest)
- passDown(new Event(Event.GET_DIGEST_STABLE));
-
- num_gossip_runs--;
- if(num_gossip_runs <= 0) {
- if(log.isTraceEnabled())
- log.trace("stable task terminating (num_gossip_runs=" +
- num_gossip_runs + ", max_gossip_runs=" + max_gossip_runs + ')');
- stopStableTask();
- }
- }
-
- long computeSleepTime() {
- return getRandom((mbrs.size() * desired_avg_gossip * 2));
- }
-
- long getRandom(long range) {
- return (long)((Math.random() * range) % range);
- }
- }
-
-
-
-
-
- /**
- * Multicasts a STABILITY message.
- */
- private class StabilitySendTask implements TimeScheduler.Task {
- Digest d=null;
- boolean stopped=false;
- long delay=2000;
-
-
- public StabilitySendTask(Digest d, long delay) {
- this.d=d;
- this.delay=delay;
- }
-
- public boolean running() {
- return !stopped;
- }
-
- public void stop() {
- stopped=true;
- }
-
- public boolean cancelled() {
- return stopped;
- }
-
-
- /** wait a random number of msecs (to give other a chance to send the STABILITY msg first) */
- public long nextInterval() {
- return delay;
- }
-
-
- public void run() {
- Message msg;
- StableHeader hdr;
-
- if(suspended) {
- if(log.isDebugEnabled()) {
- log.debug("STABILITY message will not be sent as suspended=" + suspended);
- }
- stopped=true;
- return;
- }
-
- if(d != null && !stopped) {
- msg=new Message();
- hdr=new StableHeader(StableHeader.STABILITY, d);
- msg.putHeader(STABLE.name, hdr);
- passDown(new Event(Event.MSG, msg));
- d=null;
- }
- stopped=true; // run only once
- }
- }
-
-
- private class ResumeTask implements TimeScheduler.Task {
- boolean running=true;
- long max_suspend_time=0;
-
- ResumeTask(long max_suspend_time) {
- this.max_suspend_time=max_suspend_time;
- }
-
- void stop() {
- running=false;
- }
-
- public boolean running() {
- return running;
- }
-
- public boolean cancelled() {
- return running == false;
- }
-
- public long nextInterval() {
- return max_suspend_time;
- }
-
- public void run() {
- if(suspended)
- log.warn("ResumeTask resumed message garbage collection - this should be done by a RESUME_STABLE event; " +
- "check why this event was not received (or increase max_suspend_time for large state transfers)");
- resume();
- }
- }
-
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/8b2ea77d/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/protocols/pbcast/STABLE.java.old
----------------------------------------------------------------------
diff --git a/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/protocols/pbcast/STABLE.java.old b/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/protocols/pbcast/STABLE.java.old
deleted file mode 100644
index f8a559e..0000000
--- a/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/protocols/pbcast/STABLE.java.old
+++ /dev/null
@@ -1,855 +0,0 @@
-// $Id: STABLE.java,v 1.27 2005/07/13 19:57:59 belaban Exp $
-
-package org.jgroups.protocols.pbcast;
-
-
-import org.jgroups.*;
-import org.jgroups.stack.Protocol;
-import org.jgroups.util.Promise;
-import org.jgroups.util.TimeScheduler;
-import org.jgroups.util.Util;
-import org.jgroups.util.Streamable;
-
-import java.io.*;
-import java.util.Properties;
-import java.util.Vector;
-import java.util.Iterator;
-import java.util.Map;
-
-
-
-
-/**
- * Computes the broadcast messages that are stable, i.e. have been received by all members. Sends
- * STABLE events up the stack when this is the case. This allows NAKACK to garbage collect messages that
- * have been seen by all members.<p>
- * Works as follows: periodically we mcast our highest seqnos (seen for each member) to the group.
- * A stability vector, which maintains the highest seqno for each member and initially contains no data,
- * is updated when such a message is received. The entry for a member P is computed set to
- * min(entry[P], digest[P]). When messages from all members have been received, a stability
- * message is mcast, which causes all members to send a STABLE event up the stack (triggering garbage collection
- * in the NAKACK layer).<p>
- * The stable task now terminates after max_num_gossips if no messages or view changes have been sent or received
- * in the meantime. It will resume when messages are received. This effectively suspends sending superfluous
- * STABLE messages in the face of no activity.<br/>
- * New: when <code>max_bytes</code> is exceeded (unless disabled by setting it to 0),
- * a STABLE task will be started (unless it is already running).
- * @author Bela Ban
- */
-public class STABLE extends Protocol {
- Address local_addr=null;
- final Vector mbrs=new Vector();
- final Digest digest=new Digest(10); // keeps track of the highest seqnos from all members
- final Promise digest_promise=new Promise(); // for fetching digest (from NAKACK layer)
- final Vector heard_from=new Vector(); // keeps track of who we already heard from (STABLE_GOSSIP msgs)
- long digest_timeout=60000; // time to wait until digest is received (from NAKACK)
-
- /** Sends a STABLE gossip every 20 seconds on average. 0 disables gossipping of STABLE messages */
- long desired_avg_gossip=20000;
-
- /** delay before we send STABILITY msg (give others a change to send first). This should be set to a very
- * small number (> 0 !) if <code>max_bytes</code> is used */
- long stability_delay=6000;
- StabilitySendTask stability_task=null;
- final Object stability_mutex=new Object(); // to synchronize on stability_task
- StableTask stable_task=null; // bcasts periodic STABLE message (added to timer below)
- final Object stable_task_mutex=new Object(); // to sync on stable_task
- TimeScheduler timer=null; // to send periodic STABLE msgs (and STABILITY messages)
- int max_gossip_runs=3; // max. number of times the StableTask runs before terminating
- int num_gossip_runs=max_gossip_runs; // this number is decremented (max_gossip_runs doesn't change)
- static final String name="STABLE";
-
- /** Total amount of bytes from incoming messages (default = 0 = disabled). When exceeded, a STABLE
- * message will be broadcast and <code>num_bytes_received</code> reset to 0 . If this is > 0, then ideally
- * <code>stability_delay</code> should be set to a low number as well */
- long max_bytes=0;
-
- /** The total number of bytes received from unicast and multicast messages */
- long num_bytes_received=0;
-
- /** When true, don't take part in garbage collection protocol: neither send STABLE messages nor
- * handle STABILITY messages */
- boolean suspended=false;
-
- /** Max time we should hold off on message garbage collection. This is a second line of defense in case
- * we get a SUSPEND_STABLE, but forget to send a corresponding RESUME_STABLE (which should never happen !)
- * The consequence of a missing RESUME_STABLE would be that the group doesn't garbage collect stable
- * messages anymore, eventually, with a lot of traffic, every member would accumulate messages and run
- * out of memory !
- */
- // long max_suspend_time=600000;
-
- ResumeTask resume_task=null;
- final Object resume_task_mutex=new Object();
-
- /** Number of gossip messages */
- int num_gossips=0;
-
-
- public String getName() {
- return name;
- }
-
- public long getDesiredAverageGossip() {
- return desired_avg_gossip;
- }
-
- public void setDesiredAverageGossip(long gossip_interval) {
- desired_avg_gossip=gossip_interval;
- }
-
- public long getMaxBytes() {
- return max_bytes;
- }
-
- public void setMaxBytes(long max_bytes) {
- this.max_bytes=max_bytes;
- }
-
- public int getNumberOfGossipMessages() {return num_gossips;}
-
- public void resetStats() {
- super.resetStats();
- num_gossips=0;
- }
-
-
- public Vector requiredDownServices() {
- Vector retval=new Vector();
- retval.addElement(new Integer(Event.GET_DIGEST_STABLE)); // NAKACK layer
- return retval;
- }
-
- public boolean setProperties(Properties props) {
- String str;
-
- super.setProperties(props);
- str=props.getProperty("digest_timeout");
- if(str != null) {
- digest_timeout=Long.parseLong(str);
- props.remove("digest_timeout");
- }
-
- str=props.getProperty("desired_avg_gossip");
- if(str != null) {
- desired_avg_gossip=Long.parseLong(str);
- props.remove("desired_avg_gossip");
- }
-
- str=props.getProperty("stability_delay");
- if(str != null) {
- stability_delay=Long.parseLong(str);
- props.remove("stability_delay");
- }
-
- str=props.getProperty("max_gossip_runs");
- if(str != null) {
- max_gossip_runs=Integer.parseInt(str);
- num_gossip_runs=max_gossip_runs;
- props.remove("max_gossip_runs");
- }
-
- str=props.getProperty("max_bytes");
- if(str != null) {
- max_bytes=Long.parseLong(str);
- props.remove("max_bytes");
- }
-
- str=props.getProperty("max_suspend_time");
- if(str != null) {
- log.error("max_suspend_time is not supported any longer; please remove it (ignoring it)");
- props.remove("max_suspend_time");
- }
-
- if(props.size() > 0) {
- log.error("STABLE.setProperties(): these properties are not recognized: " + props);
-
- return false;
- }
- return true;
- }
-
-
- void suspend(long timeout) {
- if(!suspended) {
- suspended=true;
- if(log.isDebugEnabled())
- log.debug("suspending message garbage collection");
- }
- startResumeTask(timeout); // will not start task if already running
- }
-
- void resume() {
- suspended=false;
- if(log.isDebugEnabled())
- log.debug("resuming message garbage collection");
- stopResumeTask();
- }
-
- public void start() throws Exception {
- if(stack != null && stack.timer != null)
- timer=stack.timer;
- else
- throw new Exception("STABLE.up(): timer cannot be retrieved from protocol stack");
- }
-
- public void stop() {
- stopStableTask();
- }
-
-
- public void up(Event evt) {
- Message msg;
- StableHeader hdr;
- Header obj;
- int type=evt.getType();
-
- switch(type) {
-
- case Event.MSG:
- msg=(Message)evt.getArg();
-
- if(max_bytes > 0) { // message counting is enabled
- long size=Math.max(msg.getLength(), 24);
- num_bytes_received+=size;
- if(num_bytes_received >= max_bytes) {
- if(log.isTraceEnabled()) {
- StringBuffer sb=new StringBuffer("max_bytes has been exceeded (max_bytes=");
- sb.append(max_bytes).append(", number of bytes received=");
- sb.append(num_bytes_received).append("): sending STABLE message");
- log.trace(sb.toString());
- }
-
- new Thread() {
- public void run() {
- initialize();
- sendStableMessage();
- }
- }.start();
- num_bytes_received=0;
- }
- }
-
- obj=msg.getHeader(name);
- if(obj == null || !(obj instanceof StableHeader))
- break;
- hdr=(StableHeader)msg.removeHeader(name);
- switch(hdr.type) {
- case StableHeader.STABLE_GOSSIP:
- handleStableGossip(msg.getSrc(), hdr.stableDigest);
- break;
- case StableHeader.STABILITY:
- handleStabilityMessage(hdr.stableDigest);
- break;
- default:
- if(log.isErrorEnabled()) log.error("StableHeader type " + hdr.type + " not known");
- }
- return; // don't pass STABLE or STABILITY messages up the stack
-
- case Event.SET_LOCAL_ADDRESS:
- local_addr=(Address)evt.getArg();
- break;
- }
-
- passUp(evt);
- if(desired_avg_gossip > 0) {
- if(type == Event.VIEW_CHANGE || type == Event.MSG)
- startStableTask(); // only starts task if not yet running
- }
- }
-
-
- /**
- * We need to receive this event out-of-band, otherwise we would block. The use case is
- * <ol>
- * <li>To send a STABLE_GOSSIP message we need the digest (from NAKACK below)
- * <li>We send a GET_DIGEST_STABLE event down <em>from the up() method</em>
- * <li>NAKACK sends the GET_DIGEST_STABLE_OK backup. <em>However, we may have other messages in the
- * up queue ahead of this event !</em> Therefore the event cannot be processed until all messages ahead of
- * the event have been processed. These can't be processed, however, because the up() call waits for
- * GET_DIGEST_STABLE_OK ! The up() call would always run into the timeout.<be/>
- * Having out-of-band reception of just this one event eliminates the problem.
- * </ol>
- * @param evt
- */
- protected void receiveUpEvent(Event evt) {
- if(evt.getType() == Event.GET_DIGEST_STABLE_OK) {
- digest_promise.setResult(evt.getArg());
- return;
- }
- super.receiveUpEvent(evt);
- }
-
-
- public void down(Event evt) {
- int type=evt.getType();
-
- switch(evt.getType()) {
- case Event.VIEW_CHANGE:
- View v=(View)evt.getArg();
- Vector tmp=v.getMembers();
- mbrs.removeAllElements();
- mbrs.addAll(tmp);
- heard_from.retainAll(tmp); // removes all elements from heard_from that are not in new view
- stopStableTask();
- break;
-
- case Event.SUSPEND_STABLE:
- long timeout=0;
- Object t=evt.getArg();
- if(t != null && t instanceof Long)
- timeout=((Long)t).longValue();
- stopStableTask();
- suspend(timeout);
- break;
-
- case Event.RESUME_STABLE:
- resume();
- break;
- }
-
- if(desired_avg_gossip > 0) {
- if(type == Event.VIEW_CHANGE || type == Event.MSG)
- startStableTask(); // only starts task if not yet running
- }
-
- passDown(evt);
- }
-
-
- public void runMessageGarbageCollection() {
- sendStableMessage();
- }
-
-
-
- /* --------------------------------------- Private Methods ---------------------------------------- */
-
- void initialize() {
- synchronized(digest) {
- digest.clear();
- for(int i=0; i < mbrs.size(); i++) {
- digest.add((Address)mbrs.elementAt(i), -1, -1);
- // digest.add((Address)mbrs.elementAt(i), 0, 0);
- }
- heard_from.removeAllElements();
- heard_from.addAll(mbrs);
- }
- }
-
-
- void startStableTask() {
- num_gossip_runs=max_gossip_runs;
-
- // Here, double-checked locking works: we don't want to synchronize if the task already runs (which is the case
- // 99% of the time). If stable_task gets nulled after the condition check, we return anyways, but just miss
- // 1 cycle: on the next message or view, we will start the task
- if(stable_task != null)
- return;
- synchronized(stable_task_mutex) {
- if(stable_task != null && stable_task.running()) {
- return; // already running
- }
- stable_task=new StableTask();
- timer.add(stable_task, true); // fixed-rate scheduling
- }
- if(log.isTraceEnabled())
- log.trace("stable task started; num_gossip_runs=" + num_gossip_runs + ", max_gossip_runs=" + max_gossip_runs);
- }
-
-
- void stopStableTask() {
- // contrary to startStableTask(), we don't need double-checked locking here because this method is not
- // called frequently
- synchronized(stable_task_mutex) {
- if(stable_task != null) {
- stable_task.stop();
- stable_task=null;
- }
- }
- }
-
-
- void startResumeTask(long max_suspend_time) {
- max_suspend_time=(long)(max_suspend_time * 1.1); // little slack
-
- synchronized(resume_task_mutex) {
- if(resume_task != null && resume_task.running()) {
- return; // already running
- }
- else {
- resume_task=new ResumeTask(max_suspend_time);
- timer.add(resume_task, true); // fixed-rate scheduling
- }
- }
- if(log.isDebugEnabled())
- log.debug("resume task started, max_suspend_time=" + max_suspend_time);
- }
-
-
- void stopResumeTask() {
- synchronized(resume_task_mutex) {
- if(resume_task != null) {
- resume_task.stop();
- resume_task=null;
- }
- }
- }
-
-
- void startStabilityTask(Digest d, long delay) {
- synchronized(stability_mutex) {
- if(stability_task != null && stability_task.running()) {
- return; // already running
- }
- else {
- stability_task=new StabilitySendTask(d, delay);
- timer.add(stability_task, true); // fixed-rate scheduling
- }
- }
- }
-
-
- void stopStabilityTask() {
- synchronized(stability_mutex) {
- if(stability_task != null) {
- stability_task.stop();
- stability_task=null;
- }
- }
- }
-
-
- /**
- Digest d contains (a) the highest seqnos <em>deliverable</em> for each sender and (b) the highest seqnos
- <em>seen</em> for each member. (Difference: with 1,2,4,5, the highest seqno seen is 5, whereas the highest
- seqno deliverable is 2). The minimum of all highest seqnos deliverable will be taken to send a stability
- message, which results in garbage collection of messages lower than the ones in the stability vector. The
- maximum of all seqnos will be taken to trigger possible retransmission of last missing seqno (see DESIGN
- for details).
- */
- private void handleStableGossip(Address sender, Digest d) {
- Address mbr;
- long highest_seqno, my_highest_seqno;
- long highest_seen_seqno, my_highest_seen_seqno;
- boolean my_own_gossip=false;
-
- if(d == null || sender == null) {
- if(log.isErrorEnabled()) log.error("digest or sender is null");
- return;
- }
-
- if(suspended) {
- if(log.isTraceEnabled()) {
- log.trace("STABLE message will not be handled as suspended=" + suspended);
- }
- return;
- }
-
- if(log.isTraceEnabled())
- log.trace(new StringBuffer("received digest from ").append(sender).append(": ").append(d));
- if(!heard_from.contains(sender)) { // already received gossip from sender; discard it
- if(log.isTraceEnabled()) log.trace("already received gossip from " + sender);
- return;
- }
-
- // we won't handle the gossip d, if d's members don't match the membership in my own digest,
- // this is part of the fix for the NAKACK problem (bugs #943480 and #938584)
- if(!this.digest.sameSenders(d)) {
- if(log.isTraceEnabled()) {
- log.trace("received digest from " + sender + " (digest=" + d + ") which does not match my own digest ("+
- this.digest + "): ignoring digest and re-initializing own digest");
- }
- initialize();
- return;
- }
-
- my_own_gossip=local_addr != null && local_addr.equals(sender);
-
- Map.Entry entry;
- org.jgroups.protocols.pbcast.Digest.Entry val;
- for(Iterator it=d.senders.entrySet().iterator(); it.hasNext();) {
- entry=(Map.Entry)it.next();
- mbr=(Address)entry.getKey();
- if(!digest.contains(mbr)) {
- if(log.isTraceEnabled()) log.trace("sender " + mbr + " not found in stability vector");
- continue;
- }
- val=(org.jgroups.protocols.pbcast.Digest.Entry)entry.getValue();
- highest_seqno=val.high_seqno;
- highest_seen_seqno=val.high_seqno_seen;
-
- // compute the minimum of the highest seqnos deliverable (for garbage collection)
- my_highest_seqno=digest.highSeqnoAt(mbr);
- if(my_highest_seqno < 0) {
- if(highest_seqno >= 0 && my_own_gossip)
- digest.setHighSeqnoAt(mbr, highest_seqno);
- }
- else {
- digest.setHighSeqnoAt(mbr, Math.min(my_highest_seqno, highest_seqno));
- }
-
- // compute the maximum of the highest seqnos seen (for retransmission of last missing message)
- my_highest_seen_seqno=digest.highSeqnoSeenAt(mbr);
- if(my_highest_seen_seqno < 0) {
- if(highest_seen_seqno >= 0)
- digest.setHighSeqnoSeenAt(mbr, highest_seen_seqno);
- }
- else {
- digest.setHighSeqnoSeenAt(mbr, Math.max(my_highest_seen_seqno, highest_seen_seqno));
- }
- }
-
- heard_from.removeElement(sender);
- if(heard_from.size() == 0) {
- if(log.isTraceEnabled()) log.trace("sending stability msg " + digest);
- sendStabilityMessage(digest.copy());
- initialize();
- }
- }
-
-
- /**
- * Bcasts a STABLE message to all group members. Message contains highest seqnos of all members
- * seen by this member. Highest seqnos are retrieved from the NAKACK layer above.
- */
- void sendStableMessage() {
- Digest d=null;
- Message msg=new Message(); // mcast message
- StableHeader hdr;
-
- if(suspended) {
- if(log.isTraceEnabled())
- log.trace("will not send STABLE message as suspended=" + suspended);
- return;
- }
-
- d=getDigest();
- if(d != null && d.size() > 0) {
- if(log.isTraceEnabled())
- log.trace("mcasting STABLE msg, digest=" + d +
- " (num_gossip_runs=" + num_gossip_runs + ", max_gossip_runs=" + max_gossip_runs + ')');
- hdr=new StableHeader(StableHeader.STABLE_GOSSIP, d);
- msg.putHeader(name, hdr);
- num_gossips++;
- passDown(new Event(Event.MSG, msg));
- }
- }
-
-
-
- Digest getDigest() {
- Digest ret=null;
- passDown(new Event(Event.GET_DIGEST_STABLE));
- ret=(Digest)digest_promise.getResult(digest_timeout);
- if(ret == null) {
- if(log.isErrorEnabled())
- log.error("digest could not be fetched from below " + "(timeout was " + digest_timeout + " msecs)");
- }
- return ret;
- }
-
-
- /**
- Schedules a stability message to be mcast after a random number of milliseconds (range 1-5 secs).
- The reason for waiting a random amount of time is that, in the worst case, all members receive a
- STABLE_GOSSIP message from the last outstanding member at the same time and would therefore mcast the
- STABILITY message at the same time too. To avoid this, each member waits random N msecs. If, before N
- elapses, some other member sent the STABILITY message, we just cancel our own message. If, during
- waiting for N msecs to send STABILITY message S1, another STABILITY message S2 is to be sent, we just
- discard S2.
- @param tmp A copy of te stability digest, so we don't need to copy it again
- */
- void sendStabilityMessage(Digest tmp) {
- long delay;
-
- if(timer == null) {
- if(log.isErrorEnabled())
- log.error("timer is null, cannot schedule stability message to be sent");
- timer=stack != null ? stack.timer : null;
- return;
- }
-
- // give other members a chance to mcast STABILITY message. if we receive STABILITY by the end of
- // our random sleep, we will not send the STABILITY msg. this prevents that all mbrs mcast a
- // STABILITY msg at the same time
- delay=Util.random(stability_delay);
- startStabilityTask(tmp, delay);
- }
-
-
- void handleStabilityMessage(Digest d) {
- if(d == null) {
- if(log.isErrorEnabled()) log.error("stability vector is null");
- return;
- }
-
- if(suspended) {
- if(log.isDebugEnabled()) {
- log.debug("STABILITY message will not be handled as suspended=" + suspended);
- }
- return;
- }
-
- if(log.isDebugEnabled()) log.debug("stability vector is " + d.printHighSeqnos());
- stopStabilityTask();
-
- // we won't handle the gossip d, if d's members don't match the membership in my own digest,
- // this is part of the fix for the NAKACK problem (bugs #943480 and #938584)
- if(!this.digest.sameSenders(d)) {
- if(log.isDebugEnabled()) {
- log.debug("received digest (digest=" + d + ") which does not match my own digest ("+
- this.digest + "): ignoring digest and re-initializing own digest");
- }
- initialize();
- return;
- }
-
- // pass STABLE event down the stack, so NAKACK can garbage collect old messages
- passDown(new Event(Event.STABLE, d));
- }
-
-
-
- /* ------------------------------------End of Private Methods ------------------------------------- */
-
-
-
-
-
-
-
- public static class StableHeader extends Header implements Streamable {
- public static final int STABLE_GOSSIP=1;
- public static final int STABILITY=2;
-
- int type=0;
- // Digest digest=new Digest(); // used for both STABLE_GOSSIP and STABILITY message
- Digest stableDigest=null; // changed by Bela April 4 2004
-
- public StableHeader() {
- } // used for externalizable
-
-
- public StableHeader(int type, Digest digest) {
- this.type=type;
- this.stableDigest=digest;
- }
-
-
- static String type2String(int t) {
- switch(t) {
- case STABLE_GOSSIP:
- return "STABLE_GOSSIP";
- case STABILITY:
- return "STABILITY";
- default:
- return "<unknown>";
- }
- }
-
- public String toString() {
- StringBuffer sb=new StringBuffer();
- sb.append('[');
- sb.append(type2String(type));
- sb.append("]: digest is ");
- sb.append(stableDigest);
- return sb.toString();
- }
-
-
- public void writeExternal(ObjectOutput out) throws IOException {
- out.writeInt(type);
- if(stableDigest == null) {
- out.writeBoolean(false);
- return;
- }
- out.writeBoolean(true);
- stableDigest.writeExternal(out);
- }
-
-
- public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
- type=in.readInt();
- boolean digest_not_null=in.readBoolean();
- if(digest_not_null) {
- stableDigest=new Digest();
- stableDigest.readExternal(in);
- }
- }
-
- public long size() {
- long retval=Global.INT_SIZE + Global.BYTE_SIZE; // type + presence for digest
- if(stableDigest != null)
- retval+=stableDigest.serializedSize();
- return retval;
- }
-
- public void writeTo(DataOutputStream out) throws IOException {
- out.writeInt(type);
- Util.writeStreamable(stableDigest, out);
- }
-
- public void readFrom(DataInputStream in) throws IOException, IllegalAccessException, InstantiationException {
- type=in.readInt();
- stableDigest=(Digest)Util.readStreamable(Digest.class, in);
- }
-
-
- }
-
-
-
-
- /**
- Mcast periodic STABLE message. Interval between sends varies. Terminates after num_gossip_runs is 0.
- However, UP or DOWN messages will reset num_gossip_runs to max_gossip_runs. This has the effect that the
- stable_send task terminates only after a period of time within which no messages were either sent
- or received
- */
- private class StableTask implements TimeScheduler.Task {
- boolean stopped=false;
-
- public void stop() {
- stopped=true;
- }
-
- public boolean running() { // syntactic sugar
- return !stopped;
- }
-
- public boolean cancelled() {
- return stopped;
- }
-
- public long nextInterval() {
- long interval=computeSleepTime();
- if(interval <= 0)
- return 10000;
- else
- return interval;
- }
-
-
- public void run() {
- if(suspended) {
- if(log.isTraceEnabled())
- log.trace("stable task will not run as suspended=" + suspended);
- stopStableTask();
- return;
- }
- initialize();
- sendStableMessage();
- num_gossip_runs--;
- if(num_gossip_runs <= 0) {
- if(log.isTraceEnabled())
- log.trace("stable task terminating (num_gossip_runs=" +
- num_gossip_runs + ", max_gossip_runs=" + max_gossip_runs + ')');
- stopStableTask();
- }
- }
-
- long computeSleepTime() {
- return getRandom((mbrs.size() * desired_avg_gossip * 2));
- }
-
- long getRandom(long range) {
- return (long)((Math.random() * range) % range);
- }
- }
-
-
-
-
-
- /**
- * Multicasts a STABILITY message.
- */
- private class StabilitySendTask implements TimeScheduler.Task {
- Digest d=null;
- boolean stopped=false;
- long delay=2000;
-
-
- public StabilitySendTask(Digest d, long delay) {
- this.d=d;
- this.delay=delay;
- }
-
- public boolean running() {
- return !stopped;
- }
-
- public void stop() {
- stopped=true;
- }
-
- public boolean cancelled() {
- return stopped;
- }
-
-
- /** wait a random number of msecs (to give other a chance to send the STABILITY msg first) */
- public long nextInterval() {
- return delay;
- }
-
-
- public void run() {
- Message msg;
- StableHeader hdr;
-
- if(suspended) {
- if(log.isDebugEnabled()) {
- log.debug("STABILITY message will not be sent as suspended=" + suspended);
- }
- stopped=true;
- return;
- }
-
- if(d != null && !stopped) {
- msg=new Message();
- hdr=new StableHeader(StableHeader.STABILITY, d);
- msg.putHeader(STABLE.name, hdr);
- passDown(new Event(Event.MSG, msg));
- d=null;
- }
- stopped=true; // run only once
- }
- }
-
-
- private class ResumeTask implements TimeScheduler.Task {
- boolean running=true;
- long max_suspend_time=0;
-
- ResumeTask(long max_suspend_time) {
- this.max_suspend_time=max_suspend_time;
- }
-
- void stop() {
- running=false;
- }
-
- public boolean running() {
- return running;
- }
-
- public boolean cancelled() {
- return running == false;
- }
-
- public long nextInterval() {
- return max_suspend_time;
- }
-
- public void run() {
- if(suspended)
- log.warn("ResumeTask resumed message garbage collection - this should be done by a RESUME_STABLE event; " +
- "check why this event was not received (or increase max_suspend_time for large state transfers)");
- resume();
- }
- }
-
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/8b2ea77d/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/protocols/pbcast/STATE_TRANSFER.java
----------------------------------------------------------------------
diff --git a/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/protocols/pbcast/STATE_TRANSFER.java b/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/protocols/pbcast/STATE_TRANSFER.java
deleted file mode 100644
index 1fb7667..0000000
--- a/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/protocols/pbcast/STATE_TRANSFER.java
+++ /dev/null
@@ -1,461 +0,0 @@
-/** Notice of modification as required by the LGPL
- * This file was modified by Gemstone Systems Inc. on
- * $Date$
- **/
-// $Id: STATE_TRANSFER.java,v 1.25 2005/12/16 16:21:07 belaban Exp $
-
-package com.gemstone.org.jgroups.protocols.pbcast;
-
-
-import com.gemstone.org.jgroups.*;
-import com.gemstone.org.jgroups.stack.Protocol;
-import com.gemstone.org.jgroups.stack.StateTransferInfo;
-import com.gemstone.org.jgroups.util.ExternalStrings;
-import com.gemstone.org.jgroups.util.List;
-import com.gemstone.org.jgroups.util.Streamable;
-import com.gemstone.org.jgroups.util.Util;
-
-import java.io.*;
-import java.util.Enumeration;
-import java.util.HashMap;
-import java.util.Properties;
-import java.util.Vector;
-
-
-/**
- * New STATE_TRANSFER protocol based on PBCAST. Compared to the one in ./protocols, it doesn't
- * need a QUEUE layer above it. A state request is sent to a chosen member (coordinator if
- * null). That member makes a copy D of its current digest and asks the application for a copy of
- * its current state S. Then the member returns both S and D to the requester. The requester
- * first sets its digest to D and then returns the state to the application.
- * @author Bela Ban
- */
-public class STATE_TRANSFER extends Protocol {
- Address local_addr=null;
- final Vector members=new Vector();
- long state_id=1; // used to differentiate between state transfers (not currently used)
- final List state_requesters=new List(); // requesters of state (usually just 1, could be more)
- Digest digest=null;
- final HashMap map=new HashMap(); // to store configuration information
- long start, stop; // to measure state transfer time
- int num_state_reqs=0;
- long num_bytes_sent=0;
- double avg_state_size=0;
- final static String name="STATE_TRANSFER";
-
-
- /** All protocol names have to be unique ! */
- @Override // GemStoneAddition
- public String getName() {
- return name;
- }
-
- public int getNumberOfStateRequests() {return num_state_reqs;}
- public long getNumberOfStateBytesSent() {return num_bytes_sent;}
- public double getAverageStateSize() {return avg_state_size;}
-
- @Override // GemStoneAddition
- public Vector requiredDownServices() {
- Vector retval=new Vector();
- retval.addElement(Integer.valueOf(Event.GET_DIGEST_STATE));
- retval.addElement(Integer.valueOf(Event.SET_DIGEST));
- return retval;
- }
-
- @Override // GemStoneAddition
- public void resetStats() {
- super.resetStats();
- num_state_reqs=0;
- num_bytes_sent=0;
- avg_state_size=0;
- }
-
-
- @Override // GemStoneAddition
- public boolean setProperties(Properties props) {
- super.setProperties(props);
-
- if(props.size() > 0) {
- log.error(ExternalStrings.STATE_TRANSFER_STATE_TRANSFERSETPROPERTIES_THE_FOLLOWING_PROPERTIES_ARE_NOT_RECOGNIZED__0, props);
-
- return false;
- }
- return true;
- }
-
- @Override // GemStoneAddition
- public void init() throws Exception {
- map.put("state_transfer", Boolean.TRUE);
- map.put("protocol_class", getClass().getName());
- }
-
-
- @Override // GemStoneAddition
- public void start() throws Exception {
- passUp(new Event(Event.CONFIG, map));
- }
-
-
- @Override // GemStoneAddition
- public void up(Event evt) {
- Message msg;
- StateHeader hdr;
-
- switch(evt.getType()) {
-
- case Event.BECOME_SERVER:
- break;
-
- case Event.SET_LOCAL_ADDRESS:
- local_addr=(Address)evt.getArg();
- break;
-
- case Event.TMP_VIEW:
- case Event.VIEW_CHANGE:
- handleViewChange((View)evt.getArg());
- break;
-
- case Event.GET_DIGEST_STATE_OK:
- synchronized(state_requesters) {
- if(digest != null) {
- if(warn)
- log.warn("GET_DIGEST_STATE_OK: existing digest is not null, overwriting it !");
- }
- digest=(Digest)evt.getArg();
- if(log.isDebugEnabled())
- log.debug("GET_DIGEST_STATE_OK: digest is " + digest + "\npassUp(GET_APPLSTATE)");
- passUp(new Event(Event.GET_APPLSTATE));
- }
- return;
-
- case Event.MSG:
- msg=(Message)evt.getArg();
- if(!(msg.getHeader(name) instanceof StateHeader))
- break;
-
- hdr=(StateHeader)msg.removeHeader(name);
- switch(hdr.type) {
- case StateHeader.STATE_REQ:
- handleStateReq(hdr.sender);
- break;
- case StateHeader.STATE_RSP:
- handleStateRsp(hdr.sender, hdr.my_digest, msg.getBuffer());
- break;
- default:
- if(log.isErrorEnabled()) log.error(ExternalStrings.STATE_TRANSFER_TYPE__0__NOT_KNOWN_IN_STATEHEADER, hdr.type);
- break;
- }
- return;
- }
- passUp(evt);
- }
-
-
-
- @Override // GemStoneAddition
- public void down(Event evt) {
- byte[] state;
- Address target, requester;
- StateTransferInfo info;
- StateHeader hdr;
- Message state_req, state_rsp;
-
- switch(evt.getType()) {
-
- case Event.TMP_VIEW:
- case Event.VIEW_CHANGE:
- handleViewChange((View)evt.getArg());
- break;
-
- // generated by JChannel.getState(). currently, getting the state from more than 1 mbr is not implemented
- case Event.GET_STATE:
- info=(StateTransferInfo)evt.getArg();
- if(info.type != StateTransferInfo.GET_FROM_SINGLE) {
- if(warn) log.warn("[GET_STATE] (info=" + info + "): getting the state from " +
- "all members is not currently supported by pbcast.STATE_TRANSFER, will use " +
- "coordinator to fetch state instead");
- }
- if(info.target == null) {
- target=determineCoordinator();
- }
- else {
- target=info.target;
- if(target.equals(local_addr)) {
- if(log.isErrorEnabled()) log.error(ExternalStrings.STATE_TRANSFER_GET_STATE_CANNOT_FETCH_STATE_FROM_MYSELF_);
- target=null;
- }
- }
- if(target == null) {
- if(log.isDebugEnabled()) log.debug("GET_STATE: first member (no state)");
- passUp(new Event(Event.GET_STATE_OK, null));
- }
- else {
- state_req=new Message(target, null, null);
- state_req.putHeader(name, new StateHeader(StateHeader.STATE_REQ, local_addr, state_id++, null));
- if(log.isDebugEnabled()) log.debug("GET_STATE: asking " + target + " for state");
-
- // suspend sending and handling of mesage garbage collection gossip messages,
- // fixes bugs #943480 and #938584). Wake up when state has been received
- if(log.isDebugEnabled())
- log.debug("passing down a SUSPEND_STABLE event");
- passDown(new Event(Event.SUSPEND_STABLE, Long.valueOf(info.timeout)));
-
- start=System.currentTimeMillis();
- passDown(new Event(Event.MSG, state_req));
- }
- return; // don't pass down any further !
-
- case Event.GET_APPLSTATE_OK:
- state=(byte[])evt.getArg();
- synchronized(state_requesters) {
- if(state_requesters.size() == 0) {
- if(warn)
- log.warn("GET_APPLSTATE_OK: received application state, but there are no requesters !");
- return;
- }
- if(digest == null) { // GemStoneAddition: missing braces
- if(warn) log.warn("GET_APPLSTATE_OK: received application state, " +
- "but there is no digest !");
- }
- else
- digest=digest.copy();
- if(stats) {
- num_state_reqs++;
- if(state != null)
- num_bytes_sent+=state.length;
- avg_state_size=(double)/*GemStoneAddition*/num_bytes_sent / num_state_reqs;
- }
- for(Enumeration e=state_requesters.elements(); e.hasMoreElements();) {
- requester=(Address)e.nextElement();
- state_rsp=new Message(requester, null, state); // put the state into state_rsp.buffer
- hdr=new StateHeader(StateHeader.STATE_RSP, local_addr, 0, digest);
- state_rsp.putHeader(name, hdr);
- passDown(new Event(Event.MSG, state_rsp));
- }
- digest=null;
- state_requesters.removeAll();
- }
- return; // don't pass down any further !
- }
-
- passDown(evt); // pass on to the layer below us
- }
-
-
-
-
-
-
-
-
-
- /* --------------------------- Private Methods -------------------------------- */
-
-
- /** Return the first element of members which is not me. Otherwise return null. */
- private Address determineCoordinator() {
- Address ret=null;
- synchronized(members) {
- if(/*members != null && GemStoneADdition (cannot be null) */ members.size() > 1) {
- for(int i=0; i < members.size(); i++)
- if(!local_addr.equals(members.elementAt(i)))
- return (Address)members.elementAt(i);
- }
- }
- return ret;
- }
-
-
- private void handleViewChange(View v) {
- Vector new_members=v.getMembers();
- synchronized(members) {
- members.clear();
- members.addAll(new_members);
- }
- }
-
- /**
- * If a state transfer is in progress, we don't need to send a GET_APPLSTATE event to the application, but
- * instead we just add the sender to the requester list so it will receive the same state when done. If not,
- * we add the sender to the requester list and send a GET_APPLSTATE event up.
- */
- private void handleStateReq(Object sender) {
- if(sender == null) {
- if(log.isErrorEnabled()) log.error(ExternalStrings.STATE_TRANSFER_SENDER_IS_NULL_);
- return;
- }
-
- synchronized(state_requesters) {
- if(state_requesters.size() > 0) { // state transfer is in progress, digest was requested
- state_requesters.add(sender);
- }
- else {
- state_requesters.add(sender);
- digest=null;
- if(log.isDebugEnabled()) log.debug("passing down GET_DIGEST_STATE");
- passDown(new Event(Event.GET_DIGEST_STATE));
- }
- }
- }
-
-
- /** Set the digest and the send the state up to the application */
- void handleStateRsp(Object sender, Digest digest, byte[] state) {
- if(digest == null) {
- if(warn)
- log.warn("digest received from " + sender + " is null, skipping setting digest !");
- }
- else
- passDown(new Event(Event.SET_DIGEST, digest)); // set the digest (e.g. in NAKACK)
- stop=System.currentTimeMillis();
-
- // resume sending and handling of mesage garbage collection gossip messages,
- // fixes bugs #943480 and #938584). Wakes up a previously suspended message garbage
- // collection protocol (e.g. STABLE)
- if(log.isDebugEnabled())
- log.debug("passing down a RESUME_STABLE event");
- passDown(new Event(Event.RESUME_STABLE));
-
- if(state == null) {
- if(warn)
- log.warn("state received from " + sender + " is null, will return null state to application");
- }
- else
- log.debug("received state, size=" + state.length + " bytes. Time=" + (stop-start) + " milliseconds");
- passUp(new Event(Event.GET_STATE_OK, state));
- }
-
-
- /* ------------------------ End of Private Methods ------------------------------ */
-
-
-
- /**
- * Wraps data for a state request/response. Note that for a state response the actual state will <em>not</em
- * be stored in the header itself, but in the message's buffer.
- *
- */
- public static class StateHeader extends Header implements Streamable {
- public static final byte STATE_REQ=1;
- public static final byte STATE_RSP=2;
-
-
- long id=0; // state transfer ID (to separate multiple state transfers at the same time)
- byte type=0;
- Address sender=null; // sender of state STATE_REQ or STATE_RSP
- Digest my_digest=null; // digest of sender (if type is STATE_RSP)
-
-
- public StateHeader() {
- } // for externalization
-
-
- public StateHeader(byte type, Address sender, long id, Digest digest) {
- this.type=type;
- this.sender=sender;
- this.id=id;
- this.my_digest=digest;
- }
-
- public int getType() {
- return type;
- }
-
- public Digest getDigest() {
- return my_digest;
- }
-
-
- @Override // GemStoneAddition
- public boolean equals(Object o) {
- StateHeader other;
-
- if(sender != null && o != null) {
- if(!(o instanceof StateHeader))
- return false;
- other=(StateHeader)o;
- return sender.equals(other.sender) && id == other.id;
- }
- return false;
- }
-
-
- @Override // GemStoneAddition
- public int hashCode() {
- if(sender != null)
- return sender.hashCode() + (int)id;
- else
- return (int)id;
- }
-
-
- @Override // GemStoneAddition
- public String toString() {
- StringBuffer sb=new StringBuffer();
- sb.append("[StateHeader: type=" + type2Str(type));
- if(sender != null) sb.append(", sender=" + sender + " id=#" + id);
- if(my_digest != null) sb.append(", digest=" + my_digest);
- return sb.toString();
- }
-
-
- static String type2Str(int t) {
- switch(t) {
- case STATE_REQ:
- return "STATE_REQ";
- case STATE_RSP:
- return "STATE_RSP";
- default:
- return "<unknown>";
- }
- }
-
-
- public void writeExternal(ObjectOutput out) throws IOException {
- out.writeObject(sender);
- out.writeLong(id);
- out.writeByte(type);
- out.writeObject(my_digest);
- }
-
-
- public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
- sender=(Address)in.readObject();
- id=in.readLong();
- type=in.readByte();
- my_digest=(Digest)in.readObject();
- }
-
-
-
- public void writeTo(DataOutputStream out) throws IOException {
- out.writeByte(type);
- out.writeLong(id);
- Util.writeAddress(sender, out);
- Util.writeStreamable(my_digest, out);
- }
-
- public void readFrom(DataInputStream in) throws IOException, IllegalAccessException, InstantiationException {
- type=in.readByte();
- id=in.readLong();
- sender=Util.readAddress(in);
- my_digest=(Digest)Util.readStreamable(Digest.class, in);
- }
-
- @Override // GemStoneAddition
- public long size(short version) {
- long retval=Global.LONG_SIZE + Global.BYTE_SIZE; // id and type
-
- retval+=Util.size(sender,version);
-
- retval+=Global.BYTE_SIZE; // presence byte for my_digest
- if(my_digest != null)
- retval+=my_digest.serializedSize(version);
-
- return retval;
- }
-
- }
-
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/8b2ea77d/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/protocols/pbcast/package.html
----------------------------------------------------------------------
diff --git a/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/protocols/pbcast/package.html b/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/protocols/pbcast/package.html
deleted file mode 100644
index e3c9af5..0000000
--- a/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/protocols/pbcast/package.html
+++ /dev/null
@@ -1,5 +0,0 @@
-<HTML>
- <BODY>
- Supports probabilistic broadcasts.
- </BODY>
-</HTML>
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/8b2ea77d/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/protocols/ring/RingNode.java
----------------------------------------------------------------------
diff --git a/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/protocols/ring/RingNode.java b/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/protocols/ring/RingNode.java
deleted file mode 100644
index 85e774b..0000000
--- a/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/protocols/ring/RingNode.java
+++ /dev/null
@@ -1,26 +0,0 @@
-/** Notice of modification as required by the LGPL
- * This file was modified by Gemstone Systems Inc. on
- * $Date$
- **/
-//$Id: RingNode.java,v 1.2 2004/03/30 06:47:20 belaban Exp $
-
-package com.gemstone.org.jgroups.protocols.ring;
-
-import com.gemstone.org.jgroups.stack.IpAddress;
-
-import java.util.Vector;
-
-public interface RingNode
-{
- Object receiveToken(int timeout) throws TokenLostException;
-
- Object receiveToken() throws TokenLostException;
-
- void passToken(Object token) throws TokenLostException;
-
- IpAddress getTokenReceiverAddress();
-
- void reconfigure(Vector newMembers);
-
- void tokenArrived(Object token);
-}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/8b2ea77d/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/protocols/ring/RingNodeFlowControl.java
----------------------------------------------------------------------
diff --git a/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/protocols/ring/RingNodeFlowControl.java b/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/protocols/ring/RingNodeFlowControl.java
deleted file mode 100644
index d955acf..0000000
--- a/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/protocols/ring/RingNodeFlowControl.java
+++ /dev/null
@@ -1,135 +0,0 @@
-/** Notice of modification as required by the LGPL
- * This file was modified by Gemstone Systems Inc. on
- * $Date$
- **/
-//$Id: RingNodeFlowControl.java,v 1.4 2005/08/08 12:45:41 belaban Exp $
-
-package com.gemstone.org.jgroups.protocols.ring;
-
-import com.gemstone.org.jgroups.util.GemFireTracer;
-
-
-public class RingNodeFlowControl
-{
- final int initialWindow;
- final float windowReduceFactor;
- final int belowThresholdAdvanceAmount;
- final float aboveThresholdAdvanceAmount;
- private int memberCount;
- private int previousBacklog;
- private int backlog;
- protected final GemFireTracer log=GemFireTracer.getLog(this.getClass());
-
- public RingNodeFlowControl(int initialWindow,
- float windowReduceFactor,
- int belowThresholdAdvanceAmount,
- float aboveThresholdAdvanceAmount)
- {
- this.initialWindow = initialWindow;
- this.windowReduceFactor = windowReduceFactor;
- this.belowThresholdAdvanceAmount = belowThresholdAdvanceAmount;
- this.aboveThresholdAdvanceAmount = aboveThresholdAdvanceAmount;
- }
-
- public RingNodeFlowControl()
- {
- this(20, 0.7F, 3, 1.0F);
- }
-
- public void invalidate()
- {
- previousBacklog = backlog = 0;
- }
-
- public int getBacklog()
- {
- return backlog;
- }
-
- public void setBacklog(int backlog)
- {
- if(backlog <0)
- throw new IllegalArgumentException("backlog value has to be positive");
- this.backlog = backlog;
- }
-
- public int getBacklogDifference()
- {
- return backlog - previousBacklog;
- }
-
- public int getPreviousBacklog()
- {
- return previousBacklog;
- }
-
- public void setPreviousBacklog()
- {
- this.previousBacklog = backlog;
- }
-
- public void viewChanged(int memberCount)
- {
- this.memberCount = memberCount;
- }
-
- public int getAllowedToBroadcast(RingToken token)
- {
- int fairWindowShare = 0;
- int windowSize = token.getWindowSize();
- if (memberCount == 0) memberCount = 1;
- int maxMessages = (windowSize / memberCount);
- if (maxMessages < 1)
- maxMessages = 1;
-
- int backlogAverage = token.getBacklog() + backlog - previousBacklog;
- if (backlogAverage > 0)
- {
- fairWindowShare = windowSize * backlog / backlogAverage;
- }
- fairWindowShare = (fairWindowShare < 1)?1: fairWindowShare;
-
-
- int maxAllowed = windowSize - token.getLastRoundBroadcastCount();
- if (maxAllowed < 1)
- maxAllowed = 0;
-
-
- if(log.isInfoEnabled()) log.info("fairWindowShare=" + fairWindowShare + " maxMessages="
- + maxMessages + " maxAllowed=" + maxAllowed);
-
- return (fairWindowShare < maxAllowed)?Math.min(fairWindowShare, maxMessages):Math.min(maxAllowed, maxMessages);
- }
-
- public void updateWindow(RingToken token)
- {
- int threshold = token.getWindowThreshold();
- int window = token.getWindowSize();
- if (window < initialWindow)
- {
- window = initialWindow;
- }
-
- boolean congested =(token.getRetransmissionRequests().size() > 0);
-
- if (congested)
- {
- threshold = (int) (window * windowReduceFactor);
- window = initialWindow;
- }
- else
- {
- if (window < threshold)
- {
- window += belowThresholdAdvanceAmount;
- }
- else
- {
- window += aboveThresholdAdvanceAmount;
- }
- }
- token.setWindowSize(window);
- token.setWindowThreshold(threshold);
- }
-
-}