Posted to commits@geode.apache.org by kl...@apache.org on 2016/02/23 21:23:33 UTC

[37/94] [abbrv] [partial] incubator-geode git commit: Merge remote-tracking branch 'origin/develop' into feature/GEODE-917

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/5beaaedc/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/mgr/GMSMembershipManager.java
----------------------------------------------------------------------
diff --cc geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/mgr/GMSMembershipManager.java
index d75b28d,0000000..edfee10
mode 100755,000000..100755
--- a/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/mgr/GMSMembershipManager.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/mgr/GMSMembershipManager.java
@@@ -1,2652 -1,0 +1,2655 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *      http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package com.gemstone.gemfire.distributed.internal.membership.gms.mgr;
 +
 +import java.io.IOException;
 +import java.io.NotSerializableException;
 +import java.util.ArrayList;
 +import java.util.Arrays;
 +import java.util.Collection;
 +import java.util.Collections;
 +import java.util.HashMap;
 +import java.util.HashSet;
 +import java.util.Iterator;
 +import java.util.LinkedHashMap;
 +import java.util.LinkedList;
 +import java.util.List;
 +import java.util.Map;
 +import java.util.Set;
 +import java.util.concurrent.ConcurrentHashMap;
 +import java.util.concurrent.CountDownLatch;
 +import java.util.concurrent.TimeUnit;
 +import java.util.concurrent.TimeoutException;
 +import java.util.concurrent.locks.ReadWriteLock;
 +import java.util.concurrent.locks.ReentrantReadWriteLock;
 +
 +import org.apache.logging.log4j.Logger;
 +
 +import com.gemstone.gemfire.CancelException;
 +import com.gemstone.gemfire.ForcedDisconnectException;
 +import com.gemstone.gemfire.GemFireConfigException;
 +import com.gemstone.gemfire.InternalGemFireError;
 +import com.gemstone.gemfire.SystemConnectException;
 +import com.gemstone.gemfire.SystemFailure;
 +import com.gemstone.gemfire.ToDataException;
 +import com.gemstone.gemfire.cache.Cache;
 +import com.gemstone.gemfire.cache.server.CacheServer;
 +import com.gemstone.gemfire.distributed.DistributedMember;
 +import com.gemstone.gemfire.distributed.DistributedSystem;
 +import com.gemstone.gemfire.distributed.DistributedSystemDisconnectedException;
 +import com.gemstone.gemfire.distributed.Locator;
 +import com.gemstone.gemfire.distributed.internal.AdminMessageType;
 +import com.gemstone.gemfire.distributed.internal.DMStats;
 +import com.gemstone.gemfire.distributed.internal.DSClock;
 +import com.gemstone.gemfire.distributed.internal.DistributionConfig;
 +import com.gemstone.gemfire.distributed.internal.DistributionException;
 +import com.gemstone.gemfire.distributed.internal.DistributionManager;
 +import com.gemstone.gemfire.distributed.internal.DistributionMessage;
 +import com.gemstone.gemfire.distributed.internal.InternalDistributedSystem;
 +import com.gemstone.gemfire.distributed.internal.InternalLocator;
 +import com.gemstone.gemfire.distributed.internal.OverflowQueueWithDMStats;
 +import com.gemstone.gemfire.distributed.internal.SizeableRunnable;
 +import com.gemstone.gemfire.distributed.internal.StartupMessage;
 +import com.gemstone.gemfire.distributed.internal.direct.DirectChannel;
 +import com.gemstone.gemfire.distributed.internal.direct.DirectChannelListener;
 +import com.gemstone.gemfire.distributed.internal.direct.ShunnedMemberException;
 +import com.gemstone.gemfire.distributed.internal.membership.DistributedMembershipListener;
 +import com.gemstone.gemfire.distributed.internal.membership.InternalDistributedMember;
 +import com.gemstone.gemfire.distributed.internal.membership.MembershipManager;
 +import com.gemstone.gemfire.distributed.internal.membership.MembershipTestHook;
 +import com.gemstone.gemfire.distributed.internal.membership.NetView;
 +import com.gemstone.gemfire.distributed.internal.membership.QuorumChecker;
 +import com.gemstone.gemfire.distributed.internal.membership.gms.GMSMember;
 +import com.gemstone.gemfire.distributed.internal.membership.gms.Services;
 +import com.gemstone.gemfire.distributed.internal.membership.gms.SuspectMember;
 +import com.gemstone.gemfire.distributed.internal.membership.gms.fd.GMSHealthMonitor;
 +import com.gemstone.gemfire.distributed.internal.membership.gms.interfaces.Manager;
 +import com.gemstone.gemfire.distributed.internal.membership.gms.membership.GMSJoinLeave;
 +import com.gemstone.gemfire.distributed.internal.membership.gms.messenger.GMSQuorumChecker;
 +import com.gemstone.gemfire.internal.Assert;
 +import com.gemstone.gemfire.internal.SystemTimer;
 +import com.gemstone.gemfire.internal.Version;
 +import com.gemstone.gemfire.internal.admin.remote.RemoteTransportConfig;
++import com.gemstone.gemfire.internal.cache.CacheServerImpl;
 +import com.gemstone.gemfire.internal.cache.GemFireCacheImpl;
 +import com.gemstone.gemfire.internal.cache.xmlcache.CacheServerCreation;
 +import com.gemstone.gemfire.internal.cache.xmlcache.CacheXmlGenerator;
 +import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
 +import com.gemstone.gemfire.internal.logging.log4j.AlertAppender;
 +import com.gemstone.gemfire.internal.logging.log4j.LocalizedMessage;
 +import com.gemstone.gemfire.internal.logging.log4j.LogMarker;
 +import com.gemstone.gemfire.internal.shared.StringPrintWriter;
 +import com.gemstone.gemfire.internal.tcp.ConnectExceptions;
 +import com.gemstone.gemfire.internal.tcp.MemberShunnedException;
 +import com.gemstone.gemfire.internal.util.Breadcrumbs;
 +
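 +/**
 + * GMS-based implementation of MembershipManager.  This class installs
 + * membership views received from the GMS services, dispatches messages from
 + * the messenger and direct channel to the DistributedMembershipListener, and
 + * tracks surprise, shunned and suspected members.
 + */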
 +public class GMSMembershipManager implements MembershipManager, Manager
 +{
 +  private static final Logger logger = Services.getLogger();
 +  
 +  /** true if multicast must be disabled while members are running mixed product versions (rolling upgrade) */
 +  volatile boolean disableMulticastForRollingUpgrade;
 +  
 +  /**
 +   * set to true if the distributed system that created this manager was
 +   * auto-reconnecting when it was created.
 +   */
 +  boolean wasReconnectingSystem;
 +  
 +  /**
 +   * A quorum checker is created during reconnect and is held
 +   * here so it is available to the UDP protocol for passing off
 +   * the ping-pong responses used in the quorum-checking algorithm. 
 +   */
 +  private volatile QuorumChecker quorumChecker;
 +  
 +  /**
 +   * thread-local used to force use of Messenger for communications, usually to
 +   * avoid deadlock when conserve-sockets=true.  Use of this should be removed
 +   * when connection pools are implemented in the direct-channel 
 +   */
 +  private ThreadLocal<Boolean> forceUseUDPMessaging = new ThreadLocal<Boolean>();
 +  
 +  /**
 +   * Trick class to make the startup synchronization more
 +   * visible in stack traces
 +   * 
 +   * @see GMSMembershipManager#startupLock
 +   */
 +  static class EventProcessingLock  {
 +    public EventProcessingLock() {
 +    }
 +  }
 +  
 +  static class StartupEvent  {
 +    static final int SURPRISE_CONNECT = 1;
 +    static final int VIEW = 2;
 +    static final int MESSAGE = 3;
 +    
 +    /**
 +     * indicates whether the event is a surprise connect (i.e., one that
 +     * arrived before the view message), a view, or a regular
 +     * message
 +     * 
 +     * @see #SURPRISE_CONNECT
 +     * @see #VIEW
 +     * @see #MESSAGE
 +     */
 +    private int kind;
 +    
 +    // Miscellaneous state depending on the kind of event
 +    InternalDistributedMember member;
 +    boolean crashed;
 +    String reason;
 +    DistributionMessage dmsg;
 +    NetView gmsView;
 +    
 +    @Override
 +    public String toString() {
 +      StringBuffer sb = new StringBuffer();
 +      sb.append("kind=");
 +      switch (kind) {
 +      case SURPRISE_CONNECT:
 +        sb.append("connect; member = <" + member + ">");
 +        break;
 +      case VIEW:
 +        String text = gmsView.toString();
 +        sb.append("view <" + text + ">");
 +        break;
 +      case MESSAGE:
 +        sb.append("message <" + dmsg + ">");
 +        break;
 +      default:
 +        sb.append("unknown=<" + kind + ">");
 +        break;
 +      }
 +      return sb.toString();
 +    }
 +
 +    /**
 +     * Create a surprise connect event
 +     * @param member the member connecting
 +     */
 +    StartupEvent(final InternalDistributedMember member) {
 +      this.kind = SURPRISE_CONNECT;
 +      this.member = member;
 +    }
 +    /**
 +     * Indicate if this is a surprise connect event
 +     * @return true if this is a connect event
 +     */
 +    boolean isSurpriseConnect() {
 +      return this.kind == SURPRISE_CONNECT;
 +    }
 +
 +    /**
 +     * Create a view event
 +     * @param v the new view
 +     */
 +    StartupEvent(NetView v) {
 +      this.kind = VIEW;
 +      this.gmsView = v;
 +    }
 +    
 +    /**
 +     * Indicate if this is a view event
 +     * @return true if this is a view event
 +     */
 +    boolean isGmsView() {
 +      return this.kind == VIEW;
 +    }
 +
 +    /**
 +     * Create a message event
 +     * @param d the message
 +     */
 +    StartupEvent(DistributionMessage d) {
 +      this.kind = MESSAGE;
 +      this.dmsg = d;
 +    }
 +    /**
 +     * Indicate if this is a message event
 +     * @return true if this is a message event
 +     */
 +    boolean isDistributionMessage() {
 +      return this.kind == MESSAGE;
 +    }
 +  }
 +  
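 +  /** timeout used for peer-membership verification during security handshakes */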
 +  private int membershipCheckTimeout = DistributionConfig.DEFAULT_SECURITY_PEER_VERIFYMEMBER_TIMEOUT;
 +
 +  /**
 +   * This object synchronizes threads waiting for
 +   * startup to finish.  Updates to {@link #startupMessages}
 +   * are synchronized through this object.
 +   */
 +  protected final EventProcessingLock startupLock = new EventProcessingLock();
 +  
 +  /**
 +   * This is the latest view (ordered list of DistributedMembers) 
 +   * that has been installed
 +   * 
 +   * All accesses to this object are protected via {@link #latestViewLock}
 +   */
 +  protected NetView latestView = new NetView();
 +
 +  /**
 +   * This is the lock for protecting access to latestView
 +   * 
 +   * @see #latestView
 +   */
 +  protected ReadWriteLock latestViewLock = new ReentrantReadWriteLock();
 +  
 +  /**
 +   * This is the listener that accepts our membership events
 +   */
 +  protected com.gemstone.gemfire.distributed.internal.membership.DistributedMembershipListener listener;
 +
 +  /**
 +   * Membership failure listeners - for testing
 +   */
 +  List membershipTestHooks;
 +  
 +  /**
 +   * This is a representation of the local member (ourself)
 +   */
 +  protected InternalDistributedMember address = null; // new DistributedMember(-1);
 +
 +  protected DirectChannel directChannel;
 +
 +  protected MyDCReceiver dcReceiver;
 +  
 +  volatile boolean isJoining;
 +  
 +  /** have we joined successfully? */
 +  volatile boolean hasJoined;
 +  
 +  /**
 +   * Members of the distributed system that we believe have shut down.
 +   * Keys are instances of {@link InternalDistributedMember}, values are 
 +   * Longs indicating the time this member was shunned.
 +   * 
 +   * Members are removed after {@link #SHUNNED_SUNSET} seconds have
 +   * passed.
 +   * 
 +   * Accesses to this map need to be under the read or write lock of {@link #latestViewLock}
 +   * 
 +   * @see System#currentTimeMillis()
 +   */
 +//  protected final Set shunnedMembers = Collections.synchronizedSet(new HashSet());
 +  protected final Map shunnedMembers = new ConcurrentHashMap();
 +
 +  /**
 +   * Members that have sent a shutdown message.  This is used to suppress
 +   * suspect processing that otherwise becomes pretty aggressive 
 +   * when a member is shutting down.
 +   */
 +  private final Map shutdownMembers = new BoundedLinkedHashMap(1000);
 +  
 +  /**
 +   * per bug 39552, keep a list of members that have been shunned and for
 +   * which a message is printed.  Contents of this list are cleared at the
 +   * same time they are removed from {@link #shunnedMembers}.
 +   * 
 +   * Accesses to this set need to be under the read or write lock of {@link #latestViewLock}
 +   */
 +  protected final HashSet shunnedAndWarnedMembers = new HashSet();
 +  /**
 +   * The identities and birth-times of others that we have allowed into
 +   * membership at the distributed system level, but have not yet appeared
 +   * in a view.
 +   * <p>
 +   * Keys are instances of {@link InternalDistributedMember}, values are 
 +   * Longs indicating the time the member was first seen (its birth time).
 +   * <p>
 +   * Members are removed when a view containing them is processed.  If,
 +   * after {@link #surpriseMemberTimeout} milliseconds have passed, a view
 +   * containing the member has not arrived, the member is removed from
 +   * membership and member-left notification is performed. 
 +   * <p>
 +   * Accesses to this map need to be under the read or write lock of {@link #latestViewLock}
 +   * 
 +   * @see System#currentTimeMillis()
 +   */
 +  protected final Map<InternalDistributedMember, Long> surpriseMembers = new ConcurrentHashMap();
 +
 +  /**
 +   * the timeout interval for surprise members.  This is calculated from 
 +   * the member-timeout setting
 +   */
 +  protected int surpriseMemberTimeout;
 +  
 +  /**
 +   * javagroups can skip views and omit telling us about a crashed member.
 +   * This map holds a history of suspected members that we use to detect
 +   * crashes.
 +   */
 +  private final Map<InternalDistributedMember, Long> suspectedMembers = new ConcurrentHashMap();
 +  
 +  /**
 +   * the timeout interval, in milliseconds, for suspected members (three minutes)
 +   */
 +  private final long suspectMemberTimeout = 180000;
 +  
 +  /**
 +   * Length of time, in seconds, that a member is retained in the zombie set
 +   * 
 +   * @see #shunnedMembers
 +   */
 +  static private final int SHUNNED_SUNSET = Integer.getInteger(
 +      "gemfire.shunned-member-timeout", 300).intValue();
 +  
 +  /**
 +   * Set to true when the service should stop.
 +   */
 +  protected volatile boolean shutdownInProgress = false;
 +  
 +  /**
 +   * Set to true when upcalls should be generated for
 +   * events.
 +   */
 +  protected volatile boolean processingEvents = false;
 +  
 +  /**
 +   * This is the latest viewId installed
 +   */
 +  long latestViewId = -1;
 +  
 +  /** distribution manager statistics */
 +  DMStats stats;
 +
 +  /**
 +   A list of messages received during channel startup that couldn't be processed yet.
 +   Additions or removals of this list must be synchronized
 +   via {@link #startupLock}.
 +   @since 5.0
 +   */
 +  protected LinkedList<StartupEvent> startupMessages = new LinkedList<StartupEvent>();
 +  
 +  /**
 +   * ARB: the map of latches is used to block peer handshakes till
 +   * authentication is confirmed.
 +   */
 +  final private HashMap memberLatch = new HashMap();
 +  
 +  /**
 +   * Insert our own MessageReceiver between us and the direct channel, in order
 +   * to correctly filter membership events.
 +   * 
 +   * @author jpenney
 +   * 
 +   */
 +  class MyDCReceiver implements DirectChannelListener
 +  {
 +
 +    DirectChannelListener upCall;
 +    
 +    /**
 +     * Don't provide events until the caller has told us we are ready.
 +     * 
 +     * Synchronization provided via GroupMembershipService.class.
 +     * 
 +     * Note that in practice we only need to delay accepting the first
 +     * client; we don't need to put this check before every call...
 +     *
 +     */
 +   MyDCReceiver(DirectChannelListener up) {
 +      upCall = up;
 +    }
 +
 +    public void messageReceived(DistributionMessage msg) {
 +      // bug 36851 - notify failure detection that we've had contact from a member
 +      services.getHealthMonitor().contactedBy(msg.getSender());
 +      handleOrDeferMessage(msg);
 +    }
 +
 +    public DistributionManager getDM() {
 +     return upCall.getDM();
 +    }
 +
 +  }
 +
 +
 +  /** if we connect to a locator that has NPD enabled then we enable it in this VM */
 +  public void enableNetworkPartitionDetection() {
 +    if (logger.isDebugEnabled()) {
 +      logger.debug("Network partition detection is being enabled");
 +    }
 +    this.services.getConfig().getDistributionConfig().setEnableNetworkPartitionDetection(true);
 +    this.services.getConfig().setNetworkPartitionDetectionEnabled(true);
 +  }
 +  
 +  /**
 +   * Analyze a given view object and generate membership events as appropriate
 +   * 
 +   * @param newViewId the identifier of the new view
 +   * @param newView the new view
 +   */
 +  protected void processView(long newViewId, NetView newView)
 +  {
 +    // Sanity check...
 +    if (logger.isDebugEnabled()) {
 +      StringBuffer msg = new StringBuffer(200);
 +      msg.append("Membership: Processing view ");
 +      msg.append(newView);
 +      msg.append(" on " + address.toString());
 +      logger.debug(msg.toString());
 +    }
 +    if (!newView.contains(address)) {
 +      logger.info(LocalizedMessage.create(
 +          LocalizedStrings.GroupMembershipService_THE_MEMBER_WITH_ID_0_IS_NO_LONGER_IN_MY_OWN_VIEW_1,
 +          new Object[] {address, newView}));
 +    }
 +     
 +//     if (newView.getCrashedMembers().size() > 0) {
 +//       // dump stack for debugging #39827
 +//       OSProcess.printStacks(0);
 +//     }
 +    // We perform the update under a global lock so that other
 +    // incoming events will not be lost in terms of our global view.
 +    latestViewLock.writeLock().lock();
 +    try {
 +      // first determine the version for multicast message serialization
 +      Version version = Version.CURRENT;
 +      for (Iterator<Map.Entry<InternalDistributedMember, Long>> it=surpriseMembers.entrySet().iterator(); it.hasNext(); ) {
 +        InternalDistributedMember mbr = it.next().getKey();
 +        Version itsVersion = mbr.getVersionObject();
 +        if (itsVersion != null && version.compareTo(itsVersion) < 0) {
 +          version = itsVersion;
 +        }
 +      }
 +      for (InternalDistributedMember mbr: newView.getMembers()) {
 +        Version itsVersion = mbr.getVersionObject();
 +        if (itsVersion != null && itsVersion.compareTo(version) < 0) {
 +          version = mbr.getVersionObject();
 +        }
 +      }
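 +      // if any member is running a different product version, turn off
 +      // multicast until the rolling upgrade completes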
 +      disableMulticastForRollingUpgrade = !version.equals(Version.CURRENT);
 +      
 +      if (newViewId < latestViewId) {
 +        // ignore this view since it is old news
 +        return;
 +      }
 +
 +      // Save previous view, for delta analysis
 +      NetView priorView = latestView;
 +      
 +      // update the view to reflect our changes, so that
 +      // callbacks will see the new (updated) view.
 +      latestViewId = newViewId;
 +      latestView = new NetView(newView, newView.getViewId());
 +      
 +      // look for additions
 +      for (int i = 0; i < newView.getMembers().size(); i++) { // additions
 +        InternalDistributedMember m = (InternalDistributedMember)newView.getMembers().get(i);
 +        
 +        // Once a member has been seen via a view, remove them from the
 +        // newborn set
 +        boolean wasSurprise = surpriseMembers.remove(m) != null;
 +        
 +        // bug #45155 - membership view processing was slow, causing a member to connect as "surprise"
 +        // and the surprise timeout removed the member and shunned it, keeping it from being
 +        // recognized as a valid member when it was finally seen in a view
 +//        if (isShunned(m)) {
 +//          warnShuns.add(m);
 +//          continue;
 +//        }
 +
 +        // if it's in a view, it's no longer suspect
 +        suspectedMembers.remove(m);
 +
 +        if (priorView.contains(m) || wasSurprise) {
 +          continue; // already seen
 +        }
 +      
 +        // ARB: unblock any waiters for this particular member.
 +        // i.e. signal any waiting threads in tcpconduit.
 +        String authInit = this.services.getConfig().getDistributionConfig().getSecurityPeerAuthInit();
 +        boolean isSecure = authInit != null && authInit.length() != 0;
 +
 +        if (isSecure) {
 +          CountDownLatch currentLatch;
 +          if ((currentLatch = (CountDownLatch)memberLatch.get(m)) != null) {
 +            currentLatch.countDown();
 +          }
 +        }
 +
 +        if (shutdownInProgress()) {
 +          addShunnedMember(m);
 +          continue; // no additions processed after shutdown begins
 +        } else {
 +          boolean wasShunned = endShun(m); // bug #45158 - no longer shun a process that is now in view
 +          if (wasShunned && logger.isDebugEnabled()) {
 +            logger.debug("No longer shunning {} as it is in the current membership view", m);
 +          }
 +        }
 +        
 +        logger.info(LocalizedMessage.create(LocalizedStrings.GroupMembershipService_MEMBERSHIP_PROCESSING_ADDITION__0_, m));
 +
 +        try {
 +          listener.newMemberConnected(m);
 +        }
 +        catch (VirtualMachineError err) {
 +          SystemFailure.initiateFailure(err);
 +          // If this ever returns, rethrow the error.  We're poisoned
 +          // now, so don't let this thread continue.
 +          throw err;
 +        }
 +        catch (DistributedSystemDisconnectedException e) {
 +          // don't log shutdown exceptions
 +        }
 +        catch (Throwable t) {
 +          // Whenever you catch Error or Throwable, you must also
 +          // catch VirtualMachineError (see above).  However, there is
 +          // _still_ a possibility that you are dealing with a cascading
 +          // error condition, so you also need to check to see if the JVM
 +          // is still usable:
 +          SystemFailure.checkFailure();
 +          logger.info(LocalizedMessage.create(
 +              LocalizedStrings.GroupMembershipService_MEMBERSHIP_FAULT_WHILE_PROCESSING_VIEW_ADDITION_OF__0, m), t);
 +        }
 +      } // additions
 +      
 +      // look for departures
 +      for (int i = 0; i < priorView.getMembers().size(); i++) { // departures
 +        InternalDistributedMember m = (InternalDistributedMember)priorView.getMembers().get(i);
 +        if (newView.contains(m)) {
 +          continue; // still alive
 +        }
 +        
 +        if (surpriseMembers.containsKey(m)) {
 +          continue; // member has not yet appeared in a view
 +        }
 +
 +        try {
 +          removeWithViewLock(m,
 +              newView.getCrashedMembers().contains(m) || suspectedMembers.containsKey(m)
 +              , "departed membership view");
 +        }
 +        catch (VirtualMachineError err) {
 +          SystemFailure.initiateFailure(err);
 +          // If this ever returns, rethrow the error.  We're poisoned
 +          // now, so don't let this thread continue.
 +          throw err;
 +        }
 +        catch (Throwable t) {
 +          // Whenever you catch Error or Throwable, you must also
 +          // catch VirtualMachineError (see above).  However, there is
 +          // _still_ a possibility that you are dealing with a cascading
 +          // error condition, so you also need to check to see if the JVM
 +          // is still usable:
 +          SystemFailure.checkFailure();
 +          logger.info(LocalizedMessage.create(
 +              LocalizedStrings.GroupMembershipService_MEMBERSHIP_FAULT_WHILE_PROCESSING_VIEW_REMOVAL_OF__0, m), t);
 +        }
 +      } // departures
 +      
 +      // expire surprise members, add others to view
 +      long oldestAllowed = System.currentTimeMillis() - this.surpriseMemberTimeout;
 +      for (Iterator<Map.Entry<InternalDistributedMember, Long>> it=surpriseMembers.entrySet().iterator(); it.hasNext(); ) {
 +        Map.Entry<InternalDistributedMember, Long> entry = it.next();
 +        Long birthtime = (Long)entry.getValue();
 +        if (birthtime.longValue() < oldestAllowed) {
 +          it.remove();
 +          InternalDistributedMember m = entry.getKey();
 +          logger.info(LocalizedMessage.create(
 +            LocalizedStrings.GroupMembershipService_MEMBERSHIP_EXPIRING_MEMBERSHIP_OF_SURPRISE_MEMBER_0, m));
 +          removeWithViewLock(m, true, "not seen in membership view in "
 +              + this.surpriseMemberTimeout + "ms");
 +        }
 +        else {
 +          if (!latestView.contains(entry.getKey())) {
 +            latestView.add(entry.getKey());
 +          }
 +        }
 +      }
 +      // expire suspected members
 +      oldestAllowed = System.currentTimeMillis() - this.suspectMemberTimeout;
 +      for (Iterator it=suspectedMembers.entrySet().iterator(); it.hasNext(); ) {
 +        Map.Entry entry = (Map.Entry)it.next();
 +        Long birthtime = (Long)entry.getValue();
 +        if (birthtime.longValue() < oldestAllowed) {
 +          it.remove();
 +        }
 +      }
 +      try {
 +        listener.viewInstalled(latestView);
 +        startCleanupTimer();
 +      }
 +      catch (DistributedSystemDisconnectedException se) {
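 +        // don't log shutdown exceptions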
 +      }
 +    } finally {
 +      latestViewLock.writeLock().unlock();
 +    }
 +  }
 +
 +  /**
 +   * the timer used to perform periodic tasks
 +   * 
 +   * Concurrency: protected by {@link #latestViewLock} ReentrantReadWriteLock
 +   */
 +  private SystemTimer cleanupTimer;
 +
 +  private Services services;
 +
 +  private boolean mcastEnabled;
 +
 +  private boolean tcpDisabled;
 +
 +
 +  @Override
 +  public boolean isMulticastAllowed() {
 +    return !disableMulticastForRollingUpgrade;
 +  }
 +
 +  /**
 +   * Joins the distributed system
 +   * 
 +   * @throws GemFireConfigException - configuration error
 +   * @throws SystemConnectException - problem joining
 +   */
 +  private void join() {
 +    services.setShutdownCause(null);
 +    services.getCancelCriterion().cancel(null);
 +    
 +    latestViewLock.writeLock().lock();
 +    try {
 +      try {
 +        this.isJoining = true; // added for bug #44373
 +
 +        // connect
 +        long start = System.currentTimeMillis();
 +
 +        boolean ok = services.getJoinLeave().join();
 +        
 +        if (!ok) {
 +          throw new GemFireConfigException("Unable to join the distributed system.  " +
 +              "The operation either timed out, was stopped, or the locator does not exist.");
 +        }
 +
 +        long delta = System.currentTimeMillis() - start;
 +
 +        logger.info(LogMarker.DISTRIBUTION, LocalizedMessage.create(
 +            LocalizedStrings.GroupMembershipService_JOINED_TOOK__0__MS, delta));
 +
 +        NetView initialView = services.getJoinLeave().getView();
 +        latestView = new NetView(initialView, initialView.getViewId());
 +        listener.viewInstalled(latestView);
 +        
 +      } catch (RuntimeException ex) {
 +        throw ex;
 +      }
 +      catch (Exception ex) {
 +        if (ex.getCause() != null && ex.getCause().getCause() instanceof SystemConnectException) {
 +          throw (SystemConnectException)(ex.getCause().getCause());
 +        }
 +        throw new DistributionException(LocalizedStrings.GroupMembershipService_AN_EXCEPTION_WAS_THROWN_WHILE_JOINING.toLocalizedString(), ex);
 +      }
 +      finally {
 +        this.isJoining = false;
 +      }
 +    }
 +    finally {
 +      latestViewLock.writeLock().unlock();
 +    }
 +  }
 +
 +
 +  public GMSMembershipManager(DistributedMembershipListener listener) {
 +    Assert.assertTrue(listener != null);
 +    this.listener = listener;
 +  }
 +  
 +  @Override
 +  public void init(Services services) {
 +    this.services = services;
 +
 +    Assert.assertTrue(services != null);
 +    
 +    this.stats = services.getStatistics();
 +    DistributionConfig config = services.getConfig().getDistributionConfig();
 +    RemoteTransportConfig transport = services.getConfig().getTransport();
 +
 +    this.membershipCheckTimeout = config.getSecurityPeerMembershipTimeout();
 +    this.wasReconnectingSystem = transport.getIsReconnectingDS();
 +    
 +    // cache these settings for use in send()
 +    this.mcastEnabled = transport.isMcastEnabled();
 +    this.tcpDisabled  = transport.isTcpDisabled();
 +
 +    if (!this.tcpDisabled) {
 +      dcReceiver = new MyDCReceiver(listener);
 +    }
 +    
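 +    // allow twenty member-timeout intervals for a surprise member to show up
 +    // in a view; this can be overridden with the gemfire.surprise-member-timeout
 +    // system property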
 +    surpriseMemberTimeout = Math.max(20 * DistributionConfig.DEFAULT_MEMBER_TIMEOUT,
 +        20 * config.getMemberTimeout());
 +    surpriseMemberTimeout = Integer.getInteger("gemfire.surprise-member-timeout", surpriseMemberTimeout).intValue();
 +    
 +  }
 +  
 +  @Override
 +  public void start() {
 +    DistributionConfig config = services.getConfig().getDistributionConfig();
 +    RemoteTransportConfig transport = services.getConfig().getTransport();
 +
 +    int dcPort = 0;
 +    if (!tcpDisabled) {
 +      directChannel = new DirectChannel(this, dcReceiver, config);
 +      dcPort = directChannel.getPort();
 +    }
 +
 +    
 +    services.getMessenger().getMemberID().setDirectChannelPort(dcPort);
 +
 +  }
 +  
 +  
 +  @Override
 +  public void joinDistributedSystem() {
 +    long startTime = System.currentTimeMillis();
 +    
 +    try {
 +      join();
 +    }
 +    catch (RuntimeException e) {
 +      if (directChannel != null) {
 +        directChannel.disconnect(e);
 +      }
 +      throw e;
 +    }
 +    
 +    this.address = services.getMessenger().getMemberID();
 +
 +    if (directChannel != null) {
 +      directChannel.setLocalAddr(address);
 +    }
 +
 +    this.hasJoined = true;
 +
 +    // in order to debug startup issues we need to announce the membership
 +    // ID as soon as we know it
 +    logger.info(LocalizedMessage.create(LocalizedStrings.GroupMembershipService_entered_into_membership_in_group_0_with_id_1,
 +        new Object[]{""+(System.currentTimeMillis()-startTime)}));
 +
 +  }
 +  
 +  @Override
 +  public void started() {
 +  }
 +  
 +
 +  /** this is invoked by JoinLeave when there is a loss of quorum in the membership system */
 +  public void quorumLost(Collection<InternalDistributedMember> failures, NetView view) {
 +    // notify of quorum loss if split-brain detection is enabled (meaning we'll shut down) or
 +    // if the loss is more than one member
 +    
 +    boolean notify = failures.size() > 1;
 +    if (!notify) {
 +      notify = services.getConfig().isNetworkPartitionDetectionEnabled();
 +    }
 +    
 +    if (notify) {
 +      List<InternalDistributedMember> remaining = new ArrayList<InternalDistributedMember>(view.getMembers());
 +      remaining.removeAll(failures);
 +      
 +      if (inhibitForceDisconnectLogging) {
 +        if (logger.isDebugEnabled()) {
 +          logger.debug("<ExpectedException action=add>Possible loss of quorum</ExpectedException>");
 +        }
 +      }
 +      logger.fatal(LocalizedMessage.create(
 +          LocalizedStrings.GroupMembershipService_POSSIBLE_LOSS_OF_QUORUM_DETECTED, new Object[] {failures.size(), failures}));
 +      if (inhibitForceDisconnectLogging) {
 +        if (logger.isDebugEnabled()) {
 +          logger.debug("<ExpectedException action=remove>Possible loss of quorum</ExpectedException>");
 +        }
 +      }
 +      
 +  
 +      try {
 +        this.listener.quorumLost(new HashSet<InternalDistributedMember>(failures),
 +            remaining);
 +      } catch (CancelException e) {
 +        // safe to ignore - a forced disconnect probably occurred
 +      }
 +    }
 +  }
 +  
 +
 +  @Override
 +  public boolean testMulticast() {
 +    try {
 +      return services.getMessenger().testMulticast(services.getConfig().getMemberTimeout());
 +    } catch (InterruptedException e) {
 +      services.getCancelCriterion().checkCancelInProgress(e);
 +      Thread.currentThread().interrupt();
 +      return false;
 +    }
 +  }
 +  
 +  /**
 +   * Remove a member.  {@link #latestViewLock} must be held
 +   * before this method is called.  If member is not already shunned,
 +   * the uplevel event handler is invoked.
 +   * 
 +   * @param dm the member to remove
 +   * @param crashed true if the member crashed rather than departing cleanly
 +   * @param reason the reason the member is being removed
 +   */
 +  protected void removeWithViewLock(InternalDistributedMember dm,
 +      boolean crashed, String reason) {
 +    boolean wasShunned = isShunned(dm);
 +
 +    // Delete resources
 +    destroyMember(dm, crashed, reason);
 +
 +    if (wasShunned) {
 +      return; // Explicit deletion, no upcall.
 +    }
 +    
 +    try {
 +      listener.memberDeparted(dm, crashed, reason);
 +    }
 +    catch (DistributedSystemDisconnectedException se) {
 +      // let's not get huffy about it
 +    }
 +  }
 +  
 +  /**
 +   * Process a surprise connect event, or place it on the startup queue.
 +   * @param member the member
 +   */
 +  protected void handleOrDeferSurpriseConnect(InternalDistributedMember member) {
 +    synchronized (startupLock) {
 +      if (!processingEvents) {
 +        startupMessages.add(new StartupEvent(member));
 +        return;
 +      }
 +    }
 +    processSurpriseConnect(member);
 +  }
 +  
 +  public void startupMessageFailed(DistributedMember mbr, String failureMessage) {
 +    // fix for bug #40666
 +    addShunnedMember((InternalDistributedMember)mbr);
 +    // fix for bug #41329, hang waiting for replies
 +    try {
 +      listener.memberDeparted((InternalDistributedMember)mbr, true, "failed to pass startup checks");
 +    }
 +    catch (DistributedSystemDisconnectedException se) {
 +      // let's not get huffy about it
 +    }
 +  }
 +
 +  
 +  /**
 +   * Logic for handling a direct connection event (message received
 +   * from a member not in the view).  Does not employ the
 +   * startup queue.
 +   * <p>
 +   * Acquires the {@link #latestViewLock} to get a stable view.  If the
 +   * member has already been added, simply returns; else adds the member.
 +   * 
 +   * @param dm the member joining
 +   * @return false if the member is shunned and should be ignored
 +   */
 +  public boolean addSurpriseMember(DistributedMember dm) {
 +    final InternalDistributedMember member = (InternalDistributedMember)dm;
 +    boolean warn = false;
 +    
 +    latestViewLock.writeLock().lock();
 +    try {
 +      // At this point, the join may have been discovered by
 +      // other means.
 +      if (latestView.contains(member)) {
 +        return true;
 +      }
 +      if (surpriseMembers.containsKey(member)) {
 +        return true;
 +      }
 +      if (member.getVmViewId() < 0) {
 +        logger.warn("adding a surprise member that has not yet joined the distributed system: " + member, new Exception("stack trace"));
 +      }
 +      if (latestView.getViewId() > member.getVmViewId()) {
 +        // tell the process that it should shut down distribution.
 +        // Run in a separate thread so we don't hold the view lock during the request.  Bug #44995
 +        new Thread(Thread.currentThread().getThreadGroup(),
 +            "Removing shunned GemFire node " + member) {
 +          @Override
 +          public void run() {
 +            // fix for bug #42548
 +            // this is an old member that shouldn't be added
 +            logger.warn(LocalizedMessage.create(
 +                LocalizedStrings.GroupMembershipService_Invalid_Surprise_Member, new Object[]{member, latestView}));
 +            requestMemberRemoval(member, "this member is no longer in the view but is initiating connections");
 +          }
 +        }.start();
 +        addShunnedMember(member);
 +        return false;
 +      }
 +
 +      // Adding him to this set ensures we won't remove him if a new
 +      // view comes in and he's still not visible.
 +      surpriseMembers.put(member, Long.valueOf(System.currentTimeMillis()));
 +
 +      if (shutdownInProgress()) {
 +        // Force disconnect, esp. the TCPConduit
 +        String msg = LocalizedStrings.GroupMembershipService_THIS_DISTRIBUTED_SYSTEM_IS_SHUTTING_DOWN.toLocalizedString();
 +        if (directChannel != null) {
 +          try {
 +            directChannel.closeEndpoint(member, msg);
 +          } catch (DistributedSystemDisconnectedException e) {
 +            // ignore - happens during shutdown
 +          }
 +        }
 +        destroyMember(member, false, msg); // for good luck
 +        return true; // allow during shutdown
 +      }
 +
 +      if (isShunned(member)) {
 +        warn = true;
 +        surpriseMembers.remove(member);
 +      } else {
 +
 +        // Now that we're sure the member is new, add them.
 +        // make sure the surprise-member cleanup task is running
 +        if (this.cleanupTimer == null) {
 +          startCleanupTimer();
 +        } // cleanupTimer == null
 +
 +        // Ensure that the member is accounted for in the view
 +        // Conjure up a new view including the new member. This is necessary
 +        // because we are about to tell the listener about a new member, so
 +        // the listener should rightfully expect that the member is in our
 +        // membership view.
 +
 +        // However, we put the new member at the end of the list.  This
 +        // should ensure he's not chosen as an elder.
 +        // This will get corrected when he finally shows up in the
 +        // view.
 +        NetView newMembers = new NetView(latestView, latestView.getViewId());
 +        newMembers.add(member);
 +        latestView = newMembers;
 +      }
 +    } finally {
 +      latestViewLock.writeLock().unlock();
 +    }
 +    if (warn) { // fix for bug #41538 - deadlock while alerting
 +      logger.warn(LocalizedMessage.create(
 +          LocalizedStrings.GroupMembershipService_MEMBERSHIP_IGNORING_SURPRISE_CONNECT_FROM_SHUNNED_MEMBER_0, member));
 +    } else {
 +      listener.newMemberConnected(member);
 +    }
 +    return !warn;
 +  }
 +  
 +
 +  /** starts periodic task to perform cleanup chores such as expire surprise members */
 +  private void startCleanupTimer() {
 +    latestViewLock.writeLock().lock();
 +    try {
 +      if (this.cleanupTimer != null) {
 +        return;
 +      }
 +      DistributedSystem ds = InternalDistributedSystem.getAnyInstance();
 +      if (ds != null && ds.isConnected()) {
 +        this.cleanupTimer = new SystemTimer(ds, true);
 +        SystemTimer.SystemTimerTask st = new SystemTimer.SystemTimerTask() {
 +          @Override
 +          public void run2() {
 +            latestViewLock.writeLock().lock();
 +            try {
 +              long oldestAllowed = System.currentTimeMillis() - surpriseMemberTimeout;
 +              for (Iterator it=surpriseMembers.entrySet().iterator(); it.hasNext(); ) {
 +                Map.Entry entry = (Map.Entry)it.next();
 +                Long birthtime = (Long)entry.getValue();
 +                if (birthtime.longValue() < oldestAllowed) {
 +                  it.remove();
 +                  InternalDistributedMember m = (InternalDistributedMember)entry.getKey();
 +                  logger.info(LocalizedMessage.create(
 +                      LocalizedStrings.GroupMembershipService_MEMBERSHIP_EXPIRING_MEMBERSHIP_OF_SURPRISE_MEMBER_0, m));
 +                  removeWithViewLock(m, true, "not seen in membership view in "
 +                      + surpriseMemberTimeout + "ms");
 +                }
 +              }
 +            } finally {
 +              latestViewLock.writeLock().unlock();
 +            }
 +          }
 +        };
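 +        // sweep for expired surprise members at one third of the timeout interval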
 +        this.cleanupTimer.scheduleAtFixedRate(st, surpriseMemberTimeout, surpriseMemberTimeout/3);
 +      } // ds != null && ds.isConnected()
 +    } finally {
 +      latestViewLock.writeLock().unlock();
 +    }
 +  }
 +  /**
 +   * Dispatch the distribution message, or place it on the startup queue.
 +   * 
 +   * @param msg the message to process
 +   */
 +  protected void handleOrDeferMessage(DistributionMessage msg) {
 +    synchronized(startupLock) {
 +      if (!processingEvents) {
 +        startupMessages.add(new StartupEvent(msg));
 +        return;
 +      }
 +    }
 +    dispatchMessage(msg);
 +  }
 +  
 +  public void warnShun(DistributedMember m) {
 +    latestViewLock.writeLock().lock();
 +    try {
 +      if (!shunnedMembers.containsKey(m)) {
 +        return; // not shunned
 +      }
 +      if (shunnedAndWarnedMembers.contains(m)) {
 +        return; // already warned
 +      }
 +      shunnedAndWarnedMembers.add(m);
 +    } finally {
 +      latestViewLock.writeLock().unlock();
 +    }
 +    // issue warning outside of sync since it may cause messaging and we don't
 +    // want to hold the view lock while doing that
 +    logger.warn(LocalizedMessage.create(LocalizedStrings.GroupMembershipService_MEMBERSHIP_DISREGARDING_SHUNNED_MEMBER_0, m));
 +  }
 +  
 +  @Override
 +  public void processMessage(DistributionMessage msg) {
 +    handleOrDeferMessage(msg);
 +  }
 +  
 +  /**
 +   * Logic for processing a distribution message.  
 +   * <p>
 +   * It is possible to receive messages not consistent with our view.
 +   * We handle this here, and generate an uplevel event if necessary
 +   * @param msg the message
 +   */
 +  public void dispatchMessage(DistributionMessage msg) {
 +    boolean isNew = false;
 +    InternalDistributedMember m = msg.getSender();
 +    boolean shunned = false;
 +
 +    // First grab the lock: check the sender against our stabilized view.
 +    latestViewLock.writeLock().lock();
 +    try {
 +      if (isShunned(m)) {
 +        if (msg instanceof StartupMessage) {
 +          endShun(m);
 +        }
 +        else {
 +          // fix for bug 41538 - sick alert listener causes deadlock
 +          // due to view lock being held during messaging
 +          shunned = true;
 +        }
 +      } // isShunned
 +
 +      if (!shunned) {
 +        isNew = !latestView.contains(m) && !surpriseMembers.containsKey(m);
 +
 +        // If it's a new sender, wait our turn, generate the event
 +        if (isNew) {
 +          shunned = !addSurpriseMember(m);
 +        } // isNew
 +      }
 +
 +      // Latch the view before we unlock
 +    } finally {
 +      latestViewLock.writeLock().unlock();
 +    }
 +    
 +    if (shunned) { // bug #41538 - shun notification must be outside synchronization to avoid hanging
 +      warnShun(m);
 +      logger.info("Membership: Ignoring message from shunned member <{}>:{}", m, msg);
 +      throw new MemberShunnedException(m);
 +    }
 +    
 +    listener.messageReceived(msg);
 +  }
 +
 +  /**
 +   * Process a new view object, or place on the startup queue
 +   * @param viewArg the new view
 +   */
 +  protected void handleOrDeferViewEvent(NetView viewArg) {
 +    if (this.isJoining) {
 +      // bug #44373 - queue all view messages while joining.
 +      // This is done under the latestViewLock, but we can't block here because
 +      // we're sitting in the UDP reader thread.
 +      synchronized(startupLock) {
 +        startupMessages.add(new StartupEvent(viewArg));
 +        return;
 +      }
 +    }
 +    latestViewLock.writeLock().lock();
 +    try {
 +      synchronized(startupLock) {
 +        if (!processingEvents) {
 +          startupMessages.add(new StartupEvent(viewArg));
 +          return;
 +        }
 +      }
 +      // view processing can take a while, so we use a separate thread
 +      // to avoid blocking a reader thread
 +      NetView newView = viewArg;
 +      long newId = viewArg.getViewId();
 +      LocalViewMessage v = new LocalViewMessage(address, newId, newView,
 +          GMSMembershipManager.this);
 +
 +      listener.messageReceived(v);
 +    } finally {
 +      latestViewLock.writeLock().unlock();
 +    }
 +  }
 +  
 +  @Override
 +  public void memberSuspected(InternalDistributedMember initiator, InternalDistributedMember suspect, String reason) {
 +    SuspectMember s = new SuspectMember(initiator, suspect, reason);
 +    handleOrDeferSuspect(s);
 +  }
 +
 +  /**
 +   * Process a member-suspect event.  Suspect events that arrive before event
 +   * processing has started are discarded rather than queued.
 +   * @param suspectInfo the suspected member and the member who suspects it
 +   */
 +  protected void handleOrDeferSuspect(SuspectMember suspectInfo) {
 +    latestViewLock.writeLock().lock();
 +    try {
 +      synchronized(startupLock) {
 +        if (!processingEvents) {
 +          return;
 +        }
 +      }
 +      InternalDistributedMember suspect = suspectInfo.suspectedMember;
 +      InternalDistributedMember who = suspectInfo.whoSuspected;
 +      this.suspectedMembers.put(suspect, Long.valueOf(System.currentTimeMillis()));
 +      try {
 +        listener.memberSuspect(suspect, who, suspectInfo.reason);
 +      }
 +      catch (DistributedSystemDisconnectedException se) {
 +        // let's not get huffy about it
 +      }
 +    } finally {
 +      latestViewLock.writeLock().unlock();
 +    }
 +  }
 +
 +  /**
 +   * Process a potential direct connect.  Does not use
 +   * the startup queue.  It grabs the {@link #latestViewLock} 
 +   * and then processes the event.
 +   * <p>
 +   * It is a <em>potential</em> event, because we don't know until we've
 +   * grabbed a stable view if this is really a new member.
 +   * 
 +   * @param member
 +   */
 +  private void processSurpriseConnect(
 +      InternalDistributedMember member) 
 +  {
 +    addSurpriseMember(member);
 +  }
 +  
 +  /**
 +   * Dispatch routine for processing a single startup event
 +   * @param o the startup event to handle
 +   */
 +  private void processStartupEvent(StartupEvent o) {
 +    // Most common events first
 +    
 +    if (o.isDistributionMessage()) { // normal message
 +      try {
 +        dispatchMessage(o.dmsg);
 +      }
 +      catch (MemberShunnedException e) {
 +        // message from non-member - ignore
 +      }
 +    } 
 +    else if (o.isGmsView()) { // view event
 +      processView(o.gmsView.getViewId(), o.gmsView);
 +    }
 +    else if (o.isSurpriseConnect()) { // connect
 +      processSurpriseConnect(o.member);
 +    }
 +    
 +    else // sanity
 +      throw new InternalGemFireError(LocalizedStrings.GroupMembershipService_UNKNOWN_STARTUP_EVENT_0.toLocalizedString(o));
 +  }
 +  
 +  /**
 +   * Special mutex to create a critical section for
 +   * {@link #startEventProcessing()}
 +   */
 +  private final Object startupMutex = new Object();
 +
 +  
 +  public void startEventProcessing()
 +  {
 +    // Only allow one thread to perform the work
 +    synchronized (startupMutex) {
 +      if (logger.isDebugEnabled())
 +        logger.debug("Membership: draining startup events.");
 +      // Remove the backqueue of messages, but allow
 +      // additional messages to be added.
 +      for (;;) {
 +        StartupEvent ev;
 +        // Only grab the mutex while reading the queue.
 +        // Other events may arrive while we're attempting to
 +        // drain the queue.  This is OK, we'll just keep processing
 +        // events here until we've caught up.
 +        synchronized (startupLock) {
 +          int remaining = startupMessages.size();
 +          if (remaining == 0) {
 +            // While holding the lock, flip the bit so that
 +            // no more events get put into startupMessages, and
 +            // notify all waiters to proceed.
 +            processingEvents = true;
 +            startupLock.notifyAll();
 +            break;  // ...and we're done.
 +          }
 +          if (logger.isDebugEnabled()) {
 +            logger.debug("Membership: {} remaining startup message(s)", remaining);
 +          }
 +          ev = (StartupEvent)startupMessages.removeFirst();
 +        } // startupLock
 +        try {
 +          processStartupEvent(ev);
 +        }
 +        catch (VirtualMachineError err) {
 +          SystemFailure.initiateFailure(err);
 +          // If this ever returns, rethrow the error.  We're poisoned
 +          // now, so don't let this thread continue.
 +          throw err;
 +        }
 +        catch (Throwable t) {
 +          // Whenever you catch Error or Throwable, you must also
 +          // catch VirtualMachineError (see above).  However, there is
 +          // _still_ a possibility that you are dealing with a cascading
 +          // error condition, so you also need to check to see if the JVM
 +          // is still usable:
 +          SystemFailure.checkFailure();
 +          logger.warn(LocalizedMessage.create(LocalizedStrings.GroupMembershipService_MEMBERSHIP_ERROR_HANDLING_STARTUP_EVENT), t);
 +        }
 +        
 +      } // for
 +      if (logger.isDebugEnabled())
 +        logger.debug("Membership: finished processing startup events.");
 +    } // startupMutex
 +  }
 +
 + 
 +  public void waitForEventProcessing() throws InterruptedException {
 +    // First check outside of a synchronized block.  Cheaper and sufficient.
 +    if (Thread.interrupted()) throw new InterruptedException();
 +    if (processingEvents)
 +      return;
 +    if (logger.isDebugEnabled()) {
 +      logger.debug("Membership: waiting until the system is ready for events");
 +    }
 +    for (;;) {
 +      directChannel.getCancelCriterion().checkCancelInProgress(null);
 +      synchronized (startupLock) {
 +        // Now check using a memory fence and synchronization.
 +        if (processingEvents)
 +          break;
 +        boolean interrupted = Thread.interrupted();
 +        try {
 +          startupLock.wait();
 +        }
 +        catch (InterruptedException e) {
 +          interrupted = true;
 +          directChannel.getCancelCriterion().checkCancelInProgress(e);
 +        }
 +        finally {
 +          if (interrupted) {
 +            Thread.currentThread().interrupt();
 +          }
 +        }
 +      } // synchronized
 +    } // for
 +    if (logger.isDebugEnabled()) {
 +      logger.debug("Membership: continuing");
 +    }
 +  }
 +
 +  /**
 +   * for testing we need to validate the startup event list
 +   */
 +  public List<StartupEvent> getStartupEvents() {
 +    return this.startupMessages;
 +  }
 +
 +  public ReadWriteLock getViewLock() {
 +    return this.latestViewLock;
 +  }
 +
 +  /**
 +   * Returns a copy (possibly stale) of the most recently installed
 +   * view (a list of {@link DistributedMember}s)
 +   */
 +  public NetView getView()
 +  {
 +    // Grab the latest view under the read lock...
 +    NetView v;
 +
 +    latestViewLock.readLock().lock();
 +    try {
 +      v = latestView;
 +    } finally {
 +      latestViewLock.readLock().unlock();
 +    }
 +
 +    NetView result = new NetView(v, v.getViewId());
 +    
 +    for (InternalDistributedMember m: v.getMembers()) {
 +      if (isShunned(m)) {
 +        result.remove(m);
 +      }
 +    }
 +    
 +    return result;
 +  }
 +  
 +  /**
 +   * test hook<p>
 +   * The lead member is the eldest member with partition detection enabled.<p>
 +   * If no members have partition detection enabled, there will be no
 +   * lead member and this method will return null.
 +   * @return the lead member associated with the latest view
 +   */
 +  public DistributedMember getLeadMember() {
 +    latestViewLock.readLock().lock();
 +    try {
 +      return latestView == null? null : latestView.getLeadMember();
 +    } finally {
 +      latestViewLock.readLock().unlock();
 +    }
 +  }
 +  
 +  protected boolean isJoining() {
 +    return this.isJoining;
 +  }
 +  
 +  /**
 +   * test hook
 +   * @return the current membership view coordinator
 +   */
 +  public DistributedMember getCoordinator() {
 +    // note - we go straight to JoinLeave because the
 +    // DistributionManager queues view changes in a serial executor, where
 +    // they're asynchronously installed.  The DS may still see the old coordinator
 +    latestViewLock.readLock().lock();
 +    try {
 +      return latestView == null? null : latestView.getCoordinator();
 +    } finally {
 +      latestViewLock.readLock().unlock();
 +    }
 +  }
 +
 +  public boolean memberExists(DistributedMember m) {
 +    latestViewLock.readLock().lock();
 +    try {
 +      return latestView.getMembers().contains(m);
 +    } finally {
 +      latestViewLock.readLock().unlock();
 +    }
 +  }
 +  
 +  /**
 +   * Returns the identity associated with this member. WARNING: this value will
 +   * be returned after the channel is closed, but in that case it is good for
 +   * logging purposes only. :-)
 +   */
 +  public InternalDistributedMember getLocalMember()
 +  {
 +    return address;
 +  }
 +  
 +  public Services getServices() {
 +    return services;
 +  }
 +
 +  public void postConnect() {
 +  }
 +  
 +  /**
 +   * break any potential circularity in {@link #loadEmergencyClasses()}
 +   * 
 +   * @see SystemFailure#loadEmergencyClasses()
 +   */
 +  private static volatile boolean emergencyClassesLoaded = false;
 +
 +  /**
 +   * inhibits logging of ForcedDisconnectException to keep dunit logs clean
 +   * while testing this feature
 +   */
 +  protected static volatile boolean inhibitForceDisconnectLogging;
 +  
 +  /**
 +   * Ensure that the critical classes from components
 +   * get loaded.
 +   * 
 +   * @see SystemFailure#loadEmergencyClasses()
 +   */
 +  public static void loadEmergencyClasses() {
 +    if (emergencyClassesLoaded) return;
 +    emergencyClassesLoaded = true;
 +    DirectChannel.loadEmergencyClasses();
 +    GMSJoinLeave.loadEmergencyClasses();
 +    GMSHealthMonitor.loadEmergencyClasses();
 +  }
 +  /**
 +   * Close the receiver, avoiding all potential deadlocks and
 +   * eschewing any attempts at being graceful.
 +   * 
 +   * @see SystemFailure#emergencyClose()
 +   */
 +  public void emergencyClose() {
 +    final boolean DEBUG = SystemFailure.TRACE_CLOSE;
 +    
 +    setShutdown(); 
 +
 +    // We can't call close() because they will allocate objects.  Attempt
 +    // a surgical strike and touch the important protocols.
 +    
 +    // MOST important, kill the FD protocols...
 +    services.emergencyClose();
 +    
 +    // Close the TCPConduit sockets...
 +    if (directChannel != null) {
 +      if (DEBUG) {
 +        System.err.println("DEBUG: emergency close of DirectChannel");
 +      }
 +      directChannel.emergencyClose();
 +    }
 +    
 +    if (DEBUG) {
 +      System.err.println("DEBUG: done closing GroupMembershipService");
 +    }
 +  }
 +  
 +  
 +  /**
 +   * In order to avoid split-brain caused by race conditions in view management
 +   * while a member is shutting down, we record the member as a shutdown member
 +   * when we receive its shutdown message.  This is not the same as a SHUNNED member.
 +   */
 +  public void shutdownMessageReceived(InternalDistributedMember id, String reason) {
 +    if (logger.isDebugEnabled()) {
 +      logger.debug("Membership: recording shutdown status of {}", id);
 +    }
 +    synchronized(this.shutdownMembers) { 
 +      this.shutdownMembers.put(id, id);
 +      services.getHealthMonitor().memberShutdown(id, reason);
 +      services.getJoinLeave().memberShutdown(id, reason);
 +    }
 +  }
 +  
 +  /**
 +   * returns true if a shutdown message has been received from the given address but
 +   * that member is still in the membership view or is a surprise member.
 +   */
 +  public boolean isShuttingDown(InternalDistributedMember mbr) {
 +    synchronized(shutdownMembers) {
 +      return shutdownMembers.containsKey(mbr);
 +    }
 +  }
 +
 +  
 +  public void shutdown() {
 +    setShutdown();
 +    services.stop();
 +  }
 +  
 +  @Override
 +  public void stop() {
 +    
 +    // [bruce] Do not null out the channel w/o adding appropriate synchronization
 +    
 +    logger.debug("MembershipManager closing");
 +    
 +    if (directChannel != null) {
 +      directChannel.disconnect(null);
 +
 +      if (address != null) {
 +        // Make sure that channel information is consistent
 +        // Probably not important in this particular case, but just
 +        // to be consistent...
 +        latestViewLock.writeLock().lock();
 +        try {
 +          destroyMember(address, false, "orderly shutdown");
 +        } finally {
 +          latestViewLock.writeLock().unlock();
 +        }
 +      }
 +    }
 +    
 +    if (cleanupTimer != null) {
 +      cleanupTimer.cancel();
 +    }
 +    
 +    if (logger.isDebugEnabled()) {
 +      logger.debug("Membership: channel closed");
 +    }
 +  }
 +  
 +  public void uncleanShutdown(String reason, final Exception e) {
 +    inhibitForcedDisconnectLogging(false);
 +    
 +    if (services.getShutdownCause() == null) {
 +      services.setShutdownCause(e);
 +    }
 +    
 +    if (this.directChannel != null) {
 +      this.directChannel.disconnect(e);
 +    }
 +    
 +    // first shut down communication so we don't do any more harm to other
 +    // members
 +    services.emergencyClose();
 +    
 +    if (e != null) {
 +      try {
 +        if (membershipTestHooks != null) {
 +          List l = membershipTestHooks;
 +          for (Iterator it=l.iterator(); it.hasNext(); ) {
 +            MembershipTestHook dml = (MembershipTestHook)it.next();
 +            dml.beforeMembershipFailure(reason, e);
 +          }
 +        }
 +        listener.membershipFailure(reason, e);
 +        if (membershipTestHooks != null) {
 +          List l = membershipTestHooks;
 +          for (Iterator it=l.iterator(); it.hasNext(); ) {
 +            MembershipTestHook dml = (MembershipTestHook)it.next();
 +            dml.afterMembershipFailure(reason, e);
 +          }
 +        }
 +      }
 +      catch (RuntimeException re) {
 +        logger.warn(LocalizedMessage.create(LocalizedStrings.GroupMembershipService_EXCEPTION_CAUGHT_WHILE_SHUTTING_DOWN), re);
 +      }
 +    }
 +  }
 +  
 +  /** generate XML for the cache before shutting down due to forced disconnect */
 +  public void saveCacheXmlForReconnect() {
 +    // there are two versions of this method so it can be unit-tested
 +    boolean sharedConfigEnabled = services.getConfig().getDistributionConfig().getUseSharedConfiguration();
 +    saveCacheXmlForReconnect(sharedConfigEnabled);
 +  }
 +  
 +  /** generate XML from the cache before shutting down due to forced disconnect */
 +  public void saveCacheXmlForReconnect(boolean sharedConfigEnabled) {
 +    // first save the current cache description so reconnect can rebuild the cache
 +    GemFireCacheImpl cache = GemFireCacheImpl.getInstance();
 +    if (cache != null) { // GemFireCacheImpl implements Cache, so no instanceof check is needed
 +      if (!Boolean.getBoolean("gemfire.autoReconnect-useCacheXMLFile")
 +          && !cache.isSqlfSystem() && !sharedConfigEnabled) {
 +        try {
 +          logger.info("generating XML to rebuild the cache after reconnect completes");
 +          StringPrintWriter pw = new StringPrintWriter(); 
 +          CacheXmlGenerator.generate((Cache)cache, pw, true, false);
 +          String cacheXML = pw.toString();
 +          cache.getCacheConfig().setCacheXMLDescription(cacheXML);
 +          logger.info("XML generation completed: {}", cacheXML);
 +        } catch (CancelException e) {
 +          logger.info(LocalizedMessage.create(LocalizedStrings.GroupMembershipService_PROBLEM_GENERATING_CACHE_XML), e);
 +        }
 +      } else if (sharedConfigEnabled && !cache.getCacheServers().isEmpty()) {
 +        // we need to retain a cache-server description if this JVM was started by gfsh
 +        List<CacheServerCreation> list = new ArrayList<CacheServerCreation>(cache.getCacheServers().size());
 +        for (Iterator it = cache.getCacheServers().iterator(); it.hasNext(); ) {
-           CacheServer cs = (CacheServer)it.next();
-           CacheServerCreation bsc = new CacheServerCreation(cache, cs);
-           list.add(bsc);
++          CacheServerImpl cs = (CacheServerImpl)it.next();
++          if (cs.isDefaultServer()) {
++            CacheServerCreation bsc = new CacheServerCreation(cache, cs);
++            list.add(bsc);
++          }
 +        }
 +        cache.getCacheConfig().setCacheServerCreation(list);
 +        logger.info("CacheServer configuration saved");
 +      }
 +    }
 +  }
 +
 +  public boolean requestMemberRemoval(DistributedMember mbr, String reason) {
 +    if (mbr.equals(this.address)) {
 +      return false;
 +    }
 +    logger.warn(LocalizedMessage.create(
 +        LocalizedStrings.GroupMembershipService_MEMBERSHIP_REQUESTING_REMOVAL_OF_0_REASON_1,
 +        new Object[] {mbr, reason}));
 +    try {
 +      services.getJoinLeave().remove((InternalDistributedMember)mbr, reason);
 +    }
 +    catch (RuntimeException e) {
 +      Throwable problem = e;
 +      if (services.getShutdownCause() != null) {
 +        Throwable cause = services.getShutdownCause();
 +        // If ForcedDisconnectException occurred then report it as actual
 +        // problem.
 +        if (cause instanceof ForcedDisconnectException) {
 +          problem = cause;
 +        } else {
 +          Throwable ne = problem;
 +          while (ne.getCause() != null) {
 +            ne = ne.getCause();
 +          }
 +          try {
 +            ne.initCause(services.getShutdownCause());
 +          }
 +          catch (IllegalArgumentException selfCausation) {
 +            // fix for bug 38895 - the cause is already in place
 +          }
 +        }
 +      }
 +      if (!services.getConfig().getDistributionConfig().getDisableAutoReconnect()) {
 +        saveCacheXmlForReconnect();
 +      }
 +      listener.membershipFailure("Channel closed", problem);
 +      throw new DistributedSystemDisconnectedException("Channel closed", problem);
 +    }
 +    return true;
 +  }
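 +
 +  // Illustrative sketch (hypothetical caller; "mgr" denotes this manager): removal
 +  // of a sick member can be requested on behalf of the distributed system, but a
 +  // member cannot request its own removal, in which case this method returns false.
 +  //
 +  //   boolean initiated = mgr.requestMemberRemoval(sickMember,
 +  //       "member failed to respond to heartbeat requests");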
 +  
 +  public void suspectMembers(Set members, String reason) {
 +    for (Iterator it=members.iterator(); it.hasNext(); ) {
 +      verifyMember((DistributedMember)it.next(), reason);
 +    }
 +  }
 +  
 +  public void suspectMember(DistributedMember mbr, String reason) {
 +    if (!this.shutdownInProgress && !this.shutdownMembers.containsKey(mbr)) {
 +      verifyMember(mbr, reason);
 +    }
 +  }
 +
 +  /**
 +   * Like memberExists(), this checks to see if the given ID is in the current
 +   * membership view.  If it is in the view, we try to contact it to see if it's
 +   * still around.  If we can't contact it, suspect messages are sent to initiate
 +   * final checks.
 +   * @param mbr the member to verify
 +   * @param reason why the check is being done (must not be blank/null)
 +   * @return true if the member checks out
 +   */
 +  public boolean verifyMember(DistributedMember mbr, String reason) {
 +    if (mbr != null && memberExists((InternalDistributedMember)mbr)) {
 +      return this.services.getHealthMonitor().checkIfAvailable(mbr, reason, true);
 +    }
 +    return false;
 +  }
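 +
 +  // Illustrative sketch (hypothetical caller; "mgr" denotes this manager):
 +  // suspectMember() above screens out members already known to be shutting down
 +  // before delegating to verifyMember(), which asks the health monitor for a
 +  // final availability check.
 +  //
 +  //   if (!mgr.verifyMember(mbr, "failed to respond to a message")) {
 +  //     // the member is gone or failed its availability check
 +  //   }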
 +
 +  /**
 +   * Perform the grossness associated with sending a message over
 +   * a DirectChannel
 +   * 
 +   * @param destinations the list of destinations
 +   * @param content the message
 +   * @param theStats the statistics object to update
 +   * @return all recipients who did not receive the message (null if
 +   * all received it)
 +   * @throws NotSerializableException if the message is not serializable
 +   */
 +  protected Set<InternalDistributedMember> directChannelSend(InternalDistributedMember[] destinations,
 +      DistributionMessage content,
 +      DMStats theStats)
 +      throws NotSerializableException
 +  {
 +    boolean allDestinations;
 +    InternalDistributedMember[] keys;
 +    if (content.forAll()) {
 +      allDestinations = true;
 +      latestViewLock.writeLock().lock();
 +      try {
 +        List<InternalDistributedMember> keySet = latestView.getMembers();
 +        keys = new InternalDistributedMember[keySet.size()];
 +        keys = (InternalDistributedMember[])keySet.toArray(keys);
 +      } finally {
 +        latestViewLock.writeLock().unlock();
 +      }
 +    }
 +    else {
 +      allDestinations = false;
 +      keys = destinations;
 +    }
 +
 +    int sentBytes = 0;
 +    try {
 +      sentBytes = directChannel.send(this, keys, content,
 +          this.services.getConfig().getDistributionConfig().getAckWaitThreshold(),
 +          this.services.getConfig().getDistributionConfig().getAckSevereAlertThreshold());
 +                                     
 +      if (theStats != null) {
 +        theStats.incSentBytes(sentBytes);
 +      }
 +      
 +      if (sentBytes == 0) {
 +        if (services.getCancelCriterion().cancelInProgress() != null) {
 +          throw new DistributedSystemDisconnectedException();
 +        }
 +      }
 +    }
 +    catch (DistributedSystemDisconnectedException ex) {
 +      if (services.getShutdownCause() != null) {
 +        throw new DistributedSystemDisconnectedException("DistributedSystem is shutting down", services.getShutdownCause());
 +      } else {
 +        throw ex; // see bug 41416
 +      }
 +    }
 +    catch (ConnectExceptions ex) {
 +      if (allDestinations)
 +        return null;
 +      
 +      List members = ex.getMembers(); // We need to return this list of failures
 +      
 +      // SANITY CHECK:  If we fail to send a message to an existing member 
 +      // of the view, we have a serious error (bug36202).
 +      NetView view = services.getJoinLeave().getView(); // grab a recent view, excluding shunned members
 +      
 +      // Iterate through members and causes in tandem :-(
 +      Iterator it_mem = members.iterator();
 +      Iterator it_causes = ex.getCauses().iterator();
 +      while (it_mem.hasNext()) {
 +        InternalDistributedMember member = (InternalDistributedMember)it_mem.next();
 +        Throwable th = (Throwable)it_causes.next();
 +        
 +        if (!view.contains(member) || (th instanceof ShunnedMemberException)) {
 +          continue;
 +        }
 +        logger.fatal(LocalizedMessage.create(
 +            LocalizedStrings.GroupMembershipService_FAILED_TO_SEND_MESSAGE_0_TO_MEMBER_1_VIEW_2,
 +            new Object[] {content, member, view}), th);
 +//        Assert.assertTrue(false, "messaging contract failure");
 +      }
 +      return new HashSet<InternalDistributedMember>(members);
 +    } // catch ConnectionExceptions
 +    catch (ToDataException | CancelException e) {
 +      throw e;
 +    }
 +    catch (IOException e) {
 +      if (logger.isDebugEnabled()) {
 +        logger.debug("Membership: directChannelSend caught exception: {}", e.getMessage(), e);
 +      }
 +      if (e instanceof NotSerializableException) {
 +        throw (NotSerializableException)e;
 +      }
 +    }
 +    catch (RuntimeException e) {
 +      if (logger.isDebugEnabled()) {
 +        logger.debug("Membership: directChannelSend caught exception: {}", e.getMessage(), e);
 +      }
 +      throw e;
 +    }
 +    catch (Error e) {
 +      if (logger.isDebugEnabled()) {
 +        logger.debug("Membership: directChannelSend caught exception: {}", e.getMessage(), e);
 +      }
 +      throw e;
 +    }
 +    return null;
 +  }
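 +
 +  // Illustrative sketch (hypothetical caller): directChannelSend() follows the
 +  // same contract as send() below -- a null result means every recipient got the
 +  // message, otherwise the returned set holds the recipients that were missed.
 +  //
 +  //   Set<InternalDistributedMember> failures =
 +  //       directChannelSend(recipients, message, stats);
 +  //   if (failures != null) {
 +  //     // retry, or initiate suspect processing for each missed recipient
 +  //   }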
 +
 +  /*
 +   * (non-Javadoc)
 +   * @see com.gemstone.gemfire.distributed.internal.membership.MembershipManager#isConnected()
 +   */
 +  public boolean isConnected() {
 +    return (this.hasJoined && !this.shutdownInProgress); 
 +  }
 +  
 +  /**
 +   * Returns true if the distributed system is in the process of auto-reconnecting.
 +   * Otherwise returns false.
 +   */
 +  public boolean isReconnectingDS() {
 +    if (this.hasJoined) {
 +      return false;
 +    } else {
 +      return this.wasReconnectingSystem;
 +    }
 +  }
 +  
 +  @Override
 +  public QuorumChecker getQuorumChecker() {
 +    if (!services.isShutdownDueToForcedDisconnect()) {
 +      return null;
 +    }
 +    if (this.quorumChecker != null) {
 +      return this.quorumChecker;
 +    }
 +
 +    QuorumChecker impl = services.getMessenger().getQuorumChecker();
 +    this.quorumChecker = impl;
 +    return impl;
 +  }
 +  
 +  @Override
 +  public void releaseQuorumChecker(QuorumChecker checker) {
 +    ((GMSQuorumChecker)checker).suspend();
 +    InternalDistributedSystem system = InternalDistributedSystem.getAnyInstance();
 +    if (system == null || !system.isConnected()) {
 +      checker.close();
 +    }
 +  }
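 +
 +  // Illustrative sketch (hypothetical reconnect loop; "mgr" denotes this manager):
 +  // a quorum checker is only handed out after a forced disconnect; releasing it
 +  // suspends it, and closes it outright if the system is no longer connected.
 +  //
 +  //   QuorumChecker qc = mgr.getQuorumChecker();
 +  //   if (qc != null) {
 +  //     try {
 +  //       // consult qc while deciding whether a reconnect attempt is safe
 +  //     } finally {
 +  //       mgr.releaseQuorumChecker(qc);
 +  //     }
 +  //   }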
 +  
 +  public Set send(InternalDistributedMember dest, DistributionMessage msg)
 +    throws NotSerializableException {
 +    
 +    InternalDistributedMember[] dests = new InternalDistributedMember[] { dest };
 +    return send(dests, msg, null);
 +  }
 +  
 +  public Set send(InternalDistributedMember[] destinations,
 +      DistributionMessage msg,
 +      DMStats theStats)
 +      throws NotSerializableException
 +  {
 +    Set result = null;
 +    boolean allDestinations = msg.forAll();
 +    
 +    if (services.getCancelCriterion().cancelInProgress() != null) {
 +      throw new DistributedSystemDisconnectedException("Distributed System is shutting down",
 +          services.getCancelCriterion().generateCancelledException(null));
 +    }
 +    
 +    if (playingDead) { // wellness test hook
 +      while (playingDead && !shutdownInProgress) {
 +        try {
 +          Thread.sleep(1000);
 +        } catch (InterruptedException e) {
 +          Thread.currentThread().interrupt();
 +        }
 +      }
 +    }
 +    
 +    if (isJoining()) {
 +      // If we get here, we are starting up, so just report a failure.
 +      if (allDestinations)
 +        return null;
 +      else {
 +        result = new HashSet();
 +        for (int i = 0; i < destinations.length; i ++)
 +          result.add(destinations[i]);
 +        return result;
 +      }
 +    }
 +    
 +    if (msg instanceof AdminMessageType
 +        && this.shutdownInProgress) {
 +      // no admin messages while shutting down - this can cause threads to hang
 +      return new HashSet(Arrays.asList(msg.getRecipients()));
 +    }
 +
 +    // Handle trivial cases
 +    if (destinations == null) {
 +      if (logger.isTraceEnabled())
 +        logger.trace("Membership: Message send: returning early because null set passed in: '{}'", msg);
 +      return null; // trivially: all recipients received the message
 +    }
 +    if (destinations.length == 0) {
 +      if (logger.isTraceEnabled())
 +        logger.trace("Membership: Message send: returning early because empty destination list passed in: '{}'", msg);
 +      return null; // trivially: all recipients received the message
 +    }
 +
 +    msg.setSender(address);
 +    
 +    msg.setBreadcrumbsInSender();
 +    Breadcrumbs.setProblem(null);
 +
 +    boolean useMcast = false;
 +    if (mcastEnabled) {
 +      useMcast = (msg.getMulticast() || allDestinations);
 +    }
 +    
 +    boolean sendViaMessenger = isForceUDPCommunications(); // enable when bug #46438 is fixed: || msg.sendViaUDP();
 +
 +    if (useMcast || tcpDisabled || sendViaMessenger) {
 +      checkAddressesForUUIDs(destinations);
 +      result = services.getMessenger().send(msg);
 +    }
 +    else {
 +      result = directChannelSend(destinations, msg, theStats);
 +    }
 +
 +    // If the message was a broadcast, don't enumerate failures.
 +    if (allDestinations)
 +      return null;
 +    else {
 +      return result;
 +    }
 +  }
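 +
 +  // Illustrative sketch (hypothetical caller; "mgr" denotes this manager):
 +  // point-to-point sends report per-recipient failures, while broadcasts
 +  // (msg.forAll()) always return null and leave failure handling to the
 +  // membership protocols.
 +  //
 +  //   Set failures = mgr.send(new InternalDistributedMember[] { dest }, msg, stats);
 +  //   if (failures != null) {
 +  //     // these recipients were not reached; they may have departed or been shunned
 +  //   }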
 +  
 +  // MembershipManager method
 +  @Override
 +  public void forceUDPMessagingForCurrentThread() {
 +    // mark this thread as forcing UDP so that isForceUDPCommunications() returns true
 +    forceUseUDPMessaging.set(Boolean.TRUE);
 +  }
 +  
 +  void checkAddressesForUUIDs(InternalDistributedMember[] addresses) {
 +    for (int i=0; i<addresses.length; i++) {
 +      InternalDistributedMember m = addresses[i];
 +      if(m != null) {
 +        GMSMember id = (GMSMember)m.getNetMember();
 +        if (!id.hasUUID()) {
 +          latestViewLock.readLock().lock();
 +          try {
 +            addresses[i] = latestView.getCanonicalID(addresses[i]);
 +          } finally {
 +            latestViewLock.readLock().unlock();
 +          }
 +        }
 +      }
 +    }
 +  }
 +  
 +  private boolean isForceUDPCommunications() {
 +    Boolean forced = forceUseUDPMessaging.get();
 +    return forced == Boolean.TRUE;
 +  }
 +
 +  // MembershipManager method
 +  @Override
 +  public void releaseUDPMessagingForCurrentThread() {
 +    // clear the thread-local set by forceUDPMessagingForCurrentThread()
 +    forceUseUDPMessaging.set(null);
 +  }
 +  
 +  public void setShutdown()
 +  {
 +    latestViewLock.writeLock().lock();
 +    try {
 +      shutdownInProgress = true;
 +    } finally {
 +      latestViewLock.writeLock().unlock();
 +    }
 +  }
 +
 +  @Override
 +  public boolean shutdownInProgress() {
 +    // Impossible condition (bug36329): make sure that we check DM's
 +    // view of shutdown here
 +    DistributionManager dm = listener.getDM();
 +    return shutdownInProgress || (dm != null && dm.shutdownInProgress());
 +  }
 +  
 +
 +  /**
 +   * Clean up and create consistent new view with member removed.
 +   * No uplevel events are generated.
 +   * 
 +   * Must be called with the {@link #latestViewLock} held.
 +   */
 +  protected void destroyMember(final InternalDistributedMember member,
 +      boolean crashed, final String reason) {
 +    
 +    // Make sure it is removed from the view
 +    latestViewLock.writeLock().lock();
 +    try {
 +      if (latestView.contains(member)) {
 +        NetView newView = new NetView(latestView, latestView.getViewId());
 +        newView.remove(member);
 +        latestView = newView;
 +      }
 +    } finally {
 +      latestViewLock.writeLock().unlock();
 +    }
 +    
 +    surpriseMembers.remove(member);
 +    
 +    // Trickiness: there is a minor recursion
 +    // with addShunnedMember(), since it will
 +    // attempt to destroy really really old members.  Performing the check
 +    // here breaks the recursion.
 +    if (!isShunned(member)) {
 +      addShunnedMember(member);
 +    }
 +
 +    final DirectChannel dc = directChannel;
 +    if (dc != null) {
 +//      if (crashed) {
 +//        dc.closeEndpoint(member, reason);
 +//      }
 +//      else
 +      // Bug 37944: make sure this is always done in a separate thread,
 +      // so that shutdown conditions don't wedge the view lock
 +      { // fix for bug 34010
 +        Thread t = new Thread() {
 +          @Override
 +          public void run() {
 +            try {
 +              Thread.sleep(
 +                  Integer.getInteger("p2p.disconnectDelay", 3000).intValue());
 +            }
 +            catch (InterruptedException ie) {
 +              Thread.currentThread().interrupt();
 +              // Keep going, try to close the endpoint.
 +            }
 +            if (!dc.isOpen()) {
 +              return;
 +            }
 +            if (logger.isDebugEnabled())
 +              logger.debug("Membership: closing connections for departed member {}", member);
 +            // close connections, but don't do membership notification since it's already been done
 +            dc.closeEndpoint(member, reason, false); 
 +          }
 +        };
 +        t.setDaemon(true);
 +        t.setName("disconnect thread for " + member);
 +        t.start();
 +      } // fix for bug 34010
 +    }
 +  }
 +  
 +  /**
 +   * Indicate whether the given member is in the zombie list (dead or dying).
 +   * 
 +   * This also checks the time the given member was shunned, and
 +   * has the side effect of removing the member from the
 +   * list if it was shunned too far in the past.
 +   * 
 +   * Concurrency: protected by {@link #latestViewLock} ReentrantReadWriteLock
 +   * 
 +   * @param m the member in question
 +   * @return true if the given member is a zombie
 +   */
 +  public boolean isShunned(DistributedMember m) {
 +    latestViewLock.writeLock().lock();
 +    try {
 +      if (!shunnedMembers.containsKey(m))
 +        return false;
 +      
 +      // Make sure that the entry isn't stale...
 +      long shunTime = ((Long)shunnedMembers.get(m)).longValue();
 +      long now = System.currentTimeMillis();
 +      if (shunTime + SHUNNED_SUNSET * 1000 > now)
 +        return true;
 +      
 +      // Oh, it _is_ stale.  Remove it while we're here.
 +      endShun(m);
 +      return false;
 +    } finally {
 +      latestViewLock.writeLock().unlock();
 +    }
 +  }
 +
 +  /**
 +   * Indicate whether the given member is in the surprise member list
 +   * <P>
 +   * Unlike isShunned, this method will not cause expiry of a surprise member.
 +   * That must be done during view processing.
 +   * <p>
 +   * Like isShunned, this method holds the view lock while executing
 +   * 
 +   * Concurrency: protected by {@link #latestViewLock} ReentrantReadWriteLock
 +   * 
 +   * @param m the member in question
 +   * @return true if the given member is a surprise member
 +   */
 +  public boolean isSurpriseMember(DistributedMember m) {
 +    latestViewLock.readLock().lock();
 +    try  {
 +      if (surpriseMembers.containsKey(m)) {
 +        long birthTime = ((Long)surpriseMembers.get(m)).longValue();
 +        long now = System.currentTimeMillis();
 +        return (birthTime >= (now - this.surpriseMemberTimeout));
 +      }
 +      return false;
 +    } finally {
 +      latestViewLock.readLock().unlock();
 +    }
 +  }
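 +
 +  // Illustrative arithmetic (assumed timeout value): with a surpriseMemberTimeout
 +  // of 300000 ms, a member first seen at birthTime = T remains a surprise member
 +  // while now <= T + 300000 and stops qualifying after that, though the entry
 +  // itself is only expired during view processing.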
 +  
 +  /**
 +   * for testing we need to be able to inject surprise members into
 +   * the view to ensure that sunsetting works properly
 +   * @param m the member ID to add
 +   * @param birthTime the millisecond clock time that the member was first seen
 +   */
 +  public void addSurpriseMemberForTesting(DistributedMember m, long birthTime) {
 +    if (logger.isDebugEnabled()) {
 +      logger.debug("test hook is adding surprise member {} birthTime={}", m, birthTime);
 +    }
 +    latestViewLock.writeLock().lock();
 +    try {
 +      surpriseMembers.put((InternalDistributedMember)m, Long.valueOf(birthTime));
 +    } finally {
 +      latestViewLock.writeLock().unlock();
 +    }
 +  }
 +  
 +  /**
 +   * returns the surpriseMemberTimeout interval, in milliseconds
 +   */
 +  public int getSurpriseMemberTimeout() {
 +    return this.surpriseMemberTimeout;
 +  }
 +  
 +  private boolean endShun(DistributedMember m) {
 +    boolean wasShunned = (shunnedMembers.remove(m) != null);
 +    shunnedAndWarnedMembers.remove(m);
 +    return wasShunned;
 +  }
 +  
 +  /**
 +   * Add the given member to the shunned list.  Also, purge any shunned
 +   * members that are really really old.
 +   * <p>
 +   * Must be called with {@link #latestViewLock} held and
 +   * the view stable.
 +   * 
 +   * @param m the member to add
 +   */
 +  protected void addShunnedMember(InternalDistributedMember m) {
 +    long deathTime = System.currentTimeMillis() - SHUNNED_SUNSET * 1000;
 +    
 +    surpriseMembers.remove(m); // for safety
 +
 +    // Update the shunned set.
 +    if (!isShunned(m)) {
 +      shunnedMembers.put(m, Long.valueOf(System.currentTimeMillis()));
 +    }
 +
 +    // Remove really really old shunned members.
 +    // First, make a copy of the old set.  New arrivals _a priori_ don't matter,
 +    // and we're going to be updating the list so we don't want to disturb
 +    // the iterator.
 +    Set oldMembers = new HashSet(shunnedMembers.entrySet());
 +    
 +    Set removedMembers = new HashSet();
 +    
 +    Iterator it = oldMembers.iterator();
 +    while (it.hasNext()) {
 +      Map.Entry e = (Map.Entry)it.next();
 +      
 +      // Key is the member.  Value is the time at which it was shunned.
 +      long ll = ((Long)e.getValue()).longValue(); 
 +      if (ll >= deathTime) {
 +        continue; // too new.
 +      }
 +      
 +      InternalDistributedMember mm = (InternalDistributedMember)e.getKey();
 +
 +      if (latestView.contains(mm)) {
 +        // Fault tolerance: a shunned member can conceivably linger but never 
 +        // disconnect.
 +        //
 +        // We may not delete it at the time that we shun it because the view 
 +        // isn't necessarily stable.  (Note that a well-behaved cache member
 +        // will depart on its own accord, but we force the issue here.)
 +        destroyMember(mm, true, "shunned but never disconnected");
 +      }
 +      if (logger.isDebugEnabled()) {
 +        logger.debug("Membership: finally removed shunned member entry <{}>", mm);
 +      }
 +      
 +      removedMembers.add(mm);
 +    }
 +    
 +    // remove timed-out entries from the shunned-members collections
 +    if (removedMembers.size() > 0) {
 +      it = removedMembers.iterator();
 +      while (it.hasNext()) {
 +        InternalDistributedMember idm = (InternalDistributedMember)it.next();
 +        endShun(idm);
 +      }
 +    }
 +  }
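 +
 +  // Illustrative arithmetic (assumed sunset value): if SHUNNED_SUNSET were 600
 +  // seconds, a member shunned at time T would be purged by the first
 +  // addShunnedMember() call running after T + 600000 ms, since by then
 +  // deathTime = now - SHUNNED_SUNSET * 1000 has moved past T.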
 +  
 +  
 +  /**
 +   * Retrieve thread-local data for transport to another thread in hydra
 +   */
 +  public Object getThreadLocalData() {
 +    Map result = new HashMap();
 +    return result;
 +  }
 +  
 +  /**
 +   * for testing verification purposes, this returns the port for the
 +   * direct channel, or zero if there is no direct channel
 +   */
 +  public int getDirectChannelPort() {
 +    return directChannel == null? 0 : directChannel.getPort();
 +  }
 +  
 +  /**
 +   * for mock testing this allows insertion of a DirectChannel mock
 +   */
 +  protected void setDirectChannel(DirectChannel dc) {
 +    this.directChannel = dc;
 +    this.tcpDisabled = false;
 +  }
 +  
 +  /* non-thread-owned serial channels and high priority channels are not
 +   * included
 +   */
 +  public Map getMessageState(DistributedMember member, boolean includeMulticast) {
 +    Map result = new HashMap();
 +    DirectChannel dc = directChannel;
 +    if (dc != null) {
 +      dc.getChannelStates(member, result);
 +    }
 +    services.getMessenger().getMessageState((InternalDistributedMember)member, result, includeMulticast);
 +    return result;
 +  }
 +
 +  public void waitForMessageState(DistributedMember otherMember, Map state)
 +    throws InterruptedException
 +  {
 +    if (Thread.interrupted()) throw new InterruptedException();
 +    DirectChannel dc = directChannel;
 +    if (dc != null) {
 +      dc.waitForChannelState(otherMember, state);
 +    }
 +    services.getMessenger().waitForMessageState((InternalDistributedMember)otherMember, state);
 +  }
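 +
 +  // Illustrative sketch (hypothetical caller; "mgr" denotes this manager):
 +  // getMessageState() and waitForMessageState() pair up to flush messaging to a
 +  // peer -- snapshot the state for the peer, then block until the peer has
 +  // processed up to that state.
 +  //
 +  //   Map state = mgr.getMessageState(peer, true);
 +  //   mgr.waitForMessageState(peer, state);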
 +  
 +  /* 
 +   * (non-Javadoc)
 +   * MembershipManager method: wait for the given member to be gone.  Throws TimeoutException if
 +   * the wait goes too long
 +   * @see com.gemstone.gemfire.distributed.internal.membership.MembershipManager#waitForDeparture(com.gemstone.gemfire.distributed.DistributedMember)
 +   */
 +  public boolean waitForDeparture(DistributedMember mbr) throws TimeoutException, InterruptedException {
 +    if (Thread.interrupted()) throw new InterruptedException();
 +    boolean result = false;
 +    DirectChannel dc = directChannel;
 +    InternalDistributedMember idm = (InternalDistributedMember)mbr;
 +    int memberTimeout = this.services.getConfig().getDistributionConfig().getMemberTimeout();
 +    long pauseTime = (memberTimeout < 1000) ? 100 : memberTimeout / 10;
 +    boolean wait;
 +    int numWaits = 0;
 +    do {
 +      wait = false;
 +      if (dc != null) {
 +        if (dc.hasReceiversFor(idm)) {
 +          wait = true;
 +        }
 +        if (wait && logger.isDebugEnabled()) {
 +          logger.debug("waiting for receivers for {} to shut down", mbr);
 +        }
 +      }
 +      if (!wait) {
 +        latestViewLock.readLock().lock();
 +        try {
 +          wait = this.latestView.contains(idm);
 +        } finally {
 +          latestViewLock.readLock().unlock();
 +        }
 +        if (wait && logger.isDebugEnabled()) {
 +          logger.debug("waiting for {} to leave the membership view", mbr);
 +        }
 +      }
 +      if (!wait) {
 +        // run a message through the member's serial execution queue to ensure that all of its
 +        // current messages have been processed
 +        OverflowQueueWithDMStats serialQueue = listener.getDM().getSerialQueue(idm);
 +        if (serialQueue != null) {
 +          final boolean[] done = new boolean[1];
 +          final FlushingMessage msg = new FlushingMessage(done);
 +          serialQueue.add(new SizeableRunnable(100) {
 +            public void run() {
 +              msg.invoke();
 +            }
 +            public String toString() {
 +              return "Processing fake message";
 +            }
 +          });
 +          synchronized(done) {
 +            while (!done[0]) {
 +              done.wait(10);
 +            }
 +            result = true;
 +          }
 +        }
 +      }
 +      if (wait) {
 +        numWaits++;
 +        if (numWaits > 40) {
 +          // waited over 4 * memberTimeout ms.  Give up at this point
 +          throw new TimeoutException("waited too long for " + idm + " to be removed");
 +        }
 +        Thread.sleep(pauseTime);
 +      }
 +    } while (wait && (dc != null && dc.isOpen())
 +        && services.getCancelCriterion().cancelInProgress()==null );
 +    if (logger.isDebugEnabled()) {
 +      logger.debug("operations for {} should all be in the cache at this point", mbr);
 +    }
 +    return result;
 +  }
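 +
 +  // Illustrative sketch (hypothetical caller; "mgr" denotes this manager):
 +  // waitForDeparture() polls in memberTimeout/10 increments and throws
 +  // TimeoutException after roughly 4 * memberTimeout ms.
 +  //
 +  //   try {
 +  //     mgr.waitForDeparture(oldMember);
 +  //   } catch (TimeoutException e) {
 +  //     // the member is still visible; escalate or retry
 +  //   }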
 +  
 +
 +  // TODO remove this overly complex method and replace its use with
 +  // waitForViewChange using the remote member's view ID
 +  public boolean waitForMembershipCheck(InternalDistributedMember remoteId) {
 +    boolean foundRemoteId = false;
 +    CountDownLatch currentLatch = null;
 +    // ARB: preconditions
 +    // remoteId != null
 +    latestViewLock.writeLock().lock();
 +    try {
 +      if (latestView == null) {
 +        // Not sure how this would happen, but see bug 38460.
 +        // No view?? Not found!
 +      }
 +      else if (latestView.contains(remoteId)) {
 +        // ARB: check if remoteId is already in membership view.
 +        // If not, then create a latch if needed and wait for the latch to open.
 +        foundRemoteId = true;
 +      }
 +      else if ((currentLatch = (CountDownLatch)this.memberLatch.get(remoteId)) == null) {
 +        currentLatch = new CountDownLatch(1);
 +        this.memberLatch.put(remoteId, currentLatch);
 +      }
 +    } finally {
 +      latestViewLock.writeLock().unlock();
 +    }
 +
 +    if (!foundRemoteId) {
 +      try {
 +        if (currentLatch.await(membershipCheckTimeout, TimeUnit.MILLISECONDS)) {
 +          foundRemoteId = true;
 +          // @todo 
 +          // ARB: remove latch from memberLatch map if this is the last thread waiting on latch.
 +        }
 +      }
 +      catch (InterruptedException ex) {
 +        // ARB: latch attempt was interrupted.
 +        Thread.currentThread().interrupt();
 +        logger.warn(LocalizedMessage.create(
 +            LocalizedStrings.GroupMembershipService_THE_MEMBERSHIP_CHECK_WAS_TERMINATED_WITH_AN_EXCEPTION));
 +      }
 +    }
 +
 +    // ARB: postconditions
 +    // (foundRemoteId == true) ==> (currentLatch is non-null ==> currentLatch is open)
 +    return foundRemoteId;
 +  }
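 +
 +  // Illustrative sketch (hypothetical caller; "mgr" denotes this manager): blocks
 +  // up to membershipCheckTimeout ms for remoteId to appear in the membership view.
 +  //
 +  //   if (!mgr.waitForMembershipCheck(remoteId)) {
 +  //     // remoteId never made it into the view within the timeout
 +  //   }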
 +  
 +  /* returns the cause of shutdown, if known */
 +  public Throwable getShutdownCause() {
 +    return services.getShutdownCause();
 +  }
 +
 +//  @Override
 +//  public void membershipFailure(String reason, Exception e) {
 +//

<TRUNCATED>