You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by bs...@apache.org on 2019/05/14 18:17:28 UTC
[geode] 01/02: cleanup of AccrualFailureDetector & stat changes to
capture in csv form
This is an automated email from the ASF dual-hosted git repository.
bschuchardt pushed a commit to branch feature/GEODE-6583
in repository https://gitbox.apache.org/repos/asf/geode.git
commit 63dfa1b2189d7005ced71cabf886d77428f908f5
Author: Bruce Schuchardt <bs...@pivotal.io>
AuthorDate: Tue May 14 11:04:40 2019 -0700
cleanup of AccrualFailureDetector & stat changes to capture in csv form
---
.../gms/fd/AdaptiveAccrualFailureDetectorTest.java | 56 ++++++++
.../gms/fd/GMSHealthMonitorJUnitTest.java | 3 +-
.../gms/fd/PhiAccrualFailureDetectorTest.java | 12 +-
...or.java => AdaptiveAccrualFailureDetector.java} | 146 +++++----------------
.../membership/gms/fd/FailureDetector.java | 33 +++++
.../membership/gms/fd/GMSHealthMonitor.java | 57 ++++----
.../gms/fd/PhiAccrualFailureDetector.java | 14 +-
.../org/apache/geode/internal/SystemAdmin.java | 17 +--
.../internal/statistics/StatArchiveReader.java | 8 ++
9 files changed, 188 insertions(+), 158 deletions(-)
diff --git a/geode-core/src/integrationTest/java/org/apache/geode/distributed/internal/membership/gms/fd/AdaptiveAccrualFailureDetectorTest.java b/geode-core/src/integrationTest/java/org/apache/geode/distributed/internal/membership/gms/fd/AdaptiveAccrualFailureDetectorTest.java
new file mode 100644
index 0000000..ca304cf
--- /dev/null
+++ b/geode-core/src/integrationTest/java/org/apache/geode/distributed/internal/membership/gms/fd/AdaptiveAccrualFailureDetectorTest.java
@@ -0,0 +1,56 @@
+/*
+ * Copyright 2018 Mitsunori Komatsu (komamitsu)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.geode.distributed.internal.membership.gms.fd;
+
+import static org.junit.Assert.assertFalse;
+
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import org.apache.geode.test.junit.categories.MembershipTest;
+
+@Category({MembershipTest.class})
+public class AdaptiveAccrualFailureDetectorTest {
+
+  /**
+   * Feeds the detector a full history of evenly spaced heartbeats, continues with thirteen more
+   * heartbeats on the same schedule, and then probes it at the 14th and 15th intervals without
+   * delivering a heartbeat (the negative entries in {@code intervals}). After those two missed
+   * beats the detector is expected to report the member as not available.
+   */
+  @Test
+  public void isAvailableTest() throws Exception {
+    final double threshold = 0.8;
+    final int historySize = 200;
+    long now = System.currentTimeMillis();
+    // positive entries deliver a heartbeat at that multiple of the interval;
+    // negative entries only probe the detector at that multiple
+    final long[] intervals =
+        new long[] {1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, -14, -15};
+    final int heartbeatInterval = 5000;
+    AdaptiveAccrualFailureDetector detector =
+        new AdaptiveAccrualFailureDetector(threshold, historySize, heartbeatInterval);
+
+    // prime the detector with historySize regularly spaced heartbeats
+    for (int remaining = historySize; remaining > 0; remaining--) {
+      now += heartbeatInterval;
+      detector.heartbeat(now);
+    }
+
+    long heartbeatTime = 0;
+    for (long step : intervals) {
+      final long multiple = Math.abs(step);
+      heartbeatTime = now + (heartbeatInterval * multiple);
+      if (step > 0) {
+        detector.heartbeat(heartbeatTime);
+      }
+      System.out.printf("heartbeat %d probability= %s%n", multiple,
+          detector.availabilityProbability(heartbeatTime));
+    }
+
+    assertFalse("probability=" + detector.availabilityProbability(heartbeatTime),
+        detector.isAvailable(heartbeatTime));
+  }
+
+}
diff --git a/geode-core/src/integrationTest/java/org/apache/geode/distributed/internal/membership/gms/fd/GMSHealthMonitorJUnitTest.java b/geode-core/src/integrationTest/java/org/apache/geode/distributed/internal/membership/gms/fd/GMSHealthMonitorJUnitTest.java
index fbc9c5c..06dd191 100644
--- a/geode-core/src/integrationTest/java/org/apache/geode/distributed/internal/membership/gms/fd/GMSHealthMonitorJUnitTest.java
+++ b/geode-core/src/integrationTest/java/org/apache/geode/distributed/internal/membership/gms/fd/GMSHealthMonitorJUnitTest.java
@@ -29,6 +29,7 @@ import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.isA;
@@ -760,7 +761,7 @@ public class GMSHealthMonitorJUnitTest {
HeartbeatMessage hb = new HeartbeatMessage(-1);
hb.setSender(mockMembers.get(0));
gmsHealthMonitor.processMessage(hb);
- assertEquals(0, gmsHealthMonitor.memberDetectors.get(hb.getSender()).heartbeatsRecorded());
+ assertNull(gmsHealthMonitor.memberDetectors.get(hb.getSender()));
// a sick member will not take action on a Suspect message from another member
SuspectMembersMessage smm = mock(SuspectMembersMessage.class);
diff --git a/geode-core/src/integrationTest/java/org/apache/geode/distributed/internal/membership/gms/fd/PhiAccrualFailureDetectorTest.java b/geode-core/src/integrationTest/java/org/apache/geode/distributed/internal/membership/gms/fd/PhiAccrualFailureDetectorTest.java
index e8cbf1a..292908a 100644
--- a/geode-core/src/integrationTest/java/org/apache/geode/distributed/internal/membership/gms/fd/PhiAccrualFailureDetectorTest.java
+++ b/geode-core/src/integrationTest/java/org/apache/geode/distributed/internal/membership/gms/fd/PhiAccrualFailureDetectorTest.java
@@ -34,7 +34,7 @@ public class PhiAccrualFailureDetectorTest {
long timestampMillis = now + i * 1000;
if (i > 290) {
- double phi = failureDetector.phi(timestampMillis);
+ double phi = failureDetector.availabilityProbability(timestampMillis);
System.out.println("for interval " + i + ", phi=" + phi);
if (i == 291) {
assertTrue(1 < phi && phi < 3);
@@ -64,14 +64,14 @@ public class PhiAccrualFailureDetectorTest {
continue;
} else if (i > 200) {
if (i % 5 == 0) {
- double phi = failureDetector.phi(timestampMillis);
+ double phi = failureDetector.availabilityProbability(timestampMillis);
assertTrue(0.1 < phi && phi < 0.5);
assertTrue(failureDetector.isAvailable(timestampMillis));
continue;
}
}
failureDetector.heartbeat(timestampMillis);
- double phi = failureDetector.phi(timestampMillis);
+ double phi = failureDetector.availabilityProbability(timestampMillis);
System.out.println("for interval " + i + ", phi=" + phi);
assertTrue("for " + i + ", expected phi<0.1 but was " + phi, phi < 0.1);
assertTrue(failureDetector.isAvailable(timestampMillis));
@@ -98,9 +98,11 @@ public class PhiAccrualFailureDetectorTest {
}
l = Math.abs(l);
heartbeatTime = now + (heartbeatInterval * l);
- System.out.printf("heartbeat %d phi= %s%n", l, detector.phi(heartbeatTime));
+ System.out.printf("heartbeat %d phi= %s%n", l,
+ detector.availabilityProbability(heartbeatTime));
}
- assertFalse("phi=" + detector.phi(heartbeatTime), detector.isAvailable(heartbeatTime));
+ assertFalse("phi=" + detector.availabilityProbability(heartbeatTime),
+ detector.isAvailable(heartbeatTime));
}
}
diff --git a/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/gms/fd/PhiAccrualFailureDetector.java b/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/gms/fd/AdaptiveAccrualFailureDetector.java
similarity index 56%
copy from geode-core/src/main/java/org/apache/geode/distributed/internal/membership/gms/fd/PhiAccrualFailureDetector.java
copy to geode-core/src/main/java/org/apache/geode/distributed/internal/membership/gms/fd/AdaptiveAccrualFailureDetector.java
index affd09e..963a4ee 100644
--- a/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/gms/fd/PhiAccrualFailureDetector.java
+++ b/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/gms/fd/AdaptiveAccrualFailureDetector.java
@@ -27,8 +27,11 @@ import org.apache.geode.internal.logging.LogService;
/**
* <p>
- * Ported to Geode from https://github.com/komamitsu/phi-accural-failure-detector. Javadoc
- * from that repo follows...
+ * Ported to Geode from https://github.com/komamitsu/phi-accural-failure-detector and modified
+ * to use the Adaptive Accrual algorithm described here:
+ * https://www.informatik.uni-augsburg.de/lehrstuehle/sik/publikationen/papers/2007_sac-dads_sat/paper.pdf
+ *
+ * Javadoc from the komamitsu repo follows...
* </p>
* <p>
* This is a port of
@@ -53,12 +56,10 @@ import org.apache.geode.internal.logging.LogService;
* and standard deviation estimated from historical heartbeat inter-arrival times.
*
*/
-public class PhiAccrualFailureDetector {
+public class AdaptiveAccrualFailureDetector implements FailureDetector {
private static final Logger logger = LogService.getLogger();
private final double threshold;
- private final double minStdDeviationMillis;
- private final long acceptableHeartbeatPauseMillis;
private final HeartbeatHistory heartbeatHistory;
private final AtomicReference<Long> lastTimestampMillis = new AtomicReference<Long>();
@@ -73,16 +74,6 @@ public class PhiAccrualFailureDetector {
*
* @param maxSampleSize Number of samples to use for calculation of mean and standard deviation of
* inter-arrival times.
- * @param minStdDeviationMillis Minimum standard deviation to use for the normal distribution used
- * when calculating phi.
- * Too low standard deviation might result in too much sensitivity for sudden, but normal,
- * deviations
- * in heartbeat inter arrival times.
- * @param acceptableHeartbeatPauseMillis Duration corresponding to number of potentially
- * lost/delayed
- * heartbeats that will be accepted before considering it to be an anomaly.
- * This margin is important to be able to survive sudden, occasional, pauses in heartbeat
- * arrivals, due to for example garbage collect or network drop.
* @param firstHeartbeatIntervalMillis Bootstrap the stats with heartbeats that correspond to
* this duration, with a rather high standard deviation (since environment is
* unknown
@@ -93,9 +84,7 @@ public class PhiAccrualFailureDetector {
* from
* https://github.com/komamitsu/phi-accural-failure-detector
*/
- protected PhiAccrualFailureDetector(double threshold, int maxSampleSize,
- double minStdDeviationMillis,
- long acceptableHeartbeatPauseMillis,
+ protected AdaptiveAccrualFailureDetector(double threshold, int maxSampleSize,
long firstHeartbeatIntervalMillis) {
if (threshold <= 0) {
throw new IllegalArgumentException("Threshold must be > 0: " + threshold);
@@ -103,22 +92,12 @@ public class PhiAccrualFailureDetector {
if (maxSampleSize <= 0) {
throw new IllegalArgumentException("Sample size must be > 0: " + maxSampleSize);
}
- if (minStdDeviationMillis <= 0) {
- throw new IllegalArgumentException(
- "Minimum standard deviation must be > 0: " + minStdDeviationMillis);
- }
- if (acceptableHeartbeatPauseMillis < 0) {
- throw new IllegalArgumentException(
- "Acceptable heartbeat pause millis must be >= 0: " + acceptableHeartbeatPauseMillis);
- }
if (firstHeartbeatIntervalMillis <= 0) {
throw new IllegalArgumentException(
"First heartbeat value must be > 0: " + firstHeartbeatIntervalMillis);
}
this.threshold = threshold;
- this.minStdDeviationMillis = minStdDeviationMillis;
- this.acceptableHeartbeatPauseMillis = acceptableHeartbeatPauseMillis;
heartbeatHistory = new HeartbeatHistory(maxSampleSize);
long stdDeviationMillis = firstHeartbeatIntervalMillis / 4;
@@ -127,50 +106,35 @@ public class PhiAccrualFailureDetector {
}
- private double ensureValidStdDeviation(double stdDeviationMillis) {
- return Math.max(stdDeviationMillis, minStdDeviationMillis);
- }
-
- public synchronized double phi(long timestampMillis) {
+ @Override
+ public synchronized double availabilityProbability(long timestampMillis) {
Long lastTimestampMillis = this.lastTimestampMillis.get();
if (lastTimestampMillis == null) {
return 0.0;
}
- long timeDiffMillis = timestampMillis - lastTimestampMillis;
- double meanMillis = heartbeatHistory.mean() + acceptableHeartbeatPauseMillis;
- double stdDeviationMillis = ensureValidStdDeviation(heartbeatHistory.stdDeviation());
-
- double y = (timeDiffMillis - meanMillis) / stdDeviationMillis;
- double e = Math.exp(-y * (1.5976 + 0.070566 * y * y));
- if (timeDiffMillis > meanMillis) {
- return -Math.log10(e / (1.0 + e));
- } else {
- return -Math.log10(1.0 - 1.0 / (1.0 + e));
- }
- }
-
- public synchronized double phi() {
- return phi(System.currentTimeMillis());
+ long tDelta = timestampMillis - lastTimestampMillis;
+ double alpha = 1.1; // TODO -should be configurable
+ long Stdelta = heartbeatHistory.Stdelta(Math.round(tDelta * alpha));
+ long S = heartbeatHistory.size();
+ return (double) Stdelta / (double) S;
}
+ @Override
public boolean isAvailable(long timestampMillis) {
- double currentPhi = phi(timestampMillis);
- if (Double.isNaN(currentPhi)) {
- // There isn't enough history to compute a valid Phi so we use a timestamp check
- return (getLastTimestampMillis() >= timestampMillis - acceptableHeartbeatPauseMillis);
- }
- return currentPhi < threshold;
+ return availabilityProbability(timestampMillis) < threshold;
}
public boolean isAvailable() {
return isAvailable(System.currentTimeMillis());
}
+ @Override
public long heartbeatsRecorded() {
return heartbeatsRecorded.get();
}
+ @Override
public synchronized void heartbeat(long timestampMillis) {
Long lastTimestampMillis = this.lastTimestampMillis.getAndSet(timestampMillis);
/** bruce s.: for Apache Geode, don't record duplicate heartbeats */
@@ -186,6 +150,7 @@ public class PhiAccrualFailureDetector {
}
}
+ @Override
public Long getLastTimestampMillis() {
return lastTimestampMillis.get();
}
@@ -194,57 +159,19 @@ public class PhiAccrualFailureDetector {
return threshold;
}
+ @Override
public void heartbeat() {
heartbeat(System.currentTimeMillis());
}
+ @Override
public List<Long> getIntervalHistory() {
return heartbeatHistory.intervals;
}
- public static class Builder {
- private double threshold = 16.0;
- private int maxSampleSize = 200;
- private double minStdDeviationMillis = 500;
- private long acceptableHeartbeatPauseMillis = 0;
- private long firstHeartbeatEstimateMillis = 500;
-
- public Builder setThreshold(double threshold) {
- this.threshold = threshold;
- return this;
- }
-
- public Builder setMaxSampleSize(int maxSampleSize) {
- this.maxSampleSize = maxSampleSize;
- return this;
- }
-
- public Builder setMinStdDeviationMillis(double minStdDeviationMillis) {
- this.minStdDeviationMillis = minStdDeviationMillis;
- return this;
- }
-
- public Builder setAcceptableHeartbeatPauseMillis(long acceptableHeartbeatPauseMillis) {
- this.acceptableHeartbeatPauseMillis = acceptableHeartbeatPauseMillis;
- return this;
- }
-
- public Builder setFirstHeartbeatEstimateMillis(long firstHeartbeatEstimateMillis) {
- this.firstHeartbeatEstimateMillis = firstHeartbeatEstimateMillis;
- return this;
- }
-
- public PhiAccrualFailureDetector build() {
- return new PhiAccrualFailureDetector(threshold, maxSampleSize, minStdDeviationMillis,
- acceptableHeartbeatPauseMillis, firstHeartbeatEstimateMillis);
- }
- }
-
private static class HeartbeatHistory {
private final int maxSampleSize;
private final LinkedList<Long> intervals = new LinkedList<Long>();
- private final AtomicLong intervalSum = new AtomicLong();
- private final AtomicLong squaredIntervalSum = new AtomicLong();
public HeartbeatHistory(int maxSampleSize) {
if (maxSampleSize < 1) {
@@ -253,33 +180,30 @@ public class PhiAccrualFailureDetector {
this.maxSampleSize = maxSampleSize;
}
- public double mean() {
- return (double) intervalSum.get() / intervals.size();
- }
-
- public double variance() {
- double mean = mean();
- return ((double) squaredIntervalSum.get() / intervals.size()) - (mean * mean);
- }
-
- public double stdDeviation() {
- return Math.sqrt(variance());
- }
-
public HeartbeatHistory add(long interval) {
if (intervals.size() >= maxSampleSize) {
Long dropped = intervals.pollFirst();
- intervalSum.addAndGet(-dropped);
- squaredIntervalSum.addAndGet(-pow2(dropped));
}
intervals.add(interval);
- intervalSum.addAndGet(interval);
- squaredIntervalSum.addAndGet(pow2(interval));
return this;
}
private long pow2(long x) {
return x * x;
}
+
+ /**
+  * Counts the recorded heartbeat intervals that are no longer than the given duration.
+  *
+  * @param tdelta the duration to compare each recorded interval against
+  * @return the number of entries in the interval history that are {@code <= tdelta}
+  */
+ public long Stdelta(long tdelta) {
+   return intervals.stream()
+       .filter(recorded -> recorded <= tdelta)
+       .count();
+ }
+
+ /**
+  * @return the number of heartbeat intervals currently held in the history
+  */
+ public long size() {
+   final long sampleCount = intervals.size();
+   return sampleCount;
+ }
}
}
diff --git a/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/gms/fd/FailureDetector.java b/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/gms/fd/FailureDetector.java
new file mode 100644
index 0000000..028cf63
--- /dev/null
+++ b/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/gms/fd/FailureDetector.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.distributed.internal.membership.gms.fd;
+
+import java.util.List;
+
+/**
+ * Common contract for the per-member heartbeat failure detectors used by GMSHealthMonitor. It is
+ * implemented by PhiAccrualFailureDetector and AdaptiveAccrualFailureDetector so the health
+ * monitor can hold either one behind the same type.
+ */
+public interface FailureDetector {
+ /**
+  * Computes the detector's current metric for the monitored member as of the given time.
+  * <p>
+  * Note on the name: in both implementations {@link #isAvailable(long)} treats a value
+  * <em>below</em> the configured threshold as "available", so a larger value means the member is
+  * more suspect. PhiAccrualFailureDetector returns its phi value; AdaptiveAccrualFailureDetector
+  * returns the fraction of recorded heartbeat intervals that are no longer than the (scaled) time
+  * elapsed since the last heartbeat. Both return 0.0 when no heartbeat has been recorded yet.
+  *
+  * @param timestampMillis the time, in milliseconds, at which to evaluate the member
+  * @return the implementation-specific metric described above
+  */
+ double availabilityProbability(long timestampMillis);
+
+ /**
+  * Reports whether the monitored member is considered available as of the given time.
+  *
+  * @param timestampMillis the time, in milliseconds, at which to evaluate the member
+  * @return true if the member is considered available
+  */
+ boolean isAvailable(long timestampMillis);
+
+ /**
+  * @return the implementation's count of heartbeats recorded so far
+  */
+ long heartbeatsRecorded();
+
+ /**
+  * Records a heartbeat from the monitored member.
+  *
+  * @param timestampMillis the time, in milliseconds, at which the heartbeat was observed
+  */
+ void heartbeat(long timestampMillis);
+
+ /**
+  * Records a heartbeat from the monitored member at the current time. Both implementations
+  * delegate to {@link #heartbeat(long)} with {@code System.currentTimeMillis()}.
+  */
+ void heartbeat();
+
+ /**
+  * @return the timestamp passed to the most recent heartbeat, or null if no heartbeat has been
+  *         recorded; callers must check for null before unboxing
+  */
+ Long getLastTimestampMillis();
+
+ /**
+  * Returns the recorded heartbeat inter-arrival intervals. Both implementations return their
+  * internal list directly rather than a copy.
+  *
+  * @return the list of recorded intervals
+  */
+ List<Long> getIntervalHistory();
+}
diff --git a/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/gms/fd/GMSHealthMonitor.java b/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/gms/fd/GMSHealthMonitor.java
index 1da7978..de10590 100644
--- a/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/gms/fd/GMSHealthMonitor.java
+++ b/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/gms/fd/GMSHealthMonitor.java
@@ -143,7 +143,7 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler {
/**
* Timestamp at which we last had contact from a member
*/
- final ConcurrentMap<InternalDistributedMember, PhiAccrualFailureDetector> memberDetectors =
+ final ConcurrentMap<InternalDistributedMember, FailureDetector> memberDetectors =
new ConcurrentHashMap<>();
/**
@@ -267,7 +267,7 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler {
logger.info(
"Failure detector has noticed a JVM pause and is giving all members a heartbeat");
for (InternalDistributedMember member : myView.getMembers()) {
- PhiAccrualFailureDetector detector = memberDetectors.get(member);
+ FailureDetector detector = memberDetectors.get(member);
if (detector != null) {
detector.heartbeat(currentTimeStamp);
}
@@ -277,8 +277,8 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler {
InternalDistributedMember neighbour = nextNeighbor;
if (neighbour != null) {
- PhiAccrualFailureDetector detector =
- getOrCreatePhiAccrualFailureDetector(neighbour);
+ FailureDetector detector =
+ getOrCreateFailureDetector(neighbour);
verifyHeartbeat(neighbour, detector);
}
@@ -288,18 +288,20 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler {
}
private void verifyHeartbeat(InternalDistributedMember member,
- PhiAccrualFailureDetector detector) {
+ FailureDetector detector) {
if (!detector.isAvailable(currentTimeStamp)) {
logger.warn("ERNIE & BRUCE: initiating checkMember from the health monitor for "
+ "{} with phi={}, last heartbeat={} heartbeats recorded={}",
- member, detector.phi(currentTimeStamp), detector.getLastTimestampMillis() == null
+ member, detector.availabilityProbability(currentTimeStamp),
+ detector.getLastTimestampMillis() == null
? "none" : new Date(detector.getLastTimestampMillis()),
detector.heartbeatsRecorded());
checkMember(member);
} else {
logger.info("ERNIE & BRUCE: verifyHeartbeat found {} is available with phi={}"
+ " last heartbeat={} heartbeats recorded={}",
- member, detector.phi(currentTimeStamp), detector.getLastTimestampMillis() == null
+ member, detector.availabilityProbability(currentTimeStamp),
+ detector.getLastTimestampMillis() == null
? "none" : new Date(detector.getLastTimestampMillis()),
detector.heartbeatsRecorded());
if (detector.getLastTimestampMillis() != null &&
@@ -426,7 +428,7 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler {
* Record member activity at a specified time
*/
private void contactedBy(InternalDistributedMember sender, long timeStamp) {
- PhiAccrualFailureDetector detector = memberDetectors.get(sender);
+ FailureDetector detector = memberDetectors.get(sender);
if (detector != null) {
detector.heartbeat(timeStamp);
List<Long> intervalHistory = detector.getIntervalHistory();
@@ -444,20 +446,30 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler {
}
- public PhiAccrualFailureDetector getOrCreatePhiAccrualFailureDetector(
+ public FailureDetector getOrCreateFailureDetector(
InternalDistributedMember member) {
- PhiAccrualFailureDetector detector = GMSHealthMonitor.this.memberDetectors.get(member);
+ FailureDetector detector = GMSHealthMonitor.this.memberDetectors.get(member);
if (detector == null) {
// it's okay to not synchronize here - if we happen to create another detector
// for this member in another thread it will be pretty equivalent to the one created here
- final int threshold = Integer.getInteger("geode.phiAccrualThreshold", 5);
- final int sampleSize = Integer.getInteger("geode.phiAccrualSampleSize", 200);
- final long minStdDev = Long.getLong("geode.phiAccrualMinimumStandardDeviation",
- memberTimeout / 10);
- // have tried acceptableHeartbeatPauseMillis=0, memberTimeout/2 and /4
- detector = new PhiAccrualFailureDetector(
- threshold, sampleSize, minStdDev, memberTimeout / 4, memberTimeout);
+ String detectorType = System.getProperty("gemfire.failure-detector", "phi");
+ if (detectorType.equals("phi")) {
+ final int threshold = Integer.getInteger("geode.phiAccrualThreshold", 5);
+ final int sampleSize = Integer.getInteger("geode.phiAccrualSampleSize", 200);
+ final long minStdDev = Long.getLong("geode.phiAccrualMinimumStandardDeviation",
+ memberTimeout / 10);
+ // have tried acceptableHeartbeatPauseMillis=0, memberTimeout/2 and /4
+ detector = new PhiAccrualFailureDetector(
+ threshold, sampleSize, minStdDev, memberTimeout / 4, memberTimeout);
+ } else {
+ final double threshold =
+ Double.parseDouble(System.getProperty("geode.failureDetectionThreshold", "0.99"));
+ final int sampleSize = Integer.getInteger("geode.heartbeatSampleSize", 200);
+ detector = new AdaptiveAccrualFailureDetector(
+ threshold, sampleSize, memberTimeout / 4);
+ }
+
logger.info(
"ERNIE & BRUCE: created a new failure detector for {} with lastTimestamp={} and recorded heartbeats={}",
member, detector.getLastTimestampMillis(), detector.heartbeatsRecorded());
@@ -544,8 +556,9 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler {
if (pingResp.getResponseMsg() == null) {
pingResp.wait(memberTimeout);
}
- PhiAccrualFailureDetector detector = memberDetectors.get(member);
- if (detector != null && detector.getLastTimestampMillis() > startTime) {
+ FailureDetector detector = memberDetectors.get(member);
+ if (detector != null && detector.getLastTimestampMillis() != null
+ && detector.getLastTimestampMillis() > startTime) {
return true;
}
if (pingResp.getResponseMsg() == null) {
@@ -655,7 +668,7 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler {
this.stats.incTcpFinalCheckResponsesReceived();
}
if (b == OK) {
- PhiAccrualFailureDetector detector = memberDetectors.get(suspectMember);
+ FailureDetector detector = memberDetectors.get(suspectMember);
if (detector != null) {
detector.heartbeat();
}
@@ -1186,7 +1199,7 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler {
}
// we got heartbeat lets update timestamp
- getOrCreatePhiAccrualFailureDetector(m.getSender());
+ getOrCreateFailureDetector(m.getSender());
contactedBy(m.getSender(), System.currentTimeMillis());
}
@@ -1391,7 +1404,7 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler {
}
if (!pinged && !isStopping) {
- PhiAccrualFailureDetector detector = memberDetectors.get(mbr);
+ FailureDetector detector = memberDetectors.get(mbr);
if (detector == null || detector.getLastTimestampMillis() == null
|| detector.getLastTimestampMillis() < startTime) {
logger.info("Availability check failed for member {}", mbr);
diff --git a/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/gms/fd/PhiAccrualFailureDetector.java b/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/gms/fd/PhiAccrualFailureDetector.java
index affd09e..4722f66 100644
--- a/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/gms/fd/PhiAccrualFailureDetector.java
+++ b/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/gms/fd/PhiAccrualFailureDetector.java
@@ -53,7 +53,7 @@ import org.apache.geode.internal.logging.LogService;
* and standard deviation estimated from historical heartbeat inter-arrival times.
*
*/
-public class PhiAccrualFailureDetector {
+public class PhiAccrualFailureDetector implements FailureDetector {
private static final Logger logger = LogService.getLogger();
private final double threshold;
@@ -131,7 +131,8 @@ public class PhiAccrualFailureDetector {
return Math.max(stdDeviationMillis, minStdDeviationMillis);
}
- public synchronized double phi(long timestampMillis) {
+ @Override
+ public synchronized double availabilityProbability(long timestampMillis) {
Long lastTimestampMillis = this.lastTimestampMillis.get();
if (lastTimestampMillis == null) {
return 0.0;
@@ -151,11 +152,12 @@ public class PhiAccrualFailureDetector {
}
public synchronized double phi() {
- return phi(System.currentTimeMillis());
+ return availabilityProbability(System.currentTimeMillis());
}
+ @Override
public boolean isAvailable(long timestampMillis) {
- double currentPhi = phi(timestampMillis);
+ double currentPhi = availabilityProbability(timestampMillis);
if (Double.isNaN(currentPhi)) {
// There isn't enough history to compute a valid Phi so we use a timestamp check
return (getLastTimestampMillis() >= timestampMillis - acceptableHeartbeatPauseMillis);
@@ -167,10 +169,12 @@ public class PhiAccrualFailureDetector {
return isAvailable(System.currentTimeMillis());
}
+ @Override
public long heartbeatsRecorded() {
return heartbeatsRecorded.get();
}
+ @Override
public synchronized void heartbeat(long timestampMillis) {
Long lastTimestampMillis = this.lastTimestampMillis.getAndSet(timestampMillis);
/** bruce s.: for Apache Geode, don't record duplicate heartbeats */
@@ -186,6 +190,7 @@ public class PhiAccrualFailureDetector {
}
}
+ @Override
public Long getLastTimestampMillis() {
return lastTimestampMillis.get();
}
@@ -198,6 +203,7 @@ public class PhiAccrualFailureDetector {
heartbeat(System.currentTimeMillis());
}
+ @Override
public List<Long> getIntervalHistory() {
return heartbeatHistory.intervals;
}
diff --git a/geode-core/src/main/java/org/apache/geode/internal/SystemAdmin.java b/geode-core/src/main/java/org/apache/geode/internal/SystemAdmin.java
index da94f38..23d60d0 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/SystemAdmin.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/SystemAdmin.java
@@ -1122,20 +1122,7 @@ public class SystemAdmin {
return;
}
}
- System.out.println(" " + v.toString());
- if (details) {
- System.out.print(" values=");
- double[] snapshots = v.getSnapshots();
- for (int i = 0; i < snapshots.length; i++) {
- System.out.print(' ');
- System.out.print(snapshots[i]);
- }
- System.out.println();
- String desc = v.getDescriptor().getDescription();
- if (desc != null && desc.length() > 0) {
- System.out.println(" " + desc);
- }
- }
+ v.printStats();
}
/**
@@ -1249,7 +1236,7 @@ public class SystemAdmin {
}
}
for (Map.Entry<CombinedResources, List<StatValue>> me : allSpecsMap.entrySet()) {
- System.out.println(me.getKey());
+// System.out.println(me.getKey());
for (StatValue v : me.getValue()) {
printStatValue(v, startTime, endTime, nofilter, persec, persample, prunezeros,
details);
diff --git a/geode-core/src/main/java/org/apache/geode/internal/statistics/StatArchiveReader.java b/geode-core/src/main/java/org/apache/geode/internal/statistics/StatArchiveReader.java
index 58bdc46..5e9c3b0 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/statistics/StatArchiveReader.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/statistics/StatArchiveReader.java
@@ -541,6 +541,8 @@ public class StatArchiveReader implements StatArchiveFormat, AutoCloseable {
* Returns a description of this statistic.
*/
StatDescriptor getDescriptor();
+
+ void printStats();
}
protected abstract static class AbstractValue implements StatValue {
@@ -565,6 +567,12 @@ public class StatArchiveReader implements StatArchiveFormat, AutoCloseable {
}
@Override
+ public void printStats() {
+   // calcStats() is invoked first, as the other accessors of this class do (for example
+   // getSnapshotsSize), presumably so that the computed fields printed below are up to date.
+   calcStats();
+   // Emit a single CSV row: min,max,avg,stddev. %n is used instead of a hard-coded "\n" so the
+   // row ends with the platform line separator, matching the println-based output that
+   // SystemAdmin produced before delegating to this method.
+   System.out.format("%s,%s,%s,%s%n", min, max, avg, stddev);
+ }
+
+ @Override
public int getSnapshotsSize() {
calcStats();
return this.size;