You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2020/10/07 14:55:04 UTC

[GitHub] [pulsar] git-enzo opened a new issue #8214: Consumer pause do not work properly in case of multi topics consumers.

git-enzo opened a new issue #8214:
URL: https://github.com/apache/pulsar/issues/8214


   **Describe the bug**
   When we pause a consumer which subscribes to multiple topics, for example by parsing a regular expression, and then the new producer sends a message to the new topic that the consumer discovers/subscribes, then messages from that topic will be "consumable" by the consumer during pause.
   
   **To Reproduce**
   Steps to reproduce the behavior:
   1. Create a ```multi-topic-consumer``` which subscribes to multiple topics by parsing regular expression, for example ```persistent://public/default/foo.*```. This type of consumer periodically checks if new topics are available based on regexp (patternAutoDiscoveryPeriod). Let's say, we set this property to 10 sec. For example:
   
   ```
   Pattern topicsPattern = Pattern.compile("persistent://public/default/foo.*");
   pulsarClient
           .newConsumer()
           .subscriptionName("subscription")
           .subscriptionType(SubscriptionType.Shared)
           .topicsPattern(topicsPattern)
           .patternAutoDiscoveryPeriod(10, TimeUnit.SECONDS)
           .receiverQueueSize(1)
           .subscribe();
   ```
   
   2. Create ```producer1``` which sends messages to topic ```persistent://public/default/foo1```. If ```multi-topic-consumer``` discovers this new topic (max after 10 seconds in our case), then messages will be consumed by consumer. Example producer:
   ```
   Producer<String> producer1 = pulsarClient
           .newProducer()
           .topic("foo1")
           .create();
   ```
   3. Now, if ```producer1``` sends messages, a ```multi-topic-consumer``` can receive them.
   4. Let's **pause** ```multi-topic-consumer``` by invoking ```pause()```. In this case consumer is a PatternMultiTopicsConsumerImpl type. That means underneath there is a list of internal consumers, where each of them consumes single topic (subscribes single topic). When we invoke ```pause()```, then each consumer of this list will be paused separately.
   5. Now, when we send messages from ```producer1```, then ```multi-topic-consumer``` will not consume them. There is an exception, because consumer can still receive messages until receiverQueueSize is used up even if ```pause()``` was invoked, but after that this is impossible. For this case it is irrelevant, we can skip it.
   6. Create ```producer2``` which sends messages to topic ```persistent://public/default/foo2```. If ```multi-topic-consumer``` discovers this new topic ```foo2``` (max after 10 seconds in our case), then messages **will be consumed** by consumer **even if consumer is paused**. Underneath new internal consumer is created, which subscribes ```foo2```, but this particular consumer **is not paused**.
   7. Now we have following state:
   -  ```multi-topic-consumer``` - is paused
   - ```producer1``` sends messages to topic ```foo1``` and ```multi-topic-consumer``` cannot receive them - it is OK
   - ```producer2``` sends messages to topic ```foo2``` and ```multi-topic-consumer``` is able to receive them - it is not expected behavior
   
   
   **Expected behavior**
   When new topic will be discovered/subscribed by multi-topic consumer during consumer is paused, then this consumer should not be able to receive messages from this topic.
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] aloyszhang commented on issue #8214: Consumer pause does not work properly in case of multi topics consumers.

Posted by GitBox <gi...@apache.org>.
aloyszhang commented on issue #8214:
URL: https://github.com/apache/pulsar/issues/8214#issuecomment-706004160


   Mabe we should add a flag for `MultiTopicsConsumerImpl` which indicates wheter  `MultiTopicsConsumerImpl` has been paused or not.  If the flag is true, we should pause the new added consumers for new added topics.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] codelipenghui commented on issue #8214: Consumer pause does not work properly in case of multi topics consumers.

Posted by GitBox <gi...@apache.org>.
codelipenghui commented on issue #8214:
URL: https://github.com/apache/pulsar/issues/8214#issuecomment-822460843


   Cool @sandrzejczak, looking forward to your PR.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] aloyszhang commented on issue #8214: Consumer pause does not work properly in case of multi topics consumers.

Posted by GitBox <gi...@apache.org>.
aloyszhang commented on issue #8214:
URL: https://github.com/apache/pulsar/issues/8214#issuecomment-706004160


   Mabe we should add a flag for `MultiTopicsConsumerImpl` which indicates wheter  `MultiTopicsConsumerImpl` has been paused or not.  If the flag is true, we should pause the new added consumers for new added topics.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] sandrzejczak edited a comment on issue #8214: Consumer pause does not work properly in case of multi topics consumers.

Posted by GitBox <gi...@apache.org>.
sandrzejczak edited a comment on issue #8214:
URL: https://github.com/apache/pulsar/issues/8214#issuecomment-822313437


   It seems that the bug described in this issue has not been fixed. The description mentions the case where consumers for new topics are discovered/added. `PatternMultiTopicsConsumerImpl` achieves this by invoking `MultiTopicsConsumerImpl#subscribeAsync(String topicName, boolean createTopicIfDoesNotExist)` method. It is still possible to add unpaused consumers to `MultiTopicsConsumerImpl` by using this method.
   
   The https://github.com/apache/pulsar/pull/8387 PR fixes a similar issue where new consumers are added when the number of topic partitions changes for an existing topic. This is a different case than the one described in this issue.
   
   I volunteer to fix the outstanding issue.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] sandrzejczak commented on issue #8214: Consumer pause does not work properly in case of multi topics consumers.

Posted by GitBox <gi...@apache.org>.
sandrzejczak commented on issue #8214:
URL: https://github.com/apache/pulsar/issues/8214#issuecomment-822313437


   It seems that the bug described in this issue has not been fixed. The description mentions the case where consumers for new topics are discovered/added. `PatternMultiTopicsConsumerImpl` achieves this by invoking `MultiTopicsConsumerImpl#subscribeAsync(String topicName, boolean createTopicIfDoesNotExist)` method. It is still possible to add unpaused consumers to `MultiTopicsConsumerImpl` by using this method.
   
   The https://github.com/apache/pulsar/pull/8387 PR fixes a similar issue where new consumers are added when the number of topic partitions changes. This is a different case than the one described in this issue.
   
   I volunteer to fix the outstanding issue.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] sijie closed issue #8214: Consumer pause does not work properly in case of multi topics consumers.

Posted by GitBox <gi...@apache.org>.
sijie closed issue #8214:
URL: https://github.com/apache/pulsar/issues/8214


   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org