You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by bu...@apache.org on 2021/06/28 17:48:46 UTC
[geode] branch support/1.12 updated: GEODE-9180: warn when
heartbeat thread oversleeps (#6360)
This is an automated email from the ASF dual-hosted git repository.
burcham pushed a commit to branch support/1.12
in repository https://gitbox.apache.org/repos/asf/geode.git
The following commit(s) were added to refs/heads/support/1.12 by this push:
new 0a33ffb GEODE-9180: warn when heartbeat thread oversleeps (#6360)
0a33ffb is described below
commit 0a33ffbe990af93e2779957ae6027a2c5a077c3f
Author: Bill Burcham <bi...@gmail.com>
AuthorDate: Wed Apr 28 10:22:16 2021 -0700
GEODE-9180: warn when heartbeat thread oversleeps (#6360)
* heartbeat producer logs a warning when it oversleeps by more than a full period
(cherry picked from commit f8b07a007ac93c323cd888cbc53dc3914336077f)
---
.../gms/fd/GMSHealthMonitorJUnitTest.java | 47 +++++
.../membership/gms/fd/GMSHealthMonitor.java | 190 ++++++++++++---------
2 files changed, 161 insertions(+), 76 deletions(-)
diff --git a/geode-core/src/integrationTest/java/org/apache/geode/distributed/internal/membership/gms/fd/GMSHealthMonitorJUnitTest.java b/geode-core/src/integrationTest/java/org/apache/geode/distributed/internal/membership/gms/fd/GMSHealthMonitorJUnitTest.java
index e458dcf..2fd15e0 100644
--- a/geode-core/src/integrationTest/java/org/apache/geode/distributed/internal/membership/gms/fd/GMSHealthMonitorJUnitTest.java
+++ b/geode-core/src/integrationTest/java/org/apache/geode/distributed/internal/membership/gms/fd/GMSHealthMonitorJUnitTest.java
@@ -58,6 +58,7 @@ import java.util.Timer;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
+import java.util.function.LongUnaryOperator;
import org.junit.After;
import org.junit.Assert;
@@ -989,6 +990,52 @@ public class GMSHealthMonitorJUnitTest {
executeTestDoTCPCheck(GMSHealthMonitor.ERROR + 100, false);
}
+ @Test
+ public void heartbeatOversleepCausesWarning() {
+ testHeartbeatSleepScenario(sleepLimit -> sleepLimit + 1, // asleep 1ns past the limit
+ "Failure detection heartbeat-generation thread overslept by more than a full period. Asleep time: 1,000,000,001 nanoseconds. Period: 500,000,000 nanoseconds.");
+ }
+
+ @Test
+ public void heartbeatOnTimeWakeupCausesNoWarning() {
+ testHeartbeatSleepScenario(sleepLimit -> sleepLimit, // exactly at the limit: the check is strictly greater-than
+ null);
+ }
+
+ private void testHeartbeatSleepScenario(final LongUnaryOperator actualSleepPeriod, // maps heart.sleepLimitNanos to the simulated time asleep
+ final String expectedLogWarning) { // null means no warning is expected
+
+ /*
+ * An anonymous class is used here because it's a convenient way to provide (mutable)
+ * variables needed by the lambdas. Without the class, each of them would have to be an
+ * array, an atomic or some other kind of "holder object"; in the class they are plain fields.
+ */
+ new Runnable() {
+ // the heartbeat loop under test, created from the fixture's gmsHealthMonitor
+ final GMSHealthMonitor.Heart heart = gmsHealthMonitor.new Heart();
+ int periodNumber = 0; // number of fake nanoTime() calls made so far
+ String capturedMessage; // warning message (if any) generated by heart
+
+ @Override
+ public void run() {
+ heart.sendPeriodicHeartbeats(sleepMillis -> { // fake Sleeper: returns immediately
+ },
+ () -> { // fake NanoTimer: read once before and once after each sleep
+ switch (periodNumber++) {
+ case 0: // before the first sleep
+ return 0L;
+ case 1: // after the first sleep (and any later read)
+ default:
+ gmsHealthMonitor.stop(); // NOTE(review): presumably sets isStopping so the loop exits
+ return actualSleepPeriod.applyAsLong(heart.sleepLimitNanos);
+ }
+ },
+ msg -> capturedMessage = msg); // fake Warner: records the warning
+ assertThat(capturedMessage).isEqualTo(expectedLogWarning);
+ }
+ }.run();
+ }
+
private void executeTestDoTCPCheck(int receivedStatus, boolean expectedResult) throws Exception {
MemberIdentifier otherMember =
createGMSMember(Version.CURRENT_ORDINAL, 0, 1, 1);
diff --git a/geode-membership/src/main/java/org/apache/geode/distributed/internal/membership/gms/fd/GMSHealthMonitor.java b/geode-membership/src/main/java/org/apache/geode/distributed/internal/membership/gms/fd/GMSHealthMonitor.java
index 73af605..a1b493f 100644
--- a/geode-membership/src/main/java/org/apache/geode/distributed/internal/membership/gms/fd/GMSHealthMonitor.java
+++ b/geode-membership/src/main/java/org/apache/geode/distributed/internal/membership/gms/fd/GMSHealthMonitor.java
@@ -749,82 +749,7 @@ public class GMSHealthMonitor<ID extends MemberIdentifier> implements HealthMoni
* process
*/
private void startHeartbeatThread() {
- checkExecutor.execute(new Runnable() {
- @Override
- public void run() {
- Thread.currentThread().setName("Geode Heartbeat Sender");
- sendPeriodicHeartbeats();
- }
-
- private void sendPeriodicHeartbeats() {
- while (!isStopping && !services.getCancelCriterion().isCancelInProgress()) {
- try {
- Thread.sleep(memberTimeout / LOGICAL_INTERVAL);
- } catch (InterruptedException e) {
- return;
- }
- GMSMembershipView<ID> v = currentView;
- if (v != null) {
- List<ID> mbrs = v.getMembers();
- int index = mbrs.indexOf(localAddress);
- if (index < 0 || mbrs.size() < 2) {
- continue;
- }
- if (!playingDead) {
- sendHeartbeats(mbrs, index);
- }
- }
- }
- }
-
- private void sendHeartbeats(List<ID> mbrs, int startIndex) {
- ID coordinator = currentView.getCoordinator();
- if (coordinator != null && !coordinator.equals(localAddress)) {
- HeartbeatMessage<ID> message = new HeartbeatMessage<>(-1);
- message.setRecipient(coordinator);
- try {
- if (isStopping) {
- return;
- }
- services.getMessenger().sendUnreliably(message);
- GMSHealthMonitor.this.stats.incHeartbeatsSent();
- } catch (MembershipClosedException e) {
- return;
- }
- }
-
- int index = startIndex;
- int numSent = 0;
- for (;;) {
- index--;
- if (index < 0) {
- index = mbrs.size() - 1;
- }
- ID mbr = mbrs.get(index);
- if (mbr.equals(localAddress)) {
- break;
- }
- if (mbr.equals(coordinator)) {
- continue;
- }
- if (isStopping) {
- return;
- }
- HeartbeatMessage<ID> message = new HeartbeatMessage<>(-1);
- message.setRecipient(mbr);
- try {
- services.getMessenger().sendUnreliably(message);
- GMSHealthMonitor.this.stats.incHeartbeatsSent();
- numSent++;
- if (numSent >= NUM_HEARTBEATS) {
- break;
- }
- } catch (MembershipClosedException e) {
- return;
- }
- }
- } // for (;;)
- });
+ checkExecutor.execute(new Heart()); // Heart.run() names the thread and loops sending heartbeats
}
@Override
@@ -1529,4 +1454,117 @@ public class GMSHealthMonitor<ID extends MemberIdentifier> implements HealthMoni
public MembershipStatistics getStats() {
return this.stats;
}
+
+ @FunctionalInterface
+ interface Sleeper { // pause step of Heart's loop; production passes Thread::sleep, tests a no-op
+ void sleep(long millis) throws InterruptedException; // called with Heart.sleepPeriodMillis
+ }
+
+ @FunctionalInterface
+ interface NanoTimer { // clock for measuring time asleep; production passes System::nanoTime
+ long nanoTime(); // read before and after each sleep; only the difference is used
+ }
+
+ @FunctionalInterface
+ interface Warner { // sink for the oversleep warning; production passes logger::warn
+ void warn(String message); // receives the fully formatted warning text
+ }
+
+ class Heart implements Runnable { // heartbeat sender; startHeartbeatThread() runs it on checkExecutor
+
+ // If a single sleep lasts longer than this many periods, log a warning
+ public static final int OVERSLEEP_WARNING_THRESHOLD_PERIODS = 2;
+ public final long sleepPeriodMillis = memberTimeout / LOGICAL_INTERVAL; // requested sleep between rounds
+ public final long sleepPeriodNanos =
+ TimeUnit.NANOSECONDS.convert(sleepPeriodMillis, TimeUnit.MILLISECONDS);
+ public final long sleepLimitNanos = OVERSLEEP_WARNING_THRESHOLD_PERIODS * sleepPeriodNanos; // longer sleeps warn
+
+ @Override
+ public void run() {
+ Thread.currentThread().setName("Geode Heartbeat Sender"); // renames the executor's current thread
+ sendPeriodicHeartbeats(Thread::sleep, System::nanoTime, logger::warn); // production sleeper, clock, warning sink
+ }
+
+ @VisibleForTesting
+ void sendPeriodicHeartbeats(final Sleeper sleeper, // loops until stopping/cancelled; returns early if interrupted
+ final NanoTimer nanoTimer,
+ final Warner warner) {
+ while (!isStopping && !services.getCancelCriterion().isCancelInProgress()) {
+ try {
+ final long timeBeforeSleep = nanoTimer.nanoTime();
+ sleeper.sleep(sleepPeriodMillis);
+ final long timeAfterSleep = nanoTimer.nanoTime();
+ final long asleepNanos = timeAfterSleep - timeBeforeSleep; // only the difference of readings matters
+ if (asleepNanos > sleepLimitNanos) { // strictly greater: sleeping exactly sleepLimitNanos does not warn
+ warner.warn( // NOTE(review): message text assumes OVERSLEEP_WARNING_THRESHOLD_PERIODS == 2
+ String.format(
+ "Failure detection heartbeat-generation thread overslept by more than a full period. Asleep time: %,d nanoseconds. Period: %,d nanoseconds.",
+ asleepNanos, sleepPeriodNanos));
+ }
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt(); // preserve the interrupt status before leaving the loop
+ return;
+ }
+ GMSMembershipView<ID> v = currentView; // read the view once for this round
+ if (v != null) {
+ List<ID> mbrs = v.getMembers();
+ int index = mbrs.indexOf(localAddress);
+ if (index < 0 || mbrs.size() < 2) { // not in this view, or no other member to heartbeat
+ continue;
+ }
+ if (!playingDead) {
+ sendHeartbeats(mbrs, index);
+ }
+ }
+ }
+ }
+
+ private void sendHeartbeats(List<ID> mbrs, int startIndex) { // coordinator first, then up to NUM_HEARTBEATS predecessors
+ ID coordinator = currentView.getCoordinator(); // NOTE(review): re-reads currentView; may differ from the view mbrs came from
+ if (coordinator != null && !coordinator.equals(localAddress)) {
+ HeartbeatMessage<ID> message = new HeartbeatMessage<>(-1);
+ message.setRecipient(coordinator);
+ try {
+ if (isStopping) {
+ return;
+ }
+ services.getMessenger().sendUnreliably(message);
+ GMSHealthMonitor.this.stats.incHeartbeatsSent();
+ } catch (MembershipClosedException e) { // messenger closed: skip the rest of this round
+ return;
+ }
+ }
+
+ int index = startIndex;
+ int numSent = 0;
+ for (;;) { // walk backward from startIndex, wrapping to the end of the list
+ index--;
+ if (index < 0) {
+ index = mbrs.size() - 1;
+ }
+ ID mbr = mbrs.get(index);
+ if (mbr.equals(localAddress)) { // went all the way around
+ break;
+ }
+ if (mbr.equals(coordinator)) { // coordinator was handled above
+ continue;
+ }
+ if (isStopping) {
+ return;
+ }
+ HeartbeatMessage<ID> message = new HeartbeatMessage<>(-1);
+ message.setRecipient(mbr);
+ try {
+ services.getMessenger().sendUnreliably(message);
+ GMSHealthMonitor.this.stats.incHeartbeatsSent();
+ numSent++;
+ if (numSent >= NUM_HEARTBEATS) {
+ break;
+ }
+ } catch (MembershipClosedException e) {
+ return;
+ }
+ } // for (;;)
+ }
+ }
}