You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2020/06/26 16:20:45 UTC
[pulsar] branch master updated: Fixed logic for forceful topic deletion (#7356)
This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 53bc73c Fixed logic for forceful topic deletion (#7356)
53bc73c is described below
commit 53bc73c88868b8073510a4339f9e4d9ebcc1075a
Author: Matteo Merli <mm...@apache.org>
AuthorDate: Fri Jun 26 09:20:23 2020 -0700
Fixed logic for forceful topic deletion (#7356)
---
.../broker/service/persistent/PersistentTopic.java | 46 ++++++++++------------
1 file changed, 20 insertions(+), 26 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 581f2df..9444bae 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -114,6 +114,7 @@ import org.apache.pulsar.common.policies.data.SubscriptionStats;
import org.apache.pulsar.common.policies.data.TopicStats;
import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.common.protocol.schema.SchemaData;
+import org.apache.pulsar.common.protocol.schema.SchemaVersion;
import org.apache.pulsar.common.util.Codec;
import org.apache.pulsar.common.util.DateFormatter;
import org.apache.pulsar.common.util.FutureUtil;
@@ -605,6 +606,7 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
log.debug("[{}] [{}] [{}] Subscribe failed -- count: {}", topic, subscriptionName,
consumer.consumerName(), USAGE_COUNT_UPDATER.get(PersistentTopic.this));
}
+ USAGE_COUNT_UPDATER.decrementAndGet(PersistentTopic.this);
future.completeExceptionally(
new BrokerServiceException("Connection was closed while the opening the cursor "));
} else {
@@ -838,10 +840,14 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
try {
if (isFenced) {
log.warn("[{}] Topic is already being closed or deleted", topic);
- deleteFuture.completeExceptionally(new TopicFencedException("Topic is already fenced"));
- return deleteFuture;
+ return FutureUtil.failedFuture(new TopicFencedException("Topic is already fenced"));
+ } else if (failIfHasSubscriptions && !subscriptions.isEmpty()) {
+ return FutureUtil.failedFuture(new TopicBusyException("Topic has subscriptions"));
+ } else if (failIfHasBacklogs && hasBacklogs()) {
+ return FutureUtil.failedFuture(new TopicBusyException("Topic has subscriptions did not catch up"));
}
+ isFenced = true; // Avoid clients reconnections while deleting
CompletableFuture<Void> closeClientFuture = new CompletableFuture<>();
if (closeIfClientsConnected) {
List<CompletableFuture<Void>> futures = Lists.newArrayList();
@@ -861,30 +867,16 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
}
closeClientFuture.thenAccept(delete -> {
- if (USAGE_COUNT_UPDATER.get(this) == 0) {
- isFenced = true;
- List<CompletableFuture<Void>> futures = Lists.newArrayList();
-
- if (failIfHasSubscriptions) {
- if (!subscriptions.isEmpty()) {
- isFenced = false;
- deleteFuture.completeExceptionally(new TopicBusyException("Topic has subscriptions"));
- return;
- }
- } else if (failIfHasBacklogs) {
- if (hasBacklogs()) {
- isFenced = false;
- deleteFuture.completeExceptionally(new TopicBusyException("Topic has subscriptions did not catch up"));
- return;
- }
- } else {
- subscriptions.forEach((s, sub) -> futures.add(sub.delete()));
- }
- if (deleteSchema) {
- futures.add(deleteSchema().thenApply(schemaVersion -> null));
- }
-
- FutureUtil.waitForAll(futures).whenComplete((v, ex) -> {
+ // We can proceed with the deletion if either:
+ // 1. No one is connected
+ // 2. We want to kick out everyone and forcefully delete the topic.
+ // In this case, we shouldn't care if the usageCount is 0 or not, just proceed
+ if (USAGE_COUNT_UPDATER.get(this) == 0 || (closeIfClientsConnected && !failIfHasSubscriptions)) {
+ CompletableFuture<SchemaVersion> deleteSchemaFuture = deleteSchema ?
+ deleteSchema()
+ : CompletableFuture.completedFuture(null);
+
+ deleteSchemaFuture.whenComplete((v, ex) -> {
if (ex != null) {
log.error("[{}] Error deleting topic", topic, ex);
isFenced = false;
@@ -918,10 +910,12 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
}
});
} else {
+ isFenced = false;
deleteFuture.completeExceptionally(new TopicBusyException(
"Topic has " + USAGE_COUNT_UPDATER.get(this) + " connected producers/consumers"));
}
}).exceptionally(ex->{
+ isFenced = false;
deleteFuture.completeExceptionally(
new TopicBusyException("Failed to close clients before deleting topic."));
return null;