Posted to commits@geode.apache.org by bs...@apache.org on 2015/10/28 15:51:39 UTC

incubator-geode git commit: GEODE-77: improvements in handling loss of coordinator

Repository: incubator-geode
Updated Branches:
  refs/heads/feature/GEODE-77 2847235c5 -> 14445c082


GEODE-77: improvements in handling loss of coordinator

1) Locator join-timeout has been bumped from 17 sec to 24 sec.  This allows a locator to join in spite of the loss of the current coordinator and several backup coordinators.
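
A rough way to see what the larger timeout buys (a back-of-the-envelope sketch, not the real GMSJoinLeave code; the per-attempt cost below is an assumption, since the actual cost depends on member-timeout and the join-response wait):

    // Illustrative only: with a fixed budget, a 24 sec join-timeout lets a
    // joiner fall through to more backup coordinators than the old 17 sec
    // default before giving up.
    public class JoinTimeoutSketch {
      public static void main(String[] args) {
        long joinTimeoutMillis = 24000;           // new default in ServiceConfig
        long assumedCostPerFailedAttempt = 5000;  // assumed, not measured
        int deadCoordinatorsTried = 0;
        long elapsed = 0;
        while (elapsed + assumedCostPerFailedAttempt <= joinTimeoutMillis) {
          elapsed += assumedCostPerFailedAttempt;
          deadCoordinatorsTried++;
        }
        System.out.println("dead coordinators tried before giving up: " + deadCoordinatorsTried);
      }
    }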

2) Join responses are now sent after "prepare for view" messages so that other members have some knowledge of the new members before they are allowed into the system.  If the coordinator dies before completing preparation, the new coordinator will find this prepared view and add its new members to its own membership view.
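
A minimal sketch of the new ordering (stand-in types and method names, not the real GMS classes; the actual change is in GMSJoinLeave.sendView(), shown in the diff below):

    import java.util.List;

    // Hypothetical stand-in for the coordinator's messaging layer.
    interface CoordinatorTransport {
      void sendPrepareView(List<String> allMembers, int viewId);
      void sendJoinResponse(String joiningMember, int viewId);
      void waitForPrepareAcks(int viewId);
    }

    class PrepareBeforeAdmitting {
      static void prepareAndAdmit(CoordinatorTransport t, List<String> allMembers,
                                  List<String> joiningMembers, int viewId) {
        // 1. existing members learn about the new members first
        t.sendPrepareView(allMembers, viewId);
        // 2. only then are the joiners told they are in; if the coordinator
        //    dies here, its successor finds the prepared view and keeps them
        for (String joiner : joiningMembers) {
          t.sendJoinResponse(joiner, viewId);
        }
        // 3. wait for acknowledgements before installing the view
        t.waitForPrepareAcks(viewId);
      }
    }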

3) We no longer clear the collection of already-tried potential-coordinators when a new view is processed during startup.  Since join-requests have already been sent to these addresses, there is no point in trying them again.
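
In sketch form (simplified types; the real bookkeeping lives in the search state used by GMSJoinLeave during discovery):

    import java.util.HashSet;
    import java.util.List;
    import java.util.Set;

    // The tried-set survives view updates, so a coordinator that was already
    // sent a join-request is not contacted again.
    class CoordinatorDiscovery {
      private final Set<String> alreadyTried = new HashSet<>();

      String pickNextCoordinator(List<String> candidatesFromLatestView) {
        for (String candidate : candidatesFromLatestView) {
          if (alreadyTried.add(candidate)) {  // add() returns false if already tried
            return candidate;
          }
        }
        return null;                          // nothing new to try; keep waiting
      }
    }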

4) The coordinator now periodically sends out the current view to members and to crashed members of the last view using unreliable messaging.  This helps get rid of rogue processes and is similar to what we did with the view-sync JGroups protocol in GemFire.  A new method for unreliable message transmission was added to Messenger for this purpose.  This keeps the JGroups UNICAST3 protocol from recreating a retransmission table entry for the old, crashed members, who would reject the old sequence numbers generated by the new table entry and never dispatch the messages.
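
A condensed sketch of the rebroadcast idea (the real ViewBroadcaster TimerTask and the Messenger.sendUnreliably() addition are in the diff below; the messenger interface here is a stand-in):

    import java.util.ArrayList;
    import java.util.List;
    import java.util.Timer;
    import java.util.TimerTask;

    // Stand-in for the GMS messenger; only the no-retransmission send matters.
    interface LossyMessenger {
      void sendUnreliably(List<String> recipients, Object currentView);
    }

    class ViewRebroadcast {
      static void start(Timer timer, LossyMessenger messenger,
                        List<String> members, List<String> crashedMembers,
                        Object currentView, long intervalMillis) {
        timer.schedule(new TimerTask() {
          @Override
          public void run() {
            // include crashed members of the last view so rogue processes
            // eventually learn they were removed
            List<String> recipients = new ArrayList<>(members);
            recipients.addAll(crashedMembers);
            // no-reliability send: UNICAST3 does not rebuild retransmission
            // state for the crashed members
            messenger.sendUnreliably(recipients, currentView);
          }
        }, intervalMillis, intervalMillis);
      }
    }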

5) Removal requests are now processed in the ViewCreator after all other requests have been processed.  This prevents a member from being kicked out when we've already received a shutdown message from that member.
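
In sketch form (stand-in request records instead of the real RemoveMemberMessage/LeaveRequestMessage classes):

    import java.util.HashSet;
    import java.util.List;
    import java.util.Set;

    // Two-pass handling: collect every clean-shutdown (leave) request first,
    // then apply removal requests only to members that did not leave.
    class RemovalOrdering {
      static Set<String> removalsToApply(List<String[]> requests) {  // [type, memberId]
        Set<String> leaving = new HashSet<>();
        for (String[] request : requests) {
          if (request[0].equals("LEAVE")) {
            leaving.add(request[1]);
          }
        }
        Set<String> removals = new HashSet<>();
        for (String[] request : requests) {
          if (request[0].equals("REMOVE") && !leaving.contains(request[1])) {
            removals.add(request[1]);   // a member that announced shutdown is
          }                             // not treated as crashed
        }
        return removals;
      }
    }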

6) Several new unit tests


Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/14445c08
Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/14445c08
Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/14445c08

Branch: refs/heads/feature/GEODE-77
Commit: 14445c082a7642d5a771c4557c64858835fab283
Parents: 2847235
Author: Bruce Schuchardt <bs...@pivotal.io>
Authored: Wed Oct 28 07:50:35 2015 -0700
Committer: Bruce Schuchardt <bs...@pivotal.io>
Committed: Wed Oct 28 07:50:35 2015 -0700

----------------------------------------------------------------------
 .../internal/membership/gms/GMSMember.java      |   8 +-
 .../internal/membership/gms/ServiceConfig.java  |   2 +-
 .../membership/gms/fd/GMSHealthMonitor.java     |  31 ++-
 .../membership/gms/interfaces/Messenger.java    |   7 +
 .../membership/gms/membership/GMSJoinLeave.java | 187 ++++++++++++++-----
 .../gms/messenger/JGroupsMessenger.java         |  24 ++-
 .../gemfire/internal/util/PluckStacks.java      |   5 +-
 .../gms/membership/GMSJoinLeaveJUnitTest.java   |  81 ++++++++
 8 files changed, 283 insertions(+), 62 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/14445c08/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/GMSMember.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/GMSMember.java b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/GMSMember.java
index d5b2e6b..51a166b 100755
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/GMSMember.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/GMSMember.java
@@ -406,15 +406,15 @@ public class GMSMember implements NetMember, DataSerializableFixedID {
     int flags = 0;
     if (splitBrainEnabled) flags |= SB_ENABLED;
     if (preferredForCoordinator) flags |= PREFERRED_FOR_COORD;
-    out.writeInt(flags);
+    out.writeShort(flags);
 
     DataSerializer.writeInetAddress(inetAddr, out);
     out.writeInt(udpPort);
     out.writeInt(vmViewId);
     out.writeInt(directPort);
     out.writeByte(memberWeight);
+    out.writeByte(vmKind);
     out.writeInt(processId);
-    out.writeInt(vmKind);
     DataSerializer.writeString(name,  out);
     DataSerializer.writeStringArray(groups, out);
     out.writeLong(uuidMSBs);
@@ -425,7 +425,7 @@ public class GMSMember implements NetMember, DataSerializableFixedID {
   public void fromData(DataInput in) throws IOException, ClassNotFoundException {
     this.versionOrdinal = Version.readOrdinal(in);
     
-    int flags = in.readInt();
+    int flags = in.readShort();
     this.splitBrainEnabled = (flags & SB_ENABLED) != 0;
     this.preferredForCoordinator = (flags & PREFERRED_FOR_COORD) != 0;
     
@@ -434,8 +434,8 @@ public class GMSMember implements NetMember, DataSerializableFixedID {
     this.vmViewId = in.readInt();
     this.directPort = in.readInt();
     this.memberWeight = in.readByte();
+    this.vmKind = in.readByte();
     this.processId = in.readInt();
-    this.vmKind = in.readInt();
     this.name = DataSerializer.readString(in);
     this.groups = DataSerializer.readStringArray(in);
     this.uuidMSBs = in.readLong();

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/14445c08/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/ServiceConfig.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/ServiceConfig.java b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/ServiceConfig.java
index dc25ad2..df39308 100755
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/ServiceConfig.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/ServiceConfig.java
@@ -112,7 +112,7 @@ public class ServiceConfig {
     this.dconfig = theConfig;
     this.transport = transport;
     
-    int defaultJoinTimeout = 17000;
+    int defaultJoinTimeout = 24000;
     if (theConfig.getLocators().length() > 0 && !Locator.hasLocators()) {
       defaultJoinTimeout = 60000;
     }

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/14445c08/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/fd/GMSHealthMonitor.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/fd/GMSHealthMonitor.java b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/fd/GMSHealthMonitor.java
index 7e11655..d89ba39 100755
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/fd/GMSHealthMonitor.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/fd/GMSHealthMonitor.java
@@ -569,21 +569,42 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler {
         short version = in.readShort();
         long uuidLSBs = in.readLong();
         long uuidMSBs = in.readLong();
-        logger.debug("GMSHealthMonitor received health check UUID {}:{}.", uuidMSBs, uuidLSBs);
+        boolean debug = logger.isDebugEnabled();
+        if (debug) {
+          logger.debug("GMSHealthMonitor received health check UUID {},{}",
+              Long.toHexString(uuidMSBs),
+              Long.toHexString(uuidLSBs));
+        }
         UUID myUUID = ((GMSMember) GMSHealthMonitor.this.localAddress.getNetMember()).getUUID();
-        logger.debug("GMSHealthMonitor my UUID is {}:{}", myUUID.getMostSignificantBits(), myUUID.getLeastSignificantBits());
-        if (uuidLSBs == myUUID.getLeastSignificantBits()
+        if (debug) {
+          if (playingDead) {
+            logger.debug("simulating sick member in health check");
+          } else if (uuidLSBs == myUUID.getLeastSignificantBits()
+            && uuidMSBs == myUUID.getMostSignificantBits()) {
+            logger.debug("UUID matches my own - sending OK reply");
+          } else {
+            logger.debug("GMSHealthMonitor my UUID is                 {},{}",
+              Long.toHexString(myUUID.getMostSignificantBits()),
+              Long.toHexString(myUUID.getLeastSignificantBits()));
+          }
+        }
+        if (!playingDead
+            && uuidLSBs == myUUID.getLeastSignificantBits()
             && uuidMSBs == myUUID.getMostSignificantBits()) {
           out.write(OK);
           out.flush();
           socket.shutdownOutput();
-          logger.debug("GMSHealthMonitor server socket replied OK.");
+          if (debug) {
+            logger.debug("GMSHealthMonitor server socket replied OK.");
+          }
         }
         else {
           out.write(ERROR);
           out.flush();
           socket.shutdownOutput();
-          logger.debug("GMSHealthMonitor server socket replied ERROR.");
+          if (debug) {
+            logger.debug("GMSHealthMonitor server socket replied ERROR.");
+          }
         }
       } catch (IOException e) {
         logger.trace("Unexpected exception", e);

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/14445c08/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/interfaces/Messenger.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/interfaces/Messenger.java b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/interfaces/Messenger.java
index e6b7f07..8c81ada 100755
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/interfaces/Messenger.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/interfaces/Messenger.java
@@ -19,6 +19,13 @@ public interface Messenger extends Service {
   Set<InternalDistributedMember> send(DistributionMessage m);
 
   /**
+   * sends an asynchronous message.  Returns destinations that did not
+   * receive the message due to no longer being in the view.  Does
+   * not guarantee delivery of the message (no retransmissions)
+   */
+  Set<InternalDistributedMember> sendUnreliably(DistributionMessage m);
+
+  /**
    * returns the endpoint ID for this member
    */
   InternalDistributedMember getMemberID();

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/14445c08/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/membership/GMSJoinLeave.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/membership/GMSJoinLeave.java b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/membership/GMSJoinLeave.java
index af050be..5633424 100755
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/membership/GMSJoinLeave.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/membership/GMSJoinLeave.java
@@ -21,6 +21,7 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.TimerTask;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutionException;
@@ -88,6 +89,9 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
   /** if the locators don't know who the coordinator is we send find-coord requests to this many nodes */
   private static final int MAX_DISCOVERY_NODES = Integer.getInteger("gemfire.max-discovery-nodes", 30);
 
+  /** interval for broadcasting the current view to members in case they didn't get it the first time */
+  private static final long VIEW_BROADCAST_INTERVAL = Long.getLong("gemfire.view-broadcast-interval", 60000);
+
   /** membership logger */
   private static final Logger logger = Services.getLogger();
 
@@ -127,6 +131,9 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
   /** a list of join/leave/crashes */
   private final List<DistributionMessage> viewRequests = new LinkedList<DistributionMessage>();
 
+  /** the established request collection interval.  This can be overridden for testing with delayViewCreationForTest */
+  long requestCollectionInterval = MEMBER_REQUEST_COLLECTION_INTERVAL;
+
   /** collects the response to a join request */
   private JoinResponseMessage[] joinResponse = new JoinResponseMessage[1];
 
@@ -229,12 +236,12 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
             if (attemptToJoin()) {
               return true;
             }
-            if (System.currentTimeMillis() > giveupTime) {
-              break;
-            }
             if (!state.possibleCoordinator.equals(localAddress)) {
               state.alreadyTried.add(state.possibleCoordinator);
             }
+            if (System.currentTimeMillis() > giveupTime) {
+              break;
+            }
           }
         } else {
           long now = System.currentTimeMillis();
@@ -246,7 +253,7 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
             // reset the tries count and timer since we haven't actually tried to join yet
             tries = 0;
             giveupTime = now + timeout;
-          } else if (System.currentTimeMillis() > giveupTime) {
+          } else if (now > giveupTime) {
             break;
           }
         }
@@ -278,6 +285,7 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
           this.localAddress.notifyAll();
         }
       }
+      searchState.cleanup();
     }
   }
 
@@ -293,13 +301,17 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
 
     // send a join request to the coordinator and wait for a response
     InternalDistributedMember coord = state.possibleCoordinator;
-    logger.info("Attempting to join the distributed system through coordinator " + coord + " using address " + this.localAddress);
-    JoinRequestMessage req = new JoinRequestMessage(coord, this.localAddress, services.getAuthenticator().getCredentials(coord));
-    // add server socket port in the join request
-    if (services.getHealthMonitor().getSocketInfo().get(localAddress) != null) {
-      req.setSocketPort(services.getHealthMonitor().getSocketInfo().get(localAddress).getPort());
+    if (state.alreadyTried.contains(coord)) {
+      logger.info("The coordinator is probably still {} - waiting for a join-response", coord);
+    } else {
+      logger.info("Attempting to join the distributed system through coordinator " + coord + " using address " + this.localAddress);
+      JoinRequestMessage req = new JoinRequestMessage(coord, this.localAddress, services.getAuthenticator().getCredentials(coord));
+      // add server socket port in the join request
+      if (services.getHealthMonitor().getSocketInfo().get(localAddress) != null) {
+        req.setSocketPort(services.getHealthMonitor().getSocketInfo().get(localAddress).getPort());
+      }
+      services.getMessenger().send(req);
     }
-    services.getMessenger().send(req);
 
     JoinResponseMessage response = null;
     synchronized (joinResponse) {
@@ -510,9 +522,14 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
       }
     } else {
       if (!isStopping && !services.getCancelCriterion().isCancelInProgress()) {
-        recordViewRequest(incomingRequest);
-        this.viewProcessor.processRemoveRequest(mbr);
-        this.prepareProcessor.processRemoveRequest(mbr);
+        // suspect processing tends to get carried away sometimes during
+        // shutdown (especially shutdownAll), so we check for a scheduled shutdown
+        // message
+        if (!getPendingRequestIDs(LEAVE_REQUEST_MESSAGE).contains(mbr)) {
+          recordViewRequest(incomingRequest);
+          this.viewProcessor.processRemoveRequest(mbr);
+          this.prepareProcessor.processRemoveRequest(mbr);
+        }
       }
     }
   }
@@ -550,6 +567,16 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
     }
   }
   
+  /**
+   * Test hook for delaying the creation of new views.
+   * This should be invoked before this member becomes coordinator
+   * and creates its ViewCreator thread.
+   * @param millis
+   */
+  public void delayViewCreationForTest(int millis) {
+    requestCollectionInterval = millis;
+  }
+  
 
   /**
    * Transitions this member into the coordinator role.  This must
@@ -579,6 +606,7 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
         viewCreator = new ViewCreator("GemFire Membership View Creator", Services.getThreadGroup());
         viewCreator.setDaemon(true);
         viewCreator.start();
+        startViewBroadcaster();
       }
     } else {
       // create and send out a new view
@@ -611,6 +639,7 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
         viewCreator.setInitialView(newView, leaving, removals);
         viewCreator.setDaemon(true);
         viewCreator.start();
+        startViewBroadcaster();
       }
     }
   }
@@ -631,11 +660,11 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
     }
   }
 
-  boolean prepareView(NetView view, Collection<InternalDistributedMember> newMembers, List<DistributionMessage> requests) {
+  boolean prepareView(NetView view, List<InternalDistributedMember> newMembers, List<DistributionMessage> requests) {
     return sendView(view, newMembers, true, this.prepareProcessor, requests);
   }
 
-  void sendView(NetView view, Collection<InternalDistributedMember> newMembers, List<DistributionMessage> requests) {
+  void sendView(NetView view, List<InternalDistributedMember> newMembers, List<DistributionMessage> requests) {
     sendView(view, newMembers, false, this.viewProcessor, requests);
   }
 
@@ -669,7 +698,7 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
     }    
   }
   
-  boolean sendView(NetView view, Collection<InternalDistributedMember> newMembers, boolean preparing, ViewReplyProcessor rp,
+  boolean sendView(NetView view, List<InternalDistributedMember> newMembers, boolean preparing, ViewReplyProcessor rp,
       List<DistributionMessage> requests) {
     int id = view.getViewId();
     InstallViewMessage msg = new InstallViewMessage(view, services.getAuthenticator().getCredentials(this.localAddress), preparing);
@@ -718,6 +747,14 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
 
     // only wait for responses during preparation
     if (preparing) {
+      // send join responses after other members at least have
+      // a prepared view announcing the new member
+      if (!(isNetworkPartition(view) && quorumRequired)) {
+        List<Integer> newPorts = new ArrayList<Integer>(view.size());
+        addPorts(view, requests, newPorts);
+        sendJoinResponses(newMembers, view, newPorts);
+      }
+
       logger.debug("waiting for view responses");
 
       Set<InternalDistributedMember> failedToRespond = rp.waitForResponses();
@@ -817,7 +854,7 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
     
     long giveUpTime = System.currentTimeMillis() + services.getConfig().getLocatorWaitTime() * 1000;
     
-    int connectTimeout = (int)services.getConfig().getMemberTimeout();
+    int connectTimeout = (int)services.getConfig().getMemberTimeout() * 2;
     boolean anyResponses = false;
     boolean flagsSet = false;
 
@@ -835,7 +872,8 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
           FindCoordinatorResponse response = (o instanceof FindCoordinatorResponse) ? (FindCoordinatorResponse)o : null;
           if (response != null) {
             state.locatorsContacted++;
-            if (response.getSenderId() != null && response.getSenderId().getVmViewId() >= 0) {
+            if (!state.hasContactedAJoinedLocator &&
+                response.getSenderId() != null && response.getSenderId().getVmViewId() >= 0) {
               logger.debug("Locator's address indicates it is part of a distributed system "
                   + "so I will not become membership coordinator on this attempt to join");
               state.hasContactedAJoinedLocator = true;
@@ -845,9 +883,6 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
               NetView v = response.getView();
               int viewId = v == null? -1 : v.getViewId();
               if (viewId > state.viewId) {
-                // if the view has changed it is possible that a member
-                // that we already tried to join with will become coordinator
-                state.alreadyTried.clear();
                 state.viewId = viewId;
                 state.view = v;
                 state.registrants.clear();
@@ -1192,6 +1227,10 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
       viewCreator.shutdown();
     }
   }
+  
+  private void startViewBroadcaster() {
+    services.getTimer().schedule(new ViewBroadcaster(), VIEW_BROADCAST_INTERVAL, VIEW_BROADCAST_INTERVAL);
+  }
 
   public static void loadEmergencyClasses() {
   }
@@ -1548,6 +1587,43 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
     }
   }
 
+  /**
+   * ViewBroadcaster periodically sends the current view to all
+   * current and departed members.  This ensures that a member that
+   * missed the view will eventually see it and act on it.
+   */
+  class ViewBroadcaster extends TimerTask {
+
+    @Override
+    public void run() {
+      if (!isCoordinator || isStopping) {
+        cancel();
+      } else {
+        sendCurrentView();
+      }
+    }
+    
+    void sendCurrentView() {
+      NetView v = currentView;
+      if (v != null) {
+        InstallViewMessage msg = new InstallViewMessage(v, services.getAuthenticator().getCredentials(localAddress));
+        Collection<InternalDistributedMember> recips = new ArrayList<>(v.size() + v.getCrashedMembers().size());
+        recips.addAll(v.getMembers());
+        recips.addAll(v.getCrashedMembers());
+        List<Integer> ports = new ArrayList<>(v.size());
+        for (InternalDistributedMember mbr: v.getMembers()) {
+          InetSocketAddress addr = services.getHealthMonitor().getSocketInfo().get(mbr);
+          int port = addr==null? -1 : addr.getPort();
+          ports.add(Integer.valueOf(port));
+        }
+        msg.setPortsForMembers(ports);
+        msg.setRecipients(recips);
+        services.getMessenger().send(msg);
+      }
+    }
+    
+  }
+
   class ViewCreator extends Thread {
     boolean shutdown = false;
     volatile boolean waiting = false;
@@ -1594,7 +1670,8 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
     private void sendInitialView() {
       if (initialView != null) {
         try {
-          prepareAndSendView(initialView, Collections.<InternalDistributedMember> emptyList(), initialLeaving, initialRemovals, null);
+          prepareAndSendView(initialView, Collections.<InternalDistributedMember>emptyList(), initialLeaving,
+              initialRemovals, Collections.<DistributionMessage>emptyList());
         } finally {
           this.initialView = null;
           this.initialLeaving = null;
@@ -1608,7 +1685,7 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
       List<DistributionMessage> requests = null;
       logger.info("View Creator thread is starting");
       sendInitialView();
-      long okayToCreateView = System.currentTimeMillis() + MEMBER_REQUEST_COLLECTION_INTERVAL;
+      long okayToCreateView = System.currentTimeMillis() + requestCollectionInterval;
       try {
         for (;;) {
           synchronized (viewRequests) {
@@ -1629,7 +1706,7 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
                 // start the timer when we have only one request because
                 // concurrent startup / shutdown of multiple members is
                 // a common occurrence
-                okayToCreateView = System.currentTimeMillis() + MEMBER_REQUEST_COLLECTION_INTERVAL;
+                okayToCreateView = System.currentTimeMillis() + requestCollectionInterval;
                 continue;
               }
             } else {
@@ -1649,7 +1726,7 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
                   requests.addAll(viewRequests);
                 }
                 viewRequests.clear();
-                okayToCreateView = System.currentTimeMillis() + MEMBER_REQUEST_COLLECTION_INTERVAL;
+                okayToCreateView = System.currentTimeMillis() + requestCollectionInterval;
               }
             }
           } // synchronized
@@ -1713,14 +1790,8 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
           }
           break;
         case REMOVE_MEMBER_REQUEST:
-          mbr = ((RemoveMemberMessage) msg).getMemberID();
-          if (oldMembers.contains(mbr) && !leaveReqs.contains(mbr) && !removalReqs.contains(mbr)) {
-            removalReqs.add(mbr);
-            removalReasons.add(((RemoveMemberMessage) msg).getReason());
-          } else {
-            sendRemoveMessages(Collections.<InternalDistributedMember> singletonList(mbr),
-                Collections.<String> singletonList(((RemoveMemberMessage) msg).getReason()), currentView);
-          }
+          // process these after gathering all leave-requests so that
+          // we don't kick out a member that's shutting down
           break;
         default:
           logger.warn("Unknown membership request encountered: {}", msg);
@@ -1728,6 +1799,25 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
         }
       }
 
+      for (DistributionMessage msg : requests) {
+        switch (msg.getDSFID()) {
+        case REMOVE_MEMBER_REQUEST:
+          InternalDistributedMember mbr = ((RemoveMemberMessage) msg).getMemberID();
+          if (!leaveReqs.contains(mbr)) {
+            if (oldMembers.contains(mbr) && !removalReqs.contains(mbr)) {
+              removalReqs.add(mbr);
+              removalReasons.add(((RemoveMemberMessage) msg).getReason());
+            } else {
+              sendRemoveMessages(Collections.<InternalDistributedMember> singletonList(mbr),
+                  Collections.<String> singletonList(((RemoveMemberMessage) msg).getReason()), currentView);
+            }
+          }
+          break;
+        default:
+          break;
+        }
+      }
+
       for (InternalDistributedMember mbr : oldIDs) {
         if (!leaveReqs.contains(mbr) && !removalReqs.contains(mbr)) {
           removalReqs.add(mbr);
@@ -1770,16 +1860,8 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
       // getting messages from members that have been kicked out
       sendRemoveMessages(removalReqs, removalReasons, newView);
 
-      // we want to always check for quorum loss but don't act on it
-      // unless network-partition-detection is enabled
-      if (!(isNetworkPartition(newView) && quorumRequired)) {
-        // add socket ports of all members to join response
-        List<Integer> portsForMembers = new ArrayList<Integer>(newView.size());
-        addPorts(newView, requests, portsForMembers);
-        sendJoinResponses(joinReqs, newView, portsForMembers);
-      }
-
       prepareAndSendView(newView, joinReqs, leaveReqs, newView.getCrashedMembers(), requests);
+
       return;
     }
 
@@ -1812,6 +1894,8 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
         prepared = prepareView(newView, joinReqs, requests);
         logger.debug("view preparation phase completed.  prepared={}", prepared);
 
+        NetView conflictingView = prepareProcessor.getConflictingView();
+
         if (prepared) {
           break;
         }
@@ -1831,13 +1915,27 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
 
         List<InternalDistributedMember> failures = new ArrayList<>(currentView.getCrashedMembers().size() + unresponsive.size());
 
-        NetView conflictingView = prepareProcessor.getConflictingView();
         if (conflictingView != null && !conflictingView.getCreator().equals(localAddress) && conflictingView.getViewId() > newView.getViewId()
             && (lastConflictingView == null || conflictingView.getViewId() > lastConflictingView.getViewId())) {
           lastConflictingView = conflictingView;
           logger.info("adding these crashed members from a conflicting view to the crash-set for the next view: {}\nconflicting view: {}", unresponsive,
               conflictingView);
           failures.addAll(conflictingView.getCrashedMembers());
+          List<InternalDistributedMember> newMembers = prepareProcessor.getConflictingView().getNewMembers();
+          if (!newMembers.isEmpty()) {
+            logger.info("adding these new members from a conflicting view to the new view: {}", newMembers);
+            for (InternalDistributedMember mbr: newMembers) {
+              InetSocketAddress addr = services.getHealthMonitor().getSocketInfo().get(mbr);
+              // TODO: re-factor health monitor ports to be in the NetView so we don't need
+              // to create a fake JoinRequestMessage here
+              int port = addr==null? -1 : addr.getPort();
+              JoinRequestMessage msg = new JoinRequestMessage(localAddress, mbr, null);
+              msg.setSocketPort(port);
+              requests.add(msg);
+              newView.add(mbr);
+              joinReqs.add(mbr);
+            }
+          }
         }
 
         if (!unresponsive.isEmpty()) {
@@ -1977,5 +2075,6 @@ public class GMSJoinLeave implements JoinLeave, MessageHandler {
       }
     }
   }
-
+  
+  
 }

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/14445c08/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messenger/JGroupsMessenger.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messenger/JGroupsMessenger.java b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messenger/JGroupsMessenger.java
index ba941f8..ae7ee16 100755
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messenger/JGroupsMessenger.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/distributed/internal/membership/gms/messenger/JGroupsMessenger.java
@@ -118,14 +118,12 @@ public class JGroupsMessenger implements Messenger {
 
   private NetView view;
 
-  private View jgView;
-  
   private GMSPingPonger pingPonger = new GMSPingPonger();
   
   private volatile long pongsReceived;
   
-  private boolean playingDead;
-
+  private byte[] serializedNetMember;
+  
   
   static {
     // register classes that we've added to jgroups that are put on the wire
@@ -350,7 +348,6 @@ public class JGroupsMessenger implements Messenger {
     }
     ViewId vid = new ViewId(new JGAddress(v.getCoordinator()), v.getViewId());
     View jgv = new View(vid, new ArrayList<Address>(mbrs));
-    this.jgView = jgv;
     logger.trace("installing JGroups view: {}", jgv);
     this.myChannel.down(new Event(Event.VIEW_CHANGE, jgv));
   }
@@ -429,12 +426,10 @@ public class JGroupsMessenger implements Messenger {
 
   @Override
   public void playDead() {
-    playingDead = true;
   }
 
   @Override
   public void beHealthy() {
-    playingDead = false;
   }
 
   @Override
@@ -461,8 +456,17 @@ public class JGroupsMessenger implements Messenger {
   }
 
   @Override
+  public Set<InternalDistributedMember> sendUnreliably(DistributionMessage msg) {
+    return send(msg, false);
+  }
+    
+  @Override
   public Set<InternalDistributedMember> send(DistributionMessage msg) {
+    return send(msg, true);
+  }
     
+  public Set<InternalDistributedMember> send(DistributionMessage msg, boolean reliably) {
+      
     // perform the same jgroups messaging as in 8.2's GMSMembershipManager.send() method
 
     // BUT: when marshalling messages we need to include the version of the product and
@@ -505,6 +509,9 @@ public class JGroupsMessenger implements Messenger {
         long startSer = theStats.startMsgSerialization();
         Message jmsg = createJGMessage(msg, local, Version.CURRENT_ORDINAL);
         jmsg.setTransientFlag(TransientFlag.DONT_LOOPBACK);
+        if (!reliably) {
+          jmsg.setFlag(Message.Flag.NO_RELIABILITY);
+        }
         theStats.endMsgSerialization(startSer);
         theStats.incSentBytes(jmsg.getLength());
         logger.trace("Sending JGroups message: {}", jmsg);
@@ -590,6 +597,9 @@ public class JGroupsMessenger implements Messenger {
           Exception problem = null;
           try {
             Message tmp = (i < (calculatedLen-1)) ? jmsg.copy(true) : jmsg;
+            if (!reliably) {
+              tmp.setFlag(Message.Flag.NO_RELIABILITY);
+            }
             tmp.setDest(to);
             tmp.setSrc(this.jgAddress);
             logger.trace("Unicasting to {}", to);

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/14445c08/gemfire-core/src/main/java/com/gemstone/gemfire/internal/util/PluckStacks.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/util/PluckStacks.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/util/PluckStacks.java
index fea45ce..15a64d7 100644
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/util/PluckStacks.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/util/PluckStacks.java
@@ -206,7 +206,7 @@ public class PluckStacks {
       return (stackSize == 8 && thread.get(2).contains("SocketChannelImpl.accept"));
     }
     if (threadName.startsWith("P2P message reader")) {
-      return (stackSize == 11 && 
+      return (stackSize <= 12 && 
         (thread.getFirstFrame().contains("FileDispatcherImpl.read") ||
          thread.getFirstFrame().contains("FileDispatcher.read") ||
          thread.getFirstFrame().contains("SocketDispatcher.read")));
@@ -231,6 +231,9 @@ public class PluckStacks {
           && (stackSize > 6 && thread.get(6).contains("fetchHeader"))) return true; // reading from a client
       return isIdleExecutor(thread);
     }
+    if (threadName.startsWith("TCP Check ServerSocket Thread")) {
+      return (stackSize >= 3 && thread.get(2).contains("socketAccept"));
+    }
     if (threadName.startsWith("Timer runner")) {
 //      System.out.println("timer runner stack size = " + stackSize + "; frame = " + thread.get(1));
       return (stackSize <= 10 && thread.get(1).contains("TIMED_WAITING"));

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/14445c08/gemfire-core/src/test/java/com/gemstone/gemfire/distributed/internal/membership/gms/membership/GMSJoinLeaveJUnitTest.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/test/java/com/gemstone/gemfire/distributed/internal/membership/gms/membership/GMSJoinLeaveJUnitTest.java b/gemfire-core/src/test/java/com/gemstone/gemfire/distributed/internal/membership/gms/membership/GMSJoinLeaveJUnitTest.java
index 41b0df7..4e32932 100644
--- a/gemfire-core/src/test/java/com/gemstone/gemfire/distributed/internal/membership/gms/membership/GMSJoinLeaveJUnitTest.java
+++ b/gemfire-core/src/test/java/com/gemstone/gemfire/distributed/internal/membership/gms/membership/GMSJoinLeaveJUnitTest.java
@@ -7,6 +7,7 @@ import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertFalse;
 
 import java.io.IOException;
 import java.net.UnknownHostException;
@@ -14,6 +15,7 @@ import java.util.HashSet;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Set;
+import java.util.Timer;
 
 import org.junit.Assert;
 import org.junit.Test;
@@ -72,6 +74,7 @@ public class GMSJoinLeaveJUnitTest {
     when(mockConfig.getDistributionConfig()).thenReturn(mockDistConfig);
     when(mockDistConfig.getLocators()).thenReturn("localhost[12345]");
     when(mockDistConfig.getMcastPort()).thenReturn(0);
+    when(mockDistConfig.getMemberTimeout()).thenReturn(2000);
     
     authenticator = mock(Authenticator.class);
     gmsJoinLeaveMemberId = new InternalDistributedMember("localhost", 8887);
@@ -94,6 +97,9 @@ public class GMSJoinLeaveJUnitTest {
     when(services.getManager()).thenReturn(manager);
     when(services.getHealthMonitor()).thenReturn(healthMonitor);
     
+    Timer t = new Timer(true);
+    when(services.getTimer()).thenReturn(t);
+    
     mockMembers = new InternalDistributedMember[4];
     for (int i = 0; i < mockMembers.length; i++) {
       mockMembers[i] = new InternalDistributedMember("localhost", 8888 + i);
@@ -193,6 +199,7 @@ public class GMSJoinLeaveJUnitTest {
     Set<InternalDistributedMember> shutdowns = new HashSet<>();
     Set<InternalDistributedMember> crashes = new HashSet<>();
     mbrs.add(mockMembers[0]);
+    mbrs.add(gmsJoinLeaveMemberId);
     
     when(services.getMessenger()).thenReturn(messenger);
     
@@ -219,6 +226,42 @@ public class GMSJoinLeaveJUnitTest {
     assertTrue(removeMessageSent.methodExecuted);
   }
   
+  @Test
+  public void testRemoveAndLeaveIsNotACrash() throws Exception {
+    // simultaneous leave & remove requests for a member
+    // should not result in its being seen as a crashed member
+    initMocks();
+
+    when(healthMonitor.checkIfAvailable(any(InternalDistributedMember.class),
+        any(String.class), any(Boolean.class))).thenReturn(true);
+    
+    gmsJoinLeave.delayViewCreationForTest(5000); // ensures multiple requests are queued for a view change
+    gmsJoinLeave.becomeCoordinatorForTest();
+
+    NetView oldView = null;
+    long giveup = System.currentTimeMillis() + 10000;
+    while (System.currentTimeMillis() < giveup  &&  oldView == null) {
+      Thread.sleep(500);
+      oldView = gmsJoinLeave.getView();
+    }
+    assertTrue(oldView != null);  // it should have become coordinator and installed a view
+    
+    NetView newView = new NetView(oldView, oldView.getViewId()+1);
+    newView.add(mockMembers[1]);
+    newView.add(mockMembers[2]);
+    gmsJoinLeave.installView(newView);
+    
+    gmsJoinLeave.memberShutdown(mockMembers[1], "shutting down for test");
+    gmsJoinLeave.remove(mockMembers[1], "removing for test");
+    
+    giveup = System.currentTimeMillis() + 10000;
+    while (System.currentTimeMillis() < giveup  &&  gmsJoinLeave.getView().getViewId() == newView.getViewId()) {
+      Thread.sleep(500);
+    }
+    assertTrue(gmsJoinLeave.getView().getViewId() > newView.getViewId());
+    assertFalse(gmsJoinLeave.getView().getCrashedMembers().contains(mockMembers[1]));
+  }
+  
   
   @Test 
   public void testRejectOlderView() throws IOException {
@@ -422,6 +465,17 @@ public class GMSJoinLeaveJUnitTest {
     gmsJoinLeave.processMessage(msg);
     assertTrue("Expected leave request from non-member to be ignored", gmsJoinLeave.getViewRequests().isEmpty());
   }
+  
+  @Test
+  public void testBecomeCoordinatorOnStartup() throws Exception {
+    initMocks();
+    gmsJoinLeave.becomeCoordinatorForTest();
+    long giveup = System.currentTimeMillis() + 20000;
+    while (System.currentTimeMillis() < giveup && !gmsJoinLeave.isCoordinator()) {
+      Thread.sleep(1000);
+    }
+    assertTrue(gmsJoinLeave.isCoordinator());
+  }
 
   @Test
   public void testBecomeCoordinator() throws Exception {
@@ -573,6 +627,33 @@ public class GMSJoinLeaveJUnitTest {
     assertTrue(gmsJoinLeave.getPreparedView().equals(newView));
   }
   
+  @Test
+  public void testNoViewAckCausesRemovalMessage() throws Exception {
+    initMocks(true);
+    when(healthMonitor.checkIfAvailable(any(InternalDistributedMember.class),
+        any(String.class), any(Boolean.class))).thenReturn(false);
+    prepareAndInstallView();
+    NetView oldView = gmsJoinLeave.getView();
+    NetView newView = new NetView(oldView, oldView.getViewId()+1);
+    
+    // the new view will remove the old coordinator (normal shutdown) and add a new member
+    // who will not ack the view.  This should cause it to be removed from the system
+    // with a RemoveMemberMessage
+    newView.add(mockMembers[2]);
+    newView.remove(mockMembers[0]);
+    
+    InstallViewMessage installViewMessage = new InstallViewMessage(newView, credentials, false);
+    gmsJoinLeave.processMessage(installViewMessage);
+    
+    long giveup = System.currentTimeMillis() + (2000 * 3); // this test's member-timeout * 3
+    while (System.currentTimeMillis() < giveup
+        && gmsJoinLeave.getView().getViewId() == oldView.getViewId()) {
+      Thread.sleep(1000);
+    }
+    assertTrue(gmsJoinLeave.isCoordinator());
+    verify(messenger, times(2)).send(any(RemoveMemberMessage.class));
+  }
+  
   
 }