You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by bs...@apache.org on 2018/08/30 15:54:11 UTC

[geode] branch develop updated: GEODE-3780 member is not considered suspect after failing final check

This is an automated email from the ASF dual-hosted git repository.

bschuchardt pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git


The following commit(s) were added to refs/heads/develop by this push:
     new 9ee8cbf  GEODE-3780 member is not considered suspect after failing final check
9ee8cbf is described below

commit 9ee8cbfe3000df4db87a7388d0123aa40e42b7ec
Author: Bruce Schuchardt <bs...@pivotal.io>
AuthorDate: Thu Aug 30 08:52:19 2018 -0700

    GEODE-3780 member is not considered suspect after failing final check
    
    Ensure that a member is in the suspects collection after it fails a
    final check.  This allows processSuspectMessage to determine whether it
    should act as the membership coordinator and initiate final checks
    based on suspect messages.
    
    I've also done some minor refactoring in processSuspectMessage and
    removed commented-out code.
    
    This closes #2380
---
 .../gms/fd/GMSHealthMonitorJUnitTest.java          | 16 +++++
 .../membership/gms/fd/GMSHealthMonitor.java        | 79 +++++++++-------------
 2 files changed, 48 insertions(+), 47 deletions(-)

diff --git a/geode-core/src/integrationTest/java/org/apache/geode/distributed/internal/membership/gms/fd/GMSHealthMonitorJUnitTest.java b/geode-core/src/integrationTest/java/org/apache/geode/distributed/internal/membership/gms/fd/GMSHealthMonitorJUnitTest.java
index 8c13813..6f4b86f 100644
--- a/geode-core/src/integrationTest/java/org/apache/geode/distributed/internal/membership/gms/fd/GMSHealthMonitorJUnitTest.java
+++ b/geode-core/src/integrationTest/java/org/apache/geode/distributed/internal/membership/gms/fd/GMSHealthMonitorJUnitTest.java
@@ -49,6 +49,7 @@ import java.net.UnknownHostException;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Properties;
+import java.util.Timer;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.Condition;
 import java.util.concurrent.locks.Lock;
@@ -145,6 +146,7 @@ public class GMSHealthMonitorJUnitTest {
     when(services.getCancelCriterion()).thenReturn(stopper);
     when(services.getManager()).thenReturn(manager);
     when(services.getStatistics()).thenReturn(new DistributionStats(system, statsId));
+    when(services.getTimer()).thenReturn(new Timer("Geode Membership Timer", true));
     when(stopper.isCancelInProgress()).thenReturn(false);
 
     if (mockMembers == null) {
@@ -624,6 +626,20 @@ public class GMSHealthMonitorJUnitTest {
 
 
   @Test
+  public void testFinalCheckFailureLeavesMemberAsSuspect() {
+    NetView v = installAView();
+
+    setFailureDetectionPorts(v);
+
+    InternalDistributedMember memberToCheck = gmsHealthMonitor.getNextNeighbor();
+    boolean available = gmsHealthMonitor.checkIfAvailable(memberToCheck, "Not responding", true);
+    assertFalse(available);
+    verify(joinLeave).remove(isA(InternalDistributedMember.class), isA(String.class));
+    assertTrue(gmsHealthMonitor.isSuspectMember(memberToCheck));
+  }
+
+
+  @Test
   public void testShutdown() {
 
     installAView();
diff --git a/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/gms/fd/GMSHealthMonitor.java b/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/gms/fd/GMSHealthMonitor.java
index f31a0c3..789f8d5 100644
--- a/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/gms/fd/GMSHealthMonitor.java
+++ b/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/gms/fd/GMSHealthMonitor.java
@@ -263,11 +263,10 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler {
         long interval = memberTimeoutInMillis / GMSHealthMonitor.LOGICAL_INTERVAL;
         long lastTS = currentTime - nextNeighborTS.getTime();
         if (lastTS + interval >= memberTimeoutInMillis) {
-          logger.trace("Checking member {} ", neighbour);
+          logger.debug("Checking member {} ", neighbour);
           // now do check request for this member;
           checkMember(neighbour);
         }
-        setNextNeighbor(currentView, null);
       }
     }
   }
@@ -412,7 +411,7 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler {
 
     // we need to check this member
     checkExecutor.execute(() -> {
-      boolean pinged = false;
+      boolean pinged;
       try {
         pinged = GMSHealthMonitor.this.doCheckMember(mbr, true);
       } catch (CancelException e) {
@@ -420,9 +419,10 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler {
       }
 
       if (!pinged) {
-        suspectedMemberIds.put(mbr, currentView);
         String reason = "Member isn't responding to heartbeat requests";
+        memberSuspected(localAddress, mbr, reason);
         initiateSuspicion(mbr, reason);
+        setNextNeighbor(currentView, null);
       } else {
         logger.trace("Setting next neighbor as member {} has responded.", mbr);
         memberUnsuspected(mbr);
@@ -455,7 +455,7 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler {
       return true;
     }
     long startTime = System.currentTimeMillis();
-    logger.trace("Checking member {}", member);
+    logger.debug("Requesting heartbeat from {}", member);
     final HeartbeatRequestMessage hrm = constructHeartbeatRequestMessage(member);
     Response pingResp = null;
     if (waitForResponse) {
@@ -483,7 +483,7 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler {
             if (isStopping) {
               return true;
             }
-            logger.trace("no heartbeat response received from {} and no recent activity", member);
+            logger.debug("no heartbeat response received from {} and no recent activity", member);
             return false;
           } else {
             logger.trace("received heartbeat from {}", member);
@@ -601,15 +601,6 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler {
   @Override
   public void suspect(InternalDistributedMember mbr, String reason) {
     initiateSuspicion(mbr, reason);
-    // Background suspect-collecting thread is currently disabled - it takes too long
-    // synchronized (suspectRequests) {
-    // SuspectRequest sr = new SuspectRequest((InternalDistributedMember) mbr, reason);
-    // if (!suspectRequests.contains(sr)) {
-    // logger.info("Suspecting member {}. Reason= {}.", mbr, reason);
-    // suspectRequests.add(sr);
-    // suspectRequests.notify();
-    // }
-    // }
   }
 
   @Override
@@ -641,18 +632,6 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler {
     long delay = memberTimeout / LOGICAL_INTERVAL;
     monitorFuture = scheduler.scheduleAtFixedRate(m, delay, delay, TimeUnit.MILLISECONDS);
 
-    // suspectRequestCollectorThread = this.new RequestCollector<SuspectRequest>("Geode Suspect
-    // Message Collector", Services.getThreadGroup(), suspectRequests,
-    // new Callback<SuspectRequest>() {
-    // @Override
-    // public void process(List<SuspectRequest> requests) {
-    // GMSHealthMonitor.this.sendSuspectRequest(requests);
-    //
-    // }
-    // }, MEMBER_SUSPECT_COLLECTION_INTERVAL);
-    // suspectRequestCollectorThread.setDaemon(true);
-    // suspectRequestCollectorThread.start()
-
     serverSocketExecutor = Executors.newCachedThreadPool(new ThreadFactory() {
       final AtomicInteger threadIdx = new AtomicInteger();
 
@@ -832,12 +811,6 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler {
         it.remove();
       }
     }
-    // for (InternalDistributedMember mbr: newView.getMembers()) {
-    // if (!memberVsLastMsgTS.containsKey(mbr)) {
-    // CustomTimeStamp customTS = new CustomTimeStamp(System.currentTimeMillis());
-    // memberVsLastMsgTS.put(mbr, customTS);
-    // }
-    // }
     currentView = newView;
     setNextNeighbor(newView, null);
   }
@@ -907,7 +880,7 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler {
 
   }
 
-  /*** test method */
+  /** test method */
   public InternalDistributedMember getNextNeighbor() {
     return nextNeighbor;
   }
@@ -984,7 +957,7 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler {
    */
   public boolean isShutdown() {
     return scheduler.isShutdown() && checkExecutor.isShutdown()
-        && serverSocketExecutor.isShutdown() /* && !suspectRequestCollectorThread.isAlive() */;
+        && serverSocketExecutor.isShutdown();
   }
 
   /**
@@ -1003,7 +976,7 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler {
   public void memberSuspected(InternalDistributedMember initiator,
       InternalDistributedMember suspect, String reason) {
     synchronized (suspectRequestsInView) {
-      suspectedMemberIds.putIfAbsent(suspect, currentView);
+      suspectedMemberIds.put(suspect, currentView);
       Collection<SuspectRequest> requests = suspectRequestsInView.get(currentView);
       boolean found = false;
       if (requests == null) {
@@ -1190,39 +1163,48 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler {
       }
     }
 
+    logger.debug("Processing suspect requests {}", suspectRequests);
     if (cv.getCoordinator().equals(localAddress)) {
-      for (SuspectRequest req : incomingRequest.getMembers()) {
-        logger.info("received suspect message from {} for {}: {}", sender, req.getSuspectMember(),
-            req.getReason());
-      }
+      // This process is the membership coordinator and should perform a final check
+      logSuspectRequests(incomingRequest, sender);
       checkIfAvailable(sender, suspectRequests, cv);
-    } // coordinator ends
-    else {
 
+    } else {
+      // Another process has raised suspicion - check to see if
+      // this process should become the membership coordinator if
+      // all current suspects are gone
       NetView check = new NetView(cv, cv.getViewId() + 1);
       ArrayList<SuspectRequest> membersToCheck = new ArrayList<>();
       synchronized (suspectRequestsInView) {
         recordSuspectRequests(suspectRequests, cv);
         Set<SuspectRequest> suspectsInView = suspectRequestsInView.get(cv);
+        logger.debug("Current suspects for view #{} are {}", cv.getViewId(), suspectsInView);
         for (final SuspectRequest sr : suspectsInView) {
           check.remove(sr.getSuspectMember());
           membersToCheck.add(sr);
         }
       }
+      logger.trace("Trial view with suspects removed is {}\nmy address is {}", check, localAddress);
 
       InternalDistributedMember coordinator = check.getCoordinator();
       if (coordinator != null && coordinator.equals(localAddress)) {
         // new coordinator
-        for (SuspectRequest req : incomingRequest.getMembers()) {
-          logger.info("received suspect message from {} for {}: {}", sender, req.getSuspectMember(),
-              req.getReason());
-        }
+        logSuspectRequests(incomingRequest, sender);
         checkIfAvailable(sender, membersToCheck, cv);
       }
     }
 
   }
 
+  private void logSuspectRequests(SuspectMembersMessage incomingRequest,
+      InternalDistributedMember sender) {
+    for (SuspectRequest req : incomingRequest.getMembers()) {
+      String who = sender.equals(localAddress) ? "myself" : sender.toString();
+      logger.info("received suspect message from {} for {}: {}", who, req.getSuspectMember(),
+          req.getReason());
+    }
+  }
+
   /***
    * This method make sure that records suspectRequest. We need to make sure this on preferred
    * coordinators, as elder coordinator might be in suspected list next.
@@ -1321,10 +1303,13 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler {
       if (!pinged && !isStopping) {
         TimeStamp ts = memberTimeStamps.get(mbr);
         if (ts == null || ts.getTime() < startTime) {
-          logger.info("Final check failed - requesting removal of suspect member " + mbr);
+          logger.info("Final check failed for member {}", mbr);
           if (initiateRemoval) {
+            logger.info("Requesting removal of suspect member {}", mbr);
             services.getJoinLeave().remove(mbr, reason);
           }
+          // make sure he's still suspected
+          memberSuspected(localAddress, mbr, reason);
           failed = true;
         } else {
           logger.info(