You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by bs...@apache.org on 2019/04/19 23:07:42 UTC
[geode] branch feature/GEODE-6583 updated: Reinstated use of next-neighbors by default. Added a check for insufficient history to the phi detectors, which fall back to a timestamp check when there is not enough history.
This is an automated email from the ASF dual-hosted git repository.
bschuchardt pushed a commit to branch feature/GEODE-6583
in repository https://gitbox.apache.org/repos/asf/geode.git
The following commit(s) were added to refs/heads/feature/GEODE-6583 by this push:
new 77db462 reinstated use of next-neighbors by default. added check for insufficient history to phi detectors & use timestamps if that's the case.
77db462 is described below
commit 77db4622661025099d209ca51b0621349d0af7ff
Author: Bruce Schuchardt <bs...@pivotal.io>
AuthorDate: Fri Apr 19 16:06:06 2019 -0700
Reinstated use of next-neighbors by default.
Added a check for insufficient history to the phi detectors: when there is not
enough heartbeat history to compute a valid phi, a timestamp check is used instead.
---
.../gms/fd/GMSHealthMonitorJUnitTest.java | 22 +----
.../gms/fd/PhiAccrualFailureDetectorTest.java | 37 +++++++++
.../membership/gms/fd/GMSHealthMonitor.java | 94 ++++++++++++----------
.../gms/fd/PhiAccrualFailureDetector.java | 13 ++-
4 files changed, 100 insertions(+), 66 deletions(-)
diff --git a/geode-core/src/integrationTest/java/org/apache/geode/distributed/internal/membership/gms/fd/GMSHealthMonitorJUnitTest.java b/geode-core/src/integrationTest/java/org/apache/geode/distributed/internal/membership/gms/fd/GMSHealthMonitorJUnitTest.java
index df7acd5..a907511 100644
--- a/geode-core/src/integrationTest/java/org/apache/geode/distributed/internal/membership/gms/fd/GMSHealthMonitorJUnitTest.java
+++ b/geode-core/src/integrationTest/java/org/apache/geode/distributed/internal/membership/gms/fd/GMSHealthMonitorJUnitTest.java
@@ -214,27 +214,6 @@ public class GMSHealthMonitorJUnitTest {
assertEquals(mockMembers.get(myAddressIndex + 1), gmsHealthMonitor.getNextNeighbor());
}
- @Test
- public void testHMNextNeighborAfterTimeout() throws Exception {
- System.out.println("testHMNextNeighborAfterTimeout starting");
-
- installAView();
- InternalDistributedMember initialNeighbor = mockMembers.get(myAddressIndex + 1);
-
- await("wait for new neighbor")
- .until(() -> gmsHealthMonitor.getNextNeighbor() != initialNeighbor);
- InternalDistributedMember neighbor = gmsHealthMonitor.getNextNeighbor();
-
- // neighbor should change. In order to not be a flaky test we don't demand
- // that it be myAddressIndex+2 but just require that the neighbor being
- // monitored has changed
- System.out.println("testHMNextNeighborAfterTimeout ending");
- Assert.assertNotNull(gmsHealthMonitor.getView());
- Assert.assertNotEquals("neighbor to not be " + neighbor + "; my ID is "
- + mockMembers.get(myAddressIndex) + "; view=" + gmsHealthMonitor.getView(),
- initialNeighbor, neighbor);
- }
-
/**
* it checks neighbor before member-timeout, it should be same
*/
@@ -303,6 +282,7 @@ public class GMSHealthMonitorJUnitTest {
long startTime = System.currentTimeMillis();
installAView();
InternalDistributedMember neighbor = gmsHealthMonitor.getNextNeighbor();
+ assertFalse(neighbor.equals(joinLeave.getMemberID()));
await().until(() -> gmsHealthMonitor.isSuspectMember(neighbor));
long endTime = System.currentTimeMillis();
diff --git a/geode-core/src/integrationTest/java/org/apache/geode/distributed/internal/membership/gms/fd/PhiAccrualFailureDetectorTest.java b/geode-core/src/integrationTest/java/org/apache/geode/distributed/internal/membership/gms/fd/PhiAccrualFailureDetectorTest.java
index a0c5061..9fcfd51 100644
--- a/geode-core/src/integrationTest/java/org/apache/geode/distributed/internal/membership/gms/fd/PhiAccrualFailureDetectorTest.java
+++ b/geode-core/src/integrationTest/java/org/apache/geode/distributed/internal/membership/gms/fd/PhiAccrualFailureDetectorTest.java
@@ -19,6 +19,8 @@ package org.apache.geode.distributed.internal.membership.gms.fd;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
+import java.util.Random;
+
import org.junit.Test;
import org.junit.experimental.categories.Category;
@@ -74,4 +76,39 @@ public class PhiAccrualFailureDetectorTest {
assertTrue(failureDetector.isAvailable(timestampMillis));
}
}
+
+ @Test
+ public void isAvailableTest() throws Exception {
+ double threshold = 10;
+ int historySize = 200;
+ double stddev = 100;
+ long now = System.currentTimeMillis();
+ long[] intervals =
+ new long[] {1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, -14, -15, -16, -17, -18, -19, -20};
+ int heartbeatInterval = 5000;
+ int acceptableHeartbeatPauseMillis = heartbeatInterval / 2;
+ Random random = new Random();
+ PhiAccrualFailureDetector detector =
+ new PhiAccrualFailureDetector(threshold, historySize, stddev,
+ acceptableHeartbeatPauseMillis, now);
+ long heartbeatTime = 0;
+ for (long l = 1; l <= historySize; l++) {
+ long deviation = getDeviation(heartbeatInterval, random);
+ detector.heartbeat(now + (l * heartbeatInterval) + deviation);
+ }
+ for (long l : intervals) {
+ if (l > 0) {
+ long deviation = getDeviation(heartbeatInterval, random);
+ detector.heartbeat(now + (heartbeatInterval * (l + historySize)) + deviation);
+ }
+ l = Math.abs(l);
+ heartbeatTime = now + (heartbeatInterval * (l + historySize));
+ System.out.println("heartbeat " + l + " phi= " + detector.phi(heartbeatTime));
+ }
+ assertFalse("phi=" + detector.phi(heartbeatTime), detector.isAvailable(heartbeatTime));
+ }
+
+ public int getDeviation(int heartbeatInterval, Random random) {
+ return 0; // random.nextInt(heartbeatInterval / 4);
+ }
}
diff --git a/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/gms/fd/GMSHealthMonitor.java b/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/gms/fd/GMSHealthMonitor.java
index 304e9a8..ce17c27 100644
--- a/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/gms/fd/GMSHealthMonitor.java
+++ b/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/gms/fd/GMSHealthMonitor.java
@@ -32,6 +32,7 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
+import java.util.Date;
import java.util.Enumeration;
import java.util.HashMap;
import java.util.HashSet;
@@ -248,57 +249,63 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler {
@Override
public void run() {
- if (GMSHealthMonitor.this.isStopping) {
- return;
- }
+ try {
+ if (GMSHealthMonitor.this.isStopping) {
+ return;
+ }
- long oldTimeStamp = currentTimeStamp;
- currentTimeStamp = System.currentTimeMillis();
+ long oldTimeStamp = currentTimeStamp;
+ currentTimeStamp = System.currentTimeMillis();
- NetView myView = GMSHealthMonitor.this.currentView;
- if (myView == null) {
- return;
- }
+ NetView myView = GMSHealthMonitor.this.currentView;
+ if (myView == null) {
+ return;
+ }
+ if (currentTimeStamp - oldTimeStamp > monitorInterval + MONITOR_DELAY_THRESHOLD) {
+ // delay in running this task - don't suspect anyone for a while
+ for (InternalDistributedMember member : myView.getMembers()) {
+ PhiAccrualFailureDetector detector = memberDetectors.get(member);
+ if (detector != null) {
+ detector.heartbeat(currentTimeStamp);
+ }
+ }
+ return;
+ }
- if (currentTimeStamp - oldTimeStamp > monitorInterval + MONITOR_DELAY_THRESHOLD) {
- // delay in running this task - don't suspect anyone for a while
- for (InternalDistributedMember member : myView.getMembers()) {
- PhiAccrualFailureDetector detector = memberDetectors.get(member);
- if (detector != null) {
- detector.heartbeat(currentTimeStamp);
+ if (!Boolean.getBoolean("gemfire.failure-detection-watch-all-nodes")) {
+ InternalDistributedMember neighbour = nextNeighbor;
+ if (neighbour != null) {
+ PhiAccrualFailureDetector detector =
+ getOrCreatePhiAccrualFailureDetector(neighbour);
+ verifyHeartbeat(neighbour, detector);
+ }
+ } else {
+ for (Map.Entry<InternalDistributedMember, PhiAccrualFailureDetector> entry : memberDetectors
+ .entrySet()) {
+ InternalDistributedMember member = entry.getKey();
+ if (isSuspectMember(member)) {
+ continue;
+ }
+ PhiAccrualFailureDetector detector = entry.getValue();
+ verifyHeartbeat(member, detector);
}
}
- return;
+
+ } catch (RuntimeException | Error ex) {
+ logger.info("Health monitor encountered an exception", ex);
}
+ }
- // if (myView.getCoordinator().equals(localAddress)) {
- // for phi accrual we need to periodically check all members
- for (InternalDistributedMember member : myView.getMembers()) {
- PhiAccrualFailureDetector detector =
- getOrCreatePhiAccrualFailureDetector(member);
- if (!detector.isAvailable()) {
- checkMember(member);
- }
+ private void verifyHeartbeat(InternalDistributedMember member,
+ PhiAccrualFailureDetector detector) {
+ if (!detector.isAvailable(currentTimeStamp)) {
+ logger.warn("ERNIE & BRUCE: initiating checkMember from the health monitor for "
+ + "{} with phi={}, last heartbeat={} and recorded heartbeats={}",
+ member, detector.phi(currentTimeStamp), new Date(detector.getLastTimestampMillis()),
+ detector.heartbeatCount());
+ checkMember(member);
}
- return;
- // }
-
- // InternalDistributedMember neighbour = nextNeighbor;
- //
- // long currentTime = System.currentTimeMillis();
- // // this is the start of interval to record member activity
- // GMSHealthMonitor.this.currentTimeStamp = currentTime;
- //
- // if (neighbour != null) {
- // PhiAccrualFailureDetector nextNeighborDetector =
- // getOrCreatePhiAccrualFailureDetector(neighbour, currentTime);
- // if (!nextNeighborDetector.isAvailable()) {
- // logger.debug("Checking member {} ", neighbour);
- // // now do check request for this member;
- // checkMember(neighbour);
- // }
- // }
}
}
@@ -440,8 +447,7 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler {
final int sampleSize = Integer.getInteger("geode.phiAccrualSampleSize", 200);
final int minStdDev = Integer.getInteger("geode.phiAccrualMinimumStandardDeviation", 100);
detector = new PhiAccrualFailureDetector(
- threshold, sampleSize, minStdDev, memberTimeout, memberTimeout);
- detector.heartbeat(currentTimeStamp);
+ threshold, sampleSize, minStdDev, memberTimeout / 2, currentTimeStamp);
memberDetectors.put(member, detector);
}
return detector;
diff --git a/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/gms/fd/PhiAccrualFailureDetector.java b/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/gms/fd/PhiAccrualFailureDetector.java
index 6d0233a..6d2d78d 100644
--- a/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/gms/fd/PhiAccrualFailureDetector.java
+++ b/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/gms/fd/PhiAccrualFailureDetector.java
@@ -125,6 +125,9 @@ public class PhiAccrualFailureDetector {
heartbeatHistory = new HeartbeatHistory(maxSampleSize);
heartbeatHistory.add(firstHeartbeatEstimateMillis - stdDeviationMillis)
.add(firstHeartbeatEstimateMillis + stdDeviationMillis);
+
+ // Bruce: record the estimate as the last timestamp received
+ lastTimestampMillis.set(firstHeartbeatEstimateMillis);
}
private double ensureValidStdDeviation(double stdDeviationMillis) {
@@ -156,7 +159,11 @@ public class PhiAccrualFailureDetector {
public boolean isAvailable(long timestampMillis) {
double currentPhi = phi(timestampMillis);
- return Double.isNaN(currentPhi) || currentPhi < threshold;
+ if (Double.isNaN(currentPhi)) {
+ // There isn't enough history to compute a valid Phi so we use a timestamp check
+ return (getLastTimestampMillis() >= timestampMillis - acceptableHeartbeatPauseMillis);
+ }
+ return currentPhi < threshold;
}
public boolean isAvailable() {
@@ -185,6 +192,10 @@ public class PhiAccrualFailureDetector {
return lastTimestampMillis.get();
}
+ public double getThreshold() {
+ return threshold;
+ }
+
public void heartbeat() {
heartbeat(System.currentTimeMillis());
}