Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2020/09/03 11:36:58 UTC

[GitHub] [pulsar] zhanghaou opened a new pull request #7968: [Issue 7759] Support set Max Consumer on topic level.

zhanghaou opened a new pull request #7968:
URL: https://github.com/apache/pulsar/pull/7968


   Fixes [https://github.com/apache/pulsar/issues/7759](https://github.com/apache/pulsar/issues/7759). Master issue: [https://github.com/apache/pulsar/issues/2688](https://github.com/apache/pulsar/issues/2688)
   
   ### Motivation
   
   Support setting, getting, and removing `maxConsumers` at the topic level.
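
The unit test quoted later in this thread sets a namespace-level limit of 1 and a topic-level limit of 2, then expects two consumers to connect and a third to be rejected. That suggests the topic-level value takes precedence when it is set. A minimal sketch of that resolution follows, in plain Java and independent of the Pulsar code base. The class and method names are hypothetical, and treating a limit of 0 or less as "no limit" is an assumption made for this sketch only.

```java
import java.util.Optional;

final class MaxConsumersResolution {
    // Hypothetical helper: a topic-level value, when present, overrides the
    // namespace-level value; otherwise the namespace-level value applies.
    static int effectiveMaxConsumers(Optional<Integer> topicLevel, int namespaceLevel) {
        return topicLevel.orElse(namespaceLevel);
    }

    // Assumption for this sketch: a limit of 0 or less means "no limit".
    static boolean canAddConsumer(int currentConsumers, int limit) {
        return limit <= 0 || currentConsumers < limit;
    }

    public static void main(String[] args) {
        // Namespace limit 1, topic limit 2, mirroring the values used in the test.
        int limit = effectiveMaxConsumers(Optional.of(2), 1);
        System.out.println("effective limit: " + limit);
        System.out.println("second consumer allowed: " + canAddConsumer(1, limit));
        System.out.println("third consumer allowed: " + canAddConsumer(2, limit));
    }
}
```

Removing the topic-level value corresponds to passing `Optional.empty()`, in which case the namespace-level limit applies again.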
   
   ### Verifying this change
   New unit tests were added.
   
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] zhanghaou commented on a change in pull request #7968: [Issue 7759] Support set Max Consumer on topic level.

Posted by GitBox <gi...@apache.org>.
zhanghaou commented on a change in pull request #7968:
URL: https://github.com/apache/pulsar/pull/7968#discussion_r484409041



##########
File path: pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
##########
@@ -153,6 +153,9 @@ public CmdTopics(PulsarAdmin admin) {
         jcommander.addCommand("get-inactive-topic-policies", new GetInactiveTopicPolicies());
         jcommander.addCommand("set-inactive-topic-policies", new SetInactiveTopicPolicies());
         jcommander.addCommand("remove-inactive-topic-policies", new RemoveInactiveTopicPolicies());
+        jcommander.addCommand("get-maxConsumers", new GetMaxConsumers());
+        jcommander.addCommand("set-maxConsumers", new SetMaxConsumers());
+        jcommander.addCommand("remove-maxConsumers", new RemoveMaxConsumers());

Review comment:
       Done.







[GitHub] [pulsar] codelipenghui commented on a change in pull request #7968: [Issue 7759] Support set Max Consumer on topic level.

Posted by GitBox <gi...@apache.org>.
codelipenghui commented on a change in pull request #7968:
URL: https://github.com/apache/pulsar/pull/7968#discussion_r484378934



##########
File path: pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
##########
@@ -153,6 +153,9 @@ public CmdTopics(PulsarAdmin admin) {
         jcommander.addCommand("get-inactive-topic-policies", new GetInactiveTopicPolicies());
         jcommander.addCommand("set-inactive-topic-policies", new SetInactiveTopicPolicies());
         jcommander.addCommand("remove-inactive-topic-policies", new RemoveInactiveTopicPolicies());
+        jcommander.addCommand("get-maxConsumers", new GetMaxConsumers());
+        jcommander.addCommand("set-maxConsumers", new SetMaxConsumers());
+        jcommander.addCommand("remove-maxConsumers", new RemoveMaxConsumers());

Review comment:
       Please also update the max producers commands to follow the same pattern.

##########
File path: pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
##########
@@ -153,6 +153,9 @@ public CmdTopics(PulsarAdmin admin) {
         jcommander.addCommand("get-inactive-topic-policies", new GetInactiveTopicPolicies());
         jcommander.addCommand("set-inactive-topic-policies", new SetInactiveTopicPolicies());
         jcommander.addCommand("remove-inactive-topic-policies", new RemoveInactiveTopicPolicies());
+        jcommander.addCommand("get-maxConsumers", new GetMaxConsumers());
+        jcommander.addCommand("set-maxConsumers", new SetMaxConsumers());
+        jcommander.addCommand("remove-maxConsumers", new RemoveMaxConsumers());

Review comment:
       ```suggestion
           jcommander.addCommand("get-max-consumers", new GetMaxConsumers());
           jcommander.addCommand("set-max-consumers", new SetMaxConsumers());
           jcommander.addCommand("remove-max-consumers", new RemoveMaxConsumers());
   ```
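
The renaming suggested above moves the command names from mixed camelCase to hyphen-separated lower case. A small sketch of that conversion follows, in case it helps when applying the same pattern to other commands. `CommandNames` and `toKebabCase` are hypothetical names used only for illustration and are not part of `CmdTopics` or JCommander.

```java
import java.util.Locale;

final class CommandNames {
    // Hypothetical helper: inserts a hyphen at every lower-case-to-upper-case
    // boundary, then lower-cases the whole name.
    static String toKebabCase(String name) {
        return name.replaceAll("([a-z0-9])([A-Z])", "$1-$2").toLowerCase(Locale.ROOT);
    }

    public static void main(String[] args) {
        String[] original = {"get-maxConsumers", "set-maxConsumers", "remove-maxConsumers"};
        for (String name : original) {
            System.out.println(name + " -> " + toKebabCase(name));
        }
    }
}
```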







[GitHub] [pulsar] zhanghaou commented on a change in pull request #7968: [Issue 7759] Support set Max Consumer on topic level.

Posted by GitBox <gi...@apache.org>.
zhanghaou commented on a change in pull request #7968:
URL: https://github.com/apache/pulsar/pull/7968#discussion_r483587668



##########
File path: pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
##########
@@ -590,4 +591,121 @@ public void testRemovePublishRate() throws Exception {
 
         admin.topics().deletePartitionedTopic(testTopic, true);
     }
+
+    @Test
+    public void testCheckMaxConsumers() throws Exception {
+        Integer maxProducers = new Integer(-1);
+        log.info("MaxConsumers: {} will set to the topic: {}", maxProducers, testTopic);
+        try {
+            admin.topics().setMaxConsumers(testTopic, maxProducers);
+            Assert.fail();
+        } catch (PulsarAdminException e) {
+            Assert.assertEquals(e.getStatusCode(), 412);
+        }
+
+        admin.topics().deletePartitionedTopic(testTopic, true);
+    }
+
+    @Test
+    public void testSetMaxConsumers() throws Exception {
+        admin.namespaces().setMaxConsumersPerTopic(myNamespace, 1);
+        log.info("MaxConsumers: {} will set to the namespace: {}", 1, myNamespace);
+        Integer maxConsumers = 2;
+        log.info("MaxConsumers: {} will set to the topic: {}", maxConsumers, persistenceTopic);
+        admin.topics().setMaxConsumers(persistenceTopic, maxConsumers);
+        Thread.sleep(3000);
+
+        admin.topics().createPartitionedTopic(persistenceTopic, 2);
+        Consumer consumer1 = null;
+        Consumer consumer2 = null;
+        Consumer consumer3 = null;
+        try {
+            consumer1 = pulsarClient.newConsumer().subscriptionName("sub1").topic(persistenceTopic).subscribe();
+        } catch (PulsarClientException e) {
+            Assert.fail();
+        }
+        try {
+            consumer2 = pulsarClient.newConsumer().subscriptionName("sub2").topic(persistenceTopic).subscribe();
+        } catch (PulsarClientException e) {
+            Assert.fail();
+        }
+        try {
+            consumer3 = pulsarClient.newConsumer().subscriptionName("sub3").topic(persistenceTopic).subscribe();
+            Assert.fail();
+        } catch (PulsarClientException e) {
+            log.info("Topic reached max consumers limit");
+        }
+        Assert.assertNotNull(consumer1);
+        Assert.assertNotNull(consumer2);
+        Assert.assertNull(consumer3);
+        consumer1.close();
+        consumer2.close();
+
+        Integer getMaxConsumers = admin.topics().getMaxConsumers(persistenceTopic);
+        log.info("MaxConsumers {} get on topic: {}", getMaxConsumers, persistenceTopic);
+        Assert.assertEquals(getMaxConsumers, maxConsumers);
+
+        admin.topics().deletePartitionedTopic(persistenceTopic, true);
+        admin.topics().deletePartitionedTopic(testTopic, true);

Review comment:
       `testTopic` is created in the `@BeforeMethod` setup.







[GitHub] [pulsar] zhanghaou commented on a change in pull request #7968: [Issue 7759] Support set Max Consumer on topic level.

Posted by GitBox <gi...@apache.org>.
zhanghaou commented on a change in pull request #7968:
URL: https://github.com/apache/pulsar/pull/7968#discussion_r483598157



##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
##########
@@ -2417,6 +2417,53 @@ protected void internalGetMaxProducers(AsyncResponse asyncResponse) {
         return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, topicPolicies.get());
     }
 
+    protected void internalGetMaxConsumers(AsyncResponse asyncResponse) {
+        validateAdminAccessForTenant(namespaceName.getTenant());
+        validatePoliciesReadOnlyAccess();
+        if (topicName.isGlobal()) {
+            validateGlobalNamespaceOwnership(namespaceName);
+        }
+        checkTopicLevelPolicyEnable();
+        Optional<Integer> maxConsumers = getTopicPolicies(topicName)
+                .map(TopicPolicies::getMaxConsumerPerTopic);
+        if (!maxConsumers.isPresent()) {
+            asyncResponse.resume(Response.noContent().build());
+        } else {
+            asyncResponse.resume(maxConsumers.get());
+        }
+    }

Review comment:
       How about making a new PR to resolve them all?







[GitHub] [pulsar] jianyun8023 commented on a change in pull request #7968: [Issue 7759] Support set Max Consumer on topic level.

Posted by GitBox <gi...@apache.org>.
jianyun8023 commented on a change in pull request #7968:
URL: https://github.com/apache/pulsar/pull/7968#discussion_r484028978



##########
File path: pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
##########
@@ -590,4 +591,121 @@ public void testRemovePublishRate() throws Exception {
 
         admin.topics().deletePartitionedTopic(testTopic, true);
     }
+
+    @Test
+    public void testCheckMaxConsumers() throws Exception {
+        Integer maxProducers = new Integer(-1);
+        log.info("MaxConsumers: {} will set to the topic: {}", maxProducers, testTopic);
+        try {
+            admin.topics().setMaxConsumers(testTopic, maxProducers);
+            Assert.fail();
+        } catch (PulsarAdminException e) {
+            Assert.assertEquals(e.getStatusCode(), 412);
+        }
+
+        admin.topics().deletePartitionedTopic(testTopic, true);
+    }
+
+    @Test
+    public void testSetMaxConsumers() throws Exception {
+        admin.namespaces().setMaxConsumersPerTopic(myNamespace, 1);
+        log.info("MaxConsumers: {} will set to the namespace: {}", 1, myNamespace);
+        Integer maxConsumers = 2;
+        log.info("MaxConsumers: {} will set to the topic: {}", maxConsumers, persistenceTopic);
+        admin.topics().setMaxConsumers(persistenceTopic, maxConsumers);
+        Thread.sleep(3000);
+
+        admin.topics().createPartitionedTopic(persistenceTopic, 2);
+        Consumer consumer1 = null;
+        Consumer consumer2 = null;
+        Consumer consumer3 = null;
+        try {
+            consumer1 = pulsarClient.newConsumer().subscriptionName("sub1").topic(persistenceTopic).subscribe();
+        } catch (PulsarClientException e) {
+            Assert.fail();
+        }
+        try {
+            consumer2 = pulsarClient.newConsumer().subscriptionName("sub2").topic(persistenceTopic).subscribe();
+        } catch (PulsarClientException e) {
+            Assert.fail();
+        }
+        try {
+            consumer3 = pulsarClient.newConsumer().subscriptionName("sub3").topic(persistenceTopic).subscribe();
+            Assert.fail();
+        } catch (PulsarClientException e) {
+            log.info("Topic reached max consumers limit");
+        }
+        Assert.assertNotNull(consumer1);
+        Assert.assertNotNull(consumer2);
+        Assert.assertNull(consumer3);
+        consumer1.close();
+        consumer2.close();
+
+        Integer getMaxConsumers = admin.topics().getMaxConsumers(persistenceTopic);
+        log.info("MaxConsumers {} get on topic: {}", getMaxConsumers, persistenceTopic);
+        Assert.assertEquals(getMaxConsumers, maxConsumers);
+
+        admin.topics().deletePartitionedTopic(persistenceTopic, true);
+        admin.topics().deletePartitionedTopic(testTopic, true);

Review comment:
       Okay.







[GitHub] [pulsar] codelipenghui merged pull request #7968: [Issue 7759] Support set Max Consumer on topic level.

Posted by GitBox <gi...@apache.org>.
codelipenghui merged pull request #7968:
URL: https://github.com/apache/pulsar/pull/7968


   





[GitHub] [pulsar] jianyun8023 commented on a change in pull request #7968: [Issue 7759] Support set Max Consumer on topic level.

Posted by GitBox <gi...@apache.org>.
jianyun8023 commented on a change in pull request #7968:
URL: https://github.com/apache/pulsar/pull/7968#discussion_r483568822



##########
File path: pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
##########
@@ -590,4 +591,121 @@ public void testRemovePublishRate() throws Exception {
 
         admin.topics().deletePartitionedTopic(testTopic, true);
     }
+
+    @Test
+    public void testCheckMaxConsumers() throws Exception {
+        Integer maxProducers = new Integer(-1);
+        log.info("MaxConsumers: {} will set to the topic: {}", maxProducers, testTopic);
+        try {
+            admin.topics().setMaxConsumers(testTopic, maxProducers);
+            Assert.fail();
+        } catch (PulsarAdminException e) {
+            Assert.assertEquals(e.getStatusCode(), 412);
+        }
+
+        admin.topics().deletePartitionedTopic(testTopic, true);
+    }
+
+    @Test
+    public void testSetMaxConsumers() throws Exception {
+        admin.namespaces().setMaxConsumersPerTopic(myNamespace, 1);
+        log.info("MaxConsumers: {} will set to the namespace: {}", 1, myNamespace);
+        Integer maxConsumers = 2;
+        log.info("MaxConsumers: {} will set to the topic: {}", maxConsumers, persistenceTopic);
+        admin.topics().setMaxConsumers(persistenceTopic, maxConsumers);
+        Thread.sleep(3000);
+
+        admin.topics().createPartitionedTopic(persistenceTopic, 2);
+        Consumer consumer1 = null;
+        Consumer consumer2 = null;
+        Consumer consumer3 = null;
+        try {
+            consumer1 = pulsarClient.newConsumer().subscriptionName("sub1").topic(persistenceTopic).subscribe();
+        } catch (PulsarClientException e) {
+            Assert.fail();
+        }
+        try {
+            consumer2 = pulsarClient.newConsumer().subscriptionName("sub2").topic(persistenceTopic).subscribe();
+        } catch (PulsarClientException e) {
+            Assert.fail();
+        }
+        try {
+            consumer3 = pulsarClient.newConsumer().subscriptionName("sub3").topic(persistenceTopic).subscribe();
+            Assert.fail();
+        } catch (PulsarClientException e) {
+            log.info("Topic reached max consumers limit");
+        }
+        Assert.assertNotNull(consumer1);
+        Assert.assertNotNull(consumer2);
+        Assert.assertNull(consumer3);
+        consumer1.close();
+        consumer2.close();
+
+        Integer getMaxConsumers = admin.topics().getMaxConsumers(persistenceTopic);
+        log.info("MaxConsumers {} get on topic: {}", getMaxConsumers, persistenceTopic);
+        Assert.assertEquals(getMaxConsumers, maxConsumers);
+
+        admin.topics().deletePartitionedTopic(persistenceTopic, true);
+        admin.topics().deletePartitionedTopic(testTopic, true);

Review comment:
       Remove `admin.topics().deletePartitionedTopic(testTopic, true);`.

##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
##########
@@ -2417,6 +2417,53 @@ protected void internalGetMaxProducers(AsyncResponse asyncResponse) {
         return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, topicPolicies.get());
     }
 
+    protected void internalGetMaxConsumers(AsyncResponse asyncResponse) {
+        validateAdminAccessForTenant(namespaceName.getTenant());
+        validatePoliciesReadOnlyAccess();
+        if (topicName.isGlobal()) {
+            validateGlobalNamespaceOwnership(namespaceName);
+        }
+        checkTopicLevelPolicyEnable();
+        Optional<Integer> maxConsumers = getTopicPolicies(topicName)
+                .map(TopicPolicies::getMaxConsumerPerTopic);
+        if (!maxConsumers.isPresent()) {
+            asyncResponse.resume(Response.noContent().build());
+        } else {
+            asyncResponse.resume(maxConsumers.get());
+        }
+    }

Review comment:
       Using `AsyncResponse` is not recommended here; just return the value directly.
   For more information, see https://github.com/apache/pulsar/issues/7884
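
To make the two styles concrete, here is a minimal plain-Java sketch of the difference. It is not the Pulsar implementation: the JAX-RS `AsyncResponse` is replaced by a `java.util.function.Consumer` stand-in, the policy lookup is passed in as an `Optional<Integer>`, and all class and method names are hypothetical.

```java
import java.util.Optional;
import java.util.function.Consumer;

final class DirectReturnSketch {
    // Callback style: the result is pushed into a callback. The Consumer is a
    // stand-in for asyncResponse.resume(...) in the diff above.
    static void getMaxConsumersViaCallback(Optional<Integer> policy, Consumer<Object> resume) {
        if (policy.isPresent()) {
            resume.accept(policy.get());
        } else {
            resume.accept("no content");
        }
    }

    // Direct style: the method returns the value and leaves it to the caller
    // to decide how an absent policy is rendered.
    static Optional<Integer> getMaxConsumers(Optional<Integer> policy) {
        return policy;
    }

    public static void main(String[] args) {
        getMaxConsumersViaCallback(Optional.of(5), r -> System.out.println("callback: " + r));
        System.out.println("direct: " + getMaxConsumers(Optional.of(5)));
    }
}
```

The direct style keeps the method free of response plumbing, which also makes it easier to call from a unit test.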







[GitHub] [pulsar] zhanghaou commented on pull request #7968: [Issue 7759] Support set Max Consumer on topic level.

Posted by GitBox <gi...@apache.org>.
zhanghaou commented on pull request #7968:
URL: https://github.com/apache/pulsar/pull/7968#issuecomment-688188959


   /pulsarbot run-failure-checks





[GitHub] [pulsar] zhanghaou commented on pull request #7968: [Issue 7759] Support set Max Consumer on topic level.

Posted by GitBox <gi...@apache.org>.
zhanghaou commented on pull request #7968:
URL: https://github.com/apache/pulsar/pull/7968#issuecomment-686501656


   /pulsarbot run-failure-checks





[GitHub] [pulsar] zhanghaou commented on a change in pull request #7968: [Issue 7759] Support set Max Consumer on topic level.

Posted by GitBox <gi...@apache.org>.
zhanghaou commented on a change in pull request #7968:
URL: https://github.com/apache/pulsar/pull/7968#discussion_r484335649



##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
##########
@@ -2417,6 +2417,53 @@ protected void internalGetMaxProducers(AsyncResponse asyncResponse) {
         return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, topicPolicies.get());
     }
 
+    protected void internalGetMaxConsumers(AsyncResponse asyncResponse) {
+        validateAdminAccessForTenant(namespaceName.getTenant());
+        validatePoliciesReadOnlyAccess();
+        if (topicName.isGlobal()) {
+            validateGlobalNamespaceOwnership(namespaceName);
+        }
+        checkTopicLevelPolicyEnable();
+        Optional<Integer> maxConsumers = getTopicPolicies(topicName)
+                .map(TopicPolicies::getMaxConsumerPerTopic);
+        if (!maxConsumers.isPresent()) {
+            asyncResponse.resume(Response.noContent().build());
+        } else {
+            asyncResponse.resume(maxConsumers.get());
+        }
+    }

Review comment:
       Done.







[GitHub] [pulsar] jianyun8023 commented on a change in pull request #7968: [Issue 7759] Support set Max Consumer on topic level.

Posted by GitBox <gi...@apache.org>.
jianyun8023 commented on a change in pull request #7968:
URL: https://github.com/apache/pulsar/pull/7968#discussion_r484028923



##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
##########
@@ -2417,6 +2417,53 @@ protected void internalGetMaxProducers(AsyncResponse asyncResponse) {
         return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, topicPolicies.get());
     }
 
+    protected void internalGetMaxConsumers(AsyncResponse asyncResponse) {
+        validateAdminAccessForTenant(namespaceName.getTenant());
+        validatePoliciesReadOnlyAccess();
+        if (topicName.isGlobal()) {
+            validateGlobalNamespaceOwnership(namespaceName);
+        }
+        checkTopicLevelPolicyEnable();
+        Optional<Integer> maxConsumers = getTopicPolicies(topicName)
+                .map(TopicPolicies::getMaxConsumerPerTopic);
+        if (!maxConsumers.isPresent()) {
+            asyncResponse.resume(Response.noContent().build());
+        } else {
+            asyncResponse.resume(maxConsumers.get());
+        }
+    }

Review comment:
       I think the new code can comply with this rule.







[GitHub] [pulsar] zhanghaou commented on pull request #7968: [Issue 7759] Support set Max Consumer on topic level.

Posted by GitBox <gi...@apache.org>.
zhanghaou commented on pull request #7968:
URL: https://github.com/apache/pulsar/pull/7968#issuecomment-686826560


   @codelipenghui @jianyun8023 @jiazhai PTAL. Thanks.





[GitHub] [pulsar] zhanghaou commented on pull request #7968: [Issue 7759] Support set Max Consumer on topic level.

Posted by GitBox <gi...@apache.org>.
zhanghaou commented on pull request #7968:
URL: https://github.com/apache/pulsar/pull/7968#issuecomment-688373155


   /pulsarbot run-failure-checks

