You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2021/01/14 03:04:12 UTC

[pulsar] branch branch-2.7 updated: Fix that maxProducersPerTopic cannot be disabled at the namespace level (#9157)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.7
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.7 by this push:
     new 7209a5b  Fix that maxProducersPerTopic cannot be disabled at the namespace level (#9157)
7209a5b is described below

commit 7209a5bf8d2d9cb5f4ffae5fc67c636856c87782
Author: feynmanlin <fe...@tencent.com>
AuthorDate: Mon Jan 11 11:55:07 2021 +0800

    Fix that maxProducersPerTopic cannot be disabled at the namespace level (#9157)
    
    Master Issue: #9146
    
    `maxProducersPerTopic` cannot be disabled at the namespace-level
    
    Allow `maxProducersPerTopic` to be null, and no longer use the broker-level policy as its default value
    
    AdminApiTest2#testMaxProducersPerTopicUnlimited
    
    (cherry picked from commit ab8802b51975ed525bda7a80f0af81665e2e7a29)
---
 .../apache/pulsar/broker/admin/AdminResource.java  |  3 --
 .../pulsar/broker/admin/impl/NamespacesBase.java   |  6 +--
 .../apache/pulsar/broker/admin/v2/Namespaces.java  | 14 +++++-
 .../pulsar/broker/service/AbstractTopic.java       |  3 +-
 .../apache/pulsar/broker/admin/AdminApiTest2.java  | 53 ++++++++++++++++++++++
 .../pulsar/broker/admin/TopicPoliciesTest.java     |  2 +-
 .../org/apache/pulsar/client/admin/Namespaces.java | 22 ++++++++-
 .../client/admin/internal/NamespacesImpl.java      | 24 +++++++++-
 .../pulsar/admin/cli/PulsarAdminToolTest.java      |  3 ++
 .../org/apache/pulsar/admin/cli/CmdNamespaces.java | 14 ++++++
 .../pulsar/common/policies/data/Policies.java      |  4 +-
 11 files changed, 135 insertions(+), 13 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
index 02181e1..e1fdea1 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
@@ -491,9 +491,6 @@ public abstract class AdminResource extends PulsarWebResource {
         }
 
         final ServiceConfiguration config = pulsar().getConfiguration();
-        if (policies.max_producers_per_topic < 1) {
-            policies.max_producers_per_topic = config.getMaxProducersPerTopic();
-        }
 
         if (policies.max_consumers_per_topic < 1) {
             policies.max_consumers_per_topic = config.getMaxConsumersPerTopic();
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
index 2a142f9..87d0d2f 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
@@ -2492,7 +2492,7 @@ public abstract class NamespacesBase extends AdminResource {
                         "specific limit. To disable retention both limits must be set to 0.");
     }
 
-    protected int internalGetMaxProducersPerTopic() {
+    protected Integer internalGetMaxProducersPerTopic() {
         validateNamespacePolicyOperation(namespaceName, PolicyName.MAX_PRODUCERS, PolicyOperation.READ);
         return getNamespacePolicies(namespaceName).max_producers_per_topic;
     }
@@ -2510,7 +2510,7 @@ public abstract class NamespacesBase extends AdminResource {
         internalSetPolicies("deduplicationSnapshotIntervalSeconds", interval);
     }
 
-    protected void internalSetMaxProducersPerTopic(int maxProducersPerTopic) {
+    protected void internalSetMaxProducersPerTopic(Integer maxProducersPerTopic) {
         validateNamespacePolicyOperation(namespaceName, PolicyName.MAX_PRODUCERS, PolicyOperation.WRITE);
         validatePoliciesReadOnlyAccess();
 
@@ -2519,7 +2519,7 @@ public abstract class NamespacesBase extends AdminResource {
             final String path = path(POLICIES, namespaceName.toString());
             byte[] content = globalZk().getData(path, null, nodeStat);
             Policies policies = jsonMapper().readValue(content, Policies.class);
-            if (maxProducersPerTopic < 0) {
+            if (maxProducersPerTopic != null && maxProducersPerTopic < 0) {
                 throw new RestException(Status.PRECONDITION_FAILED,
                         "maxProducersPerTopic must be 0 or more");
             }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
index 6befe4d..7351d5b 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
@@ -919,7 +919,7 @@ public class Namespaces extends NamespacesBase {
     @ApiOperation(value = "Get maxProducersPerTopic config on a namespace.")
     @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
             @ApiResponse(code = 404, message = "Namespace does not exist") })
-    public int getMaxProducersPerTopic(@PathParam("tenant") String tenant,
+    public Integer getMaxProducersPerTopic(@PathParam("tenant") String tenant,
             @PathParam("namespace") String namespace) {
         validateNamespaceName(tenant, namespace);
         return internalGetMaxProducersPerTopic();
@@ -938,6 +938,18 @@ public class Namespaces extends NamespacesBase {
         internalSetMaxProducersPerTopic(maxProducersPerTopic);
     }
 
+    @DELETE
+    @Path("/{tenant}/{namespace}/maxProducersPerTopic")
+    @ApiOperation(value = "Remove maxProducersPerTopic configuration on a namespace.")
+    @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
+            @ApiResponse(code = 404, message = "Namespace does not exist"),
+            @ApiResponse(code = 409, message = "Concurrent modification") })
+    public void removeMaxProducersPerTopic(@PathParam("tenant") String tenant,
+                                               @PathParam("namespace") String namespace) {
+        validateNamespaceName(tenant, namespace);
+        internalSetMaxProducersPerTopic(null);
+    }
+
     @GET
     @Path("/{tenant}/{namespace}/deduplicationSnapshotInterval")
     @ApiOperation(value = "Get deduplicationSnapshotInterval config on a namespace.")
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
index 429248a..2cab994 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
@@ -145,7 +145,8 @@ public abstract class AbstractTopic implements Topic {
             }
             maxProducers = policies.max_producers_per_topic;
         }
-        maxProducers = maxProducers > 0 ? maxProducers : brokerService.pulsar().getConfiguration().getMaxProducersPerTopic();
+        maxProducers = maxProducers != null ? maxProducers : brokerService.pulsar()
+                .getConfiguration().getMaxProducersPerTopic();
         if (maxProducers > 0 && maxProducers <= producers.size()) {
             return true;
         }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest2.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest2.java
index 62eb997..70d2542 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest2.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest2.java
@@ -1605,5 +1605,58 @@ public class AdminApiTest2 extends MockedPulsarServiceBaseTest {
         client.close();
     }
 
+    @Test
+    public void testMaxProducersPerTopicUnlimited() throws Exception {
+        final int maxProducersPerTopic = 1;
+        super.internalCleanup();
+        mockPulsarSetup.cleanup();
+        conf.setMaxProducersPerTopic(maxProducersPerTopic);
+        super.internalSetup();
+        //init namespace
+        admin.clusters().createCluster("test", new ClusterData(pulsar.getWebServiceAddress()));
+        TenantInfo tenantInfo = new TenantInfo(Sets.newHashSet("role1", "role2"), Sets.newHashSet("test"));
+        admin.tenants().createTenant("prop-xyz", tenantInfo);
+        final String myNamespace = "prop-xyz/ns" + UUID.randomUUID();
+        admin.namespaces().createNamespace(myNamespace, Sets.newHashSet("test"));
+        final String topic = "persistent://" + myNamespace + "/testMaxProducersPerTopicUnlimited";
+        //the policy is set to 0, so there will be no restrictions
+        admin.namespaces().setMaxProducersPerTopic(myNamespace, 0);
+        Awaitility.await().atMost(3, TimeUnit.SECONDS).until(()
+                -> admin.namespaces().getMaxProducersPerTopic(myNamespace) == 0);
+        List<Producer<byte[]>> producers = new ArrayList<>();
+        for (int i = 0; i < maxProducersPerTopic + 1; i++) {
+            Producer<byte[]> producer = pulsarClient.newProducer().topic(topic).create();
+            producers.add(producer);
+        }
+
+        admin.namespaces().removeMaxProducersPerTopic(myNamespace);
+        Awaitility.await().atMost(3, TimeUnit.SECONDS).until(()
+                -> admin.namespaces().getMaxProducersPerTopic(myNamespace) == null);
+        try {
+            pulsarClient.newProducer().topic(topic).create();
+            fail("should fail");
+        } catch (PulsarClientException ignore) {
+            assertTrue(ignore.getMessage().contains("Topic reached max producers limit"));
+        }
+        //set the limit to 3
+        admin.namespaces().setMaxProducersPerTopic(myNamespace, 3);
+        Awaitility.await().atMost(3, TimeUnit.SECONDS).until(()
+                -> admin.namespaces().getMaxProducersPerTopic(myNamespace) == 3);
+        // should succeed
+        Producer<byte[]> producer = pulsarClient.newProducer().topic(topic).create();
+        producers.add(producer);
+        try {
+            pulsarClient.newProducer().topic(topic).create();
+            fail("should fail");
+        } catch (PulsarClientException ignore) {
+            assertTrue(ignore.getMessage().contains("Topic reached max producers limit"));
+        }
+
+        //clean up
+        for (Producer<byte[]> tempProducer : producers) {
+            tempProducer.close();
+        }
+    }
+
 
 }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
index 99cb773..aaba1bd 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
@@ -472,7 +472,7 @@ public class TopicPoliciesTest extends MockedPulsarServiceBaseTest {
         admin.namespaces().setMaxProducersPerTopic(myNamespace, 3);
 
         Awaitility.await().atMost(3, TimeUnit.SECONDS)
-                .untilAsserted(() -> Assert.assertEquals(admin.namespaces().getMaxProducersPerTopic(myNamespace), 3));
+                .untilAsserted(() -> Assert.assertEquals(admin.namespaces().getMaxProducersPerTopic(myNamespace).intValue(), 3));
 
         log.info("MaxProducers: {} will set to the namespace: {}", 3, myNamespace);
         try {
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Namespaces.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Namespaces.java
index a4637c4..fcf2211 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Namespaces.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Namespaces.java
@@ -2522,7 +2522,7 @@ public interface Namespaces {
      * @throws PulsarAdminException
      *             Unexpected error
      */
-    int getMaxProducersPerTopic(String namespace) throws PulsarAdminException;
+    Integer getMaxProducersPerTopic(String namespace) throws PulsarAdminException;
 
     /**
      * Get the maxProducersPerTopic for a namespace asynchronously.
@@ -2578,6 +2578,26 @@ public interface Namespaces {
     CompletableFuture<Void> setMaxProducersPerTopicAsync(String namespace, int maxProducersPerTopic);
 
     /**
+     * Remove maxProducersPerTopic for a namespace.
+     * @param namespace Namespace name
+     *
+     * @throws NotAuthorizedException
+     *             Don't have admin permission
+     * @throws NotFoundException
+     *             Namespace does not exist
+     * @throws PulsarAdminException
+     *             Unexpected error
+     */
+    void removeMaxProducersPerTopic(String namespace) throws PulsarAdminException;
+
+    /**
+     * Remove maxProducersPerTopic for a namespace asynchronously.
+     * @param namespace
+     *            Namespace name
+     */
+    CompletableFuture<Void> removeMaxProducersPerTopicAsync(String namespace);
+
+    /**
      * Get the maxProducersPerTopic for a namespace.
      * <p/>
      * Response example:
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java
index b8e1a67..ff428be 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java
@@ -2121,7 +2121,7 @@ public class NamespacesImpl extends BaseResource implements Namespaces {
     }
 
     @Override
-    public int getMaxProducersPerTopic(String namespace) throws PulsarAdminException {
+    public Integer getMaxProducersPerTopic(String namespace) throws PulsarAdminException {
         try {
             return getMaxProducersPerTopicAsync(namespace).
                     get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
@@ -2178,6 +2178,28 @@ public class NamespacesImpl extends BaseResource implements Namespaces {
     }
 
     @Override
+    public void removeMaxProducersPerTopic(String namespace) throws PulsarAdminException {
+        try {
+            removeMaxProducersPerTopicAsync(namespace).
+                    get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
+        } catch (ExecutionException e) {
+            throw (PulsarAdminException) e.getCause();
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new PulsarAdminException(e);
+        } catch (TimeoutException e) {
+            throw new PulsarAdminException.TimeoutException(e);
+        }
+    }
+
+    @Override
+    public CompletableFuture<Void> removeMaxProducersPerTopicAsync(String namespace) {
+        NamespaceName ns = NamespaceName.get(namespace);
+        WebTarget path = namespacePath(ns, "maxProducersPerTopic");
+        return asyncDeleteRequest(path);
+    }
+
+    @Override
     public int getMaxConsumersPerTopic(String namespace) throws PulsarAdminException {
         try {
             return getMaxConsumersPerTopicAsync(namespace).
diff --git a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
index dac618d..bdd46e6 100644
--- a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
+++ b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
@@ -471,6 +471,9 @@ public class PulsarAdminToolTest {
         namespaces.run(split("set-max-producers-per-topic myprop/clust/ns1 -p 1"));
         verify(mockNamespaces).setMaxProducersPerTopic("myprop/clust/ns1", 1);
 
+        namespaces.run(split("remove-max-producers-per-topic myprop/clust/ns1"));
+        verify(mockNamespaces).removeMaxProducersPerTopic("myprop/clust/ns1");
+
         namespaces.run(split("get-max-consumers-per-topic myprop/clust/ns1"));
         verify(mockNamespaces).getMaxConsumersPerTopic("myprop/clust/ns1");
 
diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java
index d2052cd..418d255 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java
@@ -1253,6 +1253,18 @@ public class CmdNamespaces extends CmdBase {
         }
     }
 
+    @Parameters(commandDescription = "Remove max producers per topic for a namespace")
+    private class RemoveMaxProducersPerTopic extends CliCommand {
+        @Parameter(description = "tenant/namespace", required = true)
+        private java.util.List<String> params;
+
+        @Override
+        void run() throws PulsarAdminException {
+            String namespace = validateNamespace(params);
+            admin.namespaces().removeMaxProducersPerTopic(namespace);
+        }
+    }
+
     @Parameters(commandDescription = "Set maxProducersPerTopic for a namespace")
     private class SetMaxProducersPerTopic extends CliCommand {
         @Parameter(description = "tenant/namespace", required = true)
@@ -1918,6 +1930,8 @@ public class CmdNamespaces extends CmdBase {
 
         jcommander.addCommand("get-max-producers-per-topic", new GetMaxProducersPerTopic());
         jcommander.addCommand("set-max-producers-per-topic", new SetMaxProducersPerTopic());
+        jcommander.addCommand("remove-max-producers-per-topic", new RemoveMaxProducersPerTopic());
+
         jcommander.addCommand("get-max-consumers-per-topic", new GetMaxConsumersPerTopic());
         jcommander.addCommand("set-max-consumers-per-topic", new SetMaxConsumersPerTopic());
         jcommander.addCommand("get-max-consumers-per-subscription", new GetMaxConsumersPerSubscription());
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/Policies.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/Policies.java
index a7ee4f7..e314ab5 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/Policies.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/Policies.java
@@ -77,7 +77,7 @@ public class Policies {
     public SubscriptionAuthMode subscription_auth_mode = SubscriptionAuthMode.None;
 
     @SuppressWarnings("checkstyle:MemberName")
-    public int max_producers_per_topic = 0;
+    public Integer max_producers_per_topic = null;
     @SuppressWarnings("checkstyle:MemberName")
     public int max_consumers_per_topic = 0;
     @SuppressWarnings("checkstyle:MemberName")
@@ -164,7 +164,7 @@ public class Policies {
                     && Objects.equals(delayed_delivery_policies, other.delayed_delivery_policies)
                     && Objects.equals(inactive_topic_policies, other.inactive_topic_policies)
                     && Objects.equals(subscription_auth_mode, other.subscription_auth_mode)
-                    && max_producers_per_topic == other.max_producers_per_topic
+                    && Objects.equals(max_producers_per_topic, other.max_producers_per_topic)
                     && max_consumers_per_topic == other.max_consumers_per_topic
                     && max_consumers_per_subscription == other.max_consumers_per_subscription
                     && max_unacked_messages_per_consumer == other.max_unacked_messages_per_consumer