You are viewing a plain text version of this content. The canonical version is the message as posted to the commits@pulsar.apache.org mailing list archive.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2022/05/16 13:04:35 UTC
[pulsar] branch master updated: [improve][admin] add topic name and sub name for NotFound error message (#15606)
This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new f89698c11af [improve][admin] add topic name and sub name for NotFound error message (#15606)
f89698c11af is described below
commit f89698c11af3429ee69c3a15a8ef729650b6a705
Author: lipenghui <pe...@apache.org>
AuthorDate: Mon May 16 21:04:27 2022 +0800
[improve][admin] add topic name and sub name for NotFound error message (#15606)
To fix https://issues.apache.org/jira/browse/PULSAR-20
---
.../apache/pulsar/broker/admin/AdminResource.java | 12 +++
.../broker/admin/impl/PersistentTopicsBase.java | 105 +++++++++++++--------
.../apache/pulsar/broker/admin/AdminApi2Test.java | 2 +
.../broker/admin/AdminApiSubscriptionTest.java | 12 +--
.../apache/pulsar/broker/admin/AdminApiTest.java | 31 +++++-
.../pulsar/broker/admin/PersistentTopicsTest.java | 3 +-
.../org/apache/pulsar/broker/admin/TopicsTest.java | 4 +-
7 files changed, 122 insertions(+), 47 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
index 7f77568b8bb..43addc30af7 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
@@ -830,4 +830,16 @@ public abstract class AdminResource extends PulsarWebResource {
&& ((WebApplicationException) realCause).getResponse().getStatus()
== Status.TEMPORARY_REDIRECT.getStatusCode();
}
+
+ protected static String getTopicNotFoundErrorMessage(String topic) {
+ return String.format("Topic %s not found", topic);
+ }
+
+ protected static String getPartitionedTopicNotFoundErrorMessage(String topic) {
+ return String.format("Partitioned Topic %s not found", topic);
+ }
+
+ protected static String getSubNotFoundErrorMessage(String topic, String subscription) {
+ return String.format("Subscription %s not found for topic %s", subscription, topic);
+ }
}
\ No newline at end of file
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index 8b046608306..2249ab9c1b7 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -569,7 +569,7 @@ public class PersistentTopicsBase extends AdminResource {
return pulsar().getNamespaceService().checkTopicExists(topicName)
.thenAccept(exist -> {
if (!exist) {
- throw new RestException(Status.NOT_FOUND, "Topic not exist");
+ throw new RestException(Status.NOT_FOUND, getTopicNotFoundErrorMessage(topicName.toString()));
}
});
}
@@ -607,7 +607,8 @@ public class PersistentTopicsBase extends AdminResource {
} else if (realCause instanceof MetadataStoreException.NotFoundException) {
log.warn("Namespace policies of {} not found", topicName.getNamespaceObject());
asyncResponse.resume(new RestException(
- new RestException(Status.NOT_FOUND, "Partitioned topic does not exist")));
+ new RestException(Status.NOT_FOUND,
+ getPartitionedTopicNotFoundErrorMessage(topicName.toString()))));
} else if (realCause instanceof PulsarAdminException) {
asyncResponse.resume(new RestException((PulsarAdminException) realCause));
} else if (realCause instanceof MetadataStoreException.BadVersionException) {
@@ -1023,7 +1024,7 @@ public class PersistentTopicsBase extends AdminResource {
if (t instanceof TopicBusyException) {
throw new RestException(Status.PRECONDITION_FAILED, "Topic has active producers/subscriptions");
} else if (isManagedLedgerNotFoundException(e)) {
- throw new RestException(Status.NOT_FOUND, "Topic not found");
+ throw new RestException(Status.NOT_FOUND, getTopicNotFoundErrorMessage(topicName.toString()));
} else {
throw new RestException(t);
}
@@ -1250,7 +1251,8 @@ public class PersistentTopicsBase extends AdminResource {
if (exception != null) {
Throwable t = exception.getCause();
if (t instanceof NotFoundException) {
- asyncResponse.resume(new RestException(Status.NOT_FOUND, "Topic not found"));
+ asyncResponse.resume(new RestException(Status.NOT_FOUND,
+ getTopicNotFoundErrorMessage(topicName.toString())));
} else {
log.error("[{}] Failed to get managed info for {}", clientAppId(), topicName, t);
asyncResponse.resume(new RestException(t));
@@ -1322,7 +1324,8 @@ public class PersistentTopicsBase extends AdminResource {
future.thenCompose(__ -> getPartitionedTopicMetadataAsync(topicName,
authoritative, false)).thenAccept(partitionMetadata -> {
if (partitionMetadata.partitions == 0) {
- asyncResponse.resume(new RestException(Status.NOT_FOUND, "Partitioned Topic not found"));
+ asyncResponse.resume(new RestException(Status.NOT_FOUND,
+ getPartitionedTopicNotFoundErrorMessage(topicName.toString())));
return;
}
PartitionedTopicStatsImpl stats = new PartitionedTopicStatsImpl(partitionMetadata);
@@ -1396,7 +1399,8 @@ public class PersistentTopicsBase extends AdminResource {
future.thenCompose(__ -> getPartitionedTopicMetadataAsync(topicName, authoritative, false))
.thenAccept(partitionMetadata -> {
if (partitionMetadata.partitions == 0) {
- asyncResponse.resume(new RestException(Status.NOT_FOUND, "Partitioned Topic not found"));
+ asyncResponse.resume(new RestException(Status.NOT_FOUND,
+ getPartitionedTopicNotFoundErrorMessage(topicName.toString())));
return;
}
@@ -1484,7 +1488,8 @@ public class PersistentTopicsBase extends AdminResource {
if (exception != null) {
Throwable t = exception.getCause();
if (t instanceof NotFoundException) {
- asyncResponse.resume(new RestException(Status.NOT_FOUND, "Subscription not found"));
+ asyncResponse.resume(new RestException(Status.NOT_FOUND,
+ getSubNotFoundErrorMessage(topicName.toString(), subName)));
return null;
} else if (t instanceof PreconditionFailedException) {
asyncResponse.resume(new RestException(Status.PRECONDITION_FAILED,
@@ -1530,7 +1535,8 @@ public class PersistentTopicsBase extends AdminResource {
Topic topic = getTopicReference(topicName);
Subscription sub = topic.getSubscription(subName);
if (sub == null) {
- throw new RestException(Status.NOT_FOUND, "Subscription not found");
+ throw new RestException(Status.NOT_FOUND,
+ getSubNotFoundErrorMessage(topicName.toString(), subName));
}
return sub.delete();
}).thenRun(() -> {
@@ -1590,7 +1596,8 @@ public class PersistentTopicsBase extends AdminResource {
if (exception != null) {
Throwable t = exception.getCause();
if (t instanceof NotFoundException) {
- asyncResponse.resume(new RestException(Status.NOT_FOUND, "Subscription not found"));
+ asyncResponse.resume(new RestException(Status.NOT_FOUND,
+ getSubNotFoundErrorMessage(topicName.toString(), subName)));
return null;
} else {
log.error("[{}] Failed to delete subscription forcefully {} {}",
@@ -1636,7 +1643,8 @@ public class PersistentTopicsBase extends AdminResource {
Topic topic = getTopicReference(topicName);
Subscription sub = topic.getSubscription(subName);
if (sub == null) {
- throw new RestException(Status.NOT_FOUND, "Subscription not found");
+ throw new RestException(Status.NOT_FOUND,
+ getSubNotFoundErrorMessage(topicName.toString(), subName));
}
return sub.deleteForcefully();
}).thenRun(() -> {
@@ -1694,7 +1702,8 @@ public class PersistentTopicsBase extends AdminResource {
Throwable t = exception.getCause();
if (t instanceof NotFoundException) {
asyncResponse.resume(
- new RestException(Status.NOT_FOUND, "Subscription not found"));
+ new RestException(Status.NOT_FOUND,
+ getSubNotFoundErrorMessage(topicName.toString(), subName)));
} else {
log.error("[{}] Failed to skip all messages {} {}",
clientAppId(), topicName, subName, t);
@@ -1742,14 +1751,16 @@ public class PersistentTopicsBase extends AdminResource {
PersistentReplicator repl =
(PersistentReplicator) topic.getPersistentReplicator(remoteCluster);
if (repl == null) {
- asyncResponse.resume(new RestException(Status.NOT_FOUND, "Subscription not found"));
+ asyncResponse.resume(new RestException(Status.NOT_FOUND,
+ getSubNotFoundErrorMessage(topicName.toString(), subName)));
return CompletableFuture.completedFuture(null);
}
return repl.clearBacklog().whenComplete(biConsumer);
} else {
PersistentSubscription sub = topic.getSubscription(subName);
if (sub == null) {
- asyncResponse.resume(new RestException(Status.NOT_FOUND, "Subscription not found"));
+ asyncResponse.resume(new RestException(Status.NOT_FOUND,
+ getSubNotFoundErrorMessage(topicName.toString(), subName)));
return CompletableFuture.completedFuture(null);
}
return sub.clearBacklog().whenComplete(biConsumer);
@@ -1785,7 +1796,8 @@ public class PersistentTopicsBase extends AdminResource {
return getTopicReferenceAsync(topicName).thenCompose(t -> {
PersistentTopic topic = (PersistentTopic) t;
if (topic == null) {
- throw new RestException(new RestException(Status.NOT_FOUND, "Topic not found"));
+ throw new RestException(new RestException(Status.NOT_FOUND,
+ getTopicNotFoundErrorMessage(topicName.toString())));
}
if (subName.startsWith(topic.getReplicatorPrefix())) {
String remoteCluster = PersistentReplicator.getRemoteCluster(subName);
@@ -1805,7 +1817,8 @@ public class PersistentTopicsBase extends AdminResource {
PersistentSubscription sub = topic.getSubscription(subName);
if (sub == null) {
return FutureUtil.failedFuture(
- new RestException(Status.NOT_FOUND, "Subscription not found"));
+ new RestException(Status.NOT_FOUND,
+ getSubNotFoundErrorMessage(topicName.toString(), subName)));
}
return sub.skipMessages(numMessages).thenAccept(unused -> {
log.info("[{}] Skipped {} messages on {} {}", clientAppId(), numMessages,
@@ -1906,7 +1919,7 @@ public class PersistentTopicsBase extends AdminResource {
.thenCompose(__ -> getTopicReferenceAsync(topicName).thenAccept(t -> {
if (t == null) {
resumeAsyncResponseExceptionally(asyncResponse, new RestException(Status.NOT_FOUND,
- "Topic not found"));
+ getTopicNotFoundErrorMessage(topicName.toString())));
return;
}
if (!(t instanceof PersistentTopic)) {
@@ -2073,12 +2086,14 @@ public class PersistentTopicsBase extends AdminResource {
PersistentTopic topic = (PersistentTopic) getTopicReference(topicName);
if (topic == null) {
- asyncResponse.resume(new RestException(Status.NOT_FOUND, "Topic not found"));
+ asyncResponse.resume(new RestException(Status.NOT_FOUND,
+ getTopicNotFoundErrorMessage(topicName.toString())));
return;
}
PersistentSubscription sub = topic.getSubscription(subName);
if (sub == null) {
- asyncResponse.resume(new RestException(Status.NOT_FOUND, "Subscription not found"));
+ asyncResponse.resume(new RestException(Status.NOT_FOUND,
+ getSubNotFoundErrorMessage(topicName.toString(), subName)));
return;
}
sub.resetCursor(timestamp).thenRun(() -> {
@@ -2299,12 +2314,14 @@ public class PersistentTopicsBase extends AdminResource {
.thenCompose(ignore -> getTopicReferenceAsync(topicName))
.thenAccept(topic -> {
if (topic == null) {
- asyncResponse.resume(new RestException(Status.NOT_FOUND, "Topic not found"));
+ asyncResponse.resume(new RestException(Status.NOT_FOUND,
+ getTopicNotFoundErrorMessage(topicName.toString())));
return;
}
PersistentSubscription sub = ((PersistentTopic) topic).getSubscription(subName);
if (sub == null) {
- asyncResponse.resume(new RestException(Status.NOT_FOUND, "Subscription not found"));
+ asyncResponse.resume(new RestException(Status.NOT_FOUND,
+ getSubNotFoundErrorMessage(topicName.toString(), subName)));
return;
}
CompletableFuture<Integer> batchSizeFuture = new CompletableFuture<>();
@@ -2869,7 +2886,7 @@ public class PersistentTopicsBase extends AdminResource {
messageId.getEntryId());
if (topic == null) {
asyncResponse.resume(new RestException(Status.NOT_FOUND,
- "Topic not found"));
+ getTopicNotFoundErrorMessage(topicName.toString())));
return;
}
ManagedLedgerImpl managedLedger =
@@ -3267,7 +3284,7 @@ public class PersistentTopicsBase extends AdminResource {
.thenCompose(__ -> checkTopicExistsAsync(topicName))
.thenCompose(exist -> {
if (!exist) {
- throw new RestException(Status.NOT_FOUND, "Topic not found");
+ throw new RestException(Status.NOT_FOUND, getTopicNotFoundErrorMessage(topicName.toString()));
} else {
return getPartitionedTopicMetadataAsync(topicName, false, false)
.thenCompose(metadata -> {
@@ -3399,7 +3416,8 @@ public class PersistentTopicsBase extends AdminResource {
if (exception != null) {
Throwable t = exception.getCause();
if (t instanceof NotFoundException) {
- asyncResponse.resume(new RestException(Status.NOT_FOUND, "Topic not found"));
+ asyncResponse.resume(new RestException(Status.NOT_FOUND,
+ getTopicNotFoundErrorMessage(topicName.toString())));
} else {
log.error("[{}] Failed to terminate topic {}", clientAppId(), topicName, t);
asyncResponse.resume(new RestException(t));
@@ -3475,7 +3493,8 @@ public class PersistentTopicsBase extends AdminResource {
Throwable t = exception.getCause();
if (t instanceof NotFoundException) {
asyncResponse.resume(new RestException(Status.NOT_FOUND,
- "Subscription not found"));
+ getSubNotFoundErrorMessage(topicName.toString(),
+ subName)));
return null;
} else {
log.error("[{}] Failed to expire messages up "
@@ -3518,7 +3537,8 @@ public class PersistentTopicsBase extends AdminResource {
final CompletableFuture<Void> resultFuture = new CompletableFuture<>();
getTopicReferenceAsync(topicName).thenAccept(t -> {
if (t == null) {
- resultFuture.completeExceptionally(new RestException(Status.NOT_FOUND, "Topic not found"));
+ resultFuture.completeExceptionally(new RestException(Status.NOT_FOUND,
+ getTopicNotFoundErrorMessage(topicName.toString())));
return;
}
if (!(t instanceof PersistentTopic)) {
@@ -3543,7 +3563,8 @@ public class PersistentTopicsBase extends AdminResource {
PersistentSubscription sub = topic.getSubscription(subName);
if (sub == null) {
resultFuture.completeExceptionally(
- new RestException(Status.NOT_FOUND, "Subscription not found"));
+ new RestException(Status.NOT_FOUND,
+ getSubNotFoundErrorMessage(topicName.toString(), subName)));
return;
}
issued = sub.expireMessages(expireTimeInSeconds);
@@ -3627,14 +3648,15 @@ public class PersistentTopicsBase extends AdminResource {
return getTopicReferenceAsync(topicName).thenAccept(t -> {
PersistentTopic topic = (PersistentTopic) t;
if (topic == null) {
- asyncResponse.resume(new RestException(Status.NOT_FOUND, "Topic not found"));
+ asyncResponse.resume(new RestException(Status.NOT_FOUND,
+ getTopicNotFoundErrorMessage(topicName.toString())));
return;
}
try {
PersistentSubscription sub = topic.getSubscription(subName);
if (sub == null) {
asyncResponse.resume(new RestException(Status.NOT_FOUND,
- "Subscription not found"));
+ getSubNotFoundErrorMessage(topicName.toString(), subName)));
return;
}
CompletableFuture<Integer> batchSizeFuture = new CompletableFuture<>();
@@ -3954,7 +3976,7 @@ public class PersistentTopicsBase extends AdminResource {
private RestException topicNotFoundReason(TopicName topicName) {
if (!topicName.isPartitioned()) {
- return new RestException(Status.NOT_FOUND, "Topic not found");
+ return new RestException(Status.NOT_FOUND, getTopicNotFoundErrorMessage(topicName.toString()));
}
PartitionedTopicMetadata partitionedTopicMetadata = getPartitionedTopicMetadata(
@@ -3967,12 +3989,13 @@ public class PersistentTopicsBase extends AdminResource {
} else if (!internalGetList(Optional.empty()).contains(topicName.toString())) {
return new RestException(Status.NOT_FOUND, "Topic partitions were not yet created");
}
- return new RestException(Status.NOT_FOUND, "Partitioned Topic not found");
+ return new RestException(Status.NOT_FOUND, getPartitionedTopicNotFoundErrorMessage(topicName.toString()));
}
private CompletableFuture<Topic> topicNotFoundReasonAsync(TopicName topicName) {
if (!topicName.isPartitioned()) {
- return FutureUtil.failedFuture(new RestException(Status.NOT_FOUND, "Topic not found"));
+ return FutureUtil.failedFuture(new RestException(Status.NOT_FOUND,
+ getTopicNotFoundErrorMessage(topicName.toString())));
}
return getPartitionedTopicMetadataAsync(
@@ -3986,7 +4009,8 @@ public class PersistentTopicsBase extends AdminResource {
} else if (!internalGetList(Optional.empty()).contains(topicName.toString())) {
throw new RestException(Status.NOT_FOUND, "Topic partitions were not yet created");
}
- throw new RestException(Status.NOT_FOUND, "Partitioned Topic not found");
+ throw new RestException(Status.NOT_FOUND,
+ getPartitionedTopicNotFoundErrorMessage(topicName.toString()));
});
}
@@ -4003,7 +4027,7 @@ public class PersistentTopicsBase extends AdminResource {
return checkNotNull(sub);
} catch (Exception e) {
- throw new RestException(Status.NOT_FOUND, "Subscription not found");
+ throw new RestException(Status.NOT_FOUND, getSubNotFoundErrorMessage(topicName.toString(), subName));
}
}
@@ -4274,7 +4298,8 @@ public class PersistentTopicsBase extends AdminResource {
.thenCompose(__ -> getTopicReferenceAsync(topicName))
.thenAccept(topic -> {
if (topic == null) {
- asyncResponse.resume(new RestException(Status.NOT_FOUND, "Topic not found"));
+ asyncResponse.resume(new RestException(Status.NOT_FOUND,
+ getTopicNotFoundErrorMessage(topicName.toString())));
return;
}
if (!(topic instanceof PersistentTopic)) {
@@ -4787,13 +4812,15 @@ public class PersistentTopicsBase extends AdminResource {
.thenCompose(__ -> getTopicReferenceAsync(topicName))
.thenAccept(topic -> {
if (topic == null) {
- asyncResponse.resume(new RestException(Status.NOT_FOUND, "Topic not found"));
+ asyncResponse.resume(new RestException(Status.NOT_FOUND,
+ getTopicNotFoundErrorMessage(topicName.toString())));
return;
}
Subscription sub = topic.getSubscription(subName);
if (sub == null) {
- asyncResponse.resume(new RestException(Status.NOT_FOUND, "Subscription not found"));
+ asyncResponse.resume(new RestException(Status.NOT_FOUND,
+ getSubNotFoundErrorMessage(topicName.toString(), subName)));
return;
}
@@ -4926,13 +4953,15 @@ public class PersistentTopicsBase extends AdminResource {
.thenCompose(__ -> getTopicReferenceAsync(topicName))
.thenAccept(topic -> {
if (topic == null) {
- asyncResponse.resume(new RestException(Status.NOT_FOUND, "Topic not found"));
+ asyncResponse.resume(new RestException(Status.NOT_FOUND,
+ getTopicNotFoundErrorMessage(topicName.toString())));
return;
}
Subscription sub = topic.getSubscription(subName);
if (sub == null) {
- asyncResponse.resume(new RestException(Status.NOT_FOUND, "Subscription not found"));
+ asyncResponse.resume(new RestException(Status.NOT_FOUND,
+ getSubNotFoundErrorMessage(topicName.toString(), subName)));
return;
}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java
index a1a674eac9c..9c3d2227166 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java
@@ -579,6 +579,7 @@ public class AdminApi2Test extends MockedPulsarServiceBaseTest {
admin.topics().resetCursor(topicName + "invalid", "my-sub", messageId);
fail("It should have failed due to invalid topic name");
} catch (PulsarAdminException.NotFoundException e) {
+ assertTrue(e.getMessage().contains(topicName));
// Ok
}
@@ -587,6 +588,7 @@ public class AdminApi2Test extends MockedPulsarServiceBaseTest {
admin.topics().resetCursor(topicName, "invalid-sub", messageId);
fail("It should have failed due to invalid subscription name");
} catch (PulsarAdminException.NotFoundException e) {
+ assertTrue(e.getMessage().contains("invalid-sub"));
// Ok
}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiSubscriptionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiSubscriptionTest.java
index 8e4a1d54a3e..5d682718c2a 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiSubscriptionTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiSubscriptionTest.java
@@ -65,18 +65,18 @@ public class AdminApiSubscriptionTest extends MockedPulsarServiceBaseTest {
@Test
public void testExpireMessageWithNonExistTopicAndNonExistSub() {
String uuid = UUID.randomUUID().toString();
- String topic = "test-expire-messages-non-exist-topic-" + uuid;
+ String topic = "persistent://public/default/test-expire-messages-non-exist-topic-" + uuid;
String subscriptionName = "test-expire-messages-non-exist-sub-" + uuid;
PulsarAdminException exception = expectThrows(PulsarAdminException.class,
() -> admin.topics().expireMessages(topic, subscriptionName, 1));
assertEquals(exception.getStatusCode(), Response.Status.NOT_FOUND.getStatusCode());
- assertEquals(exception.getMessage(), "Topic not found");
+ assertEquals(exception.getMessage(), String.format("Topic %s not found", topic));
exception = expectThrows(PulsarAdminException.class,
() -> admin.topics().expireMessagesForAllSubscriptions(topic, 1));
assertEquals(exception.getStatusCode(), Response.Status.NOT_FOUND.getStatusCode());
- assertEquals(exception.getMessage(), "Topic not found");
+ assertEquals(exception.getMessage(), String.format("Topic %s not found", topic));
}
@Test
@@ -93,17 +93,17 @@ public class AdminApiSubscriptionTest extends MockedPulsarServiceBaseTest {
@Test
public void tesSkipMessageWithNonExistTopicAndNotExistSub() {
String uuid = UUID.randomUUID().toString();
- String topic = "test-skip-messages-non-exist-topic-" + uuid;
+ String topic = "persistent://public/default/test-skip-messages-non-exist-topic-" + uuid;
String subscriptionName = "test-skip-messages-non-exist-sub-" + uuid;
PulsarAdminException exception = expectThrows(PulsarAdminException.class,
() -> admin.topics().skipMessages(topic, subscriptionName, 1));
assertEquals(exception.getStatusCode(), Response.Status.NOT_FOUND.getStatusCode());
- assertEquals(exception.getMessage(), "Topic not found");
+ assertEquals(exception.getMessage(), String.format("Topic %s not found", topic));
exception = expectThrows(PulsarAdminException.class,
() -> admin.topics().skipAllMessages(topic, subscriptionName));
assertEquals(exception.getStatusCode(), Response.Status.NOT_FOUND.getStatusCode());
- assertEquals(exception.getMessage(), "Topic not found");
+ assertEquals(exception.getMessage(), String.format("Topic %s not found", topic));
}
}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java
index 0579eb20c48..46ad22b15ad 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java
@@ -906,6 +906,7 @@ public class AdminApiTest extends MockedPulsarServiceBaseTest {
try {
admin.topics().skipAllMessages(persistentTopicName, subName);
} catch (NotFoundException e) {
+ assertTrue(e.getMessage().contains(subName));
}
admin.topics().delete(persistentTopicName);
@@ -914,6 +915,7 @@ public class AdminApiTest extends MockedPulsarServiceBaseTest {
admin.topics().delete(persistentTopicName);
fail("Should have received 404");
} catch (NotFoundException e) {
+ assertTrue(e.getMessage().contains(persistentTopicName));
}
assertEquals(admin.topics().getList("prop-xyz/ns1"), Lists.newArrayList());
@@ -1114,6 +1116,7 @@ public class AdminApiTest extends MockedPulsarServiceBaseTest {
admin.topics().deletePartitionedTopic(anotherTopic);
fail("Should have failed as the partitioned topic was not created");
} catch (NotFoundException nfe) {
+ assertTrue(nfe.getMessage().contains(anotherTopic));
}
admin.topics().deletePartitionedTopic(partitionedTopicName);
@@ -2125,7 +2128,7 @@ public class AdminApiTest extends MockedPulsarServiceBaseTest {
admin.topics().getStats("persistent://prop-xyz/ns1/ghostTopic");
fail("The topic doesn't exist");
} catch (NotFoundException e) {
- // OK
+ assertTrue(e.getMessage().contains("persistent://prop-xyz/ns1/ghostTopic"));
}
}
@@ -2215,6 +2218,20 @@ public class AdminApiTest extends MockedPulsarServiceBaseTest {
topicName = "persistent://prop-xyz/ns1/" + topicName;
+ try {
+ admin.topics().resetCursor(topicName, "my-sub", System.currentTimeMillis());
+ } catch (PulsarAdminException.NotFoundException e) {
+ assertTrue(e.getMessage().contains(topicName));
+ }
+
+ admin.topics().createNonPartitionedTopic(topicName);
+
+ try {
+ admin.topics().resetCursor(topicName, "my-sub", System.currentTimeMillis());
+ } catch (PulsarAdminException.NotFoundException e) {
+ assertTrue(e.getMessage().contains("my-sub"));
+ }
+
// create consumer and subscription
Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topicName)
.subscriptionName("my-sub").startMessageIdInclusive()
@@ -2407,8 +2424,20 @@ public class AdminApiTest extends MockedPulsarServiceBaseTest {
admin.namespaces().setRetention("prop-xyz/ns1", new RetentionPolicies(10, 10));
topicName = "persistent://prop-xyz/ns1/" + topicName;
+ try {
+ admin.topics().resetCursor(topicName, "my-sub", System.currentTimeMillis());
+ } catch (PulsarAdminException.NotFoundException e) {
+ assertTrue(e.getMessage().contains(topicName));
+ }
+
admin.topics().createPartitionedTopic(topicName, 4);
+ try {
+ admin.topics().resetCursor(topicName, "my-sub", System.currentTimeMillis());
+ } catch (PulsarAdminException.NotFoundException e) {
+ assertTrue(e.getMessage().contains("my-sub"));
+ }
+
// create consumer and subscription
Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topicName)
.subscriptionName("my-sub").startMessageIdInclusive()
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
index eac488b381c..632987b69e3 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
@@ -176,7 +176,8 @@ public class PersistentTopicsTest extends MockedPulsarServiceBaseTest {
verify(response, timeout(5000).times(1)).resume(errorCaptor.capture());
Assert.assertEquals(errorCaptor.getValue().getResponse().getStatus(),
Response.Status.NOT_FOUND.getStatusCode());
- Assert.assertEquals(errorCaptor.getValue().getMessage(), "Topic not found");
+ Assert.assertEquals(errorCaptor.getValue().getMessage(), String.format("Topic %s not found",
+ "persistent://my-tenant/my-namespace/topic-not-found"));
// 2) Confirm that the partitioned topic does not exist
response = mock(AsyncResponse.class);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicsTest.java
index 35e205f2dc9..3327b83b6ae 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicsTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicsTest.java
@@ -383,7 +383,9 @@ public class TopicsTest extends MockedPulsarServiceBaseTest {
topics.produceOnPersistentTopic(asyncResponse, testTenant, testNamespace, testTopicName, false, producerMessages);
ArgumentCaptor<RestException> responseCaptor = ArgumentCaptor.forClass(RestException.class);
verify(asyncResponse, timeout(5000).times(1)).resume(responseCaptor.capture());
- Assert.assertTrue(responseCaptor.getValue().getMessage().contains("Topic not exist"));
+ System.out.println(responseCaptor.getValue().getMessage());
+ Assert.assertTrue(responseCaptor.getValue().getMessage()
+ .contains(String.format("Topic %s not found", topicName)));
}
@Test