You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by si...@apache.org on 2020/03/26 06:59:54 UTC
[pulsar] branch master updated: [Issue #5395][broker] Implement AutoTopicCreation by namespace override (#6471)
This is an automated email from the ASF dual-hosted git repository.
sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new fdc3a9b [Issue #5395][broker] Implement AutoTopicCreation by namespace override (#6471)
fdc3a9b is described below
commit fdc3a9bc8f04c4e424fec90a636a4aa25b35dcd8
Author: Kai <kl...@toasttab.com>
AuthorDate: Wed Mar 25 23:59:43 2020 -0700
[Issue #5395][broker] Implement AutoTopicCreation by namespace override (#6471)
Fixes #5395
### Motivation
This change introduces a new namespace policy, `autoTopicCreationOverride`, which overrides the broker's `autoTopicCreation` settings at the namespace level. For example, you can keep `autoTopicCreation` disabled on the broker and still allow it for a specific namespace.
### Modifications
- Add a new namespace policy, `autoTopicCreationOverride`, and the associated API / CLI interface for setting and removing it. The override defaults to the non-partitioned topic type, but also allows partitioned topics.
- Modify `BrokerService`: when checking the `autoTopicCreation` configuration, the broker first retrieves the namespace policies from ZooKeeper. If the `autoTopicCreationOverride` policy exists for that namespace, the broker uses those settings. If not, it falls back to the broker configuration.
- Slight refactor: move the `TopicType` enum to pulsar-common and add the new `AutoTopicCreationOverride` class there as well.
---
.../apache/pulsar/broker/ServiceConfiguration.java | 15 +-
.../pulsar/broker/admin/impl/NamespacesBase.java | 106 ++++++++++-
.../apache/pulsar/broker/admin/v2/Namespaces.java | 38 ++++
.../pulsar/broker/service/BrokerService.java | 59 +++++-
.../apache/pulsar/broker/service/ServerCnx.java | 2 +-
.../broker/auth/MockedPulsarServiceBaseTest.java | 2 +-
.../BrokerServiceAutoTopicCreationTest.java | 199 ++++++++++++++++++---
.../org/apache/pulsar/client/admin/Namespaces.java | 51 ++++++
.../client/admin/internal/NamespacesImpl.java | 25 +++
.../pulsar/admin/cli/PulsarAdminToolTest.java | 9 +
.../org/apache/pulsar/admin/cli/CmdNamespaces.java | 59 ++++++
.../policies/data/AutoTopicCreationOverride.java | 89 +++++++++
.../pulsar/common/policies/data/Policies.java | 6 +-
.../pulsar/common/policies/data/TopicType.java | 45 +++++
.../data/AutoTopicCreationOverrideTest.java | 63 +++++++
15 files changed, 723 insertions(+), 45 deletions(-)
diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
index 9d2d79d..190170d 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
@@ -35,6 +35,7 @@ import lombok.Setter;
import org.apache.bookkeeper.client.api.DigestType;
import org.apache.pulsar.broker.authorization.PulsarAuthorizationProvider;
import org.apache.pulsar.common.policies.data.InactiveTopicDeleteMode;
+import org.apache.pulsar.common.policies.data.TopicType;
import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.common.configuration.Category;
import org.apache.pulsar.common.configuration.FieldContext;
@@ -1538,18 +1539,4 @@ public class ServiceConfiguration implements PulsarConfiguration {
return brokerDeleteInactiveTopicsMaxInactiveDurationSeconds;
}
}
-
- enum TopicType {
- PARTITIONED("partitioned"),
- NON_PARTITIONED("non-partitioned");
- private String type;
-
- TopicType(String type) {
- this.type = type;
- }
-
- public String toString() {
- return type;
- }
- }
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
index fbb5895..a4a485f 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
@@ -69,6 +69,7 @@ import org.apache.pulsar.common.naming.NamespaceBundles;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicDomain;
import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.policies.data.AutoTopicCreationOverride;
import org.apache.pulsar.common.policies.data.AuthAction;
import org.apache.pulsar.common.policies.data.BacklogQuota;
import org.apache.pulsar.common.policies.data.BacklogQuota.BacklogQuotaType;
@@ -553,6 +554,105 @@ public abstract class NamespacesBase extends AdminResource {
}
}
+ protected void internalSetAutoTopicCreation(AsyncResponse asyncResponse, AutoTopicCreationOverride autoTopicCreationOverride) {
+ validateAdminAccessForTenant(namespaceName.getTenant());
+ validatePoliciesReadOnlyAccess();
+
+ if (!AutoTopicCreationOverride.isValidOverride(autoTopicCreationOverride)) {
+ throw new RestException(Status.PRECONDITION_FAILED, "Invalid configuration for autoTopicCreationOverride");
+ }
+
+ // Force to read the data s.t. the watch to the cache content is setup.
+ policiesCache().getWithStatAsync(path(POLICIES, namespaceName.toString())).thenApply(
+ policies -> {
+ if (policies.isPresent()) {
+ Entry<Policies, Stat> policiesNode = policies.get();
+ policiesNode.getKey().autoTopicCreationOverride = autoTopicCreationOverride;
+ try {
+ // Write back the new policies into zookeeper
+ globalZk().setData(path(POLICIES, namespaceName.toString()),
+ jsonMapper().writeValueAsBytes(policiesNode.getKey()), policiesNode.getValue().getVersion());
+ policiesCache().invalidate(path(POLICIES, namespaceName.toString()));
+ asyncResponse.resume(Response.noContent().build());
+ log.info("[{}] Successfully {} on namespace {}", clientAppId(),
+ autoTopicCreationOverride.allowAutoTopicCreation ? "enabled" : "disabled", namespaceName);
+ return null;
+ } catch (KeeperException.NoNodeException e) {
+ log.error("[{}] Failed to modify autoTopicCreation status for namespace {}: does not exist", clientAppId(),
+ namespaceName);
+ asyncResponse.resume(new RestException(Status.NOT_FOUND, "Namespace does not exist"));
+ return null;
+ } catch (KeeperException.BadVersionException e) {
+ log.error(
+ "[{}] Failed to modify autoTopicCreation status on namespace {} expected policy node version={} : concurrent modification",
+ clientAppId(), namespaceName, policiesNode.getValue().getVersion());
+
+ asyncResponse.resume(new RestException(Status.CONFLICT, "Concurrent modification"));
+ return null;
+ } catch (Exception e) {
+ log.error("[{}] Failed to modify autoTopicCreation status on namespace {}", clientAppId(), namespaceName, e);
+ asyncResponse.resume(new RestException(e));
+ return null;
+ }
+ } else {
+ asyncResponse.resume(new RestException(Status.NOT_FOUND, "Namespace " + namespaceName + " does not exist"));
+ return null;
+ }
+ }
+ ).exceptionally(e -> {
+ log.error("[{}] Failed to modify autoTopicCreation status on namespace {}", clientAppId(), namespaceName, e);
+ asyncResponse.resume(new RestException(e));
+ return null;
+ });
+ }
+
+ protected void internalRemoveAutoTopicCreation(AsyncResponse asyncResponse) {
+ validateAdminAccessForTenant(namespaceName.getTenant());
+ validatePoliciesReadOnlyAccess();
+
+ // Force to read the data s.t. the watch to the cache content is setup.
+ policiesCache().getWithStatAsync(path(POLICIES, namespaceName.toString())).thenApply(
+ policies -> {
+ if (policies.isPresent()) {
+ Entry<Policies, Stat> policiesNode = policies.get();
+ policiesNode.getKey().autoTopicCreationOverride = null;
+ try {
+ // Write back the new policies into zookeeper
+ globalZk().setData(path(POLICIES, namespaceName.toString()),
+ jsonMapper().writeValueAsBytes(policiesNode.getKey()), policiesNode.getValue().getVersion());
+ policiesCache().invalidate(path(POLICIES, namespaceName.toString()));
+ asyncResponse.resume(Response.noContent().build());
+ log.info("[{}] Successfully removed override on namespace {}", clientAppId(), namespaceName);
+ return null;
+ } catch (KeeperException.NoNodeException e) {
+ log.error("[{}] Failed to modify autoTopicCreation status for namespace {}: does not exist", clientAppId(),
+ namespaceName);
+ asyncResponse.resume(new RestException(Status.NOT_FOUND, "Namespace does not exist"));
+ return null;
+ } catch (KeeperException.BadVersionException e) {
+ log.error(
+ "[{}] Failed to modify autoTopicCreation status on namespace {} expected policy node version={} : concurrent modification",
+ clientAppId(), namespaceName, policiesNode.getValue().getVersion());
+
+ asyncResponse.resume(new RestException(Status.CONFLICT, "Concurrent modification"));
+ return null;
+ } catch (Exception e) {
+ log.error("[{}] Failed to modify autoTopicCreation status on namespace {}", clientAppId(), namespaceName, e);
+ asyncResponse.resume(new RestException(e));
+ return null;
+ }
+ } else {
+ asyncResponse.resume(new RestException(Status.NOT_FOUND, "Namespace " + namespaceName + " does not exist"));
+ return null;
+ }
+ }
+ ).exceptionally(e -> {
+ log.error("[{}] Failed to modify autoTopicCreation status on namespace {}", clientAppId(), namespaceName, e);
+ asyncResponse.resume(new RestException(e));
+ return null;
+ });
+ }
+
protected void internalModifyDeduplication(boolean enableDeduplication) {
validateAdminAccessForTenant(namespaceName.getTenant());
validatePoliciesReadOnlyAccess();
@@ -573,17 +673,17 @@ public abstract class NamespacesBase extends AdminResource {
log.info("[{}] Successfully {} on namespace {}", clientAppId(),
enableDeduplication ? "enabled" : "disabled", namespaceName);
} catch (KeeperException.NoNodeException e) {
- log.warn("[{}] Failed to modify deplication status for namespace {}: does not exist", clientAppId(),
+ log.warn("[{}] Failed to modify deduplication status for namespace {}: does not exist", clientAppId(),
namespaceName);
throw new RestException(Status.NOT_FOUND, "Namespace does not exist");
} catch (KeeperException.BadVersionException e) {
log.warn(
- "[{}] Failed to modify deplication status on namespace {} expected policy node version={} : concurrent modification",
+ "[{}] Failed to modify deduplication status on namespace {} expected policy node version={} : concurrent modification",
clientAppId(), namespaceName, policiesNode.getValue().getVersion());
throw new RestException(Status.CONFLICT, "Concurrent modification");
} catch (Exception e) {
- log.error("[{}] Failed to modify deplication status on namespace {}", clientAppId(), namespaceName, e);
+ log.error("[{}] Failed to modify deduplication status on namespace {}", clientAppId(), namespaceName, e);
throw new RestException(e);
}
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
index cf2022e..68a16dc 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
@@ -45,6 +45,7 @@ import javax.ws.rs.core.MediaType;
import org.apache.pulsar.broker.admin.impl.NamespacesBase;
import org.apache.pulsar.broker.web.RestException;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandGetTopicsOfNamespace.Mode;
+import org.apache.pulsar.common.policies.data.AutoTopicCreationOverride;
import org.apache.pulsar.common.policies.data.AuthAction;
import org.apache.pulsar.common.policies.data.BacklogQuota;
import org.apache.pulsar.common.policies.data.BacklogQuota.BacklogQuotaType;
@@ -299,6 +300,43 @@ public class Namespaces extends NamespacesBase {
internalModifyDeduplication(enableDeduplication);
}
+ @POST
+ @Path("/{tenant}/{namespace}/autoTopicCreation")
+ @ApiOperation(value = "Override broker's allowAutoTopicCreation setting for a namespace")
+ @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
+ @ApiResponse(code = 404, message = "Tenant or cluster or namespace doesn't exist"),
+ @ApiResponse(code = 400, message = "Invalid autoTopicCreation override") })
+ public void setAutoTopicCreation(
+ @Suspended final AsyncResponse asyncResponse,
+ @PathParam("tenant") String tenant, @PathParam("namespace") String namespace,
+ AutoTopicCreationOverride autoTopicCreationOverride) {
+ try {
+ validateNamespaceName(tenant, namespace);
+ internalSetAutoTopicCreation(asyncResponse, autoTopicCreationOverride);
+ } catch (RestException e) {
+ asyncResponse.resume(e);
+ } catch (Exception e ) {
+ asyncResponse.resume(new RestException(e));
+ }
+ }
+
+ @DELETE
+ @Path("/{tenant}/{namespace}/autoTopicCreation")
+ @ApiOperation(value = "Remove override of broker's allowAutoTopicCreation in a namespace")
+ @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
+ @ApiResponse(code = 404, message = "Tenant or cluster or namespace doesn't exist") })
+ public void removeAutoTopicCreation(@Suspended final AsyncResponse asyncResponse,
+ @PathParam("tenant") String tenant, @PathParam("namespace") String namespace) {
+ try {
+ validateNamespaceName(tenant, namespace);
+ internalRemoveAutoTopicCreation(asyncResponse);
+ } catch (RestException e) {
+ asyncResponse.resume(e);
+ } catch (Exception e) {
+ asyncResponse.resume(new RestException(e));
+ }
+ }
+
@GET
@Path("/{tenant}/{namespace}/bundles")
@ApiOperation(value = "Get the bundles split data.")
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index 21d2686..12ceb3b 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -122,6 +122,7 @@ import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicDomain;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
+import org.apache.pulsar.common.policies.data.AutoTopicCreationOverride;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.LocalPolicies;
import org.apache.pulsar.common.policies.data.OffloadPolicies;
@@ -131,6 +132,7 @@ import org.apache.pulsar.common.policies.data.Policies;
import org.apache.pulsar.common.policies.data.PublishRate;
import org.apache.pulsar.common.policies.data.RetentionPolicies;
import org.apache.pulsar.common.policies.data.TopicStats;
+import org.apache.pulsar.common.policies.data.TopicType;
import org.apache.pulsar.common.stats.Metrics;
import org.apache.pulsar.common.util.FieldParser;
import org.apache.pulsar.common.util.FutureUtil;
@@ -658,7 +660,7 @@ public class BrokerService implements Closeable, ZooKeeperCacheListener<Policies
}
public CompletableFuture<Topic> getOrCreateTopic(final String topic) {
- return getTopic(topic, pulsar.getConfiguration().isAllowAutoTopicCreation()).thenApply(Optional::get);
+ return getTopic(topic, isAllowAutoTopicCreation(topic)).thenApply(Optional::get);
}
public CompletableFuture<Optional<Topic>> getTopic(final String topic, boolean createIfMissing) {
@@ -1846,8 +1848,8 @@ public class BrokerService implements Closeable, ZooKeeperCacheListener<Policies
// If topic is already exist, creating partitioned topic is not allowed.
if (metadata.partitions == 0
&& !topicExists
- && pulsar.getConfiguration().isAllowAutoTopicCreation()
- && pulsar.getConfiguration().isDefaultTopicTypePartitioned()) {
+ && pulsar.getBrokerService().isAllowAutoTopicCreation(topicName)
+ && pulsar.getBrokerService().isDefaultTopicTypePartitioned(topicName)) {
return pulsar.getBrokerService().createDefaultPartitionedTopicAsync(topicName);
} else {
return CompletableFuture.completedFuture(metadata);
@@ -1858,7 +1860,7 @@ public class BrokerService implements Closeable, ZooKeeperCacheListener<Policies
@SuppressWarnings("deprecation")
private CompletableFuture<PartitionedTopicMetadata> createDefaultPartitionedTopicAsync(TopicName topicName) {
- int defaultNumPartitions = pulsar.getConfiguration().getDefaultNumPartitions();
+ int defaultNumPartitions = pulsar.getBrokerService().getDefaultNumPartitions(topicName);
checkArgument(defaultNumPartitions > 0, "Default number of partitions should be more than 0");
PartitionedTopicMetadata configMetadata = new PartitionedTopicMetadata(defaultNumPartitions);
@@ -2091,4 +2093,53 @@ public class BrokerService implements Closeable, ZooKeeperCacheListener<Policies
foreachProducer(producer -> currentMessagePublishBufferBytes.addAndGet(producer.getCnx().getMessagePublishBufferSize()));
return currentMessagePublishBufferBytes.get();
}
+
+ public boolean isAllowAutoTopicCreation(final String topic) {
+ TopicName topicName = TopicName.get(topic);
+ return isAllowAutoTopicCreation(topicName);
+ }
+
+ public boolean isAllowAutoTopicCreation(final TopicName topicName) {
+ AutoTopicCreationOverride autoTopicCreationOverride = getAutoTopicCreationOverride(topicName);
+ if (autoTopicCreationOverride != null) {
+ return autoTopicCreationOverride.allowAutoTopicCreation;
+ } else {
+ return pulsar.getConfiguration().isAllowAutoTopicCreation();
+ }
+ }
+
+ public boolean isDefaultTopicTypePartitioned(final TopicName topicName) {
+ AutoTopicCreationOverride autoTopicCreationOverride = getAutoTopicCreationOverride(topicName);
+ if (autoTopicCreationOverride != null) {
+ return TopicType.PARTITIONED.toString().equals(autoTopicCreationOverride.topicType);
+ } else {
+ return pulsar.getConfiguration().isDefaultTopicTypePartitioned();
+ }
+ }
+
+ public int getDefaultNumPartitions(final TopicName topicName) {
+ AutoTopicCreationOverride autoTopicCreationOverride = getAutoTopicCreationOverride(topicName);
+ if (autoTopicCreationOverride != null) {
+ return autoTopicCreationOverride.defaultNumPartitions;
+ } else {
+ return pulsar.getConfiguration().getDefaultNumPartitions();
+ }
+ }
+
+ private AutoTopicCreationOverride getAutoTopicCreationOverride(final TopicName topicName) {
+ try {
+ Optional<Policies> policies = pulsar.getConfigurationCache().policiesCache()
+ .get(AdminResource.path(POLICIES, topicName.getNamespace()));
+ // If namespace policies have the field set, it will override the broker-level setting
+ if (policies.isPresent() && policies.get().autoTopicCreationOverride != null) {
+ return policies.get().autoTopicCreationOverride;
+ }
+ } catch (Throwable t) {
+ // Ignoring since if we don't have policies, we fallback on the default
+ log.warn("Got exception when reading autoTopicCreateOverride policy for {}: {};", topicName, t.getMessage(), t);
+ return null;
+ }
+ log.warn("No autoTopicCreateOverride policy found for {}", topicName);
+ return null;
+ }
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index f8f5758..98970e8 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -805,7 +805,7 @@ public class ServerCnx extends PulsarHandler {
}
boolean createTopicIfDoesNotExist = forceTopicCreation
- && service.pulsar().getConfig().isAllowAutoTopicCreation();
+ && service.isAllowAutoTopicCreation(topicName.toString());
service.getTopic(topicName.toString(), createTopicIfDoesNotExist)
.thenCompose(optTopic -> {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java
index 7b3b040..6fe83b8 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java
@@ -101,7 +101,7 @@ public abstract class MockedPulsarServiceBaseTest {
this.conf.setDefaultNumberOfNamespaceBundles(1);
this.conf.setZookeeperServers("localhost:2181");
this.conf.setConfigurationStoreServers("localhost:3181");
- this.conf.setAllowAutoTopicCreationType("non-persistent");
+ this.conf.setAllowAutoTopicCreationType("non-partitioned");
this.conf.setBrokerServicePort(Optional.of(0));
this.conf.setBrokerServicePortTls(Optional.of(0));
this.conf.setWebServicePort(Optional.of(0));
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceAutoTopicCreationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceAutoTopicCreationTest.java
index d595a64..05b6f03 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceAutoTopicCreationTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceAutoTopicCreationTest.java
@@ -24,8 +24,11 @@ import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
-
+import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.policies.data.AutoTopicCreationOverride;
+import org.apache.pulsar.common.policies.data.TopicType;
import org.testng.annotations.AfterClass;
+import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
@@ -43,17 +46,35 @@ public class BrokerServiceAutoTopicCreationTest extends BrokerTestBase{
super.internalCleanup();
}
+ @AfterMethod
+ protected void cleanupTest() throws Exception {
+ pulsar.getAdminClient().namespaces().removeAutoTopicCreation("prop/ns-abc");
+ }
+
+
@Test
public void testAutoNonPartitionedTopicCreation() throws Exception{
pulsar.getConfiguration().setAllowAutoTopicCreation(true);
pulsar.getConfiguration().setAllowAutoTopicCreationType("non-partitioned");
- final String topicName = "persistent://prop/ns-abc/non-partitioned-topic";
+ final String topicString = "persistent://prop/ns-abc/non-partitioned-topic";
final String subscriptionName = "non-partitioned-topic-sub";
- pulsarClient.newConsumer().topic(topicName).subscriptionName(subscriptionName).subscribe();
+ pulsarClient.newConsumer().topic(topicString).subscriptionName(subscriptionName).subscribe();
- assertTrue(admin.namespaces().getTopics("prop/ns-abc").contains(topicName));
- assertFalse(admin.topics().getPartitionedTopicList("prop/ns-abc").contains(topicName));
+ assertTrue(admin.namespaces().getTopics("prop/ns-abc").contains(topicString));
+ assertFalse(admin.topics().getPartitionedTopicList("prop/ns-abc").contains(topicString));
+ }
+
+ @Test
+ public void testAutoNonPartitionedTopicCreationOnProduce() throws Exception{
+ pulsar.getConfiguration().setAllowAutoTopicCreation(true);
+ pulsar.getConfiguration().setAllowAutoTopicCreationType("non-partitioned");
+
+ final String topicString = "persistent://prop/ns-abc/non-partitioned-topic-2";
+ pulsarClient.newProducer().topic(topicString).create();
+
+ assertTrue(admin.namespaces().getTopics("prop/ns-abc").contains(topicString));
+ assertFalse(admin.topics().getPartitionedTopicList("prop/ns-abc").contains(topicString));
}
@Test
@@ -62,13 +83,28 @@ public class BrokerServiceAutoTopicCreationTest extends BrokerTestBase{
pulsar.getConfiguration().setAllowAutoTopicCreationType("partitioned");
pulsar.getConfiguration().setDefaultNumPartitions(3);
- final String topicName = "persistent://prop/ns-abc/partitioned-topic";
+ final String topicString = "persistent://prop/ns-abc/partitioned-topic";
final String subscriptionName = "partitioned-topic-sub";
- pulsarClient.newConsumer().topic(topicName).subscriptionName(subscriptionName).subscribe();
+ pulsarClient.newConsumer().topic(topicString).subscriptionName(subscriptionName).subscribe();
- assertTrue(admin.topics().getPartitionedTopicList("prop/ns-abc").contains(topicName));
+ assertTrue(admin.topics().getPartitionedTopicList("prop/ns-abc").contains(topicString));
for (int i = 0; i < 3; i++) {
- assertTrue(admin.namespaces().getTopics("prop/ns-abc").contains(topicName + "-partition-" + i));
+ assertTrue(admin.namespaces().getTopics("prop/ns-abc").contains(topicString + "-partition-" + i));
+ }
+ }
+
+ @Test
+ public void testAutoPartitionedTopicCreationOnProduce() throws Exception{
+ pulsar.getConfiguration().setAllowAutoTopicCreation(true);
+ pulsar.getConfiguration().setAllowAutoTopicCreationType("partitioned");
+ pulsar.getConfiguration().setDefaultNumPartitions(3);
+
+ final String topicString = "persistent://prop/ns-abc/partitioned-topic-1";
+ pulsarClient.newProducer().topic(topicString).create();
+
+ assertTrue(admin.topics().getPartitionedTopicList("prop/ns-abc").contains(topicString));
+ for (int i = 0; i < 3; i++) {
+ assertTrue(admin.namespaces().getTopics("prop/ns-abc").contains(topicString + "-partition-" + i));
}
}
@@ -76,15 +112,15 @@ public class BrokerServiceAutoTopicCreationTest extends BrokerTestBase{
public void testAutoTopicCreationDisable() throws Exception{
pulsar.getConfiguration().setAllowAutoTopicCreation(false);
- final String topicName = "persistent://prop/ns-abc/test-topic";
+ final String topicString = "persistent://prop/ns-abc/test-topic";
final String subscriptionName = "test-topic-sub";
try {
- pulsarClient.newConsumer().topic(topicName).subscriptionName(subscriptionName).subscribe();
+ pulsarClient.newConsumer().topic(topicString).subscriptionName(subscriptionName).subscribe();
fail("Subscribe operation should have failed");
} catch (Exception e) {
assertTrue(e instanceof PulsarClientException);
}
- assertFalse(admin.namespaces().getTopics("prop/ns-abc").contains(topicName));
+ assertFalse(admin.namespaces().getTopics("prop/ns-abc").contains(topicString));
}
@Test
@@ -93,16 +129,16 @@ public class BrokerServiceAutoTopicCreationTest extends BrokerTestBase{
pulsar.getConfiguration().setAllowAutoTopicCreationType("partitioned");
pulsar.getConfiguration().setDefaultNumPartitions(3);
- final String topicName = "persistent://prop/ns-abc/test-topic-2";
+ final String topicString = "persistent://prop/ns-abc/test-topic-2";
final String subscriptionName = "partitioned-topic-sub";
- admin.topics().createNonPartitionedTopic(topicName);
- pulsarClient.newConsumer().topic(topicName).subscriptionName(subscriptionName).subscribe();
+ admin.topics().createNonPartitionedTopic(topicString);
+ pulsarClient.newConsumer().topic(topicString).subscriptionName(subscriptionName).subscribe();
- assertFalse(admin.topics().getPartitionedTopicList("prop/ns-abc").contains(topicName));
+ assertFalse(admin.topics().getPartitionedTopicList("prop/ns-abc").contains(topicString));
for (int i = 0; i < 3; i++) {
- assertFalse(admin.namespaces().getTopics("prop/ns-abc").contains(topicName + "-partition-" + i));
+ assertFalse(admin.namespaces().getTopics("prop/ns-abc").contains(topicString + "-partition-" + i));
}
- assertTrue(admin.namespaces().getTopics("prop/ns-abc").contains(topicName));
+ assertTrue(admin.namespaces().getTopics("prop/ns-abc").contains(topicString));
}
@Test
@@ -158,9 +194,130 @@ public class BrokerServiceAutoTopicCreationTest extends BrokerTestBase{
pulsar.getConfiguration().setAllowAutoTopicCreationType("partitioned");
pulsar.getConfiguration().setDefaultNumPartitions(3);
- final String topicName = "persistent://prop/ns-abc/test-topic-3";
- int partitions = admin.topics().getPartitionedTopicMetadata(topicName).partitions;
+ final String topicString = "persistent://prop/ns-abc/test-topic-3";
+ int partitions = admin.topics().getPartitionedTopicMetadata(topicString).partitions;
assertEquals(partitions, 0);
- assertFalse(admin.namespaces().getTopics("prop/ns-abc").contains(topicName));
+ assertFalse(admin.namespaces().getTopics("prop/ns-abc").contains(topicString));
+ }
+
+ @Test
+ public void testAutoCreationNamespaceAllowOverridesBroker() throws Exception {
+ final String topicString = "persistent://prop/ns-abc/test-topic-4";
+ final String subscriptionName = "test-topic-sub-4";
+ final TopicName topicName = TopicName.get(topicString);
+ pulsar.getConfiguration().setAllowAutoTopicCreation(false);
+ pulsar.getAdminClient().namespaces().setAutoTopicCreation(topicName.getNamespace(),
+ new AutoTopicCreationOverride(true, TopicType.NON_PARTITIONED.toString(), null));
+
+ pulsarClient.newConsumer().topic(topicString).subscriptionName(subscriptionName).subscribe();
+ assertTrue(admin.namespaces().getTopics("prop/ns-abc").contains(topicString));
+ assertFalse(admin.topics().getPartitionedTopicList("prop/ns-abc").contains(topicString));
+ }
+
+ @Test
+ public void testAutoCreationNamespaceDisallowOverridesBroker() throws Exception {
+ final String topicString = "persistent://prop/ns-abc/test-topic-5";
+ final String subscriptionName = "test-topic-sub-5";
+ final TopicName topicName = TopicName.get(topicString);
+ pulsar.getConfiguration().setAllowAutoTopicCreation(true);
+ pulsar.getAdminClient().namespaces().setAutoTopicCreation(topicName.getNamespace(),
+ new AutoTopicCreationOverride(false, null, null));
+
+ try {
+ pulsarClient.newConsumer().topic(topicString).subscriptionName(subscriptionName).subscribe();
+ fail("Subscribe operation should have failed");
+ } catch (Exception e) {
+ assertTrue(e instanceof PulsarClientException);
+ }
+ assertFalse(admin.namespaces().getTopics("prop/ns-abc").contains(topicString));
+ }
+
+ @Test
+ public void testAutoCreationNamespaceOverrideAllowsPartitionedTopics() throws Exception {
+ final String topicString = "persistent://prop/ns-abc/partitioned-test-topic-6";
+ final TopicName topicName = TopicName.get(topicString);
+
+ pulsar.getConfiguration().setAllowAutoTopicCreation(false);
+ pulsar.getAdminClient().namespaces().setAutoTopicCreation(topicName.getNamespace(),
+ new AutoTopicCreationOverride(true, TopicType.PARTITIONED.toString(), 4));
+
+ final String subscriptionName = "test-topic-sub-6";
+ pulsarClient.newConsumer().topic(topicString).subscriptionName(subscriptionName).subscribe();
+
+ assertTrue(admin.topics().getPartitionedTopicList("prop/ns-abc").contains(topicString));
+ for (int i = 0; i < 4; i++) {
+ assertTrue(admin.namespaces().getTopics("prop/ns-abc").contains(topicString + "-partition-" + i));
+ }
+ }
+
+ @Test
+ public void testAutoCreationNamespaceOverridesTopicTypePartitioned() throws Exception {
+ final String topicString = "persistent://prop/ns-abc/partitioned-test-topic-7";
+ final TopicName topicName = TopicName.get(topicString);
+
+ pulsar.getConfiguration().setAllowAutoTopicCreation(true);
+ pulsar.getConfiguration().setAllowAutoTopicCreationType("non-partitioned");
+ pulsar.getAdminClient().namespaces().setAutoTopicCreation(topicName.getNamespace(),
+ new AutoTopicCreationOverride(true, TopicType.PARTITIONED.toString(), 3));
+
+ final String subscriptionName = "test-topic-sub-7";
+ pulsarClient.newConsumer().topic(topicString).subscriptionName(subscriptionName).subscribe();
+
+ assertTrue(admin.topics().getPartitionedTopicList("prop/ns-abc").contains(topicString));
+ for (int i = 0; i < 3; i++) {
+ assertTrue(admin.namespaces().getTopics("prop/ns-abc").contains(topicString + "-partition-" + i));
+ }
+ }
+
+ @Test
+ public void testAutoCreationNamespaceOverridesTopicTypeNonPartitioned() throws Exception {
+ final String topicString = "persistent://prop/ns-abc/partitioned-test-topic-8";
+ final TopicName topicName = TopicName.get(topicString);
+
+ pulsar.getConfiguration().setAllowAutoTopicCreation(true);
+ pulsar.getConfiguration().setAllowAutoTopicCreationType("partitioned");
+ pulsar.getConfiguration().setDefaultNumPartitions(2);
+ pulsar.getAdminClient().namespaces().setAutoTopicCreation(topicName.getNamespace(),
+ new AutoTopicCreationOverride(true, TopicType.NON_PARTITIONED.toString(), null));
+
+ final String subscriptionName = "test-topic-sub-8";
+ pulsarClient.newConsumer().topic(topicString).subscriptionName(subscriptionName).subscribe();
+
+ assertTrue(admin.namespaces().getTopics("prop/ns-abc").contains(topicString));
+ assertFalse(admin.topics().getPartitionedTopicList("prop/ns-abc").contains(topicString));
+ }
+
+ @Test
+ public void testAutoCreationNamespaceOverridesDefaultNumPartitions() throws Exception {
+ final String topicString = "persistent://prop/ns-abc/partitioned-test-topic-9";
+ final TopicName topicName = TopicName.get(topicString);
+
+ pulsar.getConfiguration().setAllowAutoTopicCreation(true);
+ pulsar.getConfiguration().setAllowAutoTopicCreationType("partitioned");
+ pulsar.getConfiguration().setDefaultNumPartitions(2);
+ pulsar.getAdminClient().namespaces().setAutoTopicCreation(topicName.getNamespace(),
+ new AutoTopicCreationOverride(true, TopicType.PARTITIONED.toString(), 4));
+
+ final String subscriptionName = "test-topic-sub-9";
+
+ pulsarClient.newConsumer().topic(topicString).subscriptionName(subscriptionName).subscribe();
+
+ assertTrue(admin.topics().getPartitionedTopicList("prop/ns-abc").contains(topicString));
+ for (int i = 0; i < 4; i++) {
+ assertTrue(admin.namespaces().getTopics("prop/ns-abc").contains(topicString + "-partition-" + i));
+ }
+ }
+
+ @Test
+ public void testAutoCreationNamespaceAllowOverridesBrokerOnProduce() throws Exception {
+ final String topicString = "persistent://prop/ns-abc/test-topic-10";
+ final TopicName topicName = TopicName.get(topicString);
+ pulsar.getConfiguration().setAllowAutoTopicCreation(false);
+ pulsar.getAdminClient().namespaces().setAutoTopicCreation(topicName.getNamespace(),
+ new AutoTopicCreationOverride(true, TopicType.NON_PARTITIONED.toString(), null));
+
+ pulsarClient.newProducer().topic(topicString).create();
+ assertTrue(admin.namespaces().getTopics("prop/ns-abc").contains(topicString));
+ assertFalse(admin.topics().getPartitionedTopicList("prop/ns-abc").contains(topicString));
}
}
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Namespaces.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Namespaces.java
index 305ac59..e4ca78a 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Namespaces.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Namespaces.java
@@ -29,6 +29,7 @@ import org.apache.pulsar.client.admin.PulsarAdminException.NotAuthorizedExceptio
import org.apache.pulsar.client.admin.PulsarAdminException.NotFoundException;
import org.apache.pulsar.client.admin.PulsarAdminException.PreconditionFailedException;
import org.apache.pulsar.common.policies.data.AuthAction;
+import org.apache.pulsar.common.policies.data.AutoTopicCreationOverride;
import org.apache.pulsar.common.policies.data.BacklogQuota;
import org.apache.pulsar.common.policies.data.BookieAffinityGroupData;
import org.apache.pulsar.common.policies.data.BundlesData;
@@ -937,6 +938,56 @@ public interface Namespaces {
CompletableFuture<Void> setDeduplicationStatusAsync(String namespace, boolean enableDeduplication);
/**
+ * Sets the autoTopicCreation policy for a given namespace, overriding the broker settings.
+ * <p>
+ * When autoTopicCreationOverride is enabled, new topics will be created upon connection,
+ * regardless of the broker level configuration.
+ * <p>
+ * Request example:
+ *
+ * <pre>
+ * <code>
+ * {
+ * "allowAutoTopicCreation" : true,
+ * "topicType" : "partitioned",
+ * "defaultNumPartitions": 2
+ * }
+ * </code>
+ * </pre>
+ *
+ * @param namespace
+ * Namespace name
+ * @param autoTopicCreationOverride
+ * Override policies for auto topic creation: whether creation is allowed, the topic type
+ * ("partitioned" or "non-partitioned") and the number of partitions for partitioned topics.
+ * See {@link AutoTopicCreationOverride#isValidOverride(AutoTopicCreationOverride)} for the
+ * combinations that are considered valid.
+ *
+ * @throws NotAuthorizedException
+ * Don't have admin permission
+ * @throws NotFoundException
+ * Namespace does not exist
+ * @throws PulsarAdminException
+ * Unexpected error
+ */
+ void setAutoTopicCreation(String namespace, AutoTopicCreationOverride autoTopicCreationOverride)
+ throws PulsarAdminException;
+
+ /**
+ * Removes the autoTopicCreation policy for a given namespace,
+ * allowing the broker to dictate the auto-creation policy.
+ *
+ * @param namespace
+ * Namespace name
+ *
+ * @throws NotAuthorizedException
+ * Don't have admin permission
+ * @throws NotFoundException
+ * Namespace does not exist
+ * @throws PulsarAdminException
+ * Unexpected error
+ */
+ void removeAutoTopicCreation(String namespace) throws PulsarAdminException;
+
+ /**
* Get the bundles split data.
*
* @param namespace
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java
index 2a021c2..34bb91e 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java
@@ -38,6 +38,7 @@ import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.Authentication;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.policies.data.AuthAction;
+import org.apache.pulsar.common.policies.data.AutoTopicCreationOverride;
import org.apache.pulsar.common.policies.data.BacklogQuota;
import org.apache.pulsar.common.policies.data.BacklogQuota.BacklogQuotaType;
import org.apache.pulsar.common.policies.data.BookieAffinityGroupData;
@@ -738,6 +739,30 @@ public class NamespacesImpl extends BaseResource implements Namespaces {
}
@Override
+    public void setAutoTopicCreation(String namespace,
+            AutoTopicCreationOverride autoTopicCreationOverride) throws PulsarAdminException {
+        // POST the override, serialized as JSON, to the namespace's "autoTopicCreation" resource.
+        try {
+            final WebTarget target = namespacePath(NamespaceName.get(namespace), "autoTopicCreation");
+            request(target).post(
+                    Entity.entity(autoTopicCreationOverride, MediaType.APPLICATION_JSON), ErrorData.class);
+        } catch (Exception ex) {
+            // Every failure is rethrown through getApiException.
+            throw getApiException(ex);
+        }
+    }
+
+    @Override
+    public void removeAutoTopicCreation(String namespace) throws PulsarAdminException {
+        // DELETE the namespace's "autoTopicCreation" resource to drop the override.
+        try {
+            final WebTarget target = namespacePath(NamespaceName.get(namespace), "autoTopicCreation");
+            request(target).delete(ErrorData.class);
+        } catch (Exception ex) {
+            // Every failure is rethrown through getApiException.
+            throw getApiException(ex);
+        }
+    }
+
+ @Override
public Map<BacklogQuotaType, BacklogQuota> getBacklogQuotaMap(String namespace) throws PulsarAdminException {
try {
return getBacklogQuotaMapAsync(namespace).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
diff --git a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
index 814df85..c67f60a 100644
--- a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
+++ b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
@@ -55,6 +55,7 @@ import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.impl.auth.AuthenticationTls;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
import org.apache.pulsar.common.policies.data.AuthAction;
+import org.apache.pulsar.common.policies.data.AutoTopicCreationOverride;
import org.apache.pulsar.common.policies.data.BacklogQuota;
import org.apache.pulsar.common.policies.data.BacklogQuota.RetentionPolicy;
import org.apache.pulsar.common.policies.data.BookieAffinityGroupData;
@@ -72,6 +73,7 @@ import org.apache.pulsar.common.policies.data.ResourceQuota;
import org.apache.pulsar.common.policies.data.RetentionPolicies;
import org.apache.pulsar.common.policies.data.SubscribeRate;
import org.apache.pulsar.common.policies.data.TenantInfo;
+import org.apache.pulsar.common.policies.data.TopicType;
import org.apache.pulsar.common.util.ObjectMapperFactory;
import org.mockito.ArgumentMatcher;
import org.mockito.Mockito;
@@ -355,6 +357,13 @@ public class PulsarAdminToolTest {
namespaces.run(split("set-deduplication myprop/clust/ns1 --enable"));
verify(mockNamespaces).setDeduplicationStatus("myprop/clust/ns1", true);
+ namespaces.run(split("set-auto-topic-creation myprop/clust/ns1 -e -t non-partitioned"));
+ verify(mockNamespaces).setAutoTopicCreation("myprop/clust/ns1",
+ new AutoTopicCreationOverride(true, TopicType.NON_PARTITIONED.toString(), null));
+
+ namespaces.run(split("remove-auto-topic-creation myprop/clust/ns1"));
+ verify(mockNamespaces).removeAutoTopicCreation("myprop/clust/ns1");
+
namespaces.run(split("get-message-ttl myprop/clust/ns1"));
verify(mockNamespaces).getNamespaceMessageTTL("myprop/clust/ns1");
diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java
index d77b8ec..59587ee 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java
@@ -37,6 +37,7 @@ import org.apache.pulsar.admin.cli.utils.IOUtils;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.common.naming.NamespaceName;
+import org.apache.pulsar.common.policies.data.AutoTopicCreationOverride;
import org.apache.pulsar.common.policies.data.BacklogQuota;
import org.apache.pulsar.common.policies.data.BookieAffinityGroupData;
import org.apache.pulsar.common.policies.data.BundlesData;
@@ -51,6 +52,7 @@ import org.apache.pulsar.common.policies.data.SchemaAutoUpdateCompatibilityStrat
import org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy;
import org.apache.pulsar.common.policies.data.SubscribeRate;
import org.apache.pulsar.common.policies.data.SubscriptionAuthMode;
+import org.apache.pulsar.common.policies.data.TopicType;
import org.apache.pulsar.common.util.RelativeTimeUtil;
@Parameters(commandDescription = "Operations about namespaces")
@@ -378,6 +380,60 @@ public class CmdNamespaces extends CmdBase {
}
}
+    /**
+     * CLI command that sets the autoTopicCreation override of a namespace.
+     * <p>
+     * Exactly one of --enable / --disable has to be given. When enabling, the topic type must be a
+     * valid {@link TopicType} name and a partitioned type additionally needs --num-partitions &gt; 0.
+     */
+    @Parameters(commandDescription = "Enable or disable autoTopicCreation for a namespace, overriding broker settings")
+    private class SetAutoTopicCreation extends CliCommand {
+        @Parameter(description = "tenant/namespace", required = true)
+        private java.util.List<String> params;
+
+        @Parameter(names = { "--enable", "-e" }, description = "Enable allowAutoTopicCreation on namespace")
+        private boolean enable = false;
+
+        @Parameter(names = { "--disable", "-d" }, description = "Disable allowAutoTopicCreation on namespace")
+        private boolean disable = false;
+
+        @Parameter(names = { "--type", "-t" }, description = "Type of topic to be auto-created. " +
+                "Possible values: (partitioned, non-partitioned). Default value: non-partitioned")
+        private String type = "non-partitioned";
+
+        // Stays null when --num-partitions is not passed on the command line.
+        @Parameter(names = { "--num-partitions", "-n" }, description = "Default number of partitions of topic to be auto-created," +
+                " applicable to partitioned topics only", required = false)
+        private Integer defaultNumPartitions = null;
+
+        /**
+         * Validates the flags and sends the resulting override to the admin API.
+         *
+         * @throws ParameterException if the flag combination is invalid
+         * @throws PulsarAdminException if the admin call fails
+         */
+        @Override
+        void run() throws PulsarAdminException {
+            String namespace = validateNamespace(params);
+            // Locale.ROOT keeps the normalization independent of the JVM default locale
+            // (e.g. Turkish casing rules would otherwise turn "I" into a dotless "ı").
+            type = type.toLowerCase(java.util.Locale.ROOT).trim();
+
+            if (enable == disable) {
+                throw new ParameterException("Need to specify either --enable or --disable");
+            }
+            if (enable) {
+                if (!TopicType.isValidTopicType(type)) {
+                    throw new ParameterException("Must specify type of topic to be created. " +
+                            "Possible values: (partitioned, non-partitioned)");
+                }
+
+                // Check for null before comparing: unboxing a missing --num-partitions would
+                // otherwise fail with a NullPointerException instead of a usage error.
+                if (TopicType.PARTITIONED.toString().equals(type)
+                        && (defaultNumPartitions == null || defaultNumPartitions <= 0)) {
+                    throw new ParameterException("Must specify num-partitions > 0 for partitioned topic type.");
+                }
+            }
+            admin.namespaces().setAutoTopicCreation(namespace, new AutoTopicCreationOverride(enable, type, defaultNumPartitions));
+        }
+    }
+
+    @Parameters(commandDescription = "Remove override of autoTopicCreation for a namespace")
+    private class RemoveAutoTopicCreation extends CliCommand {
+        @Parameter(description = "tenant/namespace", required = true)
+        private java.util.List<String> params;
+
+        /** Validates the namespace argument, then asks the admin API to remove its override. */
+        @Override
+        void run() throws PulsarAdminException {
+            final String targetNamespace = validateNamespace(params);
+            admin.namespaces().removeAutoTopicCreation(targetNamespace);
+        }
+    }
+
@Parameters(commandDescription = "Set the retention policy for a namespace")
private class SetRetention extends CliCommand {
@Parameter(description = "tenant/namespace", required = true)
@@ -1509,6 +1565,9 @@ public class CmdNamespaces extends CmdBase {
jcommander.addCommand("set-deduplication", new SetDeduplication());
+ jcommander.addCommand("set-auto-topic-creation", new SetAutoTopicCreation());
+ jcommander.addCommand("remove-auto-topic-creation", new RemoveAutoTopicCreation());
+
jcommander.addCommand("get-retention", new GetRetention());
jcommander.addCommand("set-retention", new SetRetention());
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/AutoTopicCreationOverride.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/AutoTopicCreationOverride.java
new file mode 100644
index 0000000..3eb7a49
--- /dev/null
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/AutoTopicCreationOverride.java
@@ -0,0 +1,89 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.common.policies.data;
+
+import com.google.common.base.MoreObjects;
+
+import java.util.Objects;
+
+/**
+ * Override of autoTopicCreation settings on a namespace level.
+ */
+public class AutoTopicCreationOverride {
+ public boolean allowAutoTopicCreation;
+ public String topicType;
+ public Integer defaultNumPartitions;
+
+ public AutoTopicCreationOverride() {
+ }
+
+ public AutoTopicCreationOverride(boolean allowAutoTopicCreation, String topicType,
+ Integer defaultNumPartitions) {
+ this.allowAutoTopicCreation = allowAutoTopicCreation;
+ this.topicType = topicType;
+ this.defaultNumPartitions = defaultNumPartitions;
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(allowAutoTopicCreation, topicType, defaultNumPartitions);
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (obj instanceof AutoTopicCreationOverride) {
+ AutoTopicCreationOverride other = (AutoTopicCreationOverride) obj;
+ return Objects.equals(this.allowAutoTopicCreation, other.allowAutoTopicCreation)
+ && Objects.equals(this.topicType, other.topicType)
+ && Objects.equals(this.defaultNumPartitions, other.defaultNumPartitions);
+ }
+ return false;
+ }
+
+ @Override
+ public String toString() {
+ return MoreObjects.toStringHelper(this).add("allowAutoTopicCreation", allowAutoTopicCreation)
+ .add("topicType", topicType).add("defaultNumPartitions", defaultNumPartitions).toString();
+ }
+
+ public static boolean isValidOverride(AutoTopicCreationOverride override) {
+ if (override == null) {
+ return false;
+ }
+ if (override.allowAutoTopicCreation) {
+ if (!TopicType.isValidTopicType(override.topicType)) {
+ return false;
+ }
+ if (TopicType.PARTITIONED.toString().equals(override.topicType)) {
+ if (override.defaultNumPartitions == null) {
+ return false;
+ }
+ if (!(override.defaultNumPartitions > 0)) {
+ return false;
+ }
+ } else if (TopicType.NON_PARTITIONED.toString().equals(override.topicType)) {
+ if (override.defaultNumPartitions != null) {
+ return false;
+ }
+ }
+ }
+ return true;
+ }
+
+}
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/Policies.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/Policies.java
index 2a9e043..8bf7222 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/Policies.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/Policies.java
@@ -49,6 +49,8 @@ public class Policies {
// If set, it will override the broker settings for enabling deduplication
public Boolean deduplicationEnabled = null;
+ // If set, it will override the broker settings for allowing auto topic creation
+ public AutoTopicCreationOverride autoTopicCreationOverride = null;
public Map<String, PublishRate> publishMaxMessageRate = Maps.newHashMap();
@SuppressWarnings("checkstyle:MemberName")
@@ -110,7 +112,7 @@ public class Policies {
return Objects.hash(auth_policies, replication_clusters,
backlog_quota_map, publishMaxMessageRate, clusterDispatchRate,
topicDispatchRate, subscriptionDispatchRate, replicatorDispatchRate,
- clusterSubscribeRate, deduplicationEnabled, persistence,
+ clusterSubscribeRate, deduplicationEnabled, autoTopicCreationOverride, persistence,
bundles, latency_stats_sample_rate,
message_ttl_in_seconds, retention_policies,
encryption_required, delayed_delivery_policies,
@@ -141,6 +143,7 @@ public class Policies {
&& Objects.equals(clusterSubscribeRate, other.clusterSubscribeRate)
&& Objects.equals(publishMaxMessageRate, other.publishMaxMessageRate)
&& Objects.equals(deduplicationEnabled, other.deduplicationEnabled)
+ && Objects.equals(autoTopicCreationOverride, other.autoTopicCreationOverride)
&& Objects.equals(persistence, other.persistence) && Objects.equals(bundles, other.bundles)
&& Objects.equals(latency_stats_sample_rate, other.latency_stats_sample_rate)
&& Objects.equals(message_ttl_in_seconds,
@@ -190,6 +193,7 @@ public class Policies {
.add("replication_clusters", replication_clusters).add("bundles", bundles)
.add("backlog_quota_map", backlog_quota_map).add("persistence", persistence)
.add("deduplicationEnabled", deduplicationEnabled)
+ .add("autoTopicCreationOverride", autoTopicCreationOverride)
.add("clusterDispatchRate", clusterDispatchRate)
.add("topicDispatchRate", topicDispatchRate)
.add("subscriptionDispatchRate", subscriptionDispatchRate)
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/TopicType.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/TopicType.java
new file mode 100644
index 0000000..b184bf6
--- /dev/null
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/TopicType.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.common.policies.data;
+
+/**
+ * Topic types -- partitioned or non-partitioned.
+ * <p>
+ * {@link #toString()} returns the lower-case, hyphenated name ("partitioned",
+ * "non-partitioned") rather than the enum constant name.
+ */
+public enum TopicType {
+    PARTITIONED("partitioned"),
+    NON_PARTITIONED("non-partitioned");
+
+    // Textual name of the type; fixed per constant, hence final.
+    private final String type;
+
+    TopicType(String type) {
+        this.type = type;
+    }
+
+    /**
+     * @return the textual name of this topic type
+     */
+    @Override
+    public String toString() {
+        return type;
+    }
+
+    /**
+     * Checks whether a string names one of the topic types, ignoring case.
+     *
+     * @param type the candidate name, may be {@code null}
+     * @return {@code true} if {@code type} equals the textual name of any constant, ignoring case;
+     *         {@code false} otherwise, including for {@code null}
+     */
+    public static boolean isValidTopicType(String type) {
+        for (TopicType topicType : TopicType.values()) {
+            if (topicType.toString().equalsIgnoreCase(type)) {
+                return true;
+            }
+        }
+        return false;
+    }
+}
diff --git a/pulsar-common/src/test/java/org/apache/pulsar/common/policies/data/AutoTopicCreationOverrideTest.java b/pulsar-common/src/test/java/org/apache/pulsar/common/policies/data/AutoTopicCreationOverrideTest.java
new file mode 100644
index 0000000..75bc110
--- /dev/null
+++ b/pulsar-common/src/test/java/org/apache/pulsar/common/policies/data/AutoTopicCreationOverrideTest.java
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.common.policies.data;
+
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertTrue;
+
+/**
+ * Unit tests for {@link AutoTopicCreationOverride#isValidOverride(AutoTopicCreationOverride)}.
+ */
+public class AutoTopicCreationOverrideTest {
+
+    /** Non-partitioned type without a partition count is accepted. */
+    @Test
+    public void testValidOverrideNonPartitioned() {
+        assertTrue(AutoTopicCreationOverride.isValidOverride(
+                new AutoTopicCreationOverride(true, TopicType.NON_PARTITIONED.toString(), null)));
+    }
+
+    /** Partitioned type with a positive partition count is accepted. */
+    @Test
+    public void testValidOverridePartitioned() {
+        assertTrue(AutoTopicCreationOverride.isValidOverride(
+                new AutoTopicCreationOverride(true, TopicType.PARTITIONED.toString(), 2)));
+    }
+
+    /** An unknown topic type name is rejected. */
+    @Test
+    public void testInvalidTopicType() {
+        assertFalse(AutoTopicCreationOverride.isValidOverride(
+                new AutoTopicCreationOverride(true, "aaa", null)));
+    }
+
+    /** Partitioned type with zero partitions is rejected. */
+    @Test
+    public void testNumPartitionsTooLow() {
+        assertFalse(AutoTopicCreationOverride.isValidOverride(
+                new AutoTopicCreationOverride(true, TopicType.PARTITIONED.toString(), 0)));
+    }
+
+    /** Partitioned type without a partition count is rejected. */
+    @Test
+    public void testNumPartitionsNotSet() {
+        assertFalse(AutoTopicCreationOverride.isValidOverride(
+                new AutoTopicCreationOverride(true, TopicType.PARTITIONED.toString(), null)));
+    }
+
+    /** Non-partitioned type that carries a partition count is rejected. */
+    @Test
+    public void testNumPartitionsOnNonPartitioned() {
+        assertFalse(AutoTopicCreationOverride.isValidOverride(
+                new AutoTopicCreationOverride(true, TopicType.NON_PARTITIONED.toString(), 2)));
+    }
+}