You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2020/03/06 08:50:23 UTC

[pulsar] branch master updated: Fix create partitioned topic with a substring of an existing topic name. (#6478)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 19ccfd5  Fix create partitioned topic with a substring of an existing topic name. (#6478)
19ccfd5 is described below

commit 19ccfd5c60020a32bceeca128a9846ca006f0dc7
Author: lipenghui <pe...@apache.org>
AuthorDate: Fri Mar 6 16:50:10 2020 +0800

    Fix create partitioned topic with a substring of an existing topic name. (#6478)
    
    Fixes #6468
    
    Fix create a partitioned topic with a substring of an existing topic name. And make create partitioned topic async.
---
 .../apache/pulsar/broker/admin/AdminResource.java  | 114 ++++++++++++++++++++-
 .../broker/admin/impl/PersistentTopicsBase.java    |  97 +++---------------
 .../broker/admin/v1/NonPersistentTopics.java       |  34 +-----
 .../pulsar/broker/admin/v1/PersistentTopics.java   |  16 ++-
 .../broker/admin/v2/NonPersistentTopics.java       |  37 ++-----
 .../pulsar/broker/admin/v2/PersistentTopics.java   |  29 ++++--
 .../apache/pulsar/broker/admin/AdminApiTest.java   |   8 +-
 .../org/apache/pulsar/broker/admin/AdminTest.java  |   6 +-
 .../pulsar/broker/admin/PersistentTopicsTest.java  |  57 ++++++++---
 .../pulsar/broker/admin/v1/V1_AdminApiTest.java    |   4 +-
 10 files changed, 227 insertions(+), 175 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
index 722da4f..a216982 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
@@ -27,6 +27,7 @@ import com.google.common.collect.Lists;
 
 import java.net.MalformedURLException;
 import java.net.URI;
+import java.util.ArrayList;
 import java.util.List;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
@@ -36,6 +37,7 @@ import java.util.stream.Collectors;
 
 import javax.servlet.ServletContext;
 import javax.ws.rs.WebApplicationException;
+import javax.ws.rs.container.AsyncResponse;
 import javax.ws.rs.core.Response;
 import javax.ws.rs.core.Response.Status;
 import javax.ws.rs.core.UriBuilder;
@@ -46,6 +48,7 @@ import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.broker.cache.LocalZooKeeperCacheService;
 import org.apache.pulsar.broker.web.PulsarWebResource;
 import org.apache.pulsar.broker.web.RestException;
+import org.apache.pulsar.common.api.proto.PulsarApi;
 import org.apache.pulsar.common.naming.Constants;
 import org.apache.pulsar.common.naming.NamespaceBundle;
 import org.apache.pulsar.common.naming.NamespaceBundleFactory;
@@ -255,16 +258,19 @@ public abstract class AdminResource extends PulsarWebResource {
         return namespaces;
     }
 
-    protected void tryCreatePartitionsAsync(int numPartitions) {
+    protected CompletableFuture<Void> tryCreatePartitionsAsync(int numPartitions) {
         if (!topicName.isPersistent()) {
-            return;
+            return CompletableFuture.completedFuture(null);
         }
+        List<CompletableFuture<Void>> futures = new ArrayList<>(numPartitions);
         for (int i = 0; i < numPartitions; i++) {
-            tryCreatePartitionAsync(i);
+            futures.add(tryCreatePartitionAsync(i, null));
         }
+        return FutureUtil.waitForAll(futures);
     }
 
-    private void tryCreatePartitionAsync(final int partition) {
+    private CompletableFuture<Void> tryCreatePartitionAsync(final int partition, CompletableFuture<Void> reuseFuture) {
+        CompletableFuture<Void> result = reuseFuture == null ? new CompletableFuture<>() : reuseFuture;
         zkCreateOptimisticAsync(localZk(), ZkAdminPaths.managedLedgerPath(topicName.getPartition(partition)), new byte[0],
             (rc, s, o, s1) -> {
                 if (KeeperException.Code.OK.intValue() == rc) {
@@ -272,18 +278,22 @@ public abstract class AdminResource extends PulsarWebResource {
                         log.debug("[{}] Topic partition {} created.", clientAppId(),
                             topicName.getPartition(partition));
                     }
+                    result.complete(null);
                 } else if (KeeperException.Code.NODEEXISTS.intValue() == rc) {
                     log.info("[{}] Topic partition {} is exists, doing nothing.", clientAppId(),
                         topicName.getPartition(partition));
+                    result.completeExceptionally(KeeperException.create(KeeperException.Code.NODEEXISTS));
                 } else if (KeeperException.Code.BADVERSION.intValue() == rc) {
                     log.warn("[{}] Fail to create topic partition {} with concurrent modification, retry now.",
                             clientAppId(), topicName.getPartition(partition));
-                    tryCreatePartitionAsync(partition);
+                    tryCreatePartitionAsync(partition, result);
                 } else {
                     log.error("[{}] Fail to create topic partition {}", clientAppId(),
                         topicName.getPartition(partition), KeeperException.create(KeeperException.Code.get(rc)));
+                    result.completeExceptionally(KeeperException.create(KeeperException.Code.get(rc)));
                 }
         });
+        return result;
     }
 
     protected NamespaceName namespaceName;
@@ -707,4 +717,98 @@ public abstract class AdminResource extends PulsarWebResource {
         partitionedTopics.sort(null);
         return partitionedTopics;
     }
+
+    protected void internalCreatePartitionedTopic(AsyncResponse asyncResponse, int numPartitions) {
+        try {
+            validateAdminAccessForTenant(topicName.getTenant());
+        } catch (Exception e) {
+            log.error("[{}] Failed to create partitioned topic {}", clientAppId(), topicName, e);
+            resumeAsyncResponseExceptionally(asyncResponse, e);
+            return;
+        }
+        if (numPartitions <= 0) {
+            asyncResponse.resume(new RestException(Status.NOT_ACCEPTABLE, "Number of partitions should be more than 0"));
+            return;
+        }
+        checkTopicExistsAsync(topicName).thenAccept(exists -> {
+            if (exists) {
+                log.warn("[{}] Failed to create already existing topic {}", clientAppId(), topicName);
+                asyncResponse.resume(new RestException(Status.CONFLICT, "This topic already exists"));
+            } else {
+                try {
+                    String path = ZkAdminPaths.partitionedTopicPath(topicName);
+                    byte[] data = jsonMapper().writeValueAsBytes(new PartitionedTopicMetadata(numPartitions));
+                    zkCreateOptimisticAsync(globalZk(), path, data, (rc, s, o, s1) -> {
+                        if (KeeperException.Code.OK.intValue() == rc) {
+                            globalZk().sync(path, (rc2, s2, ctx) -> {
+                                if (KeeperException.Code.OK.intValue() == rc2) {
+                                    log.info("[{}] Successfully created partitioned topic {}", clientAppId(), topicName);
+                                    tryCreatePartitionsAsync(numPartitions).thenAccept(v -> {
+                                        log.info("[{}] Successfully created partitions for topic {}", clientAppId(), topicName);
+                                        asyncResponse.resume(Response.noContent().build());
+                                    }).exceptionally(e -> {
+                                        log.error("[{}] Failed to create partitions for topic {}", clientAppId(), topicName);
+                                        // The partitioned topic is created but there are some partitions create failed
+                                        asyncResponse.resume(new RestException(e));
+                                        return null;
+                                    });
+                                } else {
+                                    log.error("[{}] Failed to create partitioned topic {}", clientAppId(), topicName, KeeperException.create(KeeperException.Code.get(rc2)));
+                                    asyncResponse.resume(new RestException(KeeperException.create(KeeperException.Code.get(rc2))));
+                                }
+                            }, null);
+                        } else if (KeeperException.Code.NODEEXISTS.intValue() == rc) {
+                            log.warn("[{}] Failed to create already existing partitioned topic {}", clientAppId(), topicName);
+                            asyncResponse.resume(new RestException(Status.CONFLICT, "Partitioned topic already exists"));
+                        } else if (KeeperException.Code.BADVERSION.intValue() == rc) {
+                            log.warn("[{}] Failed to create partitioned topic {}: concurrent modification", clientAppId(),
+                                    topicName);
+                            asyncResponse.resume(new RestException(Status.CONFLICT, "Concurrent modification"));
+                        } else {
+                            log.error("[{}] Failed to create partitioned topic {}", clientAppId(), topicName, KeeperException.create(KeeperException.Code.get(rc)));
+                            asyncResponse.resume(new RestException(KeeperException.create(KeeperException.Code.get(rc))));
+                        }
+                    });
+                } catch (Exception e) {
+                    log.error("[{}] Failed to create partitioned topic {}", clientAppId(), topicName, e);
+                    resumeAsyncResponseExceptionally(asyncResponse, e);
+                }
+            }
+        }).exceptionally(ex -> {
+            log.error("[{}] Failed to create partitioned topic {}", clientAppId(), topicName, ex);
+            resumeAsyncResponseExceptionally(asyncResponse, ex);
+            return null;
+        });
+    }
+
+    /**
+     * Check the exists topics contains the given topic.
+     * Since there are topic partitions and non-partitioned topics in Pulsar, must ensure both partitions
+     * and non-partitioned topics are not duplicated. So, if compare with a partition name, we should compare
+     * to the partitioned name of this partition.
+     *
+     * @param topicName given topic name
+     */
+    protected CompletableFuture<Boolean> checkTopicExistsAsync(TopicName topicName) {
+        return pulsar().getNamespaceService().getListOfTopics(topicName.getNamespaceObject(),
+                PulsarApi.CommandGetTopicsOfNamespace.Mode.ALL)
+                .thenCompose(topics -> {
+                    boolean exists = false;
+                    for (String topic : topics) {
+                        if (topicName.getPartitionedTopicName().equals(TopicName.get(topic).getPartitionedTopicName())) {
+                            exists = true;
+                            break;
+                        }
+                    }
+                    return CompletableFuture.completedFuture(exists);
+                });
+    }
+
+    protected void resumeAsyncResponseExceptionally(AsyncResponse asyncResponse, Throwable throwable) {
+        if (throwable instanceof WebApplicationException) {
+            asyncResponse.resume(throwable);
+        } else {
+            asyncResponse.resume(new RestException(throwable));
+        }
+    }
 }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index e37f09d..f2a95d1 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -20,7 +20,7 @@ package org.apache.pulsar.broker.admin.impl;
 
 import static com.google.common.base.Preconditions.checkNotNull;
 import static org.apache.pulsar.broker.cache.ConfigurationCacheService.POLICIES;
-import org.apache.pulsar.common.api.proto.PulsarApi;
+
 import static org.apache.pulsar.common.util.Codec.decode;
 
 import com.github.zafarkhaja.semver.Version;
@@ -390,46 +390,6 @@ public class PersistentTopicsBase extends AdminResource {
         revokePermissions(topicName.toString(), role);
     }
 
-    protected void internalCreatePartitionedTopic(int numPartitions) {
-        validateAdminAccessForTenant(topicName.getTenant());
-        if (numPartitions <= 0) {
-            throw new RestException(Status.NOT_ACCEPTABLE, "Number of partitions should be more than 0");
-        }
-        validatePartitionTopicName(topicName.getLocalName());
-        try {
-            boolean topicExist = pulsar().getNamespaceService()
-                    .getListOfTopics(topicName.getNamespaceObject(), PulsarApi.CommandGetTopicsOfNamespace.Mode.ALL)
-                    .join()
-                    .contains(topicName.toString());
-            if (topicExist) {
-                log.warn("[{}] Failed to create already existing topic {}", clientAppId(), topicName);
-                throw new RestException(Status.CONFLICT, "This topic already exists");
-            }
-        } catch (Exception e) {
-            log.error("[{}] Failed to create partitioned topic {}", clientAppId(), topicName, e);
-            throw new RestException(e);
-        }
-        try {
-            String path = ZkAdminPaths.partitionedTopicPath(topicName);
-            byte[] data = jsonMapper().writeValueAsBytes(new PartitionedTopicMetadata(numPartitions));
-            zkCreateOptimistic(path, data);
-            tryCreatePartitionsAsync(numPartitions);
-            // Sync data to all quorums and the observers
-            zkSync(path);
-            log.info("[{}] Successfully created partitioned topic {}", clientAppId(), topicName);
-        } catch (KeeperException.NodeExistsException e) {
-            log.warn("[{}] Failed to create already existing partitioned topic {}", clientAppId(), topicName);
-            throw new RestException(Status.CONFLICT, "Partitioned topic already exists");
-        } catch (KeeperException.BadVersionException e) {
-                log.warn("[{}] Failed to create partitioned topic {}: concurrent modification", clientAppId(),
-                        topicName);
-                throw new RestException(Status.CONFLICT, "Concurrent modification");
-        } catch (Exception e) {
-            log.error("[{}] Failed to create partitioned topic {}", clientAppId(), topicName, e);
-            throw new RestException(e);
-        }
-    }
-
     protected void internalCreateNonPartitionedTopic(boolean authoritative) {
         validateAdminAccessForTenant(topicName.getTenant());
         validateNonPartitionTopicName(topicName.getLocalName());
@@ -540,11 +500,22 @@ public class PersistentTopicsBase extends AdminResource {
         }
     }
 
-    protected void internalCreateMissedPartitions() {
-        PartitionedTopicMetadata metadata = getPartitionedTopicMetadata(topicName, false, false);
-        if (metadata != null) {
-            tryCreatePartitionsAsync(metadata.partitions);
-        }
+    protected void internalCreateMissedPartitions(AsyncResponse asyncResponse) {
+        getPartitionedTopicMetadataAsync(topicName, false, false).thenAccept(metadata -> {
+            if (metadata != null) {
+                tryCreatePartitionsAsync(metadata.partitions).thenAccept(v -> {
+                    asyncResponse.resume(Response.noContent().build());
+                }).exceptionally(e -> {
+                    log.error("[{}] Failed to create partitions for topic {}", clientAppId(), topicName);
+                    resumeAsyncResponseExceptionally(asyncResponse, e);
+                    return null;
+                });
+            }
+        }).exceptionally(e -> {
+            log.error("[{}] Failed to create partitions for topic {}", clientAppId(), topicName);
+            resumeAsyncResponseExceptionally(asyncResponse, e);
+            return null;
+        });
     }
 
     private CompletableFuture<Void> updatePartitionInOtherCluster(int numPartitions, Set<String> clusters) {
@@ -2072,40 +2043,6 @@ public class PersistentTopicsBase extends AdminResource {
     }
 
     /**
-     * Validate partitioned topic name.
-     * Validation will fail and throw RestException if
-     * 1) There's already a partitioned topic with same topic name and have some of its partition created.
-     * 2) There's already non partition topic with same name and contains partition suffix "-partition-"
-     * followed by numeric value. In this case internal created partition of partitioned topic could override
-     * the existing non partition topic.
-     *
-     * @param topicName
-     */
-    private void validatePartitionTopicName(String topicName) {
-        List<String> existingTopicList = internalGetList();
-        String prefix = topicName + TopicName.PARTITIONED_TOPIC_SUFFIX;
-        for (String existingTopicName : existingTopicList) {
-            if (existingTopicName.contains(prefix)) {
-                try {
-                    Long.parseLong(existingTopicName.substring(
-                            existingTopicName.indexOf(TopicName.PARTITIONED_TOPIC_SUFFIX)
-                                    + TopicName.PARTITIONED_TOPIC_SUFFIX.length()));
-                    log.warn("[{}] Already have topic {} which contains partition " +
-                            "suffix '-partition-' and end with numeric value. Creation of partitioned topic {}"
-                            + "could cause conflict.", clientAppId(), existingTopicName, topicName);
-                    throw new RestException(Status.PRECONDITION_FAILED,
-                            "Already have topic " + existingTopicName + " which contains partition suffix '-partition-' " +
-                                    "and end with numeric value, Creation of partitioned topic " + topicName +
-                                    " could cause conflict.");
-                } catch (NumberFormatException e) {
-                    // Do nothing, if value after partition suffix is not pure numeric value,
-                    // as it can't conflict with internal created partitioned topic's name.
-                }
-            }
-        }
-    }
-
-    /**
      * Validate non partition topic name,
      * Validation will fail and throw RestException if
      * 1) Topic name contains partition suffix "-partition-" and the remaining part follow the partition
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/NonPersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/NonPersistentTopics.java
index 4bc0ddf..2338b0f 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/NonPersistentTopics.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/NonPersistentTopics.java
@@ -124,41 +124,15 @@ public class NonPersistentTopics extends PersistentTopics {
     @ApiOperation(hidden = true, value = "Create a partitioned topic.", notes = "It needs to be called before creating a producer on a partitioned topic.")
     @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
             @ApiResponse(code = 409, message = "Partitioned topic already exist") })
-    public void createPartitionedTopic(@PathParam("property") String property, @PathParam("cluster") String cluster,
+    public void createPartitionedTopic(@Suspended final AsyncResponse asyncResponse, @PathParam("property") String property, @PathParam("cluster") String cluster,
             @PathParam("namespace") String namespace, @PathParam("topic") @Encoded String encodedTopic,
             int numPartitions) {
-        validateTopicName(property, cluster, namespace, encodedTopic);
-        validateAdminAccessForTenant(topicName.getTenant());
-        if (numPartitions <= 0) {
-            throw new RestException(Status.NOT_ACCEPTABLE, "Number of partitions should be more than 0");
-        }
         try {
-            boolean topicExist = pulsar().getNamespaceService()
-                    .getListOfTopics(topicName.getNamespaceObject(), PulsarApi.CommandGetTopicsOfNamespace.Mode.ALL)
-                    .join()
-                    .contains(topicName.toString());
-            if (topicExist) {
-                log.warn("[{}] Failed to create already existing topic {}", clientAppId(), topicName);
-                throw new RestException(Status.CONFLICT, "This topic already exists");
-            }
-        } catch (Exception e) {
-            log.error("[{}] Failed to create partitioned topic {}", clientAppId(), topicName, e);
-            throw new RestException(e);
-        }
-        try {
-            String path = path(PARTITIONED_TOPIC_PATH_ZNODE, namespaceName.toString(), domain(),
-                    topicName.getEncodedLocalName());
-            byte[] data = jsonMapper().writeValueAsBytes(new PartitionedTopicMetadata(numPartitions));
-            zkCreateOptimistic(path, data);
-            // Sync data to all quorums and the observers
-            zkSync(path);
-            log.info("[{}] Successfully created partitioned topic {}", clientAppId(), topicName);
-        } catch (KeeperException.NodeExistsException e) {
-            log.warn("[{}] Failed to create already existing partitioned topic {}", clientAppId(), topicName);
-            throw new RestException(Status.CONFLICT, "Partitioned topic already exist");
+            validateTopicName(property, cluster, namespace, encodedTopic);
+            internalCreatePartitionedTopic(asyncResponse, numPartitions);
         } catch (Exception e) {
             log.error("[{}] Failed to create partitioned topic {}", clientAppId(), topicName, e);
-            throw new RestException(e);
+            resumeAsyncResponseExceptionally(asyncResponse, e);
         }
     }
 
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java
index 836ca14..362adc8 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java
@@ -57,7 +57,8 @@ import io.swagger.annotations.Api;
 import io.swagger.annotations.ApiOperation;
 import io.swagger.annotations.ApiResponse;
 import io.swagger.annotations.ApiResponses;
-import javax.ws.rs.container.AsyncResponse;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  */
@@ -66,7 +67,7 @@ import javax.ws.rs.container.AsyncResponse;
 @Api(value = "/persistent", description = "Persistent topic admin apis", tags = "persistent topic", hidden = true)
 @SuppressWarnings("deprecation")
 public class PersistentTopics extends PersistentTopicsBase {
-
+    private static final Logger log = LoggerFactory.getLogger(PersistentTopics.class);
     @GET
     @Path("/{property}/{cluster}/{namespace}")
     @ApiOperation(hidden = true, value = "Get the list of topics under a namespace.", response = String.class, responseContainer = "List")
@@ -147,11 +148,16 @@ public class PersistentTopics extends PersistentTopicsBase {
             @ApiResponse(code = 307, message = "Current broker doesn't serve the namespace of this topic"),
             @ApiResponse(code = 403, message = "Don't have admin permission"),
             @ApiResponse(code = 409, message = "Partitioned topic already exist") })
-    public void createPartitionedTopic(@PathParam("property") String property, @PathParam("cluster") String cluster,
+    public void createPartitionedTopic(@Suspended final AsyncResponse asyncResponse, @PathParam("property") String property, @PathParam("cluster") String cluster,
             @PathParam("namespace") String namespace, @PathParam("topic") @Encoded String encodedTopic,
             int numPartitions) {
-        validateTopicName(property, cluster, namespace, encodedTopic);
-        internalCreatePartitionedTopic(numPartitions);
+        try {
+            validateTopicName(property, cluster, namespace, encodedTopic);
+            internalCreatePartitionedTopic(asyncResponse, numPartitions);
+        } catch (Exception e) {
+            log.error("[{}] Failed to create partitioned topic {}", clientAppId(), topicName, e);
+            resumeAsyncResponseExceptionally(asyncResponse, e);
+        }
     }
 
     /**
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/NonPersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/NonPersistentTopics.java
index 7e88eed..3756f82 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/NonPersistentTopics.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/NonPersistentTopics.java
@@ -164,6 +164,7 @@ public class NonPersistentTopics extends PersistentTopics {
             @ApiResponse(code = 503, message = "Failed to validate global cluster configuration"),
     })
     public void createPartitionedTopic(
+            @Suspended final AsyncResponse asyncResponse,
             @ApiParam(value = "Specify the tenant", required = true)
             @PathParam("tenant") String tenant,
             @ApiParam(value = "Specify the namespace", required = true)
@@ -172,39 +173,15 @@ public class NonPersistentTopics extends PersistentTopics {
             @PathParam("topic") @Encoded String encodedTopic,
             @ApiParam(value = "The number of partitions for the topic", required = true, type = "int", defaultValue = "0")
             int numPartitions) {
-        validateGlobalNamespaceOwnership(tenant,namespace);
-        validateTopicName(tenant, namespace, encodedTopic);
-        validateAdminAccessForTenant(topicName.getTenant());
-        if (numPartitions <= 0) {
-            throw new RestException(Status.NOT_ACCEPTABLE, "Number of partitions should be more than 0");
-        }
-        try {
-            boolean topicExist = pulsar().getNamespaceService()
-                    .getListOfTopics(topicName.getNamespaceObject(), PulsarApi.CommandGetTopicsOfNamespace.Mode.ALL)
-                    .join()
-                    .contains(topicName.toString());
-            if (topicExist) {
-                log.warn("[{}] Failed to create already existing topic {}", clientAppId(), topicName);
-                throw new RestException(Status.CONFLICT, "This topic already exists");
-            }
-        } catch (Exception e) {
-            log.error("[{}] Failed to create partitioned topic {}", clientAppId(), topicName, e);
-            throw new RestException(e);
-        }
+
         try {
-            String path = path(PARTITIONED_TOPIC_PATH_ZNODE, namespaceName.toString(), domain(),
-                    topicName.getEncodedLocalName());
-            byte[] data = jsonMapper().writeValueAsBytes(new PartitionedTopicMetadata(numPartitions));
-            zkCreateOptimistic(path, data);
-            // Sync data to all quorums and the observers
-            zkSync(path);
-            log.info("[{}] Successfully created partitioned topic {}", clientAppId(), topicName);
-        } catch (KeeperException.NodeExistsException e) {
-            log.warn("[{}] Failed to create already existing partitioned topic {}", clientAppId(), topicName);
-            throw new RestException(Status.CONFLICT, "Partitioned topic already exists");
+            validateGlobalNamespaceOwnership(tenant,namespace);
+            validateTopicName(tenant, namespace, encodedTopic);
+            validateAdminAccessForTenant(topicName.getTenant());
+            internalCreatePartitionedTopic(asyncResponse, numPartitions);
         } catch (Exception e) {
             log.error("[{}] Failed to create partitioned topic {}", clientAppId(), topicName, e);
-            throw new RestException(e);
+            resumeAsyncResponseExceptionally(asyncResponse, e);
         }
     }
 
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
index 8c59fa5..b2fc28b 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
@@ -55,6 +55,9 @@ import io.swagger.annotations.ApiOperation;
 import io.swagger.annotations.ApiResponse;
 import io.swagger.annotations.ApiResponses;
 import io.swagger.annotations.ApiParam;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import static org.apache.pulsar.common.util.Codec.decode;
 
 /**
@@ -192,6 +195,7 @@ public class PersistentTopics extends PersistentTopicsBase {
             @ApiResponse(code = 503, message = "Failed to validate global cluster configuration")
     })
     public void createPartitionedTopic(
+            @Suspended final AsyncResponse asyncResponse,
             @ApiParam(value = "Specify the tenant", required = true)
             @PathParam("tenant") String tenant,
             @ApiParam(value = "Specify the namespace", required = true)
@@ -200,9 +204,15 @@ public class PersistentTopics extends PersistentTopicsBase {
             @PathParam("topic") @Encoded String encodedTopic,
             @ApiParam(value = "The number of partitions for the topic", required = true, type = "int", defaultValue = "0")
             int numPartitions) {
-        validateGlobalNamespaceOwnership(tenant,namespace);
-        validatePartitionedTopicName(tenant, namespace, encodedTopic);
-        internalCreatePartitionedTopic(numPartitions);
+        try {
+            validateGlobalNamespaceOwnership(tenant,namespace);
+            validatePartitionedTopicName(tenant, namespace, encodedTopic);
+            validateAdminAccessForTenant(topicName.getTenant());
+            internalCreatePartitionedTopic(asyncResponse, numPartitions);
+        } catch (Exception e) {
+            log.error("[{}] Failed to create partitioned topic {}", clientAppId(), topicName, e);
+            resumeAsyncResponseExceptionally(asyncResponse, e);
+        }
     }
 
     @PUT
@@ -276,7 +286,7 @@ public class PersistentTopics extends PersistentTopicsBase {
 
     @POST
     @Path("/{tenant}/{namespace}/{topic}/createMissedPartitions")
-    @ApiOperation(value = "Create missed partitions of an existing partitioned topic.", notes = "This is a best-effort operation for create missed partitions of existing non-global partitioned-topic and does't throw any exceptions when create failed")
+    @ApiOperation(value = "Create missed partitions of an existing partitioned topic.")
     @ApiResponses(value = {
             @ApiResponse(code = 307, message = "Current broker doesn't serve the namespace of this topic"),
             @ApiResponse(code = 401, message = "Don't have permission to adminisActions to be grantedtrate resources on this tenant"),
@@ -287,6 +297,7 @@ public class PersistentTopics extends PersistentTopicsBase {
             @ApiResponse(code = 500, message = "Internal server error")
     })
     public void createMissedPartitions(
+            @Suspended final AsyncResponse asyncResponse,
             @ApiParam(value = "Specify the tenant", required = true)
             @PathParam("tenant") String tenant,
             @ApiParam(value = "Specify the namespace", required = true)
@@ -294,8 +305,12 @@ public class PersistentTopics extends PersistentTopicsBase {
             @ApiParam(value = "Specify topic name", required = true)
             @PathParam("topic") @Encoded String encodedTopic) {
 
-        validatePartitionedTopicName(tenant, namespace, encodedTopic);
-        internalCreateMissedPartitions();
+        try {
+            validatePartitionedTopicName(tenant, namespace, encodedTopic);
+            internalCreateMissedPartitions(asyncResponse);
+        } catch (Exception e) {
+            resumeAsyncResponseExceptionally(asyncResponse, e);
+        }
     }
 
     @GET
@@ -1072,4 +1087,6 @@ public class PersistentTopics extends PersistentTopicsBase {
         validateTopicName(tenant, namespace, encodedTopic);
         return internalGetLastMessageId(authoritative);
     }
+
+    private static final Logger log = LoggerFactory.getLogger(PersistentTopics.class);
 }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java
index f0ee4f6..d832628 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java
@@ -919,9 +919,7 @@ public class AdminApiTest extends MockedPulsarServiceBaseTest {
         try {
             admin.topics().createPartitionedTopic(partitionedTopicName, 32);
             fail("Should have failed as the partitioned topic already exists");
-        } catch (PreconditionFailedException e) {
-            // Expecting PreconditionFailedException instead of ConflictException as it'll
-            // fail validation before actually try to create metadata in ZK.
+        } catch (ConflictException ignore) {
         }
 
         producer = client.newProducer(Schema.BYTES)
@@ -2010,6 +2008,10 @@ public class AdminApiTest extends MockedPulsarServiceBaseTest {
         } catch (PulsarAdminException e) {
             assertTrue(e instanceof ConflictException);
         }
+
+        // Check create partitioned topic with substring topic name
+        admin.topics().createPartitionedTopic("persistent://prop-xyz/ns1/create_substring_topic", 1);
+        admin.topics().createPartitionedTopic("persistent://prop-xyz/ns1/substring_topic", 1);
     }
 
     /**
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTest.java
index 00d1a31..3cae752 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminTest.java
@@ -677,7 +677,11 @@ public class AdminTest extends MockedPulsarServiceBaseTest {
         verify(response, times(1)).resume(Lists.newArrayList());
         // create topic
         assertEquals(persistentTopics.getPartitionedTopicList(property, cluster, namespace), Lists.newArrayList());
-        persistentTopics.createPartitionedTopic(property, cluster, namespace, topic, 5);
+        response = mock(AsyncResponse.class);
+        ArgumentCaptor<Response> responseCaptor = ArgumentCaptor.forClass(Response.class);
+        persistentTopics.createPartitionedTopic(response, property, cluster, namespace, topic, 5);
+        verify(response, timeout(5000).times(1)).resume(responseCaptor.capture());
+        assertEquals(responseCaptor.getValue().getStatus(), Response.Status.NO_CONTENT.getStatusCode());
         assertEquals(persistentTopics.getPartitionedTopicList(property, cluster, namespace), Lists
                 .newArrayList(String.format("persistent://%s/%s/%s/%s", property, cluster, namespace, topic)));
 
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
index 1825d31..cac35a6 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
@@ -56,6 +56,7 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.CompletableFuture;
 
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyBoolean;
@@ -157,13 +158,17 @@ public class PersistentTopicsTest extends MockedPulsarServiceBaseTest {
                 "Partitioned Topic not found: persistent://my-tenant/my-namespace/topic-not-found-partition-0 has zero partitions");
 
         // 3) Create the partitioned topic
-        persistentTopics.createPartitionedTopic(testTenant, testNamespace, testLocalTopicName, 3);
+        response = mock(AsyncResponse.class);
+        ArgumentCaptor<Response> responseCaptor = ArgumentCaptor.forClass(Response.class);
+        persistentTopics.createPartitionedTopic(response, testTenant, testNamespace, testLocalTopicName, 3);
+        verify(response, timeout(5000).times(1)).resume(responseCaptor.capture());
+        Assert.assertEquals(responseCaptor.getValue().getStatus(), Response.Status.NO_CONTENT.getStatusCode());
 
         // 4) Create a subscription
         response = mock(AsyncResponse.class);
         persistentTopics.createSubscription(response, testTenant, testNamespace, testLocalTopicName, "test", true,
                 (MessageIdImpl) MessageId.earliest, false);
-        ArgumentCaptor<Response> responseCaptor = ArgumentCaptor.forClass(Response.class);
+        responseCaptor = ArgumentCaptor.forClass(Response.class);
         verify(response, timeout(5000).times(1)).resume(responseCaptor.capture());
         Assert.assertEquals(responseCaptor.getValue().getStatus(), Response.Status.NO_CONTENT.getStatusCode());
 
@@ -239,7 +244,7 @@ public class PersistentTopicsTest extends MockedPulsarServiceBaseTest {
         persistentTopics.createNonPartitionedTopic(testTenant, testNamespace, topicName, true);
     }
 
-    @Test(expectedExceptions = RestException.class)
+    @Test
     public void testCreatePartitionedTopicHavingNonPartitionTopicWithPartitionSuffix() throws KeeperException, InterruptedException {
         // Test the case in which user already has topic like topic-name-partition-123 created before we enforce the validation.
         final String nonPartitionTopicName1 = "standard-topic";
@@ -250,7 +255,12 @@ public class PersistentTopicsTest extends MockedPulsarServiceBaseTest {
         doReturn(mockLocalZooKeeperCacheService).when(pulsar).getLocalZkCacheService();
         doReturn(mockZooKeeperChildrenCache).when(mockLocalZooKeeperCacheService).managedLedgerListCache();
         doReturn(ImmutableSet.of(nonPartitionTopicName1, nonPartitionTopicName2)).when(mockZooKeeperChildrenCache).get(anyString());
-        persistentTopics.createPartitionedTopic(testTenant, testNamespace, partitionedTopicName, 5);
+        doReturn(CompletableFuture.completedFuture(ImmutableSet.of(nonPartitionTopicName1, nonPartitionTopicName2))).when(mockZooKeeperChildrenCache).getAsync(anyString());
+        AsyncResponse response = mock(AsyncResponse.class);
+        ArgumentCaptor<RestException> errCaptor = ArgumentCaptor.forClass(RestException.class);
+        persistentTopics.createPartitionedTopic(response, testTenant, testNamespace, partitionedTopicName, 5);
+        verify(response, timeout(5000).times(1)).resume(errCaptor.capture());
+        Assert.assertEquals(errCaptor.getValue().getResponse().getStatus(), Response.Status.CONFLICT.getStatusCode());
     }
 
     @Test(expectedExceptions = RestException.class)
@@ -263,13 +273,18 @@ public class PersistentTopicsTest extends MockedPulsarServiceBaseTest {
         doReturn(mockLocalZooKeeperCacheService).when(pulsar).getLocalZkCacheService();
         doReturn(mockZooKeeperChildrenCache).when(mockLocalZooKeeperCacheService).managedLedgerListCache();
         doReturn(ImmutableSet.of(nonPartitionTopicName2)).when(mockZooKeeperChildrenCache).get(anyString());
+        doReturn(CompletableFuture.completedFuture(ImmutableSet.of(nonPartitionTopicName2))).when(mockZooKeeperChildrenCache).getAsync(anyString());
         doAnswer(invocation -> {
             persistentTopics.namespaceName = NamespaceName.get("tenant", "namespace");
             persistentTopics.topicName = TopicName.get("persistent", "tenant", "cluster", "namespace", "topicname");
             return null;
         }).when(persistentTopics).validatePartitionedTopicName(any(), any(), any());
         doNothing().when(persistentTopics).validateAdminAccessForTenant(anyString());
-        persistentTopics.createPartitionedTopic(testTenant, testNamespace, partitionedTopicName, 5);
+        AsyncResponse response = mock(AsyncResponse.class);
+        ArgumentCaptor<Response> responseCaptor = ArgumentCaptor.forClass(Response.class);
+        persistentTopics.createPartitionedTopic(response, testTenant, testNamespace, partitionedTopicName, 5);
+        verify(response, timeout(5000).times(1)).resume(responseCaptor.capture());
+        Assert.assertEquals(responseCaptor.getValue().getStatus(), Response.Status.NO_CONTENT.getStatusCode());
         persistentTopics.updatePartitionedTopic(testTenant, testNamespace, partitionedTopicName, true, 10);
     }
 
@@ -295,7 +310,11 @@ public class PersistentTopicsTest extends MockedPulsarServiceBaseTest {
 
         // 3) create partitioned topic and unload
         response = mock(AsyncResponse.class);
-        persistentTopics.createPartitionedTopic(testTenant, testNamespace, partitionTopicName, 6);
+        responseCaptor = ArgumentCaptor.forClass(Response.class);
+        persistentTopics.createPartitionedTopic(response, testTenant, testNamespace, partitionTopicName, 6);
+        verify(response, timeout(5000).times(1)).resume(responseCaptor.capture());
+        Assert.assertEquals(responseCaptor.getValue().getStatus(), Response.Status.NO_CONTENT.getStatusCode());
+        response = mock(AsyncResponse.class);
         persistentTopics.unloadTopic(response, testTenant, testNamespace, partitionTopicName, true);
         responseCaptor = ArgumentCaptor.forClass(Response.class);
         verify(response, timeout(5000).times(1)).resume(responseCaptor.capture());
@@ -320,10 +339,17 @@ public class PersistentTopicsTest extends MockedPulsarServiceBaseTest {
 
     @Test
     public void testGetPartitionedTopicsList() throws KeeperException, InterruptedException, PulsarAdminException {
+        AsyncResponse response = mock(AsyncResponse.class);
+        ArgumentCaptor<Response> responseCaptor = ArgumentCaptor.forClass(Response.class);
+        persistentTopics.createPartitionedTopic(response, testTenant, testNamespace, "test-topic1", 3);
+        verify(response, timeout(5000).times(1)).resume(responseCaptor.capture());
+        Assert.assertEquals(responseCaptor.getValue().getStatus(), Response.Status.NO_CONTENT.getStatusCode());
 
-        persistentTopics.createPartitionedTopic(testTenant, testNamespace, "test-topic1", 3);
-
-        nonPersistentTopic.createPartitionedTopic(testTenant, testNamespace, "test-topic2", 3);
+        response = mock(AsyncResponse.class);
+        responseCaptor = ArgumentCaptor.forClass(Response.class);
+        nonPersistentTopic.createPartitionedTopic(response, testTenant, testNamespace, "test-topic2", 3);
+        verify(response, timeout(5000).times(1)).resume(responseCaptor.capture());
+        Assert.assertEquals(responseCaptor.getValue().getStatus(), Response.Status.NO_CONTENT.getStatusCode());
 
         List<String> persistentPartitionedTopics = persistentTopics.getPartitionedTopicList(testTenant, testNamespace);
 
@@ -351,7 +377,11 @@ public class PersistentTopicsTest extends MockedPulsarServiceBaseTest {
     public void testGrantPartitionedTopic() {
         final String partitionedTopicName = "partitioned-topic";
         final int numPartitions = 5;
-        persistentTopics.createPartitionedTopic(testTenant, testNamespace, partitionedTopicName, numPartitions);
+        AsyncResponse response = mock(AsyncResponse.class);
+        ArgumentCaptor<Response> responseCaptor = ArgumentCaptor.forClass(Response.class);
+        persistentTopics.createPartitionedTopic(response, testTenant, testNamespace, partitionedTopicName, numPartitions);
+        verify(response, timeout(5000).times(1)).resume(responseCaptor.capture());
+        Assert.assertEquals(responseCaptor.getValue().getStatus(), Response.Status.NO_CONTENT.getStatusCode());
 
         String role = "role";
         Set<AuthAction> expectActions = new HashSet<>();
@@ -387,8 +417,11 @@ public class PersistentTopicsTest extends MockedPulsarServiceBaseTest {
     public void testRevokePartitionedTopic() {
         final String partitionedTopicName = "partitioned-topic";
         final int numPartitions = 5;
-        persistentTopics.createPartitionedTopic(testTenant, testNamespace, partitionedTopicName, numPartitions);
-
+        AsyncResponse response = mock(AsyncResponse.class);
+        ArgumentCaptor<Response> responseCaptor = ArgumentCaptor.forClass(Response.class);
+        persistentTopics.createPartitionedTopic(response, testTenant, testNamespace, partitionedTopicName, numPartitions);
+        verify(response, timeout(5000).times(1)).resume(responseCaptor.capture());
+        Assert.assertEquals(responseCaptor.getValue().getStatus(), Response.Status.NO_CONTENT.getStatusCode());
         String role = "role";
         Set<AuthAction> expectActions = new HashSet<>();
         expectActions.add(AuthAction.produce);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v1/V1_AdminApiTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v1/V1_AdminApiTest.java
index 373a6b7..78e3dc1 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v1/V1_AdminApiTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v1/V1_AdminApiTest.java
@@ -879,9 +879,7 @@ public class V1_AdminApiTest extends MockedPulsarServiceBaseTest {
         try {
             admin.topics().createPartitionedTopic(partitionedTopicName, 32);
             fail("Should have failed as the partitioned topic exists with its partition created");
-        } catch (PreconditionFailedException e) {
-            // Expecting PreconditionFailedException instead of ConflictException as it'll
-            // fail validation before actually try to create metadata in ZK.
+        } catch (ConflictException ignore) {
         }
 
         producer = client.newProducer(Schema.BYTES)