You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by te...@apache.org on 2022/05/05 11:26:41 UTC

[pulsar] branch master updated: [Broker] Make PersistentTopicsBase#internalGetPartitionedMetadata async (#14153)

This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 942c5c5f4d8 [Broker] Make PersistentTopicsBase#internalGetPartitionedMetadata async (#14153)
942c5c5f4d8 is described below

commit 942c5c5f4d801d03b63631c6bf0a2e1654ef1c56
Author: Jiwei Guo <te...@apache.org>
AuthorDate: Thu May 5 19:26:31 2022 +0800

    [Broker] Make PersistentTopicsBase#internalGetPartitionedMetadata async (#14153)
---
 .../apache/pulsar/broker/admin/AdminResource.java  | 50 +++-------------
 .../broker/admin/impl/PersistentTopicsBase.java    | 68 ++++++++++++----------
 .../broker/admin/v1/NonPersistentTopics.java       | 21 ++++---
 .../pulsar/broker/admin/v1/PersistentTopics.java   | 15 ++++-
 .../broker/admin/v2/NonPersistentTopics.java       |  7 ++-
 .../pulsar/broker/admin/v2/PersistentTopics.java   | 13 ++++-
 .../pulsar/broker/admin/PersistentTopicsTest.java  | 56 +++++++++++-------
 .../org/apache/pulsar/broker/admin/TopicsTest.java |  2 +-
 .../pulsar/broker/admin/v1/V1_AdminApi2Test.java   |  7 ++-
 9 files changed, 127 insertions(+), 112 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
index ef167829f8d..ec502e134f2 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
@@ -427,21 +427,20 @@ public abstract class AdminResource extends PulsarWebResource {
         this.servletContext = servletContext;
     }
 
+    protected PartitionedTopicMetadata getPartitionedTopicMetadata(TopicName topicName,
+                                                                   boolean authoritative,
+                                                                   boolean checkAllowAutoCreation) {
+        return sync(() -> getPartitionedTopicMetadataAsync(topicName, authoritative, checkAllowAutoCreation));
+    }
+
     protected CompletableFuture<PartitionedTopicMetadata> getPartitionedTopicMetadataAsync(
             TopicName topicName, boolean authoritative, boolean checkAllowAutoCreation) {
-        try {
-            validateClusterOwnership(topicName.getCluster());
-        } catch (Exception e) {
-            return FutureUtil.failedFuture(e);
-        }
-
         // validates global-namespace contains local/peer cluster: if peer/local cluster present then lookup can
         // serve/redirect request else fail partitioned-metadata-request so, client fails while creating
         // producer/consumer
-        return validateGlobalNamespaceOwnershipAsync(topicName.getNamespaceObject())
-                .thenRun(() -> {
-                    validateTopicOperation(topicName, TopicOperation.LOOKUP);
-                })
+        return validateClusterOwnershipAsync(topicName.getCluster())
+                .thenCompose(__ -> validateGlobalNamespaceOwnershipAsync(topicName.getNamespaceObject()))
+                .thenCompose(__ -> validateTopicOperationAsync(topicName, TopicOperation.LOOKUP))
                 .thenCompose(__ -> {
                     if (checkAllowAutoCreation) {
                         return pulsar().getBrokerService()
@@ -452,37 +451,6 @@ public abstract class AdminResource extends PulsarWebResource {
                 });
     }
 
-    protected PartitionedTopicMetadata getPartitionedTopicMetadata(TopicName topicName,
-            boolean authoritative, boolean checkAllowAutoCreation) {
-        validateClusterOwnership(topicName.getCluster());
-        // validates global-namespace contains local/peer cluster: if peer/local cluster present then lookup can
-        // serve/redirect request else fail partitioned-metadata-request so, client fails while creating
-        // producer/consumer
-        validateGlobalNamespaceOwnership(topicName.getNamespaceObject());
-
-        try {
-            validateTopicOperation(topicName, TopicOperation.LOOKUP);
-        } catch (Exception e) {
-            // unknown error marked as internal server error
-            log.warn("Unexpected error while authorizing lookup. topic={}, role={}. Error: {}", topicName,
-                    clientAppId(), e.getMessage(), e);
-            throw new RestException(e);
-        }
-
-        PartitionedTopicMetadata partitionMetadata;
-        if (checkAllowAutoCreation) {
-            partitionMetadata = fetchPartitionedTopicMetadataCheckAllowAutoCreation(pulsar(), topicName);
-        } else {
-            partitionMetadata = fetchPartitionedTopicMetadata(pulsar(), topicName);
-        }
-
-        if (log.isDebugEnabled()) {
-            log.debug("[{}] Total number of partitions for topic {} is {}", clientAppId(), topicName,
-                    partitionMetadata.partitions);
-        }
-        return partitionMetadata;
-    }
-
     protected static PartitionedTopicMetadata fetchPartitionedTopicMetadata(PulsarService pulsar, TopicName topicName) {
         try {
             return pulsar.getBrokerService().fetchPartitionedTopicMetadataAsync(topicName).get();
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index cf1b3d71528..aa3fa6011dc 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -546,27 +546,37 @@ public class PersistentTopicsBase extends AdminResource {
 
     protected PartitionedTopicMetadata internalGetPartitionedMetadata(boolean authoritative,
                                                                       boolean checkAllowAutoCreation) {
-        PartitionedTopicMetadata metadata = getPartitionedTopicMetadata(topicName,
-                authoritative, checkAllowAutoCreation);
-        if (metadata.partitions == 0 && !checkAllowAutoCreation) {
-            // The topic may be a non-partitioned topic, so check if it exists here.
-            // However, when checkAllowAutoCreation is true, the client will create the topic if it doesn't exist.
-            // In this case, `partitions == 0` means the automatically created topic is a non-partitioned topic so we
-            // shouldn't check if the topic exists.
-            try {
-                if (!pulsar().getNamespaceService().checkTopicExists(topicName).get()) {
-                    throw new RestException(Status.NOT_FOUND,
-                            new PulsarClientException.NotFoundException("Topic not exist"));
-                }
-            } catch (InterruptedException | ExecutionException e) {
-                log.error("Failed to check if topic '{}' exists", topicName, e);
-                throw new RestException(Status.INTERNAL_SERVER_ERROR, "Failed to get topic metadata");
-            }
-        }
-        if (metadata.partitions > 1) {
-            validateClientVersion();
-        }
-        return metadata;
+        return sync(() -> internalGetPartitionedMetadataAsync(authoritative, checkAllowAutoCreation));
+    }
+
+    protected CompletableFuture<PartitionedTopicMetadata> internalGetPartitionedMetadataAsync(
+                                                                          boolean authoritative,
+                                                                          boolean checkAllowAutoCreation) {
+        return getPartitionedTopicMetadataAsync(topicName, authoritative, checkAllowAutoCreation)
+                .thenCompose(metadata -> {
+                    CompletableFuture<Void> ret;
+                    if (metadata.partitions == 0 && !checkAllowAutoCreation) {
+                        // The topic may be a non-partitioned topic, so check if it exists here.
+                        // However, when checkAllowAutoCreation is true, the client will create the topic if
+                        // it doesn't exist. In this case, `partitions == 0` means the automatically created topic
+                        // is a non-partitioned topic so we shouldn't check if the topic exists.
+                        ret = internalCheckTopicExists(topicName);
+                    } else if (metadata.partitions > 1) {
+                        ret = internalValidateClientVersionAsync();
+                    } else {
+                        ret = CompletableFuture.completedFuture(null);
+                    }
+                    return ret.thenApply(__ -> metadata);
+                });
+    }
+
+    protected CompletableFuture<Void> internalCheckTopicExists(TopicName topicName) {
+        return pulsar().getNamespaceService().checkTopicExists(topicName)
+                .thenAccept(exist -> {
+                    if (!exist) {
+                        throw new RestException(Status.NOT_FOUND, "Topic not exist");
+                    }
+                });
     }
 
     protected void internalDeletePartitionedTopic(AsyncResponse asyncResponse,
@@ -4135,15 +4145,15 @@ public class PersistentTopicsBase extends AdminResource {
     // Pulsar client-java lib always passes user-agent as X-Java-$version.
     // However, cpp-client older than v1.20 (PR #765) never used to pass it.
     // So, request without user-agent and Pulsar-CPP-vX (X < 1.21) must be rejected
-    private void validateClientVersion() {
+    protected CompletableFuture<Void> internalValidateClientVersionAsync() {
         if (!pulsar().getConfiguration().isClientLibraryVersionCheckEnabled()) {
-            return;
+            return CompletableFuture.completedFuture(null);
         }
         final String userAgent = httpRequest.getHeader("User-Agent");
         if (StringUtils.isBlank(userAgent)) {
-            throw new RestException(Status.METHOD_NOT_ALLOWED,
+            return FutureUtil.failedFuture(new RestException(Status.METHOD_NOT_ALLOWED,
                     "Client lib is not compatible to"
-                            + " access partitioned metadata: version in user-agent is not present");
+                            + " access partitioned metadata: version in user-agent is not present"));
         }
         // Version < 1.20 for cpp-client is not allowed
         if (userAgent.contains(DEPRECATED_CLIENT_VERSION_PREFIX)) {
@@ -4154,18 +4164,16 @@ public class PersistentTopicsBase extends AdminResource {
                 if (splits != null && splits.length > 1) {
                     if (LEAST_SUPPORTED_CLIENT_VERSION_PREFIX.getMajorVersion() > Integer.parseInt(splits[0])
                             || LEAST_SUPPORTED_CLIENT_VERSION_PREFIX.getMinorVersion() > Integer.parseInt(splits[1])) {
-                        throw new RestException(Status.METHOD_NOT_ALLOWED,
+                        return FutureUtil.failedFuture(new RestException(Status.METHOD_NOT_ALLOWED,
                                 "Client lib is not compatible to access partitioned metadata: version " + userAgent
-                                        + " is not supported");
+                                        + " is not supported"));
                     }
                 }
-            } catch (RestException re) {
-                throw re;
             } catch (Exception e) {
                 log.warn("[{}] Failed to parse version {} ", clientAppId(), userAgent);
             }
         }
-        return;
+        return CompletableFuture.completedFuture(null);
     }
 
     /**
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/NonPersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/NonPersistentTopics.java
index dc466be86d6..93e49dc70b1 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/NonPersistentTopics.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/NonPersistentTopics.java
@@ -51,7 +51,6 @@ import org.apache.pulsar.common.naming.Constants;
 import org.apache.pulsar.common.naming.NamespaceBundle;
 import org.apache.pulsar.common.naming.NamespaceName;
 import org.apache.pulsar.common.naming.TopicName;
-import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
 import org.apache.pulsar.common.policies.data.NamespaceOperation;
 import org.apache.pulsar.common.policies.data.NonPersistentTopicStats;
 import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats;
@@ -77,16 +76,16 @@ public class NonPersistentTopics extends PersistentTopics {
     @ApiResponses(value = {
             @ApiResponse(code = 307, message = "Current broker doesn't serve the namespace of this topic"),
             @ApiResponse(code = 403, message = "Don't have admin permission")})
-    public PartitionedTopicMetadata getPartitionedMetadata(@PathParam("property") String property,
-                                                           @PathParam("cluster") String cluster,
-                                                           @PathParam("namespace") String namespace,
-                                                           @PathParam("topic") @Encoded String encodedTopic,
-                                                           @QueryParam("authoritative") @DefaultValue("false")
-                                                                       boolean authoritative,
-                                                           @QueryParam("checkAllowAutoCreation") @DefaultValue("false")
-                                                                       boolean checkAllowAutoCreation) {
-        validateTopicName(property, cluster, namespace, encodedTopic);
-        return getPartitionedTopicMetadata(topicName, authoritative, checkAllowAutoCreation);
+    public void getPartitionedMetadata(
+            @Suspended final AsyncResponse asyncResponse,
+            @PathParam("property") String property,
+            @PathParam("cluster") String cluster,
+            @PathParam("namespace") String namespace,
+            @PathParam("topic") @Encoded String encodedTopic,
+            @QueryParam("authoritative") @DefaultValue("false") boolean authoritative,
+            @QueryParam("checkAllowAutoCreation") @DefaultValue("false") boolean checkAllowAutoCreation) {
+        super.getPartitionedMetadata(asyncResponse, property, cluster, namespace, encodedTopic, authoritative,
+                checkAllowAutoCreation);
     }
 
     @GET
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java
index 8e5107a6bd8..bb92498b2e7 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java
@@ -48,7 +48,6 @@ import org.apache.pulsar.client.admin.LongRunningProcessStatus;
 import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.impl.MessageIdImpl;
 import org.apache.pulsar.client.impl.ResetCursorData;
-import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
 import org.apache.pulsar.common.policies.data.AuthAction;
 import org.apache.pulsar.common.policies.data.PersistentOfflineTopicStats;
 import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats;
@@ -283,13 +282,23 @@ public class PersistentTopics extends PersistentTopicsBase {
     @ApiResponses(value = {
             @ApiResponse(code = 307, message = "Current broker doesn't serve the namespace of this topic"),
             @ApiResponse(code = 403, message = "Don't have admin permission") })
-    public PartitionedTopicMetadata getPartitionedMetadata(@PathParam("property") String property,
+    public void getPartitionedMetadata(
+            @Suspended final AsyncResponse asyncResponse,
+            @PathParam("property") String property,
             @PathParam("cluster") String cluster, @PathParam("namespace") String namespace,
             @PathParam("topic") @Encoded String encodedTopic,
             @QueryParam("authoritative") @DefaultValue("false") boolean authoritative,
             @QueryParam("checkAllowAutoCreation") @DefaultValue("false") boolean checkAllowAutoCreation) {
         validateTopicName(property, cluster, namespace, encodedTopic);
-        return internalGetPartitionedMetadata(authoritative, checkAllowAutoCreation);
+        internalGetPartitionedMetadataAsync(authoritative, checkAllowAutoCreation)
+                .thenAccept(asyncResponse::resume)
+                .exceptionally(ex -> {
+                    if (!isRedirectException(ex)) {
+                        log.error("[{}] Failed to get partitioned metadata topic {}", clientAppId(), topicName, ex);
+                    }
+                    resumeAsyncResponseExceptionally(asyncResponse, ex);
+                    return null;
+                });
     }
 
     @DELETE
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/NonPersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/NonPersistentTopics.java
index 66867b68426..5cf78278350 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/NonPersistentTopics.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/NonPersistentTopics.java
@@ -54,7 +54,6 @@ import org.apache.pulsar.broker.service.nonpersistent.NonPersistentTopic;
 import org.apache.pulsar.broker.web.RestException;
 import org.apache.pulsar.common.naming.NamespaceBundle;
 import org.apache.pulsar.common.naming.TopicName;
-import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
 import org.apache.pulsar.common.policies.data.NamespaceOperation;
 import org.apache.pulsar.common.policies.data.NonPersistentTopicStats;
 import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats;
@@ -88,7 +87,8 @@ public class NonPersistentTopics extends PersistentTopics {
             @ApiResponse(code = 500, message = "Internal server error"),
             @ApiResponse(code = 503, message = "Failed to validate cluster configuration")
     })
-    public PartitionedTopicMetadata getPartitionedMetadata(
+    public void getPartitionedMetadata(
+            @Suspended final AsyncResponse asyncResponse,
             @ApiParam(value = "Specify the tenant", required = true)
             @PathParam("tenant") String tenant,
             @ApiParam(value = "Specify the namespace", required = true)
@@ -99,7 +99,8 @@ public class NonPersistentTopics extends PersistentTopics {
             @QueryParam("authoritative") @DefaultValue("false") boolean authoritative,
             @ApiParam(value = "Is check configuration required to automatically create topic")
             @QueryParam("checkAllowAutoCreation") @DefaultValue("false") boolean checkAllowAutoCreation) {
-        return super.getPartitionedMetadata(tenant, namespace, encodedTopic, authoritative, checkAllowAutoCreation);
+        super.getPartitionedMetadata(asyncResponse, tenant, namespace, encodedTopic, authoritative,
+                checkAllowAutoCreation);
     }
 
     @GET
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
index a70bf7f58ea..4095530340c 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
@@ -881,7 +881,8 @@ public class PersistentTopics extends PersistentTopicsBase {
             @ApiResponse(code = 412, message = "Partitioned topic name is invalid"),
             @ApiResponse(code = 500, message = "Internal server error")
     })
-    public PartitionedTopicMetadata getPartitionedMetadata(
+    public void getPartitionedMetadata(
+            @Suspended final AsyncResponse asyncResponse,
             @ApiParam(value = "Specify the tenant", required = true)
             @PathParam("tenant") String tenant,
             @ApiParam(value = "Specify the namespace", required = true)
@@ -893,7 +894,15 @@ public class PersistentTopics extends PersistentTopicsBase {
             @ApiParam(value = "Is check configuration required to automatically create topic")
             @QueryParam("checkAllowAutoCreation") @DefaultValue("false") boolean checkAllowAutoCreation) {
         validateTopicName(tenant, namespace, encodedTopic);
-        return internalGetPartitionedMetadata(authoritative, checkAllowAutoCreation);
+        internalGetPartitionedMetadataAsync(authoritative, checkAllowAutoCreation)
+                .thenAccept(asyncResponse::resume)
+                .exceptionally(ex -> {
+                    if (!isRedirectException(ex)) {
+                        log.error("[{}] Failed to get partitioned metadata topic {}", clientAppId(), topicName, ex);
+                    }
+                    resumeAsyncResponseExceptionally(asyncResponse, ex);
+                    return null;
+                });
     }
 
     @DELETE
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
index 382e90e9321..b1bc4ab5486 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
@@ -405,58 +405,74 @@ public class PersistentTopicsTest extends MockedPulsarServiceBaseTest {
         ArgumentCaptor<RestException> errorCaptor = ArgumentCaptor.forClass(RestException.class);
         verify(response, timeout(5000).times(1)).resume(errorCaptor.capture());
         Assert.assertTrue(errorCaptor.getValue().getMessage().contains("zero partitions"));
-
+        response = mock(AsyncResponse.class);
         final String nonPartitionTopic2 = "secondary-non-partitioned-topic";
         persistentTopics.createNonPartitionedTopic(testTenant, testNamespace, nonPartitionTopic2, true, null);
-        Assert.assertEquals(persistentTopics
-                        .getPartitionedMetadata(testTenant, testNamespace, nonPartitionTopic, true, false).partitions,
-                0);
-
-        Assert.assertEquals(persistentTopics
-                        .getPartitionedMetadata(testTenant, testNamespace, nonPartitionTopic, true, true).partitions,
-                0);
+        persistentTopics.getPartitionedMetadata(response, testTenant, testNamespace, nonPartitionTopic, true, false);
+        ArgumentCaptor<PartitionedTopicMetadata> responseCaptor2 = ArgumentCaptor.forClass(PartitionedTopicMetadata.class);
+        verify(response, timeout(5000).times(1)).resume(responseCaptor2.capture());
+        Assert.assertEquals(responseCaptor2.getValue().partitions, 0);
+        response = mock(AsyncResponse.class);
+        responseCaptor2 = ArgumentCaptor.forClass(PartitionedTopicMetadata.class);
+        persistentTopics.getPartitionedMetadata(response, testTenant, testNamespace, nonPartitionTopic, true, true);
+        verify(response, timeout(5000).times(1)).resume(responseCaptor2.capture());
+        Assert.assertEquals(responseCaptor2.getValue().partitions, 0);
     }
 
     @Test
     public void testCreateNonPartitionedTopic() {
         final String topicName = "standard-topic-partition-a";
+        AsyncResponse response = mock(AsyncResponse.class);
         persistentTopics.createNonPartitionedTopic(testTenant, testNamespace, topicName, true, null);
-        PartitionedTopicMetadata pMetadata = persistentTopics.getPartitionedMetadata(
+        persistentTopics.getPartitionedMetadata(response,
                 testTenant, testNamespace, topicName, true, false);
-        Assert.assertEquals(pMetadata.partitions, 0);
+        ArgumentCaptor<PartitionedTopicMetadata> responseCaptor = ArgumentCaptor.forClass(PartitionedTopicMetadata.class);
+        verify(response, timeout(5000).times(1)).resume(responseCaptor.capture());
+        Assert.assertEquals(responseCaptor.getValue().partitions, 0);
 
-        PartitionedTopicMetadata metadata = persistentTopics.getPartitionedMetadata(
+        response = mock(AsyncResponse.class);
+        responseCaptor = ArgumentCaptor.forClass(PartitionedTopicMetadata.class);
+        persistentTopics.getPartitionedMetadata(response,
                 testTenant, testNamespace, topicName, true, true);
-        Assert.assertEquals(metadata.partitions, 0);
+        verify(response, timeout(5000).times(1)).resume(responseCaptor.capture());
+        Assert.assertEquals(responseCaptor.getValue().partitions, 0);
         final String topicName2 = "standard-topic-partition-b";
         Map<String, String> topicMetadata = Maps.newHashMap();
         topicMetadata.put("key1", "value1");
         persistentTopics.createNonPartitionedTopic(testTenant, testNamespace, topicName2, true, topicMetadata);
-        PartitionedTopicMetadata pMetadata2 = persistentTopics.getPartitionedMetadata(
+        response = mock(AsyncResponse.class);
+        responseCaptor = ArgumentCaptor.forClass(PartitionedTopicMetadata.class);
+        persistentTopics.getPartitionedMetadata(response,
                 testTenant, testNamespace, topicName2, true, false);
-        Assert.assertNull(pMetadata2.properties);
+        verify(response, timeout(5000).times(1)).resume(responseCaptor.capture());
+        Assert.assertNull(responseCaptor.getValue().properties);
     }
 
     @Test
     public void testCreatePartitionedTopic() {
         AsyncResponse response = mock(AsyncResponse.class);
+        ArgumentCaptor<PartitionedTopicMetadata> responseCaptor = ArgumentCaptor.forClass(PartitionedTopicMetadata.class);
         final String topicName = "standard-partitioned-topic-a";
         persistentTopics.createPartitionedTopic(response, testTenant, testNamespace, topicName, 2, true);
         Awaitility.await().untilAsserted(() -> {
-            PartitionedTopicMetadata pMetadata = persistentTopics.getPartitionedMetadata(
+            persistentTopics.getPartitionedMetadata(response,
                     testTenant, testNamespace, topicName, true, false);
-            Assert.assertNull(pMetadata.properties);
+            verify(response, timeout(5000).atLeast(1)).resume(responseCaptor.capture());
+            Assert.assertNull(responseCaptor.getValue().properties);
         });
+        AsyncResponse response2 = mock(AsyncResponse.class);
+        ArgumentCaptor<PartitionedTopicMetadata> responseCaptor2 = ArgumentCaptor.forClass(PartitionedTopicMetadata.class);
         final String topicName2 = "standard-partitioned-topic-b";
         Map<String, String> topicMetadata = Maps.newHashMap();
         topicMetadata.put("key1", "value1");
         PartitionedTopicMetadata metadata = new PartitionedTopicMetadata(2, topicMetadata);
-        persistentTopics.createPartitionedTopic(response, testTenant, testNamespace, topicName2, metadata, true);
+        persistentTopics.createPartitionedTopic(response2, testTenant, testNamespace, topicName2, metadata, true);
         Awaitility.await().untilAsserted(() -> {
-            PartitionedTopicMetadata pMetadata2 = persistentTopics.getPartitionedMetadata(
+            persistentTopics.getPartitionedMetadata(response2,
                     testTenant, testNamespace, topicName2, true, false);
-            Assert.assertEquals(pMetadata2.properties.size(), 1);
-            Assert.assertEquals(pMetadata2.properties, topicMetadata);
+            verify(response2, timeout(5000).atLeast(1)).resume(responseCaptor2.capture());
+            Assert.assertEquals(responseCaptor2.getValue().properties.size(), 1);
+            Assert.assertEquals(responseCaptor2.getValue().properties, topicMetadata);
         });
     }
 
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicsTest.java
index 4edfbfd17db..82d7571912b 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicsTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicsTest.java
@@ -383,7 +383,7 @@ public class TopicsTest extends MockedPulsarServiceBaseTest {
         topics.produceOnPersistentTopic(asyncResponse, testTenant, testNamespace, testTopicName, false, producerMessages);
         ArgumentCaptor<RestException> responseCaptor = ArgumentCaptor.forClass(RestException.class);
         verify(asyncResponse, timeout(5000).times(1)).resume(responseCaptor.capture());
-        Assert.assertEquals(responseCaptor.getValue().getMessage(), "Fail to publish message: Topic not exist");
+        Assert.assertTrue(responseCaptor.getValue().getMessage().contains("Topic not exist"));
     }
 
     @Test
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v1/V1_AdminApi2Test.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v1/V1_AdminApi2Test.java
index c74d5ee7733..f62130ec381 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v1/V1_AdminApi2Test.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v1/V1_AdminApi2Test.java
@@ -256,7 +256,12 @@ public class V1_AdminApi2Test extends MockedPulsarServiceBaseTest {
 
         // test partitioned-topic
         final String partitionedTopicName = "non-persistent://prop-xyz/use/ns1/paritioned";
-        assertEquals(admin.nonPersistentTopics().getPartitionedTopicMetadata(partitionedTopicName).partitions, 0);
+        try {
+            admin.nonPersistentTopics().getPartitionedTopicMetadata(partitionedTopicName);
+            fail("Should have failed");
+        } catch (Exception ex) {
+            assertTrue(ex instanceof PulsarAdminException.NotFoundException);
+        }
         admin.nonPersistentTopics().createPartitionedTopic(partitionedTopicName, 5);
         assertEquals(admin.nonPersistentTopics().getPartitionedTopicMetadata(partitionedTopicName).partitions, 5);
     }