You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by lh...@apache.org on 2022/04/21 18:22:51 UTC
[pulsar] branch master updated: [Broker] Make health check fail if dead locked threads are detected (#15155)
This is an automated email from the ASF dual-hosted git repository.
lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new df0c1104466 [Broker] Make health check fail if dead locked threads are detected (#15155)
df0c1104466 is described below
commit df0c11044662092da0df9b9ed790bf90b6f51b3b
Author: Lari Hotari <lh...@users.noreply.github.com>
AuthorDate: Thu Apr 21 21:22:45 2022 +0300
[Broker] Make health check fail if dead locked threads are detected (#15155)
* [Broker] Make health check fail if dead locked threads are detected
* Add unit test for detecting a dead lock
* Use lockInterruptibly to unlock the deadlock and wait for threads to finish
* Add test for testing the deadlock detection overhead
---
.../org/apache/pulsar/tests/ThreadDumpUtil.java | 2 +-
.../pulsar/broker/admin/impl/BrokersBase.java | 32 +++++++++
.../broker/admin/AdminApiHealthCheckTest.java | 76 +++++++++++++++++++++-
.../apache/pulsar/common/util/ThreadDumpUtil.java | 2 +-
4 files changed, 108 insertions(+), 4 deletions(-)
diff --git a/buildtools/src/main/java/org/apache/pulsar/tests/ThreadDumpUtil.java b/buildtools/src/main/java/org/apache/pulsar/tests/ThreadDumpUtil.java
index 7752f354456..bf0d8bf41ca 100644
--- a/buildtools/src/main/java/org/apache/pulsar/tests/ThreadDumpUtil.java
+++ b/buildtools/src/main/java/org/apache/pulsar/tests/ThreadDumpUtil.java
@@ -102,7 +102,7 @@ public class ThreadDumpUtil {
static String buildDeadlockInfo() {
ThreadMXBean threadBean = ManagementFactory.getThreadMXBean();
- long[] threadIds = threadBean.findMonitorDeadlockedThreads();
+ long[] threadIds = threadBean.findDeadlockedThreads();
if (threadIds != null && threadIds.length > 0) {
StringWriter stringWriter = new StringWriter();
PrintWriter out = new PrintWriter(stringWriter);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java
index 5f73bc949de..5a3db2302ee 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java
@@ -24,13 +24,18 @@ import io.swagger.annotations.ApiOperation;
import io.swagger.annotations.ApiParam;
import io.swagger.annotations.ApiResponse;
import io.swagger.annotations.ApiResponses;
+import java.lang.management.ManagementFactory;
+import java.lang.management.ThreadInfo;
+import java.lang.management.ThreadMXBean;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
import javax.ws.rs.DELETE;
import javax.ws.rs.DefaultValue;
import javax.ws.rs.GET;
@@ -64,6 +69,7 @@ import org.apache.pulsar.common.naming.TopicVersion;
import org.apache.pulsar.common.policies.data.BrokerInfo;
import org.apache.pulsar.common.policies.data.NamespaceOwnershipStatus;
import org.apache.pulsar.common.util.FutureUtil;
+import org.apache.pulsar.common.util.ThreadDumpUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -73,6 +79,10 @@ import org.slf4j.LoggerFactory;
public class BrokersBase extends AdminResource {
private static final Logger LOG = LoggerFactory.getLogger(BrokersBase.class);
public static final String HEALTH_CHECK_TOPIC_SUFFIX = "healthcheck";
+ // log a full thread dump when a deadlock is detected in healthcheck once every 10 minutes
+ // to prevent excessive logging
+ private static final long LOG_THREADDUMP_INTERVAL_WHEN_DEADLOCK_DETECTED = 600000L;
+ private volatile long threadDumpLoggedTimestamp;
@GET
@Path("/{cluster}")
@@ -324,6 +334,7 @@ public class BrokersBase extends AdminResource {
public void healthCheck(@Suspended AsyncResponse asyncResponse,
@QueryParam("topicVersion") TopicVersion topicVersion) {
validateSuperUserAccessAsync()
+ .thenAccept(__ -> checkDeadlockedThreads())
.thenCompose(__ -> internalRunHealthCheck(topicVersion))
.thenAccept(__ -> {
LOG.info("[{}] Successfully run health check.", clientAppId());
@@ -335,6 +346,27 @@ public class BrokersBase extends AdminResource {
});
}
+    /**
+     * Fails the health check when the JVM reports deadlocked threads.
+     *
+     * <p>Uses {@link ThreadMXBean#findDeadlockedThreads()}, which reports deadlocks on object monitors
+     * as well as on ownable synchronizers such as {@code ReentrantLock}. A full thread dump is logged at
+     * most once per {@link #LOG_THREADDUMP_INTERVAL_WHEN_DEADLOCK_DETECTED} milliseconds; otherwise only
+     * the names of the deadlocked threads are logged.
+     *
+     * @throws IllegalStateException if deadlocked threads are detected; the message lists them
+     */
+    private void checkDeadlockedThreads() {
+        ThreadMXBean threadBean = ManagementFactory.getThreadMXBean();
+        long[] threadIds = threadBean.findDeadlockedThreads();
+        if (threadIds == null || threadIds.length == 0) {
+            return;
+        }
+        ThreadInfo[] threadInfos = threadBean.getThreadInfo(threadIds, false, false);
+        // getThreadInfo returns a null element for a thread that is no longer alive, e.g. one that
+        // terminated after findDeadlockedThreads() ran; skip it instead of failing with an NPE
+        String threadNames = Arrays.stream(threadInfos)
+                .filter(Objects::nonNull)
+                .map(threadInfo -> threadInfo.getThreadName() + "(tid=" + threadInfo.getThreadId() + ")")
+                .collect(Collectors.joining(", "));
+        // read the clock once so the interval check and the stored timestamp agree; concurrent
+        // health checks may still both log a full dump, which only duplicates log output
+        long now = System.currentTimeMillis();
+        if (now - threadDumpLoggedTimestamp > LOG_THREADDUMP_INTERVAL_WHEN_DEADLOCK_DETECTED) {
+            threadDumpLoggedTimestamp = now;
+            LOG.error("Deadlocked threads detected. {}\n{}", threadNames,
+                    ThreadDumpUtil.buildThreadDiagnosticString());
+        } else {
+            LOG.error("Deadlocked threads detected. {}", threadNames);
+        }
+        throw new IllegalStateException("Deadlocked threads detected. " + threadNames);
+    }
+
+
private CompletableFuture<Void> internalRunHealthCheck(TopicVersion topicVersion) {
NamespaceName namespaceName = (topicVersion == TopicVersion.V2)
? NamespaceService.getHeartbeatNamespaceV2(pulsar().getAdvertisedAddress(), pulsar().getConfiguration())
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiHealthCheckTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiHealthCheckTest.java
index b9886b20410..86e4d732c8a 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiHealthCheckTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiHealthCheckTest.java
@@ -19,6 +19,13 @@
package org.apache.pulsar.broker.admin;
import com.google.common.collect.Sets;
+import java.lang.management.ManagementFactory;
+import java.lang.management.ThreadMXBean;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Phaser;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.stream.Collectors;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
import org.apache.pulsar.client.admin.PulsarAdminException;
@@ -32,8 +39,6 @@ import org.springframework.util.CollectionUtils;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
-import java.util.concurrent.CompletableFuture;
-import java.util.stream.Collectors;
@Test(groups = "broker-admin")
@Slf4j
@@ -93,6 +98,73 @@ public class AdminApiHealthCheckTest extends MockedPulsarServiceBaseTest {
);
}
+ @Test(expectedExceptions= PulsarAdminException.class, expectedExceptionsMessageRegExp = ".*Deadlocked threads detected.*")
+ public void testHealthCheckupDetectsDeadlock() throws Exception {
+ // simulate a deadlock in the Test JVM
+ // the broker used in unit tests runs in the test JVM and the
+ // healthcheck implementation should detect this deadlock
+ Lock lock1 = new ReentrantReadWriteLock().writeLock();
+ Lock lock2 = new ReentrantReadWriteLock().writeLock();
+ final Phaser phaser = new Phaser(3);
+ Thread thread1=new Thread(() -> {
+ phaser.arriveAndAwaitAdvance();
+ try {
+ deadlock(lock1, lock2, 1000L);
+ } finally {
+ phaser.arriveAndDeregister();
+ }
+ }, "deadlockthread-1");
+ Thread thread2=new Thread(() -> {
+ phaser.arriveAndAwaitAdvance();
+ try {
+ deadlock(lock2, lock1, 2000L);
+ } finally {
+ phaser.arriveAndDeregister();
+ }
+ }, "deadlockthread-2");
+ thread1.start();
+ thread2.start();
+ phaser.arriveAndAwaitAdvance();
+ Thread.sleep(5000L);
+
+ try {
+ admin.brokers().healthcheck(TopicVersion.V2);
+ } finally {
+ // unlock the deadlock
+ thread1.interrupt();
+ thread2.interrupt();
+ // wait for deadlock threads to finish
+ phaser.arriveAndAwaitAdvance();
+ }
+ }
+
+ /**
+ * Acquires {@code lock1}, sleeps for {@code millis}, then tries to acquire {@code lock2}.
+ * Two threads calling this with the same two locks in opposite order deadlock on each other.
+ * Both acquisitions use lockInterruptibly(), so interrupting the thread breaks the deadlock;
+ * on interrupt the thread's interrupt status is restored and {@code lock1} is released if it
+ * was acquired.
+ *
+ * @param lock1 lock acquired first and held while waiting for {@code lock2}
+ * @param lock2 lock acquired second; released immediately if it is acquired
+ * @param millis how long to hold {@code lock1} before attempting {@code lock2}
+ */
+ private void deadlock(Lock lock1, Lock lock2, long millis) {
+ try {
+ // acquired outside the inner try so the finally below only unlocks a lock this thread holds
+ lock1.lockInterruptibly();
+ try {
+ // give the other thread time to take its first lock so the lock order conflicts
+ Thread.sleep(millis);
+ lock2.lockInterruptibly();
+ lock2.unlock();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ } finally {
+ lock1.unlock();
+ }
+ } catch (InterruptedException e) {
+ // interrupted while waiting for lock1: nothing is held, just restore the interrupt status
+ Thread.currentThread().interrupt();
+ }
+ }
+
+    /**
+     * Bounds the cost of the health check's deadlock detection: 1000 consecutive calls to
+     * {@link ThreadMXBean#findDeadlockedThreads()} must complete within the 5 second test timeout,
+     * and none of them may report a deadlock.
+     */
+    @Test(timeOut = 5000L)
+    public void testDeadlockDetectionOverhead() {
+        final ThreadMXBean mxBean = ManagementFactory.getThreadMXBean();
+        int remaining = 1000;
+        while (remaining-- > 0) {
+            // a null result means no deadlocked threads were found
+            Assert.assertNull(mxBean.findDeadlockedThreads());
+        }
+    }
+
@Test
public void testHealthCheckupV1() throws Exception {
final int times = 30;
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/ThreadDumpUtil.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/ThreadDumpUtil.java
index 0c903fa2f5d..683aaeffcb3 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/ThreadDumpUtil.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/ThreadDumpUtil.java
@@ -102,7 +102,7 @@ public class ThreadDumpUtil {
static String buildDeadlockInfo() {
ThreadMXBean threadBean = ManagementFactory.getThreadMXBean();
- long[] threadIds = threadBean.findMonitorDeadlockedThreads();
+ long[] threadIds = threadBean.findDeadlockedThreads();
if (threadIds != null && threadIds.length > 0) {
StringWriter stringWriter = new StringWriter();
PrintWriter out = new PrintWriter(stringWriter);