You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by lh...@apache.org on 2022/04/21 18:22:51 UTC

[pulsar] branch master updated: [Broker] Make health check fail if dead locked threads are detected (#15155)

This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new df0c1104466 [Broker] Make health check fail if dead locked threads are detected (#15155)
df0c1104466 is described below

commit df0c11044662092da0df9b9ed790bf90b6f51b3b
Author: Lari Hotari <lh...@users.noreply.github.com>
AuthorDate: Thu Apr 21 21:22:45 2022 +0300

    [Broker] Make health check fail if dead locked threads are detected (#15155)
    
    * [Broker] Make health check fail if dead locked threads are detected
    
    * Add unit test for detecting a deadlock
    
    * Use lockInterruptibly so that interrupting the threads breaks the deadlock, then wait for the threads to finish
    
    * Add test for testing the deadlock detection overhead
---
 .../org/apache/pulsar/tests/ThreadDumpUtil.java    |  2 +-
 .../pulsar/broker/admin/impl/BrokersBase.java      | 32 +++++++++
 .../broker/admin/AdminApiHealthCheckTest.java      | 76 +++++++++++++++++++++-
 .../apache/pulsar/common/util/ThreadDumpUtil.java  |  2 +-
 4 files changed, 108 insertions(+), 4 deletions(-)

diff --git a/buildtools/src/main/java/org/apache/pulsar/tests/ThreadDumpUtil.java b/buildtools/src/main/java/org/apache/pulsar/tests/ThreadDumpUtil.java
index 7752f354456..bf0d8bf41ca 100644
--- a/buildtools/src/main/java/org/apache/pulsar/tests/ThreadDumpUtil.java
+++ b/buildtools/src/main/java/org/apache/pulsar/tests/ThreadDumpUtil.java
@@ -102,7 +102,7 @@ public class ThreadDumpUtil {
 
     static String buildDeadlockInfo() {
         ThreadMXBean threadBean = ManagementFactory.getThreadMXBean();
-        long[] threadIds = threadBean.findMonitorDeadlockedThreads();
+        long[] threadIds = threadBean.findDeadlockedThreads();
         if (threadIds != null && threadIds.length > 0) {
             StringWriter stringWriter = new StringWriter();
             PrintWriter out = new PrintWriter(stringWriter);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java
index 5f73bc949de..5a3db2302ee 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java
@@ -24,13 +24,18 @@ import io.swagger.annotations.ApiOperation;
 import io.swagger.annotations.ApiParam;
 import io.swagger.annotations.ApiResponse;
 import io.swagger.annotations.ApiResponses;
+import java.lang.management.ManagementFactory;
+import java.lang.management.ThreadInfo;
+import java.lang.management.ThreadMXBean;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
 import javax.ws.rs.DELETE;
 import javax.ws.rs.DefaultValue;
 import javax.ws.rs.GET;
@@ -64,6 +69,7 @@ import org.apache.pulsar.common.naming.TopicVersion;
 import org.apache.pulsar.common.policies.data.BrokerInfo;
 import org.apache.pulsar.common.policies.data.NamespaceOwnershipStatus;
 import org.apache.pulsar.common.util.FutureUtil;
+import org.apache.pulsar.common.util.ThreadDumpUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -73,6 +79,10 @@ import org.slf4j.LoggerFactory;
 public class BrokersBase extends AdminResource {
     private static final Logger LOG = LoggerFactory.getLogger(BrokersBase.class);
     public static final String HEALTH_CHECK_TOPIC_SUFFIX = "healthcheck";
+    // when the healthcheck detects a deadlock, log a full thread dump at most once every 10 minutes
+    // to prevent excessive logging
+    private static final long LOG_THREADDUMP_INTERVAL_WHEN_DEADLOCK_DETECTED = 600000L;
+    private volatile long threadDumpLoggedTimestamp;
 
     @GET
     @Path("/{cluster}")
@@ -324,6 +334,7 @@ public class BrokersBase extends AdminResource {
     public void healthCheck(@Suspended AsyncResponse asyncResponse,
                             @QueryParam("topicVersion") TopicVersion topicVersion) {
         validateSuperUserAccessAsync()
+                .thenAccept(__ -> checkDeadlockedThreads())
                 .thenCompose(__ -> internalRunHealthCheck(topicVersion))
                 .thenAccept(__ -> {
                     LOG.info("[{}] Successfully run health check.", clientAppId());
@@ -335,6 +346,27 @@ public class BrokersBase extends AdminResource {
                 });
     }
 
+    private void checkDeadlockedThreads() {
+        ThreadMXBean threadBean = ManagementFactory.getThreadMXBean();
+        long[] threadIds = threadBean.findDeadlockedThreads();
+        if (threadIds != null && threadIds.length > 0) {
+            ThreadInfo[] threadInfos = threadBean.getThreadInfo(threadIds, false, false);
+            String threadNames = Arrays.stream(threadInfos)
+                    .map(threadInfo -> threadInfo.getThreadName() + "(tid=" + threadInfo.getThreadId() + ")").collect(
+                            Collectors.joining(", "));
+            if (System.currentTimeMillis() - threadDumpLoggedTimestamp
+                    > LOG_THREADDUMP_INTERVAL_WHEN_DEADLOCK_DETECTED) {
+                threadDumpLoggedTimestamp = System.currentTimeMillis();
+                LOG.error("Deadlocked threads detected. {}\n{}", threadNames,
+                        ThreadDumpUtil.buildThreadDiagnosticString());
+            } else {
+                LOG.error("Deadlocked threads detected. {}", threadNames);
+            }
+            throw new IllegalStateException("Deadlocked threads detected. " + threadNames);
+        }
+    }
+
+
     private CompletableFuture<Void> internalRunHealthCheck(TopicVersion topicVersion) {
         NamespaceName namespaceName = (topicVersion == TopicVersion.V2)
                 ? NamespaceService.getHeartbeatNamespaceV2(pulsar().getAdvertisedAddress(), pulsar().getConfiguration())
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiHealthCheckTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiHealthCheckTest.java
index b9886b20410..86e4d732c8a 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiHealthCheckTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiHealthCheckTest.java
@@ -19,6 +19,13 @@
 package org.apache.pulsar.broker.admin;
 
 import com.google.common.collect.Sets;
+import java.lang.management.ManagementFactory;
+import java.lang.management.ThreadMXBean;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Phaser;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.stream.Collectors;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
 import org.apache.pulsar.client.admin.PulsarAdminException;
@@ -32,8 +39,6 @@ import org.springframework.util.CollectionUtils;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
-import java.util.concurrent.CompletableFuture;
-import java.util.stream.Collectors;
 
 @Test(groups = "broker-admin")
 @Slf4j
@@ -93,6 +98,73 @@ public class AdminApiHealthCheckTest extends MockedPulsarServiceBaseTest {
         );
     }
 
+    @Test(expectedExceptions= PulsarAdminException.class, expectedExceptionsMessageRegExp = ".*Deadlocked threads detected.*")
+    public void testHealthCheckupDetectsDeadlock() throws Exception {
+        // simulate a deadlock in the Test JVM
+        // the broker used in unit tests runs in the test JVM and the
+        // healthcheck implementation should detect this deadlock
+        Lock lock1 = new ReentrantReadWriteLock().writeLock();
+        Lock lock2 = new ReentrantReadWriteLock().writeLock();
+        final Phaser phaser = new Phaser(3);
+        Thread thread1=new Thread(() -> {
+            phaser.arriveAndAwaitAdvance();
+            try {
+                deadlock(lock1, lock2, 1000L);
+            } finally {
+                phaser.arriveAndDeregister();
+            }
+        }, "deadlockthread-1");
+        Thread thread2=new Thread(() -> {
+            phaser.arriveAndAwaitAdvance();
+            try {
+                deadlock(lock2, lock1, 2000L);
+            } finally {
+                phaser.arriveAndDeregister();
+            }
+        }, "deadlockthread-2");
+        thread1.start();
+        thread2.start();
+        phaser.arriveAndAwaitAdvance();
+        Thread.sleep(5000L);
+
+        try {
+            admin.brokers().healthcheck(TopicVersion.V2);
+        } finally {
+            // unlock the deadlock
+            thread1.interrupt();
+            thread2.interrupt();
+            // wait for deadlock threads to finish
+            phaser.arriveAndAwaitAdvance();
+        }
+    }
+
+    private void deadlock(Lock lock1, Lock lock2, long millis) {
+        try {
+            lock1.lockInterruptibly();
+            try {
+                Thread.sleep(millis);
+                lock2.lockInterruptibly();
+                lock2.unlock();
+            } catch (InterruptedException e) {
+                Thread.currentThread().interrupt();
+            } finally {
+                lock1.unlock();
+            }
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+        }
+    }
+
+    @Test(timeOut = 5000L)
+    public void testDeadlockDetectionOverhead() {
+        ThreadMXBean threadBean = ManagementFactory.getThreadMXBean();
+        for (int i=0; i < 1000; i++) {
+            long[] threadIds = threadBean.findDeadlockedThreads();
+            // assert that there's no deadlock
+            Assert.assertNull(threadIds);
+        }
+    }
+
     @Test
     public void testHealthCheckupV1() throws Exception {
         final int times = 30;
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/ThreadDumpUtil.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/ThreadDumpUtil.java
index 0c903fa2f5d..683aaeffcb3 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/ThreadDumpUtil.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/ThreadDumpUtil.java
@@ -102,7 +102,7 @@ public class ThreadDumpUtil {
 
     static String buildDeadlockInfo() {
         ThreadMXBean threadBean = ManagementFactory.getThreadMXBean();
-        long[] threadIds = threadBean.findMonitorDeadlockedThreads();
+        long[] threadIds = threadBean.findDeadlockedThreads();
         if (threadIds != null && threadIds.length > 0) {
             StringWriter stringWriter = new StringWriter();
             PrintWriter out = new PrintWriter(stringWriter);