You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by bs...@apache.org on 2019/04/01 19:15:03 UTC

[geode] 01/01: GEODE-6583 Integrate phi-accrual failure detection into Geode

This is an automated email from the ASF dual-hosted git repository.

bschuchardt pushed a commit to branch feature/GEODE-6583
in repository https://gitbox.apache.org/repos/asf/geode.git

commit 5a03277f1273fd266da7fddcb6406b0a655b8361
Author: Bruce Schuchardt <bs...@pivotal.io>
AuthorDate: Mon Apr 1 12:11:00 2019 -0700

    GEODE-6583 Integrate phi-accrual failure detection into Geode
    
    *Not ready for review*
    
    This experimental code integrates a history-based heartbeat failure
    detector with the existing Membership failure detector.  Final checks
    remain the same - pretty much everything is the same except I've
    replaced the single heartbeat timestamp per member with a
    PhiAccrualFailureDetector object.  GMSHealthMonitorJUnitTest is passing
    as are a handful of other distributed and integration tests.
---
 .../gms/fd/GMSHealthMonitorJUnitTest.java          |   2 +-
 .../membership/gms/fd/GMSHealthMonitor.java        |  70 +++---
 .../gms/fd/PhiAccrualFailureDetector.java          | 266 +++++++++++++++++++++
 3 files changed, 307 insertions(+), 31 deletions(-)

diff --git a/geode-core/src/integrationTest/java/org/apache/geode/distributed/internal/membership/gms/fd/GMSHealthMonitorJUnitTest.java b/geode-core/src/integrationTest/java/org/apache/geode/distributed/internal/membership/gms/fd/GMSHealthMonitorJUnitTest.java
index d80b424..524d785 100644
--- a/geode-core/src/integrationTest/java/org/apache/geode/distributed/internal/membership/gms/fd/GMSHealthMonitorJUnitTest.java
+++ b/geode-core/src/integrationTest/java/org/apache/geode/distributed/internal/membership/gms/fd/GMSHealthMonitorJUnitTest.java
@@ -775,7 +775,7 @@ public class GMSHealthMonitorJUnitTest {
     HeartbeatMessage hb = new HeartbeatMessage(-1);
     hb.setSender(mockMembers.get(0));
     gmsHealthMonitor.processMessage(hb);
-    assertTrue(gmsHealthMonitor.memberTimeStamps.get(hb.getSender()) == null);
+    assertTrue(gmsHealthMonitor.memberDetectors.get(hb.getSender()) == null);
 
     // a sick member will not take action on a Suspect message from another member
     SuspectMembersMessage smm = mock(SuspectMembersMessage.class);
diff --git a/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/gms/fd/GMSHealthMonitor.java b/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/gms/fd/GMSHealthMonitor.java
index d1d19e7..41e41e5 100644
--- a/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/gms/fd/GMSHealthMonitor.java
+++ b/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/gms/fd/GMSHealthMonitor.java
@@ -141,7 +141,7 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler {
   /**
    * Timestamp at which we last had contact from a member
    */
-  final ConcurrentMap<InternalDistributedMember, TimeStamp> memberTimeStamps =
+  final ConcurrentMap<InternalDistributedMember, PhiAccrualFailureDetector> memberDetectors =
       new ConcurrentHashMap<>();
 
   /**
@@ -247,26 +247,16 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler {
       GMSHealthMonitor.this.currentTimeStamp = currentTime;
 
       if (neighbour != null) {
-        TimeStamp nextNeighborTS;
-        synchronized (GMSHealthMonitor.this) {
-          nextNeighborTS = GMSHealthMonitor.this.memberTimeStamps.get(neighbour);
-        }
-
-        if (nextNeighborTS == null) {
-          TimeStamp customTS = new TimeStamp(currentTime);
-          memberTimeStamps.put(neighbour, customTS);
-          return;
-        }
-
-        long interval = memberTimeoutInMillis / GMSHealthMonitor.LOGICAL_INTERVAL;
-        long lastTS = currentTime - nextNeighborTS.getTime();
-        if (lastTS + interval >= memberTimeoutInMillis) {
+        PhiAccrualFailureDetector nextNeighborDetector =
+            getOrCreatePhiAccrualFailureDetector(neighbour, currentTime);
+        if (!nextNeighborDetector.isAvailable()) {
           logger.debug("Checking member {} ", neighbour);
           // now do check request for this member;
           checkMember(neighbour);
         }
       }
     }
+
   }
 
   /***
@@ -382,11 +372,9 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler {
    * Record member activity at a specified time
    */
   private void contactedBy(InternalDistributedMember sender, long timeStamp) {
-    TimeStamp cTS = new TimeStamp(timeStamp);
-    cTS = memberTimeStamps.putIfAbsent(sender, cTS);
-    if (cTS != null && cTS.getTime() < timeStamp) {
-      cTS.setTime(timeStamp);
-    }
+
+    PhiAccrualFailureDetector detector = getOrCreatePhiAccrualFailureDetector(sender, timeStamp);
+    detector.heartbeat(timeStamp);
     if (suspectedMemberIds.containsKey(sender)) {
       memberUnsuspected(sender);
       setNextNeighbor(currentView, null);
@@ -394,6 +382,28 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler {
   }
 
 
+  /**
+   * Returns the failure detector registered for the given member, creating and registering a new
+   * one when the member has none yet. A newly created detector is seeded with {@code timeStamp}
+   * as its first heartbeat and uses {@code memberTimeout} both as the acceptable heartbeat pause
+   * and as the first-heartbeat estimate.
+   *
+   * @param member the member whose detector is wanted
+   * @param timeStamp time, in milliseconds, recorded as the first heartbeat of a new detector
+   * @return the existing or newly registered detector for {@code member}
+   */
+  public PhiAccrualFailureDetector getOrCreatePhiAccrualFailureDetector(
+      InternalDistributedMember member,
+      long timeStamp) {
+    // TODO use something cheaper than a sync on the health monitor
+    synchronized (GMSHealthMonitor.this) {
+      PhiAccrualFailureDetector existing = GMSHealthMonitor.this.memberDetectors.get(member);
+      if (existing != null) {
+        return existing;
+      }
+
+      logger.info("creating new failure detector for {}", member);
+      final double threshold = 10;
+      final int sampleSize = 200;
+      final int minStdDev = 100;
+      PhiAccrualFailureDetector created = new PhiAccrualFailureDetector(
+          threshold, sampleSize, minStdDev, memberTimeout, memberTimeout);
+      created.heartbeat(timeStamp);
+      memberDetectors.put(member, created);
+      return created;
+    }
+  }
+
   private HeartbeatRequestMessage constructHeartbeatRequestMessage(
       final InternalDistributedMember mbr) {
     final int reqId = requestId.getAndIncrement();
@@ -472,8 +482,8 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler {
           if (pingResp.getResponseMsg() == null) {
             pingResp.wait(memberTimeout);
           }
-          TimeStamp ts = memberTimeStamps.get(member);
-          if (ts != null && ts.getTime() > startTime) {
+          PhiAccrualFailureDetector detector = memberDetectors.get(member);
+          if (detector != null && detector.isAvailable()) {
             return true;
           }
           if (pingResp.getResponseMsg() == null) {
@@ -485,8 +495,8 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler {
           } else {
             logger.trace("received heartbeat from {}", member);
             this.stats.incHeartbeatsReceived();
-            if (ts != null) {
-              ts.setTime(System.currentTimeMillis());
+            if (detector != null) {
+              detector.heartbeat();
             }
             return true;
           }
@@ -581,9 +591,9 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler {
           this.stats.incTcpFinalCheckResponsesReceived();
         }
         if (b == OK) {
-          TimeStamp ts = memberTimeStamps.get(suspectMember);
-          if (ts != null) {
-            ts.setTime(System.currentTimeMillis());
+          PhiAccrualFailureDetector detector = memberDetectors.get(suspectMember);
+          if (detector != null) {
+            detector.heartbeat();
           }
           return true;
         } else {
@@ -787,7 +797,7 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler {
     synchronized (suspectRequestsInView) {
       suspectRequestsInView.clear();
     }
-    for (Iterator<InternalDistributedMember> it = memberTimeStamps.keySet().iterator(); it
+    for (Iterator<InternalDistributedMember> it = memberDetectors.keySet().iterator(); it
         .hasNext();) {
       if (!newView.contains(it.next())) {
         it.remove();
@@ -1312,8 +1322,8 @@ public class GMSHealthMonitor implements HealthMonitor, MessageHandler {
       }
 
       if (!pinged && !isStopping) {
-        TimeStamp ts = memberTimeStamps.get(mbr);
-        if (ts == null || ts.getTime() < startTime) {
+        PhiAccrualFailureDetector detector = memberDetectors.get(mbr);
+        if (detector == null || !detector.isAvailable()) {
           logger.info("Availability check failed for member {}", mbr);
           // if the final check fails & this VM is the coordinator we don't need to do another final
           // check
diff --git a/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/gms/fd/PhiAccrualFailureDetector.java b/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/gms/fd/PhiAccrualFailureDetector.java
new file mode 100644
index 0000000..760fbdc
--- /dev/null
+++ b/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/gms/fd/PhiAccrualFailureDetector.java
@@ -0,0 +1,266 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.distributed.internal.membership.gms.fd;
+
+/*
+ * Copyright 2018 Mitsunori Komatsu (komamitsu)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.util.LinkedList;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.logging.log4j.Logger;
+
+import org.apache.geode.internal.logging.LogService;
+
+/**
+ * This is a port of
+ * https://github.com/akka/akka/blob/master/akka-remote/src/main/scala/akka/remote/PhiAccrualFailureDetector.scala
+ *
+ * Implementation of 'The Phi Accrual Failure Detector' by Hayashibara et al. as defined in their
+ * paper:
+ * [http://ddg.jaist.ac.jp/pub/HDY+04.pdf]
+ *
+ * The suspicion level of failure is given by a value called φ (phi).
+ * The basic idea of the φ failure detector is to express the value of φ on a scale that
+ * is dynamically adjusted to reflect current network conditions. A configurable
+ * threshold is used to decide if φ is considered to be a failure.
+ *
+ * The value of φ is calculated as:
+ *
+ * <pre>
+ * φ = -log10(1 - F(timeSinceLastHeartbeat))
+ * </pre>
+ *
+ * where F is the cumulative distribution function of a normal distribution with mean
+ * and standard deviation estimated from historical heartbeat inter-arrival times.
+ *
+ * <p>
+ * All reads and writes of the heartbeat history and of the last heartbeat timestamp happen in
+ * {@code synchronized} methods of this class.
+ * </p>
+ *
+ * <p>
+ * Note for Geode use: this class was originally called PhiAccuralFailureDetector and is from
+ * https://github.com/komamitsu/phi-accural-failure-detector
+ * </p>
+ */
+public class PhiAccrualFailureDetector {
+  private static final Logger logger = LogService.getLogger();
+  private final double threshold;
+  private final double minStdDeviationMillis;
+  private final long acceptableHeartbeatPauseMillis;
+
+  private final HeartbeatHistory heartbeatHistory;
+  private final AtomicReference<Long> lastTimestampMillis = new AtomicReference<Long>();
+
+  /**
+   * Creates a detector whose history is bootstrapped with two synthetic intervals,
+   * {@code firstHeartbeatEstimateMillis} minus and plus one quarter of that estimate.
+   *
+   * @param threshold A low threshold is prone to generate many wrong suspicions but ensures a quick
+   *        detection in the event of a real crash. Conversely, a high threshold generates fewer
+   *        mistakes but needs more time to detect actual crashes. Must be &gt; 0.
+   *
+   * @param maxSampleSize Number of samples to use for calculation of mean and standard deviation of
+   *        inter-arrival times. Must be &gt; 0.
+   *
+   * @param minStdDeviationMillis Minimum standard deviation to use for the normal distribution used
+   *        when calculating phi. Too low a standard deviation might result in too much sensitivity
+   *        for sudden, but normal, deviations in heartbeat inter-arrival times. Must be &gt; 0.
+   *
+   * @param acceptableHeartbeatPauseMillis Duration corresponding to the number of potentially
+   *        lost/delayed heartbeats that will be accepted before considering it to be an anomaly.
+   *        This margin is important to be able to survive sudden, occasional pauses in heartbeat
+   *        arrivals, due to, for example, garbage collection or network drops. Must be &gt;= 0.
+   *
+   * @param firstHeartbeatEstimateMillis Bootstrap the stats with heartbeats that correspond to
+   *        this duration, with a rather high standard deviation (since the environment is unknown
+   *        in the beginning). Must be &gt; 0.
+   *
+   * @throws IllegalArgumentException if any argument is outside the range stated above
+   */
+  protected PhiAccrualFailureDetector(double threshold, int maxSampleSize,
+      double minStdDeviationMillis,
+      long acceptableHeartbeatPauseMillis, long firstHeartbeatEstimateMillis) {
+    if (threshold <= 0) {
+      throw new IllegalArgumentException("Threshold must be > 0: " + threshold);
+    }
+    if (maxSampleSize <= 0) {
+      throw new IllegalArgumentException("Sample size must be > 0: " + maxSampleSize);
+    }
+    if (minStdDeviationMillis <= 0) {
+      throw new IllegalArgumentException(
+          "Minimum standard deviation must be > 0: " + minStdDeviationMillis);
+    }
+    if (acceptableHeartbeatPauseMillis < 0) {
+      throw new IllegalArgumentException(
+          "Acceptable heartbeat pause millis must be >= 0: " + acceptableHeartbeatPauseMillis);
+    }
+    if (firstHeartbeatEstimateMillis <= 0) {
+      throw new IllegalArgumentException(
+          "First heartbeat value must be > 0: " + firstHeartbeatEstimateMillis);
+    }
+
+    this.threshold = threshold;
+    this.minStdDeviationMillis = minStdDeviationMillis;
+    this.acceptableHeartbeatPauseMillis = acceptableHeartbeatPauseMillis;
+
+    // seed the history with estimate -/+ 25% so that mean and deviation are defined from the start
+    long stdDeviationMillis = firstHeartbeatEstimateMillis / 4;
+    heartbeatHistory = new HeartbeatHistory(maxSampleSize);
+    heartbeatHistory.add(firstHeartbeatEstimateMillis - stdDeviationMillis)
+        .add(firstHeartbeatEstimateMillis + stdDeviationMillis);
+  }
+
+  /**
+   * Applies the configured lower bound to a computed standard deviation.
+   */
+  private double ensureValidStdDeviation(double stdDeviationMillis) {
+    return Math.max(stdDeviationMillis, minStdDeviationMillis);
+  }
+
+  /**
+   * Computes the suspicion level φ at the given time.
+   *
+   * @param timestampMillis the time, in milliseconds, at which to evaluate φ
+   * @return φ for the time elapsed since the last recorded heartbeat, or {@code 0.0} when no
+   *         heartbeat has been recorded yet
+   */
+  public synchronized double phi(long timestampMillis) {
+    Long lastTimestampMillis = this.lastTimestampMillis.get();
+    if (lastTimestampMillis == null) {
+      return 0.0;
+    }
+
+    long timeDiffMillis = timestampMillis - lastTimestampMillis;
+    double meanMillis = heartbeatHistory.mean() + acceptableHeartbeatPauseMillis;
+    double stdDeviationMillis = ensureValidStdDeviation(heartbeatHistory.stdDeviation());
+
+    // 1 / (1 + e) is a logistic approximation of the normal CDF at y standard deviations
+    double y = (timeDiffMillis - meanMillis) / stdDeviationMillis;
+    double e = Math.exp(-y * (1.5976 + 0.070566 * y * y));
+    // both branches are algebraically -log10(e / (1 + e)); presumably two forms are used so that
+    // each side of the mean is evaluated in the numerically better-behaved way
+    if (timeDiffMillis > meanMillis) {
+      return -Math.log10(e / (1.0 + e));
+    } else {
+      return -Math.log10(1.0 - 1.0 / (1.0 + e));
+    }
+  }
+
+  /**
+   * Computes the suspicion level φ at the current system time.
+   *
+   * @return φ as of {@link System#currentTimeMillis()}
+   */
+  public synchronized double phi() {
+    return phi(System.currentTimeMillis());
+  }
+
+  /**
+   * Tells whether the monitored resource is considered available at the given time.
+   *
+   * @param timestampMillis the time, in milliseconds, at which to evaluate availability
+   * @return {@code true} when φ at that time is below the configured threshold
+   */
+  public boolean isAvailable(long timestampMillis) {
+    return phi(timestampMillis) < threshold;
+  }
+
+  /**
+   * Tells whether the monitored resource is considered available at the current system time.
+   *
+   * @return {@code true} when φ is currently below the configured threshold
+   */
+  public boolean isAvailable() {
+    return isAvailable(System.currentTimeMillis());
+  }
+
+  /**
+   * Records a heartbeat that arrived at the given time. The interval since the previous heartbeat
+   * is added to the history only if the resource was still considered available when this
+   * heartbeat arrived, so that the first heartbeat after a long pause does not skew the
+   * statistics.
+   *
+   * @param timestampMillis arrival time of the heartbeat, in milliseconds
+   */
+  public synchronized void heartbeat(long timestampMillis) {
+    // Availability must be judged against the previous heartbeat, i.e. before the timestamp is
+    // replaced; afterwards the elapsed time would be zero and the check could never fail.
+    boolean availableBeforeHeartbeat = isAvailable(timestampMillis);
+    Long previousTimestampMillis = this.lastTimestampMillis.getAndSet(timestampMillis);
+    if (previousTimestampMillis != null && availableBeforeHeartbeat) {
+      heartbeatHistory.add(timestampMillis - previousTimestampMillis);
+    }
+  }
+
+  /**
+   * Records a heartbeat that arrived at the current system time.
+   */
+  public void heartbeat() {
+    heartbeat(System.currentTimeMillis());
+  }
+
+  /**
+   * Fluent builder for {@link PhiAccrualFailureDetector}. Defaults: threshold 16.0, sample size
+   * 200, minimum standard deviation 500 ms, acceptable pause 0 ms, first heartbeat estimate 500
+   * ms. Arguments are validated by the detector's constructor when {@link #build()} is called.
+   */
+  public static class Builder {
+    private double threshold = 16.0;
+    private int maxSampleSize = 200;
+    private double minStdDeviationMillis = 500;
+    private long acceptableHeartbeatPauseMillis = 0;
+    private long firstHeartbeatEstimateMillis = 500;
+
+    public Builder setThreshold(double threshold) {
+      this.threshold = threshold;
+      return this;
+    }
+
+    public Builder setMaxSampleSize(int maxSampleSize) {
+      this.maxSampleSize = maxSampleSize;
+      return this;
+    }
+
+    public Builder setMinStdDeviationMillis(double minStdDeviationMillis) {
+      this.minStdDeviationMillis = minStdDeviationMillis;
+      return this;
+    }
+
+    public Builder setAcceptableHeartbeatPauseMillis(long acceptableHeartbeatPauseMillis) {
+      this.acceptableHeartbeatPauseMillis = acceptableHeartbeatPauseMillis;
+      return this;
+    }
+
+    public Builder setFirstHeartbeatEstimateMillis(long firstHeartbeatEstimateMillis) {
+      this.firstHeartbeatEstimateMillis = firstHeartbeatEstimateMillis;
+      return this;
+    }
+
+    /**
+     * @return a detector configured with the current builder values
+     * @throws IllegalArgumentException if a configured value is rejected by the constructor
+     */
+    public PhiAccrualFailureDetector build() {
+      return new PhiAccrualFailureDetector(threshold, maxSampleSize, minStdDeviationMillis,
+          acceptableHeartbeatPauseMillis, firstHeartbeatEstimateMillis);
+    }
+  }
+
+  /**
+   * Bounded window of heartbeat inter-arrival times that keeps running sums so that mean and
+   * variance are available without re-scanning the samples. The backing list is a plain
+   * {@link LinkedList}; within this file it is only touched from the enclosing class's
+   * constructor and synchronized methods.
+   */
+  private static class HeartbeatHistory {
+    private final int maxSampleSize;
+    private final LinkedList<Long> intervals = new LinkedList<Long>();
+    private final AtomicLong intervalSum = new AtomicLong();
+    private final AtomicLong squaredIntervalSum = new AtomicLong();
+
+    public HeartbeatHistory(int maxSampleSize) {
+      if (maxSampleSize < 1) {
+        throw new IllegalArgumentException("maxSampleSize must be >= 1, got " + maxSampleSize);
+      }
+      this.maxSampleSize = maxSampleSize;
+    }
+
+    /** Arithmetic mean of the stored intervals; NaN while the history is empty. */
+    public double mean() {
+      return (double) intervalSum.get() / intervals.size();
+    }
+
+    /** Population variance of the stored intervals, never negative. */
+    public double variance() {
+      double mean = mean();
+      double variance = ((double) squaredIntervalSum.get() / intervals.size()) - (mean * mean);
+      // E[x^2] - E[x]^2 can come out marginally negative through floating-point rounding, and
+      // the square root of that would be NaN, which would make every availability check fail
+      return Math.max(variance, 0.0);
+    }
+
+    public double stdDeviation() {
+      return Math.sqrt(variance());
+    }
+
+    /**
+     * Appends an interval, evicting the oldest sample first when the window is full.
+     *
+     * @return this history, to allow chained calls
+     */
+    public HeartbeatHistory add(long interval) {
+      if (intervals.size() >= maxSampleSize) {
+        Long dropped = intervals.pollFirst();
+        intervalSum.addAndGet(-dropped);
+        squaredIntervalSum.addAndGet(-pow2(dropped));
+      }
+      intervals.add(interval);
+      intervalSum.addAndGet(interval);
+      squaredIntervalSum.addAndGet(pow2(interval));
+      return this;
+    }
+
+    private long pow2(long x) {
+      return x * x;
+    }
+  }
+}