You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by lh...@apache.org on 2024/01/30 15:40:16 UTC
(pulsar) branch master updated: [fix] [broker] add timeout for health check read. (#21990)
This is an automated email from the ASF dual-hosted git repository.
lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 925bf2e375b [fix] [broker] add timeout for health check read. (#21990)
925bf2e375b is described below
commit 925bf2e375b5e18bcd4d93fbed16c96cb72f1e52
Author: thetumbled <52...@users.noreply.github.com>
AuthorDate: Tue Jan 30 23:40:09 2024 +0800
[fix] [broker] add timeout for health check read. (#21990)
---
.../pulsar/broker/admin/impl/BrokersBase.java | 13 ++++-
.../broker/admin/AdminApiHealthCheckTest.java | 63 ++++++++++++++++++++++
2 files changed, 75 insertions(+), 1 deletion(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java
index f056b18f3f1..61b354610ac 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java
@@ -26,6 +26,7 @@ import io.swagger.annotations.ApiResponses;
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadInfo;
import java.lang.management.ThreadMXBean;
+import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
@@ -34,6 +35,7 @@ import java.util.Map;
import java.util.Objects;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
import javax.ws.rs.DELETE;
import javax.ws.rs.DefaultValue;
@@ -80,6 +82,12 @@ public class BrokersBase extends AdminResource {
// log a full thread dump when a deadlock is detected in healthcheck once every 10 minutes
// to prevent excessive logging
private static final long LOG_THREADDUMP_INTERVAL_WHEN_DEADLOCK_DETECTED = 600000L;
+    // The client's default read timeout (readTimeoutMs) is 60 seconds. The server-side read timeout must
+    // expire slightly earlier; otherwise the client gives up first and the server's timeout never reaches it.
+    // The exception below is created once and returned every time that server-side timeout fires.
+    private static final Duration HEALTH_CHECK_READ_TIMEOUT = Duration.ofMinutes(1).minusSeconds(2);
+    private static final TimeoutException HEALTH_CHECK_TIMEOUT_EXCEPTION =
+        FutureUtil.createTimeoutException("Timeout", BrokersBase.class, "healthCheckRecursiveReadNext(...)");
private volatile long threadDumpLoggedTimestamp;
@GET
@@ -434,7 +442,10 @@ public class BrokersBase extends AdminResource {
});
throw FutureUtil.wrapToCompletionException(createException);
}).thenCompose(reader -> producer.sendAsync(messageStr)
- .thenCompose(__ -> healthCheckRecursiveReadNext(reader, messageStr))
+ .thenCompose(__ -> FutureUtil.addTimeoutHandling(
+ healthCheckRecursiveReadNext(reader, messageStr),
+ HEALTH_CHECK_READ_TIMEOUT, pulsar().getBrokerService().executor(),
+ () -> HEALTH_CHECK_TIMEOUT_EXCEPTION))
.whenComplete((__, ex) -> {
closeAndReCheck(producer, reader, topicOptional.get(), subscriptionName)
.whenComplete((unused, innerEx) -> {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiHealthCheckTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiHealthCheckTest.java
index a780f889de8..357422b11f6 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiHealthCheckTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiHealthCheckTest.java
@@ -23,6 +23,7 @@ import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertTrue;
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadMXBean;
+import java.lang.reflect.Field;
import java.time.Duration;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
@@ -31,13 +32,21 @@ import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.stream.Collectors;
import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.impl.ProducerBuilderImpl;
+import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.common.naming.TopicVersion;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
import org.apache.pulsar.compaction.Compactor;
import org.awaitility.Awaitility;
+import org.mockito.Mockito;
import org.springframework.util.CollectionUtils;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
@@ -236,4 +245,58 @@ public class AdminApiHealthCheckTest extends MockedPulsarServiceBaseTest {
))
);
}
+
+    /**
+     * Producer builder for simulating a health check whose messages never reach the reader.
+     * Producers it creates are Mockito spies whose {@code sendAsync} reports success with
+     * {@link MessageId#earliest} without publishing anything, so the health check read times out.
+     * Declared {@code static} so instances do not capture the enclosing test object.
+     */
+    static class DummyProducerBuilder<T> extends ProducerBuilderImpl<T> {
+        public DummyProducerBuilder(PulsarClientImpl client, Schema<T> schema) {
+            super(client, schema);
+        }
+
+        @Override
+        public CompletableFuture<Producer<T>> createAsync() {
+            // Creation failures of the real producer propagate to the returned future.
+            return super.createAsync().thenApply(producer -> {
+                Producer<T> spyProducer = Mockito.spy(producer);
+                Mockito.doReturn(CompletableFuture.completedFuture(MessageId.earliest))
+                        .when(spyProducer).sendAsync(Mockito.any());
+                return spyProducer;
+            });
+        }
+    }
+
+    @Test
+    public void testHealthCheckTimeOut() throws Exception {
+        // The dummy producer never publishes, so the health check read must hit the server-side timeout.
+        final String testHealthCheckTopic = String.format("persistent://pulsar/localhost:%s/healthcheck",
+                pulsar.getConfig().getWebServicePort().get());
+        PulsarClient client = pulsar.getClient();
+        PulsarClient spyClient = Mockito.spy(client);
+        Mockito.doReturn(new DummyProducerBuilder<>((PulsarClientImpl) spyClient, Schema.STRING))
+                .when(spyClient).newProducer(Schema.STRING);
+        // Swap the broker's internal client via reflection; the finally block restores the original.
+        Field field = PulsarService.class.getDeclaredField("client");
+        field.setAccessible(true);
+        field.set(pulsar, spyClient);
+        try {
+            PulsarAdminException e = org.testng.Assert.expectThrows(PulsarAdminException.class,
+                    () -> admin.brokers().healthcheck(TopicVersion.V2));
+            log.info("Exception caught", e);
+            assertTrue(e.getMessage().contains("LowOverheadTimeoutException"));
+            // Even on failure, the health check's producer and reader must be closed, leaving no subscription.
+            Awaitility.await().untilAsserted(() -> assertTrue(CollectionUtils.isEmpty(admin.topics()
+                    .getSubscriptions(testHealthCheckTopic).stream()
+                    // System topics carry the compaction subscription even when it is not set in the policies.
+                    .filter(v -> !v.equals(Compactor.COMPACTION_SUBSCRIPTION))
+                    .collect(Collectors.toList()))));
+        } finally {
+            // Put the real client back so teardown and later tests do not run against the spy.
+            field.set(pulsar, client);
+        }
+    }
+
}