You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by bu...@apache.org on 2021/06/28 17:48:46 UTC

[geode] branch support/1.12 updated: GEODE-9180: warn when heartbeat thread oversleeps (#6360)

This is an automated email from the ASF dual-hosted git repository.

burcham pushed a commit to branch support/1.12
in repository https://gitbox.apache.org/repos/asf/geode.git


The following commit(s) were added to refs/heads/support/1.12 by this push:
     new 0a33ffb  GEODE-9180: warn when heartbeat thread oversleeps (#6360)
0a33ffb is described below

commit 0a33ffbe990af93e2779957ae6027a2c5a077c3f
Author: Bill Burcham <bi...@gmail.com>
AuthorDate: Wed Apr 28 10:22:16 2021 -0700

    GEODE-9180: warn when heartbeat thread oversleeps (#6360)
    
    * heartbeat producer logs warning when it oversleeps by a period or more
    
    (cherry picked from commit f8b07a007ac93c323cd888cbc53dc3914336077f)
---
 .../gms/fd/GMSHealthMonitorJUnitTest.java          |  47 +++++
 .../membership/gms/fd/GMSHealthMonitor.java        | 190 ++++++++++++---------
 2 files changed, 161 insertions(+), 76 deletions(-)

diff --git a/geode-core/src/integrationTest/java/org/apache/geode/distributed/internal/membership/gms/fd/GMSHealthMonitorJUnitTest.java b/geode-core/src/integrationTest/java/org/apache/geode/distributed/internal/membership/gms/fd/GMSHealthMonitorJUnitTest.java
index e458dcf..2fd15e0 100644
--- a/geode-core/src/integrationTest/java/org/apache/geode/distributed/internal/membership/gms/fd/GMSHealthMonitorJUnitTest.java
+++ b/geode-core/src/integrationTest/java/org/apache/geode/distributed/internal/membership/gms/fd/GMSHealthMonitorJUnitTest.java
@@ -58,6 +58,7 @@ import java.util.Timer;
 import java.util.concurrent.locks.Condition;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
+import java.util.function.LongUnaryOperator;
 
 import org.junit.After;
 import org.junit.Assert;
@@ -989,6 +990,52 @@ public class GMSHealthMonitorJUnitTest {
     executeTestDoTCPCheck(GMSHealthMonitor.ERROR + 100, false);
   }
 
+  @Test
+  public void heartbeatOversleepCausesWarning() {
+    testHeartbeatSleepScenario(sleepLimit -> sleepLimit + 1,
+        "Failure detection heartbeat-generation thread overslept by more than a full period. Asleep time: 1,000,000,001 nanoseconds. Period: 500,000,000 nanoseconds.");
+  }
+
+  @Test
+  public void heartbeatOnTimeWakeupCausesNoWarning() {
+    testHeartbeatSleepScenario(sleepLimit -> sleepLimit,
+        null);
+  }
+
+  private void testHeartbeatSleepScenario(final LongUnaryOperator actualSleepPeriod,
+      final String expectedLogWarning) {
+
+    /*
+     * Creating a class here because it's a convenient way to provide (mutable) variables needed
+     * by the lambdas. Without the class, each of them would have to be arrays or atomics
+     * or some other kind of "holder object". By creating a class they can simply be fields.
+     */
+    new Runnable() {
+      // the thing we're testing
+      final GMSHealthMonitor.Heart heart = gmsHealthMonitor.new Heart();
+      int periodNumber = 0; // number of calls made so far to the fake nano timer
+      String capturedMessage; // warning message (if any) generated by heart
+
+      @Override
+      public void run() {
+        heart.sendPeriodicHeartbeats(sleepMillis -> {
+        },
+            () -> {
+              switch (periodNumber++) {
+                case 0:
+                  return 0L;
+                case 1:
+                default:
+                  gmsHealthMonitor.stop();
+                  return actualSleepPeriod.applyAsLong(heart.sleepLimitNanos);
+              }
+            },
+            msg -> capturedMessage = msg);
+        assertThat(capturedMessage).isEqualTo(expectedLogWarning);
+      }
+    }.run();
+  }
+
   private void executeTestDoTCPCheck(int receivedStatus, boolean expectedResult) throws Exception {
     MemberIdentifier otherMember =
         createGMSMember(Version.CURRENT_ORDINAL, 0, 1, 1);
diff --git a/geode-membership/src/main/java/org/apache/geode/distributed/internal/membership/gms/fd/GMSHealthMonitor.java b/geode-membership/src/main/java/org/apache/geode/distributed/internal/membership/gms/fd/GMSHealthMonitor.java
index 73af605..a1b493f 100644
--- a/geode-membership/src/main/java/org/apache/geode/distributed/internal/membership/gms/fd/GMSHealthMonitor.java
+++ b/geode-membership/src/main/java/org/apache/geode/distributed/internal/membership/gms/fd/GMSHealthMonitor.java
@@ -749,82 +749,7 @@ public class GMSHealthMonitor<ID extends MemberIdentifier> implements HealthMoni
    * process
    */
   private void startHeartbeatThread() {
-    checkExecutor.execute(new Runnable() {
-      @Override
-      public void run() {
-        Thread.currentThread().setName("Geode Heartbeat Sender");
-        sendPeriodicHeartbeats();
-      }
-
-      private void sendPeriodicHeartbeats() {
-        while (!isStopping && !services.getCancelCriterion().isCancelInProgress()) {
-          try {
-            Thread.sleep(memberTimeout / LOGICAL_INTERVAL);
-          } catch (InterruptedException e) {
-            return;
-          }
-          GMSMembershipView<ID> v = currentView;
-          if (v != null) {
-            List<ID> mbrs = v.getMembers();
-            int index = mbrs.indexOf(localAddress);
-            if (index < 0 || mbrs.size() < 2) {
-              continue;
-            }
-            if (!playingDead) {
-              sendHeartbeats(mbrs, index);
-            }
-          }
-        }
-      }
-
-      private void sendHeartbeats(List<ID> mbrs, int startIndex) {
-        ID coordinator = currentView.getCoordinator();
-        if (coordinator != null && !coordinator.equals(localAddress)) {
-          HeartbeatMessage<ID> message = new HeartbeatMessage<>(-1);
-          message.setRecipient(coordinator);
-          try {
-            if (isStopping) {
-              return;
-            }
-            services.getMessenger().sendUnreliably(message);
-            GMSHealthMonitor.this.stats.incHeartbeatsSent();
-          } catch (MembershipClosedException e) {
-            return;
-          }
-        }
-
-        int index = startIndex;
-        int numSent = 0;
-        for (;;) {
-          index--;
-          if (index < 0) {
-            index = mbrs.size() - 1;
-          }
-          ID mbr = mbrs.get(index);
-          if (mbr.equals(localAddress)) {
-            break;
-          }
-          if (mbr.equals(coordinator)) {
-            continue;
-          }
-          if (isStopping) {
-            return;
-          }
-          HeartbeatMessage<ID> message = new HeartbeatMessage<>(-1);
-          message.setRecipient(mbr);
-          try {
-            services.getMessenger().sendUnreliably(message);
-            GMSHealthMonitor.this.stats.incHeartbeatsSent();
-            numSent++;
-            if (numSent >= NUM_HEARTBEATS) {
-              break;
-            }
-          } catch (MembershipClosedException e) {
-            return;
-          }
-        }
-      } // for (;;)
-    });
+    checkExecutor.execute(new Heart());
   }
 
   @Override
@@ -1529,4 +1454,117 @@ public class GMSHealthMonitor<ID extends MemberIdentifier> implements HealthMoni
   public MembershipStatistics getStats() {
     return this.stats;
   }
+
+  @FunctionalInterface
+  interface Sleeper {
+    void sleep(long millis) throws InterruptedException;
+  }
+
+  @FunctionalInterface
+  interface NanoTimer {
+    long nanoTime();
+  }
+
+  @FunctionalInterface
+  interface Warner {
+    void warn(String message);
+  }
+
+  class Heart implements Runnable {
+
+    // If we sleep longer than this number of periods then log a warning
+    public static final int OVERSLEEP_WARNING_THRESHOLD_PERIODS = 2;
+    public final long sleepPeriodMillis = memberTimeout / LOGICAL_INTERVAL;
+    public final long sleepPeriodNanos =
+        TimeUnit.NANOSECONDS.convert(sleepPeriodMillis, TimeUnit.MILLISECONDS);
+    public final long sleepLimitNanos = OVERSLEEP_WARNING_THRESHOLD_PERIODS * sleepPeriodNanos;
+
+    @Override
+    public void run() {
+      Thread.currentThread().setName("Geode Heartbeat Sender");
+      sendPeriodicHeartbeats(Thread::sleep, System::nanoTime, logger::warn);
+    }
+
+    @VisibleForTesting
+    void sendPeriodicHeartbeats(final Sleeper sleeper,
+        final NanoTimer nanoTimer,
+        final Warner warner) {
+      while (!isStopping && !services.getCancelCriterion().isCancelInProgress()) {
+        try {
+          final long timeBeforeSleep = nanoTimer.nanoTime();
+          sleeper.sleep(sleepPeriodMillis);
+          final long timeAfterSleep = nanoTimer.nanoTime();
+          final long asleepNanos = timeAfterSleep - timeBeforeSleep;
+          if (asleepNanos > sleepLimitNanos) {
+            warner.warn(
+                String.format(
+                    "Failure detection heartbeat-generation thread overslept by more than a full period. Asleep time: %,d nanoseconds. Period: %,d nanoseconds.",
+                    asleepNanos, sleepPeriodNanos));
+          }
+        } catch (InterruptedException e) {
+          Thread.currentThread().interrupt();
+          return;
+        }
+        GMSMembershipView<ID> v = currentView;
+        if (v != null) {
+          List<ID> mbrs = v.getMembers();
+          int index = mbrs.indexOf(localAddress);
+          if (index < 0 || mbrs.size() < 2) {
+            continue;
+          }
+          if (!playingDead) {
+            sendHeartbeats(mbrs, index);
+          }
+        }
+      }
+    }
+
+    private void sendHeartbeats(List<ID> mbrs, int startIndex) {
+      ID coordinator = currentView.getCoordinator();
+      if (coordinator != null && !coordinator.equals(localAddress)) {
+        HeartbeatMessage<ID> message = new HeartbeatMessage<>(-1);
+        message.setRecipient(coordinator);
+        try {
+          if (isStopping) {
+            return;
+          }
+          services.getMessenger().sendUnreliably(message);
+          GMSHealthMonitor.this.stats.incHeartbeatsSent();
+        } catch (MembershipClosedException e) {
+          return;
+        }
+      }
+
+      int index = startIndex;
+      int numSent = 0;
+      for (;;) {
+        index--;
+        if (index < 0) {
+          index = mbrs.size() - 1;
+        }
+        ID mbr = mbrs.get(index);
+        if (mbr.equals(localAddress)) {
+          break;
+        }
+        if (mbr.equals(coordinator)) {
+          continue;
+        }
+        if (isStopping) {
+          return;
+        }
+        HeartbeatMessage<ID> message = new HeartbeatMessage<>(-1);
+        message.setRecipient(mbr);
+        try {
+          services.getMessenger().sendUnreliably(message);
+          GMSHealthMonitor.this.stats.incHeartbeatsSent();
+          numSent++;
+          if (numSent >= NUM_HEARTBEATS) {
+            break;
+          }
+        } catch (MembershipClosedException e) {
+          return;
+        }
+      } // for (;;)
+    }
+  }
 }