You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by bs...@apache.org on 2019/04/01 19:15:03 UTC
[geode] 01/01: GEODE-6583 Integrate phi-accrual failure detection
into Geode
This is an automated email from the ASF dual-hosted git repository.
bschuchardt pushed a commit to branch feature/GEODE-6583
in repository https://gitbox.apache.org/repos/asf/geode.git
commit 5a03277f1273fd266da7fddcb6406b0a655b8361
Author: Bruce Schuchardt <bs...@pivotal.io>
AuthorDate: Mon Apr 1 12:11:00 2019 -0700
GEODE-6583 Integrate phi-accrual failure detection into Geode
*Not ready for review*
This experimental code integrates a history-based heartbeat failure
detector with the existing Membership failure detector. Final checks
remain the same - pretty much everything is the same except I've
replaced the single heartbeat timestamp per member with a
PhiAccrualFailureDetector object. GMSHealthMonitorJUnitTest is passing
as are a handful of other distributed and integration tests.
---
.../gms/fd/GMSHealthMonitorJUnitTest.java | 2 +-
.../membership/gms/fd/GMSHealthMonitor.java | 70 +++---
.../gms/fd/PhiAccrualFailureDetector.java | 266 +++++++++++++++++++++
3 files changed, 307 insertions(+), 31 deletions(-)
diff --git a/geode-core/src/integrationTest/java/org/apache/geode/distributed/internal/membership/gms/fd/GMSHealthMonitorJUnitTest.java b/geode-core/src/integrationTest/java/org/apache/geode/distributed/internal/membership/gms/fd/GMSHealthMonitorJUnitTest.java
index d80b424..524d785 100644
--- a/geode-core/src/integrationTest/java/org/apache/geode/distributed/internal/membership/gms/fd/GMSHealthMonitorJUnitTest.java
+++ b/geode-core/src/integrationTest/java/org/apache/geode/distributed/internal/membership/gms/fd/GMSHealthMonitorJUnitTest.java
@@ -775,7 +775,7 @@ public class GMSHealthMonitorJUnitTest {
HeartbeatMessage hb = new HeartbeatMessage(-1);
hb.setSender(mockMembers.get(0));
gmsHealthMonitor.processMessage(hb);
- assertTrue(gmsHealthMonitor.memberTimeStamps.get(hb.getSender()) == null);
+ assertTrue(gmsHealthMonitor.memberDetectors.get(hb.getSender()) == null);
// a sick member will not take action on a Suspect message from another member
SuspectMembersMessage smm = mock(SuspectMembersMessage.class);
diff --git a/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/gms/fd/GMSHealthMonitor.java b/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/gms/fd/GMSHealthMonitor.java
index d1d19e7..41e41e5 100644
--- a/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/gms/fd/GMSHealthMonitor.java
+++ b/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/gms/fd/GMSHealthMonitor.java
@@ -141,7 +141,7 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler {
/**
* Timestamp at which we last had contact from a member
*/
- final ConcurrentMap<InternalDistributedMember, TimeStamp> memberTimeStamps =
+ final ConcurrentMap<InternalDistributedMember, PhiAccrualFailureDetector> memberDetectors =
new ConcurrentHashMap<>();
/**
@@ -247,26 +247,16 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler {
GMSHealthMonitor.this.currentTimeStamp = currentTime;
if (neighbour != null) {
- TimeStamp nextNeighborTS;
- synchronized (GMSHealthMonitor.this) {
- nextNeighborTS = GMSHealthMonitor.this.memberTimeStamps.get(neighbour);
- }
-
- if (nextNeighborTS == null) {
- TimeStamp customTS = new TimeStamp(currentTime);
- memberTimeStamps.put(neighbour, customTS);
- return;
- }
-
- long interval = memberTimeoutInMillis / GMSHealthMonitor.LOGICAL_INTERVAL;
- long lastTS = currentTime - nextNeighborTS.getTime();
- if (lastTS + interval >= memberTimeoutInMillis) {
+ PhiAccrualFailureDetector nextNeighborDetector =
+ getOrCreatePhiAccrualFailureDetector(neighbour, currentTime);
+ if (!nextNeighborDetector.isAvailable()) {
logger.debug("Checking member {} ", neighbour);
// now do check request for this member;
checkMember(neighbour);
}
}
}
+
}
/***
@@ -382,11 +372,9 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler {
* Record member activity at a specified time
*/
private void contactedBy(InternalDistributedMember sender, long timeStamp) {
- TimeStamp cTS = new TimeStamp(timeStamp);
- cTS = memberTimeStamps.putIfAbsent(sender, cTS);
- if (cTS != null && cTS.getTime() < timeStamp) {
- cTS.setTime(timeStamp);
- }
+
+ PhiAccrualFailureDetector detector = getOrCreatePhiAccrualFailureDetector(sender, timeStamp);
+ detector.heartbeat(timeStamp);
if (suspectedMemberIds.containsKey(sender)) {
memberUnsuspected(sender);
setNextNeighbor(currentView, null);
@@ -394,6 +382,28 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler {
}
+  /**
+   * Returns the failure detector registered for {@code member} in {@code memberDetectors},
+   * creating and registering a new one when the member has none yet.
+   *
+   * <p>
+   * A newly created detector is seeded with a single heartbeat at {@code timeStamp} before it is
+   * published in the map. {@code memberTimeout} is passed to the detector both as the acceptable
+   * heartbeat pause and as the first-heartbeat estimate.
+   * </p>
+   *
+   * @param member the member whose detector is wanted
+   * @param timeStamp time, in milliseconds, recorded as the first heartbeat of a new detector;
+   *        not used when a detector already exists
+   * @return the existing or newly created detector for {@code member}
+   */
+  public PhiAccrualFailureDetector getOrCreatePhiAccrualFailureDetector(
+      InternalDistributedMember member,
+      long timeStamp) {
+    // TODO use something cheaper than a sync on the health monitor
+    synchronized (GMSHealthMonitor.this) {
+      PhiAccrualFailureDetector existing = GMSHealthMonitor.this.memberDetectors.get(member);
+      if (existing != null) {
+        return existing;
+      }
+
+      logger.info("creating new failure detector for {}", member);
+      // tuning values handed to the detector: phi threshold, history size, std-deviation floor
+      final double phiThreshold = 10;
+      final int historySize = 200;
+      final int stdDeviationFloor = 100;
+      PhiAccrualFailureDetector created = new PhiAccrualFailureDetector(
+          phiThreshold, historySize, stdDeviationFloor, memberTimeout, memberTimeout);
+      created.heartbeat(timeStamp);
+      memberDetectors.put(member, created);
+      return created;
+    }
+  }
+
private HeartbeatRequestMessage constructHeartbeatRequestMessage(
final InternalDistributedMember mbr) {
final int reqId = requestId.getAndIncrement();
@@ -472,8 +482,8 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler {
if (pingResp.getResponseMsg() == null) {
pingResp.wait(memberTimeout);
}
- TimeStamp ts = memberTimeStamps.get(member);
- if (ts != null && ts.getTime() > startTime) {
+ PhiAccrualFailureDetector detector = memberDetectors.get(member);
+ if (detector != null && detector.isAvailable()) {
return true;
}
if (pingResp.getResponseMsg() == null) {
@@ -485,8 +495,8 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler {
} else {
logger.trace("received heartbeat from {}", member);
this.stats.incHeartbeatsReceived();
- if (ts != null) {
- ts.setTime(System.currentTimeMillis());
+ if (detector != null) {
+ detector.heartbeat();
}
return true;
}
@@ -581,9 +591,9 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler {
this.stats.incTcpFinalCheckResponsesReceived();
}
if (b == OK) {
- TimeStamp ts = memberTimeStamps.get(suspectMember);
- if (ts != null) {
- ts.setTime(System.currentTimeMillis());
+ PhiAccrualFailureDetector detector = memberDetectors.get(suspectMember);
+ if (detector != null) {
+ detector.heartbeat();
}
return true;
} else {
@@ -787,7 +797,7 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler {
synchronized (suspectRequestsInView) {
suspectRequestsInView.clear();
}
- for (Iterator<InternalDistributedMember> it = memberTimeStamps.keySet().iterator(); it
+ for (Iterator<InternalDistributedMember> it = memberDetectors.keySet().iterator(); it
.hasNext();) {
if (!newView.contains(it.next())) {
it.remove();
@@ -1312,8 +1322,8 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler {
}
if (!pinged && !isStopping) {
- TimeStamp ts = memberTimeStamps.get(mbr);
- if (ts == null || ts.getTime() < startTime) {
+ PhiAccrualFailureDetector detector = memberDetectors.get(mbr);
+ if (detector == null || !detector.isAvailable()) {
logger.info("Availability check failed for member {}", mbr);
// if the final check fails & this VM is the coordinator we don't need to do another final
// check
diff --git a/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/gms/fd/PhiAccrualFailureDetector.java b/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/gms/fd/PhiAccrualFailureDetector.java
new file mode 100644
index 0000000..760fbdc
--- /dev/null
+++ b/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/gms/fd/PhiAccrualFailureDetector.java
@@ -0,0 +1,266 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.distributed.internal.membership.gms.fd;
+
+/*
+ * Copyright 2018 Mitsunori Komatsu (komamitsu)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.util.LinkedList;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.logging.log4j.Logger;
+
+import org.apache.geode.internal.logging.LogService;
+
+/**
+ * Accrual failure detector that turns the history of heartbeat inter-arrival times into a
+ * continuous suspicion level rather than a fixed timeout.
+ *
+ * <p>
+ * This is a port of
+ * https://github.com/akka/akka/blob/master/akka-remote/src/main/scala/akka/remote/PhiAccrualFailureDetector.scala
+ * </p>
+ *
+ * <p>
+ * Implementation of 'The Phi Accrual Failure Detector' by Hayashibara et al. as defined in their
+ * paper: [http://ddg.jaist.ac.jp/pub/HDY+04.pdf]
+ * </p>
+ *
+ * <p>
+ * The suspicion level of failure is given by a value called φ (phi). The basic idea of the φ
+ * failure detector is to express the value of φ on a scale that is dynamically adjusted to
+ * reflect current network conditions. A configurable threshold is used to decide if φ is
+ * considered to be a failure.
+ * </p>
+ *
+ * <p>
+ * The value of φ is calculated as:
+ * </p>
+ *
+ * <pre>
+ * φ = -log10(1 - F(timeSinceLastHeartbeat))
+ * </pre>
+ *
+ * <p>
+ * where F is the cumulative distribution function of a normal distribution with mean and standard
+ * deviation estimated from historical heartbeat inter-arrival times.
+ * </p>
+ *
+ * <p>
+ * {@link #phi(long)} and {@link #heartbeat(long)} synchronize on the detector instance; the
+ * heartbeat history is only touched from those methods and from the constructor.
+ * </p>
+ */
+public class PhiAccrualFailureDetector {
+  private static final Logger logger = LogService.getLogger();
+  private final double threshold;
+  private final double minStdDeviationMillis;
+  private final long acceptableHeartbeatPauseMillis;
+
+  private final HeartbeatHistory heartbeatHistory;
+  /** Time of the most recent accepted heartbeat; holds {@code null} until the first one. */
+  private final AtomicReference<Long> lastTimestampMillis = new AtomicReference<Long>();
+
+  /**
+   * Creates a detector whose history is bootstrapped with two synthetic intervals around
+   * {@code firstHeartbeatEstimateMillis}.
+   *
+   * @param threshold A low threshold is prone to generate many wrong suspicions but ensures a
+   *        quick detection in the event of a real crash. Conversely, a high threshold generates
+   *        fewer mistakes but needs more time to detect actual crashes.
+   *
+   * @param maxSampleSize Number of samples to use for calculation of mean and standard deviation
+   *        of inter-arrival times.
+   *
+   * @param minStdDeviationMillis Minimum standard deviation to use for the normal distribution
+   *        used when calculating phi. Too low a standard deviation might result in too much
+   *        sensitivity for sudden, but normal, deviations in heartbeat inter-arrival times.
+   *
+   * @param acceptableHeartbeatPauseMillis Duration corresponding to the number of potentially
+   *        lost/delayed heartbeats that will be accepted before considering it to be an anomaly.
+   *        This margin is important to be able to survive sudden, occasional pauses in heartbeat
+   *        arrivals, due to, for example, garbage collection or a network drop.
+   *
+   * @param firstHeartbeatEstimateMillis Bootstrap the stats with heartbeats that correspond to
+   *        this duration, with a rather high standard deviation (since the environment is unknown
+   *        in the beginning).
+   *
+   * @throws IllegalArgumentException if {@code threshold}, {@code maxSampleSize},
+   *         {@code minStdDeviationMillis} or {@code firstHeartbeatEstimateMillis} is not
+   *         positive, or if {@code acceptableHeartbeatPauseMillis} is negative
+   *
+   *         <p>
+   *         Note for Geode use: this class was originally called PhiAccuralFailureDetector and
+   *         is from https://github.com/komamitsu/phi-accural-failure-detector
+   *         </p>
+   */
+  protected PhiAccrualFailureDetector(double threshold, int maxSampleSize,
+      double minStdDeviationMillis,
+      long acceptableHeartbeatPauseMillis, long firstHeartbeatEstimateMillis) {
+    if (threshold <= 0) {
+      throw new IllegalArgumentException("Threshold must be > 0: " + threshold);
+    }
+    if (maxSampleSize <= 0) {
+      throw new IllegalArgumentException("Sample size must be > 0: " + maxSampleSize);
+    }
+    if (minStdDeviationMillis <= 0) {
+      throw new IllegalArgumentException(
+          "Minimum standard deviation must be > 0: " + minStdDeviationMillis);
+    }
+    if (acceptableHeartbeatPauseMillis < 0) {
+      throw new IllegalArgumentException(
+          "Acceptable heartbeat pause millis must be >= 0: " + acceptableHeartbeatPauseMillis);
+    }
+    if (firstHeartbeatEstimateMillis <= 0) {
+      throw new IllegalArgumentException(
+          "First heartbeat value must be > 0: " + firstHeartbeatEstimateMillis);
+    }
+
+    this.threshold = threshold;
+    this.minStdDeviationMillis = minStdDeviationMillis;
+    this.acceptableHeartbeatPauseMillis = acceptableHeartbeatPauseMillis;
+
+    // Seed the history with estimate -/+ 25% so the initial mean equals the estimate and the
+    // initial spread is wide.
+    long stdDeviationMillis = firstHeartbeatEstimateMillis / 4;
+    heartbeatHistory = new HeartbeatHistory(maxSampleSize);
+    heartbeatHistory.add(firstHeartbeatEstimateMillis - stdDeviationMillis)
+        .add(firstHeartbeatEstimateMillis + stdDeviationMillis);
+  }
+
+  /** Applies the configured lower bound to a computed standard deviation. */
+  private double ensureValidStdDeviation(double stdDeviationMillis) {
+    return Math.max(stdDeviationMillis, minStdDeviationMillis);
+  }
+
+  /**
+   * Computes the suspicion level φ as of {@code timestampMillis}.
+   *
+   * @param timestampMillis the time, in milliseconds, at which to evaluate φ
+   * @return φ for the time elapsed since the last heartbeat, or {@code 0.0} if no heartbeat has
+   *         been recorded yet
+   */
+  public synchronized double phi(long timestampMillis) {
+    Long lastTimestampMillis = this.lastTimestampMillis.get();
+    if (lastTimestampMillis == null) {
+      return 0.0;
+    }
+
+    long timeDiffMillis = timestampMillis - lastTimestampMillis;
+    // the acceptable pause shifts the expected arrival time later
+    double meanMillis = heartbeatHistory.mean() + acceptableHeartbeatPauseMillis;
+    double stdDeviationMillis = ensureValidStdDeviation(heartbeatHistory.stdDeviation());
+
+    // 1 / (1 + e) approximates the normal CDF F at the normalized distance y
+    double y = (timeDiffMillis - meanMillis) / stdDeviationMillis;
+    double e = Math.exp(-y * (1.5976 + 0.070566 * y * y));
+    if (timeDiffMillis > meanMillis) {
+      // past the mean, 1 - F is tiny: use its direct form e / (1 + e) instead of subtracting
+      return -Math.log10(e / (1.0 + e));
+    } else {
+      return -Math.log10(1.0 - 1.0 / (1.0 + e));
+    }
+  }
+
+  /**
+   * Computes φ as of {@link System#currentTimeMillis()}.
+   *
+   * @return the current suspicion level
+   */
+  public synchronized double phi() {
+    return phi(System.currentTimeMillis());
+  }
+
+  /**
+   * Tells whether the monitored member is considered alive as of {@code timestampMillis}.
+   *
+   * @param timestampMillis the time, in milliseconds, at which to evaluate availability
+   * @return {@code true} if φ at that time is below the configured threshold
+   */
+  public boolean isAvailable(long timestampMillis) {
+    double currentPhi = phi(timestampMillis);
+    return currentPhi < threshold;
+  }
+
+  /**
+   * Tells whether the monitored member is considered alive as of
+   * {@link System#currentTimeMillis()}.
+   *
+   * @return {@code true} if the current φ is below the configured threshold
+   */
+  public boolean isAvailable() {
+    return isAvailable(System.currentTimeMillis());
+  }
+
+  /**
+   * Records a heartbeat that arrived at {@code timestampMillis}.
+   *
+   * <p>
+   * The interval since the previous heartbeat is added to the history only if the member was
+   * still considered available just before this heartbeat arrived, so the long gap that ends an
+   * outage does not distort the statistics. A timestamp that is not newer than the last recorded
+   * one is ignored: it would contribute a zero or negative interval and move the last-contact
+   * time backwards.
+   * </p>
+   *
+   * @param timestampMillis arrival time of the heartbeat in milliseconds
+   */
+  public synchronized void heartbeat(long timestampMillis) {
+    Long previousMillis = this.lastTimestampMillis.get();
+    if (previousMillis == null) {
+      // first heartbeat: nothing to measure an interval against yet
+      this.lastTimestampMillis.set(timestampMillis);
+      return;
+    }
+    if (timestampMillis <= previousMillis) {
+      return;
+    }
+    // Availability must be judged against the previous heartbeat, so evaluate it before the
+    // last-contact time is overwritten.
+    if (isAvailable(timestampMillis)) {
+      heartbeatHistory.add(timestampMillis - previousMillis);
+    }
+    this.lastTimestampMillis.set(timestampMillis);
+  }
+
+  /** Records a heartbeat arriving at {@link System#currentTimeMillis()}. */
+  public void heartbeat() {
+    heartbeat(System.currentTimeMillis());
+  }
+
+  /**
+   * Fluent builder for {@link PhiAccrualFailureDetector}. Values are validated by the detector's
+   * constructor when {@link #build()} is called.
+   */
+  public static class Builder {
+    private double threshold = 16.0;
+    private int maxSampleSize = 200;
+    private double minStdDeviationMillis = 500;
+    private long acceptableHeartbeatPauseMillis = 0;
+    private long firstHeartbeatEstimateMillis = 500;
+
+    /** Sets the φ value at or above which the member is considered unavailable. */
+    public Builder setThreshold(double threshold) {
+      this.threshold = threshold;
+      return this;
+    }
+
+    /** Sets how many inter-arrival intervals are kept for the statistics. */
+    public Builder setMaxSampleSize(int maxSampleSize) {
+      this.maxSampleSize = maxSampleSize;
+      return this;
+    }
+
+    /** Sets the lower bound applied to the standard deviation used when computing φ. */
+    public Builder setMinStdDeviationMillis(double minStdDeviationMillis) {
+      this.minStdDeviationMillis = minStdDeviationMillis;
+      return this;
+    }
+
+    /** Sets the pause, in milliseconds, added to the mean interval before suspicion grows. */
+    public Builder setAcceptableHeartbeatPauseMillis(long acceptableHeartbeatPauseMillis) {
+      this.acceptableHeartbeatPauseMillis = acceptableHeartbeatPauseMillis;
+      return this;
+    }
+
+    /** Sets the interval estimate, in milliseconds, used to bootstrap the history. */
+    public Builder setFirstHeartbeatEstimateMillis(long firstHeartbeatEstimateMillis) {
+      this.firstHeartbeatEstimateMillis = firstHeartbeatEstimateMillis;
+      return this;
+    }
+
+    /**
+     * Creates the detector from the configured values.
+     *
+     * @return a new detector
+     * @throws IllegalArgumentException if a configured value is rejected by the constructor
+     */
+    public PhiAccrualFailureDetector build() {
+      return new PhiAccrualFailureDetector(threshold, maxSampleSize, minStdDeviationMillis,
+          acceptableHeartbeatPauseMillis, firstHeartbeatEstimateMillis);
+    }
+  }
+
+  /**
+   * Bounded window of heartbeat intervals that maintains running sums, so mean and variance are
+   * available without walking the window.
+   */
+  private static class HeartbeatHistory {
+    private final int maxSampleSize;
+    private final LinkedList<Long> intervals = new LinkedList<Long>();
+    private final AtomicLong intervalSum = new AtomicLong();
+    private final AtomicLong squaredIntervalSum = new AtomicLong();
+
+    public HeartbeatHistory(int maxSampleSize) {
+      if (maxSampleSize < 1) {
+        throw new IllegalArgumentException("maxSampleSize must be >= 1, got " + maxSampleSize);
+      }
+      this.maxSampleSize = maxSampleSize;
+    }
+
+    /** Returns the mean of the stored intervals; NaN while the window is empty. */
+    public double mean() {
+      return (double) intervalSum.get() / intervals.size();
+    }
+
+    /** Returns the population variance of the stored intervals, never below zero. */
+    public double variance() {
+      double mean = mean();
+      double rawVariance = ((double) squaredIntervalSum.get() / intervals.size()) - (mean * mean);
+      // E[x^2] - mean^2 can come out fractionally negative through floating-point cancellation;
+      // clamp it so stdDeviation() never becomes NaN.
+      return Math.max(0.0, rawVariance);
+    }
+
+    /** Returns the population standard deviation of the stored intervals. */
+    public double stdDeviation() {
+      return Math.sqrt(variance());
+    }
+
+    /**
+     * Appends an interval, evicting the oldest one first when the window is full.
+     *
+     * @param interval the inter-arrival time in milliseconds
+     * @return this history, to allow chained calls
+     */
+    public HeartbeatHistory add(long interval) {
+      if (intervals.size() >= maxSampleSize) {
+        Long dropped = intervals.pollFirst();
+        intervalSum.addAndGet(-dropped);
+        squaredIntervalSum.addAndGet(-pow2(dropped));
+      }
+      intervals.add(interval);
+      intervalSum.addAndGet(interval);
+      squaredIntervalSum.addAndGet(pow2(interval));
+      return this;
+    }
+
+    private long pow2(long x) {
+      return x * x;
+    }
+  }
+}