You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by li...@apache.org on 2021/12/28 03:22:37 UTC

[pulsar] branch master updated: Max consumers per subscription support cross multiple clusters (#13522)

This is an automated email from the ASF dual-hosted git repository.

linlin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 0bc45b5  Max consumers per subscription support cross multiple clusters (#13522)
0bc45b5 is described below

commit 0bc45b54f29c5ffa1918e49aa4a341f70679162a
Author: feynmanlin <fe...@tencent.com>
AuthorDate: Tue Dec 28 11:21:32 2021 +0800

    Max consumers per subscription support cross multiple clusters (#13522)
    
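    Support the max-consumers-per-subscription topic policy across multiple
    clusters: the get/set/remove admin endpoints accept an isGlobal query
    parameter, and the matching admin CLI topic-policy commands accept a
    --global/-g flag.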
---
 .../broker/admin/impl/PersistentTopicsBase.java    | 15 +++---
 .../pulsar/broker/admin/v2/PersistentTopics.java   |  9 ++--
 .../service/ReplicatorTopicPoliciesTest.java       | 26 +++++++++++
 .../pulsar/admin/cli/PulsarAdminToolTest.java      | 14 ++++++
 .../apache/pulsar/admin/cli/CmdTopicPolicies.java  | 54 ++++++++++++++++++++++
 .../org/apache/pulsar/admin/cli/CmdTopics.java     |  4 ++
 6 files changed, 113 insertions(+), 9 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index 09afe9f..980ecd5 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -4178,30 +4178,33 @@ public class PersistentTopicsBase extends AdminResource {
     }
 
 
-    protected CompletableFuture<Optional<Integer>> internalGetMaxConsumersPerSubscription() {
-        return getTopicPoliciesAsyncWithRetry(topicName)
+    protected CompletableFuture<Optional<Integer>> internalGetMaxConsumersPerSubscription(boolean isGlobal) {
+        return getTopicPoliciesAsyncWithRetry(topicName, isGlobal)
                 .thenApply(op -> op.map(TopicPolicies::getMaxConsumersPerSubscription));
     }
 
-    protected CompletableFuture<Void> internalSetMaxConsumersPerSubscription(Integer maxConsumersPerSubscription) {
+    protected CompletableFuture<Void> internalSetMaxConsumersPerSubscription(
+            Integer maxConsumersPerSubscription, boolean isGlobal) {
         if (maxConsumersPerSubscription != null && maxConsumersPerSubscription < 0) {
             throw new RestException(Status.PRECONDITION_FAILED, "Invalid value for maxConsumersPerSubscription");
         }
-        return getTopicPoliciesAsyncWithRetry(topicName)
+        return getTopicPoliciesAsyncWithRetry(topicName, isGlobal)
             .thenCompose(op -> {
                 TopicPolicies topicPolicies = op.orElseGet(TopicPolicies::new);
                 topicPolicies.setMaxConsumersPerSubscription(maxConsumersPerSubscription);
+                topicPolicies.setIsGlobal(isGlobal);
                 return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, topicPolicies);
             });
     }
 
-    protected CompletableFuture<Void> internalRemoveMaxConsumersPerSubscription() {
-        return getTopicPoliciesAsyncWithRetry(topicName)
+    protected CompletableFuture<Void> internalRemoveMaxConsumersPerSubscription(boolean isGlobal) {
+        return getTopicPoliciesAsyncWithRetry(topicName, isGlobal)
             .thenCompose(op -> {
                 if (!op.isPresent()) {
                     return CompletableFuture.completedFuture(null);
                 }
                 op.get().setMaxConsumersPerSubscription(null);
+                op.get().setIsGlobal(isGlobal);
                 return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, op.get());
             });
     }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
index 80a8ba2..fc95965 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
@@ -3064,11 +3064,12 @@ public class PersistentTopics extends PersistentTopicsBase {
             @PathParam("tenant") String tenant,
             @PathParam("namespace") String namespace,
             @PathParam("topic") @Encoded String encodedTopic,
+            @QueryParam("isGlobal") @DefaultValue("false") boolean isGlobal,
             @ApiParam(value = "Is authentication required to perform this operation")
             @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
         validateTopicName(tenant, namespace, encodedTopic);
         preValidation(authoritative)
-            .thenCompose(__ -> internalGetMaxConsumersPerSubscription())
+            .thenCompose(__ -> internalGetMaxConsumersPerSubscription(isGlobal))
             .thenAccept(op -> asyncResponse.resume(op.isPresent() ? op.get()
                     : Response.noContent().build()))
             .exceptionally(ex -> {
@@ -3090,12 +3091,13 @@ public class PersistentTopics extends PersistentTopicsBase {
             @PathParam("tenant") String tenant,
             @PathParam("namespace") String namespace,
             @PathParam("topic") @Encoded String encodedTopic,
+            @QueryParam("isGlobal") @DefaultValue("false") boolean isGlobal,
             @ApiParam(value = "Is authentication required to perform this operation")
             @QueryParam("authoritative") @DefaultValue("false") boolean authoritative,
             @ApiParam(value = "Dispatch rate for the specified topic") int maxConsumersPerSubscription) {
         validateTopicName(tenant, namespace, encodedTopic);
         preValidation(authoritative)
-            .thenCompose(__ -> internalSetMaxConsumersPerSubscription(maxConsumersPerSubscription))
+            .thenCompose(__ -> internalSetMaxConsumersPerSubscription(maxConsumersPerSubscription, isGlobal))
             .thenRun(() -> {
                 try {
                     log.info("[{}] Successfully set topic max consumers per subscription:"
@@ -3126,11 +3128,12 @@ public class PersistentTopics extends PersistentTopicsBase {
             @PathParam("tenant") String tenant,
             @PathParam("namespace") String namespace,
             @PathParam("topic") @Encoded String encodedTopic,
+            @QueryParam("isGlobal") @DefaultValue("false") boolean isGlobal,
             @ApiParam(value = "Is authentication required to perform this operation")
             @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
         validateTopicName(tenant, namespace, encodedTopic);
         preValidation(authoritative)
-            .thenCompose(__ -> internalRemoveMaxConsumersPerSubscription())
+            .thenCompose(__ -> internalRemoveMaxConsumersPerSubscription(isGlobal))
             .thenRun(() -> {
                 log.info("[{}] Successfully remove topic max consumers per subscription:"
                                 + " tenant={}, namespace={}, topic={}",
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTopicPoliciesTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTopicPoliciesTest.java
index a1b37b2..c33c3d8 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTopicPoliciesTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTopicPoliciesTest.java
@@ -173,6 +173,32 @@ public class ReplicatorTopicPoliciesTest extends ReplicatorTestBase {
     }
 
     @Test
+    public void testReplicatorMaxConsumerPerSubPolicies() throws Exception {
+        final String namespace = "pulsar/partitionedNs-" + UUID.randomUUID();
+        final String topic = "persistent://" + namespace + "/topic" + UUID.randomUUID();
+
+        init(namespace, topic);
+        // set max consumer per sub
+        admin1.topicPolicies(true).setMaxConsumersPerSubscription(topic, 100);
+        Awaitility.await().ignoreExceptions().untilAsserted(() ->
+                assertEquals(admin2.topicPolicies(true).getMaxConsumersPerSubscription(topic).intValue(), 100));
+        Awaitility.await().ignoreExceptions().untilAsserted(() ->
+                assertEquals(admin3.topicPolicies(true).getMaxConsumersPerSubscription(topic).intValue(), 100));
+
+        Awaitility.await().untilAsserted(() -> {
+            assertEquals(admin1.topicPolicies(true).getMaxConsumersPerSubscription(topic).intValue(), 100);
+            assertNull(admin1.topicPolicies().getMaxConsumersPerSubscription(topic));
+        });
+
+        //remove max consumer per sub
+        admin1.topicPolicies(true).removeMaxConsumersPerSubscription(topic);
+        Awaitility.await().untilAsserted(() ->
+                assertNull(admin2.topicPolicies(true).getMaxConsumersPerSubscription(topic)));
+        Awaitility.await().untilAsserted(() ->
+                assertNull(admin3.topicPolicies(true).getMaxConsumersPerSubscription(topic)));
+    }
+
+    @Test
     public void testReplicatorTopicPolicies() throws Exception {
         final String namespace = "pulsar/partitionedNs-" + UUID.randomUUID();
         final String persistentTopicName = "persistent://" + namespace + "/topic" + UUID.randomUUID();
diff --git a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
index 67c78f9..40086ee 100644
--- a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
+++ b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
@@ -948,6 +948,13 @@ public class PulsarAdminToolTest {
         cmdTopics.run(split("remove-message-ttl persistent://myprop/clust/ns1/ds1"));
         verify(mockTopicsPolicies).removeMessageTTL("persistent://myprop/clust/ns1/ds1");
 
+        cmdTopics.run(split("get-max-consumers-per-subscription persistent://myprop/clust/ns1/ds1"));
+        verify(mockTopicsPolicies).getMaxConsumersPerSubscription("persistent://myprop/clust/ns1/ds1");
+        cmdTopics.run(split("set-max-consumers-per-subscription persistent://myprop/clust/ns1/ds1 -c 5"));
+        verify(mockTopicsPolicies).setMaxConsumersPerSubscription("persistent://myprop/clust/ns1/ds1", 5);
+        cmdTopics.run(split("remove-max-consumers-per-subscription persistent://myprop/clust/ns1/ds1"));
+        verify(mockTopicsPolicies).removeMaxConsumersPerSubscription("persistent://myprop/clust/ns1/ds1");
+
         cmdTopics.run(split("get-deduplication persistent://myprop/clust/ns1/ds1"));
         verify(mockTopicsPolicies).getDeduplicationStatus("persistent://myprop/clust/ns1/ds1");
         cmdTopics.run(split("set-deduplication persistent://myprop/clust/ns1/ds1 --disable"));
@@ -1034,6 +1041,13 @@ public class PulsarAdminToolTest {
         cmdTopics.run(split("remove-deduplication persistent://myprop/clust/ns1/ds1 -g"));
         verify(mockGlobalTopicsPolicies).removeDeduplicationStatus("persistent://myprop/clust/ns1/ds1");
 
+        cmdTopics.run(split("get-max-consumers-per-subscription persistent://myprop/clust/ns1/ds1 -g"));
+        verify(mockGlobalTopicsPolicies).getMaxConsumersPerSubscription("persistent://myprop/clust/ns1/ds1");
+        cmdTopics.run(split("set-max-consumers-per-subscription persistent://myprop/clust/ns1/ds1 -c 5 -g"));
+        verify(mockGlobalTopicsPolicies).setMaxConsumersPerSubscription("persistent://myprop/clust/ns1/ds1", 5);
+        cmdTopics.run(split("remove-max-consumers-per-subscription persistent://myprop/clust/ns1/ds1 -g"));
+        verify(mockGlobalTopicsPolicies).removeMaxConsumersPerSubscription("persistent://myprop/clust/ns1/ds1");
+
         cmdTopics.run(split("set-subscription-types-enabled persistent://myprop/clust/ns1/ds1 -t Shared,Failover -g"));
         verify(mockGlobalTopicsPolicies).setSubscriptionTypesEnabled("persistent://myprop/clust/ns1/ds1",
                 Sets.newHashSet(SubscriptionType.Shared, SubscriptionType.Failover));
diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopicPolicies.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopicPolicies.java
index 5b98a86..8be0fb3 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopicPolicies.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopicPolicies.java
@@ -46,6 +46,9 @@ public class CmdTopicPolicies extends CmdBase {
         jcommander.addCommand("get-message-ttl", new GetMessageTTL());
         jcommander.addCommand("set-message-ttl", new SetMessageTTL());
         jcommander.addCommand("remove-message-ttl", new RemoveMessageTTL());
+        jcommander.addCommand("get-max-consumers-per-subscription", new GetMaxConsumersPerSubscription());
+        jcommander.addCommand("set-max-consumers-per-subscription", new SetMaxConsumersPerSubscription());
+        jcommander.addCommand("remove-max-consumers-per-subscription", new RemoveMaxConsumersPerSubscription());
         jcommander.addCommand("set-subscription-types-enabled", new SetSubscriptionTypesEnabled());
         jcommander.addCommand("get-subscription-types-enabled", new GetSubscriptionTypesEnabled());
         jcommander.addCommand("remove-subscription-types-enabled", new RemoveSubscriptionTypesEnabled());
@@ -77,6 +80,57 @@ public class CmdTopicPolicies extends CmdBase {
         jcommander.addCommand("remove-dispatch-rate", new RemoveDispatchRate());
     }
 
+    @Parameters(commandDescription = "Get max consumers per subscription for a topic")
+    private class GetMaxConsumersPerSubscription extends CliCommand {
+        @Parameter(description = "persistent://tenant/namespace/topic", required = true)
+        private java.util.List<String> params;
+
+        @Parameter(names = { "--global", "-g" }, description = "Whether to get this policy globally. "
+                + "If set to true, broker returned global topic policies")
+        private boolean isGlobal = false;
+
+        @Override
+        void run() throws PulsarAdminException {
+            String persistentTopic = validatePersistentTopic(params);
+            print(getTopicPolicies(isGlobal).getMaxConsumersPerSubscription(persistentTopic));
+        }
+    }
+
+    @Parameters(commandDescription = "Set max consumers per subscription for a topic")
+    private class SetMaxConsumersPerSubscription extends CliCommand {
+        @Parameter(description = "persistent://tenant/namespace/topic", required = true)
+        private java.util.List<String> params;
+
+        @Parameter(names = { "--max-consumers-per-subscription", "-c" }, description = "maxConsumersPerSubscription for a namespace", required = true)
+        private int maxConsumersPerSubscription;
+
+        @Parameter(names = { "--global", "-g" }, description = "Whether to set this policy globally. "
+                + "If set to true, broker returned global topic policies")
+        private boolean isGlobal = false;
+
+        @Override
+        void run() throws PulsarAdminException {
+            String persistentTopic = validatePersistentTopic(params);
+            getTopicPolicies(isGlobal).setMaxConsumersPerSubscription(persistentTopic, maxConsumersPerSubscription);
+        }
+    }
+
+    @Parameters(commandDescription = "Remove max consumers per subscription for a topic")
+    private class RemoveMaxConsumersPerSubscription extends CliCommand {
+        @Parameter(description = "persistent://tenant/namespace/topic", required = true)
+        private java.util.List<String> params;
+
+        @Parameter(names = { "--global", "-g" }, description = "Whether to remove this policy globally. "
+                + "If set to true, broker returned global topic policies")
+        private boolean isGlobal = false;
+
+        @Override
+        void run() throws PulsarAdminException {
+            String persistentTopic = validatePersistentTopic(params);
+            getTopicPolicies(isGlobal).removeMaxConsumersPerSubscription(persistentTopic);
+        }
+    }
+
     @Parameters(commandDescription = "Get the message TTL for a topic")
     private class GetMessageTTL extends CliCommand {
         @Parameter(description = "persistent://tenant/namespace/topic", required = true)
diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
index 47e013b..53b45ea 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
@@ -262,6 +262,10 @@ public class CmdTopics extends CmdBase {
             cmdUsageFormatter.addDeprecatedCommand("set-message-ttl");
             cmdUsageFormatter.addDeprecatedCommand("remove-message-ttl");
 
+            cmdUsageFormatter.addDeprecatedCommand("get-max-consumers-per-subscription");
+            cmdUsageFormatter.addDeprecatedCommand("set-max-consumers-per-subscription");
+            cmdUsageFormatter.addDeprecatedCommand("remove-max-consumers-per-subscription");
+
             cmdUsageFormatter.addDeprecatedCommand("get-max-unacked-messages-on-consumer");
             cmdUsageFormatter.addDeprecatedCommand("remove-max-unacked-messages-on-consumer");
             cmdUsageFormatter.addDeprecatedCommand("set-max-unacked-messages-on-consumer");