You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2022/03/10 06:44:47 UTC

[pulsar] branch branch-2.8 updated: [Branch-2.8] Fix Broker HealthCheck Endpoint Exposes Race Conditions. (#14618)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.8 by this push:
     new 32f8065  [Branch-2.8] Fix Broker HealthCheck Endpoint Exposes Race Conditions. (#14618)
32f8065 is described below

commit 32f8065f8db78171b41f8255d76a1b32753161a2
Author: Jiwei Guo <te...@apache.org>
AuthorDate: Thu Mar 10 14:43:22 2022 +0800

    [Branch-2.8] Fix Broker HealthCheck Endpoint Exposes Race Conditions. (#14618)
---
 .../apache/pulsar/broker/admin/AdminResource.java  |  15 --
 .../pulsar/broker/admin/impl/BrokersBase.java      | 195 ++++++++++++---------
 .../pulsar/broker/web/PulsarWebResource.java       |  16 ++
 .../broker/admin/AdminApiHealthCheckTest.java      |  91 ++++++++++
 .../org/apache/pulsar/common/util/FutureUtil.java  |  17 +-
 5 files changed, 237 insertions(+), 97 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
index 066592a..9f85359 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
@@ -40,10 +40,8 @@ import org.apache.bookkeeper.mledger.ManagedLedgerException;
 import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.broker.cache.LocalZooKeeperCacheService;
-import org.apache.pulsar.broker.service.BrokerServiceException;
 import org.apache.pulsar.broker.web.PulsarWebResource;
 import org.apache.pulsar.broker.web.RestException;
-import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.client.admin.internal.TopicsImpl;
 import org.apache.pulsar.common.api.proto.CommandGetTopicsOfNamespace;
 import org.apache.pulsar.common.naming.Constants;
@@ -780,19 +778,6 @@ public abstract class AdminResource extends PulsarWebResource {
         return future;
     }
 
-    protected void resumeAsyncResponseExceptionally(AsyncResponse asyncResponse, Throwable throwable) {
-        Throwable realCause = FutureUtil.unwrapCompletionException(throwable);
-        if (realCause instanceof WebApplicationException) {
-            asyncResponse.resume(realCause);
-        } else if (realCause instanceof BrokerServiceException.NotAllowedException) {
-            asyncResponse.resume(new RestException(Status.CONFLICT, realCause));
-        } else if (realCause instanceof PulsarAdminException) {
-            asyncResponse.resume(new RestException(((PulsarAdminException) realCause)));
-        } else {
-            asyncResponse.resume(new RestException(realCause));
-        }
-    }
-
     protected CompletableFuture<SchemaCompatibilityStrategy> getSchemaCompatibilityStrategyAsync() {
         return getNamespacePoliciesAsync(namespaceName).thenApply(policies -> {
             SchemaCompatibilityStrategy schemaCompatibilityStrategy = policies.schema_compatibility_strategy;
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java
index ad4a6ff..15303b1 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java
@@ -24,9 +24,12 @@ import io.swagger.annotations.ApiOperation;
 import io.swagger.annotations.ApiResponse;
 import io.swagger.annotations.ApiResponses;
 import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
@@ -47,9 +50,10 @@ import org.apache.pulsar.broker.loadbalance.LeaderBroker;
 import org.apache.pulsar.broker.loadbalance.LoadManager;
 import org.apache.pulsar.broker.namespace.NamespaceService;
 import org.apache.pulsar.broker.service.BrokerService;
+import org.apache.pulsar.broker.service.Subscription;
+import org.apache.pulsar.broker.service.Topic;
 import org.apache.pulsar.broker.web.PulsarWebResource;
 import org.apache.pulsar.broker.web.RestException;
-import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.PulsarClient;
@@ -300,102 +304,133 @@ public class BrokersBase extends PulsarWebResource {
         @ApiResponse(code = 403, message = "Don't have admin permission"),
         @ApiResponse(code = 404, message = "Cluster doesn't exist"),
         @ApiResponse(code = 500, message = "Internal server error")})
-    public void healthcheck(@Suspended AsyncResponse asyncResponse) throws Exception {
+    public void healthcheck(@Suspended AsyncResponse asyncResponse) {
+        validateSuperUserAccess();
+        // Resume the suspended request with "ok", or with the mapped error, once the check settles.
+        internalRunHealthCheck().thenAccept(ignored -> {
+            LOG.info("[{}] Successfully run health check.", clientAppId());
+            asyncResponse.resume("ok");
+        }).exceptionally(failure -> {
+            LOG.error("[{}] Fail to run health check.", clientAppId(), failure);
+            resumeAsyncResponseExceptionally(asyncResponse, failure);
+            return null;
+        });
+    }
+
+    private CompletableFuture<Void> internalRunHealthCheck() {
         String topic;
         PulsarClient client;
         try {
-            validateSuperUserAccess();
             String heartbeatNamespace = NamespaceService.getHeartbeatNamespace(
                     pulsar().getAdvertisedAddress(), pulsar().getConfiguration());
             topic = String.format("persistent://%s/%s", heartbeatNamespace, HEALTH_CHECK_TOPIC_SUFFIX);
             LOG.info("Running healthCheck with topic={}", topic);
             client = pulsar().getClient();
-        } catch (Exception e) {
-            LOG.error("Error getting heathcheck topic info", e);
-            throw new PulsarServerException(e);
+        } catch (PulsarServerException e) {
+            LOG.error("[{}] Fail to run health check while get client.", clientAppId());
+            throw new RestException(e);
         }
         String messageStr = UUID.randomUUID().toString();
+        final String subscriptionName = "healthCheck-" + messageStr;
         // create non-partitioned topic manually and close the previous reader if present.
-        try {
-            pulsar().getBrokerService().getTopic(topic, true).get().ifPresent(t -> {
-                t.getSubscriptions().forEach((__, value) -> {
-                    try {
-                        value.deleteForcefully();
-                    } catch (Exception e) {
-                        LOG.warn("Failed to delete previous subscription {} for health check", value.getName(), e);
-                    }
-                });
-            });
-        } catch (Exception e) {
-            LOG.warn("Failed to try to delete subscriptions for health check", e);
-        }
-        CompletableFuture<Producer<String>> producerFuture =
-            client.newProducer(Schema.STRING).topic(topic).createAsync();
-        CompletableFuture<Reader<String>> readerFuture = client.newReader(Schema.STRING)
-            .topic(topic).startMessageId(MessageId.latest).createAsync();
-
-        CompletableFuture<Void> completePromise = new CompletableFuture<>();
-
-        CompletableFuture.allOf(producerFuture, readerFuture).whenComplete(
-                (ignore, exception) -> {
-                    if (exception != null) {
-                        completePromise.completeExceptionally(exception);
-                    } else {
-                        producerFuture.thenCompose((producer) -> producer.sendAsync(messageStr))
-                            .whenComplete((ignore2, exception2) -> {
-                                    if (exception2 != null) {
-                                        completePromise.completeExceptionally(exception2);
-                                    }
+        return pulsar().getBrokerService().getTopic(topic, true).thenCompose(topicOptional -> {
+            if (!topicOptional.isPresent()) {
+                LOG.error("[{}] Fail to run health check while get topic {}. because get null value.",
+                        clientAppId(), topic);
+                throw new RestException(Status.NOT_FOUND,
+                        String.format("Topic [%s] not found after create.", topic));
+            }
+            CompletableFuture<Void> resultFuture = new CompletableFuture<>();
+            client.newProducer(Schema.STRING).topic(topic).createAsync()
+                    .thenCompose(producer -> client.newReader(Schema.STRING)
+                            .topic(topic)
+                            .subscriptionName(subscriptionName)
+                            .startMessageId(MessageId.latest)
+                            .createAsync()
+                            .exceptionally(createException -> {
+                                producer.closeAsync().exceptionally(ex -> {
+                                    LOG.error("[{}] Close producer fail while heath check.", clientAppId());
+                                    return null;
                                 });
+                                throw FutureUtil.wrapToCompletionException(createException);
+                            })
+                            .thenCompose(reader -> producer.sendAsync(messageStr)
+                                    .thenCompose(__ -> healthCheckRecursiveReadNext(reader, messageStr))
+                                    .whenComplete((__, ex) -> {
+                                        closeAndReCheck(producer, reader, topicOptional.get(), subscriptionName)
+                                                .whenComplete((unused, innerEx) -> {
+                                                    if (ex != null) {
+                                                        resultFuture.completeExceptionally(ex);
+                                                    } else {
+                                                        resultFuture.complete(null);
+                                                    }
+                                                });
+                                    }))
+                    ).exceptionally(ex -> {
+                        resultFuture.completeExceptionally(ex);
+                        return null;
+                    });
+            return resultFuture;
+        });
+    }
 
-                        healthcheckReadLoop(readerFuture, completePromise, messageStr);
-
-                        // timeout read loop after 10 seconds
-                        FutureUtil.addTimeoutHandling(completePromise,
-                                HEALTHCHECK_READ_TIMEOUT, pulsar().getExecutor(),
-                                () -> FutureUtil.createTimeoutException("Timed out reading", getClass(),
-                                        "healthcheck(...)"));
+    private CompletableFuture<Void> healthCheckRecursiveReadNext(Reader<String> reader, String content) {
+        return reader.readNextAsync()
+                .thenCompose(msg -> {
+                    if (!Objects.equals(content, msg.getValue())) {
+                        return healthCheckRecursiveReadNext(reader, content);
                     }
+                    return CompletableFuture.completedFuture(null);
                 });
-
-        completePromise.whenComplete((ignore, exception) -> {
-                producerFuture.thenAccept((producer) -> {
-                        producer.closeAsync().whenComplete((ignore2, exception2) -> {
-                                if (exception2 != null) {
-                                    LOG.warn("Error closing producer for healthcheck", exception2);
-                                }
-                            });
-                    });
-                readerFuture.thenAccept((reader) -> {
-                        reader.closeAsync().whenComplete((ignore2, exception2) -> {
-                                if (exception2 != null) {
-                                    LOG.warn("Error closing reader for healthcheck", exception2);
-                                }
-                            });
-                    });
-                if (exception != null) {
-                    asyncResponse.resume(new RestException(exception));
-                } else {
-                    asyncResponse.resume("ok");
-                }
-            });
     }
 
-    private void healthcheckReadLoop(CompletableFuture<Reader<String>> readerFuture,
-                                     CompletableFuture<?> completablePromise,
-                                     String messageStr) {
-        readerFuture.thenAccept((reader) -> {
-                CompletableFuture<Message<String>> readFuture = reader.readNextAsync()
-                    .whenComplete((m, exception) -> {
-                            if (exception != null) {
-                                completablePromise.completeExceptionally(exception);
-                            } else if (m.getValue().equals(messageStr)) {
-                                completablePromise.complete(null);
-                            } else {
-                                healthcheckReadLoop(readerFuture, completablePromise, messageStr);
-                            }
-                        });
-            });
+    /**
+     * Close producer and reader, then re-check whether the close really succeeded.
+     *
+     * Re-check
+     * - Producer: If close fails we log the error to notify the user.
+     * - Reader: If close fails we log the error and force delete its subscription if it still exists.
+     *
+     * @param producer Producer
+     * @param reader Reader
+     * @param topic  Topic the reader subscription belongs to
+     * @param subscriptionName  Subscription name used by the reader
+     * @return future of waiting for both closes; close failures are handled here, not propagated
+     */
+    private CompletableFuture<Void> closeAndReCheck(Producer<String> producer, Reader<String> reader,
+                                                    Topic topic, String subscriptionName) {
+        // no matter exception or success, we still need to close producer/reader
+        CompletableFuture<Void> producerFuture = producer.closeAsync();
+        CompletableFuture<Void> readerFuture = reader.closeAsync();
+        List<CompletableFuture<Void>> futures = new ArrayList<>(2);
+        futures.add(producerFuture);
+        futures.add(readerFuture);
+        return FutureUtil.waitForAll(Collections.unmodifiableList(futures))
+                .exceptionally(closeException -> {
+                    // check both futures: either close, or both, may have failed
+                    if (producerFuture.isCompletedExceptionally()) {
+                        LOG.error("[{}] Close producer fail while heath check.", clientAppId(), closeException);
+                    }
+                    if (readerFuture.isCompletedExceptionally()) {
+                        LOG.error("[{}] Close reader fail while heath check.", clientAppId(), closeException);
+                        Subscription subscription = topic.getSubscription(subscriptionName);
+                        // re-check subscription after reader close
+                        if (subscription != null) {
+                            LOG.warn("[{}] Force delete subscription {} "
+                                            + "when it still exists after the"
+                                            + " reader is closed.",
+                                    clientAppId(), subscription);
+                            subscription.deleteForcefully()
+                                    .exceptionally(ex -> {
+                                        LOG.error("[{}] Force delete subscription fail"
+                                                        + " while health check",
+                                                clientAppId(), ex);
+                                        return null;
+                                    });
+                        }
+                    }
+                    return null;
+                });
     }
 
     private synchronized void deleteDynamicConfigurationOnZk(String configName) {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
index 71eae45..8fe6375 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
@@ -43,6 +43,7 @@ import java.util.concurrent.TimeoutException;
 import javax.servlet.ServletContext;
 import javax.servlet.http.HttpServletRequest;
 import javax.ws.rs.WebApplicationException;
+import javax.ws.rs.container.AsyncResponse;
 import javax.ws.rs.core.Context;
 import javax.ws.rs.core.Response;
 import javax.ws.rs.core.Response.Status;
@@ -67,6 +68,8 @@ import org.apache.pulsar.broker.resources.NamespaceResources.IsolationPolicyReso
 import org.apache.pulsar.broker.resources.PulsarResources;
 import org.apache.pulsar.broker.resources.ResourceGroupResources;
 import org.apache.pulsar.broker.resources.TenantResources;
+import org.apache.pulsar.broker.service.BrokerServiceException;
+import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.impl.PulsarServiceNameResolver;
 import org.apache.pulsar.common.naming.Constants;
@@ -1207,4 +1210,17 @@ public abstract class PulsarWebResource {
         }
         return tree;
     }
+
+    protected static void resumeAsyncResponseExceptionally(AsyncResponse asyncResponse, Throwable exception) {
+        // A web error is passed through as-is; any other cause is wrapped in a RestException.
+        final Throwable cause = FutureUtil.unwrapCompletionException(exception);
+        final Throwable reply = cause instanceof WebApplicationException
+                ? cause
+                : cause instanceof BrokerServiceException.NotAllowedException
+                        ? new RestException(Status.CONFLICT, cause)
+                        : cause instanceof PulsarAdminException
+                                ? new RestException((PulsarAdminException) cause)
+                                : new RestException(cause);
+        asyncResponse.resume(reply);
+    }
 }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiHealthCheckTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiHealthCheckTest.java
new file mode 100644
index 0000000..c44d771
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiHealthCheckTest.java
@@ -0,0 +1,91 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.admin;
+
+import com.google.common.collect.Sets;
+import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
+import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.common.policies.data.ClusterData;
+import org.apache.pulsar.common.policies.data.TenantInfoImpl;
+import org.apache.pulsar.compaction.Compactor;
+import org.awaitility.Awaitility;
+import org.testng.Assert;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+@Test(groups = "broker")
+public class AdminApiHealthCheckTest extends MockedPulsarServiceBaseTest {
+
+    @BeforeMethod
+    @Override
+    public void setup() throws Exception {
+        resetConfig();
+        super.internalSetup();
+        admin.clusters().createCluster("test",
+                ClusterData.builder().serviceUrl(pulsar.getWebServiceAddress()).build());
+        TenantInfoImpl tenantInfo = new TenantInfoImpl(
+                Sets.newHashSet("role1", "role2"), Sets.newHashSet("test"));
+        admin.tenants().createTenant("pulsar", tenantInfo);
+        admin.namespaces().createNamespace("pulsar/system", Sets.newHashSet("test"));
+        admin.tenants().createTenant("public", tenantInfo);
+        admin.namespaces().createNamespace("public/default", Sets.newHashSet("test"));
+    }
+    @AfterMethod(alwaysRun = true)
+    @Override
+    public void cleanup() throws Exception {
+        super.internalCleanup();
+    }
+
+    @Test
+    public void testHealthCheckup() throws Exception {
+        final int times = 30;
+        CompletableFuture<Void> future = new CompletableFuture<>();
+        pulsar.getExecutor().execute(() -> {
+            try {
+                for (int i = 0; i < times; i++) {
+                    admin.brokers().healthcheck();
+                }
+                future.complete(null);
+            } catch (PulsarAdminException e) {
+                future.completeExceptionally(e);
+            }
+        });
+        for (int i = 0; i < times; i++) {
+            admin.brokers().healthcheck();
+        }
+        // Health-check topic of this broker; once all checks finish it must have no subscription left.
+        final String testHealthCheckTopic = String.format("persistent://pulsar/test/localhost:%s/healthcheck",
+                pulsar.getConfig().getWebServicePort().get());
+        // Wait until the background loop has actually finished; a pending future is never "exceptional".
+        Awaitility.await().until(future::isDone);
+        Assert.assertFalse(future.isCompletedExceptionally());
+        Awaitility.await().untilAsserted(() ->
+                Assert.assertTrue(CollectionUtils.isEmpty(admin.topics()
+                        .getSubscriptions(testHealthCheckTopic).stream()
+                        // All system topics are using compaction, even though is not explicitly set in the policies.
+                        .filter(v -> !v.equals(Compactor.COMPACTION_SUBSCRIPTION))
+                        .collect(Collectors.toList())
+                ))
+        );
+    }
+}
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/FutureUtil.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/FutureUtil.java
index 45fd7f7..7d9fd79 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/FutureUtil.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/FutureUtil.java
@@ -78,8 +78,7 @@ public class FutureUtil {
     }
 
     /**
-     * If the future is cancelled or times out, the cancel action will be
-     * invoked
+     * If the future is cancelled or times out, the cancel action will be invoked.
      *
      * The action is executed once if the future completes with
      * {@link java.util.concurrent.CancellationException} or {@link TimeoutException}
@@ -203,4 +202,18 @@ public class FutureUtil {
         }
         return Optional.empty();
     }
+
+    /**
+     * Returns the given throwable as a {@link CompletionException}.
+     *
+     * <p>An existing CompletionException is returned as the same instance; anything else is wrapped as cause.
+     *
+     * @param throwable the exception to wrap
+     * @return the throwable itself, or a new CompletionException whose cause is the throwable
+     */
+    public static CompletionException wrapToCompletionException(Throwable throwable) {
+        return throwable instanceof CompletionException
+                ? (CompletionException) throwable
+                : new CompletionException(throwable);
+    }
 }