Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2022/02/26 02:07:38 UTC

[GitHub] [pulsar] mattisonchao commented on a change in pull request #14367: [Broker] Fix Broker HealthCheck Endpoint Exposes Race Conditions

mattisonchao commented on a change in pull request #14367:
URL: https://github.com/apache/pulsar/pull/14367#discussion_r815252492



##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java
##########
@@ -343,41 +350,81 @@ public void healthCheck(@Suspended AsyncResponse asyncResponse,
         String messageStr = UUID.randomUUID().toString();
         // create non-partitioned topic manually and close the previous reader if present.
         return pulsar().getBrokerService().getTopic(topicName, true)
-                // check and clean all subscriptions
                 .thenCompose(topicOptional -> {
                     if (!topicOptional.isPresent()) {
                         LOG.error("[{}] Fail to run health check while get topic {}. because get null value.",
                                 clientAppId(), topicName);
                         throw new RestException(Status.NOT_FOUND, "Topic [{}] not found after create.");
                     }
-                    Topic topic = topicOptional.get();
-                    // clean all subscriptions
-                    return FutureUtil.waitForAll(topic.getSubscriptions().values()
-                            .stream().map(Subscription::deleteForcefully).collect(Collectors.toList()))
-                            .thenApply(__ -> topic);
-                }).thenCompose(topic -> {
+                    return tryCleanPreviousSubscriptions(topicOptional.get());
+                }).thenCompose(unused-> {
                     try {
                         PulsarClient client = pulsar().getClient();
                         return client.newProducer(Schema.STRING).topic(topicName).createAsync()
-                                        .thenCombine(client.newReader(Schema.STRING).topic(topicName)
+                                .thenCombine(client.newReader(Schema.STRING).topic(topicName)
                                         .startMessageId(MessageId.latest).createAsync(), (producer, reader) ->
-                                                        producer.sendAsync(messageStr).thenCompose(__ ->
-                                                                healthCheckRecursiveReadNext(reader, messageStr))
-                                                        .thenCompose(__ -> {
-                                                            List<CompletableFuture<Void>> closeFutures =
-                                                                    new ArrayList<>();
-                                                            closeFutures.add(producer.closeAsync());
-                                                            closeFutures.add(reader.closeAsync());
-                                                            return FutureUtil.waitForAll(closeFutures);
-                                                        })
-                                        ).thenAccept(ignore -> {});
+                                        producer.sendAsync(messageStr).thenCompose(__ ->
+                                                        healthCheckRecursiveReadNext(reader, messageStr))
+                                                .thenCompose(__ -> {
+                                                    List<CompletableFuture<Void>> closeFutures =
+                                                            new ArrayList<>();
+                                                    closeFutures.add(producer.closeAsync());
+                                                    closeFutures.add(reader.closeAsync());
+                                                    return FutureUtil.waitForAll(closeFutures);
+                                                })
+                                ).thenAccept(ignore -> {});
                     } catch (PulsarServerException e) {
                         LOG.error("[{}] Fail to run health check while get client.", clientAppId());
                         throw new RestException(e);
                     }
                 });
     }
 
+    private CompletableFuture<Void> tryCleanPreviousSubscriptions(Topic topic) {
+        // clean all subscriptions
+        return FutureUtil.waitForAll(topic.getSubscriptions().values()
+                .stream().filter(subscription ->
+                // All system topics are using compaction, even though is not explicitly set in the policies.
+                        !subscription.getName().equals(Compactor.COMPACTION_SUBSCRIPTION))
+                .map(Subscription::delete).collect(Collectors.toList()))

Review comment:
       @Technoboy- 
   Every system topic has a compaction subscription named ``__compaction``. In addition, each health check call creates a new reader, whose subscription is named ``reader-xxx``. We only need to remove those reader subscriptions; the ``__compaction`` subscription does not need to be removed.
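   To make the filter easier to discuss, here is a minimal standalone sketch of the same idea. It works on plain subscription names rather than the broker's `Subscription` objects, and the class name, method name, and the `reader-aaa` / `reader-bbb` names are hypothetical placeholders, not code from this PR:

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.stream.Collectors;

public class HealthCheckCleanupSketch {
    // Assumed to match the value of Compactor.COMPACTION_SUBSCRIPTION.
    static final String COMPACTION_SUBSCRIPTION = "__compaction";

    // Returns the names of the subscriptions the health check should delete:
    // every subscription except the compaction one.
    static List<String> subscriptionsToDelete(Collection<String> subscriptionNames) {
        return subscriptionNames.stream()
                .filter(name -> !COMPACTION_SUBSCRIPTION.equals(name))
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // Hypothetical state of the health check topic after two earlier checks.
        List<String> names = Arrays.asList("__compaction", "reader-aaa", "reader-bbb");
        System.out.println(subscriptionsToDelete(names)); // prints [reader-aaa, reader-bbb]
    }
}
```

   In the real change the same predicate is applied to `topic.getSubscriptions().values()` before mapping each remaining subscription to `Subscription::delete`.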
   I'm not sure I explained this clearly, so please let me know what you think. Thanks!
   



