You are viewing a plain text version of this content. The canonical link for it is here.
Posted to users@rocketmq.apache.org by 404828407 <40...@qq.com> on 2018/04/02 04:09:52 UTC

投稿 RocketMQ中新消费组上线时有什么风险?

RocketMQ中新消费组上线时有什么风险?
作者:卢松










以前碰到过一个问题,一个新消费组上线时和我们期望的消费行为有偏差,今天专门研究下这个问题,看看是怎么回事。




在RocketMQ中,假如一个新消费组订阅了几个topic,按正常人或者正常业务的期望,新消费组应该从订阅topic的最后一个消息开始消费,但是实际情形不是如此,有时候新消费组会从这些topic的开头开始消费。这就是新消费组上线的风险点。




这对业务来说就有风险,因为消费端需要关心要不要处理以前的消息。或者以前的消息非常多,都还没有删除,消费端要处理多久才能处理完成。再或者,我的消费者处理不了以前的那些老的消息,处理时都出错,这该怎么办?




下面我们来分析下这个问题的来龙去脉,按照1,2,3,4...来说明前因后果及解决方案。




(一)消费端配置。消费端集群消费时,消费端的默认配置是从topic的最后offset开始消费。具体配置代码在DefaultMQPullConsumerImpl的consumeFromWhere()中:









而CONSUME_FROM_LAST_OFFSET的含义是“一个新的订阅组第一次启动从队列的最后位置开始消费”,RocketMQ 3.2.6版本的代码注释中清晰的说明了,但是实际表现却不是如此。









(二),消费端拉取消息的位置计算逻辑。消费端拉取消息是按照topic下的queue来进行主动拉取的,最关键的是这个拉取的位置offset是怎么计算出来的。消费端拉取消息位置的触发点在RebalanceImpl的updateProcessQueueTableInRebalance()中,如图:









对于集群消费者,真正实现计算拉取消息位置的逻辑是在RebalancePushImpl的computePullFromWhere()方法中,这个方法在消费客户端逻辑中是非常重要的,经常出现问题都是这段逻辑导致的。虽然这段代码没有bug,但是要深刻理解这段代码才能避免各种问题的出现。以后的文章中,会经常提到这块的实现逻辑。









(三)computePullFromWhere()的集群实现。computePullFromWhere()是计算不同配置时的拉取offset,我们只关心CONSUME_FROM_LAST_OFFSET时的实现。如下图箭头处,首先去broker端拉取某个queue的消费进度信息:









 offsetStore.readOffset(mq, ReadOffsetType.READ_FROM_STORE) 这段实际上执行的是RemoteBrokerOffsetStore的readOffset(), 如下图红框中的逻辑:









其中long brokerOffset = this.fetchConsumeOffsetFromBroker(mq)的实现如下图,主要是通过MQClientAPIImpl的queryConsumerOffset()去broker上拿某个queue的消费进度,发送的是requestCode是QUERY_CONSUMER_OFFSET的请求:









(四),broker计算consumerOffser的逻辑。broker收到上面消费客户端的请求,是在ClientManageProcessor的queryConsumerOffset(ChannelHandlerContext ctx, RemotingCommand request)中做处理的:









当一个新消费组上线时,会走到订阅组不存在的情况,然后计算当前queue的minOffset,而getMessageStore().checkInDiskByConsumeOffset(

requestHeader.getTopic(),requestHeader.getQueueId(), 0)

一般都是返回false,这个方法含义是检查当前queue的第一条消息是否在磁盘中,而不在内存中。





所以这个api的结果是,当minOffset=0时,返回offset=0,当minOffset>0时返回QUERY_NOT_FOUND结果,消费端拿到这个结果时会抛出MQBrokerException异常。




broker源代码的注释中也写的很清楚了,“

订阅组不存在情况下,如果这个队列的消息最小Offset是0,则表示这个Topic上线时间不长,服务器堆积的数据也不多,那么这个订阅组就从0开始消费。尤其对于Topic队列数动态扩容时,必须要从0开始消费。

”


(五),消费客户端处理broker返回的consumerOffset,查清问题根源。RemoteBrokerOffsetStore的readOffset()中,当minOffset为0时,这个方法返回0;当minOffset大于0时,这个方法返回-1。中间的分析过程省略了,不然贴出来的代码会更多。如下图实现:









上述方法返回的值赋值给lastOffset ,继续往上回到RebalancePushImpl的computePullFromWhere()中,当lastOffset = 0时,会返回0;当lastOffset = -1时,将会返回queue的maxOffset。









这段逻辑对新消费组的意义就是:如果订阅的queue不是从0开始的(minOffset大于0,已经删除过数据了),那么消费端将从maxOffset开始消费,即从最新位置开始消费;如果订阅的queue是从0开始的(minOffset等于0,没有删除过数据),那么消费端将从0开始消费这个queue。




这就是风险点,和我们最初的期望,每次都从最后位置消费消息有偏差!!!




(六)RocketMQ为什么这么设计?设计是否合理?




我们先去查看下Apache RocketMQ 4.2.0中broker端的实现,因为主要实现点还是在broker端。虽然计算消费位置offset的逻辑已经挪到了这个类中:

org.apache.rocketmq.broker.processor.ConsumerManageProcessor,但是实现逻辑是没有任何改变的。




既然Apache版本都没有做任何改动,说明这不是个bug,就是这么设计的。下面我们来分析下为什么一个queue的minOffset为0时,消费端要从0开始消费这个queue上消息,只有这种情况超出了正常的预期。




我们做个假设,假设新消费组上线时,都是从queue的maxOffset开始消费消息。又如果一个topic在一个broker上面有4个queue,新消费组上线后,开始从这四个queue的最后位置消费消息,这时我突然扩容这个topic到8个queue,那么消费端去namesrv上拿到这8个queue的信息需要一个心跳周期,按默认配置是30秒左右。这个心跳周期内,新扩展的queue上完全可能有新消息进来。




当消费端拿到4个新扩展queue的信息后,去broker端拉取消息时,broker还是把这4个扩容queue当作新queue来处理的。按照我们的假设,最终消费端会从这4个新queue的maxOffset开始消费。这就有可能丢失了这4个扩容queue的前面一些消息,有可能会很多消息,而这些消息完全是在新消费组上线后发送出来的!!




有消息漏消费了!这就是为什么新消费组不能都是从maxOffset开始消费的。




这样原因就清楚了,RocketMQ的设计是合理的,导致了重复消费是不可避免的,但是风险是巨大的。这也体现了RocketMQ的一个重大设计原则:宁可重复消费无数消息,也绝不漏掉一条消息。就跟国民党当年喊得 “宁可错杀千人,也绝不放过一个” 一样。




(七),解决方案。我订阅的topic上面可能有几十万条消息没有删除过,难道新上线消费组时,几十万条消息要重新消费吗?




目前我想到的有两种解决方案,1.可以使用mqadmin中的resetOffsetByTime命令来跳过某一个时间点之前的消息。 2.新消费组的消费者启动时,自己去过滤老的消息或者根据时间忽略以前的消息。当然肯定还有其他的解决方案,诸位看官可以思考下。




(八)最终总结下。新消费组上线时还是要处理好历史消息的,无论怎样处理,要提前做好准备。有可能消费到大量历史消息,这是RocketMQ的本身机制导致的,它的配置有更深层的含义。




本篇结束,请听下回分解。

Re: 投稿 RocketMQ中新消费组上线时有什么风险?

Posted by yukon <yu...@apache.org>.
Hi,

非常欢迎撰写RocketMQ相关的技术文章,建议放到Google DOC上并共享出来方便大家进行Comment。

Regards,
yukon


2018-04-02 12:09 GMT+08:00 404828407 <40...@qq.com>:

> RocketMQ中新消费组上线时有什么风险?
> 作者:卢松
>
>
>
>
> 以前碰到过一个问题,一个新消费组上线时和我们期望的消费行为有偏差,今天专门研究下这个问题,看看是怎么回事。
>
>
> 在RocketMQ中,假如一个新消费组订阅了几个topic,按正常人或者正常业务的期望,新消费组应该从订阅topic的最后一个消息开始消费,
> *但是实际情形不是如此,有时候新消费组会从这些topic的开头开始消费。这就是新消费组上线的风险点。*
>
>
> 这对业务来说就有风险,因为消费端需要关心要不要处理以前的消息。或者以前的消息非常多,都还没有删除,消费端要处理多久才能处理完成。再或者,
> 我的消费者处理不了以前的那些老的消息,处理时都出错,这该怎么办?
>
>
> 下面我们来分析下这个问题的来龙去脉,按照1,2,3,4...来说明前因后果及解决方案。
>
>
> *(一)消费端配置。*消费端集群消费时,消费端的默认配置是从topic的最后offset开始消费。
> 具体配置代码在DefaultMQPullConsumerImpl的consumeFromWhere()中:
>
>
>
> 而CONSUME_FROM_LAST_OFFSET的含义是“一个新的订阅组第一次启动从队列的最后位置开始消费”,RocketMQ
> 3.2.6版本的代码注释中清晰的说明了,但是实际表现却不是如此。
>
>
>
> *(二),消费端拉取消息的位置计算逻辑。*消费端拉取消息是按照topic下的queue来进行主动拉取的,
> 最关键的是这个拉取的位置offset是怎么计算出来的。消费端拉取消息位置的触发点在RebalanceImpl的up
> dateProcessQueueTableInRebalance()中,如图:
>
>
>
> 对于集群消费者,真正实现计算拉取消息位置的逻辑是在
> *RebalancePushImpl的computePullFromWhere()方法中,这个方法在消费客户端逻辑中是非常重要的*,
> 经常出现问题都是这段逻辑导致的。虽然这段代码没有bug,但是要深刻理解这段代码才能避免各种问题的出现。以后的文章中,会经常提到这块的实现逻辑。
>
>
>
> *(三)computePullFromWhere()的集群实现*。computePullFromWhere()是计算不同配置时的拉取offset,
> 我们只关心CONSUME_FROM_LAST_OFFSET时的实现。如下图箭头处,首先去broker端拉取某个queue的消费进度信息:
>
>
>
>  offsetStore.readOffset(mq, ReadOffsetType.READ_FROM_STORE)
> 这段实际上执行的是RemoteBrokerOffsetStore的readOffset(), 如下图红框中的逻辑:
>
>
>
> 其中long brokerOffset = this.fetchConsumeOffsetFromBroker(mq)的实现如下图,
> 主要是通过MQClientAPIImpl的queryConsumerOffset()去broker上拿某个queue的消费进度,
> 发送的是requestCode是QUERY_CONSUMER_OFFSET的请求:
>
>
>
> *(四),broker计算consumerOffser的逻辑。*broker收到上面消费客户端的请求,
> 是在ClientManageProcessor的queryConsumerOffset(ChannelHandlerContext ctx,
> RemotingCommand request)中做处理的:
>
>
>
> 当一个新消费组上线时,会走到订阅组不存在的情况,然后计算当前queue的minOffset,而getMessageStore().
> checkInDiskByConsumeOffset(
>
> requestHeader.getTopic(),requestHeader.getQueueId(), 0)
>
> 一般都是返回false,这个方法含义是检查当前queue的第一条消息是否在磁盘中,而不在内存中。
>
>
>
> *所以这个api的结果是,当minOffset=0时,返回offset=0,当minOffset>0时返回QUERY_NOT_FOUND结果,消费端拿到这个结果时会抛出MQBrokerException异常。*
>
>
> broker源代码的注释中也写的很清楚了,“
>
>
> *订阅组不存在情况下,如果这个队列的消息最小Offset是0,则表示这个Topic上线时间不长,服务器堆积的数据也不多,那么这个订阅组就从0开始消费。尤其对于Topic队列数动态扩容时,必须要从0开始消费。*
>
> ”
>
> *(五),消费客户端处理broker返回的consumerOffset,查清问题根源。*RemoteBrokerOffsetStor
> e的readOffset()中,*当minOffset为0时,这个方法返回0;当minOffset大于0时,这个方法返回-1*
> 。中间的分析过程省略了,不然贴出来的代码会更多。如下图实现:
>
>
>
> 上述方法返回的值赋值给*lastOffset ,*继续往上回到
> *RebalancePushImpl的computePullFromWhere()中,*当*lastOffset =
> 0时,会返回0;当lastOffset = -1时,将会返回queue的maxOffset。*
>
>
>
> *这段逻辑对新消费组的意义就是:*
> *如果订阅的queue不是从0开始的(minOffset大于0,已经删除过数据了),那么消费端将从maxOffset开始消费,即从最新位置开始消费;如果订阅的queue是从0开始的(minOffset等于0,没有删除过数据),那么消费端将从0开始消费这个queue。*
>
>
> *这就是风险点,和我们最初的期望,每次都从最后位置消费消息有偏差!!!*
>
>
> *(六)RocketMQ为什么这么设计?设计是否合理?*
>
>
> 我们先去查看下Apache RocketMQ 4.2.0中broker端的实现,因为主要实现点还是在broker端。
> 虽然计算消费位置offset的逻辑已经挪到了这个类中:
>
>
> *org.apache.rocketmq.broker.processor.ConsumerManageProcessor,但是实现逻辑是没有任何改变的。*
>
>
> 既然Apache版本都没有做任何改动,说明这不是个bug,就是这么设计的。下面我们来分析下
> *为什么一个queue的minOffset为0时,消费端要从0开始消费这个queue上消息,只有这种情况超出了正常的预期。*
>
>
> 我们做个假设,假设新消费组上线时,都是从queue的maxOffset开始消费消息。又如果一个topic在一个broker上面有4个queue,
> 新消费组上线后,开始从这四个queue的最后位置消费消息,这时我突然扩容这个topic到8个queue,
> 那么消费端去namesrv上拿到这8个queue的信息需要一个心跳周期,按默认配置是30秒左右。这个心跳周期内,
> 新扩展的queue上完全可能有新消息进来。
>
>
> 当消费端拿到4个新扩展queue的信息后,去broker端拉取消息时,
> *broker还是把这4个扩容queue当作新queue来处理的。按照我们的假设,最终消费端会从这4个新queue的maxOffset开始消费。*
> *这就有可能丢失了这4个扩容queue的前面一些消息,有可能会很多消息,而这些消息完全是在新消费组上线后发送出来的!!*
>
>
> *有消息漏消费了!这就是为什么新消费组不能都是从maxOffset开始消费的。*
>
>
> 这样原因就清楚了,RocketMQ的设计是合理的,导致了重复消费是不可避免的,但是风险是巨大的。*这也体现了RocketMQ的一个重大设计原则:*
> *宁可重复消费无数消息,也绝不漏掉一条消息。*就跟国民党当年喊得 “宁可错杀千人,也绝不放过一个” 一样。
>
>
> *(七),解决方案*。我订阅的topic上面可能有几十万条消息没有删除过,难道新上线消费组时,几十万条消息要重新消费吗?
>
>
> 目前我想到的有两种解决方案,1.可以使用mqadmin中的resetOffsetByTime命令来跳过某一个时间点之前的消息。
> 2.新消费组的消费者启动时,自己去过滤老的消息或者根据时间忽略以前的消息。当然肯定还有其他的解决方案,诸位看官可以思考下。
>
>
> *(八)最终总结下。*新消费组上线时还是要处理好历史消息的,无论怎样处理,要提前做好准备。有可能消费到大量历史消息,
> 这是RocketMQ的本身机制导致的,它的配置有更深层的含义。
>
>
> 本篇结束,请听下回分解。
>