You are viewing a plain-text version of this content. The link to the canonical version was not preserved in this plain-text copy.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2022/03/10 06:44:47 UTC
[pulsar] branch branch-2.8 updated: [Branch-2.8] Fix Broker HealthCheck Endpoint Exposes Race Conditions. (#14618)
This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.8 by this push:
new 32f8065 [Branch-2.8] Fix Broker HealthCheck Endpoint Exposes Race Conditions. (#14618)
32f8065 is described below
commit 32f8065f8db78171b41f8255d76a1b32753161a2
Author: Jiwei Guo <te...@apache.org>
AuthorDate: Thu Mar 10 14:43:22 2022 +0800
[Branch-2.8] Fix Broker HealthCheck Endpoint Exposes Race Conditions. (#14618)
---
.../apache/pulsar/broker/admin/AdminResource.java | 15 --
.../pulsar/broker/admin/impl/BrokersBase.java | 195 ++++++++++++---------
.../pulsar/broker/web/PulsarWebResource.java | 16 ++
.../broker/admin/AdminApiHealthCheckTest.java | 91 ++++++++++
.../org/apache/pulsar/common/util/FutureUtil.java | 17 +-
5 files changed, 237 insertions(+), 97 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
index 066592a..9f85359 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
@@ -40,10 +40,8 @@ import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.cache.LocalZooKeeperCacheService;
-import org.apache.pulsar.broker.service.BrokerServiceException;
import org.apache.pulsar.broker.web.PulsarWebResource;
import org.apache.pulsar.broker.web.RestException;
-import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.admin.internal.TopicsImpl;
import org.apache.pulsar.common.api.proto.CommandGetTopicsOfNamespace;
import org.apache.pulsar.common.naming.Constants;
@@ -780,19 +778,6 @@ public abstract class AdminResource extends PulsarWebResource {
return future;
}
- protected void resumeAsyncResponseExceptionally(AsyncResponse asyncResponse, Throwable throwable) {
- Throwable realCause = FutureUtil.unwrapCompletionException(throwable);
- if (realCause instanceof WebApplicationException) {
- asyncResponse.resume(realCause);
- } else if (realCause instanceof BrokerServiceException.NotAllowedException) {
- asyncResponse.resume(new RestException(Status.CONFLICT, realCause));
- } else if (realCause instanceof PulsarAdminException) {
- asyncResponse.resume(new RestException(((PulsarAdminException) realCause)));
- } else {
- asyncResponse.resume(new RestException(realCause));
- }
- }
-
protected CompletableFuture<SchemaCompatibilityStrategy> getSchemaCompatibilityStrategyAsync() {
return getNamespacePoliciesAsync(namespaceName).thenApply(policies -> {
SchemaCompatibilityStrategy schemaCompatibilityStrategy = policies.schema_compatibility_strategy;
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java
index ad4a6ff..15303b1 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java
@@ -24,9 +24,12 @@ import io.swagger.annotations.ApiOperation;
import io.swagger.annotations.ApiResponse;
import io.swagger.annotations.ApiResponses;
import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Objects;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
@@ -47,9 +50,10 @@ import org.apache.pulsar.broker.loadbalance.LeaderBroker;
import org.apache.pulsar.broker.loadbalance.LoadManager;
import org.apache.pulsar.broker.namespace.NamespaceService;
import org.apache.pulsar.broker.service.BrokerService;
+import org.apache.pulsar.broker.service.Subscription;
+import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.broker.web.PulsarWebResource;
import org.apache.pulsar.broker.web.RestException;
-import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
@@ -300,102 +304,133 @@ public class BrokersBase extends PulsarWebResource {
@ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Cluster doesn't exist"),
@ApiResponse(code = 500, message = "Internal server error")})
- public void healthcheck(@Suspended AsyncResponse asyncResponse) throws Exception {
+ public void healthcheck(@Suspended AsyncResponse asyncResponse) {
+ validateSuperUserAccess();
+ internalRunHealthCheck()
+ .thenAccept(__ -> {
+ LOG.info("[{}] Successfully run health check.", clientAppId());
+ asyncResponse.resume("ok");
+ }).exceptionally(ex -> {
+ LOG.error("[{}] Fail to run health check.", clientAppId(), ex);
+ resumeAsyncResponseExceptionally(asyncResponse, ex);
+ return null;
+ });
+ }
+
+ private CompletableFuture<Void> internalRunHealthCheck() {
String topic;
PulsarClient client;
try {
- validateSuperUserAccess();
String heartbeatNamespace = NamespaceService.getHeartbeatNamespace(
pulsar().getAdvertisedAddress(), pulsar().getConfiguration());
topic = String.format("persistent://%s/%s", heartbeatNamespace, HEALTH_CHECK_TOPIC_SUFFIX);
LOG.info("Running healthCheck with topic={}", topic);
client = pulsar().getClient();
- } catch (Exception e) {
- LOG.error("Error getting heathcheck topic info", e);
- throw new PulsarServerException(e);
+ } catch (PulsarServerException e) {
+ LOG.error("[{}] Fail to run health check while get client.", clientAppId());
+ throw new RestException(e);
}
String messageStr = UUID.randomUUID().toString();
+ final String subscriptionName = "healthCheck-" + messageStr;
// create non-partitioned topic manually and close the previous reader if present.
- try {
- pulsar().getBrokerService().getTopic(topic, true).get().ifPresent(t -> {
- t.getSubscriptions().forEach((__, value) -> {
- try {
- value.deleteForcefully();
- } catch (Exception e) {
- LOG.warn("Failed to delete previous subscription {} for health check", value.getName(), e);
- }
- });
- });
- } catch (Exception e) {
- LOG.warn("Failed to try to delete subscriptions for health check", e);
- }
- CompletableFuture<Producer<String>> producerFuture =
- client.newProducer(Schema.STRING).topic(topic).createAsync();
- CompletableFuture<Reader<String>> readerFuture = client.newReader(Schema.STRING)
- .topic(topic).startMessageId(MessageId.latest).createAsync();
-
- CompletableFuture<Void> completePromise = new CompletableFuture<>();
-
- CompletableFuture.allOf(producerFuture, readerFuture).whenComplete(
- (ignore, exception) -> {
- if (exception != null) {
- completePromise.completeExceptionally(exception);
- } else {
- producerFuture.thenCompose((producer) -> producer.sendAsync(messageStr))
- .whenComplete((ignore2, exception2) -> {
- if (exception2 != null) {
- completePromise.completeExceptionally(exception2);
- }
+ return pulsar().getBrokerService().getTopic(topic, true).thenCompose(topicOptional -> {
+ if (!topicOptional.isPresent()) {
+ LOG.error("[{}] Fail to run health check while get topic {}. because get null value.",
+ clientAppId(), topic);
+ throw new RestException(Status.NOT_FOUND,
+ String.format("Topic [%s] not found after create.", topic));
+ }
+ CompletableFuture<Void> resultFuture = new CompletableFuture<>();
+ client.newProducer(Schema.STRING).topic(topic).createAsync()
+ .thenCompose(producer -> client.newReader(Schema.STRING)
+ .topic(topic)
+ .subscriptionName(subscriptionName)
+ .startMessageId(MessageId.latest)
+ .createAsync()
+ .exceptionally(createException -> {
+ producer.closeAsync().exceptionally(ex -> {
+ LOG.error("[{}] Close producer fail while heath check.", clientAppId());
+ return null;
});
+ throw FutureUtil.wrapToCompletionException(createException);
+ })
+ .thenCompose(reader -> producer.sendAsync(messageStr)
+ .thenCompose(__ -> healthCheckRecursiveReadNext(reader, messageStr))
+ .whenComplete((__, ex) -> {
+ closeAndReCheck(producer, reader, topicOptional.get(), subscriptionName)
+ .whenComplete((unused, innerEx) -> {
+ if (ex != null) {
+ resultFuture.completeExceptionally(ex);
+ } else {
+ resultFuture.complete(null);
+ }
+ });
+ }))
+ ).exceptionally(ex -> {
+ resultFuture.completeExceptionally(ex);
+ return null;
+ });
+ return resultFuture;
+ });
+ }
- healthcheckReadLoop(readerFuture, completePromise, messageStr);
-
- // timeout read loop after 10 seconds
- FutureUtil.addTimeoutHandling(completePromise,
- HEALTHCHECK_READ_TIMEOUT, pulsar().getExecutor(),
- () -> FutureUtil.createTimeoutException("Timed out reading", getClass(),
- "healthcheck(...)"));
+ private CompletableFuture<Void> healthCheckRecursiveReadNext(Reader<String> reader, String content) {
+ return reader.readNextAsync()
+ .thenCompose(msg -> {
+ if (!Objects.equals(content, msg.getValue())) {
+ return healthCheckRecursiveReadNext(reader, content);
}
+ return CompletableFuture.completedFuture(null);
});
-
- completePromise.whenComplete((ignore, exception) -> {
- producerFuture.thenAccept((producer) -> {
- producer.closeAsync().whenComplete((ignore2, exception2) -> {
- if (exception2 != null) {
- LOG.warn("Error closing producer for healthcheck", exception2);
- }
- });
- });
- readerFuture.thenAccept((reader) -> {
- reader.closeAsync().whenComplete((ignore2, exception2) -> {
- if (exception2 != null) {
- LOG.warn("Error closing reader for healthcheck", exception2);
- }
- });
- });
- if (exception != null) {
- asyncResponse.resume(new RestException(exception));
- } else {
- asyncResponse.resume("ok");
- }
- });
}
- private void healthcheckReadLoop(CompletableFuture<Reader<String>> readerFuture,
- CompletableFuture<?> completablePromise,
- String messageStr) {
- readerFuture.thenAccept((reader) -> {
- CompletableFuture<Message<String>> readFuture = reader.readNextAsync()
- .whenComplete((m, exception) -> {
- if (exception != null) {
- completablePromise.completeExceptionally(exception);
- } else if (m.getValue().equals(messageStr)) {
- completablePromise.complete(null);
- } else {
- healthcheckReadLoop(readerFuture, completablePromise, messageStr);
- }
- });
- });
+ /**
+ * Close producer and reader and then to re-check if this operation is success.
+ *
+ * Re-check
+ * - Producer: If close fails we will print error log to notify user.
+ * - Consumer: If close fails we will force delete subscription.
+ *
+ * @param producer Producer
+ * @param reader Reader
+ * @param topic Topic
+ * @param subscriptionName Subscription name
+ */
+ private CompletableFuture<Void> closeAndReCheck(Producer<String> producer, Reader<String> reader,
+ Topic topic, String subscriptionName) {
+ // no matter exception or success, we still need to
+ // close producer/reader
+ CompletableFuture<Void> producerFuture = producer.closeAsync();
+ CompletableFuture<Void> readerFuture = reader.closeAsync();
+ List<CompletableFuture<Void>> futures = new ArrayList<>(2);
+ futures.add(producerFuture);
+ futures.add(readerFuture);
+ return FutureUtil.waitForAll(Collections.unmodifiableList(futures))
+ .exceptionally(closeException -> {
+ if (readerFuture.isCompletedExceptionally()) {
+ LOG.error("[{}] Close reader fail while heath check.", clientAppId());
+ Subscription subscription =
+ topic.getSubscription(subscriptionName);
+ // re-check subscription after reader close
+ if (subscription != null) {
+ LOG.warn("[{}] Force delete subscription {} "
+ + "when it still exists after the"
+ + " reader is closed.",
+ clientAppId(), subscription);
+ subscription.deleteForcefully()
+ .exceptionally(ex -> {
+ LOG.error("[{}] Force delete subscription fail"
+ + " while health check",
+ clientAppId(), ex);
+ return null;
+ });
+ }
+ } else {
+ // producer future fail.
+ LOG.error("[{}] Close producer fail while heath check.", clientAppId());
+ }
+ return null;
+ });
}
private synchronized void deleteDynamicConfigurationOnZk(String configName) {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
index 71eae45..8fe6375 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java
@@ -43,6 +43,7 @@ import java.util.concurrent.TimeoutException;
import javax.servlet.ServletContext;
import javax.servlet.http.HttpServletRequest;
import javax.ws.rs.WebApplicationException;
+import javax.ws.rs.container.AsyncResponse;
import javax.ws.rs.core.Context;
import javax.ws.rs.core.Response;
import javax.ws.rs.core.Response.Status;
@@ -67,6 +68,8 @@ import org.apache.pulsar.broker.resources.NamespaceResources.IsolationPolicyReso
import org.apache.pulsar.broker.resources.PulsarResources;
import org.apache.pulsar.broker.resources.ResourceGroupResources;
import org.apache.pulsar.broker.resources.TenantResources;
+import org.apache.pulsar.broker.service.BrokerServiceException;
+import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.impl.PulsarServiceNameResolver;
import org.apache.pulsar.common.naming.Constants;
@@ -1207,4 +1210,17 @@ public abstract class PulsarWebResource {
}
return tree;
}
+
+ protected static void resumeAsyncResponseExceptionally(AsyncResponse asyncResponse, Throwable exception) {
+ Throwable realCause = FutureUtil.unwrapCompletionException(exception);
+ if (realCause instanceof WebApplicationException) {
+ asyncResponse.resume(realCause);
+ } else if (realCause instanceof BrokerServiceException.NotAllowedException) {
+ asyncResponse.resume(new RestException(Status.CONFLICT, realCause));
+ } else if (realCause instanceof PulsarAdminException) {
+ asyncResponse.resume(new RestException(((PulsarAdminException) realCause)));
+ } else {
+ asyncResponse.resume(new RestException(realCause));
+ }
+ }
}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiHealthCheckTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiHealthCheckTest.java
new file mode 100644
index 0000000..c44d771
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiHealthCheckTest.java
@@ -0,0 +1,91 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.admin;
+
+import com.google.common.collect.Sets;
+import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
+import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.common.policies.data.ClusterData;
+import org.apache.pulsar.common.policies.data.TenantInfoImpl;
+import org.apache.pulsar.compaction.Compactor;
+import org.awaitility.Awaitility;
+import org.testng.Assert;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+@Test(groups = "broker")
+public class AdminApiHealthCheckTest extends MockedPulsarServiceBaseTest {
+
+ @BeforeMethod
+ @Override
+ public void setup() throws Exception {
+ resetConfig();
+ super.internalSetup();
+ admin.clusters().createCluster("test",
+ ClusterData.builder().serviceUrl(pulsar.getWebServiceAddress()).build());
+ TenantInfoImpl tenantInfo = new TenantInfoImpl(
+ Sets.newHashSet("role1", "role2"), Sets.newHashSet("test"));
+ admin.tenants().createTenant("pulsar", tenantInfo);
+ admin.namespaces().createNamespace("pulsar/system", Sets.newHashSet("test"));
+ admin.tenants().createTenant("public", tenantInfo);
+ admin.namespaces().createNamespace("public/default", Sets.newHashSet("test"));
+ }
+ @AfterMethod(alwaysRun = true)
+ @Override
+ public void cleanup() throws Exception {
+ super.internalCleanup();
+ }
+
+ @Test
+ public void testHealthCheckup() throws Exception {
+ final int times = 30;
+ CompletableFuture<Void> future = new CompletableFuture<>();
+ pulsar.getExecutor().execute(() -> {
+ try {
+ for (int i = 0; i < times; i++) {
+ admin.brokers().healthcheck();
+ }
+ future.complete(null);
+ } catch (PulsarAdminException e) {
+ future.completeExceptionally(e);
+ }
+ });
+ for (int i = 0; i < times; i++) {
+ admin.brokers().healthcheck();
+ }
+ // To ensure we don't have any subscription
+ final String testHealthCheckTopic = String.format("persistent://pulsar/test/localhost:%s/healthcheck",
+ pulsar.getConfig().getWebServicePort().get());
+ Awaitility.await().untilAsserted(() -> {
+ Assert.assertFalse(future.isCompletedExceptionally());
+ });
+ Awaitility.await().untilAsserted(() ->
+ Assert.assertTrue(CollectionUtils.isEmpty(admin.topics()
+ .getSubscriptions(testHealthCheckTopic).stream()
+ // All system topics are using compaction, even though is not explicitly set in the policies.
+ .filter(v -> !v.equals(Compactor.COMPACTION_SUBSCRIPTION))
+ .collect(Collectors.toList())
+ ))
+ );
+ }
+}
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/FutureUtil.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/FutureUtil.java
index 45fd7f7..7d9fd79 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/FutureUtil.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/FutureUtil.java
@@ -78,8 +78,7 @@ public class FutureUtil {
}
/**
- * If the future is cancelled or times out, the cancel action will be
- * invoked
+ * If the future is cancelled or times out, the cancel action will be invoked.
*
* The action is executed once if the future completes with
* {@link java.util.concurrent.CancellationException} or {@link TimeoutException}
@@ -203,4 +202,18 @@ public class FutureUtil {
}
return Optional.empty();
}
+
+ /**
+ * Wrap throwable exception to CompletionException if that exception is not an instance of CompletionException.
+ *
+ * @param throwable Exception
+ * @return CompletionException
+ */
+ public static CompletionException wrapToCompletionException(Throwable throwable) {
+ if (throwable instanceof CompletionException) {
+ return (CompletionException) throwable;
+ } else {
+ return new CompletionException(throwable);
+ }
+ }
}