You are viewing a plain text version of this content. The canonical link for it is here.
Posted to users@rocketmq.apache.org by 萨尔卡 <10...@qq.com> on 2018/06/12 02:12:27 UTC

回复: CONSUME_FROM_LAST_OFFSET issue

hi zhanhui,
CONSUME_FROM_LAST_OFFSET, 我希望是每个新上线的Consumer Group都是从Topic当前的进度进行消费。
比如,在上线时, topic已经发送了200条消息,则新上线的group从201条开始消费。 而不是从第一条或者其他group进度开始消费.


方便添加下微信哇:




------------------
Have a nice dayFrancis Lee


QQ : 1026203200

Re: CONSUME_FROM_LAST_OFFSET issue

Posted by 卢松 <lu...@gmail.com>.
关于CONSUME_FROM_LAST_OFFSET在新消费组中不起作用的问题,我有个文章详细说明了。

https://mp.weixin.qq.com/s/XjNjE_Hwg9WLPiInSuPMYw
文章中有图片。





以前碰到过一个问题,一个新消费组上线时和我们期望的消费行为有偏差,今天专门研究下这个问题,看看是怎么回事。


在RocketMQ中,假如一个新消费组订阅了几个topic,按正常人或者正常业务的期望,新消费组应该从订阅topic的最后一个消息开始消费,但是实际情形不是如此,有时候新消费组会从这些topic的开头开始消费。这就是新消费组上线的风险点。


这对业务来说就有风险,因为消费端需要关心要不要处理以前的消息。或者以前的消息非常多,都还没有删除,消费端要处理多久才能处理完成。再或者,我的消费者处理不了以前的那些老的消息,处理时都出错,这该怎么办?


下面我们来分析下这个问题的来龙去脉,按照1,2,3,4...来说明前因后果及解决方案。


(一)消费端配置。消费端集群消费时,消费端的默认配置是从topic的最后offset开始消费。具体配置代码在DefaultMQPullConsumerImpl的consumeFromWhere()中:



而CONSUME_FROM_LAST_OFFSET的含义是“一个新的订阅组第一次启动从队列的最后位置开始消费”,RocketMQ
3.2.6版本的代码注释中清晰的说明了,但是实际表现却不是如此。



(二),消费端拉取消息的位置计算逻辑。消费端拉取消息是按照topic下的queue来进行主动拉取的,最关键的是这个拉取的位置offset是怎么计算出来的。消费端拉取消息位置的触发点在RebalanceImpl的updateProcessQueueTableInRebalance()中,如图:



对于集群消费者,真正实现计算拉取消息位置的逻辑是在RebalancePushImpl的computePullFromWhere()方法中,这个方法在消费客户端逻辑中是非常重要的,经常出现问题都是这段逻辑导致的。虽然这段代码没有bug,但是要深刻理解这段代码才能避免各种问题的出现。以后的文章中,会经常提到这块的实现逻辑。



(三)computePullFromWhere()的集群实现。computePullFromWhere()是计算不同配置时的拉取offset,我们只关心CONSUME_FROM_LAST_OFFSET时的实现。如下图箭头处,首先去broker端拉取某个queue的消费进度信息:



 offsetStore.readOffset(mq, ReadOffsetType.READ_FROM_STORE)
这段实际上执行的是RemoteBrokerOffsetStore的readOffset(), 如下图红框中的逻辑:



其中long brokerOffset =
this.fetchConsumeOffsetFromBroker(mq)的实现如下图,主要是通过MQClientAPIImpl的queryConsumerOffset()去broker上拿某个queue的消费进度,发送的是requestCode是QUERY_CONSUMER_OFFSET的请求:



(四),broker计算consumerOffser的逻辑。broker收到上面消费客户端的请求,是在ClientManageProcessor的queryConsumerOffset(ChannelHandlerContext
ctx, RemotingCommand request)中做处理的:



当一个新消费组上线时,会走到订阅组不存在的情况,然后计算当前queue的minOffset,而getMessageStore().checkInDiskByConsumeOffset(

requestHeader.getTopic(),requestHeader.getQueueId(), 0)

一般都是返回false,这个方法含义是检查当前queue的第一条消息是否在磁盘中,而不在内存中。


所以这个api的结果是,当minOffset=0时,返回offset=0,当minOffset>0时返回QUERY_NOT_FOUND结果,消费端拿到这个结果时会抛出MQBrokerException异常。


broker源代码的注释中也写的很清楚了,“

订阅组不存在情况下,如果这个队列的消息最小Offset是0,则表示这个Topic上线时间不长,服务器堆积的数据也不多,那么这个订阅组就从0开始消费。尤其对于Topic队列数动态扩容时,必须要从0开始消费。

”

(五),消费客户端处理broker返回的consumerOffset,查清问题根源。RemoteBrokerOffsetStore的readOffset()中,当minOffset为0时,这个方法返回0;当minOffset大于0时,这个方法返回-1。中间的分析过程省略了,不然贴出来的代码会更多。如下图实现:



上述方法返回的值赋值给lastOffset
,继续往上回到RebalancePushImpl的computePullFromWhere()中,当lastOffset =
0时,会返回0;当lastOffset = -1时,将会返回queue的maxOffset。



这段逻辑对新消费组的意义就是:如果订阅的queue不是从0开始的(minOffset大于0,已经删除过数据了),那么消费端将从maxOffset开始消费,即从最新位置开始消费;如果订阅的queue是从0开始的(minOffset等于0,没有删除过数据),那么消费端将从0开始消费这个queue。


这就是风险点,和我们最初的期望,每次都从最后位置消费消息有偏差!!!


(六)RocketMQ为什么这么设计?设计是否合理?


我们先去查看下Apache RocketMQ
4.2.0中broker端的实现,因为主要实现点还是在broker端。虽然计算消费位置offset的逻辑已经挪到了这个类中:

org.apache.rocketmq.broker.processor.ConsumerManageProcessor,但是实现逻辑是没有任何改变的。


既然Apache版本都没有做任何改动,说明这不是个bug,就是这么设计的。下面我们来分析下为什么一个queue的minOffset为0时,消费端要从0开始消费这个queue上消息,只有这种情况超出了正常的预期。


我们做个假设,假设新消费组上线时,都是从queue的maxOffset开始消费消息。又如果一个topic在一个broker上面有4个queue,新消费组上线后,开始从这四个queue的最后位置消费消息,这时我突然扩容这个topic到8个queue,那么消费端去namesrv上拿到这8个queue的信息需要一个心跳周期,按默认配置是30秒左右。这个心跳周期内,新扩展的queue上完全可能有新消息进来。


当消费端拿到4个新扩展queue的信息后,去broker端拉取消息时,broker还是把这4个扩容queue当作新queue来处理的。按照我们的假设,最终消费端会从这4个新queue的maxOffset开始消费。这就有可能丢失了这4个扩容queue的前面一些消息,有可能会很多消息,而这些消息完全是在新消费组上线后发送出来的!!


有消息漏消费了!这就是为什么新消费组不能都是从maxOffset开始消费的。


这样原因就清楚了,RocketMQ的设计是合理的,导致了重复消费是不可避免的,但是风险是巨大的。这也体现了RocketMQ的一个重大设计原则:宁可重复消费无数消息,也绝不漏掉一条消息。就跟国民党当年喊得
“宁可错杀千人,也绝不放过一个” 一样。


(七),解决方案。我订阅的topic上面可能有几十万条消息没有删除过,难道新上线消费组时,几十万条消息要重新消费吗?


目前我想到的有两种解决方案,1.可以使用mqadmin中的resetOffsetByTime命令来跳过某一个时间点之前的消息。
2.新消费组的消费者启动时,自己去过滤老的消息或者根据时间忽略以前的消息。当然肯定还有其他的解决方案,诸位看官可以思考下。


(八)最终总结下。新消费组上线时还是要处理好历史消息的,无论怎样处理,要提前做好准备。有可能消费到大量历史消息,这是RocketMQ的本身机制导致的,它的配置有更深层的含义。