You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by bs...@apache.org on 2019/05/08 15:32:29 UTC
[geode] branch feature/GEODE-6583 updated: restoring isAvailable check in heartbeat recording
This is an automated email from the ASF dual-hosted git repository.
bschuchardt pushed a commit to branch feature/GEODE-6583
in repository https://gitbox.apache.org/repos/asf/geode.git
The following commit(s) were added to refs/heads/feature/GEODE-6583 by this push:
new 7e701e4 restoring isAvailable check in heartbeat recording
7e701e4 is described below
commit 7e701e46ab5a76229e9eb2e73a534621d8e2afeb
Author: Bruce Schuchardt <bs...@pivotal.io>
AuthorDate: Wed May 8 08:31:36 2019 -0700
restoring isAvailable check in heartbeat recording
Tests (concCoordSelection.conf, for instance) still fail because the
detectors are too slow to initiate suspicion.
---
.../apache/geode/distributed/LocatorDUnitTest.java | 6 +-
.../gms/fd/GMSHealthMonitorJUnitTest.java | 2 +-
.../gms/fd/PhiAccrualFailureDetectorTest.java | 25 +++----
.../membership/gms/fd/GMSHealthMonitor.java | 84 +++++++++++++---------
.../gms/fd/PhiAccrualFailureDetector.java | 47 ++++++------
.../org/apache/geode/internal/tcp/Connection.java | 8 +--
6 files changed, 89 insertions(+), 83 deletions(-)
diff --git a/geode-core/src/distributedTest/java/org/apache/geode/distributed/LocatorDUnitTest.java b/geode-core/src/distributedTest/java/org/apache/geode/distributed/LocatorDUnitTest.java
index e07f572..64646ac 100644
--- a/geode-core/src/distributedTest/java/org/apache/geode/distributed/LocatorDUnitTest.java
+++ b/geode-core/src/distributedTest/java/org/apache/geode/distributed/LocatorDUnitTest.java
@@ -92,7 +92,6 @@ import org.apache.geode.internal.cache.GemFireCacheImpl;
import org.apache.geode.internal.logging.LocalLogWriter;
import org.apache.geode.internal.logging.LogService;
import org.apache.geode.internal.security.SecurableCommunicationChannel;
-import org.apache.geode.internal.tcp.Connection;
import org.apache.geode.test.dunit.AsyncInvocation;
import org.apache.geode.test.dunit.DUnitBlackboard;
import org.apache.geode.test.dunit.DistributedTestUtils;
@@ -694,8 +693,9 @@ public class LocatorDUnitTest implements java.io.Serializable {
// quorumLost should be invoked if we get a ForcedDisconnect in this situation
assertThat(listener.quorumLostInvoked).describedAs("expected quorumLost to be invoked")
.isTrue();
- assertThat(listener.suspectReasons.contains(Connection.INITIATING_SUSPECT_PROCESSING))
- .describedAs("expected suspect processing initiated by TCPConduit").isTrue();
+ System.out.println("suspectReasons=" + listener.suspectReasons);
+ assertThat(listener.suspectReasons.contains("Member isn't responding to heartbeat requests"))
+ .describedAs("expected suspect processing initiated by health monitor").isTrue();
} finally {
if (locator != null) {
locator.stop();
diff --git a/geode-core/src/integrationTest/java/org/apache/geode/distributed/internal/membership/gms/fd/GMSHealthMonitorJUnitTest.java b/geode-core/src/integrationTest/java/org/apache/geode/distributed/internal/membership/gms/fd/GMSHealthMonitorJUnitTest.java
index a907511..fbc9c5c 100644
--- a/geode-core/src/integrationTest/java/org/apache/geode/distributed/internal/membership/gms/fd/GMSHealthMonitorJUnitTest.java
+++ b/geode-core/src/integrationTest/java/org/apache/geode/distributed/internal/membership/gms/fd/GMSHealthMonitorJUnitTest.java
@@ -760,7 +760,7 @@ public class GMSHealthMonitorJUnitTest {
HeartbeatMessage hb = new HeartbeatMessage(-1);
hb.setSender(mockMembers.get(0));
gmsHealthMonitor.processMessage(hb);
- assertEquals(2, gmsHealthMonitor.memberDetectors.get(hb.getSender()).heartbeatCount());
+ assertEquals(0, gmsHealthMonitor.memberDetectors.get(hb.getSender()).heartbeatsRecorded());
// a sick member will not take action on a Suspect message from another member
SuspectMembersMessage smm = mock(SuspectMembersMessage.class);
diff --git a/geode-core/src/integrationTest/java/org/apache/geode/distributed/internal/membership/gms/fd/PhiAccrualFailureDetectorTest.java b/geode-core/src/integrationTest/java/org/apache/geode/distributed/internal/membership/gms/fd/PhiAccrualFailureDetectorTest.java
index f192e76..c5358eb 100644
--- a/geode-core/src/integrationTest/java/org/apache/geode/distributed/internal/membership/gms/fd/PhiAccrualFailureDetectorTest.java
+++ b/geode-core/src/integrationTest/java/org/apache/geode/distributed/internal/membership/gms/fd/PhiAccrualFailureDetectorTest.java
@@ -19,8 +19,6 @@ package org.apache.geode.distributed.internal.membership.gms.fd;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
-import java.util.Random;
-
import org.junit.Test;
import org.junit.experimental.categories.Category;
@@ -37,6 +35,7 @@ public class PhiAccrualFailureDetectorTest {
if (i > 290) {
double phi = failureDetector.phi(timestampMillis);
+ System.out.println("for interval " + i + ", phi=" + phi);
if (i == 291) {
assertTrue(1 < phi && phi < 3);
assertTrue(failureDetector.isAvailable(timestampMillis));
@@ -72,7 +71,9 @@ public class PhiAccrualFailureDetectorTest {
}
}
failureDetector.heartbeat(timestampMillis);
- assertTrue(failureDetector.phi(timestampMillis) < 0.1);
+ double phi = failureDetector.phi(timestampMillis);
+ System.out.println("for interval " + i + ", phi=" + phi);
+ assertTrue("for " + i + ", expected phi<0.1 but was " + phi, phi < 0.1);
assertTrue(failureDetector.isAvailable(timestampMillis));
}
}
@@ -81,30 +82,26 @@ public class PhiAccrualFailureDetectorTest {
public void isAvailableTest() throws Exception {
double threshold = 10;
int historySize = 200;
- double stddev = 100;
+ double stddev = 50;
long now = System.currentTimeMillis();
long[] intervals =
- new long[] {1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, -14, -15, -16, -17, -18, -19, -20};
+ new long[] {1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, -14, -15, -16, -17, -18, -19, -20,
+ 21, 22, 23, 24};
int heartbeatInterval = 5000;
- int acceptableHeartbeatPauseMillis = heartbeatInterval / 2;
- Random random = new Random();
+ int acceptableHeartbeatPauseMillis = 0; // heartbeatInterval / 4;
PhiAccrualFailureDetector detector =
new PhiAccrualFailureDetector(threshold, historySize, stddev,
- acceptableHeartbeatPauseMillis, now);
+ acceptableHeartbeatPauseMillis, heartbeatInterval);
long heartbeatTime = 0;
for (long l : intervals) {
if (l > 0) {
- long deviation = getDeviation(heartbeatInterval, random);
- detector.heartbeat(now + (heartbeatInterval * l) + deviation);
+ detector.heartbeat(now + (heartbeatInterval * l));
}
l = Math.abs(l);
heartbeatTime = now + (heartbeatInterval * l);
- System.out.println("heartbeat " + l + " phi= " + detector.phi(heartbeatTime));
+ System.out.printf("heartbeat %d phi= %s%n", l, detector.phi(heartbeatTime));
}
assertFalse("phi=" + detector.phi(heartbeatTime), detector.isAvailable(heartbeatTime));
}
- public int getDeviation(int heartbeatInterval, Random random) {
- return random.nextInt(heartbeatInterval / 4);
- }
}
diff --git a/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/gms/fd/GMSHealthMonitor.java b/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/gms/fd/GMSHealthMonitor.java
index 93c28c2..1da7978 100644
--- a/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/gms/fd/GMSHealthMonitor.java
+++ b/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/gms/fd/GMSHealthMonitor.java
@@ -275,23 +275,11 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler {
return;
}
- if (!Boolean.getBoolean("gemfire.failure-detection-watch-all-nodes")) {
- InternalDistributedMember neighbour = nextNeighbor;
- if (neighbour != null) {
- PhiAccrualFailureDetector detector =
- getOrCreatePhiAccrualFailureDetector(neighbour);
- verifyHeartbeat(neighbour, detector);
- }
- } else {
- for (Map.Entry<InternalDistributedMember, PhiAccrualFailureDetector> entry : memberDetectors
- .entrySet()) {
- InternalDistributedMember member = entry.getKey();
- if (isSuspectMember(member)) {
- continue;
- }
- PhiAccrualFailureDetector detector = entry.getValue();
- verifyHeartbeat(member, detector);
- }
+ InternalDistributedMember neighbour = nextNeighbor;
+ if (neighbour != null) {
+ PhiAccrualFailureDetector detector =
+ getOrCreatePhiAccrualFailureDetector(neighbour);
+ verifyHeartbeat(neighbour, detector);
}
} catch (RuntimeException | Error ex) {
@@ -303,10 +291,21 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler {
PhiAccrualFailureDetector detector) {
if (!detector.isAvailable(currentTimeStamp)) {
logger.warn("ERNIE & BRUCE: initiating checkMember from the health monitor for "
- + "{} with phi={}, last heartbeat={} and recorded heartbeats={}",
- member, detector.phi(currentTimeStamp), new Date(detector.getLastTimestampMillis()),
- detector.heartbeatCount());
+ + "{} with phi={}, last heartbeat={} heartbeats recorded={}",
+ member, detector.phi(currentTimeStamp), detector.getLastTimestampMillis() == null
+ ? "none" : new Date(detector.getLastTimestampMillis()),
+ detector.heartbeatsRecorded());
checkMember(member);
+ } else {
+ logger.info("ERNIE & BRUCE: verifyHeartbeat found {} is available with phi={}"
+ + " last heartbeat={} heartbeats recorded={}",
+ member, detector.phi(currentTimeStamp), detector.getLastTimestampMillis() == null
+ ? "none" : new Date(detector.getLastTimestampMillis()),
+ detector.heartbeatsRecorded());
+ if (detector.getLastTimestampMillis() != null &&
+ detector.getLastTimestampMillis() + (2 * memberTimeout) < currentTimeStamp) {
+ logger.info("dump of phi detector's history: {}", detector.getIntervalHistory());
+ }
}
}
@@ -427,8 +426,17 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler {
* Record member activity at a specified time
*/
private void contactedBy(InternalDistributedMember sender, long timeStamp) {
- PhiAccrualFailureDetector detector = getOrCreatePhiAccrualFailureDetector(sender);
- detector.heartbeat(timeStamp);
+ PhiAccrualFailureDetector detector = memberDetectors.get(sender);
+ if (detector != null) {
+ detector.heartbeat(timeStamp);
+ List<Long> intervalHistory = detector.getIntervalHistory();
+ long lastInterval = intervalHistory.get(intervalHistory.size() - 1);
+ if (lastInterval > 7000) {
+ logger.warn(
+ "ERNIE & BRUCE: recording a large heartbeat interval of {} for {} with {} heartbeats",
+ lastInterval, sender, intervalHistory.size() - 2);
+ }
+ }
if (suspectedMemberIds.containsKey(sender)) {
memberUnsuspected(sender);
setNextNeighbor(currentView, null);
@@ -443,13 +451,16 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler {
if (detector == null) {
// it's okay to not synchronize here - if we happen to create another detector
// for this member in another thread it will be pretty equivalent to the one created here
- logger.info("Creating new failure detector for {}", member);
- // TODO consider giving different kinds of members different thresholds
- final int threshold = Integer.getInteger("geode.phiAccrualThreshold", 10);
- final int sampleSize = Integer.getInteger("geode.phiAccrualSampleSize", 15);
- final int minStdDev = Integer.getInteger("geode.phiAccrualMinimumStandardDeviation", 100);
+ final int threshold = Integer.getInteger("geode.phiAccrualThreshold", 5);
+ final int sampleSize = Integer.getInteger("geode.phiAccrualSampleSize", 200);
+ final long minStdDev = Long.getLong("geode.phiAccrualMinimumStandardDeviation",
+ memberTimeout / 10);
+ // have tried acceptableHeartbeatPauseMillis=0, memberTimeout/2 and /4
detector = new PhiAccrualFailureDetector(
- threshold, sampleSize, minStdDev, memberTimeout / 2, currentTimeStamp);
+ threshold, sampleSize, minStdDev, memberTimeout / 4, memberTimeout);
+ logger.info(
+ "ERNIE & BRUCE: created a new failure detector for {} with lastTimestamp={} and recorded heartbeats={}",
+ member, detector.getLastTimestampMillis(), detector.heartbeatsRecorded());
memberDetectors.put(member, detector);
}
return detector;
@@ -691,6 +702,8 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler {
checkExecutor = LoggingExecutors.newCachedThreadPool("Geode Failure Detection thread ", true);
Monitor m = this.new Monitor(memberTimeout);
monitorInterval = memberTimeout / LOGICAL_INTERVAL;
+ currentTimeStamp = System.currentTimeMillis(); // Monitor expects this to be up-to-date when it
+ // starts
monitorFuture =
scheduler.scheduleAtFixedRate(m, monitorInterval, monitorInterval, TimeUnit.MILLISECONDS);
serverSocketExecutor =
@@ -850,12 +863,11 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler {
synchronized (suspectRequestsInView) {
suspectRequestsInView.clear();
}
- long time = System.currentTimeMillis();
- for (InternalDistributedMember member : newView.getMembers()) {
- if (!member.equals(this.localAddress)) {
- getOrCreatePhiAccrualFailureDetector(member);
- }
- }
+ // for (InternalDistributedMember member : newView.getMembers()) {
+ // if (!member.equals(this.localAddress)) {
+ // getOrCreatePhiAccrualFailureDetector(member);
+ // }
+ // }
for (Iterator<InternalDistributedMember> it = memberDetectors.keySet().iterator(); it
.hasNext();) {
if (!newView.contains(it.next())) {
@@ -1174,6 +1186,7 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler {
}
// we got heartbeat lets update timestamp
+ getOrCreatePhiAccrualFailureDetector(m.getSender());
contactedBy(m.getSender(), System.currentTimeMillis());
}
@@ -1379,7 +1392,8 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler {
if (!pinged && !isStopping) {
PhiAccrualFailureDetector detector = memberDetectors.get(mbr);
- if (detector == null || detector.getLastTimestampMillis() < startTime) {
+ if (detector == null || detector.getLastTimestampMillis() == null
+ || detector.getLastTimestampMillis() < startTime) {
logger.info("Availability check failed for member {}", mbr);
// if the final check fails & this VM is the coordinator we don't need to do another final
// check
diff --git a/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/gms/fd/PhiAccrualFailureDetector.java b/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/gms/fd/PhiAccrualFailureDetector.java
index 732d58c..affd09e 100644
--- a/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/gms/fd/PhiAccrualFailureDetector.java
+++ b/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/gms/fd/PhiAccrualFailureDetector.java
@@ -17,6 +17,7 @@
package org.apache.geode.distributed.internal.membership.gms.fd;
import java.util.LinkedList;
+import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
@@ -61,6 +62,7 @@ public class PhiAccrualFailureDetector {
private final HeartbeatHistory heartbeatHistory;
private final AtomicReference<Long> lastTimestampMillis = new AtomicReference<Long>();
+ private final AtomicLong heartbeatsRecorded = new AtomicLong();
/**
* @param threshold A low threshold is prone to generate many wrong suspicions but ensures a quick
@@ -71,20 +73,17 @@ public class PhiAccrualFailureDetector {
*
* @param maxSampleSize Number of samples to use for calculation of mean and standard deviation of
* inter-arrival times.
- *
* @param minStdDeviationMillis Minimum standard deviation to use for the normal distribution used
* when calculating phi.
* Too low standard deviation might result in too much sensitivity for sudden, but normal,
* deviations
* in heartbeat inter arrival times.
- *
* @param acceptableHeartbeatPauseMillis Duration corresponding to number of potentially
* lost/delayed
* heartbeats that will be accepted before considering it to be an anomaly.
* This margin is important to be able to survive sudden, occasional, pauses in heartbeat
* arrivals, due to for example garbage collect or network drop.
- *
- * @param firstHeartbeatEstimateMillis Bootstrap the stats with heartbeats that corresponds to
+ * @param firstHeartbeatIntervalMillis Bootstrap the stats with heartbeats that corresponds to
* to this duration, with a with rather high standard deviation (since environment is
* unknown
* in the beginning)
@@ -93,11 +92,11 @@ public class PhiAccrualFailureDetector {
* Note for Geode use: this class was originally called PhiAccuralFailureDetector and is
* from
* https://github.com/komamitsu/phi-accural-failure-detector
- * </p>
*/
protected PhiAccrualFailureDetector(double threshold, int maxSampleSize,
double minStdDeviationMillis,
- long acceptableHeartbeatPauseMillis, long firstHeartbeatEstimateMillis) {
+ long acceptableHeartbeatPauseMillis,
+ long firstHeartbeatIntervalMillis) {
if (threshold <= 0) {
throw new IllegalArgumentException("Threshold must be > 0: " + threshold);
}
@@ -112,9 +111,9 @@ public class PhiAccrualFailureDetector {
throw new IllegalArgumentException(
"Acceptable heartbeat pause millis must be >= 0: " + acceptableHeartbeatPauseMillis);
}
- if (firstHeartbeatEstimateMillis <= 0) {
+ if (firstHeartbeatIntervalMillis <= 0) {
throw new IllegalArgumentException(
- "First heartbeat value must be > 0: " + firstHeartbeatEstimateMillis);
+ "First heartbeat value must be > 0: " + firstHeartbeatIntervalMillis);
}
this.threshold = threshold;
@@ -122,16 +121,10 @@ public class PhiAccrualFailureDetector {
this.acceptableHeartbeatPauseMillis = acceptableHeartbeatPauseMillis;
heartbeatHistory = new HeartbeatHistory(maxSampleSize);
- for (int i = 0; i < maxSampleSize; i++) {
- heartbeatHistory.add(acceptableHeartbeatPauseMillis);
- }
-
- // long stdDeviationMillis = acceptableHeartbeatPauseMillis / 4;
- // heartbeatHistory.add(acceptableHeartbeatPauseMillis - stdDeviationMillis)
- // .add(acceptableHeartbeatPauseMillis + stdDeviationMillis);
+ long stdDeviationMillis = firstHeartbeatIntervalMillis / 4;
+ heartbeatHistory.add(firstHeartbeatIntervalMillis - stdDeviationMillis)
+ .add(firstHeartbeatIntervalMillis + stdDeviationMillis);
- // Bruce: record the estimate as the last timestamp received
- lastTimestampMillis.set(firstHeartbeatEstimateMillis);
}
private double ensureValidStdDeviation(double stdDeviationMillis) {
@@ -174,25 +167,26 @@ public class PhiAccrualFailureDetector {
return isAvailable(System.currentTimeMillis());
}
- public int heartbeatCount() {
- return this.heartbeatHistory.size();
+ public long heartbeatsRecorded() {
+ return heartbeatsRecorded.get();
}
public synchronized void heartbeat(long timestampMillis) {
Long lastTimestampMillis = this.lastTimestampMillis.getAndSet(timestampMillis);
/** bruce s.: for Apache Geode, don't record duplicate heartbeats */
- if (lastTimestampMillis != null && lastTimestampMillis == timestampMillis) {
+ if (lastTimestampMillis != null && lastTimestampMillis >= timestampMillis) {
return;
}
if (lastTimestampMillis != null) {
long interval = timestampMillis - lastTimestampMillis;
if (isAvailable(timestampMillis)) {
heartbeatHistory.add(interval);
+ heartbeatsRecorded.incrementAndGet();
}
}
}
- public long getLastTimestampMillis() {
+ public Long getLastTimestampMillis() {
return lastTimestampMillis.get();
}
@@ -204,6 +198,10 @@ public class PhiAccrualFailureDetector {
heartbeat(System.currentTimeMillis());
}
+ public List<Long> getIntervalHistory() {
+ return heartbeatHistory.intervals;
+ }
+
public static class Builder {
private double threshold = 16.0;
private int maxSampleSize = 200;
@@ -260,7 +258,8 @@ public class PhiAccrualFailureDetector {
}
public double variance() {
- return ((double) squaredIntervalSum.get() / intervals.size()) - (mean() * mean());
+ double mean = mean();
+ return ((double) squaredIntervalSum.get() / intervals.size()) - (mean * mean);
}
public double stdDeviation() {
@@ -279,10 +278,6 @@ public class PhiAccrualFailureDetector {
return this;
}
- private int size() {
- return intervals.size();
- }
-
private long pow2(long x) {
return x * x;
}
diff --git a/geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java b/geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java
index 2cda8b6..3f32624 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java
@@ -121,10 +121,10 @@ public class Connection implements Runnable {
@MakeNotStatic
private static final AtomicLong idCounter = new AtomicLong(1);
- /** string used as the reason for initiating suspect processing */
- public static final String INITIATING_SUSPECT_PROCESSING =
- "member unexpectedly shut down shared, unordered connection";
-
+ // /** string used as the reason for initiating suspect processing */
+ // public static final String INITIATING_SUSPECT_PROCESSING =
+ // "member unexpectedly shut down shared, unordered connection";
+ //
/** the table holding this connection */
private final ConnectionTable owner;