You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by ud...@apache.org on 2016/10/12 00:54:49 UTC

incubator-geode git commit: GEODE-1874: Changed setNextNeighbor to not create HashMap for every p2p invocation

Repository: incubator-geode
Updated Branches:
  refs/heads/feature/GEODE-1874 [created] edaa462eb


GEODE-1874: Changed setNextNeighbor to not create HashMap for every p2p invocation


Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/edaa462e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/edaa462e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/edaa462e

Branch: refs/heads/feature/GEODE-1874
Commit: edaa462eb41c9380d9fd4eca889ccc269fcdbdf3
Parents: 280d2d8
Author: Udo Kohlmeyer <uk...@pivotal.io>
Authored: Wed Oct 12 11:54:33 2016 +1100
Committer: Udo Kohlmeyer <uk...@pivotal.io>
Committed: Wed Oct 12 11:54:33 2016 +1100

----------------------------------------------------------------------
 .../membership/gms/fd/GMSHealthMonitor.java     | 394 ++++++++++---------
 1 file changed, 210 insertions(+), 184 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/edaa462e/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/gms/fd/GMSHealthMonitor.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/gms/fd/GMSHealthMonitor.java b/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/gms/fd/GMSHealthMonitor.java
index aafb498..97a413c 100644
--- a/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/gms/fd/GMSHealthMonitor.java
+++ b/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/gms/fd/GMSHealthMonitor.java
@@ -16,9 +16,7 @@
  */
 package org.apache.geode.distributed.internal.membership.gms.fd;
 
-import static org.apache.geode.internal.DataSerializableFixedID.HEARTBEAT_REQUEST;
-import static org.apache.geode.internal.DataSerializableFixedID.HEARTBEAT_RESPONSE;
-import static org.apache.geode.internal.DataSerializableFixedID.SUSPECT_MEMBERS_MESSAGE;
+import static org.apache.geode.internal.DataSerializableFixedID.*;
 
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
@@ -29,7 +27,19 @@ import java.net.InetAddress;
 import java.net.ServerSocket;
 import java.net.Socket;
 import java.net.SocketTimeoutException;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Enumeration;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.Timer;
+import java.util.TimerTask;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ExecutorService;
@@ -40,7 +50,7 @@ import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
-import java.util.stream.*;
+import java.util.stream.Collectors;
 
 import org.apache.logging.log4j.Logger;
 import org.jgroups.util.UUID;
@@ -69,7 +79,7 @@ import org.apache.geode.internal.security.SecurableCommunicationChannel;
 
 /**
  * Failure Detection
- * 
+ * <p>
  * This class make sure that each member is alive and communicating to this member.
  * To make sure that we create the ring of members based on current view. On this
  * ring, each member make sure that next-member in ring is communicating with it.
@@ -77,17 +87,16 @@ import org.apache.geode.internal.security.SecurableCommunicationChannel;
  * member has not communicated in last period(member-timeout) then we check whether
  * this member is still alive or not. Based on that we informed probable coordinators
  * to remove that member from view.
- * 
+ * <p>
  * It has {@link #suspect(InternalDistributedMember, String)} api, which can be used
  * to initiate suspect processing for any member. First is checks whether the member is
  * responding or not. Then it informs probable coordinators to remove that member from
  * view.
- * 
+ * <p>
  * It has {@link #checkIfAvailable(DistributedMember, String, boolean)} api to see
  * if that member is alive. Then based on removal flag it initiates the suspect processing
  * for that member.
- * 
- * */
+ */
 @SuppressWarnings({ "SynchronizationOnLocalVariableOrMethodParameter", "NullableProblems" })
 public class GMSHealthMonitor implements HealthMonitor, MessageHandler {
 
@@ -99,9 +108,11 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler {
   volatile private boolean isStopping = false;
   private final AtomicInteger requestId = new AtomicInteger();
 
-  /** membership logger */
+  /**
+   * membership logger
+   */
   private static final Logger logger = Services.getLogger();
-  
+
   /**
    * The number of recipients of periodic heartbeats.  The recipients will
    * be selected from the members that are likely to be monitoring this member.
@@ -115,24 +126,28 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler {
    */
   public static final int LOGICAL_INTERVAL = Integer.getInteger("geode.logical-message-received-interval", 2);
 
-  /** stall time to wait for members leaving concurrently */
+  /**
+   * stall time to wait for members leaving concurrently
+   */
   public static final long MEMBER_SUSPECT_COLLECTION_INTERVAL = Long.getLong("geode.suspect-member-collection-interval", 200);
 
   private volatile long currentTimeStamp;
-  
-  /** this member's ID */
+
+  /**
+   * this member's ID
+   */
   private InternalDistributedMember localAddress;
 
   /**
    * Timestamp at which we last had contact from a member
    */
   final ConcurrentMap<InternalDistributedMember, TimeStamp> memberTimeStamps = new ConcurrentHashMap<>();
-  
+
   /**
    * Members currently being suspected and the view they were suspected in
    */
   final private ConcurrentHashMap<InternalDistributedMember, NetView> suspectedMemberInView = new ConcurrentHashMap<>();
-  
+
   /**
    * Members undergoing final checks
    */
@@ -142,7 +157,7 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler {
    * Replies to messages
    */
   final private Map<Integer, Response> requestIdVsResponse = new ConcurrentHashMap<>();
-  
+
   /**
    * Members suspected in a particular view
    */
@@ -156,29 +171,36 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler {
    * to stop check scheduler
    */
   private ScheduledFuture<?> monitorFuture;
-  
-  /** test hook */
+
+  /**
+   * test hook
+   */
   private volatile boolean playingDead = false;
 
-  /** test hook */
+  /**
+   * test hook
+   */
   private volatile boolean beingSick = false;
-  
+
   // For TCP check
   private ExecutorService serverSocketExecutor;
   static final int OK = 0x7B;
-  static final int ERROR = 0x00;  
+  static final int ERROR = 0x00;
   private volatile int socketPort;
   private volatile ServerSocket serverSocket;
-  
-  /** Statistics about health monitor */
+
+  /**
+   * Statistics about health monitor
+   */
   private DMStats stats;
 
   /**
    * this class is to avoid garbage
    */
   private static class TimeStamp {
+
     private volatile long timeStamp;
-    
+
     TimeStamp(long timeStamp) {
       this.timeStamp = timeStamp;
     }
@@ -196,11 +218,12 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler {
    * This class sets start interval timestamp to record the activity of all members.
    * That is used by {@link GMSHealthMonitor#contactedBy(InternalDistributedMember)} to
    * record the activity of member.
-   * 
+   *
    * It initiates the suspect processing for next neighbour if it doesn't see any activity from that
    * member in last interval(member-timeout)
    */
   private class Monitor implements Runnable {
+
     final long memberTimeoutInMillis;
 
     public Monitor(long memberTimeout) {
@@ -213,25 +236,25 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler {
       if (GMSHealthMonitor.this.isStopping) {
         return;
       }
-      
+
       InternalDistributedMember neighbour = nextNeighbor;
-      
+
       long currentTime = System.currentTimeMillis();
       //this is the start of interval to record member activity
       GMSHealthMonitor.this.currentTimeStamp = currentTime;
 
       if (neighbour != null) {
         TimeStamp nextNeighborTS;
-        synchronized(GMSHealthMonitor.this) {
+        synchronized (GMSHealthMonitor.this) {
           nextNeighborTS = GMSHealthMonitor.this.memberTimeStamps.get(neighbour);
         }
-        
+
         if (nextNeighborTS == null) {
           TimeStamp customTS = new TimeStamp(currentTime);
           memberTimeStamps.put(neighbour, customTS);
           return;
         }
-        
+
         long interval = memberTimeoutInMillis / GMSHealthMonitor.LOGICAL_INTERVAL;
         long lastTS = currentTime - nextNeighborTS.getTime();
         if (lastTS + interval >= memberTimeoutInMillis) {
@@ -249,6 +272,7 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler {
    * notify waiting thread.
    */
   private class Response {
+
     private DistributionMessage responseMsg;
 
     public DistributionMessage getResponseMsg() {
@@ -276,7 +300,7 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler {
         OutputStream out = socket.getOutputStream();
         @SuppressWarnings("UnusedAssignment")
         short version = in.readShort();
-        int  vmViewId = in.readInt();
+        int vmViewId = in.readInt();
         long uuidLSBs = in.readLong();
         long uuidMSBs = in.readLong();
         GMSHealthMonitor.this.stats.incFinalCheckRequestsReceived();
@@ -288,9 +312,7 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler {
         int myVmViewId = gmbr.getVmViewId();
         if (playingDead) {
           logger.debug("HealthMonitor: simulating sick member in health check");
-        } else if (uuidLSBs == myUUID.getLeastSignificantBits()
-                   && uuidMSBs == myUUID.getMostSignificantBits()
-                   && vmViewId == myVmViewId) {
+        } else if (uuidLSBs == myUUID.getLeastSignificantBits() && uuidMSBs == myUUID.getMostSignificantBits() && vmViewId == myVmViewId) {
           logger.debug("HealthMonitor: sending OK reply");
           out.write(OK);
           out.flush();
@@ -300,12 +322,7 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler {
           logger.debug("HealthMonitor: server replied OK.");
         } else {
           if (logger.isDebugEnabled()) {
-            logger.debug("HealthMonitor: sending ERROR reply - my UUID is {},{} received is {},{}.  My viewID is {} received is {}", 
-              Long.toHexString(myUUID.getMostSignificantBits()),
-              Long.toHexString(myUUID.getLeastSignificantBits()), 
-              Long.toHexString(uuidMSBs),
-              Long.toHexString(uuidLSBs),
-              myVmViewId, vmViewId);
+            logger.debug("HealthMonitor: sending ERROR reply - my UUID is {},{} received is {},{}.  My viewID is {} received is {}", Long.toHexString(myUUID.getMostSignificantBits()), Long.toHexString(myUUID.getLeastSignificantBits()), Long.toHexString(uuidMSBs), Long.toHexString(uuidLSBs), myVmViewId, vmViewId);
           }
           out.write(ERROR);
           out.flush();
@@ -350,13 +367,14 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler {
   public void contactedBy(InternalDistributedMember sender) {
     contactedBy(sender, currentTimeStamp);
   }
-  
-  
+
+
   /**
    * Record member activity at a specified time
    */
   private void contactedBy(InternalDistributedMember sender, long timeStamp) {
     TimeStamp cTS = new TimeStamp(timeStamp);
+    // TODO Udo: why putIfAbsent? Surely only put is required.
     cTS = memberTimeStamps.putIfAbsent(sender, cTS);
     if (cTS != null && cTS.getTime() < timeStamp) {
       cTS.setTime(timeStamp);
@@ -367,7 +385,7 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler {
     setNextNeighbor(currentView, null);
   }
 
-  
+
   private HeartbeatRequestMessage constructHeartbeatRequestMessage(final InternalDistributedMember mbr) {
     final int reqId = requestId.getAndIncrement();
     final HeartbeatRequestMessage hrm = new HeartbeatRequestMessage(mbr, reqId);
@@ -390,7 +408,7 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler {
       } catch (CancelException e) {
         return;
       }
-        
+
       if (!pinged) {
         suspectedMemberInView.put(mbr, currentView);
         String reason = "Member isn't responding to heartbeat requests";
@@ -442,7 +460,7 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler {
       if (membersNotReceivedMsg != null && membersNotReceivedMsg.contains(member)) {
         // member is not part of current view.
         logger.trace("Member {} is not part of current view.", member);
-      } else if (waitForResponse){
+      } else if (waitForResponse) {
         synchronized (pingResp) {
           if (pingResp.getResponseMsg() == null) {
             pingResp.wait(memberTimeout);
@@ -470,7 +488,7 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler {
     } catch (InterruptedException e) {
       logger.debug("GMSHealthMonitor checking thread interrupted, while waiting for response from member: {} .", member);
     } finally {
-      if(waitForResponse) {
+      if (waitForResponse) {
         requestIdVsResponse.remove(hrm.getRequestId());
       }
     }
@@ -480,8 +498,9 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler {
   /**
    * During final check, establish TCP connection between current member and suspect member.
    * And exchange PING/PONG message to see if the suspect member is still alive.
-   * 
+   *
    * @param suspectMember member that does not respond to HeartbeatRequestMessage
+   *
    * @return true if successfully exchanged PING/PONG with TCP connection, otherwise false.
    */
   boolean doTCPCheckMember(InternalDistributedMember suspectMember, int port) {
@@ -489,21 +508,17 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler {
     InternalDistributedSystem internalDistributedSystem = InternalDistributedSystem.getConnectedInstance();
     try {
       logger.debug("Checking member {} with TCP socket connection {}:{}.", suspectMember, suspectMember.getInetAddress(), port);
-      clientSocket = SocketCreatorFactory.getSocketCreatorForComponent(SecurableCommunicationChannel.CLUSTER).connect(suspectMember.getInetAddress(), port,
-          (int)memberTimeout, new ConnectTimeoutTask(services.getTimer(), memberTimeout), false, -1, false);
+      clientSocket = SocketCreatorFactory.getSocketCreatorForComponent(SecurableCommunicationChannel.CLUSTER).connect(suspectMember.getInetAddress(), port, (int) memberTimeout, new ConnectTimeoutTask(services.getTimer(), memberTimeout), false, -1, false);
       clientSocket.setTcpNoDelay(true);
       return doTCPCheckMember(suspectMember, clientSocket);
-    }
-    catch (IOException e) {
+    } catch (IOException e) {
       // this is expected if it is a connection-timeout or other failure
       // to connect
-    }
-    catch (IllegalStateException e) {
+    } catch (IllegalStateException e) {
       if (!isStopping) {
         logger.trace("Unexpected exception", e);
       }
-    }
-    finally {
+    } finally {
       try {
         if (clientSocket != null) {
           clientSocket.setSoLinger(true, 0); // abort the connection
@@ -554,10 +569,10 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler {
       return false;
     } catch (IOException e) {
       logger.trace("Unexpected exception", e);
-    } 
+    }
     return false;
   }
-  
+
   void writeMemberToStream(GMSMember gmbr, DataOutputStream out) throws IOException {
     out.writeShort(Version.CURRENT_ORDINAL);
     out.writeInt(gmbr.getVmViewId());
@@ -565,24 +580,24 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler {
     out.writeLong(gmbr.getUuidMSBs());
     out.flush();
   }
-  
+
   @Override
   public void suspect(InternalDistributedMember mbr, String reason) {
     initiateSuspicion(mbr, reason);
     // Background suspect-collecting thread is currently disabled - it takes too long
-//    synchronized (suspectRequests) {
-//      SuspectRequest sr = new SuspectRequest((InternalDistributedMember) mbr, reason);
-//      if (!suspectRequests.contains(sr)) {
-//        logger.info("Suspecting member {}. Reason= {}.", mbr, reason);
-//        suspectRequests.add(sr);
-//        suspectRequests.notify();
-//      }
-//    }
+    //    synchronized (suspectRequests) {
+    //      SuspectRequest sr = new SuspectRequest((InternalDistributedMember) mbr, reason);
+    //      if (!suspectRequests.contains(sr)) {
+    //        logger.info("Suspecting member {}. Reason= {}.", mbr, reason);
+    //        suspectRequests.add(sr);
+    //        suspectRequests.notify();
+    //      }
+    //    }
   }
 
   @Override
   public boolean checkIfAvailable(DistributedMember mbr, String reason, boolean initiateRemoval) {
-    return inlineCheckIfAvailable(localAddress, currentView, initiateRemoval, (InternalDistributedMember)mbr, reason);
+    return inlineCheckIfAvailable(localAddress, currentView, initiateRemoval, (InternalDistributedMember) mbr, reason);
   }
 
   public void start() {
@@ -607,17 +622,17 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler {
     long delay = memberTimeout / LOGICAL_INTERVAL;
     monitorFuture = scheduler.scheduleAtFixedRate(m, delay, delay, TimeUnit.MILLISECONDS);
 
-//    suspectRequestCollectorThread = this.new RequestCollector<SuspectRequest>("Geode Suspect Message Collector", Services.getThreadGroup(), suspectRequests,
-//        new Callback<SuspectRequest>() {
-//      @Override
-//      public void process(List<SuspectRequest> requests) {
-//        GMSHealthMonitor.this.sendSuspectRequest(requests);
-//
-//      }
-//    }, MEMBER_SUSPECT_COLLECTION_INTERVAL);
-//    suspectRequestCollectorThread.setDaemon(true);
-//    suspectRequestCollectorThread.start()
-    
+    //    suspectRequestCollectorThread = this.new RequestCollector<SuspectRequest>("Geode Suspect Message Collector", Services.getThreadGroup(), suspectRequests,
+    //        new Callback<SuspectRequest>() {
+    //      @Override
+    //      public void process(List<SuspectRequest> requests) {
+    //        GMSHealthMonitor.this.sendSuspectRequest(requests);
+    //
+    //      }
+    //    }, MEMBER_SUSPECT_COLLECTION_INTERVAL);
+    //    suspectRequestCollectorThread.setDaemon(true);
+    //    suspectRequestCollectorThread.start()
+
     serverSocketExecutor = Executors.newCachedThreadPool(new ThreadFactory() {
       final AtomicInteger threadIdx = new AtomicInteger();
 
@@ -635,15 +650,14 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler {
   ServerSocket createServerSocket(InetAddress socketAddress, int[] portRange) {
     ServerSocket serverSocket;
     try {
-      serverSocket = SocketCreatorFactory.getSocketCreatorForComponent(SecurableCommunicationChannel.CLUSTER).createServerSocketUsingPortRange(socketAddress, 50/*backlog*/,
-        true/*isBindAddress*/, false/*useNIO*/, 65536/*tcpBufferSize*/, portRange, false);
+      serverSocket = SocketCreatorFactory.getSocketCreatorForComponent(SecurableCommunicationChannel.CLUSTER).createServerSocketUsingPortRange(socketAddress, 50/*backlog*/, true/*isBindAddress*/, false/*useNIO*/, 65536/*tcpBufferSize*/, portRange, false);
       socketPort = serverSocket.getLocalPort();
     } catch (IOException | SystemConnectException e) {
       throw new GemFireConfigException("Unable to allocate a failure detection port in the membership-port range", e);
     }
     return serverSocket;
   }
-  
+
   /**
    * start the thread that listens for tcp/ip connections and responds
    * to connection attempts
@@ -656,15 +670,14 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler {
       logger.info("Started failure detection server thread on {}:{}.", ssocket.getInetAddress(), socketPort);
       Socket socket = null;
       try {
-        while (!services.getCancelCriterion().isCancelInProgress() 
-            && !GMSHealthMonitor.this.isStopping) {
+        while (!services.getCancelCriterion().isCancelInProgress() && !GMSHealthMonitor.this.isStopping) {
           try {
             socket = ssocket.accept();
             if (GMSHealthMonitor.this.playingDead) {
               continue;
             }
             serverSocketExecutor.execute(new ClientSocketHandler(socket)); //start();  [bruce] I'm seeing a lot of failures due to this thread not being created fast enough, sometimes as long as 30 seconds
-          
+
           } catch (RejectedExecutionException e) {
             // this can happen during shutdown
 
@@ -696,7 +709,7 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler {
       }
     });
   }
-  
+
   /**
    * start the thread that periodically sends a message to processes
    * that might be watching this process
@@ -707,10 +720,11 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler {
         Thread.currentThread().setName("Geode Heartbeat Sender");
         sendPeriodicHeartbeats();
       }
+
       private void sendPeriodicHeartbeats() {
         while (!isStopping && !services.getCancelCriterion().isCancelInProgress()) {
           try {
-            Thread.sleep(memberTimeout/LOGICAL_INTERVAL);
+            Thread.sleep(memberTimeout / LOGICAL_INTERVAL);
           } catch (InterruptedException e) {
             return;
           }
@@ -727,7 +741,7 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler {
           }
         }
       }
-      
+
       private void sendHeartbeats(List<InternalDistributedMember> mbrs, int startIndex) {
         InternalDistributedMember coordinator = currentView.getCoordinator();
         if (coordinator != null && !coordinator.equals(localAddress)) {
@@ -746,10 +760,10 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler {
 
         int index = startIndex;
         int numSent = 0;
-        for (;;) {
+        for (; ; ) {
           index--;
           if (index < 0) {
-            index = mbrs.size()-1;
+            index = mbrs.size() - 1;
           }
           InternalDistributedMember mbr = mbrs.get(index);
           if (mbr.equals(localAddress)) {
@@ -782,32 +796,32 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler {
     synchronized (viewVsSuspectedMembers) {
       viewVsSuspectedMembers.clear();
     }
-    for (Iterator<InternalDistributedMember> it=memberTimeStamps.keySet().iterator(); it.hasNext(); ) {
+    for (Iterator<InternalDistributedMember> it = memberTimeStamps.keySet().iterator(); it.hasNext(); ) {
       if (!newView.contains(it.next())) {
         it.remove();
       }
     }
-    for (Iterator<InternalDistributedMember> it=suspectedMemberInView.keySet().iterator(); it.hasNext(); ) {
+    for (Iterator<InternalDistributedMember> it = suspectedMemberInView.keySet().iterator(); it.hasNext(); ) {
       if (!newView.contains(it.next())) {
         it.remove();
       }
     }
-//    for (InternalDistributedMember mbr: newView.getMembers()) {
-//      if (!memberVsLastMsgTS.containsKey(mbr)) {
-//        CustomTimeStamp customTS = new CustomTimeStamp(System.currentTimeMillis());
-//        memberVsLastMsgTS.put(mbr, customTS);
-//      }
-//    }
+    //    for (InternalDistributedMember mbr: newView.getMembers()) {
+    //      if (!memberVsLastMsgTS.containsKey(mbr)) {
+    //        CustomTimeStamp customTS = new CustomTimeStamp(System.currentTimeMillis());
+    //        memberVsLastMsgTS.put(mbr, customTS);
+    //      }
+    //    }
     currentView = newView;
     setNextNeighbor(newView, null);
   }
 
   /***
    * This method sets next neighbour which it needs to watch in current view.
-   * 
+   *
    * if nextTo == null
    * then it watches member next to it.
-   * 
+   *
    * It becomes null when we suspect current neighbour, during that time it watches
    * member next to suspect member.
    */
@@ -820,16 +834,34 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler {
     }
 
     List<InternalDistributedMember> allMembers = newView.getMembers();
-    
-    Set<InternalDistributedMember> checkAllSuspected = new HashSet<>(allMembers);
-    checkAllSuspected.removeAll(suspectedMemberInView.keySet());
-    checkAllSuspected.remove(localAddress);
-    if (checkAllSuspected.isEmpty() && allMembers.size() > 1) {
-      logger.info("All other members are suspect at this point");
-      nextNeighbor = null;
-      return;
+
+    //    Set<InternalDistributedMember> checkAllSuspected = new HashSet<>(allMembers);
+    //    checkAllSuspected.removeAll(suspectedMemberInView.keySet());
+    //    checkAllSuspected.remove(localAddress);
+    //    if (checkAllSuspected.isEmpty() && allMembers.size() > 1) {
+    //      logger.info("All other members are suspect at this point");
+    //      nextNeighbor = null;
+    //      return;
+    //    }
+
+    if (allMembers.size() > 1 && suspectedMemberInView.size() >= allMembers.size() - 1) {
+      boolean nonSuspectFound = false;
+      for (InternalDistributedMember member : allMembers) {
+        if (member.equals(localAddress)) {
+          continue;
+        }
+        if (!suspectedMemberInView.containsKey(member)) {
+          nonSuspectFound = true;
+          break;
+        }
+      }
+      if (!nonSuspectFound) {
+        logger.info("All other members are suspect at this point");
+        nextNeighbor = null;
+        return;
+      }
     }
-    
+
     int index = allMembers.indexOf(nextTo);
     if (index != -1) {
       int nextNeighborIndex = (index + 1) % allMembers.size();
@@ -844,7 +876,7 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler {
         nextNeighbor = newNeighbor;
       }
     }
-    
+
     if (nextNeighbor != null && nextNeighbor.equals(localAddress)) {
       nextNeighbor = null;
     }
@@ -866,10 +898,10 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler {
     services.getMessenger().addHandler(HeartbeatMessage.class, this);
     services.getMessenger().addHandler(SuspectMembersMessage.class, this);
   }
-  
+
   @Override
   public void started() {
-    setLocalAddress( services.getMessenger().getMemberID());
+    setLocalAddress(services.getMessenger().getMemberID());
     serverSocket = createServerSocket(localAddress.getInetAddress(), services.getConfig().getMembershipPortRange());
     startTcpServer(serverSocket);
     startHeartbeatThread();
@@ -907,11 +939,10 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler {
           serverSocket.close();
           serverSocket = null;
           logger.info("GMSHealthMonitor server socket is closed in stopServices().");
-        }
-        catch (IOException e) {
+        } catch (IOException e) {
           logger.trace("Unexpected exception", e);
         }
-      }      
+      }
       serverSocketExecutor.shutdownNow();
       try {
         serverSocketExecutor.awaitTermination(2000, TimeUnit.MILLISECONDS);
@@ -920,10 +951,10 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler {
       }
       logger.info("GMSHealthMonitor serverSocketExecutor is " + (serverSocketExecutor.isTerminated() ? "terminated" : "not terminated"));
     }
-    
-//    if (suspectRequestCollectorThread != null) {
-//      suspectRequestCollectorThread.shutdown();
-//    }
+
+    //    if (suspectRequestCollectorThread != null) {
+    //      suspectRequestCollectorThread.shutdown();
+    //    }
   }
 
   /***
@@ -969,7 +1000,7 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler {
   public void emergencyClose() {
     stopServices();
   }
-  
+
   void setLocalAddress(InternalDistributedMember idm) {
     this.localAddress = idm;
   }
@@ -983,44 +1014,44 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler {
     logger.trace("processing {}", m);
 
     switch (m.getDSFID()) {
-    case HEARTBEAT_REQUEST:
-      if (beingSick || playingDead) {
-        logger.debug("sick member is ignoring check request");
-      } else {
-        processHeartbeatRequest((HeartbeatRequestMessage) m);
-      }
-      break;
-    case HEARTBEAT_RESPONSE:
-      if (beingSick || playingDead) {
-        logger.debug("sick member is ignoring check response");
-      } else {
-        processHeartbeat((HeartbeatMessage) m);
-      }
-      break;
-    case SUSPECT_MEMBERS_MESSAGE:
-      if (beingSick || playingDead) {
-        logger.debug("sick member is ignoring suspect message");
-      } else {
-        processSuspectMembersRequest((SuspectMembersMessage) m);
-      }
-      break;
-    default:
-      throw new IllegalArgumentException("unknown message type: " + m);
+      case HEARTBEAT_REQUEST:
+        if (beingSick || playingDead) {
+          logger.debug("sick member is ignoring check request");
+        } else {
+          processHeartbeatRequest((HeartbeatRequestMessage) m);
+        }
+        break;
+      case HEARTBEAT_RESPONSE:
+        if (beingSick || playingDead) {
+          logger.debug("sick member is ignoring check response");
+        } else {
+          processHeartbeat((HeartbeatMessage) m);
+        }
+        break;
+      case SUSPECT_MEMBERS_MESSAGE:
+        if (beingSick || playingDead) {
+          logger.debug("sick member is ignoring suspect message");
+        } else {
+          processSuspectMembersRequest((SuspectMembersMessage) m);
+        }
+        break;
+      default:
+        throw new IllegalArgumentException("unknown message type: " + m);
     }
   }
 
   private void processHeartbeatRequest(HeartbeatRequestMessage m) {
-    
+
     this.stats.incHeartbeatRequestsReceived();
-    
+
     if (this.isStopping || this.playingDead) {
       return;
     }
-    
+
     // only respond if the intended recipient is this member
     InternalDistributedMember me = localAddress;
 
-    if (me.getVmViewId() >= 0  &&  m.getTarget().equals(me)) {
+    if (me.getVmViewId() >= 0 && m.getTarget().equals(me)) {
       HeartbeatMessage hm = new HeartbeatMessage(m.getRequestId());
       hm.setRecipient(m.getSender());
       Set<InternalDistributedMember> membersNotReceivedMsg = services.getMessenger().send(hm);
@@ -1059,9 +1090,9 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler {
    * for that member
    */
   private void processSuspectMembersRequest(SuspectMembersMessage incomingRequest) {
-    
+
     this.stats.incSuspectsReceived();
-    
+
     NetView cv = currentView;
 
     if (cv == null) {
@@ -1096,11 +1127,10 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler {
       }
     }
 
-    
+
     if (cv.getCoordinator().equals(localAddress)) {
-      for (SuspectRequest req: incomingRequest.getMembers()) {
-        logger.info("received suspect message from {} for {}: {}",
-           sender, req.getSuspectMember(), req.getReason());
+      for (SuspectRequest req : incomingRequest.getMembers()) {
+        logger.info("received suspect message from {} for {}: {}", sender, req.getSuspectMember(), req.getReason());
       }
       checkIfAvailable(sender, sMembers, cv);
     }// coordinator ends
@@ -1120,9 +1150,8 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler {
       InternalDistributedMember coordinator = check.getCoordinator();
       if (coordinator != null && coordinator.equals(localAddress)) {
         // new coordinator
-        for (SuspectRequest req: incomingRequest.getMembers()) {
-          logger.info("received suspect message from {} for {}: {}",
-             sender, req.getSuspectMember(), req.getReason());
+        for (SuspectRequest req : incomingRequest.getMembers()) {
+          logger.info("received suspect message from {} for {}: {}", sender, req.getSuspectMember(), req.getReason());
         }
         checkIfAvailable(sender, smbr, cv);
       } else {
@@ -1145,7 +1174,7 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler {
         viewVsMembers = new HashSet<>();
         viewVsSuspectedMembers.put(cv, viewVsMembers);
       }
-      for (SuspectRequest sr: sMembers) {       
+      for (SuspectRequest sr : sMembers) {
         viewVsMembers.add(sr);
       }
     }
@@ -1157,8 +1186,7 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler {
    * we attempt to connect to its socket and ask if it's the expected member.
    * Otherwise we send a heartbeat request and wait for a reply.
    */
-  private void checkIfAvailable(final InternalDistributedMember initiator,
-      List<SuspectRequest> sMembers, final NetView cv) {
+  private void checkIfAvailable(final InternalDistributedMember initiator, List<SuspectRequest> sMembers, final NetView cv) {
 
     for (final SuspectRequest sr : sMembers) {
       final InternalDistributedMember mbr = sr.getSuspectMember();
@@ -1198,10 +1226,7 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler {
     }
   }
 
-  private boolean inlineCheckIfAvailable(
-      final InternalDistributedMember initiator, final NetView cv,
-      boolean initiateRemoval,
-      final InternalDistributedMember mbr, final String reason) {
+  private boolean inlineCheckIfAvailable(final InternalDistributedMember initiator, final NetView cv, boolean initiateRemoval, final InternalDistributedMember mbr, final String reason) {
 
     if (services.getJoinLeave().isMemberLeaving(mbr)) {
       return false;
@@ -1216,7 +1241,7 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler {
       // for some reason we used to update the timestamp for the member
       // with the startTime, but we don't want to do that because it looks
       // like a heartbeat has been received
-  
+
       logger.info("Performing final check for suspect member {} reason={}", mbr, reason);
       boolean pinged;
       int port = cv.getFailureDetectionPort(mbr);
@@ -1239,7 +1264,7 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler {
         doCheckMember(mbr, false);
         pinged = doTCPCheckMember(mbr, port);
       }
-  
+
       if (!pinged && !isStopping) {
         TimeStamp ts = memberTimeStamps.get(mbr);
         if (ts == null || ts.getTime() < startTime) {
@@ -1263,11 +1288,11 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler {
     }
     return !failed;
   }
-    
+
   @Override
   public void memberShutdown(DistributedMember mbr, String reason) {
   }
-  
+
   @Override
   public int getFailureDetectionPort() {
     return this.socketPort;
@@ -1275,21 +1300,21 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler {
 
   private void sendSuspectRequest(final List<SuspectRequest> requests) {
     // the background suspect-collector thread is currently disabled
-//    synchronized (suspectRequests) {
-//      if (suspectRequests.size() > 0) {
-//        for (SuspectRequest sr: suspectRequests) {
-//          if (!requests.contains(sr)) {
-//            requests.add(sr);
-//          }
-//        }
-//        suspectRequests.clear();
-//      }
-//    }
+    //    synchronized (suspectRequests) {
+    //      if (suspectRequests.size() > 0) {
+    //        for (SuspectRequest sr: suspectRequests) {
+    //          if (!requests.contains(sr)) {
+    //            requests.add(sr);
+    //          }
+    //        }
+    //        suspectRequests.clear();
+    //      }
+    //    }
     logger.debug("Sending suspect request for members {}", requests);
     List<InternalDistributedMember> recipients;
     if (currentView.size() > 4) {
       HashSet<InternalDistributedMember> filter = new HashSet<>();
-      for (Enumeration<InternalDistributedMember> e = suspectedMemberInView.keys(); e.hasMoreElements();) {
+      for (Enumeration<InternalDistributedMember> e = suspectedMemberInView.keys(); e.hasMoreElements(); ) {
         filter.add(e.nextElement());
       }
       filter.addAll(requests.stream().map(SuspectRequest::getSuspectMember).collect(Collectors.toList()));
@@ -1313,15 +1338,16 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler {
   }
 
   private static class ConnectTimeoutTask extends TimerTask implements ConnectionWatcher {
+
     final Timer scheduler;
     Socket socket;
     final long timeout;
-    
+
     ConnectTimeoutTask(Timer scheduler, long timeout) {
       this.scheduler = scheduler;
       this.timeout = timeout;
     }
-    
+
     @Override
     public void beforeConnect(Socket socket) {
       this.socket = socket;
@@ -1332,7 +1358,7 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler {
     public void afterConnect(Socket socket) {
       cancel();
     }
-    
+
     @Override
     public void run() {
       try {
@@ -1343,9 +1369,9 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler {
         // ignored - nothing useful to do here
       }
     }
-    
+
   }
-  
+
   public DMStats getStats() {
     return this.stats;
   }