You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by bs...@apache.org on 2015/08/21 23:22:33 UTC
[09/51] [partial] incubator-geode git commit: GEODE-77 removing the
old jgroups subproject
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/8b2ea77d/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/protocols/pbcast/GMS.java
----------------------------------------------------------------------
diff --git a/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/protocols/pbcast/GMS.java b/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/protocols/pbcast/GMS.java
deleted file mode 100644
index 01dab23..0000000
--- a/gemfire-jgroups/src/main/java/com/gemstone/org/jgroups/protocols/pbcast/GMS.java
+++ /dev/null
@@ -1,2732 +0,0 @@
-/** Notice of modification as required by the LGPL
- * This file was modified by Gemstone Systems Inc. on
- * $Date$
- **/
-// $Id: GMS.java,v 1.49 2005/12/23 14:57:06 belaban Exp $
-
-package com.gemstone.org.jgroups.protocols.pbcast;
-
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.io.ObjectInput;
-import java.io.ObjectOutput;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Date;
-import java.util.Enumeration;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Hashtable;
-import java.util.Iterator;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-import java.util.Set;
-import java.util.Vector;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import com.gemstone.org.jgroups.Address;
-import com.gemstone.org.jgroups.Event;
-import com.gemstone.org.jgroups.Global;
-import com.gemstone.org.jgroups.Header;
-import com.gemstone.org.jgroups.JGroupsVersion;
-import com.gemstone.org.jgroups.Membership;
-import com.gemstone.org.jgroups.MergeView;
-import com.gemstone.org.jgroups.Message;
-import com.gemstone.org.jgroups.ShunnedAddressException;
-import com.gemstone.org.jgroups.SuspectMember;
-import com.gemstone.org.jgroups.TimeoutException;
-import com.gemstone.org.jgroups.View;
-import com.gemstone.org.jgroups.ViewId;
-import com.gemstone.org.jgroups.protocols.FD_SOCK;
-import com.gemstone.org.jgroups.protocols.UNICAST;
-import com.gemstone.org.jgroups.stack.GossipServer;
-import com.gemstone.org.jgroups.stack.IpAddress;
-import com.gemstone.org.jgroups.stack.Protocol;
-import com.gemstone.org.jgroups.util.AckCollector;
-import com.gemstone.org.jgroups.util.BoundedList;
-import com.gemstone.org.jgroups.util.ExternalStrings;
-import com.gemstone.org.jgroups.util.GFStringIdImpl;
-import com.gemstone.org.jgroups.util.GemFireTracer;
-import com.gemstone.org.jgroups.util.Promise;
-import com.gemstone.org.jgroups.util.QueueClosedException;
-import com.gemstone.org.jgroups.util.Streamable;
-import com.gemstone.org.jgroups.util.TimeScheduler;
-import com.gemstone.org.jgroups.util.Util;
-
-/**
- * Group membership protocol. Handles joins/leaves/crashes (suspicions) and emits new views
- * accordingly. Use VIEW_ENFORCER on top of this layer to make sure new members don't receive
- * any messages until they are members.
- */
-public class GMS extends Protocol {
- protected/*GemStoneAddition*/ GmsImpl impl=null;
- Address local_addr=null;
- final Membership members=new Membership(); // real membership
- private final Membership tmp_members=new Membership(); // base for computing next view
- static final String FAILED_TO_ACK_MEMBER_VERIFY_THREAD = "Failed to ACK member verify Thread" ;
-
- /** Members joined but for which no view has been received yet */
- private final Vector joining=new Vector(7);
-
- /** Members excluded from group, but for which no view has been received yet */
- private final Vector leaving=new Vector(7);
-
- View view=null;
- ViewId view_id=null;
- private long ltime=0;
- long join_timeout=5000;
- long join_retry_timeout=2000;
- long leave_timeout=5000;
- private long digest_timeout=0; // time to wait for a digest (from PBCAST). should be fast
- long merge_timeout=10000; // time to wait for all MERGE_RSPS
- private final Object impl_mutex=new Object(); // synchronizes event entry into impl
- private final Object digest_mutex=new Object();
- private final Promise digest_promise=new Promise(); // holds result of GET_DIGEST event
- private final Hashtable impls=new Hashtable(3);
- private boolean shun=true;
- boolean merge_leader=false; // can I initiate a merge ?
- private boolean print_local_addr=true;
- boolean disable_initial_coord=false; // can the member become a coord on startup or not ?
- /** Setting this to false disables concurrent startups. This is only used by unit testing code
- * for testing merging. To everybody else: don't change it to false ! */
- boolean handle_concurrent_startup=true;
- static final String CLIENT="Client";
- static final String COORD="Coordinator";
- static final String PART="Participant";
- TimeScheduler timer=null;
- private volatile boolean joined; // GemStoneAddition - has this member joined?
- private volatile boolean disconnected; // GemStoneAddition - has this member disconnected?
- /**
- * GemStoneAddition - network partition detection uses a "leader"
- * process to decide which partition will survive.
- */
- volatile Address leader;
-
- /**
- * GemStoneAddition - are coordinators forced to be colocated with gossip servers?
- */
- boolean floatingCoordinatorDisabled;
-
- /**
- * GemStoneAddition - is this process split-brain-detection-enabled? If so,
- * it's eligible to be the leader process.
- */
- boolean splitBrainDetectionEnabled;
-
- /**
- * GemStoneAddition - indicates whether a network partition has already been detected.
- * This is used to prevent this member from installing a new view after a network partition.
- * See bug #39279
- */
- protected boolean networkPartitionDetected = false;
-
- /** Max number of old members to keep in history */
- protected int num_prev_mbrs=50;
-
- /** Keeps track of old members (up to num_prev_mbrs) */
- BoundedList prev_members=null;
-
- int num_views=0;
-
- /** Stores the last 20 views */
- BoundedList prev_views=new BoundedList(20);
-
-
- /** Class to process JOIN, LEAVE and MERGE requests */
- public final ViewHandler view_handler=new ViewHandler();
-
- /** To collect VIEW_ACKs from all members */
- final AckCollector ack_collector=new AckCollector();
-
- /** To collect PREPARE_FOR_VIEW acks from members */
- final AckCollector prepare_collector = new AckCollector(); // GemStoneAddition
-
- /** Time in ms to wait for all VIEW acks (0 == wait forever) */
- long view_ack_collection_timeout=12437; // GemStoneAddition - was 20000;
- // for 6.5 release, changed from 17437 to 12437 - well below the default join timeout period
-
- /** How long should a Resumer wait until resuming the ViewHandler */
- long resume_task_timeout=17439; // GemStoneAddition - was 20000;
-
- private int partitionThreshold = 51; // GemStoneAddition
-
- private int memberWeight; // GemStoneAddition - see network partition detection spec
- private volatile View preparedView; // GemStoneAddition - see network partition detection spec
- private View previousPreparedView; // GemStoneAddition
- private volatile Address coordinator;
-
- public static final String name="GMS";
-
- /** GemStoneAddition - if set this causes view casting to be delayed by some number of seconds */
- public static int TEST_HOOK_SLOW_VIEW_CASTING;
-
- /**
- * GemStoneAddition - amount of time to wait for additional join/leave
- * requests before processing. Set gemfire.VIEW_BUNDLING_WAIT_TIME to
- * the number of milliseconds. Defaults to 150ms.
- */
- static final long BUNDLE_WAITTIME = Integer.getInteger("gemfire.VIEW_BUNDLING_WAIT_TIME", 150).intValue();
-
- private Object installViewLock = new Object(); //lock to assure that install views will atomically update the view, transition coordinators and notify locators
-
- public GMS() {
- initState();
- }
-
-
- @Override // GemStoneAddition
- public String getName() {
- return name;
- }
-
- // start GemStoneAddition
- @Override // GemStoneAddition
- public int getProtocolEnum() {
- return com.gemstone.org.jgroups.stack.Protocol.enumGMS;
- }
- // end GemStone addition
-
-
-
- public String getView() {return view_id != null? view_id.toString() : "null";}
- public int getNumberOfViews() {return num_views;}
- public String getLocalAddress() {return local_addr != null? local_addr.toString() : "null";}
- public String getMembers() {return members != null? members.toString() : "[]";}
- public int getNumMembers() {return members != null? members.size() : 0;}
- public long getJoinTimeout() {return join_timeout;}
- public void setJoinTimeout(long t) {join_timeout=t;}
- public long getJoinRetryTimeout() {return join_retry_timeout;}
- public void setJoinRetryTimeout(long t) {join_retry_timeout=t;}
- public boolean isShun() {return shun;}
- public void setShun(boolean s) {shun=s;}
- public String printPreviousMembers() {
- StringBuffer sb=new StringBuffer();
- if(prev_members != null) {
- for(Enumeration en=prev_members.elements(); en.hasMoreElements();) {
- sb.append(en.nextElement()).append("\n");
- }
- }
- return sb.toString();
- }
-
- public int viewHandlerSize() {return view_handler.size();}
- public boolean isViewHandlerSuspended() {return view_handler.suspended();}
- public String dumpViewHandlerQueue() {
- return view_handler.dumpQueue();
- }
- public String dumpViewHandlerHistory() {
- return view_handler.dumpHistory();
- }
- public void suspendViewHandler() {
- view_handler.suspend(null);
- }
- public void resumeViewHandler() {
- view_handler.resumeForce();
- }
-
- GemFireTracer getLog() {return log;}
-
- public String printPreviousViews() {
- StringBuffer sb=new StringBuffer();
- for(Enumeration en=prev_views.elements(); en.hasMoreElements();) {
- sb.append(en.nextElement()).append("\n");
- }
- return sb.toString();
- }
-
- public boolean isCoordinator() {
- Address coord=determineCoordinator();
- return coord != null && local_addr != null && local_addr.equals(coord);
- }
-
- /**
- * For testing we sometimes need to be able to disable disconnecting during
- * failure scenarios
- */
- public void disableDisconnectOnQuorumLossForTesting() {
- this.splitBrainDetectionEnabled = false;
- }
-
-
- @Override // GemStoneAddition
- public void resetStats() {
- super.resetStats();
- num_views=0;
- prev_views.removeAll();
- }
-
-
- @Override // GemStoneAddition
- public Vector requiredDownServices() {
- Vector retval=new Vector(3);
- retval.addElement(Integer.valueOf(Event.GET_DIGEST));
- retval.addElement(Integer.valueOf(Event.SET_DIGEST));
- retval.addElement(Integer.valueOf(Event.FIND_INITIAL_MBRS));
- return retval;
- }
-
-
- public void setImpl(GmsImpl new_impl) {
- synchronized(impl_mutex) {
- if(impl == new_impl) // superfluous
- return;
- impl=new_impl;
-// if (impl instanceof CoordGmsImpl) {
-// log.getLogWriter().info(
-// JGroupsStrings.GMS_THIS_MEMBER_IS_BECOMING_GROUP_COORDINATOR);
-// }
- if(log.isDebugEnabled()) {
- String msg=(local_addr != null? local_addr.toString()+" " : "") + "changed role to " + new_impl.getClass().getName();
- log.debug(msg);
- }
- }
- }
-
-
- public GmsImpl getImpl() {
- return impl;
- }
-
-
- @Override // GemStoneAddition
- public void init() throws Exception {
- prev_members=new BoundedList(num_prev_mbrs);
- timer=stack != null? stack.timer : null;
- if(timer == null)
- throw new Exception("GMS.init(): timer is null");
- if(impl != null)
- impl.init();
- }
-
- @Override // GemStoneAddition
- public void start() throws Exception {
- this.disconnected = false;
- if(impl != null) impl.start();
- }
-
- @Override // GemStoneAddition
- public void stop() {
- view_handler.stop(true);
- if(impl != null) impl.stop();
- if(prev_members != null)
- prev_members.removeAll();
- }
-
-
- // GemStoneAddition - added suspects argument for bug 41772
- public synchronized void becomeCoordinator(Vector suspects) {
- if (!(impl instanceof CoordGmsImpl)) { // GemStoneAddition, synchronize and checked for redundant becomeCoordinator
- log.getLogWriter().info(ExternalStrings.GMS_THIS_MEMBER_0_IS_BECOMING_GROUP_COORDINATOR, this.local_addr/*, new Exception("stack trace")*/);
- CoordGmsImpl tmp=(CoordGmsImpl)impls.get(COORD);
- if(tmp == null) {
- tmp=new CoordGmsImpl(this);
- impls.put(COORD, tmp);
- }
- try {
- tmp.init();
- }
- catch(Exception e) {
- log.error(ExternalStrings.GMS_EXCEPTION_SWITCHING_TO_COORDINATOR_ROLE, e);
- }
- if (((IpAddress)this.local_addr).getBirthViewId() < 0) {
- ((IpAddress)this.local_addr).setBirthViewId(
- this.view_id == null? 0 : this.view_id.getId());
- }
- setImpl(tmp);
- if (suspects != null && suspects.size() > 0) {
- List suspectList = new LinkedList(suspects);
- impl.handleLeave(suspectList, true,
- Collections.singletonList("Member was suspected of being dead prior to " + this.local_addr
- + " becoming group coordinator"), false);
- }
- }
- }
-
- // GemStoneAddition - logical time can become confused when a
- // coordinator sends out a view using unicast and the view isn't
- // sent all members before the coordinator dies. We increment the
- // Lamport clock to avoid missing updates
- protected synchronized void incrementLtime(int amount) {
- ltime += amount;
- }
-
-
- public void becomeParticipant() {
- if (this.stack.getChannel().closing()) { // GemStoneAddition - fix for bug #42969
- return; // don't try to become a participant if we're shutting down
- }
- ParticipantGmsImpl tmp=(ParticipantGmsImpl)impls.get(PART);
-
- if(tmp == null) {
- tmp=new ParticipantGmsImpl(this);
- impls.put(PART, tmp);
- }
- try {
- tmp.init();
- }
- catch(Exception e) {
- log.error(ExternalStrings.GMS_EXCEPTION_SWITCHING_TO_PARTICIPANT, e);
- }
- setImpl(tmp);
- }
-
- public void becomeClient() {
- ClientGmsImpl tmp=(ClientGmsImpl)impls.get(CLIENT);
- if(tmp == null) {
- tmp=new ClientGmsImpl(this);
- impls.put(CLIENT, tmp);
- }
- try {
- tmp.init();
- }
- catch(Exception e) {
- log.error(ExternalStrings.GMS_EXCEPTION_SWITCHING_TO_CLIENT_ROLE, e);
- }
- setImpl(tmp);
- }
-
-
- boolean haveCoordinatorRole() {
- return impl != null && impl instanceof CoordGmsImpl;
- }
-
- boolean haveParticipantRole() {
- return impl != null && impl instanceof ParticipantGmsImpl;
- }
-
-
- /**
- * Computes the next view. Returns a copy that has <code>old_mbrs</code> and
- * <code>suspected_mbrs</code> removed and <code>new_mbrs</code> added.
- */
- public View getNextView(Vector added_mbrs, Vector left_mbrs, Vector suspected_mbrs) {
- Vector mbrs;
- long vid;
- View v;
- Membership tmp_mbrs;
- Address tmp_mbr;
-
- synchronized(members) {
- if(view_id == null) {
-// return null; // this should *never* happen ! GemStoneAddition
-// log.error("view_id is null", new Exception()); // GemStoneAddition debug
- vid = 0; // GemStoneAddition
- }
- else {
- vid=Math.max(view_id.getId(), ltime) + 1;
- }
- ltime=vid;
- tmp_mbrs=tmp_members.copy(); // always operate on the temporary membership
-
- if (suspected_mbrs != null) { // GemStoneAddition - if a mbr is shutting down, just remove it
- if (left_mbrs == null) {
- left_mbrs = new Vector();
- }
- for (Iterator<IpAddress> it = suspected_mbrs.iterator(); it.hasNext(); ) {
- IpAddress addr = it.next();
- if (this.stack.gfPeerFunctions.isShuttingDown(addr)) {
- it.remove();
- left_mbrs.add(addr);
- }
- }
- }
-
- tmp_mbrs.remove(suspected_mbrs);
- tmp_mbrs.remove(left_mbrs);
- tmp_mbrs.add(added_mbrs);
- mbrs=tmp_mbrs.getMembers();
-
- // putting the coordinator at the front caused problems with dlock
- // because the coordinator might think it is an elder, but the old
- // elder would not release its role, causing a deadlock. See #47562
-// if (mbrs.contains(local_addr) && !mbrs.get(0).equals(local_addr)) {
-// mbrs.remove(local_addr);
-// mbrs.add(0, this.local_addr);
-// }
-
- v=new View(local_addr, vid, mbrs, suspected_mbrs);
-
- // Update membership (see DESIGN for explanation):
- tmp_members.set(mbrs);
-
- // Update joining list (see DESIGN for explanation)
- if(added_mbrs != null) {
- for(int i=0; i < added_mbrs.size(); i++) {
- tmp_mbr=(Address)added_mbrs.elementAt(i);
- if(!joining.contains(tmp_mbr))
- joining.addElement(tmp_mbr);
- }
- }
-
- // Update leaving list (see DESIGN for explanations)
- if(left_mbrs != null) {
- for(Iterator it=left_mbrs.iterator(); it.hasNext();) {
- Address addr=(Address)it.next();
- if(!this.leaving.contains(addr))
- this.leaving.add(addr);
- }
- }
- if(suspected_mbrs != null) {
- for(Iterator it=suspected_mbrs.iterator(); it.hasNext();) {
- Address addr=(Address)it.next();
- if(!this.leaving.contains(addr))
- this.leaving.add(addr);
- }
- }
-
- if(log.isDebugEnabled()) log.debug("new view is " + v);
- return v;
- }
- }
-
-
- /**
- Compute a new view, given the current view, the new members and the suspected/left
- members. Then simply mcast the view to all members. This is different to the VS GMS protocol,
- in which we run a FLUSH protocol which tries to achive consensus on the set of messages mcast in
- the current view before proceeding to install the next view.
-
- The members for the new view are computed as follows:
- <pre>
- existing leaving suspected joining
-
- 1. new_view y n n y
- 2. tmp_view y y n y
- (view_dest)
- </pre>
-
- <ol>
- <li>
- The new view to be installed includes the existing members plus the joining ones and
- excludes the leaving and suspected members.
- <li>
- A temporary view is sent down the stack as an <em>event</em>. This allows the bottom layer
- (e.g. UDP or TCP) to determine the members to which to send a multicast message. Compared
- to the new view, leaving members are <em>included</em> since they have are waiting for a
- view in which they are not members any longer before they leave. So, if we did not set a
- temporary view, joining members would not receive the view (signalling that they have been
- joined successfully). The temporary view is essentially the current view plus the joining
- members (old members are still part of the current view).
- </ol>
- * @param mcast TODO
- */
- public void castViewChange(Vector new_mbrs, Vector left_mbrs, Vector suspected_mbrs, boolean mcast) {
- View new_view;
-
- // next view: current mbrs + new_mbrs - old_mbrs - suspected_mbrs
- new_view=getNextView(new_mbrs, left_mbrs, suspected_mbrs);
- castViewChange(new_view, null, true);
- }
-
-
- public void castViewChange(View new_view, Digest digest, boolean mcast) {
- //boolean mcast = stack.jgmm.getDistributionConfig().getMcastPort() > 0; // GemStoneAddition
- castViewChangeWithDest(new_view, digest, null, true);
- }
-
- // GemStoneAddition - send the view with unicast to avoid NAKACK rejection of non-member
- protected void ucastViewChange(View new_view) {
- castViewChangeWithDest(new_view, null, null, false);
- }
-
-
- /**
- * Broadcasts the new view and digest, and waits for acks from all members in the list given as argument.
- * If the list is null, we take the members who are part of new_view
- * <p>GemStoneAddition much of this method was rewritten for quorum-based
- * network partition detection
- * @param new_view
- * @param digest
- * @param newMbrs_p
- * @param mcast_p whether multicast may be used (GemStoneAddition)
- */
- public void castViewChangeWithDest(View new_view, Digest digest, java.util.List newMbrs_p, boolean mcast_p) {
- GmsHeader hdr;
- long start, stop;
- ViewId vid=new_view.getVid();
- int size=-1;
-
- if (this.disconnected) {
- // GemStoneAddition: during debugging #50633 ViewHandler was found to
- // be casting a new view after the stack had been disconnected. UNICAST
- // had already reset all of its connections so everyone ignored the
- // view, but that's not guaranteed to be the case.
- return;
- }
-
- if (TEST_HOOK_SLOW_VIEW_CASTING > 0) {
- try {
- log.getLogWriter().info(ExternalStrings.DEBUG,
- "Delaying view casting by " + TEST_HOOK_SLOW_VIEW_CASTING
- + " seconds for testing");
- Thread.sleep(TEST_HOOK_SLOW_VIEW_CASTING * 1000);
- } catch (InterruptedException e) {
- log.getLogWriter().fine("Test hook interrupted while sleeping");
- Thread.currentThread().interrupt();
- } finally {
- TEST_HOOK_SLOW_VIEW_CASTING = 0;
- }
- }
-
- // GemStoneAddition
- // we've already sent a tmp_view down the stack to inform comm procotols
- // about new members. This tmp_view notification will remove old members
- // as well so that address canonicalization will not pick up old addresses
- // if a new member reuses an addr:port of an old member
- passDown(new Event(Event.TMP_VIEW, new_view));
-
- // do not multicast the view unless we really have a multicast port. Otherwise
- // we are relying on weak messaging support in TP based on the last installed view
- // see bug #39429
- boolean mcast = mcast_p && stack.gfPeerFunctions.getMcastPort() > 0; // GemStoneAddition
-
- List newMbrs = newMbrs_p; // GemStoneAddition - don't update formal parameters
- if(newMbrs == null || newMbrs.size() == 0) {
- newMbrs=new LinkedList(new_view.getMembers());
- }
-
- Set suspects = new HashSet(new_view.getSuspectedMembers());
- for (Iterator it=suspects.iterator(); it.hasNext(); ) {
- IpAddress addr = (IpAddress)it.next();
- if (this.stack.gfPeerFunctions.isShuttingDown(addr)) { // GemStoneAddition bug #44342
- new_view.notSuspect(addr);
- }
- }
-
- log.getLogWriter().info(ExternalStrings.GMS_MEMBERSHIP_SENDING_NEW_VIEW_0_1_MBRS,
- new Object[] { new_view, Integer.valueOf(new_view.size())}/*, new Exception("STACK")*/);
-
- start=System.currentTimeMillis();
- hdr=new GmsHeader(GmsHeader.VIEW);
- new_view.setMessageDigest(digest); // GemStoneAddition - move digest to message body
-// hdr.my_digest=digest;
- try {
- // in 7.0.1 we switch to always performing 2-phased view installation and quorum checks
- if (!prepareView(new_view, mcast, newMbrs)) {
- return;
- }
-
- synchronized (ack_collector) { // GemStoneAddition bug34505
- ack_collector.reset(vid, newMbrs);
- size=ack_collector.size();
- }
-
- // GemStoneAddition - when PWNing an existing view, we have to unicast
- // because NAKACK will reject a mcast message from a non-member
- if (mcast) {
- Message view_change_msg=new Message(); // bcast to all members
- view_change_msg.isHighPriority = true;
- view_change_msg.putHeader(name, hdr);
- view_change_msg.setObject(new_view);
- passDown(new Event(Event.MSG, view_change_msg));
- }
- else {
- for (int i=0; i<newMbrs.size(); i++) {
- Message msg = new Message();
- msg.isHighPriority = true;
- msg.putHeader(name, hdr);
- msg.setObject(new_view);
- msg.setDest((Address)newMbrs.get(i));
- passDown(new Event(Event.MSG, msg));
- }
- }
- // also send the view to any suspect members to give them a
- // chance to exit. bypass use of UNICAST since it will refuse
- // to send a message to a non-member
- if (new_view.getSuspectedMembers() != null) {
- for (Iterator it=new_view.getSuspectedMembers().iterator(); it.hasNext(); ) {
- Message msg = new Message();
- msg.isHighPriority = true;
- msg.putHeader(name, hdr);
- msg.putHeader(UNICAST.BYPASS_UNICAST, hdr);
- msg.setDest((Address)it.next());
- passDown(new Event(Event.MSG, msg));
- }
- }
-// if (new_view.getSuspectedMembers().size() > 0 && log.getLogWriterI18n().fineEnabled()) {
-// log.getLogWriterI18n().warning(
-// JGroupsStrings.DEBUG, "bogus warning to test failure mode"
-// );
-// }
- try {
- ack_collector.waitForAllAcks(view_ack_collection_timeout);
- if(trace) {
- stop=System.currentTimeMillis();
- log.trace("received all ACKs (" + size + ") for " + vid + " in " + (stop-start) + "ms");
- }
- }
- catch(TimeoutException e) {
- // GemStoneAddition - bug35218
- String missingStr;
- String receivedStr;
- synchronized (ack_collector) {
- missingStr = ack_collector.getMissingAcks().toString();
- receivedStr = ack_collector.getReceived().toString();
- }
- // In 8.0 this is changed to not do suspect processing if the coordinator is shutting down
- if (!stack.getChannel().closing()) {
- log.getLogWriter().warning(
- ExternalStrings.GMS_FAILED_TO_COLLECT_ALL_ACKS_0_FOR_VIEW_1_AFTER_2_MS_MISSING_ACKS_FROM_3_RECEIVED_4_LOCAL_ADDR_5,
- new Object[] {Integer.valueOf(size), vid, Long.valueOf(view_ack_collection_timeout), missingStr/*, receivedStr, local_addr*/});
- /*
- * GemStoneAddition
- * suspect members that did not respond. This must be done in
- * a different thread to avoid lockups in FD_SOCK
- */
- /*new Thread() {
- @Override // GemStoneAddition
- public void run() {
- // bug #49448 fixed here by not synchronizing on ack-collector
- // while suspect verification processing is going on
- List suspects = ack_collector.getMissingAcks();
- suspect(suspects);
- }}.start();*/
- checkFailedToAckMembers(new_view, ack_collector.getMissingAcks());
- }
- }
- }
- finally {
- // GemStoneAddition - now we can fully reset the collector so that
- // it is ready to receive view-acks from join responses subsequent
- // to this view. Prior to this change, these acks were lost when
- // the view was reset in this method
- synchronized(ack_collector) {
- ack_collector.fullyReset();
- }
- synchronized(prepare_collector) {
- prepare_collector.fullyReset(); // GemStoneAddition - fix for #43886
- }
- // GemStoneAddition - we don't need this anymore
- this.previousPreparedView = null;
- }
- }
-
-
-
- /**
- * GemStoneAddition - send a prepare to all members and require a response
- *
- * @param new_view
- * @param mcast
- * @param newMbrs
- * @return true if the view can be transmitted
- */
- boolean prepareView(View new_view, boolean mcast, List newMbrs) {
-
- // compute the failed weight and form a set of non-admin failures
- Set<IpAddress> failures = new HashSet(new_view.getSuspectedMembers());
- int failedWeight = processFailuresAndGetWeight(this.view, this.leader, failures);
-
-// log.getLogWriterI18n().info(JGroupsStrings.DEBUG, "[partition detection] these cache processes failed: " + failures);
-
- // this.preparedView may have been set before this vm decided to send out
- // a view. If it was, we use it to modify the current view and try again
- if (!processPreparedView(this.preparedView, new_view, newMbrs, false)) {
- if (log.getLogWriter().fineEnabled()) {
- log.getLogWriter().fine("processPreparedView failed; aborting view " + new_view.getVid());
- }
- return false;
- }
-
-
- List<IpAddress> failedToAck = sendPrepareForViewChange(new_view, newMbrs, false);
-
- // this.preparedView may have been set by a PREPARE_VIEW_ACK response
- // if another member received a PREPARE_VIEW with a different view than
- // we just tried to prepare
- if (!processPreparedView(this.preparedView, new_view, newMbrs, false)) {
- if (log.getLogWriter().fineEnabled()) {
- log.getLogWriter().fine("processPreparedView failed; aborting view " + new_view.getVid());
- }
- return false;
- }
-
- // [bruce] prior to 7.0.1 we did not perform suspect processing on members that fail to
- // respond to a new view. In 7.0.1 we require timely responses to view changes
- // in order to detect loss of quorum.
- // In 8.0 this is changed to not do suspect processing if the coordinator is shutting down
- if (failedToAck.size() > 0 && !stack.getChannel().closing()) {
- if ( !checkFailedToAckMembers(new_view, failedToAck) ) {
- return false;
- }
- }
- if (this.joined) { // bug #44491 - don't declare a network partition if we're still starting up
- // okay, others have acked that they're ready for the view, so we know
- // at this point who was around after loss of the members in the failures set
- int oldWeight = 0;
- // Map<IpAddress, Integer> oldWeights = new HashMap<IpAddress, Integer>();
- boolean leadProcessed = false;
- boolean displayWeights = (failedWeight > 0) && !Boolean.getBoolean("gemfire.hide-member-weights");
- StringBuffer sb = displayWeights? new StringBuffer(1000) : null;
- for (Iterator it = this.view.getMembers().iterator(); it.hasNext(); ) {
- IpAddress a = (IpAddress)it.next();
- int thisWeight = a.getMemberWeight();
- if (a.getVmKind() == 10 /* NORMAL_DM_KIND */) {
- thisWeight += 10;
- if (!leadProcessed) {
- thisWeight += 5;
- leadProcessed = true;
- }
- } else if (a.preferredForCoordinator()) {
- thisWeight += 3;
- }
- oldWeight += thisWeight;
- if (displayWeights) {
- sb.append("\n")
- .append(ExternalStrings.GMS_MEMBER_0_HAS_WEIGHT_1
- .toLocalizedString(a, Integer.valueOf(thisWeight)));
- }
- }
- if (sb != null) {
- String str = sb.toString();
- if (str.length() > 0) {
- log.getLogWriter().info(GFStringIdImpl.LITERAL, str);
- }
- }
- int lossThreshold = (int)Math.round((oldWeight * this.partitionThreshold) / 100.0);
-
- if (failedWeight > 0) {
- log.getLogWriter().info(ExternalStrings.NetworkPartitionDetectionWeightCalculation,
- new Object[] {Integer.valueOf(oldWeight), Integer.valueOf(failedWeight),
- Integer.valueOf(this.partitionThreshold),
- lossThreshold});
-
- if (failedWeight >= lossThreshold) {
- if (this.splitBrainDetectionEnabled) {
- this.networkPartitionDetected = true;
- sendNetworkPartitionWarning(new_view.getMembers());
- quorumLost(failures, this.view);
- forceDisconnect(new Event(Event.EXIT, stack.gfBasicFunctions.getForcedDisconnectException(
- ExternalStrings.GMS_EXITING_DUE_TO_POSSIBLE_NETWORK_PARTITION_EVENT_DUE_TO_LOSS_OF_MEMBER_0
- .toLocalizedString(failures.size(), failures))));
- return false;
- } // else quorumLost will be invoked when the view is installed
- }
- }
- }
-
- if (log.getLogWriter().fineEnabled()) {
- log.getLogWriter().fine("done successfully preparing view " + new_view.getVid());
- }
- return true;
- }
-
- private boolean checkFailedToAckMembers(final View new_view, final List<IpAddress> failedToAck) {
-
- if (failedToAck.size() == 0) {
- return true;
- }
-
- if (log.getLogWriter().infoEnabled()) {
- log.getLogWriter().info(ExternalStrings.CHECKING_UNRESPONSIVE_MEMBERS,
- new Object[]{failedToAck});
- }
- final FD_SOCK fdSock = (FD_SOCK)this.stack.findProtocol("FD_SOCK");
- if (this.stack.getChannel().closing() && fdSock == null) {
- if (log.getLogWriter().fineEnabled()) {
- log.getLogWriter().fine("FD_SOCK not found for liveness checks - aborting view " + new_view.getVid());
- }
- return false; // bug #44786: cannot prepare a view if the stack has been dismantled
- }
- assert fdSock != null;
- ExecutorService es = Executors.newFixedThreadPool(failedToAck.size(), new ThreadFactory() {
- private final AtomicInteger threadCount = new AtomicInteger(1);
- @Override
- public Thread newThread(Runnable r) {
- Thread th = new Thread(GemFireTracer.GROUP,
- r,
- FAILED_TO_ACK_MEMBER_VERIFY_THREAD + "-" + threadCount.getAndIncrement());
- return th;
- }
- });
- ArrayList<Callable<IpAddress>> al = new ArrayList<Callable<IpAddress>>();
- for (final Iterator<IpAddress> it=failedToAck.iterator(); it.hasNext(); ) {
- final IpAddress failedAddress = it.next();
- al.add(new Callable<IpAddress>() {
- IpAddress sockAddress = fdSock.fetchPingAddress(failedAddress, 0);
- public IpAddress call() throws Exception {
- log.getLogWriter().info(ExternalStrings.SUSPECTING_MEMBER_WHICH_DIDNT_ACK, new Object[]{failedAddress.toString()});
- if (sockAddress == null) {
- if (log.getLogWriter().fineEnabled()) {
- log.getLogWriter().fine("unable to find ping address for " + failedAddress
- + " - using direct port to verify if it's there");
- }
- // fdSock can't perform the verification because it has no FD_SOCK
- // address for the member
- // If we can connect to the member's cache socket, then we know its machine is
- // up. It may be dead and its socket port reused, but we know there isn't a
- // network partition going on.
- sockAddress = new IpAddress(failedAddress.getIpAddress(), failedAddress.getDirectPort());
- if (sockAddress.getPort() != 0 &&
- fdSock.checkSuspect(failedAddress, sockAddress, ExternalStrings.MEMBER_DID_NOT_ACKNOWLEDGE_VIEW.toLocalizedString(), false, false)) {
- if (log.getLogWriter().infoEnabled()) {
- log.getLogWriter().info(ExternalStrings.ABLE_TO_CONNECT_TO_DC_PORT, new Object[]{failedAddress, sockAddress.getPort()});
- }
- return failedAddress;//now we remove below using feature
- }
- } else if (fdSock.checkSuspect(failedAddress, sockAddress, ExternalStrings.MEMBER_DID_NOT_ACKNOWLEDGE_VIEW.toLocalizedString(), true, false)) {
- if (log.getLogWriter().infoEnabled()) {
- log.getLogWriter().info(ExternalStrings.ABLE_TO_CONNECT_TO_FD_PORT, new Object[]{failedAddress, sockAddress.getPort()});
- }
- return failedAddress;//now we remove below using feature
- }
- return null;
- }
- }
- );
- }
- try {
- List<java.util.concurrent.Future<IpAddress>> futures = es.invokeAll(al);
- for(java.util.concurrent.Future<IpAddress> future : futures){
- try {
- IpAddress ipAddr = future.get(view_ack_collection_timeout + 100, TimeUnit.MILLISECONDS);
- if(ipAddr != null) {
- failedToAck.remove(ipAddr);
- }
- } catch (ExecutionException e) {
- } catch (java.util.concurrent.TimeoutException e) {
- }
- }
- } catch (InterruptedException e) {
- }
-
- try{
- es.shutdown();
- es.awaitTermination(view_ack_collection_timeout , TimeUnit.MILLISECONDS);
- }catch(Exception ex) {
- //ignore
- }
- // we could, at this point, also see if there are any ip addresses in the
- // set that ack'd the message that are also in the failed set. That would
- // tell us if the route to that NIC is still viable and we could remove the
- // entry from the failedToAck set [bruce 2011]
- if (!failedToAck.isEmpty()) {
- // log.getLogWriterI18n().info(JGroupsStrings.DEBUG, "these cache processes failed to ack a view preparation message: " + failedToAck);
- // abandon this view and cast a new view with the failed ackers added to the suspects list
- failedToAck.addAll(new_view.getSuspectedMembers());
- if (log.getLogWriter().fineEnabled()) {
- log.getLogWriter().fine("invoking handleLeave with " + failedToAck + ". My membership is " + this.members.getMembers());
- }
- this.impl.handleLeave(failedToAck, true,
- Collections.singletonList(ExternalStrings.MEMBER_DID_NOT_ACKNOWLEDGE_VIEW.toLocalizedString()),
- true);
- if (log.getLogWriter().fineEnabled()) {
- log.getLogWriter().fine("done casting view " + new_view.getVid());
- }
- return false;
- }
- return true;
- }
-
- /**
- * process the given set of failures against the given last known view and report the
- * amount of lost weight. This also removes any admin members from the collection of failures.
- *
- * @param lastView the last View that was completely installed
- * @param leader the leader process in lastView
- * @param failures the set of failures in the view being created
- * @return the amount of weight lost in the failures collection
- */
- public static int processFailuresAndGetWeight(View lastView, Address leader, Collection failures) {
- int failedWeight = 0;
- for (Iterator<IpAddress> it=failures.iterator(); it.hasNext(); ) {
- IpAddress addr = it.next();
- if (!lastView.getMembers().contains(addr)) { // bug #37342: added and removed since the last installed view
- continue;
- }
- failedWeight += addr.getMemberWeight();
- if (addr.getVmKind() != 10 /* NORMAL_DM_KIND */) {
- if (addr.preferredForCoordinator()) {
- failedWeight += 3;
- }
- it.remove();
- } else {
- failedWeight += 10;
- }
- }
- if (leader != null && failures.contains(leader)) {
- failedWeight += 5;
- }
- return failedWeight;
- }
-
- // GemStoneAddition - notify of loss of quorum
- private void quorumLost(final Set failures, final View currentView) {
- Thread notificationThread = new Thread(GemFireTracer.GROUP, "Quorum Lost Notification") {
- public void run() {
- List remaining = new ArrayList(currentView.getMembers().size());
- remaining.addAll(currentView.getMembers());
- remaining.removeAll(failures);
- try {
- stack.gfPeerFunctions.quorumLost(failures, remaining);
- } catch (RuntimeException e) {
- if (e.getClass().getSimpleName().equals("CancelException")) {
- // bug #47403: ignore this exception - cache closed before notification went through
- } else {
- throw e;
- }
- }
- }
- };
- notificationThread.setDaemon(true);
- notificationThread.start();
- }
-
- /**
- * GemStoneAddition - process a view received in a PREPARE_VIEW message from another
- * coordinator, or from a PREPARE_VIEW_ACK response to a message
- * that this node transmitted to another member
- * @param pView the view being prepared
- * @param new_view the view I am trying to cast
- * @param newMbrs the members list in new_view
- * @param mcast whether the view can be multicast
- * @return true if new_view can be installed, false if a different view was installed
- */
- boolean processPreparedView(View pView, View new_view, List newMbrs, boolean mcast) {
- View prevView = this.previousPreparedView;
- if (pView != null // different view has been prepared
- && !pView.getCreator().equals(this.local_addr) // I did not create it
- && pView.getVid().compare(this.view.getVid()) > 0 // It's newer than my current view
- && (prevView == null || pView.getVid().compare(prevView.getVid()) > 0) ) { // I haven't processed a newer prepared view
-
- this.previousPreparedView = pView;
-
- Vector newMembersFromPView = new Vector();
- Vector newFailuresFromPView = new Vector();
- Vector newLeavingFromPView = new Vector();
- synchronized(members) {
- // someone else has already prepared a new view
- if (log.getLogWriter().infoEnabled()) {
- log.getLogWriter().info(ExternalStrings.RECEIVED_PREVIOUSLY_PREPARED_VIEW, pView);
- }
- Set<Address> allFailures = new HashSet(this.leaving);
- for (Iterator it=pView.getMembers().iterator(); it.hasNext(); ) {
- Address a = (Address)it.next();
- if (!allFailures.contains(a)) {
- // in the other view and not in my failure list - new member
- newMembersFromPView.add(a);
- }
- }
- for (Iterator it=pView.getSuspectedMembers().iterator(); it.hasNext(); ) {
- Address a = (Address)it.next();
- if (!allFailures.contains(a)) {
- // failed in other view but not in mine
- newFailuresFromPView.add(a);
- }
- }
- for (Iterator it=newMbrs.iterator(); it.hasNext(); ) {
- Address a = (Address)it.next();
- if (!pView.containsMember(a)) {
- newLeavingFromPView.add(a);
- }
- }
- }
- if (!newMembersFromPView.isEmpty()
- || !newFailuresFromPView.isEmpty()
- || !newLeavingFromPView.isEmpty()) {
- // we need the full list of failures but the others are just increments on
- // the leaving/joining collections in tmp_mbrs
- newFailuresFromPView.addAll(new_view.getSuspectedMembers());
- synchronized(prepare_collector) {
- prepare_collector.fullyReset(); // GemStoneAddition - fix for #43886
- }
- castViewChange(newMembersFromPView, newLeavingFromPView, newFailuresFromPView, mcast);
- return false;
- }
- }
- return true;
- }
-
- /**
- * GemStoneAddition - send prepare-for-view-change message and wait for
- * responses. Return a list of addresses that didn't respond
- */
- List<IpAddress> sendPrepareForViewChange(View newView, Collection<IpAddress> mbrs, boolean mcast) {
- int size=-1;
- ViewId vid = newView.getVid();
-
- synchronized (prepare_collector) { // GemStoneAddition bug34505
- prepare_collector.reset(vid, mbrs);
- size=prepare_collector.size();
- }
- boolean timedout=false;
- GmsHeader hdr=new GmsHeader(GmsHeader.PREPARE_FOR_VIEW);
- // GemStoneAddition - when PWNing an existing view, we have to unicast
- // because NAKACK will reject a mcast message from a non-member
- if (mcast) {
- Message view_change_msg=new Message(); // bcast to all members
- view_change_msg.setObject(newView);
- view_change_msg.isHighPriority = true;
- view_change_msg.putHeader(name, hdr);
- passDown(new Event(Event.MSG, view_change_msg));
- }
- else {
- for (IpAddress dest: mbrs) {
- if (dest.equals(this.local_addr)) {
- synchronized(prepare_collector) {
- prepare_collector.ack(dest, null);
- }
- } else {
- Message msg = new Message();
- msg.isHighPriority = true;
- msg.putHeader(name, hdr);
- msg.setObject(newView);
- msg.setDest(dest);
-// if (trace)
-// log.trace("sending PREPARE_FOR_VIEW to " + dest);
- passDown(new Event(Event.MSG, msg));
- }
- }
- }
- try {
-// log.getLogWriterI18n().info(JGroupsStrings.DEBUG, "DEBUG: before waiting for prepare-acks, collector state is " + prepare_collector);
- prepare_collector.waitForAllAcks(view_ack_collection_timeout);
-// log.getLogWriterI18n().info(JGroupsStrings.DEBUG, "DEBUG: after waiting for prepare-acks, collector state is " + prepare_collector);
- }
- catch(TimeoutException e) {
- // timeout processing is handled below, along with members declared dead during view preparation
- // see bug #47295
- timedout=true;
- }
- //log.getLogWriterI18n().info(JGroupsStrings.DEBUG, "received acks from " + prepare_collector.getReceived());
- List missing;
- String missingStr;
- String receivedStr = null;
- boolean logMissing = false;
- synchronized (prepare_collector) {
- missing = prepare_collector.getMissingAcks();
- missing.addAll(prepare_collector.getSuspectedMembers());
- missing.removeAll(newView.getSuspectedMembers()); // don't reprocess members that are already kicked out
- missing.remove(this.local_addr); // don't suspect myself - I might be shutting down
- // bug #44342 - if the member has sent a shutdown message then it's okay
- for (Iterator<IpAddress> it=missing.iterator(); it.hasNext(); ) {
- IpAddress addr = it.next();
- if (!newView.containsMember(addr) || this.stack.gfPeerFunctions.isShuttingDown(addr)) {
- it.remove();
- }
- }
- logMissing = missing.size() > 0;
- if (logMissing) {
- // note who has acked while under synchronization
- receivedStr = prepare_collector.getReceived().toString();
- }
- }
- if (logMissing && !this.stack.getChannel().closing()) {
- missingStr = missing.toString();
- log.getLogWriter().warning(
- ExternalStrings.GMS_FAILED_TO_COLLECT_ALL_ACKS_0_FOR_VIEW_PREPARATION_1_AFTER_2_MS_MISSING_ACKS_FROM_3_RECEIVED_4_LOCAL_ADDR_5,
- new Object[] {Integer.valueOf(size), newView, Long.valueOf(view_ack_collection_timeout), missingStr, receivedStr, local_addr}/*, new Exception("stack trace")*/);
- }
- return missing;
- }
-
-
- void sendNetworkPartitionWarning(Collection<IpAddress> mbrs) {
- GmsHeader hdr = new GmsHeader(GmsHeader.NETWORK_PARTITION_DETECTED);
- for (IpAddress dest: mbrs) {
- Message msg = new Message();
- msg.isHighPriority = true;
- msg.putHeader(name, hdr);
- msg.setDest(dest);
- passDown(new Event(Event.MSG, msg));
- }
- try {
- Thread.sleep(this.leave_timeout);
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt(); // pass the problem to other code
- }
- }
-
- /**
- * GemStoneAddition - suspect members that fail to return view acks
- */
- protected void suspect(List suspects) {
- passDown(new Event(Event.GMS_SUSPECT, suspects));
- }
-
-
- /**
- * GemStoneAddition - tell the ack collectors to stop waiting for the
- * given address
- */
- public void addSuspectToCollectors(Address addr) {
- synchronized (ack_collector) {
- ack_collector.suspect(addr);
- }
- synchronized(prepare_collector) {
- prepare_collector.suspect(addr);
- }
- }
- /**
- * Sets the new view and sends a VIEW_CHANGE event up and down the stack. If the view is a MergeView (subclass
- * of View), then digest will be non-null and has to be set before installing the view.
- */
- public void installView(View new_view, Digest digest) {
- if(digest != null)
- mergeDigest(digest);
- installView(new_view);
- }
-
-
- /** GemStoneAddition - return the weight of the given IDs */
- public static int getWeight(Collection ids, Address leader) {
- int weight = 0;
- for (Iterator<IpAddress> it=ids.iterator(); it.hasNext(); ) {
- IpAddress addr = it.next();
- int thisWeight = addr.getMemberWeight();
- if (addr.getVmKind() == 10 /* NORMAL_DM_KIND */) {
- thisWeight += 10;
- if (leader != null && addr.equals(leader)) {
- thisWeight += 5;
- }
- } else if (addr.preferredForCoordinator()) {
- thisWeight += 3;
- }
- weight += thisWeight;
- }
- return weight;
- }
-
- /**
- * Sets the new view and sends a VIEW_CHANGE event up and down the stack.
- */
- public void installView(View new_view) {
- Address coord;
- int rc;
- ViewId vid=new_view.getVid();
- Vector mbrs=new_view.getMembers();
-
- if (networkPartitionDetected) {
- return; // GemStoneAddition - do not install the view if we've decided to exit
- }
- this.preparedView = null;
-
- if(log.isDebugEnabled()) log.debug("[local_addr=" + local_addr + "] view is " + new_view);
-//log.getLogWriter().info("installing view " + new_view); // debugging
- if(stats) {
- num_views++;
- prev_views.add(new_view);
- }
-
- // Discards view with id lower than our own. Will be installed without check if first view
- if(view_id != null) {
- rc=vid.compareTo(view_id);
- if(rc <= 0) {
- if(log.isTraceEnabled() && rc < 0) // only scream if view is smaller, silently discard same views
- log.trace("[" + local_addr + "] received view < current view;" +
- " discarding it (current vid: " + view_id + ", new vid: " + vid + ')');
- return;
- }
- }
-
- ltime=Math.max(vid.getId(), ltime); // compute Lamport logical time
-
- /* Check for self-inclusion: if I'm not part of the new membership, I just discard it.
- This ensures that messages sent in view V1 are only received by members of V1 */
- if(checkSelfInclusion(mbrs) == false) {
- // only shun if this member was previously part of the group. avoids problem where multiple
- // members (e.g. X,Y,Z) join {A,B} concurrently, X is joined first, and Y and Z get view
- // {A,B,X}, which would cause Y and Z to be shunned as they are not part of the membership
- // bela Nov 20 2003
- if(shun && local_addr != null && prev_members.contains(local_addr)) {
- if(warn)
- log.warn("I (" + local_addr + ") am not a member of view " + new_view +
- ", shunning myself and leaving the group (prev_members are " + prev_members +
- ", current view is " + view + ")");
- if(impl != null)
- impl.handleExit();
- networkPartitionDetected = true;
- passUp(new Event(Event.EXIT,
- stack.gfBasicFunctions.getForcedDisconnectException(
- "This member has been forced out of the distributed system by " + new_view.getCreator()
- + ". Please consult GemFire logs to find the reason. (GMS shun)")));
- }
- else {
- if(warn) log.warn("I (" + local_addr + ") am not a member of view " + new_view + "; discarding view");
- }
- return;
- }
-
- Event view_event;
- synchronized(installViewLock) {
- synchronized(members) { // serialize access to views
-
- if (this.view != null) {
- int oldWeight = getWeight(this.view.getMembers(), this.leader);
- int failedWeight = getWeight(new_view.getSuspectedMembers(), this.leader);
- int lossThreshold = (int)Math.round((oldWeight * this.partitionThreshold) / 100.0);
- if (failedWeight >= lossThreshold) {
- log.getLogWriter().info(ExternalStrings.DEBUG, "old membership weight=" + oldWeight + ", loss threshold=" + lossThreshold+" and failed weight=" + failedWeight);
-// log.getLogWriterI18n().info(JGroupsStrings.DEBUG, "current view="+this.view+"; suspects="+new_view.getSuspectedMembers());
- quorumLost(new_view.getSuspectedMembers(), this.view);
- }
- }
-
- // assign new_view to view_id
- if(new_view instanceof MergeView)
- view=new View(new_view.getVid(), new_view.getMembers());
- else
- view=new_view;
- view_id=vid.copy();
-
- // GemStoneAddition - tracking of leader
- Address oldLead = this.leader;
- this.leader = view.getLeadMember();
- if (this.leader != null) {
- // log acquisition of new leader
- if (oldLead == null || !oldLead.equals(this.leader)) {
- log.getLogWriter().info(ExternalStrings.GMS_MEMBERSHIP_LEADER_MEMBER_IS_NOW_0, this.leader);
- }
- }
-
-
- // Set the membership. Take into account joining members
- if(mbrs != null && mbrs.size() > 0) {
- members.set(mbrs);
- this.coordinator = members.getCoordinator(); // GemStoneAddition bug #44967
- tmp_members.set(members);
- joining.removeAll(mbrs); // remove all members in mbrs from joining
- // remove all elements from 'leaving' that are not in 'mbrs'
- leaving.retainAll(mbrs);
-
- tmp_members.add(joining); // add members that haven't yet shown up in the membership
- tmp_members.remove(leaving); // remove members that haven't yet been removed from the membership
-
- // add to prev_members
- for(Iterator it=mbrs.iterator(); it.hasNext();) {
- Address addr=(Address)it.next();
- if(!prev_members.contains(addr))
- prev_members.add(addr);
- }
- }
-
- view_event=new Event(Event.VIEW_CHANGE, new_view.clone());
- }
-
- coord=determineCoordinator();
- // if(coord != null && coord.equals(local_addr) && !(coord.equals(vid.getCoordAddress()))) {
- // changed on suggestion by yaronr and Nicolas Piedeloupe
- if(coord != null && coord.equals(local_addr) && !haveCoordinatorRole()) {
- // GemSToneAddition - pass suspects to coordinator impl
- Vector suspects;
- if (haveParticipantRole()) {
- suspects = ((ParticipantGmsImpl)impl).getSuspects();
- } else {
- suspects = new Vector();
- }
- becomeCoordinator(suspects);
- coord = local_addr; // GemStoneAddition
- }
- else {
- if(haveCoordinatorRole() && !local_addr.equals(coord))
- becomeParticipant();
- }
- // GemStoneAddition - notify concerned parties of the current coordinator
- if (coord != null) {
- notifyOfCoordinator(coord);
- }
- }
- // Send VIEW_CHANGE event up and down the stack:
- // (moved from inside synchronization for bug #52099)
- if (view_event != null) {
- passDown(view_event); // needed e.g. by failure detector or UDP
- passUp(view_event);
- }
- }
-
-
- void forceDisconnect(final Event exitEvent) {
- this.networkPartitionDetected = true;
- Thread tilt = new Thread(Thread.currentThread().getThreadGroup(),
- "GMS Network Partition Event") {
- @Override // GemStoneAddition
- public void run() {
- passDown(exitEvent);
- passUp(exitEvent);
- }
- };
- tilt.start();
- }
-
- /**
- * GemStoneAddition - rewritten for localizing view coordinator in locator
- * processes, and made public
- * @return the acting coordinator
- */
- public Address determineCoordinator() {
-// return members != null && members.size() > 0? (Address)members.elementAt(0) : null;
- return this.coordinator;
- }
-
- /**
- * GemStoneAddition - return the lead member for partition detection algorithms
- */
- public Address getLeadMember() {
- return this.leader;
- }
-
- /**
- * GemStoneAddition - retreive the partitionThreshold for quorum calculations
- */
- public int getPartitionThreshold() {
- return this.partitionThreshold;
- }
-
- /**
- * GemStoneAddition - retrieve the latest view
- */
- public View getLastView() {
- return this.view;
- }
-
- /** Checks whether the potential_new_coord would be the new coordinator (2nd in line) */
- protected boolean wouldBeNewCoordinator(Address potential_new_coord) {
-// Address new_coord;
-
- if(potential_new_coord == null) return false;
-
- synchronized(members) {
- if(members.size() < 2) return false;
-// new_coord=(Address)members.elementAt(1); // member at 2nd place
-// return new_coord != null && new_coord.equals(potential_new_coord);
- return members.wouldBeNewCoordinator(potential_new_coord);
- }
- }
-
-
- /** Returns true if local_addr is member of mbrs, else false */
- protected boolean checkSelfInclusion(Vector mbrs) {
- Object mbr;
- if(mbrs == null)
- return false;
- for(int i=0; i < mbrs.size(); i++) {
- mbr=mbrs.elementAt(i);
- if(mbr != null && local_addr.equals(mbr))
- return true;
- }
- return false;
- }
-
-
- // GemStoneAddition - this method is not used by jgroups
-// public View makeView(Vector mbrs) {
-// Address coord=null;
-// long id=0;
-//
-// if(view_id != null) {
-// coord=view_id.getCoordAddress();
-// id=view_id.getId();
-// }
-// return new View(coord, id, mbrs);
-// }
-
-
- // GemStoneAddition - this method is not used by jgroups
-// public View makeView(Vector mbrs, ViewId vid) {
-// Address coord=null;
-// long id=0;
-//
-// if(vid != null) {
-// coord=vid.getCoordAddress();
-// id=vid.getId();
-// }
-// return new View(coord, id, mbrs);
-// }
-
-
- /** Send down a SET_DIGEST event */
- public void setDigest(Digest d) {
- passDown(new Event(Event.SET_DIGEST, d));
- }
-
-
- /** Send down a MERGE_DIGEST event */
- public void mergeDigest(Digest d) {
- passDown(new Event(Event.MERGE_DIGEST, d));
- }
-
-
- /** Sends down a GET_DIGEST event and waits for the GET_DIGEST_OK response, or
- timeout, whichever occurs first */
- public Digest getDigest() {
- Digest ret=null;
-
- synchronized(digest_mutex) {
- digest_promise.reset();
- passDown(Event.GET_DIGEST_EVT);
- try {
- ret=(Digest)digest_promise.getResultWithTimeout(digest_timeout);
- }
- catch(TimeoutException e) {
- if(log.isErrorEnabled()) log.error(ExternalStrings.GMS_DIGEST_COULD_NOT_BE_FETCHED_FROM_BELOW);
- }
- return ret;
- }
- }
-
-
- @Override // GemStoneAddition
- public void up(Event evt) {
- Object obj;
- Message msg;
- GmsHeader hdr;
- MergeData merge_data;
-
- switch(evt.getType()) {
-
- case Event.MSG:
- msg=(Message)evt.getArg();
- obj=msg.getHeader(name);
- if(obj == null || !(obj instanceof GmsHeader))
- break;
- hdr=(GmsHeader)msg.removeHeader(name);
- switch(hdr.type) {
- case GmsHeader.JOIN_REQ:
- if (this.haveCoordinatorRole()) {
- // GemStoneAddition - partial fix for bugs #41722 and #42009
- IpAddress iaddr = (IpAddress)hdr.mbr;
- if (iaddr.getBirthViewId() >= 0 && this.stack.gfPeerFunctions.isShunnedMemberNoSync(iaddr)) {
- log.getLogWriter().info(
- ExternalStrings. COORDGMSIMPL_REJECTING_0_DUE_TO_REUSED_IDENTITY, hdr.mbr);
- ((CoordGmsImpl)getImpl()).sendJoinResponse(new JoinRsp(JoinRsp.SHUNNED_ADDRESS), hdr.mbr, false);
- return;
- }
- if (members.contains(hdr.mbr)) {
- IpAddress hmbr = (IpAddress)hdr.mbr;
- for (Iterator it=members.getMembers().iterator(); it.hasNext(); ) {
- IpAddress addr = (IpAddress)it.next();
- if (addr.equals(hdr.mbr)) {
- if (addr.getUniqueID() != hmbr.getUniqueID()) {
- log.getLogWriter().info(
- ExternalStrings. COORDGMSIMPL_REJECTING_0_DUE_TO_REUSED_IDENTITY, hdr.mbr);
- ((CoordGmsImpl)getImpl()).sendJoinResponse(new JoinRsp(JoinRsp.SHUNNED_ADDRESS), hdr.mbr, false);
- return;
- } else {
- break;
- }
- }
- }
- getImpl().handleAlreadyJoined(hdr.mbr);
- return;
- }
- }
- // GemStoneAddition - bug #50510, 50742
- if (hdr.mbr.getVersionOrdinal() > 0
- && hdr.mbr.getVersionOrdinal() < JGroupsVersion.CURRENT_ORDINAL) {
- log.getLogWriter().warning(
- ExternalStrings.COORD_REJECTING_OLD_MEMBER_BECAUSE_UPGRADE_HAS_BEGUN,
- new Object[]{hdr.mbr});
- ((CoordGmsImpl)getImpl()).sendJoinResponse(
- new JoinRsp(ExternalStrings.COORD_REJECTING_OLD_MEMBER_BECAUSE_UPGRADE_HAS_BEGUN
- .toLocalizedString(hdr.mbr)), hdr.mbr, false);
- return;
- }
- view_handler.add(new Request(Request.JOIN, hdr.mbr, false, null));
- break;
- case GmsHeader.JOIN_RSP:
-// log.getLogWriterI18n().info(JGroupsStrings.DEBUG, "join response message received: " + hdr.join_rsp);
- impl.handleJoinResponse((JoinRsp)msg.getObject());
- break;
- case GmsHeader.LEAVE_REQ:
- if(log.isDebugEnabled())
- log.debug("received LEAVE_REQ for " + hdr.mbr + " from " + msg.getSrc());
- if(hdr.mbr == null) {
- if(log.isErrorEnabled()) log.error(ExternalStrings.GMS_LEAVE_REQS_MBR_FIELD_IS_NULL);
- return;
- }
- synchronized(ack_collector) {
- ack_collector.ack(hdr.mbr, null); // GemStoneAddition - don't wait for acks from this member
- }
- synchronized(prepare_collector) {
- prepare_collector.ack(hdr.mbr, null); // GemStoneAddition - don't wait for acks from this member (bug #44342)
- }
- view_handler.add(new Request(Request.LEAVE, hdr.mbr, false, null));
- break;
- case GmsHeader.LEAVE_RSP:
- // GemStoneAddition
- if (hdr.arg != null) {
- // do some checking to prevent a message intended for another process
- // from shutting this one down
- if (hdr.mbr != null && !hdr.mbr.equals(this.local_addr)) {
- break;
- }
- }
- impl.handleLeaveResponse(hdr.arg);
- break;
- case GmsHeader.REMOVE_REQ: // GemStoneAddition - new REMOVE_REQ request for slow receivers
- // REMOVE_REQ differs from LEAVE_REQ in that the member is suspect
- // and will register as a failure in network partition detection
- // algorithms [bruce]
- if(hdr.mbr == null) {
- if(log.isErrorEnabled()) log.error("REMOVE_REQ's mbr field is null");
- return;
- }
- if (members.contains(msg.getSrc())) {
- if (msg.getSrc().equals(hdr.mbr) && log.isDebugEnabled()) {
- log.debug("received REMOVE_REQ for " + hdr.mbr + " from " + msg.getSrc() +
- (hdr.arg == null? "" : " Reason=" + hdr.arg));
- }
- else {
- log.getLogWriter().warning(
- ExternalStrings.GMS_MEMBERSHIP_RECEIVED_REQUEST_TO_REMOVE_0_FROM_1_2,
- new Object[] { hdr.mbr, msg.getSrc(), (hdr.arg == null? "" : " Reason=" + hdr.arg)});
-
- }
- view_handler.add(new Request(Request.LEAVE, hdr.mbr, true, null, hdr.arg));
- }
- break;
- case GmsHeader.VIEW:
- // send VIEW_ACK to sender of view
- Address coord=msg.getSrc();
- Message view_ack=new Message(coord, null, null);
- view_ack.isHighPriority = true;
- View v = msg.getObject();
- GmsHeader tmphdr=new GmsHeader(GmsHeader.VIEW_ACK);
- view_ack.putHeader(name, tmphdr);
- view_ack.setObject(v);
- if (this.local_addr.getBirthViewId() < 0) {
- // unicast can't handle changing view IDs very well
- view_ack.putHeader(UNICAST.BYPASS_UNICAST, tmphdr);
- }
- passDown(new Event(Event.MSG, view_ack));
- // GemStoneAddition - perform null check AFTER sending an ack
- // so we don't stall a coordinator when join_rsp hasn't been
- // received
- if(v == null) {
- if(log.isErrorEnabled()) log.error(ExternalStrings.GMS_VIEW_VIEW__NULL);
- return;
- }
-
- // ViewId newId = hdr.view.getVid(); GemStoneAddition
- if (this.impl != null && !(this.impl instanceof ClientGmsImpl)
- && v.getVid().compareTo(view_id) > 0
- && !this.stack.getChannel().closing() /* GemStoneAddition - fix for bug #42969*/) {
- log.getLogWriter().info(ExternalStrings.GMS_MEMBERSHIP_RECEIVED_NEW_VIEW__0, v);
- impl.handleViewChange(v, v.getMessageDigest());
- }
- break;
-
- case GmsHeader.VIEW_ACK:
- Object sender=msg.getSrc();
- if (trace) // GemStoneAddition - debug 34750
- log.trace("Received VIEW_ACK from " + sender);
- v = msg.getObject(); // GemStoneAddition - ack the correct view
- ViewId vid = v==null? null : v.getVid();
- synchronized (ack_collector) { // GemStoneAddition bug34505
- ack_collector.ack(sender, vid);
- }
- return; // don't pass further up
-
-
- case GmsHeader.PREPARE_FOR_VIEW: // GemStoneAddition
- v = msg.getObject();
- if (trace)
- log.trace("Received PREPARE_FOR_VIEW from " + msg.getSrc()+" for view " + v);
- {
- GmsHeader responseHeader = new GmsHeader(GmsHeader.PREPARE_FOR_VIEW_ACK);
- View responseView = null;
- if (!msg.getSrc().equals(this.local_addr)) {
- if (this.preparedView != null && !v.getCreator().equals(this.preparedView.getCreator())) {
- // already have a prepared view - don't accept this one
- responseView = this.preparedView;
- this.preparedView = null;
- } else {
- this.preparedView = v;
- }
- }
- Message m = new Message(true);
- m.putHeader(name, responseHeader);
- m.setDest(msg.getSrc());
- m.setObject(responseView);
- passDown(new Event(Event.MSG, m));
- return;
- }
-
- case GmsHeader.PREPARE_FOR_VIEW_ACK: // GemStoneAddition - two phase views
- {
- sender=msg.getSrc();
- v = (View)msg.getObject();
- if (trace) {
- String preparedViewString = v == null? "" :
- " with conflicting view " + v;
- log.trace("Received PREPARE_FOR_VIEW_ACK from " + sender + preparedViewString);
- }
- vid = null;
- if (v != null) {
- // the sender had a conflicting view in preparation
- this.preparedView = v;
- vid = v.getVid();
- }
- synchronized (prepare_collector) {
- prepare_collector.ack(sender, vid);
- }
- return; // don't pass further up
- }
- case GmsHeader.NETWORK_PARTITION_DETECTED: // GemStoneAddition
- {
- forceDisconnect(new Event(Event.EXIT, stack.gfBasicFunctions.getForcedDisconnectException(ExternalStrings.COORDINATOR_DECLARED_NETWORK_PARTITION_EVENT.toLocalizedString(msg.getSrc()))));
- return;
- }
- // GemStoneAddition - explicit view request
- case GmsHeader.GET_VIEW:
- {
- View msgView = this.view;
- if (msgView != null) {
- Message viewRsp = new Message();
- JoinRsp vrsp = new JoinRsp(msgView, getDigest());
- GmsHeader ghdr = new GmsHeader(GmsHeader.GET_VIEW_RSP);
- viewRsp.putHeader(name, ghdr);
- viewRsp.setDest(msg.getSrc());
- viewRsp.setObject(vrsp);
- passDown(new Event(Event.MSG, viewRsp));
- log.getLogWriter().info(ExternalStrings.DEBUG, "Sending membership view to " + viewRsp.getDest() + ": " + msgView);
- }
- break;
- }
- // GemStoneAddition - response to view request
- case GmsHeader.GET_VIEW_RSP:
- {
- impl.handleGetViewResponse((JoinRsp)msg.getObject());
- break;
- }
- case GmsHeader.MERGE_REQ:
- impl.handleMergeRequest(msg.getSrc(), hdr.merge_id);
- break;
-
- case GmsHeader.MERGE_RSP:
- View theView = msg.getObject();
- merge_data=new MergeData(msg.getSrc(), theView, theView.getMessageDigest());
- merge_data.merge_rejected=hdr.merge_rejected;
- impl.handleMergeResponse(merge_data, hdr.merge_id);
- break;
-
- case GmsHeader.INSTALL_MERGE_VIEW:
- theView = msg.getObject();
- impl.handleMergeView(new MergeData(msg.getSrc(), theView, theView.getMessageDigest()), hdr.merge_id);
- break;
-
- case GmsHeader.CANCEL_MERGE:
- impl.handleMergeCancelled(hdr.merge_id);
- break;
-
- default:
- if(log.isErrorEnabled()) log.error(ExternalStrings.GMS_GMSHEADER_WITH_TYPE_0__NOT_KNOWN, hdr.type);
- }
- return; // don't pass up
-
- case Event.CONNECT_OK: // sent by someone else, but WE are responsible for sending this !
- case Event.DISCONNECT_OK: // dito (e.g. sent by TP layer). Don't send up the stack
- return;
-
-
- case Event.SET_LOCAL_ADDRESS:
- local_addr=(Address)evt.getArg();
- // GemStoneAddition - the address must hold the split-brain enabled flag
- if (local_addr instanceof IpAddress) {
- ((IpAddress)local_addr).splitBrainEnabled(this.splitBrainDetectionEnabled);
- ((IpAddress)local_addr).setMemberWeight(this.memberWeight);
- }
- notifyOfLocalAddress(local_addr);
- if(print_local_addr) {
- System.out.println("\n-------------------------------------------------------\n" +
- "GMS: address is " + local_addr +
- "\n-------------------------------------------------------");
- }
- break; // pass up
-
- case Event.SUSPECT:
- SuspectMember sm = (SuspectMember)evt.getArg(); // GemStoneAddition
- Address suspected=sm.suspectedMember;
- if (!members.containsExt(suspected)) {
- break; // not a member so don't process it
- }
- view_handler.add(new Request(Request.LEAVE, suspected, true, null, "did not respond to are-you-dead messages"));
- addSuspectToCollectors(suspected);
- break; // pass up
-
- case Event.FD_SOCK_MEMBER_LEFT_NORMALLY: // GemStoneAddition - pretty much the same as SUSPECT
- Address addr = (Address)evt.getArg();
- view_handler.add(new Request(Request.LEAVE, addr, false, null, "closed connection to FD_SOCK watcher"));
- addSuspectToCollectors(addr);
- break; // pass up
-
- case Event.UNSUSPECT:
- suspected = (Address)evt.getArg();
- impl.unsuspect(suspected);
- // GemStoneAddition - do not wait for view acknowledgement
- // from a member that has proved it is alive. This ensures
- // that we don't wait inordinately long for a member that
- // might just be extremely busy.
- addSuspectToCollectors(suspected);
- return; // discard
-
- case Event.MERGE:
- view_handler.add(new Request(Request.MERGE, null, false, (Vector)evt.getArg()));
- return; // don't pass up
-
- // GemStoneAddition - passed up from TCPGOSSIP if none of
- // the locators has a distributed system
- case Event.ENABLE_INITIAL_COORDINATOR:
- disable_initial_coord = false;
- return;
-
- // GemStoneAddiiton - network partitioning detection
- case Event.FLOATING_COORDINATOR_DISABLED:
- boolean warning = true;
- if (!floatingCoordinatorDisabled) {
- floatingCoordinatorDisabled = true;
- log.getLogWriter().info(
- ExternalStrings.GMS_LOCATOR_HAS_DISABLED_FLOATING_MEMBERSHIP_COORDINATION);
- } else {
- warning = false;
- }
- if (!stack.gfPeerFunctions.hasLocator()) {
- if (warning) {
- log.getLogWriter().fine("This member of the distributed system will only be a coordinator if there are no locators available");
- }
- ((IpAddress)local_addr).shouldntBeCoordinator(true);
- Event newaddr = new Event(Event.SET_LOCAL_ADDRESS,
- this.local_addr);
- passUp(newaddr);
- passDown(newaddr);
- }
- else if (warning && log.getLogWriter().fineEnabled()) {
- log.getLogWriter().fine(
- "This VM hosts a locator and is preferred as membership coordinator.");
- }
- return;
-
- case Event.ENABLE_NETWORK_PARTITION_DETECTION:
- this.splitBrainDetectionEnabled = true;
- this.stack.gfPeerFunctions.enableNetworkPartitionDetection();
- return;
-
- // GemStoneAddition - copied from receiveUpEvent for removal of threading model
- case Event.GET_DIGEST_OK:
- digest_promise.setResult(evt.getArg());
- return; // don't pass further up
-
- }
-
- if(impl.handleUpEvent(evt))
- passUp(evt);
- }
-
- /**
- This method is overridden to avoid hanging on getDigest(): when a JOIN is received, the coordinator needs
- to retrieve the digest from the NAKACK layer. It therefore sends down a GET_DIGEST event, to which the NAKACK layer
- responds with a GET_DIGEST_OK event.<p>
- However, the GET_DIGEST_OK event will not be processed because the thread handling the JOIN request won't process
- the GET_DIGEST_OK event until the JOIN event returns. The receiveUpEvent() method is executed by the up-handler
- thread of the lower protocol and therefore can handle the event. All we do here is unblock the mutex on which
- JOIN is waiting, allowing JOIN to return with a valid digest. The GET_DIGEST_OK event is then discarded, because
- it won't be processed twice.
- */
-// @Override // GemStoneAddition
-// public void receiveUpEvent(Event evt) {
-// switch(evt.getType()) {
-// case Event.GET_DIGEST_OK:
-// digest_promise.setResult(evt.getArg());
-// return; // don't pass further up
-// }
-// super.receiveUpEvent(evt);
-// }
-
-
- @Override // GemStoneAddition
- public void down(Event evt) {
- switch(evt.getType()) {
-
- case Event.CONNECT:
- RuntimeException joinException = null; // GemStoneAddition
- passDown(evt);
- if(local_addr == null)
- if(log.isFatalEnabled()) log.fatal("[CONNECT] local_addr is null");
- joined = false; // GemStoneAddition
- try {
- joined = impl.join(local_addr);
- }
- catch (ShunnedAddressException e) { // GemStoneAddition
- joinException = e;
- }
- catch (RuntimeException e) {
- String exClass = e.getClass().getSimpleName();
- if (exClass.equals("SystemConnectException")
- || exClass.equals("AuthenticationFailedException")
- || exClass.equals("GemFireConfigException")) { // GemStoneAddition
- joinException = e;
- } else {
- throw e;
- }
- }
- if (trace)
- log.trace("GMS join returned " + joined);
- if (joined) {
- // GemStoneAddition - return status from impl
- Event OK = new Event(Event.CONNECT_OK);
- passUp(OK);
- // GemStoneAddition - pass down as well, so lower protocols know we're connected
- passDown(OK);
- }
- else {
- if (joinException == null) { // GemStoneAddition
- joinException = stack.gfBasicFunctions.getSystemConnectException("Attempt to connect to distributed system timed out");
- }
- if (log.getLogWriter().fineEnabled()) {
- log.getLogWriter().fine("Startup is throwing " + joinException);
- }
- passUp(new Event(Event.EXIT, joinException));
- }
- if (trace)
- log.trace("GMS connect completed");
- return; // don't pass down: was already passed down
-
- case Event.DISCONNECT:
- Event disconnecting = new Event(Event.DISCONNECTING); // GemStoneAddition - starting to disconnect
- passUp(disconnecting);
- passDown(disconnecting);
- impl.leave((Address)evt.getArg());
- this.disconnected = true;
- passUp(new Event(Event.DISCONNECT_OK));
- // bug #44786 - GemFire does not issue connect() after disconnecting a stack. Nulling out the view
- // variable causes an NPE in the view casting thread if a view is being prepared during shutdown and
- // can cause NPEs in other places that do not have null checks for this variable
- //initState(); // in case connect() is called again
- break; // pass down
- }
-
-// boolean handled = impl.handleDownEvent(evt); GemStoneAddition - this does nothing but slow things down a bit
-// if(handled)
- passDown(evt);
- }
-
-
- /** Setup the Protocol instance according to the configuration string */
- @Override // GemStoneAddition
- public boolean setProperties(Properties props) {
- String str;
-
- super.setProperties(props);
- str=props.getProperty("shun");
- if(str != null) {
- shun=Boolean.valueOf(str).booleanValue();
- props.remove("shun");
- }
-
- str=props.getProperty("merge_leader");
- if(str != null) {
- merge_leader=Boolean.valueOf(str).booleanValue();
- props.remove("merge_leader");
- }
-
- str=props.getProperty("print_local_addr");
- if(str != null) {
- print_local_addr=Boolean.valueOf(str).booleanValue();
- props.remove("print_local_addr");
- }
-
- str=props.getProperty("join_timeout"); // time to wait for JOIN
- if(str != null) {
- join_timeout=Long.parseLong(str);
- props.remove("join_timeout");
- }
-
- str=props.getProperty("join_retry_timeout"); // time to wait between JOINs
- if(str != null) {
- join_retry_timeout=Long.parseLong(str);
- props.remove("join_retry_timeout");
- }
-
- str=props.getProperty("leave_timeout"); // time to wait until coord responds to LEAVE req.
- if(str != null) {
- leave_timeout=Long.parseLong(str);
- props.remove("leave_timeout");
- }
-
- str=props.getProperty("merge_timeout"); // time to wait for MERGE_RSPS from subgroup coordinators
- if(str != null) {
- merge_timeout=Long.parseLong(str);
- props.remove("merge_timeout");
- }
-
- str=props.getProperty("digest_timeout"); // time to wait for GET_DIGEST_OK from PBCAST
- if(str != null) {
- digest_timeout=Long.parseLong(str);
- props.remove("digest_timeout");
- }
-
- str=props.getProperty("view_ack_collection_timeout");
- if(str != null) {
- view_ack_collection_timeout=Long.parseLong(str);
- props.remove("view_ack_collection_timeout");
- }
-
- str=props.getProperty("resume_task_timeout");
- if(str != null) {
- resume_task_timeout=Long.parseLong(str);
- props.remove("resume_task_timeout");
- }
-
- str=props.getProperty("disable_initial_coord");
- if(str != null) {
- disable_initial_coord=Boolean.valueOf(str).booleanValue();
- props.remove("disable_initial_coord");
- }
- // GemStoneAddition - InternalLocator needs this
- if (Boolean.getBoolean("p2p.enableInitialCoordinator")) {
- disable_initial_coord = false;
- }
-
- //GemStoneAddition - split-brain detection support
- str=props.getProperty("split-brain-detection");
- if (str != null) {
- splitBrainDetectionEnabled = Boolean.valueOf(str).booleanValue();
- props.remove("split-brain-detection");
- }
- str=props.getProperty("
<TRUNCATED>