You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2020/06/26 16:20:45 UTC

[pulsar] branch master updated: Fixed logic for forceful topic deletion (#7356)

This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 53bc73c  Fixed logic for forceful topic deletion (#7356)
53bc73c is described below

commit 53bc73c88868b8073510a4339f9e4d9ebcc1075a
Author: Matteo Merli <mm...@apache.org>
AuthorDate: Fri Jun 26 09:20:23 2020 -0700

    Fixed logic for forceful topic deletion (#7356)
---
 .../broker/service/persistent/PersistentTopic.java | 46 ++++++++++------------
 1 file changed, 20 insertions(+), 26 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 581f2df..9444bae 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -114,6 +114,7 @@ import org.apache.pulsar.common.policies.data.SubscriptionStats;
 import org.apache.pulsar.common.policies.data.TopicStats;
 import org.apache.pulsar.common.protocol.Commands;
 import org.apache.pulsar.common.protocol.schema.SchemaData;
+import org.apache.pulsar.common.protocol.schema.SchemaVersion;
 import org.apache.pulsar.common.util.Codec;
 import org.apache.pulsar.common.util.DateFormatter;
 import org.apache.pulsar.common.util.FutureUtil;
@@ -605,6 +606,7 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
                         log.debug("[{}] [{}] [{}] Subscribe failed -- count: {}", topic, subscriptionName,
                                 consumer.consumerName(), USAGE_COUNT_UPDATER.get(PersistentTopic.this));
                     }
+                    USAGE_COUNT_UPDATER.decrementAndGet(PersistentTopic.this);
                     future.completeExceptionally(
                             new BrokerServiceException("Connection was closed while the opening the cursor "));
                 } else {
@@ -838,10 +840,14 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
         try {
             if (isFenced) {
                 log.warn("[{}] Topic is already being closed or deleted", topic);
-                deleteFuture.completeExceptionally(new TopicFencedException("Topic is already fenced"));
-                return deleteFuture;
+                return FutureUtil.failedFuture(new TopicFencedException("Topic is already fenced"));
+            } else if (failIfHasSubscriptions && !subscriptions.isEmpty()) {
+                return FutureUtil.failedFuture(new TopicBusyException("Topic has subscriptions"));
+            } else if (failIfHasBacklogs && hasBacklogs()) {
+                return FutureUtil.failedFuture(new TopicBusyException("Topic has subscriptions did not catch up"));
             }
 
+            isFenced = true; // Avoid clients reconnections while deleting
             CompletableFuture<Void> closeClientFuture = new CompletableFuture<>();
             if (closeIfClientsConnected) {
                 List<CompletableFuture<Void>> futures = Lists.newArrayList();
@@ -861,30 +867,16 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
             }
 
             closeClientFuture.thenAccept(delete -> {
-                if (USAGE_COUNT_UPDATER.get(this) == 0) {
-                    isFenced = true;
-                    List<CompletableFuture<Void>> futures = Lists.newArrayList();
-
-                    if (failIfHasSubscriptions) {
-                        if (!subscriptions.isEmpty()) {
-                            isFenced = false;
-                            deleteFuture.completeExceptionally(new TopicBusyException("Topic has subscriptions"));
-                            return;
-                        }
-                    } else if (failIfHasBacklogs) {
-                        if (hasBacklogs()) {
-                            isFenced = false;
-                            deleteFuture.completeExceptionally(new TopicBusyException("Topic has subscriptions did not catch up"));
-                            return;
-                        }
-                    } else {
-                        subscriptions.forEach((s, sub) -> futures.add(sub.delete()));
-                    }
-                    if (deleteSchema) {
-                        futures.add(deleteSchema().thenApply(schemaVersion -> null));
-                    }
-
-                    FutureUtil.waitForAll(futures).whenComplete((v, ex) -> {
+                // We can proceed with the deletion if either:
+                //  1. No one is connected
+                //  2. We want to kick out everyone and forcefully delete the topic.
+                //     In this case, we shouldn't care if the usageCount is 0 or not, just proceed
+                if (USAGE_COUNT_UPDATER.get(this) == 0 || (closeIfClientsConnected && !failIfHasSubscriptions)) {
+                    CompletableFuture<SchemaVersion> deleteSchemaFuture = deleteSchema ?
+                            deleteSchema()
+                            : CompletableFuture.completedFuture(null);
+
+                    deleteSchemaFuture.whenComplete((v, ex) -> {
                         if (ex != null) {
                             log.error("[{}] Error deleting topic", topic, ex);
                             isFenced = false;
@@ -918,10 +910,12 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
                         }
                     });
                 } else {
+                    isFenced = false;
                     deleteFuture.completeExceptionally(new TopicBusyException(
                             "Topic has " + USAGE_COUNT_UPDATER.get(this) + " connected producers/consumers"));
                 }
             }).exceptionally(ex->{
+                isFenced = false;
                 deleteFuture.completeExceptionally(
                         new TopicBusyException("Failed to close clients before deleting topic."));
                 return null;