You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2019/05/18 14:43:32 UTC

[pulsar] branch master updated: [pulsar-broker]Fix: race condition while deleting global topic (#4173)

This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 78e794a  [pulsar-broker]Fix: race condition while deleting global topic (#4173)
78e794a is described below

commit 78e794a98068ebb2bc1f5760a3f8bbedb69c1f02
Author: Rajan Dhabalia <rd...@apache.org>
AuthorDate: Sat May 18 07:43:28 2019 -0700

    [pulsar-broker]Fix: race condition while deleting global topic (#4173)
---
 .../pulsar/broker/service/persistent/PersistentTopic.java  | 14 +++++++++-----
 1 file changed, 9 insertions(+), 5 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index bfbc2da..c8e5c7e 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -795,7 +795,6 @@ public class PersistentTopic implements Topic, AddEntryCallback {
             closeClientFuture.thenAccept(delete -> {
                 if (USAGE_COUNT_UPDATER.get(this) == 0) {
                     isFenced = true;
-
                     List<CompletableFuture<Void>> futures = Lists.newArrayList();
 
                     if (failIfHasSubscriptions) {
@@ -824,9 +823,14 @@ public class PersistentTopic implements Topic, AddEntryCallback {
 
                                 @Override
                                 public void deleteLedgerFailed(ManagedLedgerException exception, Object ctx) {
-                                    isFenced = false;
-                                    log.error("[{}] Error deleting topic", topic, exception);
-                                    deleteFuture.completeExceptionally(new PersistenceException(exception));
+                                    if (exception.getCause() instanceof KeeperException.NoNodeException) {
+                                        log.info("[{}] Topic is already deleted {}", topic, exception.getMessage());
+                                        deleteLedgerComplete(ctx);
+                                    } else {
+                                        isFenced = false;
+                                        log.error("[{}] Error deleting topic", topic, exception);
+                                        deleteFuture.completeExceptionally(new PersistenceException(exception));
+                                    }
                                 }
                             }, null);
                         }
@@ -984,7 +988,7 @@ public class PersistentTopic implements Topic, AddEntryCallback {
         // doesn't serve global topic without local repl-cluster configured.
         if (TopicName.get(topic).isGlobal() && !configuredClusters.contains(localCluster)) {
             log.info("Deleting topic [{}] because local cluster is not part of global namespace repl list {}",
-                    configuredClusters);
+                    topic, configuredClusters);
             return deleteForcefully();
         }