Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2020/05/15 06:44:01 UTC

[GitHub] [pulsar] youzipi opened a new issue #6965: Consumer pause do not work

youzipi opened a new issue #6965:
URL: https://github.com/apache/pulsar/issues/6965


   `org.apache.pulsar.client.api.Consumer#pause`
   I use this method to pause consumption until other necessary setup is ready,
   but it doesn't work as I expected.
   
   ```java
       /**
        * Stop requesting new messages from the broker until {@link #resume()} is called. Note that this might cause
        * {@link #receive()} to block until {@link #resume()} is called and new messages are pushed by the broker.
        */
       void pause();
   ```
   I checked Pulsar's code: the `Consumer.paused` field is only used in `org.apache.pulsar.client.impl.ConsumerImpl#increaseAvailablePermits(org.apache.pulsar.client.impl.ClientCnx, int)`, and after this method the message is still sent to the listener.
   I could not find where the client actually does `Stop requesting new messages from the broker`.
   
   Do I misunderstand the comment?
   
   #### Expected behavior
   The consumer stops receiving messages.
   
   #### Actual behavior
   
   The consumer still receives messages.
   
   #### Steps to reproduce
   Here is my `PulsarConfig`:
   
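   ```java
   import lombok.Data;
   import lombok.extern.slf4j.Slf4j;
   import org.apache.pulsar.client.api.Consumer;
   import org.apache.pulsar.client.api.PulsarClient;
   import org.apache.pulsar.client.api.PulsarClientException;
   import org.apache.pulsar.client.api.Schema;
   import org.apache.pulsar.client.api.SubscriptionInitialPosition;
   import org.apache.pulsar.client.api.SubscriptionType;
   import org.springframework.boot.context.properties.ConfigurationProperties;
   import org.springframework.context.annotation.Bean;
   import org.springframework.context.annotation.Configuration;

   @Slf4j
   @Data
   @Configuration
   @ConfigurationProperties("spring.pulsar")
   public class PulsarConfig {
       // Field declarations were omitted in the original report; they are assumed
       // to be plain strings bound from the spring.pulsar.* properties.
       private String crawlerInnerParseTopic;
       private String consumerName;

       @Bean("crawlerInnerParseTopicConsumer")
       public Consumer<String> crawlerInnerParseTopicConsumer(
               PulsarClient pulsarClient,
               CrawlerInnerParseConsumer listener
       ) throws PulsarClientException {
           Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
                   .topic(crawlerInnerParseTopic)
                   .consumerName(consumerName)
                   .subscriptionType(SubscriptionType.Shared)
                   .subscriptionName(consumerName + "-crawlerInnerParseTopic")
                   .subscriptionInitialPosition(SubscriptionInitialPosition.Latest)
                   .messageListener(listener)
                   .subscribe();
   
           // Pause right after subscribing; the listener is still invoked afterwards.
           log.info("[consumer pause],topic={}", crawlerInnerParseTopic);
           consumer.pause();
           return consumer;
       }
   }
   ```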
   
   
   #### System configuration
   **Pulsar version**: 2.4.1
   **Java client** (`pulsar-client-api`): 2.4.1
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] codelipenghui commented on issue #6965: Consumer pause do not work

Posted by GitBox <gi...@apache.org>.
codelipenghui commented on issue #6965:
URL: https://github.com/apache/pulsar/issues/6965#issuecomment-630204362


   @youzipi I think you can use the `receive` method of the consumer. This is called to get one message at a time. 
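   
   A minimal sketch of that pull-based approach, assuming the goal is only to hold off consumption until the application is ready. It uses only the Java standard library: the `BlockingQueue` is a hypothetical stand-in for a Pulsar consumer, and `poll(timeout, unit)` plays the role of `Consumer#receive(int, TimeUnit)`. `GatedReceiveLoop`, `markReady`, and `GatedReceiveDemo` are illustrative names, not part of the Pulsar API.
   
   ```java
   import java.util.List;
   import java.util.concurrent.BlockingQueue;
   import java.util.concurrent.CopyOnWriteArrayList;
   import java.util.concurrent.CountDownLatch;
   import java.util.concurrent.LinkedBlockingQueue;
   import java.util.concurrent.TimeUnit;

   /** Pulls messages one at a time, but only after markReady() has been called. */
   final class GatedReceiveLoop implements Runnable {
       private final BlockingQueue<String> source;           // stand-in for the consumer (assumption)
       private final CountDownLatch ready = new CountDownLatch(1);
       private final List<String> processed = new CopyOnWriteArrayList<>();
       private volatile boolean running = true;

       GatedReceiveLoop(BlockingQueue<String> source) {
           this.source = source;
       }

       @Override
       public void run() {
           try {
               ready.await();                                 // block until the application is ready
               while (running) {
                   // Timed pull, analogous to receive(timeout, unit); null means nothing arrived.
                   String msg = source.poll(50, TimeUnit.MILLISECONDS);
                   if (msg != null) {
                       processed.add(msg);
                   }
               }
           } catch (InterruptedException e) {
               Thread.currentThread().interrupt();            // preserve the interrupt and exit
           }
       }

       void markReady() { ready.countDown(); }
       void stop() { running = false; }
       int processedCount() { return processed.size(); }
   }

   final class GatedReceiveDemo {
       /** Returns {count before markReady, count after markReady} for three queued messages. */
       static int[] runScenario() {
           BlockingQueue<String> queue = new LinkedBlockingQueue<>(List.of("m1", "m2", "m3"));
           GatedReceiveLoop loop = new GatedReceiveLoop(queue);
           Thread worker = new Thread(loop);
           worker.start();
           try {
               Thread.sleep(200);                             // messages are available, gate is closed
               int before = loop.processedCount();
               loop.markReady();
               long deadline = System.currentTimeMillis() + 5000;
               while (loop.processedCount() < 3 && System.currentTimeMillis() < deadline) {
                   Thread.sleep(10);
               }
               int after = loop.processedCount();
               loop.stop();
               worker.join(1000);
               return new int[] {before, after};
           } catch (InterruptedException e) {
               Thread.currentThread().interrupt();
               throw new IllegalStateException(e);
           }
       }

       public static void main(String[] args) {
           int[] counts = runScenario();
           System.out.println("before ready: " + counts[0] + ", after ready: " + counts[1]);
       }
   }
   ```
   
   With a real consumer, messages may still be prefetched into the client's receiver queue while the gate is closed; the gate only controls when application code starts processing them.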





[GitHub] [pulsar] youzipi edited a comment on issue #6965: Consumer pause do not work

Posted by GitBox <gi...@apache.org>.
youzipi edited a comment on issue #6965:
URL: https://github.com/apache/pulsar/issues/6965#issuecomment-629982830


   I checked the unit test
   `org.apache.pulsar.client.api.SimpleProducerConsumerTest#testPauseAndResume`
   
   ```java
   log.info("Giving message listener an opportunity to receive messages while paused");
   Thread.sleep(2000);     // hopefully this is long enough
   assertEquals(received.intValue(), receiverQueueSize, "Consumer received messages while paused");
   ```
   The consumer can still receive messages until `receiverQueueSize` is used up.
   That is where the `might` in the method comment comes from; it confused me.
   
   
   How can I stop receiving messages immediately?
   The consumer has a `close()` method, but I did not find a `consumer.subscribe()` method.
   Is there any way to stop the consumer from receiving messages immediately?
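   
   The behaviour that the test asserts can be illustrated with a toy model of permit-based flow control. This is a simplified sketch, not Pulsar's actual implementation: `PrefetchModel`, `publish`, and `PrefetchModelDemo` are hypothetical names, and permits are re-granted one at a time here rather than in batches. The only idea taken from the thread is that `paused` stops new permits from being granted, so permits that were already granted are still honoured.
   
   ```java
   import java.util.ArrayDeque;
   import java.util.Queue;

   /** Toy model: a paused consumer still receives up to receiverQueueSize messages. */
   final class PrefetchModel {
       private final Queue<String> backlog = new ArrayDeque<>(); // messages waiting on the "broker"
       private int permits;        // messages the broker is still allowed to push
       private int owed;           // permits the consumer has not yet re-granted
       private int delivered;      // messages handed to the listener
       private boolean paused;

       PrefetchModel(int receiverQueueSize) {
           this.permits = receiverQueueSize;  // initial grant made when subscribing
       }

       void pause() {
           paused = true;
       }

       void resume() {
           paused = false;
           grantOwedPermits();
           pump();
       }

       void publish(String msg) {
           backlog.add(msg);
           pump();
       }

       int delivered() {
           return delivered;
       }

       /** Pushes backlog messages while permits remain. */
       private void pump() {
           while (permits > 0 && !backlog.isEmpty()) {
               backlog.poll();
               permits--;
               delivered++;                   // the listener would be invoked here
               owed++;
               if (!paused) {
                   grantOwedPermits();        // only an unpaused consumer asks for more
               }
           }
       }

       private void grantOwedPermits() {
           permits += owed;
           owed = 0;
       }
   }

   final class PrefetchModelDemo {
       public static void main(String[] args) {
           PrefetchModel consumer = new PrefetchModel(3);
           consumer.pause();                  // pause immediately after "subscribing"
           for (int i = 0; i < 10; i++) {
               consumer.publish("msg-" + i);
           }
           System.out.println("delivered while paused: " + consumer.delivered()); // 3
           consumer.resume();
           System.out.println("delivered after resume: " + consumer.delivered()); // 10
       }
   }
   ```
   
   In this model the number of messages delivered after `pause()` is bounded by the initial grant, which matches the `assertEquals(received.intValue(), receiverQueueSize, ...)` check in the test.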
   
   





[GitHub] [pulsar] jiazhai commented on issue #6965: Consumer pause do not work

Posted by GitBox <gi...@apache.org>.
jiazhai commented on issue #6965:
URL: https://github.com/apache/pulsar/issues/6965#issuecomment-629904860


   @youzipi Thanks for reporting this issue. We will take a look into it. 







