You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by bs...@apache.org on 2019/04/19 23:07:42 UTC

[geode] branch feature/GEODE-6583 updated: Reinstated next-neighbor monitoring by default. Added a check for insufficient history to the phi detectors; when there is too little history, fall back to a timestamp check.

This is an automated email from the ASF dual-hosted git repository.

bschuchardt pushed a commit to branch feature/GEODE-6583
in repository https://gitbox.apache.org/repos/asf/geode.git


The following commit(s) were added to refs/heads/feature/GEODE-6583 by this push:
     new 77db462  Reinstated next-neighbor monitoring by default. Added a check for insufficient history to the phi detectors; when there is too little history, fall back to a timestamp check.
77db462 is described below

commit 77db4622661025099d209ca51b0621349d0af7ff
Author: Bruce Schuchardt <bs...@pivotal.io>
AuthorDate: Fri Apr 19 16:06:06 2019 -0700

    Reinstated next-neighbor monitoring by default.
    Added a check for insufficient history to the phi detectors; when
    there is too little history, fall back to a timestamp check.
---
 .../gms/fd/GMSHealthMonitorJUnitTest.java          | 22 +----
 .../gms/fd/PhiAccrualFailureDetectorTest.java      | 37 +++++++++
 .../membership/gms/fd/GMSHealthMonitor.java        | 94 ++++++++++++----------
 .../gms/fd/PhiAccrualFailureDetector.java          | 13 ++-
 4 files changed, 100 insertions(+), 66 deletions(-)

diff --git a/geode-core/src/integrationTest/java/org/apache/geode/distributed/internal/membership/gms/fd/GMSHealthMonitorJUnitTest.java b/geode-core/src/integrationTest/java/org/apache/geode/distributed/internal/membership/gms/fd/GMSHealthMonitorJUnitTest.java
index df7acd5..a907511 100644
--- a/geode-core/src/integrationTest/java/org/apache/geode/distributed/internal/membership/gms/fd/GMSHealthMonitorJUnitTest.java
+++ b/geode-core/src/integrationTest/java/org/apache/geode/distributed/internal/membership/gms/fd/GMSHealthMonitorJUnitTest.java
@@ -214,27 +214,6 @@ public class GMSHealthMonitorJUnitTest {
     assertEquals(mockMembers.get(myAddressIndex + 1), gmsHealthMonitor.getNextNeighbor());
   }
 
-  @Test
-  public void testHMNextNeighborAfterTimeout() throws Exception {
-    System.out.println("testHMNextNeighborAfterTimeout starting");
-
-    installAView();
-    InternalDistributedMember initialNeighbor = mockMembers.get(myAddressIndex + 1);
-
-    await("wait for new neighbor")
-        .until(() -> gmsHealthMonitor.getNextNeighbor() != initialNeighbor);
-    InternalDistributedMember neighbor = gmsHealthMonitor.getNextNeighbor();
-
-    // neighbor should change. In order to not be a flaky test we don't demand
-    // that it be myAddressIndex+2 but just require that the neighbor being
-    // monitored has changed
-    System.out.println("testHMNextNeighborAfterTimeout ending");
-    Assert.assertNotNull(gmsHealthMonitor.getView());
-    Assert.assertNotEquals("neighbor to not be " + neighbor + "; my ID is "
-        + mockMembers.get(myAddressIndex) + ";  view=" + gmsHealthMonitor.getView(),
-        initialNeighbor, neighbor);
-  }
-
   /**
    * it checks neighbor before member-timeout, it should be same
    */
@@ -303,6 +282,7 @@ public class GMSHealthMonitorJUnitTest {
     long startTime = System.currentTimeMillis();
     installAView();
     InternalDistributedMember neighbor = gmsHealthMonitor.getNextNeighbor();
+    assertFalse(neighbor.equals(joinLeave.getMemberID()));
 
     await().until(() -> gmsHealthMonitor.isSuspectMember(neighbor));
     long endTime = System.currentTimeMillis();
diff --git a/geode-core/src/integrationTest/java/org/apache/geode/distributed/internal/membership/gms/fd/PhiAccrualFailureDetectorTest.java b/geode-core/src/integrationTest/java/org/apache/geode/distributed/internal/membership/gms/fd/PhiAccrualFailureDetectorTest.java
index a0c5061..9fcfd51 100644
--- a/geode-core/src/integrationTest/java/org/apache/geode/distributed/internal/membership/gms/fd/PhiAccrualFailureDetectorTest.java
+++ b/geode-core/src/integrationTest/java/org/apache/geode/distributed/internal/membership/gms/fd/PhiAccrualFailureDetectorTest.java
@@ -19,6 +19,8 @@ package org.apache.geode.distributed.internal.membership.gms.fd;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 
+import java.util.Random;
+
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
@@ -74,4 +76,39 @@ public class PhiAccrualFailureDetectorTest {
       assertTrue(failureDetector.isAvailable(timestampMillis));
     }
   }
+
+  @Test
+  public void isAvailableTest() throws Exception {
+    double threshold = 10;
+    int historySize = 200;
+    double stddev = 100;
+    long now = System.currentTimeMillis();
+    long[] intervals =
+        new long[] {1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, -14, -15, -16, -17, -18, -19, -20};
+    int heartbeatInterval = 5000;
+    int acceptableHeartbeatPauseMillis = heartbeatInterval / 2;
+    Random random = new Random();
+    PhiAccrualFailureDetector detector =
+        new PhiAccrualFailureDetector(threshold, historySize, stddev,
+            acceptableHeartbeatPauseMillis, now);
+    long heartbeatTime = 0;
+    for (long l = 1; l <= historySize; l++) {
+      long deviation = getDeviation(heartbeatInterval, random);
+      detector.heartbeat(now + (l * heartbeatInterval) + deviation);
+    }
+    for (long l : intervals) {
+      if (l > 0) {
+        long deviation = getDeviation(heartbeatInterval, random);
+        detector.heartbeat(now + (heartbeatInterval * (l + historySize)) + deviation);
+      }
+      l = Math.abs(l);
+      heartbeatTime = now + (heartbeatInterval * (l + historySize));
+      System.out.println("heartbeat " + l + " phi= " + detector.phi(heartbeatTime));
+    }
+    assertFalse("phi=" + detector.phi(heartbeatTime), detector.isAvailable(heartbeatTime));
+  }
+
+  public int getDeviation(int heartbeatInterval, Random random) {
+    return 0; // random.nextInt(heartbeatInterval / 4);
+  }
 }
diff --git a/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/gms/fd/GMSHealthMonitor.java b/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/gms/fd/GMSHealthMonitor.java
index 304e9a8..ce17c27 100644
--- a/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/gms/fd/GMSHealthMonitor.java
+++ b/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/gms/fd/GMSHealthMonitor.java
@@ -32,6 +32,7 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.Date;
 import java.util.Enumeration;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -248,57 +249,63 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler {
     @Override
     public void run() {
 
-      if (GMSHealthMonitor.this.isStopping) {
-        return;
-      }
+      try {
+        if (GMSHealthMonitor.this.isStopping) {
+          return;
+        }
 
-      long oldTimeStamp = currentTimeStamp;
-      currentTimeStamp = System.currentTimeMillis();
+        long oldTimeStamp = currentTimeStamp;
+        currentTimeStamp = System.currentTimeMillis();
 
-      NetView myView = GMSHealthMonitor.this.currentView;
-      if (myView == null) {
-        return;
-      }
+        NetView myView = GMSHealthMonitor.this.currentView;
+        if (myView == null) {
+          return;
+        }
 
+        if (currentTimeStamp - oldTimeStamp > monitorInterval + MONITOR_DELAY_THRESHOLD) {
+          // delay in running this task - don't suspect anyone for a while
+          for (InternalDistributedMember member : myView.getMembers()) {
+            PhiAccrualFailureDetector detector = memberDetectors.get(member);
+            if (detector != null) {
+              detector.heartbeat(currentTimeStamp);
+            }
+          }
+          return;
+        }
 
-      if (currentTimeStamp - oldTimeStamp > monitorInterval + MONITOR_DELAY_THRESHOLD) {
-        // delay in running this task - don't suspect anyone for a while
-        for (InternalDistributedMember member : myView.getMembers()) {
-          PhiAccrualFailureDetector detector = memberDetectors.get(member);
-          if (detector != null) {
-            detector.heartbeat(currentTimeStamp);
+        if (!Boolean.getBoolean("gemfire.failure-detection-watch-all-nodes")) {
+          InternalDistributedMember neighbour = nextNeighbor;
+          if (neighbour != null) {
+            PhiAccrualFailureDetector detector =
+                getOrCreatePhiAccrualFailureDetector(neighbour);
+            verifyHeartbeat(neighbour, detector);
+          }
+        } else {
+          for (Map.Entry<InternalDistributedMember, PhiAccrualFailureDetector> entry : memberDetectors
+              .entrySet()) {
+            InternalDistributedMember member = entry.getKey();
+            if (isSuspectMember(member)) {
+              continue;
+            }
+            PhiAccrualFailureDetector detector = entry.getValue();
+            verifyHeartbeat(member, detector);
           }
         }
-        return;
+
+      } catch (RuntimeException | Error ex) {
+        logger.info("Health monitor encountered an exception", ex);
       }
+    }
 
-      // if (myView.getCoordinator().equals(localAddress)) {
-      // for phi accrual we need to periodically check all members
-      for (InternalDistributedMember member : myView.getMembers()) {
-        PhiAccrualFailureDetector detector =
-            getOrCreatePhiAccrualFailureDetector(member);
-        if (!detector.isAvailable()) {
-          checkMember(member);
-        }
+    private void verifyHeartbeat(InternalDistributedMember member,
+        PhiAccrualFailureDetector detector) {
+      if (!detector.isAvailable(currentTimeStamp)) {
+        logger.warn("ERNIE & BRUCE: initiating checkMember from the health monitor for "
+            + "{} with phi={}, last heartbeat={} and recorded heartbeats={}",
+            member, detector.phi(currentTimeStamp), new Date(detector.getLastTimestampMillis()),
+            detector.heartbeatCount());
+        checkMember(member);
       }
-      return;
-      // }
-
-      // InternalDistributedMember neighbour = nextNeighbor;
-      //
-      // long currentTime = System.currentTimeMillis();
-      // // this is the start of interval to record member activity
-      // GMSHealthMonitor.this.currentTimeStamp = currentTime;
-      //
-      // if (neighbour != null) {
-      // PhiAccrualFailureDetector nextNeighborDetector =
-      // getOrCreatePhiAccrualFailureDetector(neighbour, currentTime);
-      // if (!nextNeighborDetector.isAvailable()) {
-      // logger.debug("Checking member {} ", neighbour);
-      // // now do check request for this member;
-      // checkMember(neighbour);
-      // }
-      // }
     }
 
   }
@@ -440,8 +447,7 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler {
       final int sampleSize = Integer.getInteger("geode.phiAccrualSampleSize", 200);
       final int minStdDev = Integer.getInteger("geode.phiAccrualMinimumStandardDeviation", 100);
       detector = new PhiAccrualFailureDetector(
-          threshold, sampleSize, minStdDev, memberTimeout, memberTimeout);
-      detector.heartbeat(currentTimeStamp);
+          threshold, sampleSize, minStdDev, memberTimeout / 2, currentTimeStamp);
       memberDetectors.put(member, detector);
     }
     return detector;
diff --git a/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/gms/fd/PhiAccrualFailureDetector.java b/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/gms/fd/PhiAccrualFailureDetector.java
index 6d0233a..6d2d78d 100644
--- a/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/gms/fd/PhiAccrualFailureDetector.java
+++ b/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/gms/fd/PhiAccrualFailureDetector.java
@@ -125,6 +125,9 @@ public class PhiAccrualFailureDetector {
     heartbeatHistory = new HeartbeatHistory(maxSampleSize);
     heartbeatHistory.add(firstHeartbeatEstimateMillis - stdDeviationMillis)
         .add(firstHeartbeatEstimateMillis + stdDeviationMillis);
+
+    // Bruce: record the estimate as the last timestamp received
+    lastTimestampMillis.set(firstHeartbeatEstimateMillis);
   }
 
   private double ensureValidStdDeviation(double stdDeviationMillis) {
@@ -156,7 +159,11 @@ public class PhiAccrualFailureDetector {
 
   public boolean isAvailable(long timestampMillis) {
     double currentPhi = phi(timestampMillis);
-    return Double.isNaN(currentPhi) || currentPhi < threshold;
+    if (Double.isNaN(currentPhi)) {
+      // There isn't enough history to compute a valid Phi so we use a timestamp check
+      return (getLastTimestampMillis() >= timestampMillis - acceptableHeartbeatPauseMillis);
+    }
+    return currentPhi < threshold;
   }
 
   public boolean isAvailable() {
@@ -185,6 +192,10 @@ public class PhiAccrualFailureDetector {
     return lastTimestampMillis.get();
   }
 
+  public double getThreshold() {
+    return threshold;
+  }
+
   public void heartbeat() {
     heartbeat(System.currentTimeMillis());
   }