Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2020/10/27 08:06:19 UTC

[GitHub] [pulsar] aloyszhang opened a new pull request #8387: fix pause does not work for new created consumer

aloyszhang opened a new pull request #8387:
URL: https://github.com/apache/pulsar/pull/8387


   
   Fixes #8214 
   
   ### Motivation
   
   Pausing a multi-topics consumer does not pause the consumers that it creates afterwards for newly added partitions.
   
   ### Modifications
   
   Add a flag to `MultiTopicsConsumerImpl` that indicates whether it has been paused. If the flag is true, the consumers newly added for new topic partitions are paused as well.
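
The idea can be sketched roughly as follows. This is a minimal illustration under stated assumptions, not the code in this PR: `PartitionConsumerSketch` and `MultiConsumerSketch` are hypothetical stand-ins for `ConsumerImpl` and `MultiTopicsConsumerImpl`, and `addPartitionConsumer` is an invented name for whatever step creates a consumer for a new partition.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a per-partition consumer (not the real ConsumerImpl).
class PartitionConsumerSketch {
    private volatile boolean paused;

    void pause() { paused = true; }

    void resume() { paused = false; }

    boolean isPaused() { return paused; }
}

// Hypothetical stand-in for MultiTopicsConsumerImpl, reduced to pause handling.
class MultiConsumerSketch {
    private final List<PartitionConsumerSketch> consumers = new ArrayList<>();
    // Remembers whether pause() was called, so later additions can inherit it.
    private boolean paused;

    synchronized void pause() {
        paused = true;
        consumers.forEach(PartitionConsumerSketch::pause);
    }

    synchronized void resume() {
        paused = false;
        consumers.forEach(PartitionConsumerSketch::resume);
    }

    // Assumed hook: invoked when a partition update adds a new partition.
    synchronized PartitionConsumerSketch addPartitionConsumer() {
        PartitionConsumerSketch consumer = new PartitionConsumerSketch();
        if (paused) {
            // Without this check the new consumer would start unpaused.
            consumer.pause();
        }
        consumers.add(consumer);
        return consumer;
    }
}
```

In this sketch the flag and the consumer list are updated under the same lock, so a consumer added concurrently with `pause()` cannot miss the paused state. Whether the real class needs the same locking depends on how it is already synchronized.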
   
   
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] codelipenghui commented on pull request #8387: fix pause does not work for new created consumer

Posted by GitBox <gi...@apache.org>.
codelipenghui commented on pull request #8387:
URL: https://github.com/apache/pulsar/pull/8387#issuecomment-717918827


   @aloyszhang Sorry, I missed it.





[GitHub] [pulsar] aloyszhang removed a comment on pull request #8387: fix pause does not work for new created consumer

Posted by GitBox <gi...@apache.org>.
aloyszhang removed a comment on pull request #8387:
URL: https://github.com/apache/pulsar/pull/8387#issuecomment-717100357


   /pulsarbot run-failure-checks





[GitHub] [pulsar] aloyszhang removed a comment on pull request #8387: fix pause does not work for new created consumer

Posted by GitBox <gi...@apache.org>.
aloyszhang removed a comment on pull request #8387:
URL: https://github.com/apache/pulsar/pull/8387#issuecomment-717203888


   /pulsarbot run-failure-checks
   
   





[GitHub] [pulsar] aloyszhang commented on pull request #8387: fix pause does not work for new created consumer

Posted by GitBox <gi...@apache.org>.
aloyszhang commented on pull request #8387:
URL: https://github.com/apache/pulsar/pull/8387#issuecomment-717655161


   /pulsarbot run-failure-checks





[GitHub] [pulsar] sijie merged pull request #8387: fix pause does not work for new created consumer

Posted by GitBox <gi...@apache.org>.
sijie merged pull request #8387:
URL: https://github.com/apache/pulsar/pull/8387


   





[GitHub] [pulsar] aloyszhang removed a comment on pull request #8387: fix pause does not work for new created consumer

Posted by GitBox <gi...@apache.org>.
aloyszhang removed a comment on pull request #8387:
URL: https://github.com/apache/pulsar/pull/8387#issuecomment-718331336


   /pulsarbot run-failure-checks





[GitHub] [pulsar] aloyszhang removed a comment on pull request #8387: fix pause does not work for new created consumer

Posted by GitBox <gi...@apache.org>.
aloyszhang removed a comment on pull request #8387:
URL: https://github.com/apache/pulsar/pull/8387#issuecomment-717698273


   /pulsarbot run-failure-checks





[GitHub] [pulsar] aloyszhang commented on pull request #8387: fix pause does not work for new created consumer

Posted by GitBox <gi...@apache.org>.
aloyszhang commented on pull request #8387:
URL: https://github.com/apache/pulsar/pull/8387#issuecomment-717734680


   /pulsarbot run-failure-checks





[GitHub] [pulsar] codelipenghui commented on a change in pull request #8387: fix pause does not work for new created consumer

Posted by GitBox <gi...@apache.org>.
codelipenghui commented on a change in pull request #8387:
URL: https://github.com/apache/pulsar/pull/8387#discussion_r513431811



##########
File path: pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
##########
@@ -3037,6 +3037,74 @@ public void testConsumerSubscriptionInitialize() throws Exception {
         log.info("-- Exiting {} test --", methodName);
     }
 
+    @Test
+    public void testMultiTopicsConsumerImplPause() throws Exception {
+        log.info("-- Starting {} test --", methodName);
+        String topicName = "persistent://my-property/my-ns/partition-topic";
+
+        admin.topics().createPartitionedTopic(topicName, 1);
+
+
+        Producer<byte[]> producer = pulsarClient.newProducer()
+                .topic(topicName)
+                .enableBatching(false)
+                .autoUpdatePartitionsInterval(2 ,TimeUnit.SECONDS)
+                .create();
+
+        // 1. produce 5 messages
+        for (int i = 0; i < 5; i++) {
+            final String message = "my-message-" + i;
+            producer.send(message.getBytes(UTF_8));
+        }
+
+        Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topicName)
+                .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
+                .receiverQueueSize(1)
+                .autoUpdatePartitionsInterval(2 ,TimeUnit.SECONDS)
+                .subscriptionName("test-multi-topic-consumer").subscribe();
+
+        int counter = 0;
+        Message<byte[]> consumedMessage = consumer.receive(3, TimeUnit.SECONDS);
+        while(consumedMessage != null) {
+            assertEquals(consumedMessage.getData(), ("my-message-" + counter++ ).getBytes());
+            consumedMessage = consumer.receive(3, TimeUnit.SECONDS);

Review comment:
       The message count is always 5, so it's better to receive exactly 5 messages and avoid the final 3-second blocking `receive`. We should reduce the time the test takes.
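
A rough sketch of the bounded loop suggested here, under stated assumptions: a `BlockingQueue` stands in for the Pulsar consumer so the example runs on its own, and `BoundedReceiveSketch` and `receiveExactly` are hypothetical names, not part of the test in this PR.

```java
import java.nio.charset.StandardCharsets;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

class BoundedReceiveSketch {
    // Receives at most `expected` messages and returns how many arrived.
    // Because the loop is bounded by the known count, it never issues the
    // extra receive that would block for the full timeout on an empty source.
    static int receiveExactly(BlockingQueue<byte[]> source, int expected,
                              long timeout, TimeUnit unit) {
        int received = 0;
        for (int i = 0; i < expected; i++) {
            byte[] data;
            try {
                data = source.poll(timeout, unit);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted while receiving", e);
            }
            if (data == null) {
                break; // timed out: fewer messages than expected
            }
            String want = "my-message-" + i;
            String got = new String(data, StandardCharsets.UTF_8);
            if (!want.equals(got)) {
                throw new AssertionError("expected " + want + " but got " + got);
            }
            received++;
        }
        return received;
    }
}
```

With five messages available the loop returns after five polls, whereas the `while (consumedMessage != null)` form pays one extra timeout to discover that the source is empty.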

##########
File path: pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
##########
@@ -3037,6 +3037,74 @@ public void testConsumerSubscriptionInitialize() throws Exception {
         log.info("-- Exiting {} test --", methodName);
     }
 
+    @Test
+    public void testMultiTopicsConsumerImplPause() throws Exception {
+        log.info("-- Starting {} test --", methodName);
+        String topicName = "persistent://my-property/my-ns/partition-topic";
+
+        admin.topics().createPartitionedTopic(topicName, 1);
+
+
+        Producer<byte[]> producer = pulsarClient.newProducer()
+                .topic(topicName)
+                .enableBatching(false)
+                .autoUpdatePartitionsInterval(2 ,TimeUnit.SECONDS)
+                .create();
+
+        // 1. produce 5 messages
+        for (int i = 0; i < 5; i++) {
+            final String message = "my-message-" + i;
+            producer.send(message.getBytes(UTF_8));
+        }
+
+        Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topicName)
+                .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
+                .receiverQueueSize(1)
+                .autoUpdatePartitionsInterval(2 ,TimeUnit.SECONDS)
+                .subscriptionName("test-multi-topic-consumer").subscribe();
+
+        int counter = 0;
+        Message<byte[]> consumedMessage = consumer.receive(3, TimeUnit.SECONDS);
+        while(consumedMessage != null) {
+            assertEquals(consumedMessage.getData(), ("my-message-" + counter++ ).getBytes());
+            consumedMessage = consumer.receive(3, TimeUnit.SECONDS);
+        }
+        assertEquals(counter, 5);
+
+        // 2. pause multi-topic consumer
+        consumer.pause();
+
+        // 3. update partition
+        admin.topics().updatePartitionedTopic(topicName, 3);
+
+        // 4. wait for client to update partitions
+        Thread.sleep(5000);

Review comment:
       Is there any way to check that the partitions have been updated, such as checking the internal consumers of the multi-topics consumer? That might reduce the time the test takes.
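
One way to replace a fixed `Thread.sleep(5000)` is to poll a condition until it holds or a deadline passes. The helper below is a minimal sketch with hypothetical names (`AwaitSketch`, `awaitCondition`); which condition the real test could poll, for example the number of internal consumers, is an assumption left to the caller.

```java
import java.util.function.BooleanSupplier;

class AwaitSketch {
    // Polls `condition` every `pollMillis` until it returns true or
    // `timeoutMillis` has elapsed. Returns true if the condition was met.
    static boolean awaitCondition(BooleanSupplier condition,
                                  long timeoutMillis, long pollMillis) {
        long deadline = System.nanoTime() + timeoutMillis * 1_000_000L;
        while (true) {
            if (condition.getAsBoolean()) {
                return true;
            }
            if (System.nanoTime() - deadline >= 0) {
                return false; // timed out without the condition holding
            }
            try {
                Thread.sleep(pollMillis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        }
    }
}
```

The test then waits only as long as the partition update actually takes, with the timeout acting as an upper bound rather than a fixed cost.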







[GitHub] [pulsar] aloyszhang commented on a change in pull request #8387: fix pause does not work for new created consumer

Posted by GitBox <gi...@apache.org>.
aloyszhang commented on a change in pull request #8387:
URL: https://github.com/apache/pulsar/pull/8387#discussion_r513648220



##########
File path: pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
##########
@@ -3037,6 +3037,74 @@ public void testConsumerSubscriptionInitialize() throws Exception {
         log.info("-- Exiting {} test --", methodName);
     }
 
+    @Test
+    public void testMultiTopicsConsumerImplPause() throws Exception {
+        log.info("-- Starting {} test --", methodName);
+        String topicName = "persistent://my-property/my-ns/partition-topic";
+
+        admin.topics().createPartitionedTopic(topicName, 1);
+
+
+        Producer<byte[]> producer = pulsarClient.newProducer()
+                .topic(topicName)
+                .enableBatching(false)
+                .autoUpdatePartitionsInterval(2 ,TimeUnit.SECONDS)
+                .create();
+
+        // 1. produce 5 messages
+        for (int i = 0; i < 5; i++) {
+            final String message = "my-message-" + i;
+            producer.send(message.getBytes(UTF_8));
+        }
+
+        Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topicName)
+                .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
+                .receiverQueueSize(1)
+                .autoUpdatePartitionsInterval(2 ,TimeUnit.SECONDS)
+                .subscriptionName("test-multi-topic-consumer").subscribe();
+
+        int counter = 0;
+        Message<byte[]> consumedMessage = consumer.receive(3, TimeUnit.SECONDS);
+        while(consumedMessage != null) {
+            assertEquals(consumedMessage.getData(), ("my-message-" + counter++ ).getBytes());
+            consumedMessage = consumer.receive(3, TimeUnit.SECONDS);

Review comment:
       will fix it







[GitHub] [pulsar] aloyszhang commented on a change in pull request #8387: fix pause does not work for new created consumer

Posted by GitBox <gi...@apache.org>.
aloyszhang commented on a change in pull request #8387:
URL: https://github.com/apache/pulsar/pull/8387#discussion_r513648491



##########
File path: pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
##########
@@ -3037,6 +3037,74 @@ public void testConsumerSubscriptionInitialize() throws Exception {
         log.info("-- Exiting {} test --", methodName);
     }
 
+    @Test
+    public void testMultiTopicsConsumerImplPause() throws Exception {
+        log.info("-- Starting {} test --", methodName);
+        String topicName = "persistent://my-property/my-ns/partition-topic";
+
+        admin.topics().createPartitionedTopic(topicName, 1);
+
+
+        Producer<byte[]> producer = pulsarClient.newProducer()
+                .topic(topicName)
+                .enableBatching(false)
+                .autoUpdatePartitionsInterval(2 ,TimeUnit.SECONDS)
+                .create();
+
+        // 1. produce 5 messages
+        for (int i = 0; i < 5; i++) {
+            final String message = "my-message-" + i;
+            producer.send(message.getBytes(UTF_8));
+        }
+
+        Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topicName)
+                .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
+                .receiverQueueSize(1)
+                .autoUpdatePartitionsInterval(2 ,TimeUnit.SECONDS)
+                .subscriptionName("test-multi-topic-consumer").subscribe();
+
+        int counter = 0;
+        Message<byte[]> consumedMessage = consumer.receive(3, TimeUnit.SECONDS);
+        while(consumedMessage != null) {
+            assertEquals(consumedMessage.getData(), ("my-message-" + counter++ ).getBytes());
+            consumedMessage = consumer.receive(3, TimeUnit.SECONDS);
+        }
+        assertEquals(counter, 5);
+
+        // 2. pause multi-topic consumer
+        consumer.pause();
+
+        // 3. update partition
+        admin.topics().updatePartitionedTopic(topicName, 3);
+
+        // 4. wait for client to update partitions
+        Thread.sleep(5000);

Review comment:
       will optimize 







[GitHub] [pulsar] aloyszhang commented on pull request #8387: fix pause does not work for new created consumer

Posted by GitBox <gi...@apache.org>.
aloyszhang commented on pull request #8387:
URL: https://github.com/apache/pulsar/pull/8387#issuecomment-717164525


   /pulsarbot run-failure-checks





[GitHub] [pulsar] aloyszhang commented on pull request #8387: fix pause does not work for new created consumer

Posted by GitBox <gi...@apache.org>.
aloyszhang commented on pull request #8387:
URL: https://github.com/apache/pulsar/pull/8387#issuecomment-717894572


   You can see the issue description in #8214.
   The reason is that if we subscribe to a partitioned topic and pause the consumer,
   then increase the number of partitions, MultiTopicsConsumerImpl creates a new ConsumerImpl for each newly added partition.
   But these newly created consumers are not paused.





[GitHub] [pulsar] aloyszhang commented on pull request #8387: fix pause does not work for new created consumer

Posted by GitBox <gi...@apache.org>.
aloyszhang commented on pull request #8387:
URL: https://github.com/apache/pulsar/pull/8387#issuecomment-717203888


   /pulsarbot run-failure-checks
   
   





[GitHub] [pulsar] aloyszhang removed a comment on pull request #8387: fix pause does not work for new created consumer

Posted by GitBox <gi...@apache.org>.
aloyszhang removed a comment on pull request #8387:
URL: https://github.com/apache/pulsar/pull/8387#issuecomment-717164525









[GitHub] [pulsar] aloyszhang commented on pull request #8387: fix pause does not work for new created consumer

Posted by GitBox <gi...@apache.org>.
aloyszhang commented on pull request #8387:
URL: https://github.com/apache/pulsar/pull/8387#issuecomment-718331336


   /pulsarbot run-failure-checks





[GitHub] [pulsar] aloyszhang commented on pull request #8387: fix pause does not work for new created consumer

Posted by GitBox <gi...@apache.org>.
aloyszhang commented on pull request #8387:
URL: https://github.com/apache/pulsar/pull/8387#issuecomment-717698273


   /pulsarbot run-failure-checks





[GitHub] [pulsar] aloyszhang commented on pull request #8387: fix pause does not work for new created consumer

Posted by GitBox <gi...@apache.org>.
aloyszhang commented on pull request #8387:
URL: https://github.com/apache/pulsar/pull/8387#issuecomment-717153105


   /pulsarbot run-failure-checks





[GitHub] [pulsar] aloyszhang removed a comment on pull request #8387: fix pause does not work for new created consumer

Posted by GitBox <gi...@apache.org>.
aloyszhang removed a comment on pull request #8387:
URL: https://github.com/apache/pulsar/pull/8387#issuecomment-717153105


   /pulsarbot run-failure-checks





[GitHub] [pulsar] aloyszhang edited a comment on pull request #8387: fix pause does not work for new created consumer

Posted by GitBox <gi...@apache.org>.
aloyszhang edited a comment on pull request #8387:
URL: https://github.com/apache/pulsar/pull/8387#issuecomment-717894572


   @codelipenghui You can see the issue description in #8214.
   The reason is that if we subscribe to a partitioned topic and pause the consumer,
   then increase the number of partitions, MultiTopicsConsumerImpl creates a new ConsumerImpl for each newly added partition.
   But these newly created consumers are not paused.





[GitHub] [pulsar] aloyszhang commented on pull request #8387: fix pause does not work for new created consumer

Posted by GitBox <gi...@apache.org>.
aloyszhang commented on pull request #8387:
URL: https://github.com/apache/pulsar/pull/8387#issuecomment-717756714


   /pulsarbot run-failure-checks





[GitHub] [pulsar] aloyszhang removed a comment on pull request #8387: fix pause does not work for new created consumer

Posted by GitBox <gi...@apache.org>.
aloyszhang removed a comment on pull request #8387:
URL: https://github.com/apache/pulsar/pull/8387#issuecomment-717756714









[GitHub] [pulsar] aloyszhang commented on pull request #8387: fix pause does not work for new created consumer

Posted by GitBox <gi...@apache.org>.
aloyszhang commented on pull request #8387:
URL: https://github.com/apache/pulsar/pull/8387#issuecomment-717100357


   /pulsarbot run-failure-checks





[GitHub] [pulsar] aloyszhang commented on pull request #8387: fix pause does not work for new created consumer

Posted by GitBox <gi...@apache.org>.
aloyszhang commented on pull request #8387:
URL: https://github.com/apache/pulsar/pull/8387#issuecomment-717793475


   /pulsarbot run-failure-checks





[GitHub] [pulsar] sijie commented on pull request #8387: fix pause does not work for new created consumer

Posted by GitBox <gi...@apache.org>.
sijie commented on pull request #8387:
URL: https://github.com/apache/pulsar/pull/8387#issuecomment-719364335


   @wolfstudy I think we should include this in 2.6.2 





[GitHub] [pulsar] aloyszhang commented on pull request #8387: fix pause does not work for new created consumer

Posted by GitBox <gi...@apache.org>.
aloyszhang commented on pull request #8387:
URL: https://github.com/apache/pulsar/pull/8387#issuecomment-718275540


   /pulsarbot run-failure-checks

