You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by qi...@apache.org on 2015/08/13 19:51:18 UTC

[2/3] incubator-geode git commit: GEODE-77: Implement Authenticator interface in class GMSAuthenticator with unit tests.

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/0a70d514/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/membership/GMSJoinLeave.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/membership/GMSJoinLeave.java b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/membership/GMSJoinLeave.java
index cb4f9c9..76d9d71 100755
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/membership/GMSJoinLeave.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/membership/GMSJoinLeave.java
@@ -1,1178 +1,1178 @@
-package com.gemstone.gemfire.distributed.internal.membership.gms.membership;
-
-import static com.gemstone.gemfire.distributed.internal.DistributionManager.LOCATOR_DM_TYPE;
-import static com.gemstone.gemfire.internal.DataSerializableFixedID.INSTALL_VIEW_MESSAGE;
-import static com.gemstone.gemfire.internal.DataSerializableFixedID.JOIN_REQUEST;
-import static com.gemstone.gemfire.internal.DataSerializableFixedID.JOIN_RESPONSE;
-import static com.gemstone.gemfire.internal.DataSerializableFixedID.LEAVE_REQUEST_MESSAGE;
-import static com.gemstone.gemfire.internal.DataSerializableFixedID.REMOVE_MEMBER_MESSAGE;
-import static com.gemstone.gemfire.internal.DataSerializableFixedID.VIEW_ACK_MESSAGE;
-
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Set;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.locks.ReadWriteLock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-
-import org.apache.logging.log4j.Logger;
-
-import com.gemstone.gemfire.SystemConnectException;
-import com.gemstone.gemfire.distributed.internal.DistributionConfig;
-import com.gemstone.gemfire.distributed.internal.DistributionManager;
-import com.gemstone.gemfire.distributed.internal.DistributionMessage;
-import com.gemstone.gemfire.distributed.internal.membership.InternalDistributedMember;
-import com.gemstone.gemfire.distributed.internal.membership.NetView;
-import com.gemstone.gemfire.distributed.internal.membership.gms.GMSMember;
-import com.gemstone.gemfire.distributed.internal.membership.gms.GMSUtil;
-import com.gemstone.gemfire.distributed.internal.membership.gms.Services;
-import com.gemstone.gemfire.distributed.internal.membership.gms.interfaces.JoinLeave;
-import com.gemstone.gemfire.distributed.internal.membership.gms.interfaces.MessageHandler;
-import com.gemstone.gemfire.distributed.internal.membership.gms.locator.FindCoordinatorRequest;
-import com.gemstone.gemfire.distributed.internal.membership.gms.locator.FindCoordinatorResponse;
-import com.gemstone.gemfire.distributed.internal.membership.gms.messages.InstallViewMessage;
-import com.gemstone.gemfire.distributed.internal.membership.gms.messages.JoinRequestMessage;
-import com.gemstone.gemfire.distributed.internal.membership.gms.messages.JoinResponseMessage;
-import com.gemstone.gemfire.distributed.internal.membership.gms.messages.LeaveRequestMessage;
-import com.gemstone.gemfire.distributed.internal.membership.gms.messages.RemoveMemberMessage;
-import com.gemstone.gemfire.distributed.internal.membership.gms.messages.ViewAckMessage;
-import com.gemstone.gemfire.distributed.internal.tcpserver.TcpClient;
-import com.gemstone.gemfire.internal.Version;
-import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
-import com.gemstone.gemfire.security.AuthenticationFailedException;
-
-/**
- * GMSJoinLeave handles membership communication with other processes in the
- * distributed system.  It replaces the JGroups channel membership services
- * that Geode formerly used for this purpose.
- * 
- */
-public class GMSJoinLeave implements JoinLeave, MessageHandler {
-
-  /** number of times to try joining before giving up */
-  private static final int JOIN_ATTEMPTS = Integer.getInteger("gemfire.join-attempts", 4);
-  
-  /** amount of time to sleep before trying to join after a failed attempt */
-  private static final int JOIN_RETRY_SLEEP = Integer.getInteger("gemfire.join-retry-sleep", 1000);
-  
-  /** amount of time to wait for a view to be acked by all members before performing suspect processing on non-responders */
-  private static final int VIEW_INSTALLATION_TIMEOUT = Integer.getInteger("gemfire.view-ack-timeout", 12500);
-
-  /** stall time to wait for concurrent join/leave/remove requests to be received */
-  private static final long MEMBER_REQUEST_COLLECTION_INTERVAL = Long.getLong("gemfire.member-request-collection-interval", 2000);
-
-  /** time to wait for a leave request to be transmitted by jgroups */
-  private static final long LEAVE_MESSAGE_SLEEP_TIME = Long.getLong("gemfire.leave-message-sleep-time", 2000);
-  
-  /** membership logger */
-  private static final Logger logger = Services.getLogger();
-
-
-  /** the view ID where I entered into membership */
-  private int birthViewId;
-
-  /** my address */
-  private InternalDistributedMember localAddress;
-
-  private Services services;
-  
-  /** have I connected to the distributed system? */
-  private boolean isJoined;
-
-  /** a lock governing GMS state */
-  private ReadWriteLock stateLock = new ReentrantReadWriteLock();
-  
-  /** guarded by stateLock */
-  private boolean isCoordinator;
-  
-  /** a synch object that guards view installation */
-  private final Object viewInstallationLock = new Object();
-  
-  /** the currently installed view */
-  private volatile NetView currentView;
-  
-  /** a new view being installed */
-  private NetView preparedView;
-  
-  /** the last view that conflicted with view preparation */
-  private NetView lastConflictingView;
-  
-  private List<InetSocketAddress> locators;
-  
-  /** a list of join/leave/crashes */
-  private final List<DistributionMessage> viewRequests = new LinkedList<DistributionMessage>();
-
-  /** collects the response to a join request */
-  private JoinResponseMessage[] joinResponse = new JoinResponseMessage[1];
-  
-  /** collects responses to new views */
-  private ViewReplyProcessor viewProcessor = new ViewReplyProcessor(false);
-  
-  /** collects responses to view preparation messages */
-  private ViewReplyProcessor prepareProcessor = new ViewReplyProcessor(true);
-
-  /** whether quorum checks can cause a forced-disconnect */
-  private boolean quorumRequired = false;
-  
-  /** timeout in receiving view acknowledgement */
-  private int viewAckTimeout;
-
-  /** background thread that creates new membership views */
-  private ViewCreator viewCreator;
-  
-  /** am I shutting down? */
-  private volatile boolean isStopping;
-  
-  
-  /**
-   * attempt to join the distributed system
-   * loop
-   *   send a join request to a locator & get a response
-   *
-   * If the response indicates there's no coordinator it
-   * will contain a set of members that have recently contacted
-   * it.  The "oldest" member is selected as the coordinator
-   * based on ID sort order.
-   * 
-   * @return true if successful, false if not
-   */
-  public boolean join() {
-
-    if (this.localAddress.getVmKind() == LOCATOR_DM_TYPE
-        && Boolean.getBoolean("gemfire.first-member")) {
-      becomeCoordinator();
-      return true;
-    }
-
-    for (int tries=0; tries<JOIN_ATTEMPTS; tries++) {
-      InternalDistributedMember coord = findCoordinator();
-      logger.debug("found possible coordinator {}", coord);
-      if (coord != null) {
-        if (coord.equals(this.localAddress)) {
-          if (tries > (JOIN_ATTEMPTS/2)) {
-            becomeCoordinator();
-            return true;
-          }
-        } else {
-          if (attemptToJoin(coord)) {
-            return true;
-          } 
-        }
-      }
-      try {
-        Thread.sleep(JOIN_RETRY_SLEEP);
-      } catch (InterruptedException e) {
-        return false;
-      }
-    } // for
-    return this.isJoined;
-  }
-
-  /**
-   * send a join request and wait for a reply.  Process the reply.
-   * This may throw a SystemConnectException or an AuthenticationFailedException
-   * @param coord
-   * @return true if the attempt succeeded, false if it timed out
-   */
-  private boolean attemptToJoin(InternalDistributedMember coord) {
-    // send a join request to the coordinator and wait for a response
-    logger.info("Attempting to join the distributed system through coordinator " + coord + " using address " + this.localAddress);
-    JoinRequestMessage req = new JoinRequestMessage(coord, this.localAddress, 
-        services.getAuthenticator().getCredentials());
-
-    services.getMessenger().send(req);
-    
-    JoinResponseMessage response = null;
-    synchronized(joinResponse) {
-      if (joinResponse[0] == null) {
-        try {
-          joinResponse.wait(services.getConfig().getJoinTimeout()/JOIN_ATTEMPTS);
-        } catch (InterruptedException e) {
-          Thread.currentThread().interrupt();
-          return false;
-        }
-      }
-      response = joinResponse[0];
-    }
-    if (response != null) {
-      joinResponse[0] = null;
-      String failReason = response.getRejectionMessage();
-      if (failReason != null) {
-        if (failReason.contains("Rejecting the attempt of a member using an older version")
-            || failReason.contains("15806")) {
-          throw new SystemConnectException(failReason);
-        }
-        throw new AuthenticationFailedException(failReason);
-      }
-      if (response.getCurrentView() != null) {
-        this.birthViewId = response.getMemberID().getVmViewId();
-        this.localAddress.setVmViewId(this.birthViewId);
-        GMSMember me = (GMSMember)this.localAddress.getNetMember();
-        GMSMember o = (GMSMember)response.getMemberID().getNetMember();
-        me.setSplitBrainEnabled(o.isSplitBrainEnabled());
-        me.setPreferredForCoordinator(o.preferredForCoordinator());
-        installView(response.getCurrentView());
-        return true;
-      }
-    }
-    return false;
-  }
-  
-  
-  /**
-   * process a join request from another member.  If this is the coordinator
-   * this method will enqueue the request for processing in another thread.
-   * If this is not the coordinator but the coordinator is known, the message
-   * is forwarded to the coordinator.
-   * @param incomingRequest
-   */
-  private void processJoinRequest(JoinRequestMessage incomingRequest) {
-    if (incomingRequest.getMemberID().getVersionObject().compareTo(Version.CURRENT) < 0) {
-      logger.warn("detected an attempt to start a peer using an older version of the product {}",
-          incomingRequest.getMemberID());
-      JoinResponseMessage m = new JoinResponseMessage("Rejecting the attempt of a member using an older version");
-      m.setRecipient(incomingRequest.getMemberID());
-      services.getMessenger().send(m);
-      return;
-    }
-    Object creds = incomingRequest.getCredentials();
-    if (creds != null) {
-      String rejection = null;
-      try {
-        rejection = services.getAuthenticator().authenticate(incomingRequest.getMemberID(), creds);
-      } catch (Exception e) {
-        rejection = e.getMessage();
-      }
-      if (rejection != null  &&  rejection.length() > 0) {
-        JoinResponseMessage m = new JoinResponseMessage(rejection);
-        m.setRecipient(incomingRequest.getMemberID());
-        services.getMessenger().send(m);
-      }
-    }
-    recordViewRequest(incomingRequest);
-  }
-  
-  
-  /**
-   * Process a Leave request from another member.  This may cause this member
-   * to become the new membership coordinator.  If this is the coordinator
-   * a new view will be triggered.
-   * 
-   * @param incomingRequest
-   */
-  private void processLeaveRequest(LeaveRequestMessage incomingRequest) {
-    NetView v = currentView;
-    if (logger.isDebugEnabled()) {
-      logger.debug("JoinLeave.processLeaveRequest invoked.  isCoordinator="+isCoordinator+ "; isStopping="+isStopping
-          +"; cancelInProgress="+services.getCancelCriterion().isCancelInProgress());
-    }
-    if (!isCoordinator && !isStopping && !services.getCancelCriterion().isCancelInProgress()) {
-      logger.debug("JoinLeave is checking to see if I should become coordinator");
-      NetView check = new NetView(v, v.getViewId()+1);
-      check.remove(incomingRequest.getMemberID());
-      if (check.getCoordinator().equals(localAddress)) {
-        becomeCoordinator(incomingRequest.getMemberID());
-      }
-    }
-    else {
-      if (!isStopping && !services.getCancelCriterion().isCancelInProgress()) {
-        recordViewRequest(incomingRequest);
-      }
-    }
-  }
-  
-  
-  /**
-   * Process a Remove request from another member.  This may cause this member
-   * to become the new membership coordinator.  If this is the coordinator
-   * a new view will be triggered.
-   * 
-   * @param incomingRequest
-   */
-  private void processRemoveRequest(RemoveMemberMessage incomingRequest) {
-    NetView v = currentView;
-    if (logger.isDebugEnabled()) {
-      logger.debug("JoinLeave.processRemoveRequest invoked.  isCoordinator="+isCoordinator+ "; isStopping="+isStopping
-          +"; cancelInProgress="+services.getCancelCriterion().isCancelInProgress());
-    }
-    InternalDistributedMember mbr = incomingRequest.getMemberID();
-
-    if (v != null  &&  !v.contains(incomingRequest.getSender())) {
-      logger.info("Membership ignoring removal request for " + mbr + " from non-member " + incomingRequest.getSender());
-      return;
-    }
-    
-    logger.info("Membership received a request to remove " + mbr
-        + "; reason="+incomingRequest.getReason());
-
-    if (mbr.equals(this.localAddress)) {
-      // oops - I've been kicked out
-      services.getManager().forceDisconnect(incomingRequest.getReason());
-      return;
-    }
-    
-    if (!isCoordinator && !isStopping && !services.getCancelCriterion().isCancelInProgress()) {
-      logger.debug("JoinLeave is checking to see if I should become coordinator");
-      NetView check = new NetView(v, v.getViewId()+1);
-      check.remove(mbr);
-      if (check.getCoordinator().equals(localAddress)) {
-        becomeCoordinator(mbr);
-      }
-    }
-    else {
-      if (!isStopping && !services.getCancelCriterion().isCancelInProgress()) {
-        recordViewRequest(incomingRequest);
-      }
-    }
-  }
-  
-  
-  private void recordViewRequest(DistributionMessage request) {
-    logger.debug("JoinLeave is recording the request to be processed in the next membership view");
-    synchronized(viewRequests) {
-      viewRequests.add(request);
-      viewRequests.notify();
-    }
-  }
-  
-  //for testing purposes, returns a copy of the view requests for verification
-  List<DistributionMessage> getViewRequests() {
-    synchronized(viewRequests) {
-      return new LinkedList<DistributionMessage>(viewRequests);
-    }
-  }
-  
-  /**
-   * Yippeee - I get to be the coordinator
-   */
-  private void becomeCoordinator() {
-    becomeCoordinator(null);
-  }
-  
-  /**
-   * @param oldCoordinator may be null
-   */
-  private void becomeCoordinator(InternalDistributedMember oldCoordinator) {
-    stateLock.writeLock().lock();
-    try {
-      if (isCoordinator) {
-        return;
-      }
-      logger.info("This member is becoming the membership coordinator with address {}", localAddress);
-      isCoordinator = true;
-      if (currentView == null) {
-        // create the initial membership view
-        NetView newView = new NetView(this.localAddress);
-        this.localAddress.setVmViewId(0);
-        installView(newView);
-        isJoined = true;
-        startCoordinatorServices();
-      } else {
-        // create and send out a new view
-        NetView newView;
-        synchronized(viewInstallationLock) {
-          int viewNumber = currentView.getViewId() + 5;
-          List<InternalDistributedMember> mbrs = new ArrayList<InternalDistributedMember>(currentView.getMembers());
-          mbrs.add(localAddress);
-          List<InternalDistributedMember> leaving = new ArrayList<InternalDistributedMember>();
-          if (oldCoordinator != null) {
-            leaving.add(oldCoordinator);
-          }
-          newView = new NetView(this.localAddress, viewNumber, mbrs, leaving,
-              Collections.<InternalDistributedMember>emptyList());
-        }
-        sendView(newView, Collections.<InternalDistributedMember>emptyList());
-        startCoordinatorServices();
-      }
-    } finally {
-      stateLock.writeLock().unlock();
-    }
-  }
-  
-  
-  private void sendJoinResponses(List<InternalDistributedMember> newMbrs, NetView newView) {
-    for (InternalDistributedMember mbr: newMbrs) {
-      JoinResponseMessage response = new JoinResponseMessage(mbr, newView);
-      services.getMessenger().send(response);
-    }
-  }
-  
-  private void sendRemoveMessages(List<InternalDistributedMember> newMbrs,
-      List<String> reasons, NetView newView) {
-    Iterator<String> reason = reasons.iterator();
-    for (InternalDistributedMember mbr: newMbrs) {
-      RemoveMemberMessage response = new RemoveMemberMessage(mbr, mbr, reason.next());
-      services.getMessenger().send(response);
-    }
-  }
-  
-  
-  boolean prepareView(NetView view, Collection<InternalDistributedMember> newMembers) {
-    return sendView(view, newMembers, true, this.prepareProcessor);
-  }
-  
-  void sendView(NetView view, Collection<InternalDistributedMember> newMembers) {
-    sendView(view, newMembers, false, this.viewProcessor);
-  }
-  
-  
-  boolean sendView(NetView view, Collection<InternalDistributedMember> newMembers, boolean preparing, ViewReplyProcessor rp) {
-    int id = view.getViewId();
-    InstallViewMessage msg = new InstallViewMessage(view, services.getAuthenticator().getCredentials(), preparing);
-    Set<InternalDistributedMember> recips = new HashSet<InternalDistributedMember>(view.getMembers());
-    recips.removeAll(newMembers); // new members get the view in a JoinResponseMessage
-    recips.remove(this.localAddress); // no need to send it to ourselves
-    installView(view);
-    recips.addAll(view.getCrashedMembers());
-    if (recips.isEmpty()) {
-      return true;
-    }
-    msg.setRecipients(recips);
-    rp.initialize(id, recips);
-
-    logger.info((preparing? "preparing" : "sending") + " new view " + view);
-    services.getMessenger().send(msg);
-
-    // only wait for responses during preparation
-    if (preparing) {
-      Set<InternalDistributedMember> failedToRespond = rp.waitForResponses();
-
-      logger.info("View Creator is finished waiting for responses to view preparation");
-      
-      InternalDistributedMember conflictingViewSender = rp.getConflictingViewSender();
-      NetView conflictingView = rp.getConflictingView();
-      if (conflictingView != null) {
-        logger.warn("View Creator received a conflicting membership view from " + conflictingViewSender
-            + " during preparation: " + conflictingView);
-        return false;
-      }
-      
-      if (!failedToRespond.isEmpty()  &&  (services.getCancelCriterion().cancelInProgress() == null)) {
-        logger.warn("these members failed to respond to the view change: " + failedToRespond);
-        return false;
-      }
-    }
-    
-    return true;
-  }
-  
-  
-
-  private void processViewMessage(InstallViewMessage m) {
-    NetView view = m.getView();
-    
-    if (currentView != null  &&  view.getViewId() < currentView.getViewId()) {
-      // ignore old views
-      ackView(m);
-      return;
-    }
-    
-    
-    if (m.isPreparing()) {
-      if (this.preparedView != null && this.preparedView.getViewId() >= view.getViewId()) {
-        services.getMessenger().send(new ViewAckMessage(m.getSender(), this.preparedView));
-      }
-      else {
-        this.preparedView = view;
-        ackView(m);
-      }
-    }
-    else { // !preparing
-      if (currentView != null  &&  !view.contains(this.localAddress)) {
-        if (quorumRequired) {
-          services.getManager().forceDisconnect("This node is no longer in the membership view");
-        }
-      }
-      else {
-        ackView(m);
-        installView(view);
-      }
-    }
-  }
-  
-
-  private void ackView(InstallViewMessage m) {
-    if (m.getView().contains(m.getView().getCreator())) {
-      services.getMessenger().send(new ViewAckMessage(m.getSender(), m.getView().getViewId(), m.isPreparing()));
-    }
-  }
-  
-  
-  private void processViewAckMessage(ViewAckMessage m) {
-    if (m.isPrepareAck()) {
-      this.prepareProcessor.processViewResponse(m.getViewId(), m.getSender(), m.getAlternateView());
-    } else {
-      this.viewProcessor.processViewResponse(m.getViewId(), m.getSender(), m.getAlternateView());
-    }
-  }
-  
-  /**
-   * This contacts the locators to find out who the current coordinator is.
-   * All locators are contacted.  If they don't agree then we choose the oldest
-   * coordinator and return it.
-   * @return
-   */
-  private InternalDistributedMember findCoordinator() {
-    assert this.localAddress != null;
-    
-    FindCoordinatorRequest request = new FindCoordinatorRequest(this.localAddress);
-    Set<InternalDistributedMember> coordinators = new HashSet<InternalDistributedMember>();
-    long giveUpTime = System.currentTimeMillis() + (services.getConfig().getLocatorWaitTime() * 1000L);
-    boolean anyResponses = false;
-    
-    do {
-      for (InetSocketAddress addr: locators) { 
-        try {
-          Object o = TcpClient.requestToServer(
-              addr.getAddress(), addr.getPort(), request, services.getConfig().getJoinTimeout(), 
-              true);
-          FindCoordinatorResponse response = (o instanceof FindCoordinatorResponse) ? (FindCoordinatorResponse)o : null;
-          if (response != null && response.getCoordinator() != null) {
-            anyResponses = false;
-            coordinators.add(response.getCoordinator());
-            if (response.isFromView()) {
-              GMSMember mbr = (GMSMember)this.localAddress.getNetMember();
-              services.getConfig().setNetworkPartitionDetectionEnabled(response.isNetworkPartitionDetectionEnabled());
-              if (response.isUsePreferredCoordinators()
-                  && localAddress.getVmKind() != DistributionManager.LOCATOR_DM_TYPE) {
-                mbr.setPreferredForCoordinator(false);
-              }
-            }
-          }
-        } catch (IOException | ClassNotFoundException problem) {
-        }
-      }
-      if (coordinators.isEmpty()) {
-        return null;
-      }
-      if (!anyResponses) {
-        try { Thread.sleep(2000); } catch (InterruptedException e) {
-          Thread.currentThread().interrupt();
-          return null;
-        }
-      }
-    } while (!anyResponses && System.currentTimeMillis() < giveUpTime);
-    
-    Iterator<InternalDistributedMember> it = coordinators.iterator();
-    if (coordinators.size() == 1) {
-      return it.next();
-    }
-    InternalDistributedMember oldest = it.next();
-    while (it.hasNext()) {
-      InternalDistributedMember candidate = it.next();
-      if (oldest.compareTo(candidate) > 0) {
-        oldest = candidate;
-      }
-    }
-    return oldest;
-  }
-  
-  
-  /**
-   * receives a JoinResponse holding a membership view or rejection message
-   * @param rsp
-   */
-  private void processJoinResponse(JoinResponseMessage rsp) {
-    synchronized(joinResponse) {
-      joinResponse[0] = rsp;
-      joinResponse.notify();
-    }
-  }
-
-  @Override
-  public NetView getView() {
-    return currentView;
-  }
-
-  @Override
-  public InternalDistributedMember getMemberID() {
-    return this.localAddress;
-  }
-  
-  public void installView(NetView newView) {
-    synchronized(viewInstallationLock) {
-      if (currentView != null && currentView.getViewId() >= newView.getViewId()) {
-        // old view - ignore it
-        return;
-      }
-      
-      if (checkForPartition(newView)) {
-        if (quorumRequired) {
-          List<InternalDistributedMember> crashes = newView.getActualCrashedMembers(currentView);
-          services.getManager().forceDisconnect(
-              LocalizedStrings.Network_partition_detected.toLocalizedString(crashes.size(), crashes));
-        }
-        return;
-      }
-      
-      currentView = newView;
-      preparedView = null;
-      lastConflictingView = null;
-      services.installView(newView);
-      
-      if (!newView.getCreator().equals(this.localAddress)) {
-        if (newView.shouldBeCoordinator(this.localAddress)) {
-          becomeCoordinator();
-        } else if (this.isCoordinator) {
-          // stop being coordinator
-          stateLock.writeLock().lock();
-          try {
-            stopCoordinatorServices();
-            this.isCoordinator = false;
-          } finally {
-            stateLock.writeLock().unlock();
-          }
-        }
-      }
-      if (!this.isCoordinator) {
-        // get rid of outdated requests.  It's possible some requests are
-        // newer than the view just processed - the senders will have to
-        // resend these
-        synchronized(viewRequests) {
-          for (Iterator<DistributionMessage> it = viewRequests.iterator(); it.hasNext(); ) {
-            DistributionMessage m = it.next();
-            if (m instanceof JoinRequestMessage) {
-              it.remove();
-            } else if (m instanceof LeaveRequestMessage) {
-              if (!currentView.contains(((LeaveRequestMessage)m).getMemberID())) {
-                it.remove();
-              }
-            } else if (m instanceof RemoveMemberMessage) {
-              if (!currentView.contains(((RemoveMemberMessage)m).getMemberID())) {
-                it.remove();
-              }
-            }
-          }
-        }
-      }
-    }
-  }
-  
-
-  /**
-   * check to see if the new view shows a drop of 51% or more
-   */
-  private boolean checkForPartition(NetView newView) {
-    if (currentView == null) {
-      return false;
-    }
-    int oldWeight = currentView.memberWeight();
-    int failedWeight = newView.getCrashedMemberWeight(currentView);
-    if (failedWeight > 0) {
-      if (logger.isInfoEnabled()) {
-        newView.logCrashedMemberWeights(currentView, logger);
-      }
-      int failurePoint = (int)(Math.round(51 * oldWeight) / 100.0);
-      if (failedWeight > failurePoint) {
-        services.getManager().quorumLost(newView.getActualCrashedMembers(currentView), currentView);
-        return true;
-      }
-    }
-    return false;
-  }
-  
-  
-  /** invoke this under the viewInstallationLock */
-  private void startCoordinatorServices() {
-    if (viewCreator == null || viewCreator.isShutdown()) {
-      viewCreator = new ViewCreator("GemFire Membership View Creator", Services.getThreadGroup());
-      viewCreator.setDaemon(true);
-      viewCreator.start();
-    }
-  }
-  
-  private void stopCoordinatorServices() {
-    if (viewCreator != null && !viewCreator.isShutdown()) {
-      viewCreator.shutdown();
-    }
-  }
-  
-  public static void loadEmergencyClasses() {
-  }
-  
-  @Override
-  public void emergencyClose() {
-    isStopping = true;
-    isJoined = false;
-    stopCoordinatorServices();
-    isCoordinator = false;
-    currentView = null;
-  }
-
-  public void beSick() {
-  }
-
-  public void playDead() {
-  }
-
-  public void beHealthy() {
-  }
-
-
-
-  @Override
-  public void start() {
-  }
-
-
-
-  @Override
-  public void started() {
-    this.localAddress = services.getMessenger().getMemberID();
-  }
-
-
-
-  @Override
-  public void stop() {
-    logger.debug("JoinLeave stopping");
-    leave();
-  }
-
-
-
-  @Override
-  public void stopped() {
-    // TODO Auto-generated method stub
-    
-  }
-
-
-
-  @Override
-  public void leave() {
-    synchronized(viewInstallationLock) {
-      NetView view = currentView;
-      isStopping = true;
-      if (view != null) {
-        if (view.size() > 1) {
-          if (this.isCoordinator) {
-            logger.debug("JoinLeave stopping coordination services");
-            stopCoordinatorServices();
-            NetView newView = new NetView(view, view.getViewId()+1);
-            newView.remove(localAddress);
-            InstallViewMessage m = new InstallViewMessage(newView, services.getAuthenticator().getCredentials());
-            m.setRecipients(newView.getMembers());
-            services.getMessenger().send(m);
-            try { Thread.sleep(LEAVE_MESSAGE_SLEEP_TIME); }
-            catch (InterruptedException e) { Thread.currentThread().interrupt(); }
-          }
-          else {
-            logger.debug("JoinLeave sending a leave request to {}", view.getCoordinator());
-            LeaveRequestMessage m = new LeaveRequestMessage(view.getCoordinator(), this.localAddress);
-            services.getMessenger().send(m);
-          }
-        } // view.size
-      }// view != null
-    }
-      
-  }
-
-
-
-  @Override
-  public void remove(InternalDistributedMember m, String reason) {
-    NetView v = this.currentView;
-    if (v != null) {
-      RemoveMemberMessage msg = new RemoveMemberMessage(v.getCoordinator(), m,
-          reason);
-      services.getMessenger().send(msg);
-    }
-  }
-  
-  @Override
-  public void disableDisconnectOnQuorumLossForTesting() {
-    this.quorumRequired = false;
-  }
-  
-  @Override
-  public void init(Services s) {
-    this.services = s;
-    services.getMessenger().addHandler(JoinRequestMessage.class, this);
-    services.getMessenger().addHandler(JoinResponseMessage.class, this);
-    services.getMessenger().addHandler(InstallViewMessage.class, this);
-    services.getMessenger().addHandler(ViewAckMessage.class, this);
-    services.getMessenger().addHandler(LeaveRequestMessage.class, this);
-    services.getMessenger().addHandler(RemoveMemberMessage.class, this);
-    services.getMessenger().addHandler(JoinRequestMessage.class, this);
-    services.getMessenger().addHandler(JoinResponseMessage.class, this);
-
-    DistributionConfig dc = services.getConfig().getDistributionConfig();
-    int ackCollectionTimeout = dc.getMemberTimeout() * 2 * 12437 / 10000;
-    if (ackCollectionTimeout < 1500) {
-      ackCollectionTimeout = 1500;
-    } else if (ackCollectionTimeout > 12437) {
-      ackCollectionTimeout = 12437;
-    }
-    ackCollectionTimeout = Integer.getInteger("gemfire.VIEW_ACK_TIMEOUT", ackCollectionTimeout).intValue();
-    this.viewAckTimeout = ackCollectionTimeout;
-    
-    this.quorumRequired = services.getConfig().getDistributionConfig().getEnableNetworkPartitionDetection();
-    
-    DistributionConfig dconfig = services.getConfig().getDistributionConfig();
-    String bindAddr = dconfig.getBindAddress();
-    locators = GMSUtil.parseLocators(dconfig.getLocators(), bindAddr);
-  }
-
-  @Override
-  public void processMessage(DistributionMessage m) {
-    if (isStopping) {
-      return;
-    }
-    logger.debug("JoinLeave processing {}", m);
-    switch (m.getDSFID()) {
-    case JOIN_REQUEST:
-      processJoinRequest((JoinRequestMessage)m);
-      break;
-    case JOIN_RESPONSE:
-      processJoinResponse((JoinResponseMessage)m);
-      break;
-    case INSTALL_VIEW_MESSAGE:
-      processViewMessage((InstallViewMessage)m);
-      break;
-    case VIEW_ACK_MESSAGE:
-      processViewAckMessage((ViewAckMessage)m);
-      break;
-    case LEAVE_REQUEST_MESSAGE:
-      processLeaveRequest((LeaveRequestMessage)m);
-      break;
-    case REMOVE_MEMBER_MESSAGE:
-      processRemoveRequest((RemoveMemberMessage)m);
-      break;
-    default:
-      throw new IllegalArgumentException("unknown message type: " + m);
-    }
-  }
-  
-
-  
-  
-  class ViewReplyProcessor {
-    volatile int viewId = -1;
-    volatile Set<InternalDistributedMember> recipients;
-    volatile NetView conflictingView;
-    volatile InternalDistributedMember conflictingViewSender;
-    volatile boolean waiting;
-    final boolean isPrepareViewProcessor;
-    
-    ViewReplyProcessor(boolean forPreparation) {
-      this.isPrepareViewProcessor = forPreparation;
-    }
-    
-    void initialize(int viewId, Set<InternalDistributedMember> recips) {
-      this.waiting = true;
-      this.viewId = viewId;
-      this.recipients = recips;
-      this.conflictingView = null;
-    }
-    
-    void processViewResponse(int viewId, InternalDistributedMember sender, NetView conflictingView) {
-      if (!this.waiting) {
-        return;
-      }
-      
-      if (viewId == this.viewId) {
-        if (conflictingView != null) {
-          this.conflictingViewSender = sender;
-          this.conflictingView = conflictingView;
-        }
-
-        Set<InternalDistributedMember> waitingFor = this.recipients;
-        synchronized(waitingFor) {
-          waitingFor.remove(sender);
-          if (waitingFor.isEmpty()) {
-            logger.debug("All view responses received - notifying waiting thread");
-            waitingFor.notify();
-          }
-        }
-
-      }
-    }
-    
-    Set<InternalDistributedMember> waitForResponses() {
-      Set<InternalDistributedMember> result = this.recipients;
-      long endOfWait = System.currentTimeMillis() + viewAckTimeout;
-      try {
-        while (System.currentTimeMillis() < endOfWait
-            &&  (services.getCancelCriterion().cancelInProgress() == null)) {
-          try {
-            synchronized(result) {
-              result.wait(1000);
-            }
-          } catch (InterruptedException e) {
-            logger.debug("Interrupted while waiting for view resonses");
-            Thread.currentThread().interrupt();
-            return result;
-          }
-        }
-      } finally {
-        this.waiting = false;
-      }
-      return result;
-    }
-    
-    NetView getConflictingView() {
-      return this.conflictingView;
-    }
-    
-    InternalDistributedMember getConflictingViewSender() {
-      return this.conflictingViewSender;
-    }
-    
-    Set<InternalDistributedMember> getUnresponsiveMembers() {
-      return this.recipients;
-    }
-  }
-  
-
-  
-  
-
-  class ViewCreator extends Thread {
-    boolean shutdown = false;
-    
-    ViewCreator(String name, ThreadGroup tg) {
-      super(tg, name);
-    }
-    
-    void shutdown() {
-      shutdown = true;
-      synchronized(viewRequests) {
-        viewRequests.notify();
-        interrupt();
-      }
-    }
-    
-    boolean isShutdown() {
-      return shutdown;
-    }
-
-    @Override
-    public void run() {
-      List<DistributionMessage> requests = null;
-      logger.info("View Creator thread is starting");
-      long okayToCreateView = System.currentTimeMillis() + MEMBER_REQUEST_COLLECTION_INTERVAL;
-      try {
-        for (;;) {
-          synchronized(viewRequests) {
-            if (shutdown) {
-              return;
-            }
-            if (viewRequests.isEmpty()) {
-              try {
-                logger.debug("View Creator is waiting for requests");
-                viewRequests.wait();
-              } catch (InterruptedException e) {
-                return;
-              }
-            } else {
-              if (System.currentTimeMillis() < okayToCreateView) {
-                // sleep to let more requests arrive
-                try {
-                  sleep(100);
-                  continue;
-                } catch (InterruptedException e) {
-                  return;
-                }
-              } else {
-                // time to create a new membership view
-                if (requests == null) {
-                  requests = new ArrayList<DistributionMessage>(viewRequests);
-                } else {
-                  requests.addAll(viewRequests);
-                }
-                viewRequests.clear();
-                okayToCreateView = System.currentTimeMillis() + MEMBER_REQUEST_COLLECTION_INTERVAL;
-              }
-            }
-          } // synchronized
-          if (requests != null  && !requests.isEmpty()) {
-            logger.debug("View Creator is processing {} requests for the next membership view", requests.size());
-            /*boolean success = */createAndSendView(requests);
-            requests = null;
-          }
-        }
-      } finally {
-        shutdown = true;
-      }
-    }
-    
-    /**
-     * Create a new membership view and send it to members (including crashed members).
-     * Returns false if the view cannot be prepared successfully, true otherwise
-     */
-    boolean createAndSendView(List<DistributionMessage> requests) {
-      List<InternalDistributedMember> joinReqs = new ArrayList<InternalDistributedMember>();
-      List<InternalDistributedMember> leaveReqs = new ArrayList<InternalDistributedMember>();
-      List<InternalDistributedMember> removalReqs = new ArrayList<InternalDistributedMember>();
-      List<String> removalReasons = new ArrayList<String>();
-      
-      for (DistributionMessage msg: requests) {
-        logger.debug("processing request {}", msg);
-        if (msg instanceof JoinRequestMessage) {
-          InternalDistributedMember mbr = ((JoinRequestMessage)msg).getMemberID();
-          joinReqs.add(mbr);
-        }
-        else if (msg instanceof LeaveRequestMessage) {
-          leaveReqs.add(((LeaveRequestMessage) msg).getMemberID());
-        }
-        else if (msg instanceof RemoveMemberMessage) {
-          removalReqs.add(((RemoveMemberMessage) msg).getMemberID());
-          removalReasons.add(((RemoveMemberMessage) msg).getReason());
-        }
-        else {
-          // TODO: handle removals
-          logger.warn("Unknown membership request encountered: {}", msg);
-        }
-      }
-      
-      NetView newView;
-      synchronized(viewInstallationLock) {
-        int viewNumber = 0;
-        List<InternalDistributedMember> mbrs;
-        if (currentView == null) {
-          mbrs = new ArrayList<InternalDistributedMember>(joinReqs.size());
-        } else {
-          viewNumber = currentView.getViewId()+1;
-          mbrs = new ArrayList<InternalDistributedMember>(currentView.getMembers());
-        }
-        mbrs.addAll(joinReqs);
-        mbrs.removeAll(leaveReqs);
-        mbrs.removeAll(removalReqs);
-        newView = new NetView(localAddress, viewNumber, mbrs, leaveReqs,
-            removalReqs);
-      }
-      
-      for (InternalDistributedMember mbr: joinReqs) {
-        mbr.setVmViewId(newView.getViewId());
-      }
-      // send removal messages before installing the view so we stop
-      // getting messages from members that have been kicked out
-      sendRemoveMessages(removalReqs, removalReasons, newView);
-      
-      // we want to always check for quorum loss but don't act on it
-      // unless network-partition-detection is enabled
-      if ( !(checkForPartition(newView) && quorumRequired) ) {
-        sendJoinResponses(joinReqs, newView);
-      }
-
-      if (quorumRequired) {
-        boolean prepared = false;
-        do {
-          if (this.shutdown || Thread.currentThread().isInterrupted()) {
-            return false;
-          }
-          prepared = prepareView(newView, joinReqs);
-          if (!prepared && quorumRequired) {
-            Set<InternalDistributedMember> unresponsive = prepareProcessor.getUnresponsiveMembers();
-            try {
-              removeHealthyMembers(unresponsive);
-            } catch (InterruptedException e) {
-              // abort the view if interrupted
-              shutdown = true;
-              return false;
-            }
-  
-            if (!unresponsive.isEmpty()) {
-              List<InternalDistributedMember> failures = new ArrayList<InternalDistributedMember>(currentView.getCrashedMembers().size() + unresponsive.size());
-              failures.addAll(unresponsive);
-
-              NetView conflictingView = prepareProcessor.getConflictingView();
-              if (conflictingView != null
-                  && !conflictingView.getCreator().equals(localAddress)
-                  && conflictingView.getViewId() > newView.getViewId()
-                  && (lastConflictingView == null || conflictingView.getViewId() > lastConflictingView.getViewId())) {
-                lastConflictingView = conflictingView;
-                failures.addAll(conflictingView.getCrashedMembers());
-              }
-
-              failures.removeAll(removalReqs);
-              if (failures.size() > 0) {
-                // abort the current view and try again
-                removalReqs.addAll(failures);
-                newView = new NetView(localAddress, newView.getViewId()+1, newView.getMembers(), leaveReqs,
-                    removalReqs);
-              }
-            }
-          }
-        } while (!prepared);
-      } // quorumRequired
-      
-      lastConflictingView = null;
-      
-      sendView(newView, joinReqs);
-      return true;
-    }
-    
-    /**
-     * performs health checks on the collection of members, removing any that
-     * are found to be healthy
-     * @param mbrs
-     */
-    private void removeHealthyMembers(Collection<InternalDistributedMember> mbrs) throws InterruptedException {
-      List<Callable<InternalDistributedMember>> checkers = new ArrayList<Callable<InternalDistributedMember>>(mbrs.size()); 
-      
-      for (InternalDistributedMember mbr: mbrs) {
-        final InternalDistributedMember fmbr = mbr;
-        checkers.add(new Callable<InternalDistributedMember>() {
-          @Override
-          public InternalDistributedMember call() throws Exception {
-            // return the member id if it fails health checks
-            logger.info("checking state of member " + fmbr);
-            if (services.getHealthMonitor().checkIfAvailable(fmbr, "Member failed to acknowledge a membership view", false)) {
-              logger.info("member " + fmbr + " passed availability check");
-              return fmbr;
-            }
-            logger.info("member " + fmbr + " failed availability check");
-            return null;
-          }
-        });
-      }
-      
-      ExecutorService svc = Executors.newFixedThreadPool(mbrs.size(), new ThreadFactory() {
-        AtomicInteger i = new AtomicInteger();
-        @Override
-        public Thread newThread(Runnable r) {
-          return new Thread(Services.getThreadGroup(), r,
-              "Member verification thread " + i.incrementAndGet());
-        }
-      });
-
-      try {
-        List<Future<InternalDistributedMember>> futures;
-        futures = svc.invokeAll(checkers);
-
-        for (Future<InternalDistributedMember> future: futures) {
-          try {
-            InternalDistributedMember mbr = future.get(viewAckTimeout, TimeUnit.MILLISECONDS);
-            if (mbr != null) {
-              logger.debug("disregarding lack of acknowledgement from {}", mbr);
-              mbrs.remove(mbr);
-            }
-          } catch (java.util.concurrent.TimeoutException e) {
-            // TODO should the member be removed if we can't verify it in time?
-          } catch (ExecutionException e) {
-            logger.info("unexpected exception caught during member verification", e);
-          }
-        }
-      } finally {
-        svc.shutdownNow();
-      }
-    }
-  }
-
-}
+package com.gemstone.gemfire.distributed.internal.membership.gms.membership;
+
+import static com.gemstone.gemfire.distributed.internal.DistributionManager.LOCATOR_DM_TYPE;
+import static com.gemstone.gemfire.internal.DataSerializableFixedID.INSTALL_VIEW_MESSAGE;
+import static com.gemstone.gemfire.internal.DataSerializableFixedID.JOIN_REQUEST;
+import static com.gemstone.gemfire.internal.DataSerializableFixedID.JOIN_RESPONSE;
+import static com.gemstone.gemfire.internal.DataSerializableFixedID.LEAVE_REQUEST_MESSAGE;
+import static com.gemstone.gemfire.internal.DataSerializableFixedID.REMOVE_MEMBER_MESSAGE;
+import static com.gemstone.gemfire.internal.DataSerializableFixedID.VIEW_ACK_MESSAGE;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import org.apache.logging.log4j.Logger;
+
+import com.gemstone.gemfire.SystemConnectException;
+import com.gemstone.gemfire.distributed.internal.DistributionConfig;
+import com.gemstone.gemfire.distributed.internal.DistributionManager;
+import com.gemstone.gemfire.distributed.internal.DistributionMessage;
+import com.gemstone.gemfire.distributed.internal.membership.InternalDistributedMember;
+import com.gemstone.gemfire.distributed.internal.membership.NetView;
+import com.gemstone.gemfire.distributed.internal.membership.gms.GMSMember;
+import com.gemstone.gemfire.distributed.internal.membership.gms.GMSUtil;
+import com.gemstone.gemfire.distributed.internal.membership.gms.Services;
+import com.gemstone.gemfire.distributed.internal.membership.gms.interfaces.JoinLeave;
+import com.gemstone.gemfire.distributed.internal.membership.gms.interfaces.MessageHandler;
+import com.gemstone.gemfire.distributed.internal.membership.gms.locator.FindCoordinatorRequest;
+import com.gemstone.gemfire.distributed.internal.membership.gms.locator.FindCoordinatorResponse;
+import com.gemstone.gemfire.distributed.internal.membership.gms.messages.InstallViewMessage;
+import com.gemstone.gemfire.distributed.internal.membership.gms.messages.JoinRequestMessage;
+import com.gemstone.gemfire.distributed.internal.membership.gms.messages.JoinResponseMessage;
+import com.gemstone.gemfire.distributed.internal.membership.gms.messages.LeaveRequestMessage;
+import com.gemstone.gemfire.distributed.internal.membership.gms.messages.RemoveMemberMessage;
+import com.gemstone.gemfire.distributed.internal.membership.gms.messages.ViewAckMessage;
+import com.gemstone.gemfire.distributed.internal.tcpserver.TcpClient;
+import com.gemstone.gemfire.internal.Version;
+import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
+import com.gemstone.gemfire.security.AuthenticationFailedException;
+
+/**
+ * GMSJoinLeave handles membership communication with other processes in the
+ * distributed system.  It replaces the JGroups channel membership services
+ * that Geode formerly used for this purpose.
+ * 
+ */
+public class GMSJoinLeave implements JoinLeave, MessageHandler {
+
+  /** number of times to try joining before giving up */
+  private static final int JOIN_ATTEMPTS = Integer.getInteger("gemfire.join-attempts", 4);
+  
+  /** amount of time to sleep before trying to join after a failed attempt */
+  private static final int JOIN_RETRY_SLEEP = Integer.getInteger("gemfire.join-retry-sleep", 1000);
+  
+  /** amount of time to wait for a view to be acked by all members before performing suspect processing on non-responders */
+  private static final int VIEW_INSTALLATION_TIMEOUT = Integer.getInteger("gemfire.view-ack-timeout", 12500);
+
+  /** stall time to wait for concurrent join/leave/remove requests to be received */
+  private static final long MEMBER_REQUEST_COLLECTION_INTERVAL = Long.getLong("gemfire.member-request-collection-interval", 2000);
+
+  /** time to wait for a leave request to be transmitted by jgroups */
+  private static final long LEAVE_MESSAGE_SLEEP_TIME = Long.getLong("gemfire.leave-message-sleep-time", 2000);
+  
+  /** membership logger */
+  private static final Logger logger = Services.getLogger();
+
+
+  /** the view ID where I entered into membership */
+  private int birthViewId;
+
+  /** my address */
+  private InternalDistributedMember localAddress;
+
+  private Services services;
+  
+  /** have I connected to the distributed system? */
+  private boolean isJoined;
+
+  /** a lock governing GMS state */
+  private ReadWriteLock stateLock = new ReentrantReadWriteLock();
+  
+  /** guarded by stateLock */
+  private boolean isCoordinator;
+  
+  /** a synch object that guards view installation */
+  private final Object viewInstallationLock = new Object();
+  
+  /** the currently installed view */
+  private volatile NetView currentView;
+  
+  /** a new view being installed */
+  private NetView preparedView;
+  
+  /** the last view that conflicted with view preparation */
+  private NetView lastConflictingView;
+  
+  private List<InetSocketAddress> locators;
+  
+  /** a list of join/leave/crashes */
+  private final List<DistributionMessage> viewRequests = new LinkedList<DistributionMessage>();
+
+  /** collects the response to a join request */
+  private JoinResponseMessage[] joinResponse = new JoinResponseMessage[1];
+  
+  /** collects responses to new views */
+  private ViewReplyProcessor viewProcessor = new ViewReplyProcessor(false);
+  
+  /** collects responses to view preparation messages */
+  private ViewReplyProcessor prepareProcessor = new ViewReplyProcessor(true);
+
+  /** whether quorum checks can cause a forced-disconnect */
+  private boolean quorumRequired = false;
+  
+  /** timeout in receiving view acknowledgement */
+  private int viewAckTimeout;
+
+  /** background thread that creates new membership views */
+  private ViewCreator viewCreator;
+  
+  /** am I shutting down? */
+  private volatile boolean isStopping;
+  
+  
+  /**
+   * attempt to join the distributed system
+   * loop
+   *   send a join request to a locator & get a response
+   *
+   * If the response indicates there's no coordinator it
+   * will contain a set of members that have recently contacted
+   * it.  The "oldest" member is selected as the coordinator
+   * based on ID sort order.
+   * 
+   * @return true if successful, false if not
+   */
+  public boolean join() {
+
+    if (this.localAddress.getVmKind() == LOCATOR_DM_TYPE
+        && Boolean.getBoolean("gemfire.first-member")) {
+      becomeCoordinator();
+      return true;
+    }
+
+    for (int tries=0; tries<JOIN_ATTEMPTS; tries++) {
+      InternalDistributedMember coord = findCoordinator();
+      logger.debug("found possible coordinator {}", coord);
+      if (coord != null) {
+        if (coord.equals(this.localAddress)) {
+          if (tries > (JOIN_ATTEMPTS/2)) {
+            becomeCoordinator();
+            return true;
+          }
+        } else {
+          if (attemptToJoin(coord)) {
+            return true;
+          } 
+        }
+      }
+      try {
+        Thread.sleep(JOIN_RETRY_SLEEP);
+      } catch (InterruptedException e) {
+        return false;
+      }
+    } // for
+    return this.isJoined;
+  }
+
+  /**
+   * send a join request and wait for a reply.  Process the reply.
+   * This may throw a SystemConnectException or an AuthenticationFailedException
+   * @param coord
+   * @return true if the attempt succeeded, false if it timed out
+   */
+  private boolean attemptToJoin(InternalDistributedMember coord) {
+    // send a join request to the coordinator and wait for a response
+    logger.info("Attempting to join the distributed system through coordinator " + coord + " using address " + this.localAddress);
+    JoinRequestMessage req = new JoinRequestMessage(coord, this.localAddress, 
+        services.getAuthenticator().getCredentials(coord));
+
+    services.getMessenger().send(req);
+    
+    JoinResponseMessage response = null;
+    synchronized(joinResponse) {
+      if (joinResponse[0] == null) {
+        try {
+          joinResponse.wait(services.getConfig().getJoinTimeout()/JOIN_ATTEMPTS);
+        } catch (InterruptedException e) {
+          Thread.currentThread().interrupt();
+          return false;
+        }
+      }
+      response = joinResponse[0];
+    }
+    if (response != null) {
+      joinResponse[0] = null;
+      String failReason = response.getRejectionMessage();
+      if (failReason != null) {
+        if (failReason.contains("Rejecting the attempt of a member using an older version")
+            || failReason.contains("15806")) {
+          throw new SystemConnectException(failReason);
+        }
+        throw new AuthenticationFailedException(failReason);
+      }
+      if (response.getCurrentView() != null) {
+        this.birthViewId = response.getMemberID().getVmViewId();
+        this.localAddress.setVmViewId(this.birthViewId);
+        GMSMember me = (GMSMember)this.localAddress.getNetMember();
+        GMSMember o = (GMSMember)response.getMemberID().getNetMember();
+        me.setSplitBrainEnabled(o.isSplitBrainEnabled());
+        me.setPreferredForCoordinator(o.preferredForCoordinator());
+        installView(response.getCurrentView());
+        return true;
+      }
+    }
+    return false;
+  }
+  
+  
+  /**
+   * process a join request from another member.  If this is the coordinator
+   * this method will enqueue the request for processing in another thread.
+   * If this is not the coordinator but the coordinator is known, the message
+   * is forwarded to the coordinator.
+   * @param incomingRequest
+   */
+  private void processJoinRequest(JoinRequestMessage incomingRequest) {
+    if (incomingRequest.getMemberID().getVersionObject().compareTo(Version.CURRENT) < 0) {
+      logger.warn("detected an attempt to start a peer using an older version of the product {}",
+          incomingRequest.getMemberID());
+      JoinResponseMessage m = new JoinResponseMessage("Rejecting the attempt of a member using an older version");
+      m.setRecipient(incomingRequest.getMemberID());
+      services.getMessenger().send(m);
+      return;
+    }
+    Object creds = incomingRequest.getCredentials();
+    if (creds != null) {
+      String rejection = null;
+      try {
+        rejection = services.getAuthenticator().authenticate(incomingRequest.getMemberID(), creds);
+      } catch (Exception e) {
+        rejection = e.getMessage();
+      }
+      if (rejection != null  &&  rejection.length() > 0) {
+        JoinResponseMessage m = new JoinResponseMessage(rejection);
+        m.setRecipient(incomingRequest.getMemberID());
+        services.getMessenger().send(m);
+      }
+    }
+    recordViewRequest(incomingRequest);
+  }
+  
+  
+  /**
+   * Process a Leave request from another member.  This may cause this member
+   * to become the new membership coordinator.  If this is the coordinator
+   * a new view will be triggered.
+   * 
+   * @param incomingRequest
+   */
+  private void processLeaveRequest(LeaveRequestMessage incomingRequest) {
+    NetView v = currentView;
+    if (logger.isDebugEnabled()) {
+      logger.debug("JoinLeave.processLeaveRequest invoked.  isCoordinator="+isCoordinator+ "; isStopping="+isStopping
+          +"; cancelInProgress="+services.getCancelCriterion().isCancelInProgress());
+    }
+    if (!isCoordinator && !isStopping && !services.getCancelCriterion().isCancelInProgress()) {
+      logger.debug("JoinLeave is checking to see if I should become coordinator");
+      NetView check = new NetView(v, v.getViewId()+1);
+      check.remove(incomingRequest.getMemberID());
+      if (check.getCoordinator().equals(localAddress)) {
+        becomeCoordinator(incomingRequest.getMemberID());
+      }
+    }
+    else {
+      if (!isStopping && !services.getCancelCriterion().isCancelInProgress()) {
+        recordViewRequest(incomingRequest);
+      }
+    }
+  }
+  
+  
+  /**
+   * Process a Remove request from another member.  This may cause this member
+   * to become the new membership coordinator.  If this is the coordinator
+   * a new view will be triggered.
+   * 
+   * @param incomingRequest
+   */
+  private void processRemoveRequest(RemoveMemberMessage incomingRequest) {
+    NetView v = currentView;
+    if (logger.isDebugEnabled()) {
+      logger.debug("JoinLeave.processRemoveRequest invoked.  isCoordinator="+isCoordinator+ "; isStopping="+isStopping
+          +"; cancelInProgress="+services.getCancelCriterion().isCancelInProgress());
+    }
+    InternalDistributedMember mbr = incomingRequest.getMemberID();
+
+    if (v != null  &&  !v.contains(incomingRequest.getSender())) {
+      logger.info("Membership ignoring removal request for " + mbr + " from non-member " + incomingRequest.getSender());
+      return;
+    }
+    
+    logger.info("Membership received a request to remove " + mbr
+        + "; reason="+incomingRequest.getReason());
+
+    if (mbr.equals(this.localAddress)) {
+      // oops - I've been kicked out
+      services.getManager().forceDisconnect(incomingRequest.getReason());
+      return;
+    }
+    
+    if (!isCoordinator && !isStopping && !services.getCancelCriterion().isCancelInProgress()) {
+      logger.debug("JoinLeave is checking to see if I should become coordinator");
+      NetView check = new NetView(v, v.getViewId()+1);
+      check.remove(mbr);
+      if (check.getCoordinator().equals(localAddress)) {
+        becomeCoordinator(mbr);
+      }
+    }
+    else {
+      if (!isStopping && !services.getCancelCriterion().isCancelInProgress()) {
+        recordViewRequest(incomingRequest);
+      }
+    }
+  }
+  
+  
+  private void recordViewRequest(DistributionMessage request) {
+    logger.debug("JoinLeave is recording the request to be processed in the next membership view");
+    synchronized(viewRequests) {
+      viewRequests.add(request);
+      viewRequests.notify();
+    }
+  }
+  
+  //for testing purposes, returns a copy of the view requests for verification
+  List<DistributionMessage> getViewRequests() {
+    synchronized(viewRequests) {
+      return new LinkedList<DistributionMessage>(viewRequests);
+    }
+  }
+  
+  /**
+   * Yippeee - I get to be the coordinator
+   */
+  private void becomeCoordinator() {
+    becomeCoordinator(null);
+  }
+  
+  /**
+   * @param oldCoordinator may be null
+   */
+  private void becomeCoordinator(InternalDistributedMember oldCoordinator) {
+    stateLock.writeLock().lock();
+    try {
+      if (isCoordinator) {
+        return;
+      }
+      logger.info("This member is becoming the membership coordinator with address {}", localAddress);
+      isCoordinator = true;
+      if (currentView == null) {
+        // create the initial membership view
+        NetView newView = new NetView(this.localAddress);
+        this.localAddress.setVmViewId(0);
+        installView(newView);
+        isJoined = true;
+        startCoordinatorServices();
+      } else {
+        // create and send out a new view
+        NetView newView;
+        synchronized(viewInstallationLock) {
+          int viewNumber = currentView.getViewId() + 5;
+          List<InternalDistributedMember> mbrs = new ArrayList<InternalDistributedMember>(currentView.getMembers());
+          mbrs.add(localAddress);
+          List<InternalDistributedMember> leaving = new ArrayList<InternalDistributedMember>();
+          if (oldCoordinator != null) {
+            leaving.add(oldCoordinator);
+          }
+          newView = new NetView(this.localAddress, viewNumber, mbrs, leaving,
+              Collections.<InternalDistributedMember>emptyList());
+        }
+        sendView(newView, Collections.<InternalDistributedMember>emptyList());
+        startCoordinatorServices();
+      }
+    } finally {
+      stateLock.writeLock().unlock();
+    }
+  }
+  
+  
+  private void sendJoinResponses(List<InternalDistributedMember> newMbrs, NetView newView) {
+    for (InternalDistributedMember mbr: newMbrs) {
+      JoinResponseMessage response = new JoinResponseMessage(mbr, newView);
+      services.getMessenger().send(response);
+    }
+  }
+  
+  private void sendRemoveMessages(List<InternalDistributedMember> newMbrs,
+      List<String> reasons, NetView newView) {
+    Iterator<String> reason = reasons.iterator();
+    for (InternalDistributedMember mbr: newMbrs) {
+      RemoveMemberMessage response = new RemoveMemberMessage(mbr, mbr, reason.next());
+      services.getMessenger().send(response);
+    }
+  }
+  
+  
+  boolean prepareView(NetView view, Collection<InternalDistributedMember> newMembers) {
+    return sendView(view, newMembers, true, this.prepareProcessor);
+  }
+  
+  void sendView(NetView view, Collection<InternalDistributedMember> newMembers) {
+    sendView(view, newMembers, false, this.viewProcessor);
+  }
+  
+  
+  boolean sendView(NetView view, Collection<InternalDistributedMember> newMembers, boolean preparing, ViewReplyProcessor rp) {
+    int id = view.getViewId();
+    InstallViewMessage msg = new InstallViewMessage(view, services.getAuthenticator().getCredentials(this.localAddress), preparing);
+    Set<InternalDistributedMember> recips = new HashSet<InternalDistributedMember>(view.getMembers());
+    recips.removeAll(newMembers); // new members get the view in a JoinResponseMessage
+    recips.remove(this.localAddress); // no need to send it to ourselves
+    installView(view);
+    recips.addAll(view.getCrashedMembers());
+    if (recips.isEmpty()) {
+      return true;
+    }
+    msg.setRecipients(recips);
+    rp.initialize(id, recips);
+
+    logger.info((preparing? "preparing" : "sending") + " new view " + view);
+    services.getMessenger().send(msg);
+
+    // only wait for responses during preparation
+    if (preparing) {
+      Set<InternalDistributedMember> failedToRespond = rp.waitForResponses();
+
+      logger.info("View Creator is finished waiting for responses to view preparation");
+      
+      InternalDistributedMember conflictingViewSender = rp.getConflictingViewSender();
+      NetView conflictingView = rp.getConflictingView();
+      if (conflictingView != null) {
+        logger.warn("View Creator received a conflicting membership view from " + conflictingViewSender
+            + " during preparation: " + conflictingView);
+        return false;
+      }
+      
+      if (!failedToRespond.isEmpty()  &&  (services.getCancelCriterion().cancelInProgress() == null)) {
+        logger.warn("these members failed to respond to the view change: " + failedToRespond);
+        return false;
+      }
+    }
+    
+    return true;
+  }
+  
+  
+
+  private void processViewMessage(InstallViewMessage m) {
+    NetView view = m.getView();
+    
+    if (currentView != null  &&  view.getViewId() < currentView.getViewId()) {
+      // ignore old views
+      ackView(m);
+      return;
+    }
+    
+    
+    if (m.isPreparing()) {
+      if (this.preparedView != null && this.preparedView.getViewId() >= view.getViewId()) {
+        services.getMessenger().send(new ViewAckMessage(m.getSender(), this.preparedView));
+      }
+      else {
+        this.preparedView = view;
+        ackView(m);
+      }
+    }
+    else { // !preparing
+      if (currentView != null  &&  !view.contains(this.localAddress)) {
+        if (quorumRequired) {
+          services.getManager().forceDisconnect("This node is no longer in the membership view");
+        }
+      }
+      else {
+        ackView(m);
+        installView(view);
+      }
+    }
+  }
+  
+
+  private void ackView(InstallViewMessage m) {
+    if (m.getView().contains(m.getView().getCreator())) {
+      services.getMessenger().send(new ViewAckMessage(m.getSender(), m.getView().getViewId(), m.isPreparing()));
+    }
+  }
+  
+  
+  private void processViewAckMessage(ViewAckMessage m) {
+    if (m.isPrepareAck()) {
+      this.prepareProcessor.processViewResponse(m.getViewId(), m.getSender(), m.getAlternateView());
+    } else {
+      this.viewProcessor.processViewResponse(m.getViewId(), m.getSender(), m.getAlternateView());
+    }
+  }
+  
+  /**
+   * This contacts the locators to find out who the current coordinator is.
+   * All locators are contacted.  If they don't agree then we choose the oldest
+   * coordinator and return it.
+   * @return
+   */
+  private InternalDistributedMember findCoordinator() {
+    assert this.localAddress != null;
+    
+    FindCoordinatorRequest request = new FindCoordinatorRequest(this.localAddress);
+    Set<InternalDistributedMember> coordinators = new HashSet<InternalDistributedMember>();
+    long giveUpTime = System.currentTimeMillis() + (services.getConfig().getLocatorWaitTime() * 1000L);
+    boolean anyResponses = false;
+    
+    do {
+      for (InetSocketAddress addr: locators) { 
+        try {
+          Object o = TcpClient.requestToServer(
+              addr.getAddress(), addr.getPort(), request, services.getConfig().getJoinTimeout(), 
+              true);
+          FindCoordinatorResponse response = (o instanceof FindCoordinatorResponse) ? (FindCoordinatorResponse)o : null;
+          if (response != null && response.getCoordinator() != null) {
+            anyResponses = false;
+            coordinators.add(response.getCoordinator());
+            if (response.isFromView()) {
+              GMSMember mbr = (GMSMember)this.localAddress.getNetMember();
+              services.getConfig().setNetworkPartitionDetectionEnabled(response.isNetworkPartitionDetectionEnabled());
+              if (response.isUsePreferredCoordinators()
+                  && localAddress.getVmKind() != DistributionManager.LOCATOR_DM_TYPE) {
+                mbr.setPreferredForCoordinator(false);
+              }
+            }
+          }
+        } catch (IOException | ClassNotFoundException problem) {
+        }
+      }
+      if (coordinators.isEmpty()) {
+        return null;
+      }
+      if (!anyResponses) {
+        try { Thread.sleep(2000); } catch (InterruptedException e) {
+          Thread.currentThread().interrupt();
+          return null;
+        }
+      }
+    } while (!anyResponses && System.currentTimeMillis() < giveUpTime);
+    
+    Iterator<InternalDistributedMember> it = coordinators.iterator();
+    if (coordinators.size() == 1) {
+      return it.next();
+    }
+    InternalDistributedMember oldest = it.next();
+    while (it.hasNext()) {
+      InternalDistributedMember candidate = it.next();
+      if (oldest.compareTo(candidate) > 0) {
+        oldest = candidate;
+      }
+    }
+    return oldest;
+  }
+  
+  
+  /**
+   * receives a JoinResponse holding a membership view or rejection message
+   * @param rsp
+   */
+  private void processJoinResponse(JoinResponseMessage rsp) {
+    synchronized(joinResponse) {
+      joinResponse[0] = rsp;
+      joinResponse.notify();
+    }
+  }
+
+  @Override
+  public NetView getView() {
+    return currentView;
+  }
+
+  @Override
+  public InternalDistributedMember getMemberID() {
+    return this.localAddress;
+  }
+  
+  public void installView(NetView newView) {
+    synchronized(viewInstallationLock) {
+      if (currentView != null && currentView.getViewId() >= newView.getViewId()) {
+        // old view - ignore it
+        return;
+      }
+      
+      if (checkForPartition(newView)) {
+        if (quorumRequired) {
+          List<InternalDistributedMember> crashes = newView.getActualCrashedMembers(currentView);
+          services.getManager().forceDisconnect(
+              LocalizedStrings.Network_partition_detected.toLocalizedString(crashes.size(), crashes));
+        }
+        return;
+      }
+      
+      currentView = newView;
+      preparedView = null;
+      lastConflictingView = null;
+      services.installView(newView);
+      
+      if (!newView.getCreator().equals(this.localAddress)) {
+        if (newView.shouldBeCoordinator(this.localAddress)) {
+          becomeCoordinator();
+        } else if (this.isCoordinator) {
+          // stop being coordinator
+          stateLock.writeLock().lock();
+          try {
+            stopCoordinatorServices();
+            this.isCoordinator = false;
+          } finally {
+            stateLock.writeLock().unlock();
+          }
+        }
+      }
+      if (!this.isCoordinator) {
+        // get rid of outdated requests.  It's possible some requests are
+        // newer than the view just processed - the senders will have to
+        // resend these
+        synchronized(viewRequests) {
+          for (Iterator<DistributionMessage> it = viewRequests.iterator(); it.hasNext(); ) {
+            DistributionMessage m = it.next();
+            if (m instanceof JoinRequestMessage) {
+              it.remove();
+            } else if (m instanceof LeaveRequestMessage) {
+              if (!currentView.contains(((LeaveRequestMessage)m).getMemberID())) {
+                it.remove();
+              }
+            } else if (m instanceof RemoveMemberMessage) {
+              if (!currentView.contains(((RemoveMemberMessage)m).getMemberID())) {
+                it.remove();
+              }
+            }
+          }
+        }
+      }
+    }
+  }
+  
+
+  /**
+   * check to see if the new view shows a drop of 51% or more
+   */
+  private boolean checkForPartition(NetView newView) {
+    if (currentView == null) {
+      return false;
+    }
+    int oldWeight = currentView.memberWeight();
+    int failedWeight = newView.getCrashedMemberWeight(currentView);
+    if (failedWeight > 0) {
+      if (logger.isInfoEnabled()) {
+        newView.logCrashedMemberWeights(currentView, logger);
+      }
+      int failurePoint = (int)(Math.round(51 * oldWeight) / 100.0);
+      if (failedWeight > failurePoint) {
+        services.getManager().quorumLost(newView.getActualCrashedMembers(currentView), currentView);
+        return true;
+      }
+    }
+    return false;
+  }
+  
+  
+  /** invoke this under the viewInstallationLock */
+  private void startCoordinatorServices() {
+    if (viewCreator == null || viewCreator.isShutdown()) {
+      viewCreator = new ViewCreator("GemFire Membership View Creator", Services.getThreadGroup());
+      viewCreator.setDaemon(true);
+      viewCreator.start();
+    }
+  }
+  
+  private void stopCoordinatorServices() {
+    if (viewCreator != null && !viewCreator.isShutdown()) {
+      viewCreator.shutdown();
+    }
+  }
+  
+  public static void loadEmergencyClasses() {
+  }
+  
+  @Override
+  public void emergencyClose() {
+    isStopping = true;
+    isJoined = false;
+    stopCoordinatorServices();
+    isCoordinator = false;
+    currentView = null;
+  }
+
+  public void beSick() {
+  }
+
+  public void playDead() {
+  }
+
+  public void beHealthy() {
+  }
+
+
+
+  @Override
+  public void start() {
+  }
+
+
+
+  @Override
+  public void started() {
+    this.localAddress = services.getMessenger().getMemberID();
+  }
+
+
+
+  @Override
+  public void stop() {
+    logger.debug("JoinLeave stopping");
+    leave();
+  }
+
+
+
+  @Override
+  public void stopped() {
+    // TODO Auto-generated method stub
+    
+  }
+
+
+
+  @Override
+  public void leave() {
+    synchronized(viewInstallationLock) {
+      NetView view = currentView;
+      isStopping = true;
+      if (view != null) {
+        if (view.size() > 1) {
+          if (this.isCoordinator) {
+            logger.debug("JoinLeave stopping coordination services");
+            stopCoordinatorServices();
+            NetView newView = new NetView(view, view.getViewId()+1);
+            newView.remove(localAddress);
+            InstallViewMessage m = new InstallViewMessage(newView, services.getAuthenticator().getCredentials(this.localAddress));
+            m.setRecipients(newView.getMembers());
+            services.getMessenger().send(m);
+            try { Thread.sleep(LEAVE_MESSAGE_SLEEP_TIME); }
+            catch (InterruptedException e) { Thread.currentThread().interrupt(); }
+          }
+          else {
+            logger.debug("JoinLeave sending a leave request to {}", view.getCoordinator());
+            LeaveRequestMessage m = new LeaveRequestMessage(view.getCoordinator(), this.localAddress);
+            services.getMessenger().send(m);
+          }
+        } // view.size
+      }// view != null
+    }
+      
+  }
+
+
+
+  @Override
+  public void remove(InternalDistributedMember m, String reason) {
+    NetView v = this.currentView;
+    if (v != null) {
+      RemoveMemberMessage msg = new RemoveMemberMessage(v.getCoordinator(), m,
+          reason);
+      services.getMessenger().send(msg);
+    }
+  }
+  
+  @Override
+  public void disableDisconnectOnQuorumLossForTesting() {
+    this.quorumRequired = false;
+  }
+  
+  @Override
+  public void init(Services s) {
+    this.services = s;
+    services.getMessenger().addHandler(JoinRequestMessage.class, this);
+    services.getMessenger().addHandler(JoinResponseMessage.class, this);
+    services.getMessenger().addHandler(InstallViewMessage.class, this);
+    services.getMessenger().addHandler(ViewAckMessage.class, this);
+    services.getMessenger().addHandler(LeaveRequestMessage.class, this);
+    services.getMessenger().addHandler(RemoveMemberMessage.class, this);
+    services.getMessenger().addHandler(JoinRequestMessage.class, this);
+    services.getMessenger().addHandler(JoinResponseMessage.class, this);
+
+    DistributionConfig dc = services.getConfig().getDistributionConfig();
+    int ackCollectionTimeout = dc.getMemberTimeout() * 2 * 12437 / 10000;
+    if (ackCollectionTimeout < 1500) {
+      ackCollectionTimeout = 1500;
+    } else if (ackCollectionTimeout > 12437) {
+      ackCollectionTimeout = 12437;
+    }
+    ackCollectionTimeout = Integer.getInteger("gemfire.VIEW_ACK_TIMEOUT", ackCollectionTimeout).intValue();
+    this.viewAckTimeout = ackCollectionTimeout;
+    
+    this.quorumRequired = services.getConfig().getDistributionConfig().getEnableNetworkPartitionDetection();
+    
+    DistributionConfig dconfig = services.getConfig().getDistributionConfig();
+    String bindAddr = dconfig.getBindAddress();
+    locators = GMSUtil.parseLocators(dconfig.getLocators(), bindAddr);
+  }
+
+  @Override
+  public void processMessage(DistributionMessage m) {
+    if (isStopping) {
+      return;
+    }
+    logger.debug("JoinLeave processing {}", m);
+    switch (m.getDSFID()) {
+    case JOIN_REQUEST:
+      processJoinRequest((JoinRequestMessage)m);
+      break;
+    case JOIN_RESPONSE:
+      processJoinResponse((JoinResponseMessage)m);
+      break;
+    case INSTALL_VIEW_MESSAGE:
+      processViewMessage((InstallViewMessage)m);
+      break;
+    case VIEW_ACK_MESSAGE:
+      processViewAckMessage((ViewAckMessage)m);
+      break;
+    case LEAVE_REQUEST_MESSAGE:
+      processLeaveRequest((LeaveRequestMessage)m);
+      break;
+    case REMOVE_MEMBER_MESSAGE:
+      processRemoveRequest((RemoveMemberMessage)m);
+      break;
+    default:
+      throw new IllegalArgumentException("unknown message type: " + m);
+    }
+  }
+  
+
+  
+  
+  class ViewReplyProcessor {
+    volatile int viewId = -1;
+    volatile Set<InternalDistributedMember> recipients;
+    volatile NetView conflictingView;
+    volatile InternalDistributedMember conflictingViewSender;
+    volatile boolean waiting;
+    final boolean isPrepareViewProcessor;
+    
+    ViewReplyProcessor(boolean forPreparation) {
+      this.isPrepareViewProcessor = forPreparation;
+    }
+    
+    void initialize(int viewId, Set<InternalDistributedMember> recips) {
+      this.waiting = true;
+      this.viewId = viewId;
+      this.recipients = recips;
+      this.conflictingView = null;
+    }
+    
+    void processViewResponse(int viewId, InternalDistributedMember sender, NetView conflictingView) {
+      if (!this.waiting) {
+        return;
+      }
+      
+      if (viewId == this.viewId) {
+        if (conflictingView != null) {
+          this.conflictingViewSender = sender;
+          this.conflictingView = conflictingView;
+        }
+
+        Set<InternalDistributedMember> waitingFor = this.recipients;
+        synchronized(waitingFor) {
+          waitingFor.remove(sender);
+          if (waitingFor.isEmpty()) {
+            logger.debug("All view responses received - notifying waiting thread");
+            waitingFor.notify();
+          }
+        }
+
+      }
+    }
+    
+    Set<InternalDistributedMember> waitForResponses() {
+      Set<InternalDistributedMember> result = this.recipients;
+      long endOfWait = System.currentTimeMillis() + viewAckTimeout;
+      try {
+        while (System.currentTimeMillis() < endOfWait
+            &&  (services.getCancelCriterion().cancelInProgress() == null)) {
+          try {
+            synchronized(result) {
+              result.wait(1000);
+            }
+          } catch (InterruptedException e) {
+            logger.debug("Interrupted while waiting for view resonses");
+            Thread.currentThread().interrupt();
+            return result;
+          }
+        }
+      } finally {
+        this.waiting = false;
+      }
+      return result;
+    }
+    
+    NetView getConflictingView() {
+      return this.conflictingView;
+    }
+    
+    InternalDistributedMember getConflictingViewSender() {
+      return this.conflictingViewSender;
+    }
+    
+    Set<InternalDistributedMember> getUnresponsiveMembers() {
+      return this.recipients;
+    }
+  }
+  
+
+  
+  
+
+  class ViewCreator extends Thread {
+    boolean shutdown = false;
+    
+    ViewCreator(String name, ThreadGroup tg) {
+      super(tg, name);
+    }
+    
+    void shutdown() {
+      shutdown = true;
+      synchronized(viewRequests) {
+        viewRequests.notify();
+        interrupt();
+      }
+    }
+    
+    boolean isShutdown() {
+      return shutdown;
+    }
+
+    @Override
+    public void run() {
+      List<DistributionMessage> requests = null;
+      logger.info("View Creator thread is starting");
+      long okayToCreateView = System.currentTimeMillis() + MEMBER_REQUEST_COLLECTION_INTERVAL;
+      try {
+        for (;;) {
+          synchronized(viewRequests) {
+            if (shutdown) {
+              return;
+            }
+            if (viewRequests.isEmpty()) {
+              try {
+                logger.debug("View Creator is waiting for requests");
+                viewRequests.wait();
+              } catch (InterruptedException e) {
+                return;
+              }
+            } else {
+              if (System.currentTimeMillis() < okayToCreateView) {
+                // sleep to let more requests arrive
+                try {
+                  sleep(100);
+                  continue;
+                } catch (InterruptedException e) {
+                  return;
+                }
+              } else {
+                // time to create a new membership view
+                if (requests == null) {
+                  requests = new ArrayList<DistributionMessage>(viewRequests);
+                } else {
+                  requests.addAll(viewRequests);
+                }
+                viewRequests.clear();
+                okayToCreateView = System.currentTimeMillis() + MEMBER_REQUEST_COLLECTION_INTERVAL;
+              }
+            }
+          } // synchronized
+          if (requests != null  && !requests.isEmpty()) {
+            logger.debug("View Creator is processing {} requests for the next membership view", requests.size());
+            /*boolean success = */createAndSendView(requests);
+            requests = null;
+          }
+        }
+      } finally {
+        shutdown = true;
+      }
+    }
+    
+    /**
+     * Create a new membership view and send it to members (including crashed members).
+     * Returns false if the view cannot be prepared successfully, true otherwise
+     */
+    boolean createAndSendView(List<DistributionMessage> requests) {
+      List<InternalDistributedMember> joinReqs = new ArrayList<InternalDistributedMember>();
+      List<InternalDistributedMember> leaveReqs = new ArrayList<InternalDistributedMember>();
+      List<InternalDistributedMember> removalReqs = new ArrayList<InternalDistributedMember>();
+      List<String> removalReasons = new ArrayList<String>();
+      
+      for (DistributionMessage msg: requests) {
+        logger.debug("processing request {}", msg);
+        if (msg instanceof JoinRequestMessage) {
+          InternalDistributedMember mbr = ((JoinRequestMessage)msg).getMemberID();
+          joinReqs.add(mbr);
+        }
+        else if (msg instanceof LeaveRequestMessage) {
+          leaveReqs.add(((LeaveRequestMessage) msg).getMemberID());
+        }
+        else if (msg instanceof RemoveMemberMessage) {
+          removalReqs.add(((RemoveMemberMessage) msg).getMemberID());
+          removalReasons.add(((RemoveMemberMessage) msg).getReason());
+        }
+        else {
+          // TODO: handle removals
+          logger.warn("Unknown membership request encountered: {}", msg);
+        }
+      }
+      
+      NetView newView;
+      synchronized(viewInstallationLock) {
+        int viewNumber = 0;
+        List<InternalDistributedMember> mbrs;
+        if (currentView == null) {
+          mbrs = new ArrayList<InternalDistributedMember>(joinReqs.size());
+        } else {
+          viewNumber = currentView.getViewId()+1;
+          mbrs = new ArrayList<InternalDistributedMember>(currentView.getMembers());
+        }
+        mbrs.addAll(joinReqs);
+        mbrs.removeAll(leaveReqs);
+        mbrs.removeAll(removalReqs);
+        newView = new NetView(localAddress, viewNumber, mbrs, leaveReqs,
+            removalReqs);
+      }
+      
+      for (InternalDistributedMember mbr: joinReqs) {
+        mbr.setVmViewId(newView.getViewId());
+      }
+      // send removal messages before installing the view so we stop
+      // getting messages from members that have been kicked out
+      sendRemoveMessages(removalReqs, removalReasons, newView);
+      
+      // we want to always check for quorum loss but don't act on it
+      // unless network-partition-detection is enabled
+      if ( !(checkForPartition(newView) && quorumRequired) ) {
+        sendJoinResponses(joinReqs, newView);
+      }
+
+      if (quorumRequired) {
+        boolean prepared = false;
+        do {
+          if (this.shutdown || Thread.currentThread().isInterrupted()) {
+            return false;
+          }
+          prepared = prepareView(newView, joinReqs);
+          if (!prepared && quorumRequired) {
+            Set<InternalDistributedMember> unresponsive = prepareProcessor.getUnresponsiveMembers();
+            try {
+              removeHealthyMembers(unresponsive);
+            } catch (InterruptedException e) {
+              // abort the view if interrupted
+              shutdown = true;
+              return false;
+            }
+  
+            if (!unresponsive.isEmpty()) {
+              List<InternalDistributedMember> failures = new ArrayList<InternalDistributedMember>(currentView.getCrashedMembers().size() + unresponsive.size());
+              failures.addAll(unresponsive);
+
+              NetView conflictingView = prepareProcessor.getConflictingView();
+              if (conflictingView != null
+                  && !conflictingView.getCreator().equals(localAddress)
+                  && conflictingView.getViewId() > newView.getViewId()
+                  && (lastConflictingView == null || conflictingView.getViewId() > lastConflictingView.getViewId())) {
+                lastConflictingView = conflictingView;
+                failures.addAll(conflictingView.getCrashedMembers());
+              }
+
+              failures.removeAll(removalReqs);
+              if (failures.size() > 0) {
+                // abort the current view and try again
+                removalReqs.addAll(failures);
+                newView = new NetView(localAddress, newView.getViewId()+1, newView.getMembers(), leaveReqs,
+                    removalReqs);
+              }
+            }
+          }
+        } while (!prepared);
+      } // quorumRequired
+      
+      lastConflictingView = null;
+      
+      sendView(newView, joinReqs);
+      return true;
+    }
+    
+    /**
+     * performs health checks on the collection of members, removing any that
+     * are found to be healthy
+     * @param mbrs
+     */
+    private void removeHealthyMembers(Collection<InternalDistributedMember> mbrs) throws InterruptedException {
+      List<Callable<InternalDistributedMember>> checkers = new ArrayList<Callable<InternalDistributedMember>>(mbrs.size()); 
+      
+      for (InternalDistributedMember mbr: mbrs) {
+        final InternalDistributedMember fmbr = mbr;
+        checkers.add(new Callable<InternalDistributedMember>() {
+          @Override
+          public InternalDistributedMember call() throws Exception {
+            // return the member id if it fails health checks
+            logger.info("checking state of member " + fmbr);
+            if (services.getHealthMonitor().checkIfAvailable(fmbr, "Member failed to acknowledge a membership view", false)) {
+              logger.info("member " + fmbr + " passed availability check");
+              return fmbr;
+            }
+            logger.info("member " + fmbr + " failed availability check");
+            return null;
+          }
+        });
+      }
+      
+      ExecutorService svc = Executors.newFixedThreadPool(mbrs.size(), new ThreadFactory() {
+        AtomicInteger i = new AtomicInteger();
+        @Override
+        public Thread newThread(Runnable r) {
+          return new Thread(Services.getThreadGroup(), r,
+              "Member verification thread " + i.incrementAndGet());
+        }
+      });
+
+      try {
+        List<Future<InternalDistributedMember>> futures;
+        futures = svc.invokeAll(checkers);
+
+        for (Future<InternalDistributedMember> future: futures) {
+          try {
+            InternalDistributedMember mbr = future.get(viewAckTimeout, TimeUnit.MILLISECONDS);
+            if (mbr != null) {
+              logger.debug("disregarding lack of acknowledgement from {}", mbr);
+              mbrs.remove(mbr);
+            }
+          } catch (java.util.concurrent.TimeoutException e) {
+            // TODO should the member be removed if we can't verify it in time?
+          } catch (ExecutionException e) {
+            logger.info("unexpected exception caught during member verification", e);
+          }
+        }
+      } finally {
+        svc.shutdownNow();
+      }
+    }
+  }
+
+}