You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by te...@apache.org on 2022/08/14 11:26:19 UTC
[pulsar] 03/03: [improve][broker] Improve naming for delete topic error (#16965)
This is an automated email from the ASF dual-hosted git repository.
technoboy pushed a commit to branch branch-2.11
in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 1df433af8eeabd0adde56dc0a944dbf2cee87130
Author: Lei Zhiyuan <le...@gmail.com>
AuthorDate: Sun Aug 14 18:17:31 2022 +0800
[improve][broker] Improve naming for delete topic error (#16965)
---
.../org/apache/pulsar/broker/admin/v1/PersistentTopics.java | 2 +-
.../org/apache/pulsar/broker/admin/v2/PersistentTopics.java | 2 +-
.../broker/service/nonpersistent/NonPersistentTopic.java | 3 ++-
.../pulsar/broker/service/persistent/PersistentTopic.java | 10 ++++++++--
.../src/test/java/org/apache/pulsar/schema/SchemaTest.java | 4 ++--
5 files changed, 14 insertions(+), 7 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java
index d8a9cdc7ce4..df32386152a 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java
@@ -381,7 +381,7 @@ public class PersistentTopics extends PersistentTopicsBase {
Throwable t = FutureUtil.unwrapCompletionException(ex);
if (!force && (t instanceof BrokerServiceException.TopicBusyException)) {
ex = new RestException(Response.Status.PRECONDITION_FAILED,
- "Topic has active producers/subscriptions");
+ t.getMessage());
}
if (isManagedLedgerNotFoundException(t)) {
ex = new RestException(Response.Status.NOT_FOUND,
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
index 6be6490fd0b..327a575f314 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
@@ -1066,7 +1066,7 @@ public class PersistentTopics extends PersistentTopicsBase {
Throwable t = FutureUtil.unwrapCompletionException(ex);
if (!force && (t instanceof BrokerServiceException.TopicBusyException)) {
ex = new RestException(Response.Status.PRECONDITION_FAILED,
- "Topic has active producers/subscriptions");
+ t.getMessage());
}
if (isManagedLedgerNotFoundException(t)) {
ex = new RestException(Response.Status.NOT_FOUND,
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
index 7473fdaf786..30941e0d719 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
@@ -419,7 +419,8 @@ public class NonPersistentTopic extends AbstractTopic implements Topic, TopicPol
if (failIfHasSubscriptions) {
if (!subscriptions.isEmpty()) {
isFenced = false;
- deleteFuture.completeExceptionally(new TopicBusyException("Topic has subscriptions"));
+ deleteFuture.completeExceptionally(
+ new TopicBusyException("Topic has subscriptions:" + subscriptions.keys()));
return;
}
} else {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 6673bedce23..742be6e1afe 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -1130,9 +1130,15 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
log.warn("[{}] Topic is already being closed or deleted", topic);
return FutureUtil.failedFuture(new TopicFencedException("Topic is already fenced"));
} else if (failIfHasSubscriptions && !subscriptions.isEmpty()) {
- return FutureUtil.failedFuture(new TopicBusyException("Topic has subscriptions"));
+ return FutureUtil.failedFuture(
+ new TopicBusyException("Topic has subscriptions: " + subscriptions.keys()));
} else if (failIfHasBacklogs && hasBacklogs()) {
- return FutureUtil.failedFuture(new TopicBusyException("Topic has subscriptions did not catch up"));
+ List<String> backlogSubs =
+ subscriptions.values().stream()
+ .filter(sub -> sub.getNumberOfEntriesInBacklog(false) > 0)
+ .map(PersistentSubscription::getName).toList();
+ return FutureUtil.failedFuture(
+ new TopicBusyException("Topic has subscriptions did not catch up: " + backlogSubs));
}
fenceTopicToCloseOrDelete(); // Avoid clients reconnections while deleting
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java
index 5da60b800f7..60c612d2515 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java
@@ -846,7 +846,7 @@ public class SchemaTest extends MockedPulsarServiceBaseTest {
} catch (Exception e) {
assertThat(e.getMessage())
.isNotNull()
- .startsWith("Topic has active producers/subscriptions");
+ .startsWith("Topic has 2 connected producers/consumers");
}
assertEquals(this.getPulsar().getSchemaRegistryService()
.trimDeletedSchemaAndGetList(TopicName.get(topic1).getSchemaName()).get().size(), 2);
@@ -936,7 +936,7 @@ public class SchemaTest extends MockedPulsarServiceBaseTest {
admin.topics().delete(topicOne, false);
fail();
} catch (Exception e) {
- assertTrue(e.getMessage().startsWith("Topic has active producers/subscriptions"));
+ assertTrue(e.getMessage().startsWith("Topic has 2 connected producers/consumers"));
}
assertEquals(this.getPulsar().getSchemaRegistryService()
.trimDeletedSchemaAndGetList(TopicName.get(topicOne).getSchemaName()).get().size(), 2);