You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2021/01/14 03:04:12 UTC
[pulsar] branch branch-2.7 updated: Fix that maxProducersPerTopic
cannot be disabled at the namespace level (#9157)
This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch branch-2.7
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.7 by this push:
new 7209a5b Fix that maxProducersPerTopic cannot be disabled at the namespace level (#9157)
7209a5b is described below
commit 7209a5bf8d2d9cb5f4ffae5fc67c636856c87782
Author: feynmanlin <fe...@tencent.com>
AuthorDate: Mon Jan 11 11:55:07 2021 +0800
Fix that maxProducersPerTopic cannot be disabled at the namespace level (#9157)
Master Issue: #9146
`maxProducersPerTopic` cannot be disabled at the namespace level.
Allow `maxProducersPerTopic` to be null, and no longer use the broker-level policy as the default value when reading namespace policies.
AdminApiTest2#testMaxProducersPerTopicUnlimited
(cherry picked from commit ab8802b51975ed525bda7a80f0af81665e2e7a29)
---
.../apache/pulsar/broker/admin/AdminResource.java | 3 --
.../pulsar/broker/admin/impl/NamespacesBase.java | 6 +--
.../apache/pulsar/broker/admin/v2/Namespaces.java | 14 +++++-
.../pulsar/broker/service/AbstractTopic.java | 3 +-
.../apache/pulsar/broker/admin/AdminApiTest2.java | 53 ++++++++++++++++++++++
.../pulsar/broker/admin/TopicPoliciesTest.java | 2 +-
.../org/apache/pulsar/client/admin/Namespaces.java | 22 ++++++++-
.../client/admin/internal/NamespacesImpl.java | 24 +++++++++-
.../pulsar/admin/cli/PulsarAdminToolTest.java | 3 ++
.../org/apache/pulsar/admin/cli/CmdNamespaces.java | 14 ++++++
.../pulsar/common/policies/data/Policies.java | 4 +-
11 files changed, 135 insertions(+), 13 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
index 02181e1..e1fdea1 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
@@ -491,9 +491,6 @@ public abstract class AdminResource extends PulsarWebResource {
}
final ServiceConfiguration config = pulsar().getConfiguration();
- if (policies.max_producers_per_topic < 1) {
- policies.max_producers_per_topic = config.getMaxProducersPerTopic();
- }
if (policies.max_consumers_per_topic < 1) {
policies.max_consumers_per_topic = config.getMaxConsumersPerTopic();
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
index 2a142f9..87d0d2f 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
@@ -2492,7 +2492,7 @@ public abstract class NamespacesBase extends AdminResource {
"specific limit. To disable retention both limits must be set to 0.");
}
- protected int internalGetMaxProducersPerTopic() {
+ protected Integer internalGetMaxProducersPerTopic() {
validateNamespacePolicyOperation(namespaceName, PolicyName.MAX_PRODUCERS, PolicyOperation.READ);
return getNamespacePolicies(namespaceName).max_producers_per_topic;
}
@@ -2510,7 +2510,7 @@ public abstract class NamespacesBase extends AdminResource {
internalSetPolicies("deduplicationSnapshotIntervalSeconds", interval);
}
- protected void internalSetMaxProducersPerTopic(int maxProducersPerTopic) {
+ protected void internalSetMaxProducersPerTopic(Integer maxProducersPerTopic) {
validateNamespacePolicyOperation(namespaceName, PolicyName.MAX_PRODUCERS, PolicyOperation.WRITE);
validatePoliciesReadOnlyAccess();
@@ -2519,7 +2519,7 @@ public abstract class NamespacesBase extends AdminResource {
final String path = path(POLICIES, namespaceName.toString());
byte[] content = globalZk().getData(path, null, nodeStat);
Policies policies = jsonMapper().readValue(content, Policies.class);
- if (maxProducersPerTopic < 0) {
+ if (maxProducersPerTopic != null && maxProducersPerTopic < 0) {
throw new RestException(Status.PRECONDITION_FAILED,
"maxProducersPerTopic must be 0 or more");
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
index 6befe4d..7351d5b 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
@@ -919,7 +919,7 @@ public class Namespaces extends NamespacesBase {
@ApiOperation(value = "Get maxProducersPerTopic config on a namespace.")
@ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Namespace does not exist") })
- public int getMaxProducersPerTopic(@PathParam("tenant") String tenant,
+ public Integer getMaxProducersPerTopic(@PathParam("tenant") String tenant,
@PathParam("namespace") String namespace) {
validateNamespaceName(tenant, namespace);
return internalGetMaxProducersPerTopic();
@@ -938,6 +938,18 @@ public class Namespaces extends NamespacesBase {
internalSetMaxProducersPerTopic(maxProducersPerTopic);
}
+ @DELETE
+ @Path("/{tenant}/{namespace}/maxProducersPerTopic")
+ @ApiOperation(value = "Remove maxProducersPerTopic configuration on a namespace.")
+ @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
+ @ApiResponse(code = 404, message = "Namespace does not exist"),
+ @ApiResponse(code = 409, message = "Concurrent modification") })
+ public void removeMaxProducersPerTopic(@PathParam("tenant") String tenant,
+ @PathParam("namespace") String namespace) {
+ validateNamespaceName(tenant, namespace);
+ internalSetMaxProducersPerTopic(null);
+ }
+
@GET
@Path("/{tenant}/{namespace}/deduplicationSnapshotInterval")
@ApiOperation(value = "Get deduplicationSnapshotInterval config on a namespace.")
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
index 429248a..2cab994 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
@@ -145,7 +145,8 @@ public abstract class AbstractTopic implements Topic {
}
maxProducers = policies.max_producers_per_topic;
}
- maxProducers = maxProducers > 0 ? maxProducers : brokerService.pulsar().getConfiguration().getMaxProducersPerTopic();
+ maxProducers = maxProducers != null ? maxProducers : brokerService.pulsar()
+ .getConfiguration().getMaxProducersPerTopic();
if (maxProducers > 0 && maxProducers <= producers.size()) {
return true;
}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest2.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest2.java
index 62eb997..70d2542 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest2.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest2.java
@@ -1605,5 +1605,58 @@ public class AdminApiTest2 extends MockedPulsarServiceBaseTest {
client.close();
}
+ @Test
+ public void testMaxProducersPerTopicUnlimited() throws Exception {
+ final int maxProducersPerTopic = 1;
+ super.internalCleanup();
+ mockPulsarSetup.cleanup();
+ conf.setMaxProducersPerTopic(maxProducersPerTopic);
+ super.internalSetup();
+ //init namespace
+ admin.clusters().createCluster("test", new ClusterData(pulsar.getWebServiceAddress()));
+ TenantInfo tenantInfo = new TenantInfo(Sets.newHashSet("role1", "role2"), Sets.newHashSet("test"));
+ admin.tenants().createTenant("prop-xyz", tenantInfo);
+ final String myNamespace = "prop-xyz/ns" + UUID.randomUUID();
+ admin.namespaces().createNamespace(myNamespace, Sets.newHashSet("test"));
+ final String topic = "persistent://" + myNamespace + "/testMaxProducersPerTopicUnlimited";
+ //the policy is set to 0, so there will be no restrictions
+ admin.namespaces().setMaxProducersPerTopic(myNamespace, 0);
+ Awaitility.await().atMost(3, TimeUnit.SECONDS).until(()
+ -> admin.namespaces().getMaxProducersPerTopic(myNamespace) == 0);
+ List<Producer<byte[]>> producers = new ArrayList<>();
+ for (int i = 0; i < maxProducersPerTopic + 1; i++) {
+ Producer<byte[]> producer = pulsarClient.newProducer().topic(topic).create();
+ producers.add(producer);
+ }
+
+ admin.namespaces().removeMaxProducersPerTopic(myNamespace);
+ Awaitility.await().atMost(3, TimeUnit.SECONDS).until(()
+ -> admin.namespaces().getMaxProducersPerTopic(myNamespace) == null);
+ try {
+ pulsarClient.newProducer().topic(topic).create();
+ fail("should fail");
+ } catch (PulsarClientException ignore) {
+ assertTrue(ignore.getMessage().contains("Topic reached max producers limit"));
+ }
+ //set the limit to 3
+ admin.namespaces().setMaxProducersPerTopic(myNamespace, 3);
+ Awaitility.await().atMost(3, TimeUnit.SECONDS).until(()
+ -> admin.namespaces().getMaxProducersPerTopic(myNamespace) == 3);
+ // should succeed
+ Producer<byte[]> producer = pulsarClient.newProducer().topic(topic).create();
+ producers.add(producer);
+ try {
+ pulsarClient.newProducer().topic(topic).create();
+ fail("should fail");
+ } catch (PulsarClientException ignore) {
+ assertTrue(ignore.getMessage().contains("Topic reached max producers limit"));
+ }
+
+ //clean up
+ for (Producer<byte[]> tempProducer : producers) {
+ tempProducer.close();
+ }
+ }
+
}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
index 99cb773..aaba1bd 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
@@ -472,7 +472,7 @@ public class TopicPoliciesTest extends MockedPulsarServiceBaseTest {
admin.namespaces().setMaxProducersPerTopic(myNamespace, 3);
Awaitility.await().atMost(3, TimeUnit.SECONDS)
- .untilAsserted(() -> Assert.assertEquals(admin.namespaces().getMaxProducersPerTopic(myNamespace), 3));
+ .untilAsserted(() -> Assert.assertEquals(admin.namespaces().getMaxProducersPerTopic(myNamespace).intValue(), 3));
log.info("MaxProducers: {} will set to the namespace: {}", 3, myNamespace);
try {
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Namespaces.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Namespaces.java
index a4637c4..fcf2211 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Namespaces.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Namespaces.java
@@ -2522,7 +2522,7 @@ public interface Namespaces {
* @throws PulsarAdminException
* Unexpected error
*/
- int getMaxProducersPerTopic(String namespace) throws PulsarAdminException;
+ Integer getMaxProducersPerTopic(String namespace) throws PulsarAdminException;
/**
* Get the maxProducersPerTopic for a namespace asynchronously.
@@ -2578,6 +2578,26 @@ public interface Namespaces {
CompletableFuture<Void> setMaxProducersPerTopicAsync(String namespace, int maxProducersPerTopic);
/**
+ * Remove maxProducersPerTopic for a namespace.
+ * @param namespace Namespace name
+ *
+ * @throws NotAuthorizedException
+ * Don't have admin permission
+ * @throws NotFoundException
+ * Namespace does not exist
+ * @throws PulsarAdminException
+ * Unexpected error
+ */
+ void removeMaxProducersPerTopic(String namespace) throws PulsarAdminException;
+
+ /**
+ * Remove maxProducersPerTopic for a namespace asynchronously.
+ * @param namespace
+ * Namespace name
+ */
+ CompletableFuture<Void> removeMaxProducersPerTopicAsync(String namespace);
+
+ /**
* Get the maxProducersPerTopic for a namespace.
* <p/>
* Response example:
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java
index b8e1a67..ff428be 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java
@@ -2121,7 +2121,7 @@ public class NamespacesImpl extends BaseResource implements Namespaces {
}
@Override
- public int getMaxProducersPerTopic(String namespace) throws PulsarAdminException {
+ public Integer getMaxProducersPerTopic(String namespace) throws PulsarAdminException {
try {
return getMaxProducersPerTopicAsync(namespace).
get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
@@ -2178,6 +2178,28 @@ public class NamespacesImpl extends BaseResource implements Namespaces {
}
@Override
+ public void removeMaxProducersPerTopic(String namespace) throws PulsarAdminException {
+ try {
+ removeMaxProducersPerTopicAsync(namespace).
+ get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
+ } catch (ExecutionException e) {
+ throw (PulsarAdminException) e.getCause();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new PulsarAdminException(e);
+ } catch (TimeoutException e) {
+ throw new PulsarAdminException.TimeoutException(e);
+ }
+ }
+
+ @Override
+ public CompletableFuture<Void> removeMaxProducersPerTopicAsync(String namespace) {
+ NamespaceName ns = NamespaceName.get(namespace);
+ WebTarget path = namespacePath(ns, "maxProducersPerTopic");
+ return asyncDeleteRequest(path);
+ }
+
+ @Override
public int getMaxConsumersPerTopic(String namespace) throws PulsarAdminException {
try {
return getMaxConsumersPerTopicAsync(namespace).
diff --git a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
index dac618d..bdd46e6 100644
--- a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
+++ b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
@@ -471,6 +471,9 @@ public class PulsarAdminToolTest {
namespaces.run(split("set-max-producers-per-topic myprop/clust/ns1 -p 1"));
verify(mockNamespaces).setMaxProducersPerTopic("myprop/clust/ns1", 1);
+ namespaces.run(split("remove-max-producers-per-topic myprop/clust/ns1"));
+ verify(mockNamespaces).removeMaxProducersPerTopic("myprop/clust/ns1");
+
namespaces.run(split("get-max-consumers-per-topic myprop/clust/ns1"));
verify(mockNamespaces).getMaxConsumersPerTopic("myprop/clust/ns1");
diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java
index d2052cd..418d255 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java
@@ -1253,6 +1253,18 @@ public class CmdNamespaces extends CmdBase {
}
}
+ @Parameters(commandDescription = "Remove max producers per topic for a namespace")
+ private class RemoveMaxProducersPerTopic extends CliCommand {
+ @Parameter(description = "tenant/namespace", required = true)
+ private java.util.List<String> params;
+
+ @Override
+ void run() throws PulsarAdminException {
+ String namespace = validateNamespace(params);
+ admin.namespaces().removeMaxProducersPerTopic(namespace);
+ }
+ }
+
@Parameters(commandDescription = "Set maxProducersPerTopic for a namespace")
private class SetMaxProducersPerTopic extends CliCommand {
@Parameter(description = "tenant/namespace", required = true)
@@ -1918,6 +1930,8 @@ public class CmdNamespaces extends CmdBase {
jcommander.addCommand("get-max-producers-per-topic", new GetMaxProducersPerTopic());
jcommander.addCommand("set-max-producers-per-topic", new SetMaxProducersPerTopic());
+ jcommander.addCommand("remove-max-producers-per-topic", new RemoveMaxProducersPerTopic());
+
jcommander.addCommand("get-max-consumers-per-topic", new GetMaxConsumersPerTopic());
jcommander.addCommand("set-max-consumers-per-topic", new SetMaxConsumersPerTopic());
jcommander.addCommand("get-max-consumers-per-subscription", new GetMaxConsumersPerSubscription());
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/Policies.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/Policies.java
index a7ee4f7..e314ab5 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/Policies.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/Policies.java
@@ -77,7 +77,7 @@ public class Policies {
public SubscriptionAuthMode subscription_auth_mode = SubscriptionAuthMode.None;
@SuppressWarnings("checkstyle:MemberName")
- public int max_producers_per_topic = 0;
+ public Integer max_producers_per_topic = null;
@SuppressWarnings("checkstyle:MemberName")
public int max_consumers_per_topic = 0;
@SuppressWarnings("checkstyle:MemberName")
@@ -164,7 +164,7 @@ public class Policies {
&& Objects.equals(delayed_delivery_policies, other.delayed_delivery_policies)
&& Objects.equals(inactive_topic_policies, other.inactive_topic_policies)
&& Objects.equals(subscription_auth_mode, other.subscription_auth_mode)
- && max_producers_per_topic == other.max_producers_per_topic
+ && Objects.equals(max_producers_per_topic, other.max_producers_per_topic)
&& max_consumers_per_topic == other.max_consumers_per_topic
&& max_consumers_per_subscription == other.max_consumers_per_subscription
&& max_unacked_messages_per_consumer == other.max_unacked_messages_per_consumer