You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2022/05/16 13:04:35 UTC

[pulsar] branch master updated: [improve][admin] add topic name and sub name for NotFound error message (#15606)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new f89698c11af [improve][admin] add topic name and sub name for NotFound error message (#15606)
f89698c11af is described below

commit f89698c11af3429ee69c3a15a8ef729650b6a705
Author: lipenghui <pe...@apache.org>
AuthorDate: Mon May 16 21:04:27 2022 +0800

    [improve][admin] add topic name and sub name for NotFound error message (#15606)
    
    To fix https://issues.apache.org/jira/browse/PULSAR-20
---
 .../apache/pulsar/broker/admin/AdminResource.java  |  12 +++
 .../broker/admin/impl/PersistentTopicsBase.java    | 105 +++++++++++++--------
 .../apache/pulsar/broker/admin/AdminApi2Test.java  |   2 +
 .../broker/admin/AdminApiSubscriptionTest.java     |  12 +--
 .../apache/pulsar/broker/admin/AdminApiTest.java   |  31 +++++-
 .../pulsar/broker/admin/PersistentTopicsTest.java  |   3 +-
 .../org/apache/pulsar/broker/admin/TopicsTest.java |   4 +-
 7 files changed, 122 insertions(+), 47 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
index 7f77568b8bb..43addc30af7 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
@@ -830,4 +830,16 @@ public abstract class AdminResource extends PulsarWebResource {
                 && ((WebApplicationException) realCause).getResponse().getStatus()
                 == Status.TEMPORARY_REDIRECT.getStatusCode();
     }
+
+    protected static String getTopicNotFoundErrorMessage(String topic) {
+        return String.format("Topic %s not found", topic);
+    }
+
+    protected static String getPartitionedTopicNotFoundErrorMessage(String topic) {
+        return String.format("Partitioned Topic %s not found", topic);
+    }
+
+    protected static String getSubNotFoundErrorMessage(String topic, String subscription) {
+        return String.format("Subscription %s not found for topic %s", subscription, topic);
+    }
 }
\ No newline at end of file
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index 8b046608306..2249ab9c1b7 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -569,7 +569,7 @@ public class PersistentTopicsBase extends AdminResource {
         return pulsar().getNamespaceService().checkTopicExists(topicName)
                 .thenAccept(exist -> {
                     if (!exist) {
-                        throw new RestException(Status.NOT_FOUND, "Topic not exist");
+                        throw new RestException(Status.NOT_FOUND, getTopicNotFoundErrorMessage(topicName.toString()));
                     }
                 });
     }
@@ -607,7 +607,8 @@ public class PersistentTopicsBase extends AdminResource {
                     } else if (realCause instanceof MetadataStoreException.NotFoundException) {
                         log.warn("Namespace policies of {} not found", topicName.getNamespaceObject());
                         asyncResponse.resume(new RestException(
-                                new RestException(Status.NOT_FOUND, "Partitioned topic does not exist")));
+                                new RestException(Status.NOT_FOUND,
+                                        getPartitionedTopicNotFoundErrorMessage(topicName.toString()))));
                     } else if (realCause instanceof PulsarAdminException) {
                         asyncResponse.resume(new RestException((PulsarAdminException) realCause));
                     } else if (realCause instanceof MetadataStoreException.BadVersionException) {
@@ -1023,7 +1024,7 @@ public class PersistentTopicsBase extends AdminResource {
             if (t instanceof TopicBusyException) {
                 throw new RestException(Status.PRECONDITION_FAILED, "Topic has active producers/subscriptions");
             } else if (isManagedLedgerNotFoundException(e)) {
-                throw new RestException(Status.NOT_FOUND, "Topic not found");
+                throw new RestException(Status.NOT_FOUND, getTopicNotFoundErrorMessage(topicName.toString()));
             } else {
                 throw new RestException(t);
             }
@@ -1250,7 +1251,8 @@ public class PersistentTopicsBase extends AdminResource {
                             if (exception != null) {
                                 Throwable t = exception.getCause();
                                 if (t instanceof NotFoundException) {
-                                    asyncResponse.resume(new RestException(Status.NOT_FOUND, "Topic not found"));
+                                    asyncResponse.resume(new RestException(Status.NOT_FOUND,
+                                            getTopicNotFoundErrorMessage(topicName.toString())));
                                 } else {
                                     log.error("[{}] Failed to get managed info for {}", clientAppId(), topicName, t);
                                     asyncResponse.resume(new RestException(t));
@@ -1322,7 +1324,8 @@ public class PersistentTopicsBase extends AdminResource {
         future.thenCompose(__ -> getPartitionedTopicMetadataAsync(topicName,
                 authoritative, false)).thenAccept(partitionMetadata -> {
             if (partitionMetadata.partitions == 0) {
-                asyncResponse.resume(new RestException(Status.NOT_FOUND, "Partitioned Topic not found"));
+                asyncResponse.resume(new RestException(Status.NOT_FOUND,
+                        getPartitionedTopicNotFoundErrorMessage(topicName.toString())));
                 return;
             }
             PartitionedTopicStatsImpl stats = new PartitionedTopicStatsImpl(partitionMetadata);
@@ -1396,7 +1399,8 @@ public class PersistentTopicsBase extends AdminResource {
         future.thenCompose(__ -> getPartitionedTopicMetadataAsync(topicName, authoritative, false))
                 .thenAccept(partitionMetadata -> {
             if (partitionMetadata.partitions == 0) {
-                asyncResponse.resume(new RestException(Status.NOT_FOUND, "Partitioned Topic not found"));
+                asyncResponse.resume(new RestException(Status.NOT_FOUND,
+                        getPartitionedTopicNotFoundErrorMessage(topicName.toString())));
                 return;
             }
 
@@ -1484,7 +1488,8 @@ public class PersistentTopicsBase extends AdminResource {
                             if (exception != null) {
                                 Throwable t = exception.getCause();
                                 if (t instanceof NotFoundException) {
-                                    asyncResponse.resume(new RestException(Status.NOT_FOUND, "Subscription not found"));
+                                    asyncResponse.resume(new RestException(Status.NOT_FOUND,
+                                            getSubNotFoundErrorMessage(topicName.toString(), subName)));
                                     return null;
                                 } else if (t instanceof PreconditionFailedException) {
                                     asyncResponse.resume(new RestException(Status.PRECONDITION_FAILED,
@@ -1530,7 +1535,8 @@ public class PersistentTopicsBase extends AdminResource {
                 Topic topic = getTopicReference(topicName);
                 Subscription sub = topic.getSubscription(subName);
                 if (sub == null) {
-                    throw new RestException(Status.NOT_FOUND, "Subscription not found");
+                    throw new RestException(Status.NOT_FOUND,
+                            getSubNotFoundErrorMessage(topicName.toString(), subName));
                 }
                 return sub.delete();
             }).thenRun(() -> {
@@ -1590,7 +1596,8 @@ public class PersistentTopicsBase extends AdminResource {
                             if (exception != null) {
                                 Throwable t = exception.getCause();
                                 if (t instanceof NotFoundException) {
-                                    asyncResponse.resume(new RestException(Status.NOT_FOUND, "Subscription not found"));
+                                    asyncResponse.resume(new RestException(Status.NOT_FOUND,
+                                            getSubNotFoundErrorMessage(topicName.toString(), subName)));
                                     return null;
                                 } else {
                                     log.error("[{}] Failed to delete subscription forcefully {} {}",
@@ -1636,7 +1643,8 @@ public class PersistentTopicsBase extends AdminResource {
                     Topic topic = getTopicReference(topicName);
                     Subscription sub = topic.getSubscription(subName);
                     if (sub == null) {
-                        throw new RestException(Status.NOT_FOUND, "Subscription not found");
+                        throw new RestException(Status.NOT_FOUND,
+                                getSubNotFoundErrorMessage(topicName.toString(), subName));
                     }
                     return sub.deleteForcefully();
                 }).thenRun(() -> {
@@ -1694,7 +1702,8 @@ public class PersistentTopicsBase extends AdminResource {
                                     Throwable t = exception.getCause();
                                     if (t instanceof NotFoundException) {
                                         asyncResponse.resume(
-                                            new RestException(Status.NOT_FOUND, "Subscription not found"));
+                                            new RestException(Status.NOT_FOUND,
+                                                    getSubNotFoundErrorMessage(topicName.toString(), subName)));
                                     } else {
                                         log.error("[{}] Failed to skip all messages {} {}",
                                             clientAppId(), topicName, subName, t);
@@ -1742,14 +1751,16 @@ public class PersistentTopicsBase extends AdminResource {
                         PersistentReplicator repl =
                             (PersistentReplicator) topic.getPersistentReplicator(remoteCluster);
                         if (repl == null) {
-                            asyncResponse.resume(new RestException(Status.NOT_FOUND, "Subscription not found"));
+                            asyncResponse.resume(new RestException(Status.NOT_FOUND,
+                                    getSubNotFoundErrorMessage(topicName.toString(), subName)));
                             return CompletableFuture.completedFuture(null);
                         }
                         return repl.clearBacklog().whenComplete(biConsumer);
                     } else {
                         PersistentSubscription sub = topic.getSubscription(subName);
                         if (sub == null) {
-                            asyncResponse.resume(new RestException(Status.NOT_FOUND, "Subscription not found"));
+                            asyncResponse.resume(new RestException(Status.NOT_FOUND,
+                                    getSubNotFoundErrorMessage(topicName.toString(), subName)));
                             return CompletableFuture.completedFuture(null);
                         }
                         return sub.clearBacklog().whenComplete(biConsumer);
@@ -1785,7 +1796,8 @@ public class PersistentTopicsBase extends AdminResource {
                          return getTopicReferenceAsync(topicName).thenCompose(t -> {
                              PersistentTopic topic = (PersistentTopic) t;
                              if (topic == null) {
-                                 throw new RestException(new RestException(Status.NOT_FOUND, "Topic not found"));
+                                 throw new RestException(new RestException(Status.NOT_FOUND,
+                                         getTopicNotFoundErrorMessage(topicName.toString())));
                              }
                              if (subName.startsWith(topic.getReplicatorPrefix())) {
                                  String remoteCluster = PersistentReplicator.getRemoteCluster(subName);
@@ -1805,7 +1817,8 @@ public class PersistentTopicsBase extends AdminResource {
                                  PersistentSubscription sub = topic.getSubscription(subName);
                                  if (sub == null) {
                                      return FutureUtil.failedFuture(
-                                             new RestException(Status.NOT_FOUND, "Subscription not found"));
+                                             new RestException(Status.NOT_FOUND,
+                                                     getSubNotFoundErrorMessage(topicName.toString(), subName)));
                                  }
                                  return sub.skipMessages(numMessages).thenAccept(unused -> {
                                      log.info("[{}] Skipped {} messages on {} {}", clientAppId(), numMessages,
@@ -1906,7 +1919,7 @@ public class PersistentTopicsBase extends AdminResource {
                 .thenCompose(__ -> getTopicReferenceAsync(topicName).thenAccept(t -> {
                      if (t == null) {
                          resumeAsyncResponseExceptionally(asyncResponse, new RestException(Status.NOT_FOUND,
-                                 "Topic not found"));
+                                 getTopicNotFoundErrorMessage(topicName.toString())));
                          return;
                      }
                     if (!(t instanceof PersistentTopic)) {
@@ -2073,12 +2086,14 @@ public class PersistentTopicsBase extends AdminResource {
 
             PersistentTopic topic = (PersistentTopic) getTopicReference(topicName);
             if (topic == null) {
-                asyncResponse.resume(new RestException(Status.NOT_FOUND, "Topic not found"));
+                asyncResponse.resume(new RestException(Status.NOT_FOUND,
+                        getTopicNotFoundErrorMessage(topicName.toString())));
                 return;
             }
             PersistentSubscription sub = topic.getSubscription(subName);
             if (sub == null) {
-                asyncResponse.resume(new RestException(Status.NOT_FOUND, "Subscription not found"));
+                asyncResponse.resume(new RestException(Status.NOT_FOUND,
+                        getSubNotFoundErrorMessage(topicName.toString(), subName)));
                 return;
             }
             sub.resetCursor(timestamp).thenRun(() -> {
@@ -2299,12 +2314,14 @@ public class PersistentTopicsBase extends AdminResource {
                         .thenCompose(ignore -> getTopicReferenceAsync(topicName))
                         .thenAccept(topic -> {
                                 if (topic == null) {
-                                    asyncResponse.resume(new RestException(Status.NOT_FOUND, "Topic not found"));
+                                    asyncResponse.resume(new RestException(Status.NOT_FOUND,
+                                            getTopicNotFoundErrorMessage(topicName.toString())));
                                     return;
                                 }
                                 PersistentSubscription sub = ((PersistentTopic) topic).getSubscription(subName);
                                 if (sub == null) {
-                                    asyncResponse.resume(new RestException(Status.NOT_FOUND, "Subscription not found"));
+                                    asyncResponse.resume(new RestException(Status.NOT_FOUND,
+                                            getSubNotFoundErrorMessage(topicName.toString(), subName)));
                                     return;
                                 }
                                 CompletableFuture<Integer> batchSizeFuture = new CompletableFuture<>();
@@ -2869,7 +2886,7 @@ public class PersistentTopicsBase extends AdminResource {
                                                 messageId.getEntryId());
                                         if (topic == null) {
                                             asyncResponse.resume(new RestException(Status.NOT_FOUND,
-                                                    "Topic not found"));
+                                                    getTopicNotFoundErrorMessage(topicName.toString())));
                                             return;
                                         }
                                         ManagedLedgerImpl managedLedger =
@@ -3267,7 +3284,7 @@ public class PersistentTopicsBase extends AdminResource {
                 .thenCompose(__ -> checkTopicExistsAsync(topicName))
                 .thenCompose(exist -> {
                     if (!exist) {
-                        throw new RestException(Status.NOT_FOUND, "Topic not found");
+                        throw new RestException(Status.NOT_FOUND, getTopicNotFoundErrorMessage(topicName.toString()));
                     } else {
                         return getPartitionedTopicMetadataAsync(topicName, false, false)
                             .thenCompose(metadata -> {
@@ -3399,7 +3416,8 @@ public class PersistentTopicsBase extends AdminResource {
                             if (exception != null) {
                                 Throwable t = exception.getCause();
                                 if (t instanceof NotFoundException) {
-                                    asyncResponse.resume(new RestException(Status.NOT_FOUND, "Topic not found"));
+                                    asyncResponse.resume(new RestException(Status.NOT_FOUND,
+                                            getTopicNotFoundErrorMessage(topicName.toString())));
                                 } else {
                                     log.error("[{}] Failed to terminate topic {}", clientAppId(), topicName, t);
                                     asyncResponse.resume(new RestException(t));
@@ -3475,7 +3493,8 @@ public class PersistentTopicsBase extends AdminResource {
                                                         Throwable t = exception.getCause();
                                                         if (t instanceof NotFoundException) {
                                                             asyncResponse.resume(new RestException(Status.NOT_FOUND,
-                                                                    "Subscription not found"));
+                                                                    getSubNotFoundErrorMessage(topicName.toString(),
+                                                                            subName)));
                                                             return null;
                                                         } else {
                                                             log.error("[{}] Failed to expire messages up "
@@ -3518,7 +3537,8 @@ public class PersistentTopicsBase extends AdminResource {
             final CompletableFuture<Void> resultFuture = new CompletableFuture<>();
             getTopicReferenceAsync(topicName).thenAccept(t -> {
                  if (t == null) {
-                     resultFuture.completeExceptionally(new RestException(Status.NOT_FOUND, "Topic not found"));
+                     resultFuture.completeExceptionally(new RestException(Status.NOT_FOUND,
+                             getTopicNotFoundErrorMessage(topicName.toString())));
                      return;
                  }
                 if (!(t instanceof PersistentTopic)) {
@@ -3543,7 +3563,8 @@ public class PersistentTopicsBase extends AdminResource {
                     PersistentSubscription sub = topic.getSubscription(subName);
                     if (sub == null) {
                         resultFuture.completeExceptionally(
-                                new RestException(Status.NOT_FOUND, "Subscription not found"));
+                                new RestException(Status.NOT_FOUND,
+                                        getSubNotFoundErrorMessage(topicName.toString(), subName)));
                         return;
                     }
                     issued = sub.expireMessages(expireTimeInSeconds);
@@ -3627,14 +3648,15 @@ public class PersistentTopicsBase extends AdminResource {
         return getTopicReferenceAsync(topicName).thenAccept(t -> {
             PersistentTopic topic = (PersistentTopic) t;
             if (topic == null) {
-                asyncResponse.resume(new RestException(Status.NOT_FOUND, "Topic not found"));
+                asyncResponse.resume(new RestException(Status.NOT_FOUND,
+                        getTopicNotFoundErrorMessage(topicName.toString())));
                 return;
             }
             try {
                 PersistentSubscription sub = topic.getSubscription(subName);
                 if (sub == null) {
                     asyncResponse.resume(new RestException(Status.NOT_FOUND,
-                            "Subscription not found"));
+                            getSubNotFoundErrorMessage(topicName.toString(), subName)));
                     return;
                 }
                 CompletableFuture<Integer> batchSizeFuture = new CompletableFuture<>();
@@ -3954,7 +3976,7 @@ public class PersistentTopicsBase extends AdminResource {
 
     private RestException topicNotFoundReason(TopicName topicName) {
         if (!topicName.isPartitioned()) {
-            return new RestException(Status.NOT_FOUND, "Topic not found");
+            return new RestException(Status.NOT_FOUND, getTopicNotFoundErrorMessage(topicName.toString()));
         }
 
         PartitionedTopicMetadata partitionedTopicMetadata = getPartitionedTopicMetadata(
@@ -3967,12 +3989,13 @@ public class PersistentTopicsBase extends AdminResource {
         } else if (!internalGetList(Optional.empty()).contains(topicName.toString())) {
             return new RestException(Status.NOT_FOUND, "Topic partitions were not yet created");
         }
-        return new RestException(Status.NOT_FOUND, "Partitioned Topic not found");
+        return new RestException(Status.NOT_FOUND, getPartitionedTopicNotFoundErrorMessage(topicName.toString()));
     }
 
     private CompletableFuture<Topic> topicNotFoundReasonAsync(TopicName topicName) {
         if (!topicName.isPartitioned()) {
-            return FutureUtil.failedFuture(new RestException(Status.NOT_FOUND, "Topic not found"));
+            return FutureUtil.failedFuture(new RestException(Status.NOT_FOUND,
+                    getTopicNotFoundErrorMessage(topicName.toString())));
         }
 
         return getPartitionedTopicMetadataAsync(
@@ -3986,7 +4009,8 @@ public class PersistentTopicsBase extends AdminResource {
                     } else if (!internalGetList(Optional.empty()).contains(topicName.toString())) {
                         throw new RestException(Status.NOT_FOUND, "Topic partitions were not yet created");
                     }
-                    throw new RestException(Status.NOT_FOUND, "Partitioned Topic not found");
+                    throw new RestException(Status.NOT_FOUND,
+                            getPartitionedTopicNotFoundErrorMessage(topicName.toString()));
                 });
     }
 
@@ -4003,7 +4027,7 @@ public class PersistentTopicsBase extends AdminResource {
 
             return checkNotNull(sub);
         } catch (Exception e) {
-            throw new RestException(Status.NOT_FOUND, "Subscription not found");
+            throw new RestException(Status.NOT_FOUND, getSubNotFoundErrorMessage(topicName.toString(), subName));
         }
     }
 
@@ -4274,7 +4298,8 @@ public class PersistentTopicsBase extends AdminResource {
                 .thenCompose(__ -> getTopicReferenceAsync(topicName))
                 .thenAccept(topic -> {
                     if (topic == null) {
-                        asyncResponse.resume(new RestException(Status.NOT_FOUND, "Topic not found"));
+                        asyncResponse.resume(new RestException(Status.NOT_FOUND,
+                                getTopicNotFoundErrorMessage(topicName.toString())));
                         return;
                     }
                     if (!(topic instanceof PersistentTopic)) {
@@ -4787,13 +4812,15 @@ public class PersistentTopicsBase extends AdminResource {
                 .thenCompose(__ -> getTopicReferenceAsync(topicName))
                 .thenAccept(topic -> {
                             if (topic == null) {
-                                asyncResponse.resume(new RestException(Status.NOT_FOUND, "Topic not found"));
+                                asyncResponse.resume(new RestException(Status.NOT_FOUND,
+                                        getTopicNotFoundErrorMessage(topicName.toString())));
                                 return;
                             }
 
                             Subscription sub = topic.getSubscription(subName);
                             if (sub == null) {
-                                asyncResponse.resume(new RestException(Status.NOT_FOUND, "Subscription not found"));
+                                asyncResponse.resume(new RestException(Status.NOT_FOUND,
+                                        getSubNotFoundErrorMessage(topicName.toString(), subName)));
                                 return;
                             }
 
@@ -4926,13 +4953,15 @@ public class PersistentTopicsBase extends AdminResource {
                 .thenCompose(__ -> getTopicReferenceAsync(topicName))
                 .thenAccept(topic -> {
                     if (topic == null) {
-                        asyncResponse.resume(new RestException(Status.NOT_FOUND, "Topic not found"));
+                        asyncResponse.resume(new RestException(Status.NOT_FOUND,
+                                getTopicNotFoundErrorMessage(topicName.toString())));
                         return;
                     }
 
                     Subscription sub = topic.getSubscription(subName);
                     if (sub == null) {
-                        asyncResponse.resume(new RestException(Status.NOT_FOUND, "Subscription not found"));
+                        asyncResponse.resume(new RestException(Status.NOT_FOUND,
+                                getSubNotFoundErrorMessage(topicName.toString(), subName)));
                         return;
                     }
 
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java
index a1a674eac9c..9c3d2227166 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java
@@ -579,6 +579,7 @@ public class AdminApi2Test extends MockedPulsarServiceBaseTest {
             admin.topics().resetCursor(topicName + "invalid", "my-sub", messageId);
             fail("It should have failed due to invalid topic name");
         } catch (PulsarAdminException.NotFoundException e) {
+            assertTrue(e.getMessage().contains(topicName));
             // Ok
         }
 
@@ -587,6 +588,7 @@ public class AdminApi2Test extends MockedPulsarServiceBaseTest {
             admin.topics().resetCursor(topicName, "invalid-sub", messageId);
             fail("It should have failed due to invalid subscription name");
         } catch (PulsarAdminException.NotFoundException e) {
+            assertTrue(e.getMessage().contains("invalid-sub"));
             // Ok
         }
 
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiSubscriptionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiSubscriptionTest.java
index 8e4a1d54a3e..5d682718c2a 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiSubscriptionTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiSubscriptionTest.java
@@ -65,18 +65,18 @@ public class AdminApiSubscriptionTest extends MockedPulsarServiceBaseTest {
     @Test
     public void testExpireMessageWithNonExistTopicAndNonExistSub() {
         String uuid = UUID.randomUUID().toString();
-        String topic = "test-expire-messages-non-exist-topic-" + uuid;
+        String topic = "persistent://public/default/test-expire-messages-non-exist-topic-" + uuid;
         String subscriptionName = "test-expire-messages-non-exist-sub-" + uuid;
 
         PulsarAdminException exception = expectThrows(PulsarAdminException.class,
                 () -> admin.topics().expireMessages(topic, subscriptionName, 1));
         assertEquals(exception.getStatusCode(), Response.Status.NOT_FOUND.getStatusCode());
-        assertEquals(exception.getMessage(), "Topic not found");
+        assertEquals(exception.getMessage(), String.format("Topic %s not found", topic));
 
         exception = expectThrows(PulsarAdminException.class,
                 () -> admin.topics().expireMessagesForAllSubscriptions(topic, 1));
         assertEquals(exception.getStatusCode(), Response.Status.NOT_FOUND.getStatusCode());
-        assertEquals(exception.getMessage(), "Topic not found");
+        assertEquals(exception.getMessage(), String.format("Topic %s not found", topic));
     }
 
     @Test
@@ -93,17 +93,17 @@ public class AdminApiSubscriptionTest extends MockedPulsarServiceBaseTest {
     @Test
     public void tesSkipMessageWithNonExistTopicAndNotExistSub() {
         String uuid = UUID.randomUUID().toString();
-        String topic = "test-skip-messages-non-exist-topic-" + uuid;
+        String topic = "persistent://public/default/test-skip-messages-non-exist-topic-" + uuid;
         String subscriptionName = "test-skip-messages-non-exist-sub-" + uuid;
 
         PulsarAdminException exception = expectThrows(PulsarAdminException.class,
                 () -> admin.topics().skipMessages(topic, subscriptionName, 1));
         assertEquals(exception.getStatusCode(), Response.Status.NOT_FOUND.getStatusCode());
-        assertEquals(exception.getMessage(), "Topic not found");
+        assertEquals(exception.getMessage(), String.format("Topic %s not found", topic));
 
         exception = expectThrows(PulsarAdminException.class,
                 () -> admin.topics().skipAllMessages(topic, subscriptionName));
         assertEquals(exception.getStatusCode(), Response.Status.NOT_FOUND.getStatusCode());
-        assertEquals(exception.getMessage(), "Topic not found");
+        assertEquals(exception.getMessage(), String.format("Topic %s not found", topic));
     }
 }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java
index 0579eb20c48..46ad22b15ad 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java
@@ -906,6 +906,7 @@ public class AdminApiTest extends MockedPulsarServiceBaseTest {
         try {
             admin.topics().skipAllMessages(persistentTopicName, subName);
         } catch (NotFoundException e) {
+            assertTrue(e.getMessage().contains(subName));
         }
 
         admin.topics().delete(persistentTopicName);
@@ -914,6 +915,7 @@ public class AdminApiTest extends MockedPulsarServiceBaseTest {
             admin.topics().delete(persistentTopicName);
             fail("Should have received 404");
         } catch (NotFoundException e) {
+            assertTrue(e.getMessage().contains(persistentTopicName));
         }
 
         assertEquals(admin.topics().getList("prop-xyz/ns1"), Lists.newArrayList());
@@ -1114,6 +1116,7 @@ public class AdminApiTest extends MockedPulsarServiceBaseTest {
             admin.topics().deletePartitionedTopic(anotherTopic);
             fail("Should have failed as the partitioned topic was not created");
         } catch (NotFoundException nfe) {
+            assertTrue(nfe.getMessage().contains(anotherTopic));
         }
 
         admin.topics().deletePartitionedTopic(partitionedTopicName);
@@ -2125,7 +2128,7 @@ public class AdminApiTest extends MockedPulsarServiceBaseTest {
             admin.topics().getStats("persistent://prop-xyz/ns1/ghostTopic");
             fail("The topic doesn't exist");
         } catch (NotFoundException e) {
-            // OK
+            assertTrue(e.getMessage().contains("persistent://prop-xyz/ns1/ghostTopic"));
         }
     }
 
@@ -2215,6 +2218,20 @@ public class AdminApiTest extends MockedPulsarServiceBaseTest {
 
         topicName = "persistent://prop-xyz/ns1/" + topicName;
 
+        try {
+            admin.topics().resetCursor(topicName, "my-sub", System.currentTimeMillis());
+        } catch (PulsarAdminException.NotFoundException e) {
+            assertTrue(e.getMessage().contains(topicName));
+        }
+
+        admin.topics().createNonPartitionedTopic(topicName);
+
+        try {
+            admin.topics().resetCursor(topicName, "my-sub", System.currentTimeMillis());
+        } catch (PulsarAdminException.NotFoundException e) {
+            assertTrue(e.getMessage().contains("my-sub"));
+        }
+
         // create consumer and subscription
         Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topicName)
                 .subscriptionName("my-sub").startMessageIdInclusive()
@@ -2407,8 +2424,20 @@ public class AdminApiTest extends MockedPulsarServiceBaseTest {
         admin.namespaces().setRetention("prop-xyz/ns1", new RetentionPolicies(10, 10));
         topicName = "persistent://prop-xyz/ns1/" + topicName;
 
+        try {
+            admin.topics().resetCursor(topicName, "my-sub", System.currentTimeMillis());
+        } catch (PulsarAdminException.NotFoundException e) {
+            assertTrue(e.getMessage().contains(topicName));
+        }
+
         admin.topics().createPartitionedTopic(topicName, 4);
 
+        try {
+            admin.topics().resetCursor(topicName, "my-sub", System.currentTimeMillis());
+        } catch (PulsarAdminException.NotFoundException e) {
+            assertTrue(e.getMessage().contains("my-sub"));
+        }
+
         // create consumer and subscription
         Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topicName)
                 .subscriptionName("my-sub").startMessageIdInclusive()
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
index eac488b381c..632987b69e3 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
@@ -176,7 +176,8 @@ public class PersistentTopicsTest extends MockedPulsarServiceBaseTest {
         verify(response, timeout(5000).times(1)).resume(errorCaptor.capture());
         Assert.assertEquals(errorCaptor.getValue().getResponse().getStatus(),
                 Response.Status.NOT_FOUND.getStatusCode());
-        Assert.assertEquals(errorCaptor.getValue().getMessage(), "Topic not found");
+        Assert.assertEquals(errorCaptor.getValue().getMessage(), String.format("Topic %s not found",
+                "persistent://my-tenant/my-namespace/topic-not-found"));
 
         // 2) Confirm that the partitioned topic does not exist
         response = mock(AsyncResponse.class);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicsTest.java
index 35e205f2dc9..3327b83b6ae 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicsTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicsTest.java
@@ -383,7 +383,9 @@ public class TopicsTest extends MockedPulsarServiceBaseTest {
         topics.produceOnPersistentTopic(asyncResponse, testTenant, testNamespace, testTopicName, false, producerMessages);
         ArgumentCaptor<RestException> responseCaptor = ArgumentCaptor.forClass(RestException.class);
         verify(asyncResponse, timeout(5000).times(1)).resume(responseCaptor.capture());
-        Assert.assertTrue(responseCaptor.getValue().getMessage().contains("Topic not exist"));
+        System.out.println(responseCaptor.getValue().getMessage());
+        Assert.assertTrue(responseCaptor.getValue().getMessage()
+                .contains(String.format("Topic %s not found", topicName)));
     }
 
     @Test