You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by zh...@apache.org on 2020/05/08 12:34:10 UTC

[pulsar] 24/38: [Issue #5395][broker] Implement AutoTopicCreation by namespace override (#6471)

This is an automated email from the ASF dual-hosted git repository.

zhaijia pushed a commit to branch branch-2.5
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 1c029455b6c6d32f305901d19db76a98b777a031
Author: Kai <kl...@toasttab.com>
AuthorDate: Wed Mar 25 23:59:43 2020 -0700

    [Issue #5395][broker] Implement AutoTopicCreation by namespace override (#6471)
    
    Fixes #5395
    
    This change introduces a new namespace policy, `autoTopicCreationOverride`, which overrides the broker's `allowAutoTopicCreation` settings at the namespace level. With this feature you can, for example, keep automatic topic creation disabled on the broker and enable it only for specific namespaces.
    
    - Adds a new namespace policy, `autoTopicCreationOverride`, together with the admin API and CLI commands for setting and removing it. The override defaults to the non-partitioned topic type, but partitioned topics are also supported.
    - Modifies BrokerService: when checking the `autoTopicCreation` configuration, the broker first retrieves the namespace policies from ZooKeeper. If the `autoTopicCreationOverride` policy exists for that namespace, its settings are used; otherwise the broker falls back to the broker-level configuration.
    - Slight refactor to move `TopicType` enum to pulsar-common and add `autoTopicCreationOverride` to pulsar-common.
    (cherry picked from commit fdc3a9bc8f04c4e424fec90a636a4aa25b35dcd8)
---
 .../apache/pulsar/broker/ServiceConfiguration.java |  15 +-
 .../pulsar/broker/admin/impl/NamespacesBase.java   | 106 ++++++++++-
 .../apache/pulsar/broker/admin/v2/Namespaces.java  |  38 ++++
 .../pulsar/broker/service/BrokerService.java       |  59 +++++-
 .../apache/pulsar/broker/service/ServerCnx.java    |   2 +-
 .../broker/auth/MockedPulsarServiceBaseTest.java   |   2 +-
 .../BrokerServiceAutoTopicCreationTest.java        | 199 ++++++++++++++++++---
 .../org/apache/pulsar/client/admin/Namespaces.java |  59 +++++-
 .../client/admin/internal/NamespacesImpl.java      |  29 ++-
 .../pulsar/admin/cli/PulsarAdminToolTest.java      |   9 +
 .../org/apache/pulsar/admin/cli/CmdNamespaces.java |  59 ++++++
 .../policies/data/AutoTopicCreationOverride.java   |  89 +++++++++
 .../pulsar/common/policies/data/Policies.java      |   6 +-
 .../pulsar/common/policies/data/TopicType.java     |  45 +++++
 .../data/AutoTopicCreationOverrideTest.java        |  63 +++++++
 15 files changed, 729 insertions(+), 51 deletions(-)

diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
index 6ad8b46..8d9a2b7 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
@@ -35,6 +35,7 @@ import lombok.Setter;
 import org.apache.bookkeeper.client.api.DigestType;
 import org.apache.pulsar.broker.authorization.PulsarAuthorizationProvider;
 import org.apache.pulsar.common.policies.data.InactiveTopicDeleteMode;
+import org.apache.pulsar.common.policies.data.TopicType;
 import org.apache.pulsar.common.protocol.Commands;
 import org.apache.pulsar.common.configuration.Category;
 import org.apache.pulsar.common.configuration.FieldContext;
@@ -1533,18 +1534,4 @@ public class ServiceConfiguration implements PulsarConfiguration {
             return brokerDeleteInactiveTopicsMaxInactiveDurationSeconds;
         }
     }
-
-    enum TopicType {
-        PARTITIONED("partitioned"),
-        NON_PARTITIONED("non-partitioned");
-        private String type;
-
-        TopicType(String type) {
-            this.type = type;
-        }
-
-        public String toString() {
-            return type;
-        }
-    }
 }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
index 01927f2..535361e 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
@@ -69,6 +69,7 @@ import org.apache.pulsar.common.naming.NamespaceBundles;
 import org.apache.pulsar.common.naming.NamespaceName;
 import org.apache.pulsar.common.naming.TopicDomain;
 import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.policies.data.AutoTopicCreationOverride;
 import org.apache.pulsar.common.policies.data.AuthAction;
 import org.apache.pulsar.common.policies.data.BacklogQuota;
 import org.apache.pulsar.common.policies.data.BacklogQuota.BacklogQuotaType;
@@ -560,6 +561,105 @@ public abstract class NamespacesBase extends AdminResource {
         }
     }
 
+    protected void internalSetAutoTopicCreation(AsyncResponse asyncResponse, AutoTopicCreationOverride autoTopicCreationOverride) {
+        validateAdminAccessForTenant(namespaceName.getTenant());
+        validatePoliciesReadOnlyAccess();
+
+        if (!AutoTopicCreationOverride.isValidOverride(autoTopicCreationOverride)) {
+            throw new RestException(Status.PRECONDITION_FAILED, "Invalid configuration for autoTopicCreationOverride");
+        }
+
+        // Force to read the data s.t. the watch to the cache content is setup.
+        policiesCache().getWithStatAsync(path(POLICIES, namespaceName.toString())).thenApply(
+                policies -> {
+                    if (policies.isPresent()) {
+                        Entry<Policies, Stat> policiesNode = policies.get();
+                        policiesNode.getKey().autoTopicCreationOverride = autoTopicCreationOverride;
+                        try {
+                            // Write back the new policies into zookeeper
+                            globalZk().setData(path(POLICIES, namespaceName.toString()),
+                                    jsonMapper().writeValueAsBytes(policiesNode.getKey()), policiesNode.getValue().getVersion());
+                            policiesCache().invalidate(path(POLICIES, namespaceName.toString()));
+                            asyncResponse.resume(Response.noContent().build());
+                            log.info("[{}] Successfully {} on namespace {}", clientAppId(),
+                                    autoTopicCreationOverride.allowAutoTopicCreation ? "enabled" : "disabled", namespaceName);
+                            return null;
+                        } catch (KeeperException.NoNodeException e) {
+                            log.error("[{}] Failed to modify autoTopicCreation status for namespace {}: does not exist", clientAppId(),
+                                    namespaceName);
+                            asyncResponse.resume(new RestException(Status.NOT_FOUND, "Namespace does not exist"));
+                            return null;
+                        } catch (KeeperException.BadVersionException e) {
+                            log.error(
+                                    "[{}] Failed to modify autoTopicCreation status on namespace {} expected policy node version={} : concurrent modification",
+                                    clientAppId(), namespaceName, policiesNode.getValue().getVersion());
+
+                            asyncResponse.resume(new RestException(Status.CONFLICT, "Concurrent modification"));
+                            return null;
+                        } catch (Exception e) {
+                            log.error("[{}] Failed to modify autoTopicCreation status on namespace {}", clientAppId(), namespaceName, e);
+                            asyncResponse.resume(new RestException(e));
+                            return null;
+                        }
+                    } else {
+                        asyncResponse.resume(new RestException(Status.NOT_FOUND, "Namespace " + namespaceName + " does not exist"));
+                        return null;
+                    }
+                }
+        ).exceptionally(e -> {
+            log.error("[{}] Failed to modify autoTopicCreation status on namespace {}", clientAppId(), namespaceName, e);
+            asyncResponse.resume(new RestException(e));
+            return null;
+        });
+    }
+
+    protected void internalRemoveAutoTopicCreation(AsyncResponse asyncResponse) {
+        validateAdminAccessForTenant(namespaceName.getTenant());
+        validatePoliciesReadOnlyAccess();
+
+        // Force to read the data s.t. the watch to the cache content is setup.
+        policiesCache().getWithStatAsync(path(POLICIES, namespaceName.toString())).thenApply(
+                policies -> {
+                    if (policies.isPresent()) {
+                        Entry<Policies, Stat> policiesNode = policies.get();
+                        policiesNode.getKey().autoTopicCreationOverride = null;
+                        try {
+                            // Write back the new policies into zookeeper
+                            globalZk().setData(path(POLICIES, namespaceName.toString()),
+                                    jsonMapper().writeValueAsBytes(policiesNode.getKey()), policiesNode.getValue().getVersion());
+                            policiesCache().invalidate(path(POLICIES, namespaceName.toString()));
+                            asyncResponse.resume(Response.noContent().build());
+                            log.info("[{}] Successfully removed override on namespace {}", clientAppId(), namespaceName);
+                            return null;
+                        } catch (KeeperException.NoNodeException e) {
+                            log.error("[{}] Failed to modify autoTopicCreation status for namespace {}: does not exist", clientAppId(),
+                                    namespaceName);
+                            asyncResponse.resume(new RestException(Status.NOT_FOUND, "Namespace does not exist"));
+                            return null;
+                        } catch (KeeperException.BadVersionException e) {
+                            log.error(
+                                    "[{}] Failed to modify autoTopicCreation status on namespace {} expected policy node version={} : concurrent modification",
+                                    clientAppId(), namespaceName, policiesNode.getValue().getVersion());
+
+                            asyncResponse.resume(new RestException(Status.CONFLICT, "Concurrent modification"));
+                            return null;
+                        } catch (Exception e) {
+                            log.error("[{}] Failed to modify autoTopicCreation status on namespace {}", clientAppId(), namespaceName, e);
+                            asyncResponse.resume(new RestException(e));
+                            return null;
+                        }
+                    } else {
+                        asyncResponse.resume(new RestException(Status.NOT_FOUND, "Namespace " + namespaceName + " does not exist"));
+                        return null;
+                    }
+                }
+        ).exceptionally(e -> {
+            log.error("[{}] Failed to modify autoTopicCreation status on namespace {}", clientAppId(), namespaceName, e);
+            asyncResponse.resume(new RestException(e));
+            return null;
+        });
+    }
+
     protected void internalModifyDeduplication(boolean enableDeduplication) {
         validateAdminAccessForTenant(namespaceName.getTenant());
         validatePoliciesReadOnlyAccess();
@@ -580,17 +680,17 @@ public abstract class NamespacesBase extends AdminResource {
             log.info("[{}] Successfully {} on namespace {}", clientAppId(),
                     enableDeduplication ? "enabled" : "disabled", namespaceName);
         } catch (KeeperException.NoNodeException e) {
-            log.warn("[{}] Failed to modify deplication status for namespace {}: does not exist", clientAppId(),
+            log.warn("[{}] Failed to modify deduplication status for namespace {}: does not exist", clientAppId(),
                     namespaceName);
             throw new RestException(Status.NOT_FOUND, "Namespace does not exist");
         } catch (KeeperException.BadVersionException e) {
             log.warn(
-                    "[{}] Failed to modify deplication status on namespace {} expected policy node version={} : concurrent modification",
+                    "[{}] Failed to modify deduplication status on namespace {} expected policy node version={} : concurrent modification",
                     clientAppId(), namespaceName, policiesNode.getValue().getVersion());
 
             throw new RestException(Status.CONFLICT, "Concurrent modification");
         } catch (Exception e) {
-            log.error("[{}] Failed to modify deplication status on namespace {}", clientAppId(), namespaceName, e);
+            log.error("[{}] Failed to modify deduplication status on namespace {}", clientAppId(), namespaceName, e);
             throw new RestException(e);
         }
     }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
index 3dedcf6..26867e7 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
@@ -45,6 +45,7 @@ import javax.ws.rs.core.MediaType;
 import org.apache.pulsar.broker.admin.impl.NamespacesBase;
 import org.apache.pulsar.broker.web.RestException;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandGetTopicsOfNamespace.Mode;
+import org.apache.pulsar.common.policies.data.AutoTopicCreationOverride;
 import org.apache.pulsar.common.policies.data.AuthAction;
 import org.apache.pulsar.common.policies.data.BacklogQuota;
 import org.apache.pulsar.common.policies.data.BacklogQuota.BacklogQuotaType;
@@ -295,6 +296,43 @@ public class Namespaces extends NamespacesBase {
         internalModifyDeduplication(enableDeduplication);
     }
 
+    @POST
+    @Path("/{tenant}/{namespace}/autoTopicCreation")
+    @ApiOperation(value = "Override broker's allowAutoTopicCreation setting for a namespace")
+    @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
+            @ApiResponse(code = 404, message = "Tenant or cluster or namespace doesn't exist"),
+            @ApiResponse(code = 400, message = "Invalid autoTopicCreation override") })
+    public void setAutoTopicCreation(
+            @Suspended final AsyncResponse asyncResponse,
+                                                  @PathParam("tenant") String tenant, @PathParam("namespace") String namespace,
+                                                  AutoTopicCreationOverride autoTopicCreationOverride) {
+        try {
+            validateNamespaceName(tenant, namespace);
+            internalSetAutoTopicCreation(asyncResponse, autoTopicCreationOverride);
+        } catch (RestException e) {
+            asyncResponse.resume(e);
+        } catch (Exception e ) {
+            asyncResponse.resume(new RestException(e));
+        }
+    }
+
+    @DELETE
+    @Path("/{tenant}/{namespace}/autoTopicCreation")
+    @ApiOperation(value = "Remove override of broker's allowAutoTopicCreation in a namespace")
+    @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
+            @ApiResponse(code = 404, message = "Tenant or cluster or namespace doesn't exist") })
+    public void removeAutoTopicCreation(@Suspended final AsyncResponse asyncResponse,
+                                        @PathParam("tenant") String tenant, @PathParam("namespace") String namespace) {
+        try {
+            validateNamespaceName(tenant, namespace);
+            internalRemoveAutoTopicCreation(asyncResponse);
+        } catch (RestException e) {
+            asyncResponse.resume(e);
+        } catch (Exception e) {
+            asyncResponse.resume(new RestException(e));
+        }
+    }
+
     @GET
     @Path("/{tenant}/{namespace}/bundles")
     @ApiOperation(value = "Get the bundles split data.")
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index 3f0f9cf..ffd75b6 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -123,6 +123,7 @@ import org.apache.pulsar.common.naming.NamespaceName;
 import org.apache.pulsar.common.naming.TopicDomain;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
+import org.apache.pulsar.common.policies.data.AutoTopicCreationOverride;
 import org.apache.pulsar.common.policies.data.ClusterData;
 import org.apache.pulsar.common.policies.data.LocalPolicies;
 import org.apache.pulsar.common.policies.data.OffloadPolicies;
@@ -132,6 +133,7 @@ import org.apache.pulsar.common.policies.data.Policies;
 import org.apache.pulsar.common.policies.data.PublishRate;
 import org.apache.pulsar.common.policies.data.RetentionPolicies;
 import org.apache.pulsar.common.policies.data.TopicStats;
+import org.apache.pulsar.common.policies.data.TopicType;
 import org.apache.pulsar.common.stats.Metrics;
 import org.apache.pulsar.common.util.FieldParser;
 import org.apache.pulsar.common.util.FutureUtil;
@@ -671,7 +673,7 @@ public class BrokerService implements Closeable, ZooKeeperCacheListener<Policies
     }
 
     public CompletableFuture<Topic> getOrCreateTopic(final String topic) {
-        return getTopic(topic, pulsar.getConfiguration().isAllowAutoTopicCreation()).thenApply(Optional::get);
+        return getTopic(topic, isAllowAutoTopicCreation(topic)).thenApply(Optional::get);
     }
 
     public CompletableFuture<Optional<Topic>> getTopic(final String topic, boolean createIfMissing) {
@@ -1861,8 +1863,8 @@ public class BrokerService implements Closeable, ZooKeeperCacheListener<Policies
                                 // If topic is already exist, creating partitioned topic is not allowed.
                                 if (metadata.partitions == 0
                                         && !topicExists
-                                        && pulsar.getConfiguration().isAllowAutoTopicCreation()
-                                        && pulsar.getConfiguration().isDefaultTopicTypePartitioned()) {
+                                        && pulsar.getBrokerService().isAllowAutoTopicCreation(topicName)
+                                        && pulsar.getBrokerService().isDefaultTopicTypePartitioned(topicName)) {
                                     return pulsar.getBrokerService().createDefaultPartitionedTopicAsync(topicName);
                                 } else {
                                     return CompletableFuture.completedFuture(metadata);
@@ -1873,7 +1875,7 @@ public class BrokerService implements Closeable, ZooKeeperCacheListener<Policies
 
     @SuppressWarnings("deprecation")
     private CompletableFuture<PartitionedTopicMetadata> createDefaultPartitionedTopicAsync(TopicName topicName) {
-        int defaultNumPartitions = pulsar.getConfiguration().getDefaultNumPartitions();
+        int defaultNumPartitions = pulsar.getBrokerService().getDefaultNumPartitions(topicName);
         checkArgument(defaultNumPartitions > 0, "Default number of partitions should be more than 0");
 
         PartitionedTopicMetadata configMetadata = new PartitionedTopicMetadata(defaultNumPartitions);
@@ -2106,4 +2108,53 @@ public class BrokerService implements Closeable, ZooKeeperCacheListener<Policies
         foreachProducer(producer -> currentMessagePublishBufferBytes.addAndGet(producer.getCnx().getMessagePublishBufferSize()));
         return currentMessagePublishBufferBytes.get();
     }
+
+    public boolean isAllowAutoTopicCreation(final String topic) {
+        TopicName topicName = TopicName.get(topic);
+        return isAllowAutoTopicCreation(topicName);
+    }
+
+    public boolean isAllowAutoTopicCreation(final TopicName topicName) {
+        AutoTopicCreationOverride autoTopicCreationOverride = getAutoTopicCreationOverride(topicName);
+        if (autoTopicCreationOverride != null) {
+            return autoTopicCreationOverride.allowAutoTopicCreation;
+        } else {
+            return pulsar.getConfiguration().isAllowAutoTopicCreation();
+        }
+    }
+
+    public boolean isDefaultTopicTypePartitioned(final TopicName topicName) {
+        AutoTopicCreationOverride autoTopicCreationOverride = getAutoTopicCreationOverride(topicName);
+        if (autoTopicCreationOverride != null) {
+            return TopicType.PARTITIONED.toString().equals(autoTopicCreationOverride.topicType);
+        } else {
+            return pulsar.getConfiguration().isDefaultTopicTypePartitioned();
+        }
+    }
+
+    public int getDefaultNumPartitions(final TopicName topicName) {
+        AutoTopicCreationOverride autoTopicCreationOverride = getAutoTopicCreationOverride(topicName);
+        if (autoTopicCreationOverride != null) {
+            return autoTopicCreationOverride.defaultNumPartitions;
+        } else {
+            return pulsar.getConfiguration().getDefaultNumPartitions();
+        }
+    }
+
+    private AutoTopicCreationOverride getAutoTopicCreationOverride(final TopicName topicName) {
+        try {
+            Optional<Policies> policies = pulsar.getConfigurationCache().policiesCache()
+                            .get(AdminResource.path(POLICIES, topicName.getNamespace()));
+            // If namespace policies have the field set, it will override the broker-level setting
+            if (policies.isPresent() && policies.get().autoTopicCreationOverride != null) {
+                return policies.get().autoTopicCreationOverride;
+            }
+        } catch (Throwable t) {
+            // Ignoring since if we don't have policies, we fallback on the default
+            log.warn("Got exception when reading autoTopicCreateOverride policy for {}: {};", topicName, t.getMessage(), t);
+            return null;
+        }
+        log.warn("No autoTopicCreateOverride policy found for {}", topicName);
+        return null;
+    }
 }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index 0e99529..ce8411e 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -803,7 +803,7 @@ public class ServerCnx extends PulsarHandler {
                         }
 
                         boolean createTopicIfDoesNotExist = forceTopicCreation
-                                && service.pulsar().getConfig().isAllowAutoTopicCreation();
+                                && service.isAllowAutoTopicCreation(topicName.toString());
 
                         service.getTopic(topicName.toString(), createTopicIfDoesNotExist)
                                 .thenCompose(optTopic -> {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java
index a47e3eb..7b1d8d3 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java
@@ -96,7 +96,7 @@ public abstract class MockedPulsarServiceBaseTest {
         this.conf.setDefaultNumberOfNamespaceBundles(1);
         this.conf.setZookeeperServers("localhost:2181");
         this.conf.setConfigurationStoreServers("localhost:3181");
-        this.conf.setAllowAutoTopicCreationType("non-persistent");
+        this.conf.setAllowAutoTopicCreationType("non-partitioned");
         this.conf.setBrokerServicePort(Optional.of(0));
         this.conf.setBrokerServicePortTls(Optional.of(0));
         this.conf.setWebServicePort(Optional.of(0));
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceAutoTopicCreationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceAutoTopicCreationTest.java
index 3f404af..275ec3f 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceAutoTopicCreationTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceAutoTopicCreationTest.java
@@ -24,8 +24,11 @@ import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertFalse;
 import static org.testng.Assert.assertTrue;
 import static org.testng.Assert.fail;
-
+import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.policies.data.AutoTopicCreationOverride;
+import org.apache.pulsar.common.policies.data.TopicType;
 import org.testng.annotations.AfterClass;
+import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
@@ -43,17 +46,35 @@ public class BrokerServiceAutoTopicCreationTest extends BrokerTestBase{
         super.internalCleanup();
     }
 
+    @AfterMethod
+    protected void cleanupTest() throws Exception {
+        pulsar.getAdminClient().namespaces().removeAutoTopicCreation("prop/ns-abc");
+    }
+
+
     @Test
     public void testAutoNonPartitionedTopicCreation() throws Exception{
         pulsar.getConfiguration().setAllowAutoTopicCreation(true);
         pulsar.getConfiguration().setAllowAutoTopicCreationType("non-partitioned");
 
-        final String topicName = "persistent://prop/ns-abc/non-partitioned-topic";
+        final String topicString = "persistent://prop/ns-abc/non-partitioned-topic";
         final String subscriptionName = "non-partitioned-topic-sub";
-        pulsarClient.newConsumer().topic(topicName).subscriptionName(subscriptionName).subscribe();
+        pulsarClient.newConsumer().topic(topicString).subscriptionName(subscriptionName).subscribe();
 
-        assertTrue(admin.namespaces().getTopics("prop/ns-abc").contains(topicName));
-        assertFalse(admin.topics().getPartitionedTopicList("prop/ns-abc").contains(topicName));
+        assertTrue(admin.namespaces().getTopics("prop/ns-abc").contains(topicString));
+        assertFalse(admin.topics().getPartitionedTopicList("prop/ns-abc").contains(topicString));
+    }
+
+    @Test
+    public void testAutoNonPartitionedTopicCreationOnProduce() throws Exception{
+        pulsar.getConfiguration().setAllowAutoTopicCreation(true);
+        pulsar.getConfiguration().setAllowAutoTopicCreationType("non-partitioned");
+
+        final String topicString = "persistent://prop/ns-abc/non-partitioned-topic-2";
+        pulsarClient.newProducer().topic(topicString).create();
+
+        assertTrue(admin.namespaces().getTopics("prop/ns-abc").contains(topicString));
+        assertFalse(admin.topics().getPartitionedTopicList("prop/ns-abc").contains(topicString));
     }
 
     @Test
@@ -62,13 +83,28 @@ public class BrokerServiceAutoTopicCreationTest extends BrokerTestBase{
         pulsar.getConfiguration().setAllowAutoTopicCreationType("partitioned");
         pulsar.getConfiguration().setDefaultNumPartitions(3);
 
-        final String topicName = "persistent://prop/ns-abc/partitioned-topic";
+        final String topicString = "persistent://prop/ns-abc/partitioned-topic";
         final String subscriptionName = "partitioned-topic-sub";
-        pulsarClient.newConsumer().topic(topicName).subscriptionName(subscriptionName).subscribe();
+        pulsarClient.newConsumer().topic(topicString).subscriptionName(subscriptionName).subscribe();
 
-        assertTrue(admin.topics().getPartitionedTopicList("prop/ns-abc").contains(topicName));
+        assertTrue(admin.topics().getPartitionedTopicList("prop/ns-abc").contains(topicString));
         for (int i = 0; i < 3; i++) {
-            assertTrue(admin.namespaces().getTopics("prop/ns-abc").contains(topicName + "-partition-" + i));
+            assertTrue(admin.namespaces().getTopics("prop/ns-abc").contains(topicString + "-partition-" + i));
+        }
+    }
+
+    @Test
+    public void testAutoPartitionedTopicCreationOnProduce() throws Exception{
+        pulsar.getConfiguration().setAllowAutoTopicCreation(true);
+        pulsar.getConfiguration().setAllowAutoTopicCreationType("partitioned");
+        pulsar.getConfiguration().setDefaultNumPartitions(3);
+
+        final String topicString = "persistent://prop/ns-abc/partitioned-topic-1";
+        pulsarClient.newProducer().topic(topicString).create();
+
+        assertTrue(admin.topics().getPartitionedTopicList("prop/ns-abc").contains(topicString));
+        for (int i = 0; i < 3; i++) {
+            assertTrue(admin.namespaces().getTopics("prop/ns-abc").contains(topicString + "-partition-" + i));
         }
     }
 
@@ -76,15 +112,15 @@ public class BrokerServiceAutoTopicCreationTest extends BrokerTestBase{
     public void testAutoTopicCreationDisable() throws Exception{
         pulsar.getConfiguration().setAllowAutoTopicCreation(false);
 
-        final String topicName = "persistent://prop/ns-abc/test-topic";
+        final String topicString = "persistent://prop/ns-abc/test-topic";
         final String subscriptionName = "test-topic-sub";
         try {
-            pulsarClient.newConsumer().topic(topicName).subscriptionName(subscriptionName).subscribe();
+            pulsarClient.newConsumer().topic(topicString).subscriptionName(subscriptionName).subscribe();
             fail("Subscribe operation should have failed");
         } catch (Exception e) {
             assertTrue(e instanceof PulsarClientException);
         }
-        assertFalse(admin.namespaces().getTopics("prop/ns-abc").contains(topicName));
+        assertFalse(admin.namespaces().getTopics("prop/ns-abc").contains(topicString));
     }
 
     @Test
@@ -93,16 +129,16 @@ public class BrokerServiceAutoTopicCreationTest extends BrokerTestBase{
         pulsar.getConfiguration().setAllowAutoTopicCreationType("partitioned");
         pulsar.getConfiguration().setDefaultNumPartitions(3);
 
-        final String topicName = "persistent://prop/ns-abc/test-topic-2";
+        final String topicString = "persistent://prop/ns-abc/test-topic-2";
         final String subscriptionName = "partitioned-topic-sub";
-        admin.topics().createNonPartitionedTopic(topicName);
-        pulsarClient.newConsumer().topic(topicName).subscriptionName(subscriptionName).subscribe();
+        admin.topics().createNonPartitionedTopic(topicString);
+        pulsarClient.newConsumer().topic(topicString).subscriptionName(subscriptionName).subscribe();
 
-        assertFalse(admin.topics().getPartitionedTopicList("prop/ns-abc").contains(topicName));
+        assertFalse(admin.topics().getPartitionedTopicList("prop/ns-abc").contains(topicString));
         for (int i = 0; i < 3; i++) {
-            assertFalse(admin.namespaces().getTopics("prop/ns-abc").contains(topicName + "-partition-" + i));
+            assertFalse(admin.namespaces().getTopics("prop/ns-abc").contains(topicString + "-partition-" + i));
         }
-        assertTrue(admin.namespaces().getTopics("prop/ns-abc").contains(topicName));
+        assertTrue(admin.namespaces().getTopics("prop/ns-abc").contains(topicString));
     }
 
     /**
@@ -116,10 +152,131 @@ public class BrokerServiceAutoTopicCreationTest extends BrokerTestBase{
         pulsar.getConfiguration().setAllowAutoTopicCreationType("partitioned");
         pulsar.getConfiguration().setDefaultNumPartitions(3);
 
-        final String topicName = "persistent://prop/ns-abc/test-topic-3";
-        int partitions = admin.topics().getPartitionedTopicMetadata(topicName).partitions;
+        final String topicString = "persistent://prop/ns-abc/test-topic-3";
+        int partitions = admin.topics().getPartitionedTopicMetadata(topicString).partitions;
         assertEquals(partitions, 0);
-        assertFalse(admin.namespaces().getTopics("prop/ns-abc").contains(topicName));
+        assertFalse(admin.namespaces().getTopics("prop/ns-abc").contains(topicString));
+    }
+
+    @Test
+    public void testAutoCreationNamespaceAllowOverridesBroker() throws Exception {
+        // Broker forbids auto-creation; the namespace override re-enables it as non-partitioned.
+        final String topic = "persistent://prop/ns-abc/test-topic-4";
+        final AutoTopicCreationOverride nsOverride =
+                new AutoTopicCreationOverride(true, TopicType.NON_PARTITIONED.toString(), null);
+        pulsar.getConfiguration().setAllowAutoTopicCreation(false);
+        pulsar.getAdminClient().namespaces().setAutoTopicCreation(TopicName.get(topic).getNamespace(), nsOverride);
+
+        pulsarClient.newConsumer().topic(topic).subscriptionName("test-topic-sub-4").subscribe();
+        assertTrue(admin.namespaces().getTopics("prop/ns-abc").contains(topic));
+        assertFalse(admin.topics().getPartitionedTopicList("prop/ns-abc").contains(topic));
+    }
+
+    @Test
+    public void testAutoCreationNamespaceDisallowOverridesBroker() throws Exception {
+        // Broker allows auto-creation; the namespace override switches it off again.
+        final String topic = "persistent://prop/ns-abc/test-topic-5";
+        final String subscription = "test-topic-sub-5";
+        final AutoTopicCreationOverride nsOverride = new AutoTopicCreationOverride(false, null, null);
+        pulsar.getConfiguration().setAllowAutoTopicCreation(true);
+        pulsar.getAdminClient().namespaces().setAutoTopicCreation(TopicName.get(topic).getNamespace(), nsOverride);
+
+        try {
+            pulsarClient.newConsumer().topic(topic).subscriptionName(subscription).subscribe();
+            fail("Subscribe operation should have failed");
+        } catch (Exception e) {
+            assertTrue(e instanceof PulsarClientException);
+        }
+        assertFalse(admin.namespaces().getTopics("prop/ns-abc").contains(topic));
+    }
+
+    @Test
+    public void testAutoCreationNamespaceOverrideAllowsPartitionedTopics() throws Exception {
+        // Broker forbids auto-creation; the namespace override asks for 4-partition topics.
+        final String topic = "persistent://prop/ns-abc/partitioned-test-topic-6";
+        final int expectedPartitions = 4;
+        final AutoTopicCreationOverride nsOverride =
+                new AutoTopicCreationOverride(true, TopicType.PARTITIONED.toString(), expectedPartitions);
+        pulsar.getConfiguration().setAllowAutoTopicCreation(false);
+        pulsar.getAdminClient().namespaces().setAutoTopicCreation(TopicName.get(topic).getNamespace(), nsOverride);
+
+        pulsarClient.newConsumer().topic(topic).subscriptionName("test-topic-sub-6").subscribe();
+
+        assertTrue(admin.topics().getPartitionedTopicList("prop/ns-abc").contains(topic));
+        for (int partition = 0; partition < expectedPartitions; partition++) {
+            assertTrue(admin.namespaces().getTopics("prop/ns-abc").contains(topic + "-partition-" + partition));
+        }
+    }
+
+    @Test
+    public void testAutoCreationNamespaceOverridesTopicTypePartitioned() throws Exception {
+        // Broker would auto-create non-partitioned topics; the override asks for 3 partitions instead.
+        final String topic = "persistent://prop/ns-abc/partitioned-test-topic-7";
+        final int expectedPartitions = 3;
+        final AutoTopicCreationOverride nsOverride =
+                new AutoTopicCreationOverride(true, TopicType.PARTITIONED.toString(), expectedPartitions);
+        pulsar.getConfiguration().setAllowAutoTopicCreation(true);
+        pulsar.getConfiguration().setAllowAutoTopicCreationType("non-partitioned");
+        pulsar.getAdminClient().namespaces().setAutoTopicCreation(TopicName.get(topic).getNamespace(), nsOverride);
+
+        pulsarClient.newConsumer().topic(topic).subscriptionName("test-topic-sub-7").subscribe();
+
+        assertTrue(admin.topics().getPartitionedTopicList("prop/ns-abc").contains(topic));
+        for (int partition = 0; partition < expectedPartitions; partition++) {
+            assertTrue(admin.namespaces().getTopics("prop/ns-abc").contains(topic + "-partition-" + partition));
+        }
+    }
+
+    @Test
+    public void testAutoCreationNamespaceOverridesTopicTypeNonPartitioned() throws Exception {
+        // Broker would auto-create 2-partition topics; the namespace override asks for a
+        // non-partitioned topic instead.
+        final String topic = "persistent://prop/ns-abc/partitioned-test-topic-8";
+        final AutoTopicCreationOverride nsOverride =
+                new AutoTopicCreationOverride(true, TopicType.NON_PARTITIONED.toString(), null);
+        pulsar.getConfiguration().setAllowAutoTopicCreation(true);
+        pulsar.getConfiguration().setAllowAutoTopicCreationType("partitioned");
+        pulsar.getConfiguration().setDefaultNumPartitions(2);
+        pulsar.getAdminClient().namespaces().setAutoTopicCreation(TopicName.get(topic).getNamespace(), nsOverride);
+
+        pulsarClient.newConsumer().topic(topic).subscriptionName("test-topic-sub-8").subscribe();
+
+        assertTrue(admin.namespaces().getTopics("prop/ns-abc").contains(topic));
+        assertFalse(admin.topics().getPartitionedTopicList("prop/ns-abc").contains(topic));
+    }
+
+    @Test
+    public void testAutoCreationNamespaceOverridesDefaultNumPartitions() throws Exception {
+        // Broker would auto-create 2-partition topics; the namespace override asks for 4
+        // partitions, and that count is the one expected below.
+        final String topic = "persistent://prop/ns-abc/partitioned-test-topic-9";
+        final int expectedPartitions = 4;
+        final AutoTopicCreationOverride nsOverride =
+                new AutoTopicCreationOverride(true, TopicType.PARTITIONED.toString(), expectedPartitions);
+        pulsar.getConfiguration().setAllowAutoTopicCreation(true);
+        pulsar.getConfiguration().setAllowAutoTopicCreationType("partitioned");
+        pulsar.getConfiguration().setDefaultNumPartitions(2);
+        pulsar.getAdminClient().namespaces().setAutoTopicCreation(TopicName.get(topic).getNamespace(), nsOverride);
+
+        pulsarClient.newConsumer().topic(topic).subscriptionName("test-topic-sub-9").subscribe();
+
+        assertTrue(admin.topics().getPartitionedTopicList("prop/ns-abc").contains(topic));
+        for (int partition = 0; partition < expectedPartitions; partition++) {
+            assertTrue(admin.namespaces().getTopics("prop/ns-abc").contains(topic + "-partition-" + partition));
+        }
+    }
+
+    @Test
+    public void testAutoCreationNamespaceAllowOverridesBrokerOnProduce() throws Exception {
+        // Same as the consumer case, but the topic is auto-created by a producer.
+        final String topic = "persistent://prop/ns-abc/test-topic-10";
+        pulsar.getConfiguration().setAllowAutoTopicCreation(false);
+        pulsar.getAdminClient().namespaces().setAutoTopicCreation(TopicName.get(topic).getNamespace(),
+                new AutoTopicCreationOverride(true, TopicType.NON_PARTITIONED.toString(), null));
+
+        pulsarClient.newProducer().topic(topic).create();
+        assertTrue(admin.namespaces().getTopics("prop/ns-abc").contains(topic));
+        assertFalse(admin.topics().getPartitionedTopicList("prop/ns-abc").contains(topic));
     }
 
 
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Namespaces.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Namespaces.java
index bce5aba5..a1f7ec1 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Namespaces.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Namespaces.java
@@ -29,6 +29,7 @@ import org.apache.pulsar.client.admin.PulsarAdminException.NotAuthorizedExceptio
 import org.apache.pulsar.client.admin.PulsarAdminException.NotFoundException;
 import org.apache.pulsar.client.admin.PulsarAdminException.PreconditionFailedException;
 import org.apache.pulsar.common.policies.data.AuthAction;
+import org.apache.pulsar.common.policies.data.AutoTopicCreationOverride;
 import org.apache.pulsar.common.policies.data.BacklogQuota;
 import org.apache.pulsar.common.policies.data.BookieAffinityGroupData;
 import org.apache.pulsar.common.policies.data.BundlesData;
@@ -606,6 +607,56 @@ public interface Namespaces {
     void setDeduplicationStatus(String namespace, boolean enableDeduplication) throws PulsarAdminException;
 
     /**
+     * Sets the autoTopicCreation policy for a given namespace, overriding broker settings.
+     * <p/>
+     * When the override sets allowAutoTopicCreation to true, new topics are created upon connection;
+     * when it sets it to false they are not. Either way the broker level configuration is ignored.
+     * <p/>
+     * Request example:
+     *
+     * <pre>
+     * <code>
+     *  {
+     *      "allowAutoTopicCreation" : true,
+     *      "topicType" : "partitioned",
+     *      "defaultNumPartitions": 2
+     *  }
+     * </code>
+     * </pre>
+     *
+     * @param namespace
+     *            Namespace name
+     * @param autoTopicCreationOverride
+     *            Override policies for auto topic creation
+     *
+     * @throws NotAuthorizedException
+     *             Don't have admin permission
+     * @throws NotFoundException
+     *             Namespace does not exist
+     * @throws PulsarAdminException
+     *             Unexpected error
+     */
+    void setAutoTopicCreation(String namespace, AutoTopicCreationOverride autoTopicCreationOverride)
+            throws PulsarAdminException;
+
+    /**
+     * Removes the autoTopicCreation policy for a given namespace.
+     * <p/>
+     * Afterwards the broker configuration dictates the auto-creation policy again.
+     *
+     * @param namespace
+     *            Namespace name
+     *
+     * @throws NotAuthorizedException
+     *             Don't have admin permission
+     * @throws NotFoundException
+     *             Namespace does not exist
+     * @throws PulsarAdminException
+     *             Unexpected error
+     */
+    void removeAutoTopicCreation(String namespace) throws PulsarAdminException;
+
+    /**
      * Get the bundles split data.
      *
      * @param namespace
@@ -784,7 +835,7 @@ public interface Namespaces {
     /**
      * Set bookie affinity group for a namespace to isolate namespace write to bookies that are part of given affinity
      * group.
-     * 
+     *
      * @param namespace
      *            namespace name
      * @param bookieAffinityGroup
@@ -793,10 +844,10 @@ public interface Namespaces {
      */
     void setBookieAffinityGroup(String namespace, BookieAffinityGroupData bookieAffinityGroup)
             throws PulsarAdminException;
-    
+
     /**
      * Delete bookie affinity group configured for a namespace.
-     * 
+     *
      * @param namespace
      * @throws PulsarAdminException
      */
@@ -804,7 +855,7 @@ public interface Namespaces {
 
     /**
      * Get bookie affinity group configured for a namespace.
-     * 
+     *
      * @param namespace
      * @return
      * @throws PulsarAdminException
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java
index c21b601..a7d7b47 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java
@@ -37,6 +37,7 @@ import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.client.api.Authentication;
 import org.apache.pulsar.common.naming.NamespaceName;
 import org.apache.pulsar.common.policies.data.AuthAction;
+import org.apache.pulsar.common.policies.data.AutoTopicCreationOverride;
 import org.apache.pulsar.common.policies.data.BacklogQuota;
 import org.apache.pulsar.common.policies.data.BookieAffinityGroupData;
 import org.apache.pulsar.common.policies.data.BacklogQuota.BacklogQuotaType;
@@ -373,6 +374,30 @@ public class NamespacesImpl extends BaseResource implements Namespaces {
     }
 
     @Override
+    public void setAutoTopicCreation(String namespace,
+                                     AutoTopicCreationOverride autoTopicCreationOverride) throws PulsarAdminException {
+        // POSTs the override as JSON to the namespace's "autoTopicCreation" path.
+        try {
+            WebTarget target = namespacePath(NamespaceName.get(namespace), "autoTopicCreation");
+            request(target).post(
+                    Entity.entity(autoTopicCreationOverride, MediaType.APPLICATION_JSON), ErrorData.class);
+        } catch (Exception cause) {
+            throw getApiException(cause);
+        }
+    }
+
+    @Override
+    public void removeAutoTopicCreation(String namespace) throws PulsarAdminException {
+        // Issues a DELETE on the namespace's "autoTopicCreation" path.
+        try {
+            WebTarget target = namespacePath(NamespaceName.get(namespace), "autoTopicCreation");
+            request(target).delete(ErrorData.class);
+        } catch (Exception cause) {
+            throw getApiException(cause);
+        }
+    }
+
+    @Override
     public Map<BacklogQuotaType, BacklogQuota> getBacklogQuotaMap(String namespace) throws PulsarAdminException {
         try {
             NamespaceName ns = NamespaceName.get(namespace);
@@ -550,7 +575,7 @@ public class NamespacesImpl extends BaseResource implements Namespaces {
         } catch (Exception e) {
             throw getApiException(e);
         }
-    
+
     }
 
     @Override
@@ -563,7 +588,7 @@ public class NamespacesImpl extends BaseResource implements Namespaces {
             throw getApiException(e);
         }
     }
-    
+
     @Override
     public void setDispatchRate(String namespace, DispatchRate dispatchRate) throws PulsarAdminException {
         try {
diff --git a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
index 9318f15..0a9d119 100644
--- a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
+++ b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
@@ -55,6 +55,7 @@ import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.impl.auth.AuthenticationTls;
 import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
 import org.apache.pulsar.common.policies.data.AuthAction;
+import org.apache.pulsar.common.policies.data.AutoTopicCreationOverride;
 import org.apache.pulsar.common.policies.data.BacklogQuota;
 import org.apache.pulsar.common.policies.data.BacklogQuota.RetentionPolicy;
 import org.apache.pulsar.common.policies.data.BookieAffinityGroupData;
@@ -72,6 +73,7 @@ import org.apache.pulsar.common.policies.data.ResourceQuota;
 import org.apache.pulsar.common.policies.data.RetentionPolicies;
 import org.apache.pulsar.common.policies.data.SubscribeRate;
 import org.apache.pulsar.common.policies.data.TenantInfo;
+import org.apache.pulsar.common.policies.data.TopicType;
 import org.apache.pulsar.common.util.ObjectMapperFactory;
 import org.mockito.ArgumentMatcher;
 import org.mockito.Mockito;
@@ -355,6 +357,13 @@ public class PulsarAdminToolTest {
         namespaces.run(split("set-deduplication myprop/clust/ns1 --enable"));
         verify(mockNamespaces).setDeduplicationStatus("myprop/clust/ns1", true);
 
+        namespaces.run(split("set-auto-topic-creation myprop/clust/ns1 -e -t non-partitioned"));
+        verify(mockNamespaces).setAutoTopicCreation("myprop/clust/ns1",
+                new AutoTopicCreationOverride(true, TopicType.NON_PARTITIONED.toString(), null));
+
+        namespaces.run(split("remove-auto-topic-creation myprop/clust/ns1"));
+        verify(mockNamespaces).removeAutoTopicCreation("myprop/clust/ns1");
+
         namespaces.run(split("get-message-ttl myprop/clust/ns1"));
         verify(mockNamespaces).getNamespaceMessageTTL("myprop/clust/ns1");
 
diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java
index 3dd4a6f..0de2f74 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java
@@ -37,6 +37,7 @@ import org.apache.pulsar.admin.cli.utils.IOUtils;
 import org.apache.pulsar.client.admin.PulsarAdmin;
 import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.common.naming.NamespaceName;
+import org.apache.pulsar.common.policies.data.AutoTopicCreationOverride;
 import org.apache.pulsar.common.policies.data.BacklogQuota;
 import org.apache.pulsar.common.policies.data.BookieAffinityGroupData;
 import org.apache.pulsar.common.policies.data.BundlesData;
@@ -51,6 +52,7 @@ import org.apache.pulsar.common.policies.data.SchemaAutoUpdateCompatibilityStrat
 import org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy;
 import org.apache.pulsar.common.policies.data.SubscribeRate;
 import org.apache.pulsar.common.policies.data.SubscriptionAuthMode;
+import org.apache.pulsar.common.policies.data.TopicType;
 import org.apache.pulsar.common.util.RelativeTimeUtil;
 
 @Parameters(commandDescription = "Operations about namespaces")
@@ -378,6 +380,60 @@ public class CmdNamespaces extends CmdBase {
         }
     }
 
+    @Parameters(commandDescription = "Enable or disable autoTopicCreation for a namespace, overriding broker settings")
+    private class SetAutoTopicCreation extends CliCommand {
+        @Parameter(description = "tenant/namespace", required = true)
+        private java.util.List<String> params;
+
+        @Parameter(names = { "--enable", "-e" }, description = "Enable allowAutoTopicCreation on namespace")
+        private boolean enable = false;
+
+        @Parameter(names = { "--disable", "-d" }, description = "Disable allowAutoTopicCreation on namespace")
+        private boolean disable = false;
+
+        @Parameter(names = { "--type", "-t" }, description = "Type of topic to be auto-created. " +
+                "Possible values: (partitioned, non-partitioned). Default value: non-partitioned")
+        private String type = "non-partitioned";
+
+        @Parameter(names = { "--num-partitions", "-n" }, description = "Default number of partitions of topic to be auto-created," +
+                " applicable to partitioned topics only", required = false)
+        private Integer defaultNumPartitions = null;
+        /** Validates the flags, then sends the override for the namespace through the admin client. */
+        @Override
+        void run() throws PulsarAdminException {
+            String namespace = validateNamespace(params);
+            // Locale.ROOT: lower-casing must not depend on the JVM's default locale (e.g. Turkish dotless i).
+            type = type.toLowerCase(java.util.Locale.ROOT).trim();
+            if (enable == disable) {
+                throw new ParameterException("Need to specify either --enable or --disable");
+            }
+            if (enable) {
+                if (!TopicType.isValidTopicType(type)) {
+                    throw new ParameterException("Must specify type of topic to be created. " +
+                            "Possible values: (partitioned, non-partitioned)");
+                }
+                boolean invalidCount = defaultNumPartitions == null || defaultNumPartitions <= 0; // -n may be absent
+                if (TopicType.PARTITIONED.toString().equals(type) && invalidCount) {
+                    throw new ParameterException("Must specify num-partitions > 0 for partitioned topic type.");
+                }
+            }
+            admin.namespaces().setAutoTopicCreation(namespace, new AutoTopicCreationOverride(enable, type, defaultNumPartitions));
+        }
+    }
+
+    @Parameters(commandDescription = "Remove override of autoTopicCreation for a namespace")
+    private class RemoveAutoTopicCreation extends CliCommand {
+        @Parameter(description = "tenant/namespace", required = true)
+        private java.util.List<String> params;
+
+        /** Resolves the namespace argument and asks the admin client to remove its override. */
+        @Override
+        void run() throws PulsarAdminException {
+            final String ns = validateNamespace(params);
+            admin.namespaces().removeAutoTopicCreation(ns);
+        }
+    }
+
     @Parameters(commandDescription = "Set the retention policy for a namespace")
     private class SetRetention extends CliCommand {
         @Parameter(description = "tenant/namespace", required = true)
@@ -1486,6 +1542,9 @@ public class CmdNamespaces extends CmdBase {
 
         jcommander.addCommand("set-deduplication", new SetDeduplication());
 
+        jcommander.addCommand("set-auto-topic-creation", new SetAutoTopicCreation());
+        jcommander.addCommand("remove-auto-topic-creation", new RemoveAutoTopicCreation());
+
         jcommander.addCommand("get-retention", new GetRetention());
         jcommander.addCommand("set-retention", new SetRetention());
         
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/AutoTopicCreationOverride.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/AutoTopicCreationOverride.java
new file mode 100644
index 0000000..3eb7a49
--- /dev/null
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/AutoTopicCreationOverride.java
@@ -0,0 +1,89 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.common.policies.data;
+
+import com.google.common.base.MoreObjects;
+
+import java.util.Objects;
+
+/**
+ * Override of autoTopicCreation settings on a namespace level.
+ */
+public class AutoTopicCreationOverride {
+    public boolean allowAutoTopicCreation; // whether auto topic creation is allowed for the namespace
+    public String topicType; // a TopicType string form, e.g. "partitioned"
+    public Integer defaultNumPartitions; // must be > 0 if partitioned, null if non-partitioned
+
+    public AutoTopicCreationOverride() {
+    }
+
+    public AutoTopicCreationOverride(boolean allowAutoTopicCreation, String topicType,
+                                     Integer defaultNumPartitions) {
+        this.allowAutoTopicCreation = allowAutoTopicCreation;
+        this.topicType = topicType;
+        this.defaultNumPartitions = defaultNumPartitions;
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(allowAutoTopicCreation, topicType, defaultNumPartitions);
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+        if (obj instanceof AutoTopicCreationOverride) {
+            AutoTopicCreationOverride other = (AutoTopicCreationOverride) obj;
+            return this.allowAutoTopicCreation == other.allowAutoTopicCreation
+                    && Objects.equals(this.topicType, other.topicType)
+                    && Objects.equals(this.defaultNumPartitions, other.defaultNumPartitions);
+        }
+        return false;
+    }
+
+    @Override
+    public String toString() {
+        return MoreObjects.toStringHelper(this).add("allowAutoTopicCreation", allowAutoTopicCreation)
+                .add("topicType", topicType).add("defaultNumPartitions", defaultNumPartitions).toString();
+    }
+
+    /**
+     * Checks an override for consistency. {@code null} is invalid and an override that disallows
+     * auto-creation is always valid. One that allows it needs a known topic type (any letter case),
+     * a positive partition count when partitioned and no partition count when non-partitioned.
+     *
+     * @return {@code true} if the override is consistent, {@code false} otherwise
+     */
+    public static boolean isValidOverride(AutoTopicCreationOverride override) {
+        if (override == null) {
+            return false;
+        }
+        if (!override.allowAutoTopicCreation) {
+            return true;
+        }
+        // Compare case-insensitively, as isValidTopicType does, so "PARTITIONED" is still checked.
+        if (TopicType.PARTITIONED.toString().equalsIgnoreCase(override.topicType)) {
+            return override.defaultNumPartitions != null && override.defaultNumPartitions > 0;
+        }
+        if (TopicType.NON_PARTITIONED.toString().equalsIgnoreCase(override.topicType)) {
+            return override.defaultNumPartitions == null;
+        }
+        return TopicType.isValidTopicType(override.topicType);
+    }
+
+}
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/Policies.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/Policies.java
index fae2ae2..7e56666 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/Policies.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/Policies.java
@@ -49,6 +49,8 @@ public class Policies {
 
     // If set, it will override the broker settings for enabling deduplication
     public Boolean deduplicationEnabled = null;
+    // If set, it will override the broker settings for allowing auto topic creation
+    public AutoTopicCreationOverride autoTopicCreationOverride = null;
     public Map<String, PublishRate> publishMaxMessageRate = Maps.newHashMap();
 
     @SuppressWarnings("checkstyle:MemberName")
@@ -106,7 +108,7 @@ public class Policies {
         return Objects.hash(auth_policies, replication_clusters,
                 backlog_quota_map, publishMaxMessageRate, clusterDispatchRate,
                 topicDispatchRate, subscriptionDispatchRate, replicatorDispatchRate,
-                clusterSubscribeRate, deduplicationEnabled, persistence,
+                clusterSubscribeRate, deduplicationEnabled, autoTopicCreationOverride, persistence,
                 bundles, latency_stats_sample_rate,
                 message_ttl_in_seconds, retention_policies,
                 encryption_required, delayed_delivery_policies,
@@ -136,6 +138,7 @@ public class Policies {
                     && Objects.equals(clusterSubscribeRate, other.clusterSubscribeRate)
                     && Objects.equals(publishMaxMessageRate, other.publishMaxMessageRate)
                     && Objects.equals(deduplicationEnabled, other.deduplicationEnabled)
+                    && Objects.equals(autoTopicCreationOverride, other.autoTopicCreationOverride)
                     && Objects.equals(persistence, other.persistence) && Objects.equals(bundles, other.bundles)
                     && Objects.equals(latency_stats_sample_rate, other.latency_stats_sample_rate)
                     && Objects.equals(message_ttl_in_seconds,
@@ -183,6 +186,7 @@ public class Policies {
                 .add("replication_clusters", replication_clusters).add("bundles", bundles)
                 .add("backlog_quota_map", backlog_quota_map).add("persistence", persistence)
                 .add("deduplicationEnabled", deduplicationEnabled)
+                .add("autoTopicCreationOverride", autoTopicCreationOverride)
                 .add("clusterDispatchRate", clusterDispatchRate)
                 .add("topicDispatchRate", topicDispatchRate)
                 .add("subscriptionDispatchRate", subscriptionDispatchRate)
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/TopicType.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/TopicType.java
new file mode 100644
index 0000000..b184bf6
--- /dev/null
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/TopicType.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.common.policies.data;
+
+/** Topic types -- partitioned or non-partitioned. */
+public enum TopicType {
+    PARTITIONED("partitioned"),
+    NON_PARTITIONED("non-partitioned");
+    private final String type; // string form returned by toString()
+
+    TopicType(String type) {
+        this.type = type;
+    }
+
+    @Override
+    public String toString() {
+        return type;
+    }
+
+    /** Returns true if {@code type} equals, ignoring case, the string form of any constant. */
+    public static boolean isValidTopicType(String type) {
+        for (TopicType candidate : values()) {
+            if (candidate.type.equalsIgnoreCase(type)) {
+                return true;
+            }
+        }
+        return false;
+    }
+}
diff --git a/pulsar-common/src/test/java/org/apache/pulsar/common/policies/data/AutoTopicCreationOverrideTest.java b/pulsar-common/src/test/java/org/apache/pulsar/common/policies/data/AutoTopicCreationOverrideTest.java
new file mode 100644
index 0000000..75bc110
--- /dev/null
+++ b/pulsar-common/src/test/java/org/apache/pulsar/common/policies/data/AutoTopicCreationOverrideTest.java
@@ -0,0 +1,78 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.common.policies.data;
+
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertTrue;
+
+/**
+ * Unit tests for the validation performed by
+ * {@code AutoTopicCreationOverride.isValidOverride}.
+ */
+public class AutoTopicCreationOverrideTest {
+
+    /** An enabled non-partitioned override without a partition count is accepted. */
+    @Test
+    public void testValidOverrideNonPartitioned() {
+        final String topicType = TopicType.NON_PARTITIONED.toString();
+        final AutoTopicCreationOverride candidate = new AutoTopicCreationOverride(true, topicType, null);
+        assertTrue(AutoTopicCreationOverride.isValidOverride(candidate));
+    }
+
+    /** An enabled partitioned override with two partitions is accepted. */
+    @Test
+    public void testValidOverridePartitioned() {
+        final String topicType = TopicType.PARTITIONED.toString();
+        final AutoTopicCreationOverride candidate = new AutoTopicCreationOverride(true, topicType, 2);
+        assertTrue(AutoTopicCreationOverride.isValidOverride(candidate));
+    }
+
+    /** A topic type string that matches no {@link TopicType} is rejected. */
+    @Test
+    public void testInvalidTopicType() {
+        final AutoTopicCreationOverride candidate = new AutoTopicCreationOverride(true, "aaa", null);
+        assertFalse(AutoTopicCreationOverride.isValidOverride(candidate));
+    }
+
+    /** A partitioned override with zero partitions is rejected. */
+    @Test
+    public void testNumPartitionsTooLow() {
+        final String topicType = TopicType.PARTITIONED.toString();
+        final AutoTopicCreationOverride candidate = new AutoTopicCreationOverride(true, topicType, 0);
+        assertFalse(AutoTopicCreationOverride.isValidOverride(candidate));
+    }
+
+    /** A partitioned override with no partition count is rejected. */
+    @Test
+    public void testNumPartitionsNotSet() {
+        final String topicType = TopicType.PARTITIONED.toString();
+        final AutoTopicCreationOverride candidate = new AutoTopicCreationOverride(true, topicType, null);
+        assertFalse(AutoTopicCreationOverride.isValidOverride(candidate));
+    }
+
+    /** A non-partitioned override that still supplies a partition count is rejected. */
+    @Test
+    public void testNumPartitionsOnNonPartitioned() {
+        final String topicType = TopicType.NON_PARTITIONED.toString();
+        final AutoTopicCreationOverride candidate = new AutoTopicCreationOverride(true, topicType, 2);
+        assertFalse(AutoTopicCreationOverride.isValidOverride(candidate));
+    }
+}