You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by rx...@apache.org on 2020/08/04 03:51:43 UTC

[pulsar] branch branch-2.5 updated: Close the previous reader of the healthcheck topic (#7724)

This is an automated email from the ASF dual-hosted git repository.

rxl pushed a commit to branch branch-2.5
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.5 by this push:
     new bf42ede  Close the previous reader of the healthcheck topic (#7724)
bf42ede is described below

commit bf42edea1151d14c18c5a2225c9a82f886d8f603
Author: lipenghui <pe...@apache.org>
AuthorDate: Mon Aug 3 19:02:52 2020 +0800

    Close the previous reader of the healthcheck topic (#7724)
    
    
    (cherry picked from commit ca98a89c62179c710b9190b3b1fd8e04e3b597c4)
---
 .../java/org/apache/pulsar/broker/admin/impl/BrokersBase.java    | 9 +++++++--
 1 file changed, 7 insertions(+), 2 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java
index bab92be..daba0c9 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java
@@ -22,6 +22,7 @@ import static org.apache.pulsar.broker.service.BrokerService.BROKER_SERVICE_CONF
 
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
@@ -45,6 +46,8 @@ import org.apache.pulsar.broker.admin.AdminResource;
 import org.apache.pulsar.broker.loadbalance.LoadManager;
 import org.apache.pulsar.broker.namespace.NamespaceService;
 import org.apache.pulsar.broker.service.BrokerService;
+import org.apache.pulsar.broker.service.Subscription;
+import org.apache.pulsar.broker.service.Topic;
 import org.apache.pulsar.broker.web.RestException;
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.MessageId;
@@ -262,9 +265,11 @@ public class BrokersBase extends AdminResource {
         PulsarClient client = pulsar().getClient();
 
         String messageStr = UUID.randomUUID().toString();
-        // create non-partitioned topic manually
+        // create non-partitioned topic manually and close the previous reader if present.
         try {
-            pulsar().getBrokerService().getTopic(topic, true).get();
+            pulsar().getBrokerService().getTopic(topic, true).get().ifPresent(t -> {
+                t.getSubscriptions().values().forEach(Subscription::deleteForcefully);
+            });
         } catch (Exception e) {
             asyncResponse.resume(new RestException(e));
             return;