You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by bs...@apache.org on 2019/10/04 18:03:41 UTC

[geode] 01/01: GEODE-3780 suspected member is never watched again after passing final check

This is an automated email from the ASF dual-hosted git repository.

bschuchardt pushed a commit to branch feature/GEODE-3780
in repository https://gitbox.apache.org/repos/asf/geode.git

commit 977aaf2fcc5c0f74a38742aa51e0feeb5296472d
Author: Bruce Schuchardt <bs...@pivotal.io>
AuthorDate: Fri Oct 4 10:19:49 2019 -0700

    GEODE-3780 suspected member is never watched again after passing final check
    
    This restores our original (pre-1.0) behavior of performing a
    final check on a member if UDP communications with that member fail.
    
    We now also send exoneration messages to all other members if a suspect is
    cleared.  We need to do that because another member may have sent a
    Suspect message that was ignored because the suspect was already
    undergoing a final check.
    
    I also noticed that our TCP/IP final check loop was performing more than
    one check in many cases because of a mismatch in clock granularity.  A
    socket SO_TIMEOUT based on the millisecond clock would time out, but the
    nanosecond clock didn't line up with that timeout, which caused the
    do/while loop to make another attempt.  I changed that loop to
    convert the nanosecond clock value to milliseconds.
---
 ...omcatSessionBackwardsCompatibilityTestBase.java |   8 +-
 .../membership/gms/fd/GMSHealthMonitor.java        | 148 +++++++++++++--------
 .../membership/gms/messenger/JGroupsMessenger.java |  16 ++-
 .../geode/test/dunit/internal/ProcessManager.java  |  13 +-
 4 files changed, 106 insertions(+), 79 deletions(-)

diff --git a/geode-assembly/src/upgradeTest/java/org/apache/geode/session/tests/TomcatSessionBackwardsCompatibilityTestBase.java b/geode-assembly/src/upgradeTest/java/org/apache/geode/session/tests/TomcatSessionBackwardsCompatibilityTestBase.java
index f4b8272..e629026 100644
--- a/geode-assembly/src/upgradeTest/java/org/apache/geode/session/tests/TomcatSessionBackwardsCompatibilityTestBase.java
+++ b/geode-assembly/src/upgradeTest/java/org/apache/geode/session/tests/TomcatSessionBackwardsCompatibilityTestBase.java
@@ -88,13 +88,7 @@ public abstract class TomcatSessionBackwardsCompatibilityTestBase {
 
   protected TomcatSessionBackwardsCompatibilityTestBase(String version) {
     VersionManager versionManager = VersionManager.getInstance();
-    String installLocation = null;
-    try {
-      installLocation = versionManager.getInstall(version);
-    } finally {
-      System.out.println(
-          "BRUCE: for version " + version + " the installation directory is " + installLocation);
-    }
+    String installLocation = installLocation = versionManager.getInstall(version);
     oldBuild = new File(installLocation);
     oldModules = new File(installLocation + "/tools/Modules/");
   }
diff --git a/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/gms/fd/GMSHealthMonitor.java b/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/gms/fd/GMSHealthMonitor.java
index 9e90316..41a82b6 100644
--- a/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/gms/fd/GMSHealthMonitor.java
+++ b/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/gms/fd/GMSHealthMonitor.java
@@ -244,56 +244,63 @@ public class GMSHealthMonitor implements HealthMonitor {
     @Override
     public void run() {
 
-      if (GMSHealthMonitor.this.isStopping) {
-        return;
+      GMSMember neighbor = nextNeighbor;
+      if (logger.isDebugEnabled()) {
+        logger.debug("cluster health monitor invoked with {}", neighbor);
       }
+      try {
+        if (GMSHealthMonitor.this.isStopping) {
+          return;
+        }
 
-      GMSMember neighbour = nextNeighbor;
-
-      long currentTime = System.currentTimeMillis();
-      // this is the start of interval to record member activity
-      GMSHealthMonitor.this.currentTimeStamp = currentTime;
-
-
-      long oldTimeStamp = currentTimeStamp;
-      currentTimeStamp = System.currentTimeMillis();
-
-      GMSMembershipView myView = GMSHealthMonitor.this.currentView;
-      if (myView == null) {
-        return;
-      }
+        long currentTime = System.currentTimeMillis();
+        // this is the start of interval to record member activity
+        GMSHealthMonitor.this.currentTimeStamp = currentTime;
 
-      if (currentTimeStamp - oldTimeStamp > monitorInterval + MONITOR_DELAY_THRESHOLD) {
-        // delay in running this task - don't suspect anyone for a while
-        logger.info(
-            "Failure detector has noticed a JVM pause and is giving all members a heartbeat in view {}",
-            currentView);
-        for (GMSMember member : myView.getMembers()) {
-          contactedBy(member);
-        }
-        return;
-      }
+        long oldTimeStamp = currentTimeStamp;
+        currentTimeStamp = System.currentTimeMillis();
 
-      if (neighbour != null) {
-        TimeStamp nextNeighborTS;
-        synchronized (GMSHealthMonitor.this) {
-          nextNeighborTS = GMSHealthMonitor.this.memberTimeStamps.get(neighbour);
+        GMSMembershipView myView = GMSHealthMonitor.this.currentView;
+        if (myView == null) {
+          return;
         }
 
-        if (nextNeighborTS == null) {
-          logger.debug("timestamp for {} was found null - setting current time as timestamp",
-              neighbour);
-          TimeStamp customTS = new TimeStamp(currentTime);
-          memberTimeStamps.put(neighbour, customTS);
+        if (currentTimeStamp - oldTimeStamp > monitorInterval + MONITOR_DELAY_THRESHOLD) {
+          // delay in running this task - don't suspect anyone for a while
+          logger.info(
+              "Failure detector has noticed a JVM pause and is giving all members a heartbeat in view {}",
+              currentView);
+          for (GMSMember member : myView.getMembers()) {
+            contactedBy(member);
+          }
           return;
         }
 
-        long interval = memberTimeoutInMillis / GMSHealthMonitor.LOGICAL_INTERVAL;
-        long lastTS = currentTime - nextNeighborTS.getTime();
-        if (lastTS + interval >= memberTimeoutInMillis) {
-          logger.debug("Checking member {} ", neighbour);
-          // now do check request for this member;
-          checkMember(neighbour);
+        if (neighbor != null) {
+          TimeStamp nextNeighborTS;
+          synchronized (GMSHealthMonitor.this) {
+            nextNeighborTS = GMSHealthMonitor.this.memberTimeStamps.get(neighbor);
+          }
+
+          if (nextNeighborTS == null) {
+            logger.debug("timestamp for {} was found null - setting current time as timestamp",
+                neighbor);
+            TimeStamp customTS = new TimeStamp(currentTime);
+            memberTimeStamps.put(neighbor, customTS);
+            return;
+          }
+
+          long interval = memberTimeoutInMillis / GMSHealthMonitor.LOGICAL_INTERVAL;
+          long lastTS = currentTime - nextNeighborTS.getTime();
+          if (lastTS + interval >= memberTimeoutInMillis) {
+            logger.debug("Checking member {} ", neighbor);
+            // now do check request for this member;
+            checkMember(neighbor);
+          }
+        }
+      } finally {
+        if (logger.isDebugEnabled()) {
+          logger.debug("cluster health monitor pausing");
         }
       }
     }
@@ -545,9 +552,10 @@ public class GMSHealthMonitor implements HealthMonitor {
       boolean retryIfConnectFails) {
     Socket clientSocket = null;
     // make sure we try to check on the member for the contracted memberTimeout period
-    // in case a timed socket.connect() returns immediately
-    long giveupTime = System.nanoTime() + TimeUnit.NANOSECONDS.convert(
-        services.getConfig().getMemberTimeout(), TimeUnit.MILLISECONDS);
+    // in case a timed socket.connect() returns immediately. Use milliseconds to be in
+    // sync with the socket timeout parameter unit of measure
+    long giveupTime = TimeUnit.MILLISECONDS.convert(System.nanoTime(), TimeUnit.NANOSECONDS) +
+        services.getConfig().getMemberTimeout();
     boolean passed = false;
     int iteration = 0;
     do {
@@ -587,7 +595,7 @@ public class GMSHealthMonitor implements HealthMonitor {
         }
       }
     } while (retryIfConnectFails && !passed && !this.isShutdown()
-        && System.nanoTime() < giveupTime);
+        && TimeUnit.MILLISECONDS.convert(System.nanoTime(), TimeUnit.MILLISECONDS) < giveupTime);
     return passed;
   }
 
@@ -651,6 +659,9 @@ public class GMSHealthMonitor implements HealthMonitor {
   @Override
   public boolean checkIfAvailable(GMSMember mbr, String reason,
       boolean initiateRemoval) {
+    if (membersInFinalCheck.contains(mbr)) {
+      return true; // status unknown for now but someone is checking
+    }
     return inlineCheckIfAvailable(localAddress, currentView, initiateRemoval,
         mbr, reason);
   }
@@ -898,7 +909,7 @@ public class GMSHealthMonitor implements HealthMonitor {
     if (nextNeighbor != null && nextNeighbor.equals(localAddress)) {
       if (logger.isDebugEnabled()) {
         logger.debug("Health monitor is unable to find a neighbor to watch.  "
-            + "Current suspects are {}", suspectedMemberIds);
+            + "Current suspects are {}", suspectedMemberIds.keySet());
       }
       nextNeighbor = null;
     }
@@ -1143,6 +1154,8 @@ public class GMSHealthMonitor implements HealthMonitor {
       return;
     }
 
+    logSuspectRequests(incomingRequest, incomingRequest.getSender());
+
     this.stats.incSuspectsReceived();
 
     GMSMembershipView cv = currentView;
@@ -1179,12 +1192,9 @@ public class GMSHealthMonitor implements HealthMonitor {
       }
     }
 
-    logger.debug(
-        "Processing suspect requests {}\nproposed view is currently {}\nwith coordinator {}",
-        suspectRequests, cv, cv.getCoordinator());
+    logger.debug("Processing {}", incomingRequest);
     if (cv.getCoordinator().equals(localAddress)) {
       // This process is the membership coordinator and should perform a final check
-      logSuspectRequests(incomingRequest, sender);
       checkIfAvailable(sender, suspectRequests, cv);
 
     } else {
@@ -1212,7 +1222,7 @@ public class GMSHealthMonitor implements HealthMonitor {
         logger.debug("Current leave requests are {}", membersLeaving);
         check.removeAll(membersLeaving);
       }
-      logger.trace(
+      logger.debug(
           "Proposed view with suspects & leaving members removed is {}\nwith coordinator {}\nmy address is {}",
           check,
           check.getCoordinator(), localAddress);
@@ -1220,7 +1230,6 @@ public class GMSHealthMonitor implements HealthMonitor {
       GMSMember coordinator = check.getCoordinator();
       if (coordinator != null && coordinator.equals(localAddress)) {
         // new coordinator
-        logSuspectRequests(incomingRequest, sender);
         checkIfAvailable(sender, membersToCheck, cv);
       }
     }
@@ -1296,8 +1305,18 @@ public class GMSHealthMonitor implements HealthMonitor {
     }
   }
 
+  /**
+   * Check to see if a member is available
+   *
+   * @param initiator member who initiated this check
+   * @param cv the view we're basing the check upon
+   * @param isFinalCheck whether the member should be kicked out if it fails the check
+   * @param mbr the member to check
+   * @param reason why we're doing this check
+   * @return true if the check passes
+   */
   protected boolean inlineCheckIfAvailable(final GMSMember initiator,
-      final GMSMembershipView cv, boolean forceRemovalIfCheckFails, final GMSMember mbr,
+      final GMSMembershipView cv, boolean isFinalCheck, final GMSMember mbr,
       final String reason) {
 
     if (services.getJoinLeave().isMemberLeaving(mbr)) {
@@ -1339,7 +1358,7 @@ public class GMSHealthMonitor implements HealthMonitor {
         doCheckMember(mbr, false);
         // now, while waiting for a heartbeat, try connecting to the suspect's failure detection
         // port
-        final boolean retryIfConnectFails = forceRemovalIfCheckFails;
+        final boolean retryIfConnectFails = isFinalCheck;
         pinged = doTCPCheckMember(mbr, port, retryIfConnectFails);
       }
 
@@ -1350,7 +1369,7 @@ public class GMSHealthMonitor implements HealthMonitor {
           logger.info("Availability check failed for member {}", mbr);
           // if the final check fails & this VM is the coordinator we don't need to do another final
           // check
-          if (forceRemovalIfCheckFails) {
+          if (isFinalCheck) {
             logger.info("Requesting removal of suspect member {}", mbr);
             services.getJoinLeave().remove(mbr, reason);
             // make sure it is still suspected
@@ -1379,18 +1398,29 @@ public class GMSHealthMonitor implements HealthMonitor {
           }
         } else {
           logger.info(
-              "Availability check failed but detected recent message traffic for suspect member "
+              "Availability check detected recent message traffic for suspect member "
                   + mbr + " at time " + new Date(ts.getTime()));
           failed = false;
         }
       }
 
       if (!failed) {
-        if (!isStopping && !initiator.equals(localAddress)
+        if (!isStopping
             && initiator.getVersionOrdinal() >= Version.GEODE_1_4_0.ordinal()) {
-          // let the sender know that it's okay to monitor this member again
+          // let others know that this member is no longer suspect
           FinalCheckPassedMessage message = new FinalCheckPassedMessage(initiator, mbr);
-          services.getMessenger().send(message);
+          List<GMSMember> members = cv.getMembers();
+          List<GMSMember> recipients = new ArrayList<>(members.size());
+          for (GMSMember member : members) {
+            if (!isSuspectMember(member) && !membersInFinalCheck.contains(member) &&
+                !member.equals(localAddress)) {
+              recipients.add(member);
+            }
+          }
+          if (recipients.size() > 0) {
+            message.setRecipients(recipients);
+            services.getMessenger().send(message);
+          }
         }
 
         logger.info("Availability check passed for suspect member " + mbr);
diff --git a/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/gms/messenger/JGroupsMessenger.java b/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/gms/messenger/JGroupsMessenger.java
index 61bc20d..be4ebe6 100644
--- a/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/gms/messenger/JGroupsMessenger.java
+++ b/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/gms/messenger/JGroupsMessenger.java
@@ -492,10 +492,6 @@ public class JGroupsMessenger implements Messenger {
                                                       // shutdown
       return;
     }
-    if (addressesWithIoExceptionsProcessed.contains(dest)) {
-      return;
-    }
-    addressesWithIoExceptionsProcessed.add(dest);
     GMSMembershipView v = this.view;
     JGAddress jgMbr = (JGAddress) dest;
     if (jgMbr != null && v != null) {
@@ -510,9 +506,15 @@ public class JGroupsMessenger implements Messenger {
         }
       }
       if (recipient != null) {
-        logger.warn("Unable to send message to " + recipient, e);
-        services.getHealthMonitor().suspect(recipient,
-            "Unable to send messages to this member via JGroups");
+        if (!addressesWithIoExceptionsProcessed.contains(dest)) {
+          logger.warn("Unable to send message to " + recipient, e);
+          addressesWithIoExceptionsProcessed.add(dest);
+        }
+        // If communications aren't working we need to resolve the issue quickly, so here
+        // we initiate a final check. Prior to becoming open-source we did a similar check
+        // using JGroups VERIFY_SUSPECT
+        services.getHealthMonitor().checkIfAvailable(recipient,
+            "Unable to send messages to this member via JGroups", true);
       }
     }
   }
diff --git a/geode-dunit/src/main/java/org/apache/geode/test/dunit/internal/ProcessManager.java b/geode-dunit/src/main/java/org/apache/geode/test/dunit/internal/ProcessManager.java
index 4529b15..d720c0f 100755
--- a/geode-dunit/src/main/java/org/apache/geode/test/dunit/internal/ProcessManager.java
+++ b/geode-dunit/src/main/java/org/apache/geode/test/dunit/internal/ProcessManager.java
@@ -16,6 +16,7 @@ package org.apache.geode.test.dunit.internal;
 
 import static org.apache.geode.distributed.ConfigurationProperties.ENABLE_NETWORK_PARTITION_DETECTION;
 import static org.apache.geode.distributed.ConfigurationProperties.LOG_LEVEL;
+import static org.apache.geode.distributed.internal.DistributionConfig.GEMFIRE_PREFIX;
 
 import java.io.BufferedReader;
 import java.io.File;
@@ -288,11 +289,11 @@ class ProcessManager implements ChildVMLauncher {
       // detection is disabled, so we turn it off in the locator. Tests for network partition
       // detection should create a separate locator that has it enabled
       cmds.add(
-          "-D" + DistributionConfig.GEMFIRE_PREFIX + ENABLE_NETWORK_PARTITION_DETECTION + "=false");
+          "-D" + GEMFIRE_PREFIX + ENABLE_NETWORK_PARTITION_DETECTION + "=false");
       cmds.add(
-          "-D" + DistributionConfig.GEMFIRE_PREFIX + "allow_old_members_to_join_for_testing=true");
+          "-D" + GEMFIRE_PREFIX + "allow_old_members_to_join_for_testing=true");
     }
-    cmds.add("-D" + LOG_LEVEL + "=" + DUnitLauncher.logLevel);
+    cmds.add("-D" + GEMFIRE_PREFIX + LOG_LEVEL + "=" + DUnitLauncher.logLevel);
     if (DUnitLauncher.LOG4J != null) {
       cmds.add("-Dlog4j.configurationFile=" + DUnitLauncher.LOG4J);
     }
@@ -300,10 +301,10 @@ class ProcessManager implements ChildVMLauncher {
     cmds.add("-Xrunjdwp:transport=dt_socket,server=y,suspend=" + jdkSuspend + jdkDebug);
     cmds.add("-XX:+HeapDumpOnOutOfMemoryError");
     cmds.add("-Xmx512m");
-    cmds.add("-D" + DistributionConfig.GEMFIRE_PREFIX + "DEFAULT_MAX_OPLOG_SIZE=10");
-    cmds.add("-D" + DistributionConfig.GEMFIRE_PREFIX + "disallowMcastDefaults=true");
+    cmds.add("-D" + GEMFIRE_PREFIX + "DEFAULT_MAX_OPLOG_SIZE=10");
+    cmds.add("-D" + GEMFIRE_PREFIX + "disallowMcastDefaults=true");
     cmds.add("-D" + DistributionConfig.RESTRICT_MEMBERSHIP_PORT_RANGE + "=true");
-    cmds.add("-D" + DistributionConfig.GEMFIRE_PREFIX
+    cmds.add("-D" + GEMFIRE_PREFIX
         + ConfigurationProperties.VALIDATE_SERIALIZABLE_OBJECTS + "=true");
     cmds.add("-ea");
     cmds.add("-XX:MetaspaceSize=512m");