You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by GitBox <gi...@apache.org> on 2022/11/22 03:02:06 UTC

[GitHub] [rocketmq] lizhimins opened a new issue, #5568: Support long length group and topic for pop mode consumption

lizhimins opened a new issue, #5568:
URL: https://github.com/apache/rocketmq/issues/5568

   支持长度较长的 group 和 topic 进行 pop 模式消费
   问题详细描述:
   pop 消费模式下的重试消息,会存入 %RETRY%Group_Topic 的 Topic 中,底层存储格式最大支持长度为 127 的 Topic
   ![image-20221122102524128](https://user-images.githubusercontent.com/22487634/203210561-ed292fe1-1807-4f61-bc70-60252131d79e.png)
   对于 Topic 的长度上限也是 127,代码见 org.apache.rocketmq.common.topic.TopicValidator#validateTopic
   ![image-20221122103221861](https://user-images.githubusercontent.com/22487634/203210578-ab45089d-9146-4c80-b87d-45b0c831831e.png)
   
   如果 Topic 和 Group 很长,超过 127 的限制之后,将导致 pop 消费非预期工作。为了更好的复现这个问题,我编写了一个 Demo。
   现象为客户端无法消费到 重试 Topic 总长度超长的重试消息。同时服务端有一些错误日志。
   ```Java
   String nameServerAddr = "xxx.xxx.xxx.xxx:9876";
   String topicName = "topic-" + RandomStringUtils.randomAlphabetic(128).toUpperCase();
   String groupName = "group-" + RandomStringUtils.randomAlphabetic(128).toUpperCase();
   System.out.printf("Use topicName: %s%n", topicName);
   System.out.printf("Use groupName: %s%n", groupName);
   
   switchPop(groupName, topicName);
   
   DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(groupName);
   consumer.subscribe(topicName, "*");
   consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
   consumer.setConsumeThreadMin(1);
   consumer.setConsumeThreadMax(1);
   consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
       for (MessageExt message : msgs) {
           System.out.printf("Listener: %s, messageId: %s, times: %d%n", new Date(System.currentTimeMillis()),
               message.getMsgId(), message.getReconsumeTimes());
           if (message.getReconsumeTimes() == 0) {
               return ConsumeConcurrentlyStatus.RECONSUME_LATER;
           } else {
               return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
           }
       }
       return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
   });
   consumer.setNamesrvAddr(nameServerAddr);
   consumer.setClientRebalance(false);
   consumer.start();
   System.out.printf("Consumer Started...%n");
   
   DefaultMQProducer producer = new DefaultMQProducer(
   "PID-1", false, null);
   producer.setAccessChannel(AccessChannel.CLOUD);
   producer.setNamesrvAddr(nameServerAddr);
   producer.start();
   System.out.printf("Producer Started...%n");
   
   try {
   	Message msg = new Message(
   		topicName, "*", "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
   	SendResult sendResult = producer.send(msg);
   	System.out.printf("Producer send result: %s, messageId: %s%n", topicName, sendResult.getMsgId());
   } catch (Exception e) {
   	e.printStackTrace();
   }
   ```
   
   ![image-20221122103006319](https://user-images.githubusercontent.com/22487634/203210727-f90e5868-7f84-4cbe-b92d-22ba9fa7a5b3.png)
   ![image-20221122103919822](https://user-images.githubusercontent.com/22487634/203210776-fe6f8a89-b434-45a8-8b55-9491e6efb591.png)
   
   解决方案:
   1. 我们计划对存储层进行一个存储格式的简单修改,并将其称为 message version v2。
      通过判断消息的 magic code 来区分是旧版本 v1 (长度 128),还是 v2(长度扩展为 256)来支持长 topic 的存储
   2. 由于低版本客户端只支持解析 v1 格式的 topic 数据,代码见 org.apache.rocketmq.common.message.MessageDecoder#decode
      为了保持对旧版本客户端和多语言旧客户端对于长 topic 的兼容性,不会将这个新的存储格式透传给客户端。
      否则旧版本客户端由于不能解析 magic code 会导致消息静默丢弃。
   3. 由于长 Topic 在绝大多数场景中仅被用作重试 topic(尤其是 pop 消费)使用,我们可以在 broker 上直接将 pop retry topic 还原为origin topic(原本这一步是在客户端完成),这也意味着在服务端生成 pop ck 消息而不是在客户端,代码见
   
      ```Java
      messageExt.getProperties().put(MessageConst.PROPERTY_POP_CK,
      	ExtraInfoUtil.buildExtraInfo(startOffsetInfo.get(queueIdKey), responseHeader.getPopTime(), responseHeader.getInvisibleTime(), responseHeader.getReviveQid(), messageExt.getTopic(), brokerName, messageExt.getQueueId(), msgQueueOffset));
      ```
      这个修改将导致 5.0 版本使用 remoting 协议 pop 消费的不兼容。
   
   长期方案:
   1. RIP 47 Data Layout V2 中讨论了关于存储格式的长期演进方案,能够彻底的解决上述问题,详细信息请参考邮件列表和文档
      https://github.com/apache/rocketmq/wiki/RIP-47-Data-Layout-V2


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@rocketmq.apache.org.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [rocketmq] francisoliverlee commented on issue #5568: Support long length group and topic for pop mode consumption

Posted by GitBox <gi...@apache.org>.
francisoliverlee commented on issue #5568:
URL: https://github.com/apache/rocketmq/issues/5568#issuecomment-1328207721

   在4.9.X的版本上我统计过, 修改点非常多,如下,目前可以改一版临时用127对于多业务拼接topic名字来说, 确实太短
   ```
   1. private PutMessageStatus checkMessage(MessageExtBrokerInner msg)
   2. private PutMessageStatus checkMessages(MessageExtBatch messageExtBatch)
   3. protected RemotingCommand msgContentCheck(final ChannelHandlerContext ctx,
           final SendMessageRequestHeader requestHeader, RemotingCommand request,
   4. CompletableFuture<RemotingCommand> asyncSendBatchMessage(ChannelHandlerContext ctx, RemotingCommand
   5. PutMessageResult encode(MessageExtBrokerInner msgInner) {
   6. public static MessageExt decode()
   7. ByteBuffer encode(final MessageExtBatch messageExtBatch, PutMessageContext putMessageContext) 
   8. public static Map<String, String> decodeProperties(ByteBuffer byteBuffer)
   9. protected static int calMsgLength(int sysFlag, int bodyLength, int topicLength, int propertiesLength)
   10. EncodeResult serialize(final MessageExtBrokerInner msgInner)
   11. Validators.TOPIC_MAX_LENGTH
   12. TopicValidator.TOPIC_MAX_LENGTH
   
   ```


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@rocketmq.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [rocketmq] lizhimins closed issue #5568: Support long length group and topic for pop mode consumption

Posted by "lizhimins (via GitHub)" <gi...@apache.org>.
lizhimins closed issue #5568: Support long length group and topic for pop mode consumption
URL: https://github.com/apache/rocketmq/issues/5568


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@rocketmq.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [rocketmq] lizhimins commented on issue #5568: Support long length group and topic for pop mode consumption

Posted by GitBox <gi...@apache.org>.
lizhimins commented on issue #5568:
URL: https://github.com/apache/rocketmq/issues/5568#issuecomment-1330069883

   嗯,我已经修改了这些部分并添加了集测,可以帮忙 review 下吗 @francisoliverlee 
   
   I have modified these parts and added integration test. Can you help review this pr..


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@rocketmq.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [rocketmq] lizhimins commented on issue #5568: Support long length group and topic for pop mode consumption

Posted by GitBox <gi...@apache.org>.
lizhimins commented on issue #5568:
URL: https://github.com/apache/rocketmq/issues/5568#issuecomment-1330070688

   顺便一提,长度限制为 256 而不是更长是受到 cq 文件路径长度的限制


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@rocketmq.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org