You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by "pingww (via GitHub)" <gi...@apache.org> on 2023/05/29 02:12:29 UTC

[GitHub] [rocketmq] pingww commented on a diff in pull request #6577: [ISSUE #6576] Fix pop lmq message

pingww commented on code in PR #6577:
URL: https://github.com/apache/rocketmq/pull/6577#discussion_r1208768081


##########
client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java:
##########
@@ -1063,73 +1063,124 @@ private PopResult processPopResponse(final String brokerName, final RemotingComm
         PopResult popResult = new PopResult(popStatus, msgFoundList);
         PopMessageResponseHeader responseHeader = (PopMessageResponseHeader) response.decodeCommandCustomHeader(PopMessageResponseHeader.class);
         popResult.setRestNum(responseHeader.getRestNum());
+        if (popStatus != PopStatus.FOUND) {
+            return popResult;
+        }
         // it is a pop command if pop time greater than 0, we should set the check point info to extraInfo field
-        if (popStatus == PopStatus.FOUND) {
-            Map<String, Long> startOffsetInfo = null;
-            Map<String, List<Long>> msgOffsetInfo = null;
-            Map<String, Integer> orderCountInfo = null;
+        Map<String, Long> startOffsetInfo = null;
+        Map<String, List<Long>> msgOffsetInfo = null;
+        Map<String, Integer> orderCountInfo = null;
+        if (requestHeader instanceof PopMessageRequestHeader) {
+            popResult.setInvisibleTime(responseHeader.getInvisibleTime());
+            popResult.setPopTime(responseHeader.getPopTime());
+            startOffsetInfo = ExtraInfoUtil.parseStartOffsetInfo(responseHeader.getStartOffsetInfo());
+            msgOffsetInfo = ExtraInfoUtil.parseMsgOffsetInfo(responseHeader.getMsgOffsetInfo());
+            orderCountInfo = ExtraInfoUtil.parseOrderCountInfo(responseHeader.getOrderCountInfo());
+        }
+        Map<String/*topicMark@queueId*/, List<Long>/*msg queueOffset*/> sortMap
+            = buildQueueOffsetSortedMap(topic, msgFoundList);
+        Map<String, String> map = new HashMap<>(5);
+        for (MessageExt messageExt : msgFoundList) {
             if (requestHeader instanceof PopMessageRequestHeader) {
-                popResult.setInvisibleTime(responseHeader.getInvisibleTime());
-                popResult.setPopTime(responseHeader.getPopTime());
-                startOffsetInfo = ExtraInfoUtil.parseStartOffsetInfo(responseHeader.getStartOffsetInfo());
-                msgOffsetInfo = ExtraInfoUtil.parseMsgOffsetInfo(responseHeader.getMsgOffsetInfo());
-                orderCountInfo = ExtraInfoUtil.parseOrderCountInfo(responseHeader.getOrderCountInfo());
-            }
-            Map<String/*topicMark@queueId*/, List<Long>/*msg queueOffset*/> sortMap = new HashMap<>(16);
-            for (MessageExt messageExt : msgFoundList) {
-                String key = ExtraInfoUtil.getStartOffsetInfoMapKey(messageExt.getTopic(), messageExt.getQueueId());
-                if (!sortMap.containsKey(key)) {
-                    sortMap.put(key, new ArrayList<>(4));
-                }
-                sortMap.get(key).add(messageExt.getQueueOffset());
-            }
-            Map<String, String> map = new HashMap<>(5);
-            for (MessageExt messageExt : msgFoundList) {
-                if (requestHeader instanceof PopMessageRequestHeader) {
-                    if (startOffsetInfo == null) {
-                        // we should set the check point info to extraInfo field , if the command is popMsg
-                        // find pop ck offset
-                        String key = messageExt.getTopic() + messageExt.getQueueId();
-                        if (!map.containsKey(messageExt.getTopic() + messageExt.getQueueId())) {
-                            map.put(key, ExtraInfoUtil.buildExtraInfo(messageExt.getQueueOffset(), responseHeader.getPopTime(), responseHeader.getInvisibleTime(), responseHeader.getReviveQid(),
-                                messageExt.getTopic(), brokerName, messageExt.getQueueId()));
+                if (startOffsetInfo == null) {
+                    // we should set the check point info to extraInfo field , if the command is popMsg
+                    // find pop ck offset
+                    String key = messageExt.getTopic() + messageExt.getQueueId();
+                    if (!map.containsKey(messageExt.getTopic() + messageExt.getQueueId())) {
+                        map.put(key, ExtraInfoUtil.buildExtraInfo(messageExt.getQueueOffset(), responseHeader.getPopTime(), responseHeader.getInvisibleTime(), responseHeader.getReviveQid(),
+                            messageExt.getTopic(), brokerName, messageExt.getQueueId()));
 
-                        }
-                        messageExt.getProperties().put(MessageConst.PROPERTY_POP_CK, map.get(key) + MessageConst.KEY_SEPARATOR + messageExt.getQueueOffset());
-                    } else {
-                        if (messageExt.getProperty(MessageConst.PROPERTY_POP_CK) == null) {
-                            String queueIdKey = ExtraInfoUtil.getStartOffsetInfoMapKey(messageExt.getTopic(), messageExt.getQueueId());
-                            String queueOffsetKey = ExtraInfoUtil.getQueueOffsetMapKey(messageExt.getTopic(), messageExt.getQueueId(), messageExt.getQueueOffset());
-                            int index = sortMap.get(queueIdKey).indexOf(messageExt.getQueueOffset());
-                            Long msgQueueOffset = msgOffsetInfo.get(queueIdKey).get(index);
+                    }
+                    messageExt.getProperties().put(MessageConst.PROPERTY_POP_CK, map.get(key) + MessageConst.KEY_SEPARATOR + messageExt.getQueueOffset());
+                } else {
+                    if (messageExt.getProperty(MessageConst.PROPERTY_POP_CK) == null) {
+                        final String queueIdKey;
+                        final String queueOffsetKey;
+                        final int index;
+                        final Long msgQueueOffset;
+                        if (MixAll.isLmq(topic) && messageExt.getReconsumeTimes() == 0 && StringUtils.isNotEmpty(
+                            messageExt.getProperty(MessageConst.PROPERTY_INNER_MULTI_DISPATCH))) {
+                            // process LMQ, LMQ topic has only 1 queue, which queue id is 0

Review Comment:
   Should we define this queue ID 0 as a constant?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@rocketmq.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org