Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2022/03/02 06:40:46 UTC

[GitHub] [pulsar] eolivelli commented on a change in pull request #14367: [Broker] Fix Broker HealthCheck Endpoint Exposes Race Conditions

eolivelli commented on a change in pull request #14367:
URL: https://github.com/apache/pulsar/pull/14367#discussion_r817388570



##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java
##########
@@ -343,41 +349,71 @@ public void healthCheck(@Suspended AsyncResponse asyncResponse,
         String messageStr = UUID.randomUUID().toString();
         // create non-partitioned topic manually and close the previous reader if present.
         return pulsar().getBrokerService().getTopic(topicName, true)
-                // check and clean all subscriptions
                 .thenCompose(topicOptional -> {
                     if (!topicOptional.isPresent()) {
                         LOG.error("[{}] Fail to run health check while get topic {}. because get null value.",
                                 clientAppId(), topicName);
-                        throw new RestException(Status.NOT_FOUND, "Topic [{}] not found after create.");
+                        throw new RestException(Status.NOT_FOUND,
+                                String.format("Topic [%s] not found after create.", topicName));
                     }
-                    Topic topic = topicOptional.get();
-                    // clean all subscriptions
-                    return FutureUtil.waitForAll(topic.getSubscriptions().values()
-                            .stream().map(Subscription::deleteForcefully).collect(Collectors.toList()))
-                            .thenApply(__ -> topic);
-                }).thenCompose(topic -> {
                     try {
                         PulsarClient client = pulsar().getClient();
                         return client.newProducer(Schema.STRING).topic(topicName).createAsync()
-                                        .thenCombine(client.newReader(Schema.STRING).topic(topicName)
+                                .thenCombine(client.newReader(Schema.STRING).topic(topicName)
                                         .startMessageId(MessageId.latest).createAsync(), (producer, reader) ->
-                                                        producer.sendAsync(messageStr).thenCompose(__ ->
-                                                                healthCheckRecursiveReadNext(reader, messageStr))
-                                                        .thenCompose(__ -> {
-                                                            List<CompletableFuture<Void>> closeFutures =
-                                                                    new ArrayList<>();
-                                                            closeFutures.add(producer.closeAsync());
-                                                            closeFutures.add(reader.closeAsync());
-                                                            return FutureUtil.waitForAll(closeFutures);
-                                                        })
-                                        ).thenAccept(ignore -> {});
+                                        producer.sendAsync(messageStr).thenCompose(__ ->
+                                                        healthCheckRecursiveReadNext(reader, messageStr))
+                                                .thenCompose(__ -> {

Review comment:
   I have just noticed a problem in this code, and it was already there before this change.
   If an exception occurs, we do not close the producer and the reader!
   This is because the function passed to thenCompose runs only on the happy path; it is skipped when the previous stage completes exceptionally.
   We have to rework this chain so that the producer and the reader are always closed (with a whenComplete, for instance).
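
   To illustrate the whenComplete idea, here is a minimal sketch using only java.util.concurrent, not the actual patch. `FakeResource` and `closeAfter` are hypothetical names introduced for this example; `FakeResource` merely stands in for a producer or reader that exposes a `closeAsync()` method. The point is that the close step runs on both the success and the failure path, and the original outcome is still propagated to the caller.

```java
import java.util.concurrent.CompletableFuture;

public class AlwaysCloseSketch {

    // Hypothetical stand-in for a producer or reader; only closeAsync() matters here.
    static class FakeResource {
        final String name;
        volatile boolean closed = false;

        FakeResource(String name) {
            this.name = name;
        }

        CompletableFuture<Void> closeAsync() {
            closed = true;
            return CompletableFuture.completedFuture(null);
        }
    }

    // Closes both resources once `work` finishes, whether it succeeded or failed.
    // The returned future completes only after both closes are done and carries
    // the original result or error of `work` (a close error is reported only if
    // `work` itself succeeded).
    static <T> CompletableFuture<T> closeAfter(CompletableFuture<T> work,
                                               FakeResource producer,
                                               FakeResource reader) {
        CompletableFuture<T> result = new CompletableFuture<>();
        work.whenComplete((value, error) ->
                CompletableFuture.allOf(producer.closeAsync(), reader.closeAsync())
                        .whenComplete((ignored, closeError) -> {
                            if (error != null) {
                                result.completeExceptionally(error);
                            } else if (closeError != null) {
                                result.completeExceptionally(closeError);
                            } else {
                                result.complete(value);
                            }
                        }));
        return result;
    }

    public static void main(String[] args) {
        FakeResource producer = new FakeResource("producer");
        FakeResource reader = new FakeResource("reader");

        // Simulate the send/read step failing.
        CompletableFuture<Void> failing = new CompletableFuture<>();
        failing.completeExceptionally(new IllegalStateException("read failed"));

        CompletableFuture<Void> outcome = closeAfter(failing, producer, reader);

        System.out.println("producer closed: " + producer.closed);
        System.out.println("reader closed: " + reader.closed);
        System.out.println("outcome failed: " + outcome.isCompletedExceptionally());
    }
}
```

   In the health check, `work` would correspond to the `producer.sendAsync(...).thenCompose(...)` part of the chain, so a failed send or read would no longer leak the producer and the reader.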



