You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by si...@apache.org on 2019/09/17 14:01:17 UTC

[pulsar] branch master updated: Add more config for auto-topic-creation (#4963)

This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 547c421  Add more config for auto-topic-creation (#4963)
547c421 is described below

commit 547c4218124a6055a468152304d539d587071e2d
Author: Xiaobing Fang <bi...@qq.com>
AuthorDate: Tue Sep 17 09:01:12 2019 -0500

    Add more config for auto-topic-creation (#4963)
    
    Master Issue:  #4926
    
    ### Motivation
    
    
    Currently the distinction between partitioned and non-partitioned topics is a little confusing for users. In PR #3450 we added a config for auto-topic-creation.
    We could leverage this config to provide some more config for auto-topic-creation.
    
    ### Modifications
    
    - Add `allowAutoTopicCreationType` and `defaultNumPartitions` to configuration.
    - Users can use both configurations when they decide to create a topic automatically.
    - Add test.
    - Update doc.
---
 conf/broker.conf                                   |   6 ++
 conf/standalone.conf                               |   9 ++
 .../apache/pulsar/broker/ServiceConfiguration.java |  13 ++-
 .../apache/pulsar/broker/admin/AdminResource.java  | 116 +++++++++++++++++++-
 .../pulsar/broker/admin/impl/BrokersBase.java      |   7 ++
 .../broker/admin/impl/PersistentTopicsBase.java    |  52 +++++----
 .../broker/admin/v1/NonPersistentTopics.java       |  18 +++-
 .../pulsar/broker/admin/v1/PersistentTopics.java   |   5 +-
 .../broker/admin/v2/NonPersistentTopics.java       |  19 +++-
 .../pulsar/broker/admin/v2/PersistentTopics.java   |   6 +-
 .../pulsar/broker/admin/PersistentTopicsTest.java  |   6 +-
 .../broker/auth/MockedPulsarServiceBaseTest.java   |   1 +
 .../broker/service/BacklogQuotaManagerTest.java    |   1 +
 .../broker/service/BrokerBkEnsemblesTests.java     |   1 +
 .../broker/service/BrokerBookieIsolationTest.java  |   4 +
 .../BrokerServiceAutoTopicCreationTest.java        | 120 +++++++++++++++++++++
 .../pulsar/broker/service/ReplicatorTestBase.java  |   3 +
 .../pulsar/client/api/NonPersistentTopicTest.java  |   3 +
 .../client/api/SimpleProducerConsumerTest.java     |   4 +-
 .../client/api/v1/V1_ProducerConsumerTest.java     |   4 +-
 .../worker/PulsarFunctionE2ESecurityTest.java      |   3 +-
 .../worker/PulsarFunctionLocalRunTest.java         |   3 +-
 .../worker/PulsarFunctionPublishTest.java          |   3 +-
 .../functions/worker/PulsarFunctionStateTest.java  |   3 +-
 .../worker/PulsarWorkerAssignmentTest.java         |   2 +-
 .../apache/pulsar/io/PulsarFunctionAdminTest.java  |   2 +-
 .../apache/pulsar/io/PulsarFunctionE2ETest.java    |   3 +-
 .../configurations/pulsar_broker_test.conf         |   3 +
 pulsar-client-cpp/test-conf/standalone-ssl.conf    |   9 ++
 pulsar-client-cpp/test-conf/standalone.conf        |   9 ++
 pulsar-client-cpp/tests/standalone.conf            |   9 ++
 .../pulsar/client/impl/HttpLookupService.java      |   3 +-
 .../pulsar/functions/worker/WorkerService.java     |   3 +
 site2/docs/reference-configuration.md              |   2 +
 .../pulsar/tests/integration/cli/CLITest.java      |   6 ++
 .../integration/functions/PulsarFunctionsTest.java |  27 ++++-
 36 files changed, 442 insertions(+), 46 deletions(-)

diff --git a/conf/broker.conf b/conf/broker.conf
index b491336..dcd39f3 100644
--- a/conf/broker.conf
+++ b/conf/broker.conf
@@ -88,6 +88,12 @@ ttlDurationDefaultInSeconds=0
 # Enable topic auto creation if new producer or consumer connected (disable auto creation with value false)
 allowAutoTopicCreation=true
 
+# The type of topic that is allowed to be automatically created.(partitioned/non-partitioned)
+allowAutoTopicCreationType=partitioned
+
+# The number of partitions an automatically created topic gets if allowAutoTopicCreationType is partitioned.
+defaultNumPartitions=1
+
 # Enable the deletion of inactive topics
 brokerDeleteInactiveTopicsEnabled=true
 
diff --git a/conf/standalone.conf b/conf/standalone.conf
index 388c185..b25f21f 100644
--- a/conf/standalone.conf
+++ b/conf/standalone.conf
@@ -592,3 +592,12 @@ allowLoopback=true
 # will heart performance. It is better to give a higher number of gc
 # interval if there is enough disk capacity.
 gcWaitTime=300000
+
+# Enable topic auto creation if new producer or consumer connected (disable auto creation with value false)
+allowAutoTopicCreation=true
+
+# The type of topic that is allowed to be automatically created.(partitioned/non-partitioned)
+allowAutoTopicCreationType=non-partitioned
+
+# The number of partitions an automatically created topic gets if allowAutoTopicCreationType is partitioned.
+defaultNumPartitions=1
\ No newline at end of file
diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
index bff2b46..ffea06d 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
@@ -895,10 +895,21 @@ public class ServiceConfiguration implements PulsarConfiguration {
     private double managedLedgerDefaultMarkDeleteRateLimit = 1.0;
     @FieldContext(
         category = CATEGORY_STORAGE_ML,
-    	doc = "Allow automated creation of non-partition topics if set to true (default value)."
+    	doc = "Allow automated creation of topics if set to true (default value)."
     )
     private boolean allowAutoTopicCreation = true;
     @FieldContext(
+            category = CATEGORY_STORAGE_ML,
+            doc = "The type of topic that is allowed to be automatically created.(partitioned/non-partitioned)"
+    )
+    private String allowAutoTopicCreationType = "partitioned";
+    @FieldContext(
+            category = CATEGORY_STORAGE_ML,
+            doc = "The number of partitioned topics that is allowed to be automatically created"
+                    + "if allowAutoTopicCreationType is partitioned."
+    )
+    private int defaultNumPartitions = 1;
+    @FieldContext(
         category = CATEGORY_STORAGE_ML,
         doc = "Number of threads to be used for managed ledger tasks dispatching"
     )
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
index 94409e1..ed497e8 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
@@ -18,8 +18,10 @@
  */
 package org.apache.pulsar.broker.admin;
 
+import com.fasterxml.jackson.core.JsonProcessingException;
 import static com.google.common.base.Preconditions.checkArgument;
 import static org.apache.pulsar.broker.cache.ConfigurationCacheService.POLICIES;
+import org.apache.pulsar.common.api.proto.PulsarApi;
 import static org.apache.pulsar.common.util.Codec.decode;
 
 import java.net.MalformedURLException;
@@ -81,6 +83,7 @@ import com.google.common.collect.Lists;
 public abstract class AdminResource extends PulsarWebResource {
     private static final Logger log = LoggerFactory.getLogger(AdminResource.class);
     private static final String POLICIES_READONLY_FLAG_PATH = "/admin/flags/policies-readonly";
+    private static final int PARTITIONED_TOPIC_WAIT_SYNC_TIME_MS = 1000;
     public static final String PARTITIONED_TOPIC_PATH_ZNODE = "partitioned-topics";
 
     protected ZooKeeper globalZk() {
@@ -500,7 +503,7 @@ public abstract class AdminResource extends PulsarWebResource {
     }
 
     protected PartitionedTopicMetadata getPartitionedTopicMetadata(TopicName topicName,
-            boolean authoritative) {
+            boolean authoritative, boolean checkAllowAutoCreation) {
         validateClusterOwnership(topicName.getCluster());
         // validates global-namespace contains local/peer cluster: if peer/local cluster present then lookup can
         // serve/redirect request else fail partitioned-metadata-request so, client fails while creating
@@ -519,7 +522,12 @@ public abstract class AdminResource extends PulsarWebResource {
         }
 
         String path = path(PARTITIONED_TOPIC_PATH_ZNODE, namespaceName.toString(), domain(), topicName.getEncodedLocalName());
-        PartitionedTopicMetadata partitionMetadata = fetchPartitionedTopicMetadata(pulsar(), path);
+        PartitionedTopicMetadata partitionMetadata;
+        if (checkAllowAutoCreation) {
+            partitionMetadata = fetchPartitionedTopicMetadataCheckAllowAutoCreation(pulsar(), path, topicName);
+        } else {
+            partitionMetadata = fetchPartitionedTopicMetadata(pulsar(), path);
+        }
 
         if (log.isDebugEnabled()) {
             log.debug("[{}] Total number of partitions for topic {} is {}", clientAppId(), topicName,
@@ -566,6 +574,96 @@ public abstract class AdminResource extends PulsarWebResource {
         return metadataFuture;
     }
 
+    protected static PartitionedTopicMetadata fetchPartitionedTopicMetadataCheckAllowAutoCreation(
+            PulsarService pulsar, String path, TopicName topicName) {
+        try {
+            return fetchPartitionedTopicMetadataCheckAllowAutoCreationAsync(pulsar, path, topicName)
+                    .get();
+        } catch (Exception e) {
+            if (e.getCause() instanceof RestException) {
+                throw (RestException) e;
+            }
+            throw new RestException(e);
+        }
+    }
+
+    protected static CompletableFuture<PartitionedTopicMetadata> fetchPartitionedTopicMetadataCheckAllowAutoCreationAsync(
+            PulsarService pulsar, String path, TopicName topicName) {
+        CompletableFuture<PartitionedTopicMetadata> metadataFuture = new CompletableFuture<>();
+        try {
+            boolean allowAutoTopicCreation = pulsar.getConfiguration().isAllowAutoTopicCreation();
+            String topicType = pulsar.getConfiguration().getAllowAutoTopicCreationType();
+            boolean topicExist;
+            try {
+                topicExist = pulsar.getNamespaceService()
+                        .getListOfTopics(topicName.getNamespaceObject(), PulsarApi.CommandGetTopicsOfNamespace.Mode.ALL)
+                        .contains(topicName.toString());
+            } catch (Exception e) {
+                log.warn("Unexpected error while getting list of topics. topic={}. Error: {}",
+                        topicName, e.getMessage(), e);
+                throw new RestException(e);
+            }
+            fetchPartitionedTopicMetadataAsync(pulsar, path).whenCompleteAsync((metadata, ex) -> {
+                if (ex != null) {
+                    metadataFuture.completeExceptionally(ex);
+                    // If topic is already exist, creating partitioned topic is not allowed.
+                } else if (metadata.partitions == 0 && !topicExist && allowAutoTopicCreation &&
+                        TopicType.PARTITIONED.toString().equals(topicType)) {
+                    createDefaultPartitionedTopicAsync(pulsar, path).whenComplete((defaultMetadata, e) -> {
+                        if (e == null) {
+                            metadataFuture.complete(defaultMetadata);
+                        } else if (e instanceof KeeperException) {
+                            try {
+                                Thread.sleep(PARTITIONED_TOPIC_WAIT_SYNC_TIME_MS);
+                                if (!pulsar.getGlobalZkCache().exists(path)){
+                                    metadataFuture.completeExceptionally(e);
+                                    return;
+                                }
+                            } catch (InterruptedException | KeeperException exc) {
+                                metadataFuture.completeExceptionally(exc);
+                                return;
+                            }
+                            fetchPartitionedTopicMetadataAsync(pulsar, path).whenComplete((metadata2, ex2) -> {
+                                if (ex2 != null) {
+                                    metadataFuture.completeExceptionally(ex2);
+                                } else {
+                                    metadataFuture.complete(metadata2);
+                                }
+                            });
+                        } else {
+                            metadataFuture.completeExceptionally(e);
+                        }
+                    });
+                } else {
+                    metadataFuture.complete(metadata);
+                }
+            });
+        } catch (Exception e) {
+            metadataFuture.completeExceptionally(e);
+        }
+        return metadataFuture;
+    }
+
+    protected static CompletableFuture<PartitionedTopicMetadata> createDefaultPartitionedTopicAsync(
+            PulsarService pulsar, String path) {
+        int defaultNumPartitions = pulsar.getConfiguration().getDefaultNumPartitions();
+        checkArgument(defaultNumPartitions > 0, "Default number of partitions should be more than 0");
+        PartitionedTopicMetadata configMetadata = new PartitionedTopicMetadata(defaultNumPartitions);
+        CompletableFuture<PartitionedTopicMetadata> partitionedTopicFuture = new CompletableFuture<>();
+        try {
+            byte[] content = jsonMapper().writeValueAsBytes(configMetadata);
+            ZkUtils.createFullPathOptimistic(pulsar.getGlobalZkCache().getZooKeeper(), path, content,
+                    ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+            // we wait for the data to be synced in all quorums and the observers
+            Thread.sleep(PARTITIONED_TOPIC_WAIT_SYNC_TIME_MS);
+            partitionedTopicFuture.complete(configMetadata);
+        } catch (JsonProcessingException | KeeperException | InterruptedException e) {
+            log.error("Failed to create default partitioned topic.", e);
+            partitionedTopicFuture.completeExceptionally(e);
+        }
+        return partitionedTopicFuture;
+    }
+
     protected void validateClusterExists(String cluster) {
         try {
             if (!clustersCache().get(path("clusters", cluster)).isPresent()) {
@@ -627,4 +725,18 @@ public abstract class AdminResource extends PulsarWebResource {
         partitionedTopics.sort(null);
         return partitionedTopics;
     }
+
+    enum TopicType {
+        PARTITIONED("partitioned"),
+        NON_PARTITIONED("non-partitioned");
+        private String type;
+
+        TopicType(String type) {
+            this.type = type;
+        }
+
+        public String toString() {
+            return type;
+        }
+    }
 }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java
index 63e392a..fc529e9 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java
@@ -262,6 +262,13 @@ public class BrokersBase extends AdminResource {
         PulsarClient client = pulsar().getClient();
 
         String messageStr = UUID.randomUUID().toString();
+        // create non-partitioned topic manually
+        try {
+            pulsar().getBrokerService().getTopic(topic, true).get();
+        } catch (Exception e) {
+            asyncResponse.resume(new RestException(e));
+            return;
+        }
         CompletableFuture<Producer<String>> producerFuture =
             client.newProducer(Schema.STRING).topic(topic).createAsync();
         CompletableFuture<Reader<String>> readerFuture = client.newReader(Schema.STRING)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index 9150f99..509b41d 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -20,6 +20,7 @@ package org.apache.pulsar.broker.admin.impl;
 
 import static com.google.common.base.Preconditions.checkNotNull;
 import static org.apache.pulsar.broker.cache.ConfigurationCacheService.POLICIES;
+import org.apache.pulsar.common.api.proto.PulsarApi;
 import static org.apache.pulsar.common.util.Codec.decode;
 
 import com.github.zafarkhaja.semver.Version;
@@ -375,6 +376,18 @@ public class PersistentTopicsBase extends AdminResource {
             throw new RestException(Status.NOT_ACCEPTABLE, "Number of partitions should be more than 0");
         }
         try {
+            boolean topicExist = pulsar().getNamespaceService()
+                    .getListOfTopics(topicName.getNamespaceObject(), PulsarApi.CommandGetTopicsOfNamespace.Mode.ALL)
+                    .contains(topicName.toString());
+            if (topicExist) {
+                log.warn("[{}] Failed to create already existing topic {}", clientAppId(), topicName);
+                throw new RestException(Status.CONFLICT, "This topic already exists");
+            }
+        } catch (Exception e) {
+            log.error("[{}] Failed to create partitioned topic {}", clientAppId(), topicName, e);
+            throw new RestException(e);
+        }
+        try {
             String path = ZkAdminPaths.partitionedTopicPath(topicName);
             byte[] data = jsonMapper().writeValueAsBytes(new PartitionedTopicMetadata(numPartitions));
             zkCreateOptimistic(path, data);
@@ -444,8 +457,8 @@ public class PersistentTopicsBase extends AdminResource {
         }
     }
 
-    protected PartitionedTopicMetadata internalGetPartitionedMetadata(boolean authoritative) {
-        PartitionedTopicMetadata metadata = getPartitionedTopicMetadata(topicName, authoritative);
+    protected PartitionedTopicMetadata internalGetPartitionedMetadata(boolean authoritative, boolean checkAllowAutoCreation) {
+        PartitionedTopicMetadata metadata = getPartitionedTopicMetadata(topicName, authoritative, checkAllowAutoCreation);
         if (metadata.partitions > 1) {
             validateClientVersion();
         }
@@ -457,7 +470,7 @@ public class PersistentTopicsBase extends AdminResource {
 
         final CompletableFuture<Void> future = new CompletableFuture<>();
 
-        PartitionedTopicMetadata partitionMetadata = getPartitionedTopicMetadata(topicName, authoritative);
+        PartitionedTopicMetadata partitionMetadata = getPartitionedTopicMetadata(topicName, authoritative, false);
         final int numPartitions = partitionMetadata.partitions;
         if (numPartitions > 0) {
             final AtomicInteger count = new AtomicInteger(numPartitions);
@@ -590,7 +603,7 @@ public class PersistentTopicsBase extends AdminResource {
 
         final List<String> subscriptions = Lists.newArrayList();
 
-        PartitionedTopicMetadata partitionMetadata = getPartitionedTopicMetadata(topicName, authoritative);
+        PartitionedTopicMetadata partitionMetadata = getPartitionedTopicMetadata(topicName, authoritative, false);
         if (partitionMetadata.partitions > 0) {
             try {
                 // get the subscriptions only from the 1st partition since all the other partitions will have the same
@@ -685,7 +698,7 @@ public class PersistentTopicsBase extends AdminResource {
 
     protected void internalGetPartitionedStats(AsyncResponse asyncResponse, boolean authoritative,
             boolean perPartition) {
-        PartitionedTopicMetadata partitionMetadata = getPartitionedTopicMetadata(topicName, authoritative);
+        PartitionedTopicMetadata partitionMetadata = getPartitionedTopicMetadata(topicName, authoritative, false);
         if (partitionMetadata.partitions == 0) {
             throw new RestException(Status.NOT_FOUND, "Partitioned Topic not found");
         }
@@ -743,7 +756,7 @@ public class PersistentTopicsBase extends AdminResource {
     }
 
     protected void internalGetPartitionedStatsInternal(AsyncResponse asyncResponse, boolean authoritative) {
-        PartitionedTopicMetadata partitionMetadata = getPartitionedTopicMetadata(topicName, authoritative);
+        PartitionedTopicMetadata partitionMetadata = getPartitionedTopicMetadata(topicName, authoritative, false);
         if (partitionMetadata.partitions == 0) {
             throw new RestException(Status.NOT_FOUND, "Partitioned Topic not found");
         }
@@ -786,7 +799,7 @@ public class PersistentTopicsBase extends AdminResource {
         if (topicName.isGlobal()) {
             validateGlobalNamespaceOwnership(namespaceName);
         }
-        PartitionedTopicMetadata partitionMetadata = getPartitionedTopicMetadata(topicName, authoritative);
+        PartitionedTopicMetadata partitionMetadata = getPartitionedTopicMetadata(topicName, authoritative, false);
         if (partitionMetadata.partitions > 0) {
             final List<CompletableFuture<Void>> futures = Lists.newArrayList();
 
@@ -855,7 +868,7 @@ public class PersistentTopicsBase extends AdminResource {
         if (topicName.isGlobal()) {
             validateGlobalNamespaceOwnership(namespaceName);
         }
-        PartitionedTopicMetadata partitionMetadata = getPartitionedTopicMetadata(topicName, authoritative);
+        PartitionedTopicMetadata partitionMetadata = getPartitionedTopicMetadata(topicName, authoritative, false);
         if (partitionMetadata.partitions > 0) {
             final List<CompletableFuture<Void>> futures = Lists.newArrayList();
 
@@ -920,7 +933,7 @@ public class PersistentTopicsBase extends AdminResource {
         if (topicName.isGlobal()) {
             validateGlobalNamespaceOwnership(namespaceName);
         }
-        PartitionedTopicMetadata partitionMetadata = getPartitionedTopicMetadata(topicName, authoritative);
+        PartitionedTopicMetadata partitionMetadata = getPartitionedTopicMetadata(topicName, authoritative, false);
         if (partitionMetadata.partitions > 0) {
             throw new RestException(Status.METHOD_NOT_ALLOWED, "Skip messages on a partitioned topic is not allowed");
         }
@@ -952,7 +965,7 @@ public class PersistentTopicsBase extends AdminResource {
         if (topicName.isGlobal()) {
             validateGlobalNamespaceOwnership(namespaceName);
         }
-        PartitionedTopicMetadata partitionMetadata = getPartitionedTopicMetadata(topicName, authoritative);
+        PartitionedTopicMetadata partitionMetadata = getPartitionedTopicMetadata(topicName, authoritative, false);
         if (partitionMetadata.partitions > 0) {
             final List<CompletableFuture<Void>> futures = Lists.newArrayList();
 
@@ -1027,7 +1040,7 @@ public class PersistentTopicsBase extends AdminResource {
             validateGlobalNamespaceOwnership(namespaceName);
         }
 
-        PartitionedTopicMetadata partitionMetadata = getPartitionedTopicMetadata(topicName, authoritative);
+        PartitionedTopicMetadata partitionMetadata = getPartitionedTopicMetadata(topicName, authoritative, false);
         final int numPartitions = partitionMetadata.partitions;
         if (numPartitions > 0) {
             final CompletableFuture<Void> future = new CompletableFuture<>();
@@ -1141,7 +1154,7 @@ public class PersistentTopicsBase extends AdminResource {
         log.info("[{}][{}] Creating subscription {} at message id {}", clientAppId(), topicName, subscriptionName,
                 targetMessageId);
 
-        PartitionedTopicMetadata partitionMetadata = getPartitionedTopicMetadata(topicName, authoritative);
+        PartitionedTopicMetadata partitionMetadata = getPartitionedTopicMetadata(topicName, authoritative, false);
         final int numPartitions = partitionMetadata.partitions;
         if (numPartitions > 0) {
             final CompletableFuture<Void> future = new CompletableFuture<>();
@@ -1249,7 +1262,7 @@ public class PersistentTopicsBase extends AdminResource {
         log.info("[{}][{}] received reset cursor on subscription {} to position {}", clientAppId(), topicName,
                 subName, messageId);
 
-        PartitionedTopicMetadata partitionMetadata = getPartitionedTopicMetadata(topicName, authoritative);
+        PartitionedTopicMetadata partitionMetadata = getPartitionedTopicMetadata(topicName, authoritative, false);
 
         if (partitionMetadata.partitions > 0) {
             log.warn("[{}] Not supported operation on partitioned-topic {} {}", clientAppId(), topicName,
@@ -1288,7 +1301,7 @@ public class PersistentTopicsBase extends AdminResource {
         if (topicName.isGlobal()) {
             validateGlobalNamespaceOwnership(namespaceName);
         }
-        PartitionedTopicMetadata partitionMetadata = getPartitionedTopicMetadata(topicName, authoritative);
+        PartitionedTopicMetadata partitionMetadata = getPartitionedTopicMetadata(topicName, authoritative, false);
         if (partitionMetadata.partitions > 0) {
             throw new RestException(Status.METHOD_NOT_ALLOWED, "Peek messages on a partitioned topic is not allowed");
         }
@@ -1413,7 +1426,7 @@ public class PersistentTopicsBase extends AdminResource {
         if (topicName.isGlobal()) {
             validateGlobalNamespaceOwnership(namespaceName);
         }
-        PartitionedTopicMetadata partitionMetadata = getPartitionedTopicMetadata(topicName, authoritative);
+        PartitionedTopicMetadata partitionMetadata = getPartitionedTopicMetadata(topicName, authoritative, false);
         if (partitionMetadata.partitions > 0) {
             throw new RestException(Status.METHOD_NOT_ALLOWED, "Termination of a partitioned topic is not allowed");
         }
@@ -1433,7 +1446,7 @@ public class PersistentTopicsBase extends AdminResource {
             validateGlobalNamespaceOwnership(namespaceName);
         }
 
-        PartitionedTopicMetadata partitionMetadata = getPartitionedTopicMetadata(topicName, authoritative);
+        PartitionedTopicMetadata partitionMetadata = getPartitionedTopicMetadata(topicName, authoritative, false);
         if (partitionMetadata.partitions > 0) {
             final List<CompletableFuture<Void>> futures = Lists.newArrayList();
 
@@ -1489,7 +1502,7 @@ public class PersistentTopicsBase extends AdminResource {
             validateGlobalNamespaceOwnership(namespaceName);
         }
 
-        PartitionedTopicMetadata partitionMetadata = getPartitionedTopicMetadata(topicName, authoritative);
+        PartitionedTopicMetadata partitionMetadata = getPartitionedTopicMetadata(topicName, authoritative, false);
         if (partitionMetadata.partitions > 0) {
             String msg = "This method should not be called for partitioned topic";
             log.error("[{}] {} {} {}", clientAppId(), msg, topicName, subName);
@@ -1595,7 +1608,8 @@ public class PersistentTopicsBase extends AdminResource {
             // serve/redirect request else fail partitioned-metadata-request so, client fails while creating
             // producer/consumer
             checkLocalOrGetPeerReplicationCluster(pulsar, topicName.getNamespaceObject())
-                    .thenCompose(res -> fetchPartitionedTopicMetadataAsync(pulsar, path)).thenAccept(metadata -> {
+                    .thenCompose(res -> fetchPartitionedTopicMetadataCheckAllowAutoCreationAsync(pulsar, path, topicName))
+                    .thenAccept(metadata -> {
                         if (log.isDebugEnabled()) {
                             log.debug("[{}] Total number of partitions for topic {} is {}", clientAppId, topicName,
                                     metadata.partitions);
@@ -1632,7 +1646,7 @@ public class PersistentTopicsBase extends AdminResource {
         }
 
         PartitionedTopicMetadata partitionedTopicMetadata = getPartitionedTopicMetadata(
-                TopicName.get(topicName.getPartitionedTopicName()), false);
+                TopicName.get(topicName.getPartitionedTopicName()), false, false);
         if (partitionedTopicMetadata == null || partitionedTopicMetadata.partitions == 0) {
             final String topicErrorType = partitionedTopicMetadata == null ?
                     "has no metadata" : "has zero partitions";
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/NonPersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/NonPersistentTopics.java
index f1347e3..96bc084 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/NonPersistentTopics.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/NonPersistentTopics.java
@@ -47,6 +47,7 @@ import org.apache.pulsar.broker.PulsarServerException;
 import org.apache.pulsar.broker.service.Topic;
 import org.apache.pulsar.broker.service.nonpersistent.NonPersistentTopic;
 import org.apache.pulsar.broker.web.RestException;
+import org.apache.pulsar.common.api.proto.PulsarApi;
 import org.apache.pulsar.common.naming.Constants;
 import org.apache.pulsar.common.naming.NamespaceBundle;
 import org.apache.pulsar.common.naming.NamespaceName;
@@ -76,9 +77,10 @@ public class NonPersistentTopics extends PersistentTopics {
     public PartitionedTopicMetadata getPartitionedMetadata(@PathParam("property") String property,
             @PathParam("cluster") String cluster, @PathParam("namespace") String namespace,
             @PathParam("topic") @Encoded String encodedTopic,
-            @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
+            @QueryParam("authoritative") @DefaultValue("false") boolean authoritative,
+            @QueryParam("checkAllowAutoCreation") @DefaultValue("false") boolean checkAllowAutoCreation) {
         validateTopicName(property, cluster, namespace, encodedTopic);
-        return getPartitionedTopicMetadata(topicName, authoritative);
+        return getPartitionedTopicMetadata(topicName, authoritative, checkAllowAutoCreation);
     }
 
     @GET
@@ -125,6 +127,18 @@ public class NonPersistentTopics extends PersistentTopics {
             throw new RestException(Status.NOT_ACCEPTABLE, "Number of partitions should be more than 0");
         }
         try {
+            boolean topicExist = pulsar().getNamespaceService()
+                    .getListOfTopics(topicName.getNamespaceObject(), PulsarApi.CommandGetTopicsOfNamespace.Mode.ALL)
+                    .contains(topicName.toString());
+            if (topicExist) {
+                log.warn("[{}] Failed to create already existing topic {}", clientAppId(), topicName);
+                throw new RestException(Status.CONFLICT, "This topic already exists");
+            }
+        } catch (Exception e) {
+            log.error("[{}] Failed to create partitioned topic {}", clientAppId(), topicName, e);
+            throw new RestException(e);
+        }
+        try {
             String path = path(PARTITIONED_TOPIC_PATH_ZNODE, namespaceName.toString(), domain(),
                     topicName.getEncodedLocalName());
             byte[] data = jsonMapper().writeValueAsBytes(new PartitionedTopicMetadata(numPartitions));
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java
index f895ce1..ebece5e 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java
@@ -181,9 +181,10 @@ public class PersistentTopics extends PersistentTopicsBase {
     public PartitionedTopicMetadata getPartitionedMetadata(@PathParam("property") String property,
             @PathParam("cluster") String cluster, @PathParam("namespace") String namespace,
             @PathParam("topic") @Encoded String encodedTopic,
-            @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
+            @QueryParam("authoritative") @DefaultValue("false") boolean authoritative,
+            @QueryParam("checkAllowAutoCreation") @DefaultValue("false") boolean checkAllowAutoCreation) {
         validateTopicName(property, cluster, namespace, encodedTopic);
-        return internalGetPartitionedMetadata(authoritative);
+        return internalGetPartitionedMetadata(authoritative, checkAllowAutoCreation);
     }
 
     @DELETE
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/NonPersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/NonPersistentTopics.java
index 8125f5b..10dc5ee 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/NonPersistentTopics.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/NonPersistentTopics.java
@@ -48,6 +48,7 @@ import org.apache.pulsar.broker.PulsarServerException;
 import org.apache.pulsar.broker.service.Topic;
 import org.apache.pulsar.broker.service.nonpersistent.NonPersistentTopic;
 import org.apache.pulsar.broker.web.RestException;
+import org.apache.pulsar.common.api.proto.PulsarApi;
 import org.apache.pulsar.common.naming.NamespaceBundle;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
@@ -86,9 +87,11 @@ public class NonPersistentTopics extends PersistentTopics {
             @ApiParam(value = "Specify topic name", required = true)
             @PathParam("topic") @Encoded String encodedTopic,
             @ApiParam(value = "Is authentication required to perform this operation")
-            @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
+            @QueryParam("authoritative") @DefaultValue("false") boolean authoritative,
+            @ApiParam(value = "Is check configuration required to automatically create topic")
+            @QueryParam("checkAllowAutoCreation") @DefaultValue("false") boolean checkAllowAutoCreation) {
         validateTopicName(tenant, namespace, encodedTopic);
-        return getPartitionedTopicMetadata(topicName, authoritative);
+        return getPartitionedTopicMetadata(topicName, authoritative, checkAllowAutoCreation);
     }
 
     @GET
@@ -169,6 +172,18 @@ public class NonPersistentTopics extends PersistentTopics {
             throw new RestException(Status.NOT_ACCEPTABLE, "Number of partitions should be more than 0");
         }
         try {
+            boolean topicExist = pulsar().getNamespaceService()
+                    .getListOfTopics(topicName.getNamespaceObject(), PulsarApi.CommandGetTopicsOfNamespace.Mode.ALL)
+                    .contains(topicName.toString());
+            if (topicExist) {
+                log.warn("[{}] Failed to create already existing topic {}", clientAppId(), topicName);
+                throw new RestException(Status.CONFLICT, "This topic already exists");
+            }
+        } catch (Exception e) {
+            log.error("[{}] Failed to create partitioned topic {}", clientAppId(), topicName, e);
+            throw new RestException(e);
+        }
+        try {
             String path = path(PARTITIONED_TOPIC_PATH_ZNODE, namespaceName.toString(), domain(),
                     topicName.getEncodedLocalName());
             byte[] data = jsonMapper().writeValueAsBytes(new PartitionedTopicMetadata(numPartitions));
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
index e09c012..70e0624 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
@@ -288,9 +288,11 @@ public class PersistentTopics extends PersistentTopicsBase {
             @ApiParam(value = "Specify topic name", required = true)
             @PathParam("topic") @Encoded String encodedTopic,
             @ApiParam(value = "Is authentication required to perform this operation")
-            @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
+            @QueryParam("authoritative") @DefaultValue("false") boolean authoritative,
+            @ApiParam(value = "Is check configuration required to automatically create topic")
+            @QueryParam("checkAllowAutoCreation") @DefaultValue("false") boolean checkAllowAutoCreation) {
         validateTopicName(tenant, namespace, encodedTopic);
-        return internalGetPartitionedMetadata(authoritative);
+        return internalGetPartitionedMetadata(authoritative, checkAllowAutoCreation);
     }
 
     @DELETE
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
index fc89acc..3d8a29a 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
@@ -211,8 +211,8 @@ public class PersistentTopicsTest extends MockedPulsarServiceBaseTest {
 
         final String nonPartitionTopic2 = "secondary-non-partitioned-topic";
         persistentTopics.createNonPartitionedTopic(testTenant, testNamespace, nonPartitionTopic2, true);
-        Assert.assertEquals(
-                persistentTopics.getPartitionedMetadata(testTenant, testNamespace, nonPartitionTopic, true) .partitions,
+        Assert.assertEquals(persistentTopics
+                        .getPartitionedMetadata(testTenant, testNamespace, nonPartitionTopic, true, false).partitions,
                 0);
     }
 
@@ -221,7 +221,7 @@ public class PersistentTopicsTest extends MockedPulsarServiceBaseTest {
         final String topicName = "standard-topic";
         persistentTopics.createNonPartitionedTopic(testTenant, testNamespace, topicName, true);
         PartitionedTopicMetadata pMetadata = persistentTopics.getPartitionedMetadata(
-                testTenant, testNamespace, topicName, true);
+                testTenant, testNamespace, topicName, true, false);
         Assert.assertEquals(pMetadata.partitions, 0);
     }
 
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java
index 341b96a..868d591 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java
@@ -104,6 +104,7 @@ public abstract class MockedPulsarServiceBaseTest {
         this.conf.setDefaultNumberOfNamespaceBundles(1);
         this.conf.setZookeeperServers("localhost:2181");
         this.conf.setConfigurationStoreServers("localhost:3181");
+        this.conf.setAllowAutoTopicCreationType("non-persistent");
     }
 
     protected final void internalSetup() throws Exception {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BacklogQuotaManagerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BacklogQuotaManagerTest.java
index c4a387c..88a6603 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BacklogQuotaManagerTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BacklogQuotaManagerTest.java
@@ -88,6 +88,7 @@ public class BacklogQuotaManagerTest {
             config.setBacklogQuotaCheckIntervalInSeconds(TIME_TO_CHECK_BACKLOG_QUOTA);
             config.setManagedLedgerMaxEntriesPerLedger(5);
             config.setManagedLedgerMinLedgerRolloverTimeMinutes(0);
+            config.setAllowAutoTopicCreationType("non-partitioned");
 
             pulsar = new PulsarService(config);
             pulsar.start();
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerBkEnsemblesTests.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerBkEnsemblesTests.java
index 940002d..f880734 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerBkEnsemblesTests.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerBkEnsemblesTests.java
@@ -104,6 +104,7 @@ public class BrokerBkEnsemblesTests {
             config.setManagedLedgerMaxEntriesPerLedger(5);
             config.setManagedLedgerMinLedgerRolloverTimeMinutes(0);
             config.setAdvertisedAddress("127.0.0.1");
+            config.setAllowAutoTopicCreationType("non-partitioned");
 
             pulsar = new PulsarService(config);
             pulsar.start();
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerBookieIsolationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerBookieIsolationTest.java
index 3b9edc9..68f72b9 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerBookieIsolationTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerBookieIsolationTest.java
@@ -158,6 +158,8 @@ public class BrokerBookieIsolationTest {
         config.setManagedLedgerDefaultWriteQuorum(2);
         config.setManagedLedgerDefaultAckQuorum(2);
 
+        config.setAllowAutoTopicCreationType("non-partitioned");
+
         int totalEntriesPerLedger = 20;
         int totalLedgers = totalPublish / totalEntriesPerLedger;
         config.setManagedLedgerMaxEntriesPerLedger(totalEntriesPerLedger);
@@ -288,6 +290,7 @@ public class BrokerBookieIsolationTest {
         config.setManagedLedgerDefaultEnsembleSize(2);
         config.setManagedLedgerDefaultWriteQuorum(2);
         config.setManagedLedgerDefaultAckQuorum(2);
+        config.setAllowAutoTopicCreationType("non-partitioned");
 
         int totalEntriesPerLedger = 20;
         int totalLedgers = totalPublish / totalEntriesPerLedger;
@@ -410,6 +413,7 @@ public class BrokerBookieIsolationTest {
         config.setManagedLedgerDefaultEnsembleSize(2);
         config.setManagedLedgerDefaultWriteQuorum(2);
         config.setManagedLedgerDefaultAckQuorum(2);
+        config.setAllowAutoTopicCreationType("non-partitioned");
 
         config.setManagedLedgerMinLedgerRolloverTimeMinutes(0);
         pulsarService = new PulsarService(config);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceAutoTopicCreationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceAutoTopicCreationTest.java
new file mode 100644
index 0000000..cd1aec9
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceAutoTopicCreationTest.java
@@ -0,0 +1,120 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service;
+
+import org.apache.pulsar.client.api.PulsarClientException;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertTrue;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+public class BrokerServiceAutoTopicCreationTest extends BrokerTestBase{
+
+    @BeforeClass
+    @Override
+    protected void setup() throws Exception {
+        super.baseSetup();
+    }
+
+    @AfterClass
+    @Override
+    protected void cleanup() throws Exception {
+        super.internalCleanup();
+    }
+
+    @Test
+    public void testAutoNonPartitionedTopicCreation() throws Exception{
+        pulsar.getConfiguration().setAllowAutoTopicCreation(true);
+        pulsar.getConfiguration().setAllowAutoTopicCreationType("non-partitioned");
+
+        final String topicName = "persistent://prop/ns-abc/non-partitioned-topic";
+        final String subscriptionName = "non-partitioned-topic-sub";
+        pulsarClient.newConsumer().topic(topicName).subscriptionName(subscriptionName).subscribe();
+
+        assertTrue(admin.namespaces().getTopics("prop/ns-abc").contains(topicName));
+        assertFalse(admin.topics().getPartitionedTopicList("prop/ns-abc").contains(topicName));
+    }
+
+    @Test
+    public void testAutoPartitionedTopicCreation() throws Exception{
+        pulsar.getConfiguration().setAllowAutoTopicCreation(true);
+        pulsar.getConfiguration().setAllowAutoTopicCreationType("partitioned");
+        pulsar.getConfiguration().setDefaultNumPartitions(3);
+
+        final String topicName = "persistent://prop/ns-abc/partitioned-topic";
+        final String subscriptionName = "partitioned-topic-sub";
+        pulsarClient.newConsumer().topic(topicName).subscriptionName(subscriptionName).subscribe();
+
+        assertTrue(admin.topics().getPartitionedTopicList("prop/ns-abc").contains(topicName));
+        for (int i = 0; i < 3; i++) {
+            assertTrue(admin.namespaces().getTopics("prop/ns-abc").contains(topicName + "-partition-" + i));
+        }
+    }
+
+    @Test
+    public void testAutoTopicCreationDisable() throws Exception{
+        pulsar.getConfiguration().setAllowAutoTopicCreation(false);
+
+        final String topicName = "persistent://prop/ns-abc/test-topic";
+        final String subscriptionName = "test-topic-sub";
+        try {
+            pulsarClient.newConsumer().topic(topicName).subscriptionName(subscriptionName).subscribe();
+        } catch (Exception e) {
+            assertTrue(e instanceof PulsarClientException);
+        }
+        assertFalse(admin.namespaces().getTopics("prop/ns-abc").contains(topicName));
+    }
+
+    @Test
+    public void testAutoTopicCreationDisableIfNonPartitionedTopicAlreadyExist() throws Exception{
+        pulsar.getConfiguration().setAllowAutoTopicCreation(true);
+        pulsar.getConfiguration().setAllowAutoTopicCreationType("partitioned");
+        pulsar.getConfiguration().setDefaultNumPartitions(3);
+
+        final String topicName = "persistent://prop/ns-abc/test-topic-2";
+        final String subscriptionName = "partitioned-topic-sub";
+        admin.topics().createNonPartitionedTopic(topicName);
+        pulsarClient.newConsumer().topic(topicName).subscriptionName(subscriptionName).subscribe();
+
+        assertFalse(admin.topics().getPartitionedTopicList("prop/ns-abc").contains(topicName));
+        for (int i = 0; i < 3; i++) {
+            assertFalse(admin.namespaces().getTopics("prop/ns-abc").contains(topicName + "-partition-" + i));
+        }
+        assertTrue(admin.namespaces().getTopics("prop/ns-abc").contains(topicName));
+    }
+
+    /**
+     * The checkAllowAutoCreation query parameter defaults to false, so calling
+     * getPartitionedTopicMetadata() directly does not create a partitioned topic,
+     * even when the broker is configured to auto-create partitioned topics.
+     */
+    @Test
+    public void testGetPartitionedMetadataWithoutCheckAllowAutoCreation() throws Exception{
+        pulsar.getConfiguration().setAllowAutoTopicCreation(true);
+        pulsar.getConfiguration().setAllowAutoTopicCreationType("partitioned");
+        pulsar.getConfiguration().setDefaultNumPartitions(3);
+
+        final String topicName = "persistent://prop/ns-abc/test-topic-3";
+        int partitions = admin.topics().getPartitionedTopicMetadata(topicName).partitions;
+        assertEquals(partitions, 0);
+        assertFalse(admin.namespaces().getTopics("prop/ns-abc").contains(topicName));
+    }
+}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTestBase.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTestBase.java
index ca5ba68..5864f48 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTestBase.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTestBase.java
@@ -132,6 +132,7 @@ public class ReplicatorTestBase {
         config1.setTlsTrustCertsFilePath(TLS_SERVER_CERT_FILE_PATH);
         config1.setBacklogQuotaCheckIntervalInSeconds(TIME_TO_CHECK_BACKLOG_QUOTA);
         config1.setDefaultNumberOfNamespaceBundles(1);
+        config1.setAllowAutoTopicCreationType("non-partitioned");
         pulsar1 = new PulsarService(config1);
         pulsar1.start();
         ns1 = pulsar1.getBrokerService();
@@ -165,6 +166,7 @@ public class ReplicatorTestBase {
         config2.setTlsTrustCertsFilePath(TLS_SERVER_CERT_FILE_PATH);
         config2.setBacklogQuotaCheckIntervalInSeconds(TIME_TO_CHECK_BACKLOG_QUOTA);
         config2.setDefaultNumberOfNamespaceBundles(1);
+        config2.setAllowAutoTopicCreationType("non-partitioned");
         pulsar2 = new PulsarService(config2);
         pulsar2.start();
         ns2 = pulsar2.getBrokerService();
@@ -198,6 +200,7 @@ public class ReplicatorTestBase {
         config3.setTlsKeyFilePath(TLS_SERVER_KEY_FILE_PATH);
         config3.setTlsTrustCertsFilePath(TLS_SERVER_CERT_FILE_PATH);
         config3.setDefaultNumberOfNamespaceBundles(1);
+        config3.setAllowAutoTopicCreationType("non-partitioned");
         pulsar3 = new PulsarService(config3);
         pulsar3.start();
         ns3 = pulsar3.getBrokerService();
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonPersistentTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonPersistentTopicTest.java
index f70e6a1..b54265a 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonPersistentTopicTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonPersistentTopicTest.java
@@ -891,6 +891,7 @@ public class NonPersistentTopicTest extends ProducerConsumerBase {
                     inSec(getBrokerServicePurgeInactiveFrequency(), TimeUnit.SECONDS));
             config1.setBrokerServicePort(Optional.ofNullable(PortManager.nextFreePort()));
             config1.setBacklogQuotaCheckIntervalInSeconds(TIME_TO_CHECK_BACKLOG_QUOTA);
+            config1.setAllowAutoTopicCreationType("non-partitioned");
             pulsar1 = new PulsarService(config1);
             pulsar1.start();
             ns1 = pulsar1.getBrokerService();
@@ -917,6 +918,7 @@ public class NonPersistentTopicTest extends ProducerConsumerBase {
                     inSec(getBrokerServicePurgeInactiveFrequency(), TimeUnit.SECONDS));
             config2.setBrokerServicePort(Optional.ofNullable(PortManager.nextFreePort()));
             config2.setBacklogQuotaCheckIntervalInSeconds(TIME_TO_CHECK_BACKLOG_QUOTA);
+            config2.setAllowAutoTopicCreationType("non-partitioned");
             pulsar2 = new PulsarService(config2);
             pulsar2.start();
             ns2 = pulsar2.getBrokerService();
@@ -942,6 +944,7 @@ public class NonPersistentTopicTest extends ProducerConsumerBase {
             config3.setBrokerServicePurgeInactiveFrequencyInSeconds(
                     inSec(getBrokerServicePurgeInactiveFrequency(), TimeUnit.SECONDS));
             config3.setBrokerServicePort(Optional.ofNullable(PortManager.nextFreePort()));
+            config3.setAllowAutoTopicCreationType("non-partitioned");
             pulsar3 = new PulsarService(config3);
             pulsar3.start();
             ns3 = pulsar3.getBrokerService();
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
index 35d4c77..c2fad94 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
@@ -2309,7 +2309,7 @@ public class SimpleProducerConsumerTest extends ProducerConsumerBase {
 
         // (1) simple consumers
         Consumer<byte[]> consumer = pulsarClient.newConsumer()
-                .topic("persistent://my-property/my-ns/failAsyncReceive").subscriptionName("my-subscriber-name")
+                .topic("persistent://my-property/my-ns/failAsyncReceive-1").subscriptionName("my-subscriber-name")
                 .subscribe();
         consumer.close();
         // receive messages
@@ -2322,7 +2322,7 @@ public class SimpleProducerConsumerTest extends ProducerConsumerBase {
 
         // (2) Partitioned-consumer
         int numPartitions = 4;
-        TopicName topicName = TopicName.get("persistent://my-property/my-ns/failAsyncReceive");
+        TopicName topicName = TopicName.get("persistent://my-property/my-ns/failAsyncReceive-2");
         admin.topics().createPartitionedTopic(topicName.toString(), numPartitions);
         Consumer<byte[]> partitionedConsumer = pulsarClient.newConsumer().topic(topicName.toString())
                 .subscriptionName("my-partitioned-subscriber").subscribe();
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v1/V1_ProducerConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v1/V1_ProducerConsumerTest.java
index 24fb5a7..7d9a8cd 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v1/V1_ProducerConsumerTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v1/V1_ProducerConsumerTest.java
@@ -2040,7 +2040,7 @@ public class V1_ProducerConsumerTest extends V1_ProducerConsumerBase {
 
         // (1) simple consumers
         Consumer<byte[]> consumer = pulsarClient.newConsumer()
-                .topic("persistent://my-property/use/my-ns/failAsyncReceive")
+                .topic("persistent://my-property/use/my-ns/failAsyncReceive-1")
                 .subscriptionName("my-subscriber-name")
                 .subscribe();
         consumer.close();
@@ -2054,7 +2054,7 @@ public class V1_ProducerConsumerTest extends V1_ProducerConsumerBase {
 
         // (2) Partitioned-consumer
         int numPartitions = 4;
-        TopicName topicName = TopicName.get("persistent://my-property/use/my-ns/failAsyncReceive");
+        TopicName topicName = TopicName.get("persistent://my-property/use/my-ns/failAsyncReceive-2");
         admin.topics().createPartitionedTopic(topicName.toString(), numPartitions);
         Consumer<byte[]> partitionedConsumer = pulsarClient.newConsumer().topic(topicName.toString())
                 .subscriptionName("my-partitioned-subscriber")
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionE2ESecurityTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionE2ESecurityTest.java
index 54f05d8..3782286 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionE2ESecurityTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionE2ESecurityTest.java
@@ -89,7 +89,7 @@ public class PulsarFunctionE2ESecurityTest {
     final String TENANT2 = "tenant2";
 
     final String NAMESPACE = "test-ns";
-    String pulsarFunctionsNamespace = TENANT + "/use/pulsar-function-admin";
+    String pulsarFunctionsNamespace = TENANT + "/pulsar-function-admin";
     String primaryHost;
     String workerId;
 
@@ -132,6 +132,7 @@ public class PulsarFunctionE2ESecurityTest {
         config.setBrokerServicePort(Optional.of(brokerServicePort));
         config.setLoadManagerClassName(SimpleLoadManagerImpl.class.getName());
         config.setAdvertisedAddress("localhost");
+        config.setAllowAutoTopicCreationType("non-partitioned");
 
         Set<String> providers = new HashSet<>();
         providers.add(AuthenticationProviderToken.class.getName());
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionLocalRunTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionLocalRunTest.java
index 97cabc3..8c2525e 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionLocalRunTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionLocalRunTest.java
@@ -102,7 +102,7 @@ public class PulsarFunctionLocalRunTest {
     BrokerStats brokerStatsClient;
     WorkerService functionsWorkerService;
     final String tenant = "external-repl-prop";
-    String pulsarFunctionsNamespace = tenant + "/" + CLUSTER + "/pulsar-function-admin";
+    String pulsarFunctionsNamespace = tenant + "/pulsar-function-admin";
     String primaryHost;
     String workerId;
 
@@ -180,6 +180,7 @@ public class PulsarFunctionLocalRunTest {
                 "tlsCertFile:" + TLS_CLIENT_CERT_FILE_PATH + "," + "tlsKeyFile:" + TLS_CLIENT_KEY_FILE_PATH);
         config.setBrokerClientTrustCertsFilePath(TLS_TRUST_CERT_FILE_PATH);
         config.setBrokerClientTlsEnabled(true);
+        config.setAllowAutoTopicCreationType("non-partitioned");
 
         functionsWorkerService = createPulsarFunctionWorker(config);
         urlTls = new URL(brokerServiceUrl);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionPublishTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionPublishTest.java
index 9c2b7b7..efa0695 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionPublishTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionPublishTest.java
@@ -91,7 +91,7 @@ public class PulsarFunctionPublishTest {
     BrokerStats brokerStatsClient;
     WorkerService functionsWorkerService;
     final String tenant = "external-repl-prop";
-    String pulsarFunctionsNamespace = tenant + "/use/pulsar-function-admin";
+    String pulsarFunctionsNamespace = tenant + "/pulsar-function-admin";
     String primaryHost;
     String workerId;
 
@@ -169,6 +169,7 @@ public class PulsarFunctionPublishTest {
                 "tlsCertFile:" + TLS_CLIENT_CERT_FILE_PATH + "," + "tlsKeyFile:" + TLS_CLIENT_KEY_FILE_PATH);
         config.setBrokerClientTrustCertsFilePath(TLS_TRUST_CERT_FILE_PATH);
         config.setBrokerClientTlsEnabled(true);
+        config.setAllowAutoTopicCreationType("non-partitioned");
 
 
 
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionStateTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionStateTest.java
index 809fca7..d0b85c3 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionStateTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionStateTest.java
@@ -90,7 +90,7 @@ public class PulsarFunctionStateTest {
     BrokerStats brokerStatsClient;
     WorkerService functionsWorkerService;
     final String tenant = "external-repl-prop";
-    String pulsarFunctionsNamespace = tenant + "/use/pulsar-function-admin";
+    String pulsarFunctionsNamespace = tenant + "/pulsar-function-admin";
     String primaryHost;
     String workerId;
 
@@ -168,6 +168,7 @@ public class PulsarFunctionStateTest {
                 "tlsCertFile:" + TLS_CLIENT_CERT_FILE_PATH + "," + "tlsKeyFile:" + TLS_CLIENT_KEY_FILE_PATH);
         config.setBrokerClientTrustCertsFilePath(TLS_TRUST_CERT_FILE_PATH);
         config.setBrokerClientTlsEnabled(true);
+        config.setAllowAutoTopicCreationType("non-partitioned");
 
 
 
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarWorkerAssignmentTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarWorkerAssignmentTest.java
index 55e4a70..27a7d69 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarWorkerAssignmentTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarWorkerAssignmentTest.java
@@ -70,7 +70,7 @@ public class PulsarWorkerAssignmentTest {
     BrokerStats brokerStatsClient;
     WorkerService functionsWorkerService;
     final String tenant = "external-repl-prop";
-    final String pulsarFunctionsNamespace = tenant + "/use/pulsar-function-admin";
+    final String pulsarFunctionsNamespace = tenant + "/pulsar-function-admin";
     String primaryHost;
     String workerId;
 
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionAdminTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionAdminTest.java
index ecfce1c..0c5db19 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionAdminTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionAdminTest.java
@@ -72,7 +72,7 @@ public class PulsarFunctionAdminTest {
     WorkerServer functionsWorkerServer;
     WorkerService functionsWorkerService;
     final String tenant = "external-repl-prop";
-    String pulsarFunctionsNamespace = tenant + "/use/pulsar-function-admin";
+    String pulsarFunctionsNamespace = tenant + "/pulsar-function-admin";
     String primaryHost;
 
     private final int ZOOKEEPER_PORT = PortManager.nextFreePort();
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionE2ETest.java b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionE2ETest.java
index 847958f..d8f9cec 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionE2ETest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionE2ETest.java
@@ -118,7 +118,7 @@ public class PulsarFunctionE2ETest {
     BrokerStats brokerStatsClient;
     WorkerService functionsWorkerService;
     final String tenant = "external-repl-prop";
-    String pulsarFunctionsNamespace = tenant + "/use/pulsar-function-admin";
+    String pulsarFunctionsNamespace = tenant + "/pulsar-function-admin";
     String primaryHost;
     String workerId;
 
@@ -194,6 +194,7 @@ public class PulsarFunctionE2ETest {
                 "tlsCertFile:" + TLS_CLIENT_CERT_FILE_PATH + "," + "tlsKeyFile:" + TLS_CLIENT_KEY_FILE_PATH);
         config.setBrokerClientTrustCertsFilePath(TLS_TRUST_CERT_FILE_PATH);
         config.setBrokerClientTlsEnabled(true);
+        config.setAllowAutoTopicCreationType("non-partitioned");
 
 
 
diff --git a/pulsar-broker/src/test/resources/configurations/pulsar_broker_test.conf b/pulsar-broker/src/test/resources/configurations/pulsar_broker_test.conf
index c55955f..75b6d03 100644
--- a/pulsar-broker/src/test/resources/configurations/pulsar_broker_test.conf
+++ b/pulsar-broker/src/test/resources/configurations/pulsar_broker_test.conf
@@ -33,6 +33,9 @@ backlogQuotaCheckIntervalInSeconds=60
 backlogQuotaDefaultLimitGB=50
 brokerDeleteInactiveTopicsEnabled=true
 brokerDeleteInactiveTopicsFrequencySeconds=60
+allowAutoTopicCreation=true
+allowAutoTopicCreationType=non-partitioned
+defaultNumPartitions=1
 messageExpiryCheckIntervalInMinutes=5
 clientLibraryVersionCheckEnabled=false
 clientLibraryVersionCheckAllowUnversioned=true
diff --git a/pulsar-client-cpp/test-conf/standalone-ssl.conf b/pulsar-client-cpp/test-conf/standalone-ssl.conf
index 426ba43..6ab4406 100644
--- a/pulsar-client-cpp/test-conf/standalone-ssl.conf
+++ b/pulsar-client-cpp/test-conf/standalone-ssl.conf
@@ -278,3 +278,12 @@ keepAliveIntervalSeconds=30
 
 # How often broker checks for inactive topics to be deleted (topics with no subscriptions and no one connected)
 brokerServicePurgeInactiveFrequencyInSeconds=60
+
+# Enable topic auto creation if new producer or consumer connected (disable auto creation with value false)
+allowAutoTopicCreation=true
+
+# The type of topic that is allowed to be automatically created (partitioned/non-partitioned).
+allowAutoTopicCreationType=non-partitioned
+
+# The number of partitions given to an automatically created topic when allowAutoTopicCreationType is partitioned.
+defaultNumPartitions=1
\ No newline at end of file
diff --git a/pulsar-client-cpp/test-conf/standalone.conf b/pulsar-client-cpp/test-conf/standalone.conf
index 6f799c1..2de6a37 100644
--- a/pulsar-client-cpp/test-conf/standalone.conf
+++ b/pulsar-client-cpp/test-conf/standalone.conf
@@ -263,3 +263,12 @@ keepAliveIntervalSeconds=30
 
 # How often broker checks for inactive topics to be deleted (topics with no subscriptions and no one connected)
 brokerServicePurgeInactiveFrequencyInSeconds=60
+
+# Enable topic auto creation if new producer or consumer connected (disable auto creation with value false)
+allowAutoTopicCreation=true
+
+# The type of topic that is allowed to be automatically created (partitioned/non-partitioned).
+allowAutoTopicCreationType=non-partitioned
+
+# The number of partitions given to an automatically created topic when allowAutoTopicCreationType is partitioned.
+defaultNumPartitions=1
\ No newline at end of file
diff --git a/pulsar-client-cpp/tests/standalone.conf b/pulsar-client-cpp/tests/standalone.conf
index 8a01642..857285a 100644
--- a/pulsar-client-cpp/tests/standalone.conf
+++ b/pulsar-client-cpp/tests/standalone.conf
@@ -266,3 +266,12 @@ keepAliveIntervalSeconds=30
 
 # How often broker checks for inactive topics to be deleted (topics with no subscriptions and no one connected)
 brokerServicePurgeInactiveFrequencyInSeconds=60
+
+# Enable topic auto creation if new producer or consumer connected (disable auto creation with value false)
+allowAutoTopicCreation=true
+
+# The type of topic that is allowed to be automatically created (partitioned/non-partitioned).
+allowAutoTopicCreationType=non-partitioned
+
+# The number of partitions given to an automatically created topic when allowAutoTopicCreationType is partitioned.
+defaultNumPartitions=1
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpLookupService.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpLookupService.java
index 4fe030d..e5334df 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpLookupService.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpLookupService.java
@@ -103,7 +103,8 @@ class HttpLookupService implements LookupService {
 
     public CompletableFuture<PartitionedTopicMetadata> getPartitionedTopicMetadata(TopicName topicName) {
         String format = topicName.isV2() ? "admin/v2/%s/partitions" : "admin/%s/partitions";
-        return httpClient.get(String.format(format, topicName.getLookupName()), PartitionedTopicMetadata.class);
+        return httpClient.get(String.format(format, topicName.getLookupName()) + "?checkAllowAutoCreation=true",
+                PartitionedTopicMetadata.class);
     }
 
     public String getServiceUrl() {
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java
index 9ec9688..0d73915 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java
@@ -148,6 +148,9 @@ public class WorkerService {
             }
             log.info("Created Pulsar client");
 
+            brokerAdmin.topics().createNonPartitionedTopic(workerConfig.getFunctionAssignmentTopic());
+            brokerAdmin.topics().createNonPartitionedTopic(workerConfig.getClusterCoordinationTopic());
+            brokerAdmin.topics().createNonPartitionedTopic(workerConfig.getFunctionMetadataTopic());
             //create scheduler manager
             this.schedulerManager = new SchedulerManager(this.workerConfig, this.client, this.brokerAdmin,
                     this.executor);
diff --git a/site2/docs/reference-configuration.md b/site2/docs/reference-configuration.md
index bb549ae..5261bae 100644
--- a/site2/docs/reference-configuration.md
+++ b/site2/docs/reference-configuration.md
@@ -127,6 +127,8 @@ Pulsar brokers are responsible for handling incoming messages from producers, di
 |backlogQuotaCheckIntervalInSeconds|  How often to check for topics that have reached the quota |60|
 |backlogQuotaDefaultLimitGB|  Default per-topic backlog quota limit |10|
 |allowAutoTopicCreation| Enable topic auto creation if new producer or consumer connected |true|
+|allowAutoTopicCreationType| The type of topic that is allowed to be automatically created (partitioned/non-partitioned) |non-partitioned|
+|defaultNumPartitions| The number of partitions given to an automatically created topic when allowAutoTopicCreationType is partitioned |1|
 |brokerDeleteInactiveTopicsEnabled| Enable the deletion of inactive topics  |true|
 |brokerDeleteInactiveTopicsFrequencySeconds|  How often to check for inactive topics  |60|
 |messageExpiryCheckIntervalInMinutes| How frequently to proactively check and purge expired messages  |5|
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/cli/CLITest.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/cli/CLITest.java
index 3af34d7..ca39cd4 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/cli/CLITest.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/cli/CLITest.java
@@ -93,6 +93,12 @@ public class CLITest extends PulsarTestSuite {
     public void testTopicTerminationOnTopicsWithoutConnectedConsumers() throws Exception {
         String topicName = "persistent://public/default/test-topic-termination";
         BrokerContainer container = pulsarCluster.getAnyBroker();
+        container.execCmd(
+                PulsarCluster.ADMIN_SCRIPT,
+                "topics",
+                "create",
+                topicName);
+
         ContainerExecResult result = container.execCmd(
             PulsarCluster.CLIENT_SCRIPT,
             "produce",
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
index 23beec8..f1e2d1a 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
@@ -547,6 +547,9 @@ public abstract class PulsarFunctionsTest extends PulsarFunctionsTestBase {
             .build();
 
         @Cleanup
+        PulsarAdmin admin = PulsarAdmin.builder().serviceHttpUrl(pulsarCluster.getHttpServiceUrl()).build();
+        admin.topics().createNonPartitionedTopic(outputTopicName);
+        @Cleanup
         Consumer<String> consumer = client.newConsumer(Schema.STRING)
             .topic(outputTopicName)
             .subscriptionName("source-tester")
@@ -869,6 +872,8 @@ public abstract class PulsarFunctionsTest extends PulsarFunctionsTestBase {
 
         try (PulsarAdmin admin = PulsarAdmin.builder().serviceHttpUrl(pulsarCluster.getHttpServiceUrl()).build()) {
 
+            admin.topics().createNonPartitionedTopic(inputTopicName);
+            admin.topics().createNonPartitionedTopic(outputTopicName);
             retryStrategically((test) -> {
                 try {
                     return admin.topics().getStats(inputTopicName).subscriptions.size() == 1;
@@ -957,7 +962,10 @@ public abstract class PulsarFunctionsTest extends PulsarFunctionsTestBase {
 
         String inputTopicName = "test-" + type + "-count-window-" + functionRuntimeType + "-input-" + randomName(8);
         String outputTopicName = "test-" + type + "-count-window-" + functionRuntimeType + "-output-" + randomName(8);
-
+        try (PulsarAdmin admin = PulsarAdmin.builder().serviceHttpUrl(pulsarCluster.getHttpServiceUrl()).build()) {
+            admin.topics().createNonPartitionedTopic(inputTopicName);
+            admin.topics().createNonPartitionedTopic(outputTopicName);
+        }
 
         CommandGenerator generator = CommandGenerator.createDefaultGenerator(
                 inputTopicName,
@@ -1114,6 +1122,10 @@ public abstract class PulsarFunctionsTest extends PulsarFunctionsTestBase {
 
         String inputTopicName = "persistent://public/default/test-neg-ack-" + runtime + "-input-" + randomName(8);
         String outputTopicName = "test-neg-ack-" + runtime + "-output-" + randomName(8);
+        try (PulsarAdmin admin = PulsarAdmin.builder().serviceHttpUrl(pulsarCluster.getHttpServiceUrl()).build()) {
+            admin.topics().createNonPartitionedTopic(inputTopicName);
+            admin.topics().createNonPartitionedTopic(outputTopicName);
+        }
 
         String functionName = "test-neg-ack-fn-" + randomName(8);
         final int numMessages = 20;
@@ -1290,6 +1302,10 @@ public abstract class PulsarFunctionsTest extends PulsarFunctionsTestBase {
 
         String inputTopicName = "persistent://public/default/test-publish-" + runtime + "-input-" + randomName(8);
         String outputTopicName = "test-publish-" + runtime + "-output-" + randomName(8);
+        try (PulsarAdmin admin = PulsarAdmin.builder().serviceHttpUrl(pulsarCluster.getHttpServiceUrl()).build()) {
+            admin.topics().createNonPartitionedTopic(inputTopicName);
+            admin.topics().createNonPartitionedTopic(outputTopicName);
+        }
 
         String functionName = "test-publish-fn-" + randomName(8);
         final int numMessages = 10;
@@ -1416,6 +1432,10 @@ public abstract class PulsarFunctionsTest extends PulsarFunctionsTestBase {
 
         String inputTopicName = "persistent://public/default/test-exclamation-" + runtime + "-input-" + randomName(8);
         String outputTopicName = "test-exclamation-" + runtime + "-output-" + randomName(8);
+        try (PulsarAdmin admin = PulsarAdmin.builder().serviceHttpUrl(pulsarCluster.getHttpServiceUrl()).build()) {
+            admin.topics().createNonPartitionedTopic(inputTopicName);
+            admin.topics().createNonPartitionedTopic(outputTopicName);
+        }
         if (isTopicPattern) {
             @Cleanup PulsarClient client = PulsarClient.builder()
                     .serviceUrl(pulsarCluster.getPlainTextServiceUrl())
@@ -1928,6 +1948,11 @@ public abstract class PulsarFunctionsTest extends PulsarFunctionsTestBase {
             .build();
 
         @Cleanup
+        PulsarAdmin admin = PulsarAdmin.builder().serviceHttpUrl(pulsarCluster.getHttpServiceUrl()).build();
+        admin.topics().createNonPartitionedTopic(consumeTopicName);
+        admin.topics().createNonPartitionedTopic(outputTopicName);
+
+        @Cleanup
         Consumer<KeyValue<byte[], byte[]>> consumer = client.newConsumer(KeyValueSchema.kvBytes())
             .topic(consumeTopicName)
             .subscriptionName("debezium-source-tester")