You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by bu...@apache.org on 2021/06/16 21:43:09 UTC

[geode] branch feature/backport-GEODE-9180-1-13 created (now a3036ff)

This is an automated email from the ASF dual-hosted git repository.

burcham pushed a change to branch feature/backport-GEODE-9180-1-13
in repository https://gitbox.apache.org/repos/asf/geode.git.


      at a3036ff  GEODE-9180: warn when heartbeat thread oversleeps (#6360)

This branch includes the following new commits:

     new a3036ff  GEODE-9180: warn when heartbeat thread oversleeps (#6360)

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


[geode] 01/01: GEODE-9180: warn when heartbeat thread oversleeps (#6360)

Posted by bu...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

burcham pushed a commit to branch feature/backport-GEODE-9180-1-13
in repository https://gitbox.apache.org/repos/asf/geode.git

commit a3036ffb5c79a3ea6def7600efd519b8206aa16c
Author: Bill Burcham <bi...@gmail.com>
AuthorDate: Wed Apr 28 10:22:16 2021 -0700

    GEODE-9180: warn when heartbeat thread oversleeps (#6360)
    
    * heartbeat producer logs warning when it oversleeps by a period or more
    
    (cherry picked from commit f8b07a007ac93c323cd888cbc53dc3914336077f)
---
 .../gms/fd/GMSHealthMonitorJUnitTest.java          |  47 +++++
 .../membership/gms/fd/GMSHealthMonitor.java        | 190 ++++++++++++---------
 2 files changed, 161 insertions(+), 76 deletions(-)

diff --git a/geode-membership/src/integrationTest/java/org/apache/geode/distributed/internal/membership/gms/fd/GMSHealthMonitorJUnitTest.java b/geode-membership/src/integrationTest/java/org/apache/geode/distributed/internal/membership/gms/fd/GMSHealthMonitorJUnitTest.java
index 2aaf2f5..81e132a 100644
--- a/geode-membership/src/integrationTest/java/org/apache/geode/distributed/internal/membership/gms/fd/GMSHealthMonitorJUnitTest.java
+++ b/geode-membership/src/integrationTest/java/org/apache/geode/distributed/internal/membership/gms/fd/GMSHealthMonitorJUnitTest.java
@@ -48,6 +48,7 @@ import java.util.Timer;
 import java.util.concurrent.locks.Condition;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
+import java.util.function.LongUnaryOperator;
 
 import org.junit.After;
 import org.junit.Assert;
@@ -952,6 +953,52 @@ public class GMSHealthMonitorJUnitTest {
     executeTestDoTCPCheck(GMSHealthMonitor.ERROR + 100, false);
   }
 
+  @Test
+  public void heartbeatOversleepCausesWarning() {
+    testHeartbeatSleepScenario(sleepLimit -> sleepLimit + 1, // one nanosecond past the limit
+        "Failure detection heartbeat-generation thread overslept by more than a full period. Asleep time: 1,000,000,001 nanoseconds. Period: 500,000,000 nanoseconds.");
+  }
+
+  @Test
+  public void heartbeatOnTimeWakeupCausesNoWarning() {
+    testHeartbeatSleepScenario(sleepLimit -> sleepLimit, // exactly at the limit
+        null); // no warning expected: the heart only warns when strictly over the limit
+  }
+
+  private void testHeartbeatSleepScenario(final LongUnaryOperator actualSleepPeriod,
+      final String expectedLogWarning) {
+
+    /*
+     * Creating a class here because it's a convenient way to provide the (mutable) variables
+     * needed by the lambdas. Without the class, each of them would have to be arrays or atomics
+     * or some other kind of "holder object". By creating a class they can simply be fields.
+     */
+    new Runnable() {
+      // the thing we're testing
+      final GMSHealthMonitor.Heart heart = gmsHealthMonitor.new Heart();
+      int periodNumber = 0; // number of fake-clock readings handed out so far
+      String capturedMessage; // warning message (if any) generated by heart
+
+      @Override
+      public void run() {
+        heart.sendPeriodicHeartbeats(sleepMillis -> { // sleeper: returns immediately
+        },
+            () -> { // nano timer: scripted readings instead of the real clock
+              switch (periodNumber++) {
+                case 0:
+                  return 0L; // first reading: taken just before the (fake) sleep
+                case 1:
+                default:
+                  gmsHealthMonitor.stop(); // presumably ends the heartbeat loop — confirm
+                  return actualSleepPeriod.applyAsLong(heart.sleepLimitNanos); // reading after sleep
+              }
+            },
+            msg -> capturedMessage = msg); // warner: capture instead of logging
+        assertThat(capturedMessage).isEqualTo(expectedLogWarning);
+      }
+    }.run();
+  }
+
   private void executeTestDoTCPCheck(int receivedStatus, boolean expectedResult) throws Exception {
     MemberIdentifier otherMember =
         createGMSMember(Version.CURRENT_ORDINAL, 0, 1, 1);
diff --git a/geode-membership/src/main/java/org/apache/geode/distributed/internal/membership/gms/fd/GMSHealthMonitor.java b/geode-membership/src/main/java/org/apache/geode/distributed/internal/membership/gms/fd/GMSHealthMonitor.java
index 2590e23..7457557 100644
--- a/geode-membership/src/main/java/org/apache/geode/distributed/internal/membership/gms/fd/GMSHealthMonitor.java
+++ b/geode-membership/src/main/java/org/apache/geode/distributed/internal/membership/gms/fd/GMSHealthMonitor.java
@@ -750,82 +750,7 @@ public class GMSHealthMonitor<ID extends MemberIdentifier> implements HealthMoni
    * process
    */
   private void startHeartbeatThread() {
-    checkExecutor.execute(new Runnable() {
-      @Override
-      public void run() {
-        Thread.currentThread().setName("Geode Heartbeat Sender");
-        sendPeriodicHeartbeats();
-      }
-
-      private void sendPeriodicHeartbeats() {
-        while (!isStopping && !services.getCancelCriterion().isCancelInProgress()) {
-          try {
-            Thread.sleep(memberTimeout / LOGICAL_INTERVAL);
-          } catch (InterruptedException e) {
-            return;
-          }
-          GMSMembershipView<ID> v = currentView;
-          if (v != null) {
-            List<ID> mbrs = v.getMembers();
-            int index = mbrs.indexOf(localAddress);
-            if (index < 0 || mbrs.size() < 2) {
-              continue;
-            }
-            if (!playingDead) {
-              sendHeartbeats(mbrs, index);
-            }
-          }
-        }
-      }
-
-      private void sendHeartbeats(List<ID> mbrs, int startIndex) {
-        ID coordinator = currentView.getCoordinator();
-        if (coordinator != null && !coordinator.equals(localAddress)) {
-          HeartbeatMessage<ID> message = new HeartbeatMessage<>(-1);
-          message.setRecipient(coordinator);
-          try {
-            if (isStopping) {
-              return;
-            }
-            services.getMessenger().sendUnreliably(message);
-            GMSHealthMonitor.this.stats.incHeartbeatsSent();
-          } catch (MembershipClosedException e) {
-            return;
-          }
-        }
-
-        int index = startIndex;
-        int numSent = 0;
-        for (;;) {
-          index--;
-          if (index < 0) {
-            index = mbrs.size() - 1;
-          }
-          ID mbr = mbrs.get(index);
-          if (mbr.equals(localAddress)) {
-            break;
-          }
-          if (mbr.equals(coordinator)) {
-            continue;
-          }
-          if (isStopping) {
-            return;
-          }
-          HeartbeatMessage<ID> message = new HeartbeatMessage<>(-1);
-          message.setRecipient(mbr);
-          try {
-            services.getMessenger().sendUnreliably(message);
-            GMSHealthMonitor.this.stats.incHeartbeatsSent();
-            numSent++;
-            if (numSent >= NUM_HEARTBEATS) {
-              break;
-            }
-          } catch (MembershipClosedException e) {
-            return;
-          }
-        }
-      } // for (;;)
-    });
+    checkExecutor.execute(new Heart());
   }
 
   @Override
@@ -1530,4 +1455,117 @@ public class GMSHealthMonitor<ID extends MemberIdentifier> implements HealthMoni
   public MembershipStatistics getStats() {
     return this.stats;
   }
+
+  @FunctionalInterface
+  interface Sleeper { // stand-in for Thread.sleep so callers can supply a substitute
+    void sleep(long millis) throws InterruptedException;
+  }
+
+  @FunctionalInterface
+  interface NanoTimer { // stand-in for System.nanoTime so callers can supply a substitute
+    long nanoTime();
+  }
+
+  @FunctionalInterface
+  interface Warner { // sink for warning text; Heart.run() wires it to logger::warn
+    void warn(String message);
+  }
+
+  class Heart implements Runnable { // heartbeat-sending task; reads the enclosing monitor's state
+
+    // Log a warning if a single sleep lasts longer than this number of periods
+    public static final int OVERSLEEP_WARNING_THRESHOLD_PERIODS = 2;
+    public final long sleepPeriodMillis = memberTimeout / LOGICAL_INTERVAL; // one period
+    public final long sleepPeriodNanos =
+        TimeUnit.NANOSECONDS.convert(sleepPeriodMillis, TimeUnit.MILLISECONDS);
+    public final long sleepLimitNanos = OVERSLEEP_WARNING_THRESHOLD_PERIODS * sleepPeriodNanos;
+
+    @Override
+    public void run() {
+      Thread.currentThread().setName("Geode Heartbeat Sender");
+      sendPeriodicHeartbeats(Thread::sleep, System::nanoTime, logger::warn); // real implementations
+    }
+
+    @VisibleForTesting
+    void sendPeriodicHeartbeats(final Sleeper sleeper,
+        final NanoTimer nanoTimer,
+        final Warner warner) {
+      while (!isStopping && !services.getCancelCriterion().isCancelInProgress()) {
+        try {
+          final long timeBeforeSleep = nanoTimer.nanoTime();
+          sleeper.sleep(sleepPeriodMillis);
+          final long timeAfterSleep = nanoTimer.nanoTime();
+          final long asleepNanos = timeAfterSleep - timeBeforeSleep;
+          if (asleepNanos > sleepLimitNanos) { // strictly greater: exactly the limit is not reported
+            warner.warn(
+                String.format( // NOTE(review): default locale; "%,d" separators vary by locale
+                    "Failure detection heartbeat-generation thread overslept by more than a full period. Asleep time: %,d nanoseconds. Period: %,d nanoseconds.",
+                    asleepNanos, sleepPeriodNanos));
+          }
+        } catch (InterruptedException e) {
+          Thread.currentThread().interrupt(); // re-assert the interrupt status before exiting
+          return;
+        }
+        GMSMembershipView<ID> v = currentView; // read the field once for this iteration
+        if (v != null) {
+          List<ID> mbrs = v.getMembers();
+          int index = mbrs.indexOf(localAddress);
+          if (index < 0 || mbrs.size() < 2) { // we are not in the view, or there is nobody else
+            continue;
+          }
+          if (!playingDead) {
+            sendHeartbeats(mbrs, index);
+          }
+        }
+      }
+    }
+
+    private void sendHeartbeats(List<ID> mbrs, int startIndex) {
+      ID coordinator = currentView.getCoordinator(); // NOTE(review): re-reads currentView, not the snapshot mbrs came from — confirm
+      if (coordinator != null && !coordinator.equals(localAddress)) {
+        HeartbeatMessage<ID> message = new HeartbeatMessage<>(-1);
+        message.setRecipient(coordinator);
+        try {
+          if (isStopping) {
+            return;
+          }
+          services.getMessenger().sendUnreliably(message);
+          GMSHealthMonitor.this.stats.incHeartbeatsSent();
+        } catch (MembershipClosedException e) {
+          return;
+        }
+      }
+
+      int index = startIndex;
+      int numSent = 0; // counts only the sends made inside the loop below
+      for (;;) { // walk backwards through mbrs from startIndex, wrapping past index 0
+        index--;
+        if (index < 0) {
+          index = mbrs.size() - 1;
+        }
+        ID mbr = mbrs.get(index);
+        if (mbr.equals(localAddress)) { // wrapped all the way around to ourselves
+          break;
+        }
+        if (mbr.equals(coordinator)) { // the coordinator was dealt with before the loop
+          continue;
+        }
+        if (isStopping) {
+          return;
+        }
+        HeartbeatMessage<ID> message = new HeartbeatMessage<>(-1);
+        message.setRecipient(mbr);
+        try {
+          services.getMessenger().sendUnreliably(message);
+          GMSHealthMonitor.this.stats.incHeartbeatsSent();
+          numSent++;
+          if (numSent >= NUM_HEARTBEATS) {
+            break;
+          }
+        } catch (MembershipClosedException e) {
+          return;
+        }
+      } // for (;;)
+    }
+  }
 }