You are viewing a plain-text version of this content. The link to its canonical version ("here") was not preserved in this plain-text rendering.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2022/03/12 02:40:57 UTC

[pulsar] 04/14: [Broker] Fix Broker HealthCheck Endpoint Exposes Race Conditions (#14367)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit b0938399d5d8d7f752a93916fc804b59af1c6b32
Author: Qiang Zhao <74...@users.noreply.github.com>
AuthorDate: Fri Mar 4 10:29:34 2022 +0800

    [Broker] Fix Broker HealthCheck Endpoint Exposes Race Conditions (#14367)
    
    (cherry picked from commit 4f1e39b6921ea401b8c27f17a041d06d85f8abf8)
---
 .../pulsar/broker/admin/impl/BrokersBase.java      | 129 +++++++++++++++------
 .../broker/admin/AdminApiHealthCheckTest.java      |  97 +++++++++++++++-
 .../org/apache/pulsar/common/util/FutureUtil.java  |  14 +++
 3 files changed, 203 insertions(+), 37 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java
index 4530d8c..eda186a 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java
@@ -30,7 +30,6 @@ import java.util.Map;
 import java.util.Objects;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
-import java.util.stream.Collectors;
 import javax.ws.rs.DELETE;
 import javax.ws.rs.DefaultValue;
 import javax.ws.rs.GET;
@@ -54,6 +53,7 @@ import org.apache.pulsar.broker.service.Subscription;
 import org.apache.pulsar.broker.service.Topic;
 import org.apache.pulsar.broker.web.RestException;
 import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.Reader;
 import org.apache.pulsar.client.api.Schema;
@@ -338,43 +338,104 @@ public class BrokersBase extends AdminResource {
         NamespaceName namespaceName = (topicVersion == TopicVersion.V2)
                 ? NamespaceService.getHeartbeatNamespaceV2(pulsar().getAdvertisedAddress(), pulsar().getConfiguration())
                 : NamespaceService.getHeartbeatNamespace(pulsar().getAdvertisedAddress(), pulsar().getConfiguration());
-        String topicName = String.format("persistent://%s/%s", namespaceName, HEALTH_CHECK_TOPIC_SUFFIX);
+        final String topicName = String.format("persistent://%s/%s", namespaceName, HEALTH_CHECK_TOPIC_SUFFIX);
         LOG.info("[{}] Running healthCheck with topic={}", clientAppId(), topicName);
-        String messageStr = UUID.randomUUID().toString();
+        final String messageStr = UUID.randomUUID().toString();
+        final String subscriptionName = "healthCheck-" + messageStr;
         // create non-partitioned topic manually and close the previous reader if present.
         return pulsar().getBrokerService().getTopic(topicName, true)
-                // check and clean all subscriptions
-                .thenCompose(topicOptional -> {
-                    if (!topicOptional.isPresent()) {
-                        LOG.error("[{}] Fail to run health check while get topic {}. because get null value.",
-                                clientAppId(), topicName);
-                        throw new RestException(Status.NOT_FOUND, "Topic [{}] not found after create.");
-                    }
-                    Topic topic = topicOptional.get();
-                    // clean all subscriptions
-                    return FutureUtil.waitForAll(topic.getSubscriptions().values()
-                            .stream().map(Subscription::deleteForcefully).collect(Collectors.toList()))
-                            .thenApply(__ -> topic);
-                }).thenCompose(topic -> {
-                    try {
-                        PulsarClient client = pulsar().getClient();
-                        return client.newProducer(Schema.STRING).topic(topicName).createAsync()
-                                        .thenCombine(client.newReader(Schema.STRING).topic(topicName)
-                                        .startMessageId(MessageId.latest).createAsync(), (producer, reader) ->
-                                                        producer.sendAsync(messageStr).thenCompose(__ ->
-                                                                healthCheckRecursiveReadNext(reader, messageStr))
-                                                        .thenCompose(__ -> {
-                                                            List<CompletableFuture<Void>> closeFutures =
-                                                                    new ArrayList<>();
-                                                            closeFutures.add(producer.closeAsync());
-                                                            closeFutures.add(reader.closeAsync());
-                                                            return FutureUtil.waitForAll(closeFutures);
-                                                        })
-                                        ).thenAccept(ignore -> {});
-                    } catch (PulsarServerException e) {
-                        LOG.error("[{}] Fail to run health check while get client.", clientAppId());
-                        throw new RestException(e);
+            .thenCompose(topicOptional -> {
+                if (!topicOptional.isPresent()) {
+                    LOG.error("[{}] Fail to run health check while get topic {}. because get null value.",
+                            clientAppId(), topicName);
+                    throw new RestException(Status.NOT_FOUND,
+                            String.format("Topic [%s] not found after create.", topicName));
+                }
+                PulsarClient client;
+                try {
+                    client = pulsar().getClient();
+                } catch (PulsarServerException e) {
+                    LOG.error("[{}] Fail to run health check while get client.", clientAppId());
+                    throw new RestException(e);
+                }
+                CompletableFuture<Void> resultFuture = new CompletableFuture<>();
+                client.newProducer(Schema.STRING).topic(topicName).createAsync()
+                        .thenCompose(producer -> client.newReader(Schema.STRING).topic(topicName)
+                                .subscriptionName(subscriptionName)
+                                .startMessageId(MessageId.latest)
+                                .createAsync().exceptionally(createException -> {
+                                    producer.closeAsync().exceptionally(ex -> {
+                                        LOG.error("[{}] Close producer fail while heath check.", clientAppId());
+                                        return null;
+                                    });
+                                    throw FutureUtil.wrapToCompletionException(createException);
+                                }).thenCompose(reader -> producer.sendAsync(messageStr)
+                                        .thenCompose(__ -> healthCheckRecursiveReadNext(reader, messageStr))
+                                        .whenComplete((__, ex) -> {
+                                            closeAndReCheck(producer, reader, topicOptional.get(), subscriptionName)
+                                                    .whenComplete((unused, innerEx) -> {
+                                                        if (ex != null) {
+                                                            resultFuture.completeExceptionally(ex);
+                                                        } else {
+                                                            resultFuture.complete(null);
+                                                        }
+                                                    });
+                                        }
+                                ))
+                        ).exceptionally(ex -> {
+                            resultFuture.completeExceptionally(ex);
+                            return null;
+                        });
+                return resultFuture;
+            });
+    }
+
+    /**
+     * Close producer and reader and then re-check whether each close succeeded.
+     *
+     * Re-check
+     * - Producer: if close fails, an error is logged (with its cause) to notify the user.
+     * - Reader: if close fails, the error is logged and its subscription is force-deleted if it still exists.
+     * Failures are handled per future, and the returned future always completes normally without
+     * waiting for the forced subscription deletion.
+     *
+     * @param producer Producer
+     * @param reader Reader
+     * @param topic  Topic
+     * @param subscriptionName  Subscription name
+     */
+    private CompletableFuture<Void> closeAndReCheck(Producer<String> producer, Reader<String> reader,
+                                                    Topic topic, String subscriptionName) {
+        // No matter exception or success, we still need to close producer/reader.
+        // Each close is recovered on its own so every failure is logged with its own cause.
+        CompletableFuture<Void> producerFuture = producer.closeAsync()
+                .exceptionally(producerException -> {
+                    LOG.error("[{}] Close producer fail while health check.", clientAppId(), producerException);
+                    return null;
+                });
+        CompletableFuture<Void> readerFuture = reader.closeAsync()
+                .exceptionally(readerException -> {
+                    LOG.error("[{}] Close reader fail while health check.", clientAppId(), readerException);
+                    // re-check subscription after reader close
+                    Subscription subscription = topic.getSubscription(subscriptionName);
+                    if (subscription != null) {
+                        LOG.warn("[{}] Force delete subscription {} "
+                                        + "when it still exists after the"
+                                        + " reader is closed.",
+                                clientAppId(), subscriptionName);
+                        subscription.deleteForcefully()
+                                .exceptionally(ex -> {
+                                    LOG.error("[{}] Force delete subscription fail"
+                                                    + " while health check",
+                                            clientAppId(), ex);
+                                    return null;
+                                });
                     }
+                    return null;
                 });
+        List<CompletableFuture<Void>> futures = new ArrayList<>(2);
+        futures.add(producerFuture);
+        futures.add(readerFuture);
+        return FutureUtil.waitForAll(Collections.unmodifiableList(futures));
     }
 
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiHealthCheckTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiHealthCheckTest.java
index 4f01cb1..b9886b2 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiHealthCheckTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiHealthCheckTest.java
@@ -21,12 +21,19 @@ package org.apache.pulsar.broker.admin;
 import com.google.common.collect.Sets;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
+import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.common.naming.TopicVersion;
 import org.apache.pulsar.common.policies.data.ClusterData;
 import org.apache.pulsar.common.policies.data.TenantInfoImpl;
+import org.apache.pulsar.compaction.Compactor;
+import org.awaitility.Awaitility;
+import org.junit.Assert;
+import org.springframework.util.CollectionUtils;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
+import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
 
 @Test(groups = "broker-admin")
 @Slf4j
@@ -55,16 +62,100 @@ public class AdminApiHealthCheckTest extends MockedPulsarServiceBaseTest {
 
     @Test
     public void testHealthCheckup() throws Exception {
-        admin.brokers().healthcheck();
+        final int times = 30;
+        CompletableFuture<Void> future = new CompletableFuture<>();
+        pulsar.getExecutor().execute(() -> {
+            try {
+                for (int i = 0; i < times; i++) {
+                    admin.brokers().healthcheck();
+                }
+                future.complete(null);
+            } catch (PulsarAdminException | RuntimeException e) {
+                future.completeExceptionally(e);
+            }
+        });
+        for (int i = 0; i < times; i++) {
+            admin.brokers().healthcheck();
+        }
+        final String testHealthCheckTopic = String.format("persistent://pulsar/test/localhost:%s/healthcheck",
+                pulsar.getConfig().getWebServicePort().get());
+        // Wait for the background health checks to finish; join() rethrows any failure they hit.
+        Awaitility.await().until(future::isDone);
+        future.join();
+        // To ensure we don't have any subscription
+        Awaitility.await().untilAsserted(() ->
+                Assert.assertTrue(CollectionUtils.isEmpty(admin.topics()
+                        .getSubscriptions(testHealthCheckTopic).stream()
+                        // All system topics use compaction, even if it is not explicitly set in the policies.
+                        .filter(v -> !v.equals(Compactor.COMPACTION_SUBSCRIPTION))
+                        .collect(Collectors.toList())
+                ))
+        );
     }
 
     @Test
     public void testHealthCheckupV1() throws Exception {
-        admin.brokers().healthcheck(TopicVersion.V1);
+        final int times = 30;
+        CompletableFuture<Void> future = new CompletableFuture<>();
+        pulsar.getExecutor().execute(() -> {
+            try {
+                for (int i = 0; i < times; i++) {
+                    admin.brokers().healthcheck(TopicVersion.V1);
+                }
+                future.complete(null);
+            } catch (PulsarAdminException | RuntimeException e) {
+                future.completeExceptionally(e);
+            }
+        });
+        for (int i = 0; i < times; i++) {
+            admin.brokers().healthcheck(TopicVersion.V1);
+        }
+        final String testHealthCheckTopic = String.format("persistent://pulsar/test/localhost:%s/healthcheck",
+                pulsar.getConfig().getWebServicePort().get());
+        // Wait for the background health checks to finish; join() rethrows any failure they hit.
+        Awaitility.await().until(future::isDone);
+        future.join();
+        // To ensure we don't have any subscription
+        Awaitility.await().untilAsserted(() ->
+                Assert.assertTrue(CollectionUtils.isEmpty(admin.topics()
+                        .getSubscriptions(testHealthCheckTopic).stream()
+                        // All system topics use compaction, even if it is not explicitly set in the policies.
+                        .filter(v -> !v.equals(Compactor.COMPACTION_SUBSCRIPTION))
+                        .collect(Collectors.toList())
+                ))
+        );
     }
 
     @Test
     public void testHealthCheckupV2() throws Exception {
-        admin.brokers().healthcheck(TopicVersion.V2);
+        final int times = 30;
+        CompletableFuture<Void> future = new CompletableFuture<>();
+        pulsar.getExecutor().execute(() -> {
+            try {
+                for (int i = 0; i < times; i++) {
+                    admin.brokers().healthcheck(TopicVersion.V2);
+                }
+                future.complete(null);
+            } catch (PulsarAdminException | RuntimeException e) {
+                future.completeExceptionally(e);
+            }
+        });
+        for (int i = 0; i < times; i++) {
+            admin.brokers().healthcheck(TopicVersion.V2);
+        }
+        final String testHealthCheckTopic = String.format("persistent://pulsar/localhost:%s/healthcheck",
+                pulsar.getConfig().getWebServicePort().get());
+        // Wait for the background health checks to finish; join() rethrows any failure they hit.
+        Awaitility.await().until(future::isDone);
+        future.join();
+        // To ensure we don't have any subscription
+        Awaitility.await().untilAsserted(() ->
+                Assert.assertTrue(CollectionUtils.isEmpty(admin.topics()
+                        .getSubscriptions(testHealthCheckTopic).stream()
+                        // All system topics use compaction, even if it is not explicitly set in the policies.
+                        .filter(v -> !v.equals(Compactor.COMPACTION_SUBSCRIPTION))
+                        .collect(Collectors.toList())
+                ))
+        );
     }
 }
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/FutureUtil.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/FutureUtil.java
index 687cbd2..a29ac8c 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/FutureUtil.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/FutureUtil.java
@@ -188,4 +188,18 @@ public class FutureUtil {
         }
         return Optional.empty();
     }
+
+    /**
+     * Convert a throwable into a {@link CompletionException} so it can be thrown from a
+     * {@code CompletableFuture} callback without being wrapped twice.
+     *
+     * @param throwable the failure to convert; returned as-is when it already is a CompletionException
+     * @return {@code throwable} itself if it is a CompletionException, otherwise a new one wrapping it
+     */
+    public static CompletionException wrapToCompletionException(Throwable throwable) {
+        // Reuse an existing CompletionException instead of nesting it inside a new one.
+        return throwable instanceof CompletionException
+                ? (CompletionException) throwable
+                : new CompletionException(throwable);
+    }
 }