You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2019/10/08 15:51:42 UTC

[pulsar] branch master updated: Ensure the handling of PartitionMetadataRequest is async end-to-end (#5307)

This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 71cfe3a  Ensure the handling of PartitionMetadataRequest is async end-to-end (#5307)
71cfe3a is described below

commit 71cfe3a6e0b8c21da714902a07722afa96036a17
Author: Matteo Merli <mm...@apache.org>
AuthorDate: Tue Oct 8 08:51:37 2019 -0700

    Ensure the handling of PartitionMetadataRequest is async end-to-end (#5307)
    
    * Ensure the handling of PartitionMetadataRequest is async end-to-end
    
    * Fixed z-node path
---
 .../apache/pulsar/broker/ServiceConfiguration.java |  18 +++
 .../broker/cache/ConfigurationCacheService.java    |  12 +-
 .../apache/pulsar/broker/admin/AdminResource.java  | 147 ++-------------------
 .../broker/admin/impl/PersistentTopicsBase.java    |   7 +-
 .../broker/cache/LocalZooKeeperCacheService.java   |   4 +
 .../pulsar/broker/namespace/NamespaceService.java  |  67 ++++++----
 .../pulsar/broker/service/BrokerService.java       |  74 +++++++++++
 .../apache/pulsar/broker/service/ServerCnx.java    |  61 ++++-----
 8 files changed, 189 insertions(+), 201 deletions(-)

diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
index 5ed90ce..4f6acb0 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
@@ -1382,4 +1382,22 @@ public class ServiceConfiguration implements PulsarConfiguration {
     public Optional<Integer> getWebServicePortTls() {
         return webServicePortTls;
     }
+
+    public boolean isDefaultTopicTypePartitioned() {
+        return TopicType.PARTITIONED.toString().equals(allowAutoTopicCreationType);
+    }
+
+    enum TopicType {
+        PARTITIONED("partitioned"),
+        NON_PARTITIONED("non-partitioned");
+        private String type;
+
+        TopicType(String type) {
+            this.type = type;
+        }
+
+        public String toString() {
+            return type;
+        }
+    }
 }
diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/cache/ConfigurationCacheService.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/cache/ConfigurationCacheService.java
index 4ac6cab..06781ff 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/cache/ConfigurationCacheService.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/cache/ConfigurationCacheService.java
@@ -67,6 +67,8 @@ public class ConfigurationCacheService {
     public static final String POLICIES_ROOT = "/admin/policies";
     private static final String CLUSTERS_ROOT = "/admin/clusters";
 
+    public static final String PARTITIONED_TOPICS_ROOT = "/admin/partitioned-topics";
+
     public ConfigurationCacheService(ZooKeeperCache cache) throws PulsarServerException {
         this(cache, null);
     }
@@ -98,7 +100,7 @@ public class ConfigurationCacheService {
         };
 
         this.clustersListCache = new ZooKeeperChildrenCache(cache, CLUSTERS_ROOT);
-        
+
         CLUSTER_FAILURE_DOMAIN_ROOT = CLUSTERS_ROOT + "/" + configuredClusterName + "/" + FAILURE_DOMAIN;
         if (isNotBlank(configuredClusterName)) {
             createFailureDomainRoot(cache.getZooKeeper(), CLUSTER_FAILURE_DOMAIN_ROOT);
@@ -114,7 +116,7 @@ public class ConfigurationCacheService {
                         }));
             }
         };
-        
+
         this.failureDomainCache = new ZooKeeperDataCache<FailureDomain>(cache) {
             @Override
             public FailureDomain deserialize(String path, byte[] content) throws Exception {
@@ -169,7 +171,7 @@ public class ConfigurationCacheService {
     public ZooKeeperCache cache() {
         return cache;
     }
-    
+
     public ZooKeeperDataCache<TenantInfo> propertiesCache() {
         return this.propertiesCache;
     }
@@ -189,7 +191,7 @@ public class ConfigurationCacheService {
     public ZooKeeperChildrenCache failureDomainListCache() {
         return this.failureDomainListCache;
     }
-    
+
     public ZooKeeper getZooKeeper() {
         return this.cache.getZooKeeper();
     }
@@ -197,7 +199,7 @@ public class ConfigurationCacheService {
     public ZooKeeperDataCache<NamespaceIsolationPolicies> namespaceIsolationPoliciesCache() {
         return this.namespaceIsolationPoliciesCache;
     }
-    
+
     public ZooKeeperDataCache<FailureDomain> failureDomainCache() {
         return this.failureDomainCache;
     }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
index 39ff495..02ebd06 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
@@ -18,12 +18,13 @@
  */
 package org.apache.pulsar.broker.admin;
 
-import com.fasterxml.jackson.core.JsonProcessingException;
 import static com.google.common.base.Preconditions.checkArgument;
 import static org.apache.pulsar.broker.cache.ConfigurationCacheService.POLICIES;
-import org.apache.pulsar.common.api.proto.PulsarApi;
 import static org.apache.pulsar.common.util.Codec.decode;
 
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.Lists;
+
 import java.net.MalformedURLException;
 import java.net.URI;
 import java.util.List;
@@ -43,13 +44,13 @@ import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.broker.cache.LocalZooKeeperCacheService;
 import org.apache.pulsar.broker.web.PulsarWebResource;
 import org.apache.pulsar.broker.web.RestException;
-import org.apache.pulsar.common.naming.TopicDomain;
-import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.naming.Constants;
 import org.apache.pulsar.common.naming.NamespaceBundle;
 import org.apache.pulsar.common.naming.NamespaceBundleFactory;
 import org.apache.pulsar.common.naming.NamespaceBundles;
 import org.apache.pulsar.common.naming.NamespaceName;
+import org.apache.pulsar.common.naming.TopicDomain;
+import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
 import org.apache.pulsar.common.policies.data.BacklogQuota;
 import org.apache.pulsar.common.policies.data.BundlesData;
@@ -77,13 +78,9 @@ import org.apache.zookeeper.data.Stat;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.google.common.collect.Lists;
-
 public abstract class AdminResource extends PulsarWebResource {
     private static final Logger log = LoggerFactory.getLogger(AdminResource.class);
     private static final String POLICIES_READONLY_FLAG_PATH = "/admin/flags/policies-readonly";
-    private static final int PARTITIONED_TOPIC_WAIT_SYNC_TIME_MS = 1000;
     public static final String PARTITIONED_TOPIC_PATH_ZNODE = "partitioned-topics";
 
     protected ZooKeeper globalZk() {
@@ -521,12 +518,11 @@ public abstract class AdminResource extends PulsarWebResource {
             throw new RestException(e);
         }
 
-        String path = path(PARTITIONED_TOPIC_PATH_ZNODE, namespaceName.toString(), domain(), topicName.getEncodedLocalName());
         PartitionedTopicMetadata partitionMetadata;
         if (checkAllowAutoCreation) {
-            partitionMetadata = fetchPartitionedTopicMetadataCheckAllowAutoCreation(pulsar(), path, topicName);
+            partitionMetadata = fetchPartitionedTopicMetadataCheckAllowAutoCreation(pulsar(), topicName);
         } else {
-            partitionMetadata = fetchPartitionedTopicMetadata(pulsar(), path);
+            partitionMetadata = fetchPartitionedTopicMetadata(pulsar(), topicName);
         }
 
         if (log.isDebugEnabled()) {
@@ -536,9 +532,9 @@ public abstract class AdminResource extends PulsarWebResource {
         return partitionMetadata;
     }
 
-    protected static PartitionedTopicMetadata fetchPartitionedTopicMetadata(PulsarService pulsar, String path) {
+    protected static PartitionedTopicMetadata fetchPartitionedTopicMetadata(PulsarService pulsar, TopicName topicName) {
         try {
-            return fetchPartitionedTopicMetadataAsync(pulsar, path).get();
+            return pulsar.getBrokerService().fetchPartitionedTopicMetadataAsync(topicName).get();
         } catch (Exception e) {
             if (e.getCause() instanceof RestException) {
                 throw (RestException) e;
@@ -547,37 +543,10 @@ public abstract class AdminResource extends PulsarWebResource {
         }
     }
 
-    protected static CompletableFuture<PartitionedTopicMetadata> fetchPartitionedTopicMetadataAsync(
-            PulsarService pulsar, String path) {
-        CompletableFuture<PartitionedTopicMetadata> metadataFuture = new CompletableFuture<>();
-        try {
-            // gets the number of partitions from the zk cache
-            pulsar.getGlobalZkCache().getDataAsync(path, new Deserializer<PartitionedTopicMetadata>() {
-                @Override
-                public PartitionedTopicMetadata deserialize(String key, byte[] content) throws Exception {
-                    return jsonMapper().readValue(content, PartitionedTopicMetadata.class);
-                }
-            }).thenAccept(metadata -> {
-                // if the partitioned topic is not found in zk, then the topic is not partitioned
-                if (metadata.isPresent()) {
-                    metadataFuture.complete(metadata.get());
-                } else {
-                    metadataFuture.complete(new PartitionedTopicMetadata());
-                }
-            }).exceptionally(ex -> {
-                metadataFuture.completeExceptionally(ex);
-                return null;
-            });
-        } catch (Exception e) {
-            metadataFuture.completeExceptionally(e);
-        }
-        return metadataFuture;
-    }
-
     protected static PartitionedTopicMetadata fetchPartitionedTopicMetadataCheckAllowAutoCreation(
-            PulsarService pulsar, String path, TopicName topicName) {
+            PulsarService pulsar, TopicName topicName) {
         try {
-            return fetchPartitionedTopicMetadataCheckAllowAutoCreationAsync(pulsar, path, topicName)
+            return pulsar.getBrokerService().fetchPartitionedTopicMetadataCheckAllowAutoCreationAsync(topicName)
                     .get();
         } catch (Exception e) {
             if (e.getCause() instanceof RestException) {
@@ -587,85 +556,7 @@ public abstract class AdminResource extends PulsarWebResource {
         }
     }
 
-    protected static CompletableFuture<PartitionedTopicMetadata> fetchPartitionedTopicMetadataCheckAllowAutoCreationAsync(
-            PulsarService pulsar, String path, TopicName topicName) {
-        CompletableFuture<PartitionedTopicMetadata> metadataFuture = new CompletableFuture<>();
-        try {
-            boolean allowAutoTopicCreation = pulsar.getConfiguration().isAllowAutoTopicCreation();
-            String topicType = pulsar.getConfiguration().getAllowAutoTopicCreationType();
-            boolean topicExist;
-            try {
-                topicExist = pulsar.getNamespaceService()
-                        .getListOfTopics(topicName.getNamespaceObject(), PulsarApi.CommandGetTopicsOfNamespace.Mode.ALL)
-                        .join()
-                        .contains(topicName.toString());
-            } catch (Exception e) {
-                log.warn("Unexpected error while getting list of topics. topic={}. Error: {}",
-                        topicName, e.getMessage(), e);
-                throw new RestException(e);
-            }
-            fetchPartitionedTopicMetadataAsync(pulsar, path).whenCompleteAsync((metadata, ex) -> {
-                if (ex != null) {
-                    metadataFuture.completeExceptionally(ex);
-                    // If topic is already exist, creating partitioned topic is not allowed.
-                } else if (metadata.partitions == 0 && !topicExist && allowAutoTopicCreation &&
-                        TopicType.PARTITIONED.toString().equals(topicType)) {
-                    createDefaultPartitionedTopicAsync(pulsar, path).whenComplete((defaultMetadata, e) -> {
-                        if (e == null) {
-                            metadataFuture.complete(defaultMetadata);
-                        } else if (e instanceof KeeperException) {
-                            try {
-                                Thread.sleep(PARTITIONED_TOPIC_WAIT_SYNC_TIME_MS);
-                                if (!pulsar.getGlobalZkCache().exists(path)){
-                                    metadataFuture.completeExceptionally(e);
-                                    return;
-                                }
-                            } catch (InterruptedException | KeeperException exc) {
-                                metadataFuture.completeExceptionally(exc);
-                                return;
-                            }
-                            fetchPartitionedTopicMetadataAsync(pulsar, path).whenComplete((metadata2, ex2) -> {
-                                if (ex2 != null) {
-                                    metadataFuture.completeExceptionally(ex2);
-                                } else {
-                                    metadataFuture.complete(metadata2);
-                                }
-                            });
-                        } else {
-                            metadataFuture.completeExceptionally(e);
-                        }
-                    });
-                } else {
-                    metadataFuture.complete(metadata);
-                }
-            });
-        } catch (Exception e) {
-            metadataFuture.completeExceptionally(e);
-        }
-        return metadataFuture;
-    }
-
-    protected static CompletableFuture<PartitionedTopicMetadata> createDefaultPartitionedTopicAsync(
-            PulsarService pulsar, String path) {
-        int defaultNumPartitions = pulsar.getConfiguration().getDefaultNumPartitions();
-        checkArgument(defaultNumPartitions > 0, "Default number of partitions should be more than 0");
-        PartitionedTopicMetadata configMetadata = new PartitionedTopicMetadata(defaultNumPartitions);
-        CompletableFuture<PartitionedTopicMetadata> partitionedTopicFuture = new CompletableFuture<>();
-        try {
-            byte[] content = jsonMapper().writeValueAsBytes(configMetadata);
-            ZkUtils.createFullPathOptimistic(pulsar.getGlobalZkCache().getZooKeeper(), path, content,
-                    ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
-            // we wait for the data to be synced in all quorums and the observers
-            Thread.sleep(PARTITIONED_TOPIC_WAIT_SYNC_TIME_MS);
-            partitionedTopicFuture.complete(configMetadata);
-        } catch (JsonProcessingException | KeeperException | InterruptedException e) {
-            log.error("Failed to create default partitioned topic.", e);
-            partitionedTopicFuture.completeExceptionally(e);
-        }
-        return partitionedTopicFuture;
-    }
-
-    protected void validateClusterExists(String cluster) {
+   protected void validateClusterExists(String cluster) {
         try {
             if (!clustersCache().get(path("clusters", cluster)).isPresent()) {
                 throw new RestException(Status.PRECONDITION_FAILED, "Cluster " + cluster + " does not exist.");
@@ -730,18 +621,4 @@ public abstract class AdminResource extends PulsarWebResource {
         partitionedTopics.sort(null);
         return partitionedTopics;
     }
-
-    enum TopicType {
-        PARTITIONED("partitioned"),
-        NON_PARTITIONED("non-partitioned");
-        private String type;
-
-        TopicType(String type) {
-            this.type = type;
-        }
-
-        public String toString() {
-            return type;
-        }
-    }
 }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index ba5ce25..0545274 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -115,7 +115,7 @@ import org.slf4j.LoggerFactory;
 public class PersistentTopicsBase extends AdminResource {
     private static final Logger log = LoggerFactory.getLogger(PersistentTopicsBase.class);
 
-    protected static final int PARTITIONED_TOPIC_WAIT_SYNC_TIME_MS = 1000;
+    public static final int PARTITIONED_TOPIC_WAIT_SYNC_TIME_MS = 1000;
     private static final int OFFLINE_TOPIC_STAT_TTL_MINS = 10;
     private static final String DEPRECATED_CLIENT_VERSION_PREFIX = "Pulsar-CPP-v";
     private static final Version LEAST_SUPPORTED_CLIENT_VERSION_PREFIX = Version.forIntegers(1, 21);
@@ -1670,7 +1670,8 @@ public class PersistentTopicsBase extends AdminResource {
             // serve/redirect request else fail partitioned-metadata-request so, client fails while creating
             // producer/consumer
             checkLocalOrGetPeerReplicationCluster(pulsar, topicName.getNamespaceObject())
-                    .thenCompose(res -> fetchPartitionedTopicMetadataCheckAllowAutoCreationAsync(pulsar, path, topicName))
+                    .thenCompose(res -> pulsar.getBrokerService()
+                            .fetchPartitionedTopicMetadataCheckAllowAutoCreationAsync(topicName))
                     .thenAccept(metadata -> {
                         if (log.isDebugEnabled()) {
                             log.debug("[{}] Total number of partitions for topic {} is {}", clientAppId, topicName,
@@ -1786,7 +1787,7 @@ public class PersistentTopicsBase extends AdminResource {
     private CompletableFuture<Void> createSubscriptions(TopicName topicName, int numPartitions) {
         String path = path(PARTITIONED_TOPIC_PATH_ZNODE, topicName.getPersistenceNamingEncoding());
         CompletableFuture<Void> result = new CompletableFuture<>();
-        fetchPartitionedTopicMetadataAsync(pulsar(), path).thenAccept(partitionMetadata -> {
+        pulsar().getBrokerService().fetchPartitionedTopicMetadataAsync(topicName).thenAccept(partitionMetadata -> {
             if (partitionMetadata.partitions <= 1) {
                 result.completeExceptionally(new RestException(Status.CONFLICT, "Topic is not partitioned topic"));
                 return;
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/cache/LocalZooKeeperCacheService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/cache/LocalZooKeeperCacheService.java
index 4b28cad..625f805 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/cache/LocalZooKeeperCacheService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/cache/LocalZooKeeperCacheService.java
@@ -247,4 +247,8 @@ public class LocalZooKeeperCacheService {
     public ZooKeeperChildrenCache managedLedgerListCache() {
         return this.managedLedgerListCache;
     }
+
+    public CompletableFuture<Boolean> managedLedgerExists(String persistentPath) {
+        return cache.existsAsync(MANAGED_LEDGER_ROOT + "/" + persistentPath, cache);
+    }
 }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
index 83e3ade..7070e54 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
@@ -32,6 +32,7 @@ import org.apache.pulsar.broker.admin.AdminResource;
 import org.apache.pulsar.broker.loadbalance.LoadManager;
 import org.apache.pulsar.broker.loadbalance.ResourceUnit;
 import org.apache.pulsar.broker.lookup.LookupResult;
+import org.apache.pulsar.broker.service.Topic;
 import org.apache.pulsar.broker.service.BrokerServiceException.ServerMetadataException;
 import org.apache.pulsar.broker.service.BrokerServiceException.ServiceUnitNotReadyException;
 import org.apache.pulsar.broker.service.nonpersistent.NonPersistentTopic;
@@ -873,6 +874,17 @@ public class NamespaceService {
                         });
     }
 
+    public CompletableFuture<Boolean> checkTopicExists(TopicName topic) {
+        if (topic.isPersistent()) {
+            return pulsar.getLocalZkCacheService()
+                    .managedLedgerExists(topic.getPersistenceNamingEncoding());
+        } else {
+            return pulsar.getBrokerService()
+                    .getTopicIfExists(topic.toString())
+                    .thenApply(optTopic -> optTopic.isPresent());
+        }
+    }
+
     public CompletableFuture<List<String>> getListOfTopics(NamespaceName namespaceName, Mode mode) {
         switch (mode) {
             case ALL:
@@ -905,37 +917,36 @@ public class NamespaceService {
     }
 
     public CompletableFuture<List<String>> getListOfNonPersistentTopics(NamespaceName namespaceName) {
-        ClusterData peerClusterData;
-        try {
-            peerClusterData = PulsarWebResource.checkLocalOrGetPeerReplicationCluster(pulsar, namespaceName)
-                    .get(pulsar.getConfiguration().getZooKeeperOperationTimeoutSeconds(), SECONDS);
-        } catch (InterruptedException | ExecutionException | TimeoutException e) {
-            throw new RuntimeException("Failed to contact peer replication cluster.", e);
-        }
-
-        // if peer-cluster-data is present it means namespace is owned by that peer-cluster and request should be
-        // redirect to the peer-cluster
-        if (peerClusterData != null) {
-            return getNonPersistentTopicsFromPeerCluster(peerClusterData, namespaceName);
-        }
 
-        // Non-persistent topics don't have managed ledgers so we have to retrieve them from local cache.
-        List<String> topics = Lists.newArrayList();
-        synchronized (pulsar.getBrokerService().getMultiLayerTopicMap()) {
-            if (pulsar.getBrokerService().getMultiLayerTopicMap().containsKey(namespaceName.toString())) {
-                pulsar.getBrokerService().getMultiLayerTopicMap().get(namespaceName.toString()).values()
-                    .forEach(bundle -> {
-                        bundle.forEach((topicName, topic) -> {
-                            if (topic instanceof NonPersistentTopic && ((NonPersistentTopic)topic).isActive()) {
-                                topics.add(topicName);
+        return PulsarWebResource.checkLocalOrGetPeerReplicationCluster(pulsar, namespaceName)
+                .thenCompose(peerClusterData -> {
+                    // if peer-cluster-data is present it means namespace is owned by that peer-cluster and request
+                    // should be redirect to the peer-cluster
+                    if (peerClusterData != null) {
+                        return getNonPersistentTopicsFromPeerCluster(peerClusterData, namespaceName);
+                    } else {
+                        // Non-persistent topics don't have managed ledgers so we have to retrieve them from local
+                        // cache.
+                        List<String> topics = Lists.newArrayList();
+                        synchronized (pulsar.getBrokerService().getMultiLayerTopicMap()) {
+                            if (pulsar.getBrokerService().getMultiLayerTopicMap()
+                                    .containsKey(namespaceName.toString())) {
+                                pulsar.getBrokerService().getMultiLayerTopicMap().get(namespaceName.toString()).values()
+                                        .forEach(bundle -> {
+                                            bundle.forEach((topicName, topic) -> {
+                                                if (topic instanceof NonPersistentTopic
+                                                        && ((NonPersistentTopic) topic).isActive()) {
+                                                    topics.add(topicName);
+                                                }
+                                            });
+                                        });
                             }
-                        });
-                    });
-            }
-        }
+                        }
 
-        topics.sort(null);
-        return CompletableFuture.completedFuture(topics);
+                        topics.sort(null);
+                        return CompletableFuture.completedFuture(topics);
+                    }
+                });
     }
 
     private CompletableFuture<List<String>> getNonPersistentTopicsFromPeerCluster(ClusterData peerClusterData,
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index fa3c14e..490a691 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -69,6 +69,7 @@ import java.util.function.Predicate;
 import lombok.AccessLevel;
 import lombok.Getter;
 import lombok.Setter;
+
 import org.apache.bookkeeper.common.util.OrderedExecutor;
 import org.apache.bookkeeper.common.util.OrderedScheduler;
 import org.apache.bookkeeper.mledger.AsyncCallbacks.OpenLedgerCallback;
@@ -77,6 +78,7 @@ import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
 import org.apache.bookkeeper.mledger.ManagedLedgerException;
 import org.apache.bookkeeper.mledger.ManagedLedgerException.ManagedLedgerNotFoundException;
 import org.apache.bookkeeper.mledger.ManagedLedgerFactory;
+import org.apache.bookkeeper.util.SafeRunnable;
 import org.apache.bookkeeper.util.ZkUtils;
 import org.apache.commons.lang3.tuple.ImmutablePair;
 import org.apache.commons.lang3.tuple.Pair;
@@ -84,8 +86,10 @@ import org.apache.pulsar.broker.PulsarServerException;
 import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.broker.admin.AdminResource;
+import org.apache.pulsar.broker.admin.impl.PersistentTopicsBase;
 import org.apache.pulsar.broker.authentication.AuthenticationService;
 import org.apache.pulsar.broker.authorization.AuthorizationService;
+import org.apache.pulsar.broker.cache.ConfigurationCacheService;
 import org.apache.pulsar.broker.delayed.DelayedDeliveryTrackerFactory;
 import org.apache.pulsar.broker.delayed.DelayedDeliveryTrackerLoader;
 import org.apache.pulsar.broker.loadbalance.LoadManager;
@@ -119,6 +123,7 @@ import org.apache.pulsar.common.naming.NamespaceBundles;
 import org.apache.pulsar.common.naming.NamespaceName;
 import org.apache.pulsar.common.naming.TopicDomain;
 import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
 import org.apache.pulsar.common.policies.data.ClusterData;
 import org.apache.pulsar.common.policies.data.LocalPolicies;
 import org.apache.pulsar.common.policies.data.PersistencePolicies;
@@ -139,6 +144,7 @@ import org.apache.pulsar.zookeeper.ZooKeeperCacheListener;
 import org.apache.pulsar.zookeeper.ZooKeeperDataCache;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.ZooDefs;
 import org.apache.zookeeper.ZooDefs.Ids;
 import org.apache.zookeeper.data.Stat;
 import org.slf4j.Logger;
@@ -1611,6 +1617,74 @@ public class BrokerService implements Closeable, ZooKeeperCacheListener<Policies
 
     }
 
+    public CompletableFuture<PartitionedTopicMetadata> fetchPartitionedTopicMetadataCheckAllowAutoCreationAsync(TopicName topicName) {
+        return pulsar.getNamespaceService().checkTopicExists(topicName)
+                .thenCompose(topicExists -> {
+                    return fetchPartitionedTopicMetadataAsync(topicName)
+                            .thenCompose(metadata -> {
+                                // If topic is already exist, creating partitioned topic is not allowed.
+                                if (metadata.partitions == 0
+                                        && !topicExists
+                                        && pulsar.getConfiguration().isAllowAutoTopicCreation()
+                                        && pulsar.getConfiguration().isDefaultTopicTypePartitioned()) {
+                                    return pulsar.getBrokerService().createDefaultPartitionedTopicAsync(topicName);
+                                } else {
+                                    return CompletableFuture.completedFuture(metadata);
+                                }
+                            });
+                });
+    }
+
+    @SuppressWarnings("deprecation")
+    private CompletableFuture<PartitionedTopicMetadata> createDefaultPartitionedTopicAsync(TopicName topicName) {
+        int defaultNumPartitions = pulsar.getConfiguration().getDefaultNumPartitions();
+        checkArgument(defaultNumPartitions > 0, "Default number of partitions should be more than 0");
+
+        PartitionedTopicMetadata configMetadata = new PartitionedTopicMetadata(defaultNumPartitions);
+        CompletableFuture<PartitionedTopicMetadata> partitionedTopicFuture = new CompletableFuture<>();
+
+        try {
+            byte[] content = ObjectMapperFactory.getThreadLocal().writeValueAsBytes(configMetadata);
+
+            ZkUtils.asyncCreateFullPathOptimistic(pulsar.getGlobalZkCache().getZooKeeper(),
+                    partitionedTopicPath(topicName), content,
+                    ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT, (rc, path1, ctx, name) -> {
+                        if (rc == KeeperException.Code.OK.intValue()) {
+                            // we wait for the data to be synced in all quorums and the observers
+                            executor().schedule(
+                                    SafeRunnable.safeRun(() -> partitionedTopicFuture.complete(configMetadata)),
+                                    PersistentTopicsBase.PARTITIONED_TOPIC_WAIT_SYNC_TIME_MS, TimeUnit.MILLISECONDS);
+                        } else {
+                            partitionedTopicFuture.completeExceptionally(KeeperException.create(rc));
+                        }
+                    }, null);
+
+        } catch (Exception e) {
+            log.error("Failed to create default partitioned topic.", e);
+            return FutureUtil.failedFuture(e);
+        }
+
+        return partitionedTopicFuture;
+    }
+
+    public CompletableFuture<PartitionedTopicMetadata> fetchPartitionedTopicMetadataAsync(TopicName topicName) {
+        // gets the number of partitions from the zk cache
+        return pulsar.getGlobalZkCache().getDataAsync(partitionedTopicPath(topicName), (key, content) -> {
+            return ObjectMapperFactory.getThreadLocal().readValue(content, PartitionedTopicMetadata.class);
+        }).thenApply(metadata -> {
+            // if the partitioned topic is not found in zk, then the topic is not partitioned
+            return metadata.orElseGet(() -> new PartitionedTopicMetadata());
+        });
+    }
+
+    private static String partitionedTopicPath(TopicName topicName) {
+        return String.format("%s/%s/%s/%s",
+                ConfigurationCacheService.PARTITIONED_TOPICS_ROOT,
+                topicName.getNamespace(),
+                topicName.getDomain(),
+                topicName.getEncodedLocalName());
+    }
+
     public OrderedExecutor getTopicOrderedExecutor() {
         return topicOrderedExecutor;
     }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index 7a3a3d5..005f615 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -340,40 +340,41 @@ public class ServerCnx extends PulsarHandler {
             }
             String finalOriginalPrincipal = originalPrincipal;
             isProxyAuthorizedFuture.thenApply(isProxyAuthorized -> {
-                    if (isProxyAuthorized) {
+                if (isProxyAuthorized) {
                     getPartitionedTopicMetadata(getBrokerService().pulsar(),
-                                                authRole, finalOriginalPrincipal, authenticationData,
+                            authRole, finalOriginalPrincipal, authenticationData,
                             topicName).handle((metadata, ex) -> {
-                                    if (ex == null) {
-                                        int partitions = metadata.partitions;
-                                        ctx.writeAndFlush(Commands.newPartitionMetadataResponse(partitions, requestId));
+                                if (ex == null) {
+                                    int partitions = metadata.partitions;
+                                    ctx.writeAndFlush(Commands.newPartitionMetadataResponse(partitions, requestId));
+                                } else {
+                                    if (ex instanceof PulsarClientException) {
+                                        log.warn("Failed to authorize {} at [{}] on topic {} : {}", getRole(),
+                                                remoteAddress, topicName, ex.getMessage());
+                                        ctx.writeAndFlush(Commands.newPartitionMetadataResponse(
+                                                ServerError.AuthorizationError, ex.getMessage(), requestId));
                                     } else {
-                                        if (ex instanceof PulsarClientException) {
-                                            log.warn("Failed to authorize {} at [{}] on topic {} : {}", getRole(),
-                                                    remoteAddress, topicName, ex.getMessage());
-                                            ctx.writeAndFlush(Commands.newPartitionMetadataResponse(
-                                                    ServerError.AuthorizationError, ex.getMessage(), requestId));
-                                        } else {
-                                            log.warn("Failed to get Partitioned Metadata [{}] {}: {}", remoteAddress,
-                                                    topicName, ex.getMessage(), ex);
-                                            ServerError error = (ex instanceof RestException)
-                                                    && ((RestException) ex).getResponse().getStatus() < 500
-                                                            ? ServerError.MetadataError : ServerError.ServiceNotReady;
-                                            ctx.writeAndFlush(Commands.newPartitionMetadataResponse(error,
-                                                    ex.getMessage(), requestId));
-                                        }
+                                        log.warn("Failed to get Partitioned Metadata [{}] {}: {}", remoteAddress,
+                                                topicName, ex.getMessage(), ex);
+                                        ServerError error = (ex instanceof RestException)
+                                                && ((RestException) ex).getResponse().getStatus() < 500
+                                                        ? ServerError.MetadataError
+                                                        : ServerError.ServiceNotReady;
+                                        ctx.writeAndFlush(Commands.newPartitionMetadataResponse(error,
+                                                ex.getMessage(), requestId));
                                     }
-                                    lookupSemaphore.release();
-                                    return null;
-                                });
-                    } else {
-                        final String msg = "Proxy Client is not authorized to Get Partition Metadata";
-                        log.warn("[{}] {} with role {} on topic {}", remoteAddress, msg, authRole, topicName);
-                        ctx.writeAndFlush(
-                                Commands.newPartitionMetadataResponse(ServerError.AuthorizationError, msg, requestId));
-                        lookupSemaphore.release();
-                    }
-                    return null;
+                                }
+                                lookupSemaphore.release();
+                                return null;
+                            });
+                } else {
+                    final String msg = "Proxy Client is not authorized to Get Partition Metadata";
+                    log.warn("[{}] {} with role {} on topic {}", remoteAddress, msg, authRole, topicName);
+                    ctx.writeAndFlush(
+                            Commands.newPartitionMetadataResponse(ServerError.AuthorizationError, msg, requestId));
+                    lookupSemaphore.release();
+                }
+                return null;
             }).exceptionally(ex -> {
                 final String msg = "Exception occured while trying to authorize get Partition Metadata";
                 log.warn("[{}] {} with role {} on topic {}", remoteAddress, msg, authRole, topicName);