You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2019/05/18 14:43:32 UTC
[pulsar] branch master updated: [pulsar-broker]Fix: race condition while deleting global topic (#4173)
This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 78e794a [pulsar-broker]Fix: race condition while deleting global topic (#4173)
78e794a is described below
commit 78e794a98068ebb2bc1f5760a3f8bbedb69c1f02
Author: Rajan Dhabalia <rd...@apache.org>
AuthorDate: Sat May 18 07:43:28 2019 -0700
[pulsar-broker]Fix: race condition while deleting global topic (#4173)
---
.../pulsar/broker/service/persistent/PersistentTopic.java | 14 +++++++++-----
1 file changed, 9 insertions(+), 5 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index bfbc2da..c8e5c7e 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -795,7 +795,6 @@ public class PersistentTopic implements Topic, AddEntryCallback {
closeClientFuture.thenAccept(delete -> {
if (USAGE_COUNT_UPDATER.get(this) == 0) {
isFenced = true;
-
List<CompletableFuture<Void>> futures = Lists.newArrayList();
if (failIfHasSubscriptions) {
@@ -824,9 +823,14 @@ public class PersistentTopic implements Topic, AddEntryCallback {
@Override
public void deleteLedgerFailed(ManagedLedgerException exception, Object ctx) {
- isFenced = false;
- log.error("[{}] Error deleting topic", topic, exception);
- deleteFuture.completeExceptionally(new PersistenceException(exception));
+ if (exception.getCause() instanceof KeeperException.NoNodeException) {
+ log.info("[{}] Topic is already deleted {}", topic, exception.getMessage());
+ deleteLedgerComplete(ctx);
+ } else {
+ isFenced = false;
+ log.error("[{}] Error deleting topic", topic, exception);
+ deleteFuture.completeExceptionally(new PersistenceException(exception));
+ }
}
}, null);
}
@@ -984,7 +988,7 @@ public class PersistentTopic implements Topic, AddEntryCallback {
// doesn't serve global topic without local repl-cluster configured.
if (TopicName.get(topic).isGlobal() && !configuredClusters.contains(localCluster)) {
log.info("Deleting topic [{}] because local cluster is not part of global namespace repl list {}",
- configuredClusters);
+ topic, configuredClusters);
return deleteForcefully();
}