You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2022/05/09 01:52:01 UTC

[pulsar] branch master updated: [Broker] Make PersistentTopicsBase#internalCreateNonPartitionedTopic async. (#14030)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 4f53e8eedc8 [Broker] Make PersistentTopicsBase#internalCreateNonPartitionedTopic async. (#14030)
4f53e8eedc8 is described below

commit 4f53e8eedc8aca406f60ab4c1f7bc11577a6bf20
Author: Jiwei Guo <te...@apache.org>
AuthorDate: Mon May 9 09:51:56 2022 +0800

    [Broker] Make PersistentTopicsBase#internalCreateNonPartitionedTopic async. (#14030)
---
 .../broker/admin/impl/PersistentTopicsBase.java    | 108 +++++++--------
 .../pulsar/broker/admin/v1/PersistentTopics.java   |  12 +-
 .../pulsar/broker/admin/v2/PersistentTopics.java   |  11 +-
 .../apache/pulsar/broker/admin/AdminApiTest.java   |   6 +-
 .../apache/pulsar/broker/admin/NamespacesTest.java |   1 -
 .../pulsar/broker/admin/PersistentTopicsTest.java  | 145 ++++++++++++---------
 .../pulsar/broker/admin/TopicPoliciesTest.java     |   1 -
 .../BrokerServiceAutoTopicCreationTest.java        |   1 +
 8 files changed, 162 insertions(+), 123 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index 033aa41cb52..c2a89721412 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -388,37 +388,31 @@ public class PersistentTopicsBase extends AdminResource {
                 });
     }
 
-    protected void internalCreateNonPartitionedTopic(boolean authoritative, Map<String, String> properties) {
-        validateNonPartitionTopicName(topicName.getLocalName());
+    protected CompletableFuture<Void> internalCreateNonPartitionedTopicAsync(boolean authoritative,
+                                                     Map<String, String> properties) {
+        CompletableFuture<Void> ret = validateNonPartitionTopicNameAsync(topicName.getLocalName());
         if (topicName.isGlobal()) {
-            validateGlobalNamespaceOwnership(namespaceName);
-        }
-        validateTopicOwnership(topicName, authoritative);
-        validateNamespaceOperation(topicName.getNamespaceObject(), NamespaceOperation.CREATE_TOPIC);
-
-        PartitionedTopicMetadata partitionMetadata = getPartitionedTopicMetadata(topicName, authoritative, false);
-        if (partitionMetadata.partitions > 0) {
-            log.warn("[{}] Partitioned topic with the same name already exists {}", clientAppId(), topicName);
-            throw new RestException(Status.CONFLICT, "This topic already exists");
-        }
-
-        try {
-            Optional<Topic> existedTopic = pulsar().getBrokerService().getTopicIfExists(topicName.toString()).get();
-            if (existedTopic.isPresent()) {
-                log.error("[{}] Topic {} already exists", clientAppId(), topicName);
-                throw new RestException(Status.CONFLICT, "This topic already exists");
-            }
-
-            Topic createdTopic = getOrCreateTopic(topicName, properties);
-            log.info("[{}] Successfully created non-partitioned topic {}", clientAppId(), createdTopic);
-        } catch (Exception e) {
-            if (e instanceof RestException) {
-                throw (RestException) e;
-            } else {
-                log.error("[{}] Failed to create non-partitioned topic {}", clientAppId(), topicName, e);
-                throw new RestException(e);
-            }
-        }
+            ret = ret.thenCompose(__ -> validateGlobalNamespaceOwnershipAsync(namespaceName));
+        }
+        return ret.thenCompose(__ -> validateTopicOwnershipAsync(topicName, authoritative))
+           .thenCompose(__ -> validateNamespaceOperationAsync(topicName.getNamespaceObject(),
+                   NamespaceOperation.CREATE_TOPIC))
+           .thenCompose(__ -> getPartitionedTopicMetadataAsync(topicName, false, false))
+           .thenAccept(partitionMetadata -> {
+               if (partitionMetadata.partitions > 0) {
+                   log.warn("[{}] Partitioned topic with the same name already exists {}", clientAppId(), topicName);
+                   throw new RestException(Status.CONFLICT, "This topic already exists");
+               }
+           })
+           .thenCompose(__ -> pulsar().getBrokerService().getTopicIfExists(topicName.toString()))
+           .thenCompose(existedTopic -> {
+               if (existedTopic.isPresent()) {
+                   log.error("[{}] Topic {} already exists", clientAppId(), topicName);
+                   throw new RestException(Status.CONFLICT, "This topic already exists");
+               }
+               return pulsar().getBrokerService().getTopic(topicName.toString(), true, properties);
+           })
+           .thenAccept(__ -> log.info("[{}] Successfully created non-partitioned topic {}", clientAppId(), topicName));
     }
 
     /**
@@ -3995,11 +3989,6 @@ public class PersistentTopicsBase extends AdminResource {
                 });
     }
 
-    private Topic getOrCreateTopic(TopicName topicName, Map<String, String> properties) {
-        return pulsar().getBrokerService().getTopic(topicName.toString(), true, properties)
-                .thenApply(Optional::get).join();
-    }
-
     /**
      * Get the Subscription object reference from the Topic reference.
      */
@@ -4236,7 +4225,8 @@ public class PersistentTopicsBase extends AdminResource {
      *
      * @param topicName
      */
-    private void validateNonPartitionTopicName(String topicName) {
+    private CompletableFuture<Void> validateNonPartitionTopicNameAsync(String topicName) {
+        CompletableFuture<Void> ret = CompletableFuture.completedFuture(null);
         if (topicName.contains(TopicName.PARTITIONED_TOPIC_SUFFIX)) {
             try {
                 // First check if what's after suffix "-partition-" is number or not, if not number then can create.
@@ -4245,34 +4235,36 @@ public class PersistentTopicsBase extends AdminResource {
                         + TopicName.PARTITIONED_TOPIC_SUFFIX.length()));
                 TopicName partitionTopicName = TopicName.get(domain(),
                         namespaceName, topicName.substring(0, partitionIndex));
-                PartitionedTopicMetadata metadata = getPartitionedTopicMetadata(partitionTopicName, false, false);
-
-                // Partition topic index is 0 to (number of partition - 1)
-                if (metadata.partitions > 0 && suffix >= (long) metadata.partitions) {
-                    log.warn("[{}] Can't create topic {} with \"-partition-\" followed by"
-                                    + " a number smaller then number of partition of partitioned topic {}.",
-                            clientAppId(), topicName, partitionTopicName.getLocalName());
-                    throw new RestException(Status.PRECONDITION_FAILED,
-                            "Can't create topic " + topicName + " with \"-partition-\" followed by"
-                                    + " a number smaller then number of partition of partitioned topic "
-                                    + partitionTopicName.getLocalName());
-                } else if (metadata.partitions == 0) {
-                    log.warn("[{}] Can't create topic {} with \"-partition-\" followed by"
-                                    + " numeric value if there isn't a partitioned topic {} created.",
-                            clientAppId(), topicName, partitionTopicName.getLocalName());
-                    throw new RestException(Status.PRECONDITION_FAILED,
-                            "Can't create topic " + topicName + " with \"-partition-\" followed by"
-                                    + " numeric value if there isn't a partitioned topic "
-                                    + partitionTopicName.getLocalName() + " created.");
-                }
-                // If there is a  partitioned topic with the same name and numeric suffix is smaller than the
-                // number of partition for that partitioned topic, validation will pass.
+                ret = getPartitionedTopicMetadataAsync(partitionTopicName, false, false)
+                        .thenAccept(metadata -> {
+                            // Partition topic index is 0 to (number of partition - 1)
+                            if (metadata.partitions > 0 && suffix >= (long) metadata.partitions) {
+                                log.warn("[{}] Can't create topic {} with \"-partition-\" followed by"
+                                                + " a number smaller then number of partition of partitioned topic {}.",
+                                        clientAppId(), topicName, partitionTopicName.getLocalName());
+                                throw new RestException(Status.PRECONDITION_FAILED,
+                                        "Can't create topic " + topicName + " with \"-partition-\" followed by"
+                                                + " a number smaller then number of partition of partitioned topic "
+                                                + partitionTopicName.getLocalName());
+                            } else if (metadata.partitions == 0) {
+                                log.warn("[{}] Can't create topic {} with \"-partition-\" followed by"
+                                                + " numeric value if there isn't a partitioned topic {} created.",
+                                        clientAppId(), topicName, partitionTopicName.getLocalName());
+                                throw new RestException(Status.PRECONDITION_FAILED,
+                                        "Can't create topic " + topicName + " with \"-partition-\" followed by"
+                                                + " numeric value if there isn't a partitioned topic "
+                                                + partitionTopicName.getLocalName() + " created.");
+                            }
+                            // If there is a  partitioned topic with the same name and numeric suffix is smaller
+                            // than the number of partition for that partitioned topic, validation will pass.
+                        });
             } catch (NumberFormatException e) {
                 // Do nothing, if value after partition suffix is not pure numeric value,
                 // as it can't conflict if user want to create partitioned topic with same
                 // topic name prefix in the future.
             }
         }
+        return ret;
     }
 
     protected void internalGetLastMessageId(AsyncResponse asyncResponse, boolean authoritative) {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java
index bb92498b2e7..a7ae878013b 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java
@@ -215,6 +215,7 @@ public class PersistentTopics extends PersistentTopicsBase {
             @ApiResponse(code = 503, message = "Failed to validate global cluster configuration")
     })
     public void createNonPartitionedTopic(
+            @Suspended final AsyncResponse asyncResponse,
             @ApiParam(value = "Specify the tenant", required = true)
             @PathParam("tenant") String tenant,
             @ApiParam(value = "Specify the cluster", required = true)
@@ -228,7 +229,16 @@ public class PersistentTopics extends PersistentTopicsBase {
         validateNamespaceName(tenant, cluster, namespace);
         validateTopicName(tenant, cluster, namespace, encodedTopic);
         validateGlobalNamespaceOwnership();
-        internalCreateNonPartitionedTopic(authoritative, null);
+        validateCreateTopic(topicName);
+        internalCreateNonPartitionedTopicAsync(authoritative, null)
+                .thenAccept(__ -> asyncResponse.resume(Response.noContent().build()))
+                .exceptionally(ex -> {
+                    if (!isRedirectException(ex)) {
+                        log.error("[{}] Failed to create non-partitioned topic {}", clientAppId(), topicName, ex);
+                    }
+                    resumeAsyncResponseExceptionally(asyncResponse, ex);
+                    return null;
+                });
     }
 
     /**
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
index 953b0f5f821..3f92eadacdf 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
@@ -334,6 +334,7 @@ public class PersistentTopics extends PersistentTopicsBase {
             @ApiResponse(code = 503, message = "Failed to validate global cluster configuration")
     })
     public void createNonPartitionedTopic(
+            @Suspended final AsyncResponse asyncResponse,
             @ApiParam(value = "Specify the tenant", required = true)
             @PathParam("tenant") String tenant,
             @ApiParam(value = "Specify the namespace", required = true)
@@ -348,7 +349,15 @@ public class PersistentTopics extends PersistentTopicsBase {
         validateGlobalNamespaceOwnership();
         validateTopicName(tenant, namespace, encodedTopic);
         validateCreateTopic(topicName);
-        internalCreateNonPartitionedTopic(authoritative, properties);
+        internalCreateNonPartitionedTopicAsync(authoritative, properties)
+                .thenAccept(__ -> asyncResponse.resume(Response.noContent().build()))
+                .exceptionally(ex -> {
+                    if (!isRedirectException(ex)) {
+                        log.error("[{}] Failed to create non-partitioned topic {}", clientAppId(), topicName, ex);
+                    }
+                    resumeAsyncResponseExceptionally(asyncResponse, ex);
+                    return null;
+                });
     }
 
     @GET
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java
index 64de71ac488..0579eb20c48 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java
@@ -2709,10 +2709,10 @@ public class AdminApiTest extends MockedPulsarServiceBaseTest {
 
     @Test
     public void testNamespaceNotExist() {
-        final String nonPartitionedtopic = "persistent://prop-xyz/no-exist/non-partitioned-topic";
+        final String nonPartitionedTopic = "persistent://prop-xyz/no-exist/non-partitioned-topic";
         try {
-            admin.topics().createNonPartitionedTopic(nonPartitionedtopic);
-            fail("should falied for namespaces not exist");
+            admin.topics().createNonPartitionedTopic(nonPartitionedTopic);
+            fail("should failed for namespaces not exist");
         } catch (Exception e) {
             assertTrue(e instanceof NotFoundException);
             assertTrue(e.getMessage().equals("Namespace not found"));
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java
index a334fceed76..0eb87491a52 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java
@@ -1416,7 +1416,6 @@ public class NamespacesTest extends MockedPulsarServiceBaseTest {
         admin.namespaces().createNamespace(namespace, Sets.newHashSet(testLocalCluster));
 
         admin.topics().createNonPartitionedTopic(topic);
-
         admin.topics().delete(topic);
 
         try {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
index 330d945f651..5a9f4642791 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
@@ -51,6 +51,7 @@ import javax.ws.rs.core.Response;
 import javax.ws.rs.core.UriInfo;
 import lombok.Cleanup;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.broker.BrokerTestUtil;
 import org.apache.pulsar.broker.admin.v2.NonPersistentTopics;
 import org.apache.pulsar.broker.admin.v2.PersistentTopics;
 import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
@@ -368,10 +369,10 @@ public class PersistentTopicsTest extends MockedPulsarServiceBaseTest {
         String testLocalTopicName = "topic-not-found";
 
         // 1) Create the nonPartitionTopic topic
-        persistentTopics.createNonPartitionedTopic(testTenant, testNamespace, testLocalTopicName, true, null);
+        AsyncResponse response  = mock(AsyncResponse.class);
+        persistentTopics.createNonPartitionedTopic(response, testTenant, testNamespace, testLocalTopicName, true, null);
 
         // 2) Create a subscription
-        AsyncResponse response  = mock(AsyncResponse.class);
         persistentTopics.createSubscription(response, testTenant, testNamespace, testLocalTopicName, "test", true,
                 new ResetCursorData(MessageId.earliest), false);
         ArgumentCaptor<Response> responseCaptor = ArgumentCaptor.forClass(Response.class);
@@ -394,7 +395,7 @@ public class PersistentTopicsTest extends MockedPulsarServiceBaseTest {
 
     @Test
     public void testNonPartitionedTopics() {
-        final String nonPartitionTopic = "non-partitioned-topic";
+        final String nonPartitionTopic = BrokerTestUtil.newUniqueName("non-partitioned-topic");
         AsyncResponse response = mock(AsyncResponse.class);
         persistentTopics.createSubscription(response, testTenant, testNamespace, nonPartitionTopic, "test", true,
                 new ResetCursorData(MessageId.latest), false);
@@ -403,52 +404,69 @@ public class PersistentTopicsTest extends MockedPulsarServiceBaseTest {
         Assert.assertEquals(responseCaptor.getValue().getStatus(), Response.Status.NO_CONTENT.getStatusCode());
 
         response = mock(AsyncResponse.class);
+        ArgumentCaptor<RestException> errorCaptor = ArgumentCaptor.forClass(RestException.class);
         persistentTopics.getSubscriptions(response, testTenant, testNamespace, nonPartitionTopic + "-partition-0",
                 true);
-        ArgumentCaptor<RestException> errorCaptor = ArgumentCaptor.forClass(RestException.class);
         verify(response, timeout(5000).times(1)).resume(errorCaptor.capture());
         Assert.assertTrue(errorCaptor.getValue().getMessage().contains("zero partitions"));
         response = mock(AsyncResponse.class);
-        final String nonPartitionTopic2 = "secondary-non-partitioned-topic";
-        persistentTopics.createNonPartitionedTopic(testTenant, testNamespace, nonPartitionTopic2, true, null);
-        persistentTopics.getPartitionedMetadata(response, testTenant, testNamespace, nonPartitionTopic, true, false);
-        ArgumentCaptor<PartitionedTopicMetadata> responseCaptor2 = ArgumentCaptor.forClass(PartitionedTopicMetadata.class);
-        verify(response, timeout(5000).times(1)).resume(responseCaptor2.capture());
-        Assert.assertEquals(responseCaptor2.getValue().partitions, 0);
-        response = mock(AsyncResponse.class);
-        responseCaptor2 = ArgumentCaptor.forClass(PartitionedTopicMetadata.class);
-        persistentTopics.getPartitionedMetadata(response, testTenant, testNamespace, nonPartitionTopic, true, true);
-        verify(response, timeout(5000).times(1)).resume(responseCaptor2.capture());
-        Assert.assertEquals(responseCaptor2.getValue().partitions, 0);
+        final String nonPartitionTopic2 = BrokerTestUtil.newUniqueName("secondary-non-partitioned-topic");
+        persistentTopics.createNonPartitionedTopic(response, testTenant, testNamespace, nonPartitionTopic2, true, null);
+        Awaitility.await().untilAsserted(() -> {
+            Assert.assertTrue(admin.topics().getList(testTenant + "/" + testNamespace)
+                    .contains("persistent://" + testTenant + "/" + testNamespace + "/" + nonPartitionTopic2));
+        });
+
+        AsyncResponse metaResponse = mock(AsyncResponse.class);
+        ArgumentCaptor<PartitionedTopicMetadata> metaResponseCaptor = ArgumentCaptor.forClass(PartitionedTopicMetadata.class);
+        persistentTopics.getPartitionedMetadata(metaResponse, testTenant, testNamespace, nonPartitionTopic, true, false);
+        verify(metaResponse, timeout(5000).times(1)).resume(metaResponseCaptor.capture());
+        Assert.assertEquals(metaResponseCaptor.getValue().partitions, 0);
+
+        metaResponse = mock(AsyncResponse.class);
+        metaResponseCaptor = ArgumentCaptor.forClass(PartitionedTopicMetadata.class);
+        persistentTopics.getPartitionedMetadata(metaResponse, testTenant, testNamespace, nonPartitionTopic, true, true);
+        verify(metaResponse, timeout(5000).times(1)).resume(metaResponseCaptor.capture());
+        Assert.assertEquals(metaResponseCaptor.getValue().partitions, 0);
     }
 
     @Test
     public void testCreateNonPartitionedTopic() {
-        final String topicName = "standard-topic-partition-a";
+        final String topic = "standard-topic-partition-a";
+        TopicName topicName = TopicName.get(TopicDomain.persistent.value(), testTenant, testNamespace, topic);
         AsyncResponse response = mock(AsyncResponse.class);
-        persistentTopics.createNonPartitionedTopic(testTenant, testNamespace, topicName, true, null);
-        persistentTopics.getPartitionedMetadata(response,
-                testTenant, testNamespace, topicName, true, false);
-        ArgumentCaptor<PartitionedTopicMetadata> responseCaptor = ArgumentCaptor.forClass(PartitionedTopicMetadata.class);
-        verify(response, timeout(5000).times(1)).resume(responseCaptor.capture());
-        Assert.assertEquals(responseCaptor.getValue().partitions, 0);
+        persistentTopics.createNonPartitionedTopic(response, testTenant, testNamespace, topic, true, null);
+        Awaitility.await().untilAsserted(() -> {
+            Assert.assertTrue(admin.topics().getList(testTenant + "/" + testNamespace).contains(topicName.toString()));
+        });
+        AsyncResponse metaResponse = mock(AsyncResponse.class);
+        ArgumentCaptor<PartitionedTopicMetadata> metaResponseCaptor = ArgumentCaptor.forClass(PartitionedTopicMetadata.class);
+        persistentTopics.getPartitionedMetadata(metaResponse, testTenant, testNamespace, topic, true, false);
+        verify(metaResponse, timeout(5000).times(1)).resume(metaResponseCaptor.capture());
+        Assert.assertEquals(metaResponseCaptor.getValue().partitions, 0);
+
+        metaResponse = mock(AsyncResponse.class);
+        metaResponseCaptor = ArgumentCaptor.forClass(PartitionedTopicMetadata.class);
+        persistentTopics.getPartitionedMetadata(metaResponse,
+                testTenant, testNamespace, topic, true, true);
+        verify(metaResponse, timeout(5000).times(1)).resume(metaResponseCaptor.capture());
+        Assert.assertEquals(metaResponseCaptor.getValue().partitions, 0);
 
         response = mock(AsyncResponse.class);
-        responseCaptor = ArgumentCaptor.forClass(PartitionedTopicMetadata.class);
-        persistentTopics.getPartitionedMetadata(response,
-                testTenant, testNamespace, topicName, true, true);
-        verify(response, timeout(5000).times(1)).resume(responseCaptor.capture());
-        Assert.assertEquals(responseCaptor.getValue().partitions, 0);
-        final String topicName2 = "standard-topic-partition-b";
+        metaResponse = mock(AsyncResponse.class);
+        metaResponseCaptor = ArgumentCaptor.forClass(PartitionedTopicMetadata.class);
+        final String topic2 = "standard-topic-partition-b";
+        TopicName topicName2 = TopicName.get(TopicDomain.persistent.value(), testTenant, testNamespace, topic2);
         Map<String, String> topicMetadata = Maps.newHashMap();
         topicMetadata.put("key1", "value1");
-        persistentTopics.createNonPartitionedTopic(testTenant, testNamespace, topicName2, true, topicMetadata);
-        response = mock(AsyncResponse.class);
-        responseCaptor = ArgumentCaptor.forClass(PartitionedTopicMetadata.class);
-        persistentTopics.getPartitionedMetadata(response,
-                testTenant, testNamespace, topicName2, true, false);
-        verify(response, timeout(5000).times(1)).resume(responseCaptor.capture());
-        Assert.assertNull(responseCaptor.getValue().properties);
+        persistentTopics.createNonPartitionedTopic(response, testTenant, testNamespace, topic2, true, topicMetadata);
+        Awaitility.await().untilAsserted(() -> {
+            Assert.assertTrue(admin.topics().getList(testTenant + "/" + testNamespace).contains(topicName2.toString()));
+        });
+        persistentTopics.getPartitionedMetadata(metaResponse,
+                testTenant, testNamespace, topic2, true, false);
+        verify(metaResponse, timeout(5000).times(1)).resume(metaResponseCaptor.capture());
+        Assert.assertNull(metaResponseCaptor.getValue().properties);
     }
 
     @Test
@@ -506,15 +524,19 @@ public class PersistentTopicsTest extends MockedPulsarServiceBaseTest {
         Assert.assertTrue(errCaptor.getValue().getMessage().contains("Namespace not found"));
     }
 
-    @Test(expectedExceptions = RestException.class)
+    @Test
     public void testCreateNonPartitionedTopicWithInvalidName() {
         final String topicName = "standard-topic-partition-10";
         doAnswer(invocation -> {
-            TopicName partitionedTopicname = invocation.getArgument(0, TopicName.class);
-            assert(partitionedTopicname.getLocalName().equals("standard-topic"));
+            TopicName partitionedTopicName = invocation.getArgument(0, TopicName.class);
+            assert(partitionedTopicName.getLocalName().equals("standard-topic"));
             return new PartitionedTopicMetadata(10);
         }).when(persistentTopics).getPartitionedTopicMetadata(any(), anyBoolean(), anyBoolean());
-        persistentTopics.createNonPartitionedTopic(testTenant, testNamespace, topicName, true, null);
+        final AsyncResponse response = mock(AsyncResponse.class);
+        ArgumentCaptor<RestException> responseCaptor = ArgumentCaptor.forClass(RestException.class);
+        persistentTopics.createNonPartitionedTopic(response, testTenant, testNamespace, topicName, true, null);
+        verify(response, timeout(5000).times(1)).resume(responseCaptor.capture());
+        Assert.assertEquals(responseCaptor.getValue().getResponse().getStatus(), Response.Status.PRECONDITION_FAILED.getStatusCode());
     }
 
     @Test
@@ -580,7 +602,7 @@ public class PersistentTopicsTest extends MockedPulsarServiceBaseTest {
 
         // 2) create non partitioned topic and unload
         response = mock(AsyncResponse.class);
-        persistentTopics.createNonPartitionedTopic(testTenant, testNamespace, topicName, true, null);
+        persistentTopics.createNonPartitionedTopic(response, testTenant, testNamespace, topicName, true, null);
         persistentTopics.unloadTopic(response, testTenant, testNamespace, topicName, true);
         ArgumentCaptor<Response> responseCaptor = ArgumentCaptor.forClass(Response.class);
         verify(response, timeout(5000).times(1)).resume(responseCaptor.capture());
@@ -675,8 +697,16 @@ public class PersistentTopicsTest extends MockedPulsarServiceBaseTest {
         topics = (List<String>) responseCaptor.getValue();
         Assert.assertEquals(topics.size(), 2);
 
-        nonPersistentTopic.createNonPartitionedTopic(testTenant, testNamespace, "test-topic-2", false, null);
-        nonPersistentTopic.createNonPartitionedTopic(testTenant, testNamespace, "__change_events", false, null);
+        response = mock(AsyncResponse.class);
+        responseCaptor = ArgumentCaptor.forClass(Response.class);
+        nonPersistentTopic.createNonPartitionedTopic(response, testTenant, testNamespace, "test-topic-2", false, null);
+        verify(response, timeout(5000).times(1)).resume(responseCaptor.capture());
+        Assert.assertEquals(responseCaptor.getValue().getStatus(), Response.Status.NO_CONTENT.getStatusCode());
+        response = mock(AsyncResponse.class);
+        responseCaptor = ArgumentCaptor.forClass(Response.class);
+        nonPersistentTopic.createNonPartitionedTopic(response, testTenant, testNamespace, "__change_events", false, null);
+        verify(response, timeout(5000).times(1)).resume(responseCaptor.capture());
+        Assert.assertEquals(responseCaptor.getValue().getStatus(), Response.Status.NO_CONTENT.getStatusCode());
 
         response = mock(AsyncResponse.class);
         responseCaptor = ArgumentCaptor.forClass(Response.class);
@@ -696,11 +726,12 @@ public class PersistentTopicsTest extends MockedPulsarServiceBaseTest {
     @Test
     public void testGrantNonPartitionedTopic() {
         final String topicName = "non-partitioned-topic";
-        persistentTopics.createNonPartitionedTopic(testTenant, testNamespace, topicName, true, null);
+        AsyncResponse response = mock(AsyncResponse.class);
+        persistentTopics.createNonPartitionedTopic(response, testTenant, testNamespace, topicName, true, null);
         String role = "role";
         Set<AuthAction> expectActions = new HashSet<>();
         expectActions.add(AuthAction.produce);
-        AsyncResponse response = mock(AsyncResponse.class);
+        response = mock(AsyncResponse.class);
         ArgumentCaptor<Response> responseCaptor = ArgumentCaptor.forClass(Response.class);
         persistentTopics.grantPermissionsOnTopic(response, testTenant, testNamespace, topicName, role, expectActions);
         verify(response, timeout(5000).times(1)).resume(responseCaptor.capture());
@@ -720,14 +751,10 @@ public class PersistentTopicsTest extends MockedPulsarServiceBaseTest {
         persistentTopics.createPartitionedTopic(response, testTenant, testNamespace, topicName, 3, true);
 
         final String partitionName = TopicName.get(topicName).getPartition(0).getLocalName();
-        try {
-            persistentTopics.createNonPartitionedTopic(testTenant, testNamespace, partitionName, false, null);
-            Assert.fail();
-        } catch (RestException e) {
-            log.error("Failed to create {}: {}", partitionName, e.getMessage());
-            Assert.assertEquals(e.getResponse().getStatus(), 409);
-            Assert.assertEquals(e.getMessage(), "This topic already exists");
-        }
+        ArgumentCaptor<RestException> responseCaptor = ArgumentCaptor.forClass(RestException.class);
+        persistentTopics.createNonPartitionedTopic(response, testTenant, testNamespace, partitionName, false, null);
+        verify(response, timeout(5000).times(1)).resume(responseCaptor.capture());
+        Assert.assertEquals(responseCaptor.getValue().getResponse().getStatus(), Response.Status.CONFLICT.getStatusCode());
     }
 
     @Test
@@ -773,11 +800,12 @@ public class PersistentTopicsTest extends MockedPulsarServiceBaseTest {
     @Test
     public void testRevokeNonPartitionedTopic() {
         final String topicName = "non-partitioned-topic";
-        persistentTopics.createNonPartitionedTopic(testTenant, testNamespace, topicName, true, null);
+        AsyncResponse response = mock(AsyncResponse.class);
+        persistentTopics.createNonPartitionedTopic(response, testTenant, testNamespace, topicName, true, null);
         String role = "role";
         Set<AuthAction> expectActions = new HashSet<>();
         expectActions.add(AuthAction.produce);
-        AsyncResponse response = mock(AsyncResponse.class);
+        response = mock(AsyncResponse.class);
         ArgumentCaptor<Response> responseCaptor = ArgumentCaptor.forClass(Response.class);
         persistentTopics.grantPermissionsOnTopic(response, testTenant, testNamespace, topicName, role, expectActions);
         verify(response, timeout(5000).times(1)).resume(responseCaptor.capture());
@@ -852,7 +880,7 @@ public class PersistentTopicsTest extends MockedPulsarServiceBaseTest {
 
         // create non partitioned topic and compaction on it
         response = mock(AsyncResponse.class);
-        persistentTopics.createNonPartitionedTopic(testTenant, testNamespace, nonPartitionTopicName, true, null);
+        persistentTopics.createNonPartitionedTopic(response, testTenant, testNamespace, nonPartitionTopicName, true, null);
         persistentTopics.compact(response, testTenant, testNamespace, nonPartitionTopicName, true);
         ArgumentCaptor<Response> responseCaptor = ArgumentCaptor.forClass(Response.class);
         verify(response, timeout(5000).times(1)).resume(responseCaptor.capture());
@@ -1030,9 +1058,9 @@ public class PersistentTopicsTest extends MockedPulsarServiceBaseTest {
     @Test
     public void testOffloadWithNullMessageId() {
         final String topicName = "topic-123";
-        persistentTopics.createNonPartitionedTopic(
-                testTenant, testNamespace, topicName, true, null);
         AsyncResponse response = mock(AsyncResponse.class);
+        persistentTopics.createNonPartitionedTopic(response, testTenant, testNamespace, topicName, true, null);
+        response = mock(AsyncResponse.class);
         persistentTopics.triggerOffload(
                 response, testTenant, testNamespace, topicName, true, null);
         ArgumentCaptor<RestException> errCaptor = ArgumentCaptor.forClass(RestException.class);
@@ -1303,9 +1331,10 @@ public class PersistentTopicsTest extends MockedPulsarServiceBaseTest {
     @Test
     public void testDeleteTopic() throws Exception {
         final String topicName = "topic-1";
+        AsyncResponse response = mock(AsyncResponse.class);
         BrokerService brokerService = spy(pulsar.getBrokerService());
         doReturn(brokerService).when(pulsar).getBrokerService();
-        persistentTopics.createNonPartitionedTopic(testTenant, testNamespace, topicName, false, null);
+        persistentTopics.createNonPartitionedTopic(response, testTenant, testNamespace, topicName, false, null);
         CompletableFuture<Void> deleteTopicFuture = new CompletableFuture<>();
         deleteTopicFuture.completeExceptionally(new MetadataStoreException.NotFoundException());
         doReturn(deleteTopicFuture).when(brokerService).deleteTopic(anyString(), anyBoolean());
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
index 4d6a0a7b15e..bbc625b0763 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
@@ -1289,7 +1289,6 @@ public class TopicPoliciesTest extends MockedPulsarServiceBaseTest {
     public void testGetSetSubscriptionDispatchRate() throws Exception {
         final String topic = testTopic + UUID.randomUUID();
         admin.topics().createNonPartitionedTopic(topic);
-
         DispatchRate dispatchRate = DispatchRate.builder()
                 .dispatchThrottlingRateInMsg(1000)
                 .dispatchThrottlingRateInByte(1024 * 1024)
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceAutoTopicCreationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceAutoTopicCreationTest.java
index 936e3e0d263..6ffdf6d2188 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceAutoTopicCreationTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceAutoTopicCreationTest.java
@@ -142,6 +142,7 @@ public class BrokerServiceAutoTopicCreationTest extends BrokerTestBase{
             assertFalse(admin.namespaces().getTopics("prop/ns-abc").contains(topicString + "-partition-" + i));
         }
         assertTrue(admin.namespaces().getTopics("prop/ns-abc").contains(topicString));
+        admin.topics().delete(topicString, true);
     }
 
     /**