You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by lh...@apache.org on 2024/01/30 15:40:16 UTC

(pulsar) branch master updated: [fix] [broker] add timeout for health check read. (#21990)

This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 925bf2e375b [fix] [broker] add timeout for health check read. (#21990)
925bf2e375b is described below

commit 925bf2e375b5e18bcd4d93fbed16c96cb72f1e52
Author: thetumbled <52...@users.noreply.github.com>
AuthorDate: Tue Jan 30 23:40:09 2024 +0800

    [fix] [broker] add timeout for health check read. (#21990)
---
 .../pulsar/broker/admin/impl/BrokersBase.java      | 13 ++++-
 .../broker/admin/AdminApiHealthCheckTest.java      | 63 ++++++++++++++++++++++
 2 files changed, 75 insertions(+), 1 deletion(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java
index f056b18f3f1..61b354610ac 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java
@@ -26,6 +26,7 @@ import io.swagger.annotations.ApiResponses;
 import java.lang.management.ManagementFactory;
 import java.lang.management.ThreadInfo;
 import java.lang.management.ThreadMXBean;
+import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
@@ -34,6 +35,7 @@ import java.util.Map;
 import java.util.Objects;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeoutException;
 import java.util.stream.Collectors;
 import javax.ws.rs.DELETE;
 import javax.ws.rs.DefaultValue;
@@ -80,6 +82,12 @@ public class BrokersBase extends AdminResource {
     // log a full thread dump when a deadlock is detected in healthcheck once every 10 minutes
     // to prevent excessive logging
     private static final long LOG_THREADDUMP_INTERVAL_WHEN_DEADLOCK_DETECTED = 600000L;
+    // The client's default read timeout (readTimeoutMs) is 60 seconds; keep the server-side timeout a bit
+    // shorter so the broker's timeout error reaches the client before the client gives up on its own.
+    // The timeout exception below is created once and the same instance is returned for every timed-out check.
+    private static final Duration HEALTH_CHECK_READ_TIMEOUT = Duration.ofSeconds(58);
+    private static final TimeoutException HEALTH_CHECK_TIMEOUT_EXCEPTION =
+            FutureUtil.createTimeoutException("Timeout", BrokersBase.class, "healthCheckRecursiveReadNext(...)");
     private volatile long threadDumpLoggedTimestamp;
 
     @GET
@@ -434,7 +442,10 @@ public class BrokersBase extends AdminResource {
                                     });
                                     throw FutureUtil.wrapToCompletionException(createException);
                                 }).thenCompose(reader -> producer.sendAsync(messageStr)
-                                        .thenCompose(__ -> healthCheckRecursiveReadNext(reader, messageStr))
+                                        .thenCompose(__ -> FutureUtil.addTimeoutHandling(
+                                                healthCheckRecursiveReadNext(reader, messageStr),
+                                                HEALTH_CHECK_READ_TIMEOUT, pulsar().getBrokerService().executor(),
+                                                () -> HEALTH_CHECK_TIMEOUT_EXCEPTION))
                                         .whenComplete((__, ex) -> {
                                             closeAndReCheck(producer, reader, topicOptional.get(), subscriptionName)
                                                     .whenComplete((unused, innerEx) -> {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiHealthCheckTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiHealthCheckTest.java
index a780f889de8..357422b11f6 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiHealthCheckTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiHealthCheckTest.java
@@ -23,6 +23,7 @@ import static org.testng.Assert.assertNull;
 import static org.testng.Assert.assertTrue;
 import java.lang.management.ManagementFactory;
 import java.lang.management.ThreadMXBean;
+import java.lang.reflect.Field;
 import java.time.Duration;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
@@ -31,13 +32,21 @@ import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.stream.Collectors;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
 import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.impl.ProducerBuilderImpl;
+import org.apache.pulsar.client.impl.PulsarClientImpl;
 import org.apache.pulsar.common.naming.TopicVersion;
 import org.apache.pulsar.common.policies.data.ClusterData;
 import org.apache.pulsar.common.policies.data.TenantInfoImpl;
 import org.apache.pulsar.compaction.Compactor;
 import org.awaitility.Awaitility;
+import org.mockito.Mockito;
 import org.springframework.util.CollectionUtils;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
@@ -236,4 +245,58 @@ public class AdminApiHealthCheckTest extends MockedPulsarServiceBaseTest {
                 ))
         );
     }
+
+    static class DummyProducerBuilder<T> extends ProducerBuilderImpl<T> {
+        // A producer builder used to test the health check timeout: every producer it creates is a
+        // Mockito spy whose sendAsync() reports success immediately without publishing anything, so
+        // the health check's reader waits for a message that never arrives.
+        // Static nested class: it needs no reference to the enclosing test instance.
+        public DummyProducerBuilder(PulsarClientImpl client, Schema<T> schema) {
+            super(client, schema);
+        }
+
+        @Override
+        public CompletableFuture<Producer<T>> createAsync() {
+            // Wrap the real producer in a spy. Failures creating the producer (or the spy) still
+            // propagate through the returned future.
+            return super.createAsync().thenApply(producer -> {
+                Producer<T> spyProducer = Mockito.spy(producer);
+                // Pretend every send succeeds; no message is actually written to the topic.
+                Mockito.doReturn(CompletableFuture.completedFuture(MessageId.earliest))
+                        .when(spyProducer).sendAsync(Mockito.any());
+                return spyProducer;
+            });
+        }
+    }
+
+    @Test
+    public void testHealthCheckTimeOut() throws Exception {
+        final String testHealthCheckTopic = String.format("persistent://pulsar/localhost:%s/healthcheck",
+                pulsar.getConfig().getWebServicePort().get());
+        PulsarClient client = pulsar.getClient();
+        PulsarClient spyClient = Mockito.spy(client);
+        Mockito.doReturn(new DummyProducerBuilder<>((PulsarClientImpl) spyClient, Schema.STRING))
+                .when(spyClient).newProducer(Schema.STRING);
+        // Swap in the spy via reflection; its producers never publish, so the read hits the ~58s server timeout.
+        Field field = PulsarService.class.getDeclaredField("client");
+        field.setAccessible(true);
+        field.set(pulsar, spyClient);
+        try {
+            admin.brokers().healthcheck(TopicVersion.V2);
+            org.testng.Assert.fail("Should not reach here");
+        } catch (PulsarAdminException e) {
+            log.info("Exception caught", e);
+            assertTrue(e.getMessage().contains("LowOverheadTimeoutException"));
+        } finally {
+            field.set(pulsar, client); // restore the real client so the spy does not outlive this test
+        }
+        // To ensure we don't have any subscription, the producers and readers are closed.
+        Awaitility.await().untilAsserted(() -> assertTrue(CollectionUtils.isEmpty(admin.topics()
+                        .getSubscriptions(testHealthCheckTopic).stream()
+                        // All system topics use compaction, even though it is not explicitly set in the policies.
+                        .filter(v -> !v.equals(Compactor.COMPACTION_SUBSCRIPTION))
+                        .collect(Collectors.toList())
+        )));
+    }
+
 }