Posted to dev@rocketmq.apache.org by GitBox <gi...@apache.org> on 2020/09/01 09:49:44 UTC

[GitHub] [rocketmq-spring] liuliuzo opened a new issue #293: support Native Listener

liuliuzo opened a new issue #293:
URL: https://github.com/apache/rocketmq-spring/issues/293


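   We need to be able to use a native Listener so that we can control the returned ConsumeConcurrentlyStatus ourselves.
   Our current approach is to adjust the loading slightly and set our own Listener in the Lifecycle callback. We hope there is a more elegant way to do this.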
   
   
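   import java.util.List;
   import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
   import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
   import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
   import org.apache.rocketmq.common.message.MessageExt;
   import org.slf4j.Logger;
   import org.slf4j.LoggerFactory;
   import org.springframework.stereotype.Component;

   // Registered as a bean so that it can be autowired into StringConsumer below.
   @Component
   public class MyMessageListenerConcurrently implements MessageListenerConcurrently {

       private static final Logger log = LoggerFactory.getLogger(MyMessageListenerConcurrently.class);

       @Override
       public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
           for (MessageExt messageExt : msgs) {
               log.debug("received msg: {}", messageExt);
               try {
                   long now = System.currentTimeMillis();

                   // do something here

                   long costTime = System.currentTimeMillis() - now;
                   log.debug("consume {} cost: {} ms", messageExt.getMsgId(), costTime);
               } catch (Exception e) {
                   log.warn("consume message failed. messageExt:{}", messageExt, e);
                   // 0 leaves the retry frequency to the broker.
                   context.setDelayLevelWhenNextConsume(0);
                   // The single returned status applies to the whole batch.
                   return ConsumeConcurrentlyStatus.RECONSUME_LATER;
               }
           }
           return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
       }
   }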
   
   @Service
   @RocketMQMessageListener(topic = "topic", consumerGroup = "consumerGroup")
   public class StringConsumer implements RocketMQPushConsumerLifecycleListener {

       @Autowired
       private MyMessageListenerConcurrently myMessageListenerConcurrently;

       @Override
       public void prepareStart(DefaultMQPushConsumer consumer) {
           // Swap in the native listener before the consumer starts.
           consumer.setMessageListener(myMessageListenerConcurrently);
       }
   }
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [rocketmq-spring] liuliuzo commented on issue #293: support Native Listener

Posted by GitBox <gi...@apache.org>.
liuliuzo commented on issue #293:
URL: https://github.com/apache/rocketmq-spring/issues/293#issuecomment-687569663


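   I think this is similar to the request in #291, which presumably refers to the delayLevelWhenNextConsume(times) parameter of ConsumeConcurrentlyContext. We also need to control commit retries and the retry count ourselves.
   
   Could the delayLevelWhenNextConsume and suspendCurrentQueueTimeMillis parameters in DefaultRocketMQListenerContainer.java be exposed for configuration?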
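
For context, the Javadoc on ConsumeConcurrentlyContext in the RocketMQ client describes three cases for delayLevelWhenNextConsume: -1 means no retry (the message goes straight to the dead-letter queue), 0 means the broker controls the retry frequency, and a positive value means the client chooses the delay level. The sketch below only models that mapping. DelayLevelSketch, RetryMode and interpret are hypothetical stand-in names, nothing here calls RocketMQ, and it is not meant to show how the library implements the behaviour.

```java
// Stand-in sketch with no RocketMQ dependency. All names are hypothetical.
final class DelayLevelSketch {

    // The three documented meanings of delayLevelWhenNextConsume.
    enum RetryMode { SEND_TO_DLQ, BROKER_CONTROLLED, CLIENT_CONTROLLED }

    // Maps a delay level to the retry behaviour described in the Javadoc.
    static RetryMode interpret(int delayLevelWhenNextConsume) {
        if (delayLevelWhenNextConsume == -1) {
            return RetryMode.SEND_TO_DLQ;        // no retry, dead-letter queue
        }
        if (delayLevelWhenNextConsume == 0) {
            return RetryMode.BROKER_CONTROLLED;  // broker decides the frequency
        }
        if (delayLevelWhenNextConsume > 0) {
            return RetryMode.CLIENT_CONTROLLED;  // client picks the delay level
        }
        // Values below -1 are not covered by the documented cases.
        throw new IllegalArgumentException("undocumented level: " + delayLevelWhenNextConsume);
    }

    public static void main(String[] args) {
        for (int level : new int[] {-1, 0, 3}) {
            System.out.println(level + " -> " + interpret(level));
        }
    }
}
```

Exposing the value as configuration would let a consumer choose between these cases without replacing the container's listener.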





[GitHub] [rocketmq-spring] RongtongJin commented on issue #293: support Native Listener

Posted by GitBox <gi...@apache.org>.
RongtongJin commented on issue #293:
URL: https://github.com/apache/rocketmq-spring/issues/293#issuecomment-687001805


   Hi @liuliuzo, the RocketMQ Spring community currently has no plan to support a native Listener. I will close the issue for now, but please feel free to reopen it if you have any further questions.
   





[GitHub] [rocketmq-spring] RongtongJin closed issue #293: support Native Listener

Posted by GitBox <gi...@apache.org>.
RongtongJin closed issue #293:
URL: https://github.com/apache/rocketmq-spring/issues/293


   





[GitHub] [rocketmq-spring] liuliuzo commented on issue #293: Support Native Listener

Posted by GitBox <gi...@apache.org>.
liuliuzo commented on issue #293:
URL: https://github.com/apache/rocketmq-spring/issues/293#issuecomment-735341979


   With native Listener support, some batch use cases could also be handled.
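
To illustrate the batch point: a native MessageListenerConcurrently receives a List<MessageExt> per call and returns one status for the whole list. The sketch below mimics that shape with stand-in types only. BatchConsumeSketch, Status and consumeBatch are hypothetical names, and no RocketMQ classes are used. In a real consumer the list size would, as far as I know, be bounded by the consumeMessageBatchMaxSize setting on DefaultMQPushConsumer.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;

// Stand-in sketch with no RocketMQ dependency. All names are hypothetical.
final class BatchConsumeSketch {

    // Mirrors the two outcomes a concurrent listener can report.
    enum Status { CONSUME_SUCCESS, RECONSUME_LATER }

    // Hands the whole batch to the handler and reports one status for all of it.
    static <T> Status consumeBatch(List<T> batch, Consumer<List<T>> handler) {
        try {
            handler.accept(batch);           // e.g. one bulk insert for the batch
            return Status.CONSUME_SUCCESS;
        } catch (RuntimeException e) {
            return Status.RECONSUME_LATER;   // the whole batch is retried
        }
    }

    public static void main(String[] args) {
        List<String> batch = Arrays.asList("a", "b");
        Status ok = consumeBatch(batch, msgs -> { });
        Status retry = consumeBatch(batch, msgs -> {
            throw new IllegalStateException("simulated failure");
        });
        System.out.println(ok + " " + retry);
    }
}
```

Because the status covers the whole list, a handler that fails on one message causes every message in that batch to be redelivered, so batch handlers should be idempotent.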





[GitHub] [rocketmq-spring] liuliuzo edited a comment on issue #293: support Native Listener

Posted by GitBox <gi...@apache.org>.
liuliuzo edited a comment on issue #293:
URL: https://github.com/apache/rocketmq-spring/issues/293#issuecomment-687569663


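   I think this is similar to the request in #291, which presumably refers to the delayLevelWhenNextConsume(times) parameter of ConsumeConcurrentlyContext. We also need to control commit retries and the retry count ourselves.
   
   Alternatively, could the delayLevelWhenNextConsume and suspendCurrentQueueTimeMillis parameters in DefaultRocketMQListenerContainer.java be exposed for configuration?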

