You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@rocketmq.apache.org by GitBox <gi...@apache.org> on 2021/07/23 09:12:58 UTC

[GitHub] [rocketmq] odbozhou commented on issue #3171: How update queue offset when consumer occur Exception and return ConsumeConcurrentlyStatus.RECONSUME_LATER status

odbozhou commented on issue #3171:
URL: https://github.com/apache/rocketmq/issues/3171#issuecomment-885506742


   > 现象:当消费者消费数据时发生Exception异常且返回`ConsumeConcurrentlyStatus.RECONSUME_LATER`状态时,我们的监控系统就会发现这个队列的“消息积压”指标不断上升而报警。
   > 效果如下图:
   > ![image](https://user-images.githubusercontent.com/2069461/126673455-14817cee-e4e0-496e-a365-1bccab5bb37c.png)
   > (消息积压持续上升,而在某个点突然下降至正常水平,十分费解)
   > 
   > 我阅读了`ConsumeMessageConcurrentlyService`这个实现类种对此类case的处理流程(processConsumeResult 方法),有些疑问还请帮忙解答下哈:
   > 
   > ![image](https://user-images.githubusercontent.com/2069461/126671969-ba1a6609-c4a0-4f6c-a384-054d686107b1.png)
   > 
   > 疑问点:
   > 1)当 `ackIndex = -1` 时,RocketMQ会把出错的 消息 放到重试队列中去,还是把整个queue中的消息放到重试队列?也就是 `consumeRequest.getMsgs()` 放的消息是哪类消息?
   > ![image](https://user-images.githubusercontent.com/2069461/126672773-c6015bf7-4011-4cc5-a7e2-90bf16491a65.png)
   > 
   > 2)关于 offset 的同步处理(上图中最后三行代码实现)
   > 当 `ackIndex = -1` 时,consumeRequest.getMsgs() 是不是已经空了,那么:
   > `long offset = consumeRequest.getProcessQueue().removeMessage(consumeRequest.getMsgs());`
   > 返回的结果是?
   > 
   > 如果某个异常msg被投递到重试队列了,那么offset的值如何更新?是保持不动,还是更新最新的offset?
   > 1)如果offset保持不动,那么consumer需要等待此msg的再次被投递,但是若此msg处理时一直出错,且超过重试次数和时间(最长15分钟),那么此queue上的消息是否会因此被阻塞?
   > 2)如果offset发生变化,那么此消息再次被投递进queue后,此时队列中的offset就是此msg的offset吧,consumer会不会出现重复消费?(除非broker被设计成有状态的,类似于RabbitMQ那样消费成功就ack掉,从内存中移除)
   > 
   > 如果您有时间,还请帮我指点迷津,这个问题困扰我好久了,非常感谢:)
   
   1、ackIndex=-1,会把消费的一批消息(consumeRequest.getMsgs())都放到重算队列里面重试(retry)队列消费。原理就是把消费失败的消息topic替换根据consumerGroup和topic生成成retry topic,放到一个内部队列当中,broker端定时任务扫描内部重试队列,到时间会重新发到真实的topic当中
   2、consumeRequest.getMsgs()并没有空。long offset = consumeRequest.getProcessQueue().removeMessage(consumeRequest.getMsgs())返回最近需要消费的offset,broker也会更新到这个最新的offset
   3、普通消息无论消费成功失败都会更新offset,并且更新到broker,消费失败的消息调用this.sendMessageBack告诉broker,放到retry队列,不会影响后面的消息消费。顺序消息如果消费失败才会不断重试,否则顺序无法保证
   4、mq实现消息语义未at least once,因此可能产生重复,需要消费业务端做幂等


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@rocketmq.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org