You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by rx...@apache.org on 2020/08/04 03:51:43 UTC
[pulsar] branch branch-2.5 updated: Close the previous reader of the healthcheck topic (#7724)
This is an automated email from the ASF dual-hosted git repository.
rxl pushed a commit to branch branch-2.5
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.5 by this push:
new bf42ede Close the previous reader of the healthcheck topic (#7724)
bf42ede is described below
commit bf42edea1151d14c18c5a2225c9a82f886d8f603
Author: lipenghui <pe...@apache.org>
AuthorDate: Mon Aug 3 19:02:52 2020 +0800
Close the previous reader of the healthcheck topic (#7724)
(cherry picked from commit ca98a89c62179c710b9190b3b1fd8e04e3b597c4)
---
.../java/org/apache/pulsar/broker/admin/impl/BrokersBase.java | 9 +++++++--
1 file changed, 7 insertions(+), 2 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java
index bab92be..daba0c9 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java
@@ -22,6 +22,7 @@ import static org.apache.pulsar.broker.service.BrokerService.BROKER_SERVICE_CONF
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
@@ -45,6 +46,8 @@ import org.apache.pulsar.broker.admin.AdminResource;
import org.apache.pulsar.broker.loadbalance.LoadManager;
import org.apache.pulsar.broker.namespace.NamespaceService;
import org.apache.pulsar.broker.service.BrokerService;
+import org.apache.pulsar.broker.service.Subscription;
+import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.broker.web.RestException;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
@@ -262,9 +265,11 @@ public class BrokersBase extends AdminResource {
PulsarClient client = pulsar().getClient();
String messageStr = UUID.randomUUID().toString();
- // create non-partitioned topic manually
+ // create non-partitioned topic manually and close the previous reader if present.
try {
- pulsar().getBrokerService().getTopic(topic, true).get();
+ pulsar().getBrokerService().getTopic(topic, true).get().ifPresent(t -> {
+ t.getSubscriptions().values().forEach(Subscription::deleteForcefully);
+ });
} catch (Exception e) {
asyncResponse.resume(new RestException(e));
return;