You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by "originalHeaed (via GitHub)" <gi...@apache.org> on 2023/02/22 14:37:19 UTC

[GitHub] [rocketmq] originalHeaed opened a new issue, #6163: 在设置 setMaxReconsumeTimes 后,消费端顺序消费,无限重试

originalHeaed opened a new issue, #6163:
URL: https://github.com/apache/rocketmq/issues/6163

   你好!
   版本:RocketMQ  5.0.0
   情况概述:消费端只有一个消费者,设置 MaxReconsumeTimes  值为 1,调用DefaultMQPushConsumer 的顺序消费端口,在消费重试次数达到 1 后,消息被消费端投入延迟队列,随后 broker 重新将消息投入了重试队列,并没有投入死信队列;
   个人源码走查:
   消息在重试超过指定次数后,由消费端投递至延迟队列时,消息的重试次数 == 最大重新重试次数;
   消息在延迟时间到达后,重新投递时调用的时 SendMessageProcessor.sendMessage 方法,内部对消息是否投递死信队列调用了 SendMessageProcessor.handleRetryAndDLQ 方法进行判断(且没有将消息的重试次数进行增加)
   
   `    private boolean handleRetryAndDLQ(SendMessageRequestHeader requestHeader, RemotingCommand response,
           RemotingCommand request,
           MessageExt msg, TopicConfig topicConfig, Map<String, String> properties) {
           String newTopic = requestHeader.getTopic();
           if (null != newTopic && newTopic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
               String groupName = newTopic.substring(MixAll.RETRY_GROUP_TOPIC_PREFIX.length());
               SubscriptionGroupConfig subscriptionGroupConfig =
                   this.brokerController.getSubscriptionGroupManager().findSubscriptionGroupConfig(groupName);
               if (null == subscriptionGroupConfig) {
                   response.setCode(ResponseCode.SUBSCRIPTION_GROUP_NOT_EXIST);
                   response.setRemark(
                       "subscription group not exist, " + groupName + " " + FAQUrl.suggestTodo(FAQUrl.SUBSCRIPTION_GROUP_NOT_EXIST));
                   return false;
               }
   
               int maxReconsumeTimes = subscriptionGroupConfig.getRetryMaxTimes();
               if (request.getVersion() >= MQVersion.Version.V3_4_9.ordinal() && requestHeader.getMaxReconsumeTimes() != null) {
                   maxReconsumeTimes = requestHeader.getMaxReconsumeTimes();
               }
               int reconsumeTimes = requestHeader.getReconsumeTimes() == null ? 0 : requestHeader.getReconsumeTimes();
               // Using '>' instead of '>=' to compatible with the case that reconsumeTimes here are increased by client.
               if (reconsumeTimes > maxReconsumeTimes) {
                   properties.put(MessageConst.PROPERTY_DELAY_TIME_LEVEL, "-1");
                   newTopic = MixAll.getDLQTopic(groupName);
                   int queueIdInt = randomQueueId(DLQ_NUMS_PER_GROUP);
                   topicConfig = this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod(newTopic,
                       DLQ_NUMS_PER_GROUP,
                       PermName.PERM_WRITE | PermName.PERM_READ, 0
                   );
                   msg.setTopic(newTopic);
                   msg.setQueueId(queueIdInt);
                   msg.setDelayTimeLevel(0);
                   if (null == topicConfig) {
                       response.setCode(ResponseCode.SYSTEM_ERROR);
                       response.setRemark("topic[" + newTopic + "] not exist");
                       return false;
                   }
               }
           }
           int sysFlag = requestHeader.getSysFlag();
           if (TopicFilterType.MULTI_TAG == topicConfig.getTopicFilterType()) {
               sysFlag |= MessageSysFlag.MULTI_TAGS_FLAG;
           }
           msg.setSysFlag(sysFlag);
           return true;
       }
   `
   其中进入死信队列的条件是 reconsumeTimes > maxReconsumeTimes,因此投入了重试队列,随后消费端拉取该消息,重复上面的流程(消息的 reconsumeTimes 并没有增加)。
   我想问下这里是需要消费端在消费时自行增加 reconsumeTimes 吗?且只有消息从重试队列中那消息后才需要自行增加


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@rocketmq.apache.org.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [rocketmq] ShadowySpirits commented on issue #6163: 在设置 setMaxReconsumeTimes 后,消费端顺序消费,无限重试

Posted by "ShadowySpirits (via GitHub)" <gi...@apache.org>.
ShadowySpirits commented on issue #6163:
URL: https://github.com/apache/rocketmq/issues/6163#issuecomment-1440173879

   Please ask questions in Github Discussion, see [Q&A Guidelines](https://github.com/apache/rocketmq/issues/6153).


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@rocketmq.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [rocketmq] RongtongJin commented on issue #6163: After setMaxReconsumeTimes is set, the order consumer will retry indefinitely

Posted by "RongtongJin (via GitHub)" <gi...@apache.org>.
RongtongJin commented on issue #6163:
URL: https://github.com/apache/rocketmq/issues/6163#issuecomment-1441079752

   > Hi @originalHeaed, it is indeed a bug and will fixed ASAP.
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@rocketmq.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [rocketmq] odbozhou closed issue #6163: After setMaxReconsumeTimes is set, the order consumer will retry indefinitely

Posted by "odbozhou (via GitHub)" <gi...@apache.org>.
odbozhou closed issue #6163: After setMaxReconsumeTimes is set, the  order consumer will retry indefinitely
URL: https://github.com/apache/rocketmq/issues/6163


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@rocketmq.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [rocketmq] RongtongJin commented on issue #6163: After setMaxReconsumeTimes is set, the order consumer will retry indefinitely

Posted by "RongtongJin (via GitHub)" <gi...@apache.org>.
RongtongJin commented on issue #6163:
URL: https://github.com/apache/rocketmq/issues/6163#issuecomment-1441076696

   Hi @originalHeaed, it is indeed a bug and will fixid ASAP.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@rocketmq.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org