You are viewing a plain text version of this content. The hyperlink to its canonical version is not preserved in this plain-text rendering.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2022/03/12 02:40:57 UTC
[pulsar] 04/14: [Broker] Fix Broker HealthCheck Endpoint Exposes Race Conditions (#14367)
This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit b0938399d5d8d7f752a93916fc804b59af1c6b32
Author: Qiang Zhao <74...@users.noreply.github.com>
AuthorDate: Fri Mar 4 10:29:34 2022 +0800
[Broker] Fix Broker HealthCheck Endpoint Exposes Race Conditions (#14367)
(cherry picked from commit 4f1e39b6921ea401b8c27f17a041d06d85f8abf8)
---
.../pulsar/broker/admin/impl/BrokersBase.java | 129 +++++++++++++++------
.../broker/admin/AdminApiHealthCheckTest.java | 97 +++++++++++++++-
.../org/apache/pulsar/common/util/FutureUtil.java | 14 +++
3 files changed, 203 insertions(+), 37 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java
index 4530d8c..eda186a 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java
@@ -30,7 +30,6 @@ import java.util.Map;
import java.util.Objects;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
-import java.util.stream.Collectors;
import javax.ws.rs.DELETE;
import javax.ws.rs.DefaultValue;
import javax.ws.rs.GET;
@@ -54,6 +53,7 @@ import org.apache.pulsar.broker.service.Subscription;
import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.broker.web.RestException;
import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Reader;
import org.apache.pulsar.client.api.Schema;
@@ -338,43 +338,104 @@ public class BrokersBase extends AdminResource {
NamespaceName namespaceName = (topicVersion == TopicVersion.V2)
? NamespaceService.getHeartbeatNamespaceV2(pulsar().getAdvertisedAddress(), pulsar().getConfiguration())
: NamespaceService.getHeartbeatNamespace(pulsar().getAdvertisedAddress(), pulsar().getConfiguration());
- String topicName = String.format("persistent://%s/%s", namespaceName, HEALTH_CHECK_TOPIC_SUFFIX);
+ final String topicName = String.format("persistent://%s/%s", namespaceName, HEALTH_CHECK_TOPIC_SUFFIX);
LOG.info("[{}] Running healthCheck with topic={}", clientAppId(), topicName);
- String messageStr = UUID.randomUUID().toString();
+ final String messageStr = UUID.randomUUID().toString();
+ final String subscriptionName = "healthCheck-" + messageStr;
// create non-partitioned topic manually and close the previous reader if present.
return pulsar().getBrokerService().getTopic(topicName, true)
- // check and clean all subscriptions
- .thenCompose(topicOptional -> {
- if (!topicOptional.isPresent()) {
- LOG.error("[{}] Fail to run health check while get topic {}. because get null value.",
- clientAppId(), topicName);
- throw new RestException(Status.NOT_FOUND, "Topic [{}] not found after create.");
- }
- Topic topic = topicOptional.get();
- // clean all subscriptions
- return FutureUtil.waitForAll(topic.getSubscriptions().values()
- .stream().map(Subscription::deleteForcefully).collect(Collectors.toList()))
- .thenApply(__ -> topic);
- }).thenCompose(topic -> {
- try {
- PulsarClient client = pulsar().getClient();
- return client.newProducer(Schema.STRING).topic(topicName).createAsync()
- .thenCombine(client.newReader(Schema.STRING).topic(topicName)
- .startMessageId(MessageId.latest).createAsync(), (producer, reader) ->
- producer.sendAsync(messageStr).thenCompose(__ ->
- healthCheckRecursiveReadNext(reader, messageStr))
- .thenCompose(__ -> {
- List<CompletableFuture<Void>> closeFutures =
- new ArrayList<>();
- closeFutures.add(producer.closeAsync());
- closeFutures.add(reader.closeAsync());
- return FutureUtil.waitForAll(closeFutures);
- })
- ).thenAccept(ignore -> {});
- } catch (PulsarServerException e) {
- LOG.error("[{}] Fail to run health check while get client.", clientAppId());
- throw new RestException(e);
+ .thenCompose(topicOptional -> {
+ if (!topicOptional.isPresent()) {
+ LOG.error("[{}] Fail to run health check while get topic {}. because get null value.",
+ clientAppId(), topicName);
+ throw new RestException(Status.NOT_FOUND,
+ String.format("Topic [%s] not found after create.", topicName));
+ }
+ PulsarClient client;
+ try {
+ client = pulsar().getClient();
+ } catch (PulsarServerException e) {
+ LOG.error("[{}] Fail to run health check while get client.", clientAppId());
+ throw new RestException(e);
+ }
+ CompletableFuture<Void> resultFuture = new CompletableFuture<>();
+ client.newProducer(Schema.STRING).topic(topicName).createAsync()
+ .thenCompose(producer -> client.newReader(Schema.STRING).topic(topicName)
+ .subscriptionName(subscriptionName)
+ .startMessageId(MessageId.latest)
+ .createAsync().exceptionally(createException -> {
+ producer.closeAsync().exceptionally(ex -> {
+ LOG.error("[{}] Close producer fail while heath check.", clientAppId());
+ return null;
+ });
+ throw FutureUtil.wrapToCompletionException(createException);
+ }).thenCompose(reader -> producer.sendAsync(messageStr)
+ .thenCompose(__ -> healthCheckRecursiveReadNext(reader, messageStr))
+ .whenComplete((__, ex) -> {
+ closeAndReCheck(producer, reader, topicOptional.get(), subscriptionName)
+ .whenComplete((unused, innerEx) -> {
+ if (ex != null) {
+ resultFuture.completeExceptionally(ex);
+ } else {
+ resultFuture.complete(null);
+ }
+ });
+ }
+ ))
+ ).exceptionally(ex -> {
+ resultFuture.completeExceptionally(ex);
+ return null;
+ });
+ return resultFuture;
+ });
+ }
+
+    /**
+     * Close the health-check producer and reader, then re-check that nothing was left behind.
+     *
+     * <p>Producer: a close failure is logged with its cause. Reader: a close failure is logged and, if
+     * the reader's subscription still exists on the topic, it is force-deleted (best effort).
+     * Close failures are handled here rather than propagated to the returned future.
+     *
+     * @param producer Producer used by the health check
+     * @param reader Reader used by the health check
+     * @param topic Health-check topic, used to look up a leftover subscription
+     * @param subscriptionName Subscription name of the reader
+     * @return a future that completes once the close attempts have been handled
+     */
+    private CompletableFuture<Void> closeAndReCheck(Producer<String> producer, Reader<String> reader,
+                                                    Topic topic, String subscriptionName) {
+        // Whether the health check passed or failed, both clients must be closed.
+        CompletableFuture<Void> producerFuture = producer.closeAsync();
+        CompletableFuture<Void> readerFuture = reader.closeAsync();
+        List<CompletableFuture<Void>> futures = new ArrayList<>(2);
+        futures.add(producerFuture);
+        futures.add(readerFuture);
+        return FutureUtil.waitForAll(Collections.unmodifiableList(futures))
+                .exceptionally(closeException -> {
+                    // Either or both close attempts may have failed: log each one with its own cause.
+                    producerFuture.exceptionally(ex -> {
+                        LOG.error("[{}] Close producer fail while health check.", clientAppId(), ex);
+                        return null;
+                    });
+                    readerFuture.exceptionally(ex -> {
+                        LOG.error("[{}] Close reader fail while health check.", clientAppId(), ex);
+                        return null;
+                    });
+                    if (readerFuture.isCompletedExceptionally()) {
+                        // the reader may not have removed its subscription: re-check and force delete it
+                        Subscription subscription = topic.getSubscription(subscriptionName);
+                        if (subscription != null) {
+                            LOG.warn("[{}] Force delete subscription {} when it still exists after the"
+                                    + " reader is closed.", clientAppId(), subscriptionName);
+                            subscription.deleteForcefully().exceptionally(ex -> {
+                                LOG.error("[{}] Force delete subscription fail while health check",
+                                        clientAppId(), ex);
+                                return null;
+                            });
+                        }
}
+                    return null;
});
}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiHealthCheckTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiHealthCheckTest.java
index 4f01cb1..b9886b2 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiHealthCheckTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiHealthCheckTest.java
@@ -21,12 +21,19 @@ package org.apache.pulsar.broker.admin;
import com.google.common.collect.Sets;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
+import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.common.naming.TopicVersion;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
+import org.apache.pulsar.compaction.Compactor;
+import org.awaitility.Awaitility;
+import org.junit.Assert;
+import org.springframework.util.CollectionUtils;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
+import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
@Test(groups = "broker-admin")
@Slf4j
@@ -55,16 +62,100 @@ public class AdminApiHealthCheckTest extends MockedPulsarServiceBaseTest {
@Test
public void testHealthCheckup() throws Exception {
- admin.brokers().healthcheck();
+        final int times = 30;
+        CompletableFuture<Void> future = new CompletableFuture<>();
+        pulsar.getExecutor().execute(() -> {
+            try {
+                for (int i = 0; i < times; i++) {
+                    admin.brokers().healthcheck();
+                }
+                future.complete(null);
+            } catch (PulsarAdminException | RuntimeException e) {
+                future.completeExceptionally(e);
+            }
+        });
+        for (int i = 0; i < times; i++) {
+            admin.brokers().healthcheck();
+        }
+        final String testHealthCheckTopic = String.format("persistent://pulsar/test/localhost:%s/healthcheck",
+                pulsar.getConfig().getWebServicePort().get());
+        // Wait until the concurrent loop is done; get() rethrows its failure (with cause), if any.
+        Awaitility.await().until(future::isDone);
+        future.get();
+        // Ensure no health-check subscription is left behind.
+        Awaitility.await().untilAsserted(() ->
+                Assert.assertTrue(CollectionUtils.isEmpty(admin.topics()
+                        .getSubscriptions(testHealthCheckTopic).stream()
+                        // System topics always use compaction, even when it is not set in the policies.
+                        .filter(v -> !v.equals(Compactor.COMPACTION_SUBSCRIPTION))
+                        .collect(Collectors.toList())
+                ))
+        );
}
@Test
public void testHealthCheckupV1() throws Exception {
- admin.brokers().healthcheck(TopicVersion.V1);
+        final int times = 30;
+        CompletableFuture<Void> future = new CompletableFuture<>();
+        pulsar.getExecutor().execute(() -> {
+            try {
+                for (int i = 0; i < times; i++) {
+                    admin.brokers().healthcheck(TopicVersion.V1);
+                }
+                future.complete(null);
+            } catch (PulsarAdminException | RuntimeException e) {
+                future.completeExceptionally(e);
+            }
+        });
+        for (int i = 0; i < times; i++) {
+            admin.brokers().healthcheck(TopicVersion.V1);
+        }
+        final String testHealthCheckTopic = String.format("persistent://pulsar/test/localhost:%s/healthcheck",
+                pulsar.getConfig().getWebServicePort().get());
+        // Wait until the concurrent loop is done; get() rethrows its failure (with cause), if any.
+        Awaitility.await().until(future::isDone);
+        future.get();
+        // Ensure no health-check subscription is left behind.
+        Awaitility.await().untilAsserted(() ->
+                Assert.assertTrue(CollectionUtils.isEmpty(admin.topics()
+                        .getSubscriptions(testHealthCheckTopic).stream()
+                        // System topics always use compaction, even when it is not set in the policies.
+                        .filter(v -> !v.equals(Compactor.COMPACTION_SUBSCRIPTION))
+                        .collect(Collectors.toList())
+                ))
+        );
}
@Test
public void testHealthCheckupV2() throws Exception {
- admin.brokers().healthcheck(TopicVersion.V2);
+        final int times = 30;
+        CompletableFuture<Void> future = new CompletableFuture<>();
+        pulsar.getExecutor().execute(() -> {
+            try {
+                for (int i = 0; i < times; i++) {
+                    admin.brokers().healthcheck(TopicVersion.V2);
+                }
+                future.complete(null);
+            } catch (PulsarAdminException | RuntimeException e) {
+                future.completeExceptionally(e);
+            }
+        });
+        for (int i = 0; i < times; i++) {
+            admin.brokers().healthcheck(TopicVersion.V2);
+        }
+        final String testHealthCheckTopic = String.format("persistent://pulsar/localhost:%s/healthcheck",
+                pulsar.getConfig().getWebServicePort().get());
+        // Wait until the concurrent loop is done; get() rethrows its failure (with cause), if any.
+        Awaitility.await().until(future::isDone);
+        future.get();
+        // Ensure no health-check subscription is left behind.
+        Awaitility.await().untilAsserted(() ->
+                Assert.assertTrue(CollectionUtils.isEmpty(admin.topics()
+                        .getSubscriptions(testHealthCheckTopic).stream()
+                        // System topics always use compaction, even when it is not set in the policies.
+                        .filter(v -> !v.equals(Compactor.COMPACTION_SUBSCRIPTION))
+                        .collect(Collectors.toList())
+                ))
+        );
}
}
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/FutureUtil.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/FutureUtil.java
index 687cbd2..a29ac8c 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/FutureUtil.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/FutureUtil.java
@@ -188,4 +188,18 @@ public class FutureUtil {
}
return Optional.empty();
}
+
+    /**
+     * Return {@code throwable} as a {@link CompletionException}, wrapping it only when it is not one already.
+     *
+     * <p>Lets a callback rethrow any failure without nesting one CompletionException inside another.
+     *
+     * @param throwable failure to expose as a CompletionException
+     * @return the same instance if it already is a CompletionException, otherwise a new one caused by it
+     */
+    public static CompletionException wrapToCompletionException(Throwable throwable) {
+        return throwable instanceof CompletionException
+                ? (CompletionException) throwable
+                : new CompletionException(throwable);
+    }
}