You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by bs...@apache.org on 2019/05/08 15:32:29 UTC

[geode] branch feature/GEODE-6583 updated: restoring isAvailable check in heartbeat recording

This is an automated email from the ASF dual-hosted git repository.

bschuchardt pushed a commit to branch feature/GEODE-6583
in repository https://gitbox.apache.org/repos/asf/geode.git


The following commit(s) were added to refs/heads/feature/GEODE-6583 by this push:
     new 7e701e4  restoring isAvailable check in heartbeat recording
7e701e4 is described below

commit 7e701e46ab5a76229e9eb2e73a534621d8e2afeb
Author: Bruce Schuchardt <bs...@pivotal.io>
AuthorDate: Wed May 8 08:31:36 2019 -0700

    restoring isAvailable check in heartbeat recording
    
    Tests still fail because the failure detectors are too slow to initiate
    suspicion (concCoordSelection.conf, for instance).
---
 .../apache/geode/distributed/LocatorDUnitTest.java |  6 +-
 .../gms/fd/GMSHealthMonitorJUnitTest.java          |  2 +-
 .../gms/fd/PhiAccrualFailureDetectorTest.java      | 25 +++----
 .../membership/gms/fd/GMSHealthMonitor.java        | 84 +++++++++++++---------
 .../gms/fd/PhiAccrualFailureDetector.java          | 47 ++++++------
 .../org/apache/geode/internal/tcp/Connection.java  |  8 +--
 6 files changed, 89 insertions(+), 83 deletions(-)

diff --git a/geode-core/src/distributedTest/java/org/apache/geode/distributed/LocatorDUnitTest.java b/geode-core/src/distributedTest/java/org/apache/geode/distributed/LocatorDUnitTest.java
index e07f572..64646ac 100644
--- a/geode-core/src/distributedTest/java/org/apache/geode/distributed/LocatorDUnitTest.java
+++ b/geode-core/src/distributedTest/java/org/apache/geode/distributed/LocatorDUnitTest.java
@@ -92,7 +92,6 @@ import org.apache.geode.internal.cache.GemFireCacheImpl;
 import org.apache.geode.internal.logging.LocalLogWriter;
 import org.apache.geode.internal.logging.LogService;
 import org.apache.geode.internal.security.SecurableCommunicationChannel;
-import org.apache.geode.internal.tcp.Connection;
 import org.apache.geode.test.dunit.AsyncInvocation;
 import org.apache.geode.test.dunit.DUnitBlackboard;
 import org.apache.geode.test.dunit.DistributedTestUtils;
@@ -694,8 +693,9 @@ public class LocatorDUnitTest implements java.io.Serializable {
       // quorumLost should be invoked if we get a ForcedDisconnect in this situation
       assertThat(listener.quorumLostInvoked).describedAs("expected quorumLost to be invoked")
           .isTrue();
-      assertThat(listener.suspectReasons.contains(Connection.INITIATING_SUSPECT_PROCESSING))
-          .describedAs("expected suspect processing initiated by TCPConduit").isTrue();
+      System.out.println("suspectReasons=" + listener.suspectReasons);
+      assertThat(listener.suspectReasons.contains("Member isn't responding to heartbeat requests"))
+          .describedAs("expected suspect processing initiated by health monitor").isTrue();
     } finally {
       if (locator != null) {
         locator.stop();
diff --git a/geode-core/src/integrationTest/java/org/apache/geode/distributed/internal/membership/gms/fd/GMSHealthMonitorJUnitTest.java b/geode-core/src/integrationTest/java/org/apache/geode/distributed/internal/membership/gms/fd/GMSHealthMonitorJUnitTest.java
index a907511..fbc9c5c 100644
--- a/geode-core/src/integrationTest/java/org/apache/geode/distributed/internal/membership/gms/fd/GMSHealthMonitorJUnitTest.java
+++ b/geode-core/src/integrationTest/java/org/apache/geode/distributed/internal/membership/gms/fd/GMSHealthMonitorJUnitTest.java
@@ -760,7 +760,7 @@ public class GMSHealthMonitorJUnitTest {
     HeartbeatMessage hb = new HeartbeatMessage(-1);
     hb.setSender(mockMembers.get(0));
     gmsHealthMonitor.processMessage(hb);
-    assertEquals(2, gmsHealthMonitor.memberDetectors.get(hb.getSender()).heartbeatCount());
+    assertEquals(0, gmsHealthMonitor.memberDetectors.get(hb.getSender()).heartbeatsRecorded());
 
     // a sick member will not take action on a Suspect message from another member
     SuspectMembersMessage smm = mock(SuspectMembersMessage.class);
diff --git a/geode-core/src/integrationTest/java/org/apache/geode/distributed/internal/membership/gms/fd/PhiAccrualFailureDetectorTest.java b/geode-core/src/integrationTest/java/org/apache/geode/distributed/internal/membership/gms/fd/PhiAccrualFailureDetectorTest.java
index f192e76..c5358eb 100644
--- a/geode-core/src/integrationTest/java/org/apache/geode/distributed/internal/membership/gms/fd/PhiAccrualFailureDetectorTest.java
+++ b/geode-core/src/integrationTest/java/org/apache/geode/distributed/internal/membership/gms/fd/PhiAccrualFailureDetectorTest.java
@@ -19,8 +19,6 @@ package org.apache.geode.distributed.internal.membership.gms.fd;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 
-import java.util.Random;
-
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
@@ -37,6 +35,7 @@ public class PhiAccrualFailureDetectorTest {
 
       if (i > 290) {
         double phi = failureDetector.phi(timestampMillis);
+        System.out.println("for interval " + i + ", phi=" + phi);
         if (i == 291) {
           assertTrue(1 < phi && phi < 3);
           assertTrue(failureDetector.isAvailable(timestampMillis));
@@ -72,7 +71,9 @@ public class PhiAccrualFailureDetectorTest {
         }
       }
       failureDetector.heartbeat(timestampMillis);
-      assertTrue(failureDetector.phi(timestampMillis) < 0.1);
+      double phi = failureDetector.phi(timestampMillis);
+      System.out.println("for interval " + i + ", phi=" + phi);
+      assertTrue("for " + i + ", expected phi<0.1 but was " + phi, phi < 0.1);
       assertTrue(failureDetector.isAvailable(timestampMillis));
     }
   }
@@ -81,30 +82,26 @@ public class PhiAccrualFailureDetectorTest {
   public void isAvailableTest() throws Exception {
     double threshold = 10;
     int historySize = 200;
-    double stddev = 100;
+    double stddev = 50;
     long now = System.currentTimeMillis();
     long[] intervals =
-        new long[] {1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, -14, -15, -16, -17, -18, -19, -20};
+        new long[] {1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, -14, -15, -16, -17, -18, -19, -20,
+            21, 22, 23, 24};
     int heartbeatInterval = 5000;
-    int acceptableHeartbeatPauseMillis = heartbeatInterval / 2;
-    Random random = new Random();
+    int acceptableHeartbeatPauseMillis = 0; // heartbeatInterval / 4;
     PhiAccrualFailureDetector detector =
         new PhiAccrualFailureDetector(threshold, historySize, stddev,
-            acceptableHeartbeatPauseMillis, now);
+            acceptableHeartbeatPauseMillis, heartbeatInterval);
     long heartbeatTime = 0;
     for (long l : intervals) {
       if (l > 0) {
-        long deviation = getDeviation(heartbeatInterval, random);
-        detector.heartbeat(now + (heartbeatInterval * l) + deviation);
+        detector.heartbeat(now + (heartbeatInterval * l));
       }
       l = Math.abs(l);
       heartbeatTime = now + (heartbeatInterval * l);
-      System.out.println("heartbeat " + l + " phi= " + detector.phi(heartbeatTime));
+      System.out.printf("heartbeat %d phi= %s%n", l, detector.phi(heartbeatTime));
     }
     assertFalse("phi=" + detector.phi(heartbeatTime), detector.isAvailable(heartbeatTime));
   }
 
-  public int getDeviation(int heartbeatInterval, Random random) {
-    return random.nextInt(heartbeatInterval / 4);
-  }
 }
diff --git a/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/gms/fd/GMSHealthMonitor.java b/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/gms/fd/GMSHealthMonitor.java
index 93c28c2..1da7978 100644
--- a/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/gms/fd/GMSHealthMonitor.java
+++ b/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/gms/fd/GMSHealthMonitor.java
@@ -275,23 +275,11 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler {
           return;
         }
 
-        if (!Boolean.getBoolean("gemfire.failure-detection-watch-all-nodes")) {
-          InternalDistributedMember neighbour = nextNeighbor;
-          if (neighbour != null) {
-            PhiAccrualFailureDetector detector =
-                getOrCreatePhiAccrualFailureDetector(neighbour);
-            verifyHeartbeat(neighbour, detector);
-          }
-        } else {
-          for (Map.Entry<InternalDistributedMember, PhiAccrualFailureDetector> entry : memberDetectors
-              .entrySet()) {
-            InternalDistributedMember member = entry.getKey();
-            if (isSuspectMember(member)) {
-              continue;
-            }
-            PhiAccrualFailureDetector detector = entry.getValue();
-            verifyHeartbeat(member, detector);
-          }
+        InternalDistributedMember neighbour = nextNeighbor;
+        if (neighbour != null) {
+          PhiAccrualFailureDetector detector =
+              getOrCreatePhiAccrualFailureDetector(neighbour);
+          verifyHeartbeat(neighbour, detector);
         }
 
       } catch (RuntimeException | Error ex) {
@@ -303,10 +291,21 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler {
         PhiAccrualFailureDetector detector) {
       if (!detector.isAvailable(currentTimeStamp)) {
         logger.warn("ERNIE & BRUCE: initiating checkMember from the health monitor for "
-            + "{} with phi={}, last heartbeat={} and recorded heartbeats={}",
-            member, detector.phi(currentTimeStamp), new Date(detector.getLastTimestampMillis()),
-            detector.heartbeatCount());
+            + "{} with phi={}, last heartbeat={} heartbeats recorded={}",
+            member, detector.phi(currentTimeStamp), detector.getLastTimestampMillis() == null
+                ? "none" : new Date(detector.getLastTimestampMillis()),
+            detector.heartbeatsRecorded());
         checkMember(member);
+      } else {
+        logger.info("ERNIE & BRUCE: verifyHeartbeat found {} is available with phi={}"
+            + " last heartbeat={} heartbeats recorded={}",
+            member, detector.phi(currentTimeStamp), detector.getLastTimestampMillis() == null
+                ? "none" : new Date(detector.getLastTimestampMillis()),
+            detector.heartbeatsRecorded());
+        if (detector.getLastTimestampMillis() != null &&
+            detector.getLastTimestampMillis() + (2 * memberTimeout) < currentTimeStamp) {
+          logger.info("dump of phi detector's history: {}", detector.getIntervalHistory());
+        }
       }
     }
 
@@ -427,8 +426,17 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler {
    * Record member activity at a specified time
    */
   private void contactedBy(InternalDistributedMember sender, long timeStamp) {
-    PhiAccrualFailureDetector detector = getOrCreatePhiAccrualFailureDetector(sender);
-    detector.heartbeat(timeStamp);
+    PhiAccrualFailureDetector detector = memberDetectors.get(sender);
+    if (detector != null) {
+      detector.heartbeat(timeStamp);
+      List<Long> intervalHistory = detector.getIntervalHistory();
+      long lastInterval = intervalHistory.get(intervalHistory.size() - 1);
+      if (lastInterval > 7000) {
+        logger.warn(
+            "ERNIE & BRUCE: recording a large heartbeat interval of {} for {} with {} heartbeats",
+            lastInterval, sender, intervalHistory.size() - 2);
+      }
+    }
     if (suspectedMemberIds.containsKey(sender)) {
       memberUnsuspected(sender);
       setNextNeighbor(currentView, null);
@@ -443,13 +451,16 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler {
     if (detector == null) {
       // it's okay to not synchronize here - if we happen to create another detector
       // for this member in another thread it will be pretty equivalent to the one created here
-      logger.info("Creating new failure detector for {}", member);
-      // TODO consider giving different kinds of members different thresholds
-      final int threshold = Integer.getInteger("geode.phiAccrualThreshold", 10);
-      final int sampleSize = Integer.getInteger("geode.phiAccrualSampleSize", 15);
-      final int minStdDev = Integer.getInteger("geode.phiAccrualMinimumStandardDeviation", 100);
+      final int threshold = Integer.getInteger("geode.phiAccrualThreshold", 5);
+      final int sampleSize = Integer.getInteger("geode.phiAccrualSampleSize", 200);
+      final long minStdDev = Long.getLong("geode.phiAccrualMinimumStandardDeviation",
+          memberTimeout / 10);
+      // have tried acceptableHeartbeatPauseMillis=0, memberTimeout/2 and /4
       detector = new PhiAccrualFailureDetector(
-          threshold, sampleSize, minStdDev, memberTimeout / 2, currentTimeStamp);
+          threshold, sampleSize, minStdDev, memberTimeout / 4, memberTimeout);
+      logger.info(
+          "ERNIE & BRUCE: created a new failure detector for {} with lastTimestamp={} and recorded heartbeats={}",
+          member, detector.getLastTimestampMillis(), detector.heartbeatsRecorded());
       memberDetectors.put(member, detector);
     }
     return detector;
@@ -691,6 +702,8 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler {
     checkExecutor = LoggingExecutors.newCachedThreadPool("Geode Failure Detection thread ", true);
     Monitor m = this.new Monitor(memberTimeout);
     monitorInterval = memberTimeout / LOGICAL_INTERVAL;
+    currentTimeStamp = System.currentTimeMillis(); // Monitor expects this to be up-to-date when it
+                                                   // starts
     monitorFuture =
         scheduler.scheduleAtFixedRate(m, monitorInterval, monitorInterval, TimeUnit.MILLISECONDS);
     serverSocketExecutor =
@@ -850,12 +863,11 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler {
     synchronized (suspectRequestsInView) {
       suspectRequestsInView.clear();
     }
-    long time = System.currentTimeMillis();
-    for (InternalDistributedMember member : newView.getMembers()) {
-      if (!member.equals(this.localAddress)) {
-        getOrCreatePhiAccrualFailureDetector(member);
-      }
-    }
+    // for (InternalDistributedMember member : newView.getMembers()) {
+    // if (!member.equals(this.localAddress)) {
+    // getOrCreatePhiAccrualFailureDetector(member);
+    // }
+    // }
     for (Iterator<InternalDistributedMember> it = memberDetectors.keySet().iterator(); it
         .hasNext();) {
       if (!newView.contains(it.next())) {
@@ -1174,6 +1186,7 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler {
 
     }
     // we got heartbeat lets update timestamp
+    getOrCreatePhiAccrualFailureDetector(m.getSender());
     contactedBy(m.getSender(), System.currentTimeMillis());
   }
 
@@ -1379,7 +1392,8 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler {
 
       if (!pinged && !isStopping) {
         PhiAccrualFailureDetector detector = memberDetectors.get(mbr);
-        if (detector == null || detector.getLastTimestampMillis() < startTime) {
+        if (detector == null || detector.getLastTimestampMillis() == null
+            || detector.getLastTimestampMillis() < startTime) {
           logger.info("Availability check failed for member {}", mbr);
           // if the final check fails & this VM is the coordinator we don't need to do another final
           // check
diff --git a/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/gms/fd/PhiAccrualFailureDetector.java b/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/gms/fd/PhiAccrualFailureDetector.java
index 732d58c..affd09e 100644
--- a/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/gms/fd/PhiAccrualFailureDetector.java
+++ b/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/gms/fd/PhiAccrualFailureDetector.java
@@ -17,6 +17,7 @@
 package org.apache.geode.distributed.internal.membership.gms.fd;
 
 import java.util.LinkedList;
+import java.util.List;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
 
@@ -61,6 +62,7 @@ public class PhiAccrualFailureDetector {
 
   private final HeartbeatHistory heartbeatHistory;
   private final AtomicReference<Long> lastTimestampMillis = new AtomicReference<Long>();
+  private final AtomicLong heartbeatsRecorded = new AtomicLong();
 
   /**
    * @param threshold A low threshold is prone to generate many wrong suspicions but ensures a quick
@@ -71,20 +73,17 @@ public class PhiAccrualFailureDetector {
    *
    * @param maxSampleSize Number of samples to use for calculation of mean and standard deviation of
    *        inter-arrival times.
-   *
    * @param minStdDeviationMillis Minimum standard deviation to use for the normal distribution used
    *        when calculating phi.
    *        Too low standard deviation might result in too much sensitivity for sudden, but normal,
    *        deviations
    *        in heartbeat inter arrival times.
-   *
    * @param acceptableHeartbeatPauseMillis Duration corresponding to number of potentially
    *        lost/delayed
    *        heartbeats that will be accepted before considering it to be an anomaly.
    *        This margin is important to be able to survive sudden, occasional, pauses in heartbeat
    *        arrivals, due to for example garbage collect or network drop.
-   *
-   * @param firstHeartbeatEstimateMillis Bootstrap the stats with heartbeats that corresponds to
+   * @param firstHeartbeatIntervalMillis Bootstrap the stats with heartbeats that corresponds to
    *        to this duration, with a with rather high standard deviation (since environment is
    *        unknown
    *        in the beginning)
@@ -93,11 +92,11 @@ public class PhiAccrualFailureDetector {
    *        Note for Geode use: this class was originally called PhiAccuralFailureDetector and is
    *        from
    *        https://github.com/komamitsu/phi-accural-failure-detector
-   *        </p>
    */
   protected PhiAccrualFailureDetector(double threshold, int maxSampleSize,
       double minStdDeviationMillis,
-      long acceptableHeartbeatPauseMillis, long firstHeartbeatEstimateMillis) {
+      long acceptableHeartbeatPauseMillis,
+      long firstHeartbeatIntervalMillis) {
     if (threshold <= 0) {
       throw new IllegalArgumentException("Threshold must be > 0: " + threshold);
     }
@@ -112,9 +111,9 @@ public class PhiAccrualFailureDetector {
       throw new IllegalArgumentException(
           "Acceptable heartbeat pause millis must be >= 0: " + acceptableHeartbeatPauseMillis);
     }
-    if (firstHeartbeatEstimateMillis <= 0) {
+    if (firstHeartbeatIntervalMillis <= 0) {
       throw new IllegalArgumentException(
-          "First heartbeat value must be > 0: " + firstHeartbeatEstimateMillis);
+          "First heartbeat value must be > 0: " + firstHeartbeatIntervalMillis);
     }
 
     this.threshold = threshold;
@@ -122,16 +121,10 @@ public class PhiAccrualFailureDetector {
     this.acceptableHeartbeatPauseMillis = acceptableHeartbeatPauseMillis;
 
     heartbeatHistory = new HeartbeatHistory(maxSampleSize);
-    for (int i = 0; i < maxSampleSize; i++) {
-      heartbeatHistory.add(acceptableHeartbeatPauseMillis);
-    }
-
-    // long stdDeviationMillis = acceptableHeartbeatPauseMillis / 4;
-    // heartbeatHistory.add(acceptableHeartbeatPauseMillis - stdDeviationMillis)
-    // .add(acceptableHeartbeatPauseMillis + stdDeviationMillis);
+    long stdDeviationMillis = firstHeartbeatIntervalMillis / 4;
+    heartbeatHistory.add(firstHeartbeatIntervalMillis - stdDeviationMillis)
+        .add(firstHeartbeatIntervalMillis + stdDeviationMillis);
 
-    // Bruce: record the estimate as the last timestamp received
-    lastTimestampMillis.set(firstHeartbeatEstimateMillis);
   }
 
   private double ensureValidStdDeviation(double stdDeviationMillis) {
@@ -174,25 +167,26 @@ public class PhiAccrualFailureDetector {
     return isAvailable(System.currentTimeMillis());
   }
 
-  public int heartbeatCount() {
-    return this.heartbeatHistory.size();
+  public long heartbeatsRecorded() {
+    return heartbeatsRecorded.get();
   }
 
   public synchronized void heartbeat(long timestampMillis) {
     Long lastTimestampMillis = this.lastTimestampMillis.getAndSet(timestampMillis);
     /** bruce s.: for Apache Geode, don't record duplicate heartbeats */
-    if (lastTimestampMillis != null && lastTimestampMillis == timestampMillis) {
+    if (lastTimestampMillis != null && lastTimestampMillis >= timestampMillis) {
       return;
     }
     if (lastTimestampMillis != null) {
       long interval = timestampMillis - lastTimestampMillis;
       if (isAvailable(timestampMillis)) {
         heartbeatHistory.add(interval);
+        heartbeatsRecorded.incrementAndGet();
       }
     }
   }
 
-  public long getLastTimestampMillis() {
+  public Long getLastTimestampMillis() {
     return lastTimestampMillis.get();
   }
 
@@ -204,6 +198,10 @@ public class PhiAccrualFailureDetector {
     heartbeat(System.currentTimeMillis());
   }
 
+  public List<Long> getIntervalHistory() {
+    return heartbeatHistory.intervals;
+  }
+
   public static class Builder {
     private double threshold = 16.0;
     private int maxSampleSize = 200;
@@ -260,7 +258,8 @@ public class PhiAccrualFailureDetector {
     }
 
     public double variance() {
-      return ((double) squaredIntervalSum.get() / intervals.size()) - (mean() * mean());
+      double mean = mean();
+      return ((double) squaredIntervalSum.get() / intervals.size()) - (mean * mean);
     }
 
     public double stdDeviation() {
@@ -279,10 +278,6 @@ public class PhiAccrualFailureDetector {
       return this;
     }
 
-    private int size() {
-      return intervals.size();
-    }
-
     private long pow2(long x) {
       return x * x;
     }
diff --git a/geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java b/geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java
index 2cda8b6..3f32624 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java
@@ -121,10 +121,10 @@ public class Connection implements Runnable {
   @MakeNotStatic
   private static final AtomicLong idCounter = new AtomicLong(1);
 
-  /** string used as the reason for initiating suspect processing */
-  public static final String INITIATING_SUSPECT_PROCESSING =
-      "member unexpectedly shut down shared, unordered connection";
-
+  // /** string used as the reason for initiating suspect processing */
+  // public static final String INITIATING_SUSPECT_PROCESSING =
+  // "member unexpectedly shut down shared, unordered connection";
+  //
   /** the table holding this connection */
   private final ConnectionTable owner;