You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@rocketmq.apache.org by GitBox <gi...@apache.org> on 2021/08/09 15:31:28 UTC

[GitHub] [rocketmq] odbozhou commented on a change in pull request #2757: [RIP-19] Pop Consuming (submodule "broker")

odbozhou commented on a change in pull request #2757:
URL: https://github.com/apache/rocketmq/pull/2757#discussion_r685298887



##########
File path: broker/src/main/java/org/apache/rocketmq/broker/processor/PopReviveService.java
##########
@@ -0,0 +1,470 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.broker.processor;
+
+import com.alibaba.fastjson.JSON;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.List;
+import org.apache.rocketmq.broker.BrokerController;
+import org.apache.rocketmq.broker.util.MsgUtil;
+import org.apache.rocketmq.client.consumer.PullResult;
+import org.apache.rocketmq.client.consumer.PullStatus;
+import org.apache.rocketmq.common.KeyBuilder;
+import org.apache.rocketmq.common.MixAll;
+import org.apache.rocketmq.common.PopAckConstants;
+import org.apache.rocketmq.common.ServiceThread;
+import org.apache.rocketmq.common.TopicConfig;
+import org.apache.rocketmq.common.TopicFilterType;
+import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.common.message.MessageAccessor;
+import org.apache.rocketmq.common.message.MessageConst;
+import org.apache.rocketmq.common.message.MessageDecoder;
+import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.common.utils.DataConverter;
+import org.apache.rocketmq.logging.InternalLogger;
+import org.apache.rocketmq.logging.InternalLoggerFactory;
+import org.apache.rocketmq.store.AppendMessageStatus;
+import org.apache.rocketmq.store.GetMessageResult;
+import org.apache.rocketmq.store.MessageExtBrokerInner;
+import org.apache.rocketmq.store.PutMessageResult;
+import org.apache.rocketmq.store.config.BrokerRole;
+import org.apache.rocketmq.store.pop.AckMsg;
+import org.apache.rocketmq.store.pop.PopCheckPoint;
+
+public class PopReviveService extends ServiceThread {
+    private static final InternalLogger POP_LOGGER = InternalLoggerFactory.getLogger(LoggerName.ROCKETMQ_POP_LOGGER_NAME);
+
+    // Queue of reviveTopic that this service instance scans (logged as "reviveQueueId").
+    private int queueId;
+    private BrokerController brokerController;
+    // Topic whose messages (tagged CK_TAG / ACK_TAG) this service scans.
+    private String reviveTopic;
+    // Last observed "not a slave" state. NOTE: static, so it is shared by every PopReviveService
+    // instance and overwritten by each instance's checkAndSetMaster() call.
+    private static volatile boolean isMaster = false;
+
+    /**
+     * Creates a revive worker bound to a single queue of the revive topic.
+     *
+     * @param queueId          queue of {@code reviveTopic} scanned by this instance
+     * @param brokerController owning broker
+     * @param reviveTopic      topic carrying the CK/ACK messages
+     */
+    public PopReviveService(int queueId, BrokerController brokerController, String reviveTopic) {
+        this.reviveTopic = reviveTopic;
+        this.brokerController = brokerController;
+        this.queueId = queueId;
+    }
+
+    /** Returns the service name, suffixed with the revive queue id of this instance. */
+    @Override
+    public String getServiceName() {
+        final String namePrefix = "PopReviveService_";
+        return namePrefix + this.queueId;
+    }
+
+    /** @return {@code true} unless the message store is configured with {@link BrokerRole#SLAVE}. */
+    private boolean checkMaster() {
+        BrokerRole role = brokerController.getMessageStoreConfig().getBrokerRole();
+        return role != BrokerRole.SLAVE;
+    }
+
+    /**
+     * Re-reads the broker role, publishes it to the shared static {@code isMaster} flag and returns it.
+     * The locally computed value is returned instead of re-reading the static field: every
+     * PopReviveService instance writes that field, so re-reading it could return another thread's write.
+     *
+     * @return {@code true} if the broker role is not {@link BrokerRole#SLAVE}
+     */
+    private boolean checkAndSetMaster() {
+        boolean master = checkMaster();
+        isMaster = master;
+        return master;
+    }
+
+    /**
+     * Re-publishes {@code messageExt} (a message covered by {@code popCheckPoint}) to the pop retry topic
+     * of the checkpoint's consumer group, then updates put statistics and notifies the pop processor.
+     * Does nothing on a slave broker.
+     *
+     * @param popCheckPoint checkpoint the message belongs to; supplies topic, group (cid) and pop time
+     * @param messageExt    the original message to copy into the retry topic
+     * @throws Exception if the store result has no append result or its status is not PUT_OK
+     */
+    private void reviveRetry(PopCheckPoint popCheckPoint, MessageExt messageExt) throws Exception {
+        if (!checkAndSetMaster()) {
+            POP_LOGGER.info("slave skip retry , revive topic={}, reviveQueueId={}", reviveTopic, queueId);
+            return;
+        }
+        MessageExtBrokerInner msgInner = new MessageExtBrokerInner();
+        // Map a normal topic to the pop retry topic for this group; a retry topic is used as-is.
+        if (!popCheckPoint.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
+            msgInner.setTopic(KeyBuilder.buildPopRetryTopic(popCheckPoint.getTopic(), popCheckPoint.getCId()));
+        } else {
+            msgInner.setTopic(popCheckPoint.getTopic());
+        }
+        msgInner.setBody(messageExt.getBody());
+        msgInner.setQueueId(0);
+        // NOTE(review): presumably setTags() creates the property map; the else branch installs an empty
+        // map so that getProperties() below is non-null - confirm.
+        if (messageExt.getTags() != null) {
+            msgInner.setTags(messageExt.getTags());
+        } else {
+            MessageAccessor.setProperties(msgInner, new HashMap<String, String>());
+        }
+        msgInner.setBornTimestamp(messageExt.getBornTimestamp());
+        msgInner.setBornHost(brokerController.getStoreHost());
+        msgInner.setStoreHost(brokerController.getStoreHost());
+        msgInner.setReconsumeTimes(messageExt.getReconsumeTimes() + 1);
+        msgInner.getProperties().putAll(messageExt.getProperties());
+        // Record the checkpoint's pop time as the first pop time on the first retry, or when it is missing.
+        if (messageExt.getReconsumeTimes() == 0 || msgInner.getProperties().get(MessageConst.PROPERTY_FIRST_POP_TIME) == null) {
+            msgInner.getProperties().put(MessageConst.PROPERTY_FIRST_POP_TIME, String.valueOf(popCheckPoint.getPopTime()));
+        }
+        // Re-serialize the properties after the direct map edits above.
+        msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties()));
+        addRetryTopicIfNoExit(msgInner.getTopic(), popCheckPoint.getCId());
+        PutMessageResult putMessageResult = brokerController.getMessageStore().putMessage(msgInner);
+        if (brokerController.getBrokerConfig().isEnablePopLog()) {
+            POP_LOGGER.info("reviveQueueId={},retry msg , ck={}, msg queueId {}, offset {}, reviveDelay={}, result is {} ",
+                    queueId, popCheckPoint, messageExt.getQueueId(), messageExt.getQueueOffset(),
+                    (System.currentTimeMillis() - popCheckPoint.getReviveTime()) / 1000, putMessageResult);
+        }
+        if (putMessageResult.getAppendMessageResult() == null || putMessageResult.getAppendMessageResult().getStatus() != AppendMessageStatus.PUT_OK) {
+            throw new Exception("reviveQueueId=" + queueId + ",revive error ,msg is :" + msgInner);
+        }
+        this.brokerController.getBrokerStatsManager().incBrokerPutNums(1);
+        this.brokerController.getBrokerStatsManager().incTopicPutNums(msgInner.getTopic());
+        this.brokerController.getBrokerStatsManager().incTopicPutSize(msgInner.getTopic(), putMessageResult.getAppendMessageResult().getWroteBytes());
+        // Notify the pop processor for the normal topic and group. NOTE(review): queue id -1 presumably
+        // means "any queue" - confirm.
+        if (brokerController.getPopMessageProcessor() != null) {
+            brokerController.getPopMessageProcessor().notifyMessageArriving(
+                    KeyBuilder.parseNormalTopic(popCheckPoint.getTopic(), popCheckPoint.getCId()),
+                    popCheckPoint.getCId(),
+                    -1
+            );
+        }
+    }
+
+    private void initPopRetryOffset(String topic, String consumerGroup) {
+        long offset = this.brokerController.getConsumerOffsetManager().queryOffset(consumerGroup, topic, 0);
+        if (offset < 0) {
+            this.brokerController.getConsumerOffsetManager().commitOffset("initPopRetryOffset", consumerGroup, topic,
+                0, 0);
+        }
+    }
+
+    /**
+     * Registers {@code topic} with {@code PopAckConstants.retryQueueNum} read and write queues when the
+     * topic config manager does not know it yet, then seeds the group's offset on it. Does nothing if
+     * the topic already exists.
+     */
+    private void addRetryTopicIfNoExit(String topic, String consumerGroup) {
+        if (brokerController.getTopicConfigManager().selectTopicConfig(topic) == null) {
+            TopicConfig retryTopicConfig = new TopicConfig(topic);
+            retryTopicConfig.setReadQueueNums(PopAckConstants.retryQueueNum);
+            retryTopicConfig.setWriteQueueNums(PopAckConstants.retryQueueNum);
+            retryTopicConfig.setTopicFilterType(TopicFilterType.SINGLE_TAG);
+            // NOTE(review): 6 is presumably PermName.PERM_READ | PermName.PERM_WRITE (4 | 2) - confirm.
+            retryTopicConfig.setPerm(6);
+            retryTopicConfig.setTopicSysFlag(0);
+            brokerController.getTopicConfigManager().updateTopicConfig(retryTopicConfig);
+
+            initPopRetryOffset(topic, consumerGroup);
+        }
+    }
+
+    private List<MessageExt> getReviveMessage(long offset, int queueId) {
+        PullResult pullResult = getMessage(PopAckConstants.REVIVE_GROUP, reviveTopic, queueId, offset, 32);
+        if (pullResult == null) {
+            return null;
+        }
+        if (reachTail(pullResult, offset)) {
+            POP_LOGGER.info("reviveQueueId={}, reach tail,offset {}", queueId, offset);
+        } else if (pullResult.getPullStatus() == PullStatus.OFFSET_ILLEGAL || pullResult.getPullStatus() == PullStatus.NO_MATCHED_MSG) {
+            POP_LOGGER.error("reviveQueueId={}, OFFSET_ILLEGAL {}, result is {}", queueId, offset, pullResult);
+            if (!checkAndSetMaster()) {
+                POP_LOGGER.info("slave skip offset correct topic={}, reviveQueueId={}", reviveTopic, queueId);
+                return null;
+            }
+            brokerController.getConsumerOffsetManager().commitOffset(PopAckConstants.LOCAL_HOST, PopAckConstants.REVIVE_GROUP, reviveTopic, queueId, pullResult.getNextBeginOffset() - 1);
+        }
+        return pullResult.getMsgFoundList();
+    }
+
+    /**
+     * @return {@code true} when the pull reported NO_NEW_MSG, or OFFSET_ILLEGAL with {@code offset}
+     *         equal to the queue's max offset
+     */
+    private boolean reachTail(PullResult pullResult, long offset) {
+        PullStatus status = pullResult.getPullStatus();
+        if (status == PullStatus.NO_NEW_MSG) {
+            return true;
+        }
+        return status == PullStatus.OFFSET_ILLEGAL && pullResult.getMaxOffset() == offset;
+    }
+
+    /**
+     * Reads the single message at {@code offset} of {@code topic}/{@code queueId} from the local store.
+     *
+     * @return the decoded message, or {@code null} if the store returned no result or nothing decodable
+     */
+    private MessageExt getBizMessage(String topic, long offset, int queueId) {
+        final GetMessageResult getMessageTmpResult = brokerController.getMessageStore().getMessage(PopAckConstants.REVIVE_GROUP, topic, queueId, offset, 1, null);
+        // getMessage() in this class treats a null store result as possible; guard it here as well instead
+        // of letting decodeMsgList() throw a NullPointerException (including from its finally block).
+        List<MessageExt> list = getMessageTmpResult == null ? null : decodeMsgList(getMessageTmpResult);
+        if (list == null || list.isEmpty()) {
+            POP_LOGGER.warn("can not get msg , topic {}, offset {}, queueId {}, result is {}", topic, offset, queueId, getMessageTmpResult);
+            return null;
+        }
+        return list.get(0);
+    }
+
+    /**
+     * Reads up to {@code nums} messages for {@code group} from {@code topic}/{@code queueId}, starting at
+     * {@code offset}, from the local store, and converts the store result into a {@link PullResult}.
+     * On FOUND, the broker get-statistics for the group/topic are updated.
+     *
+     * @return the converted result, or {@code null} if the store returned {@code null}
+     */
+    public PullResult getMessage(String group, String topic, int queueId, long offset, int nums) {
+        GetMessageResult getMessageResult = brokerController.getMessageStore().getMessage(group, topic, queueId, offset, nums, null);
+
+        if (getMessageResult != null) {
+            PullStatus pullStatus = PullStatus.NO_NEW_MSG;
+            List<MessageExt> foundList = null;
+            switch (getMessageResult.getStatus()) {
+                case FOUND:
+                    pullStatus = PullStatus.FOUND;
+                    foundList = decodeMsgList(getMessageResult);
+                    brokerController.getBrokerStatsManager().incGroupGetNums(group, topic, getMessageResult.getMessageCount());
+                    brokerController.getBrokerStatsManager().incGroupGetSize(group, topic, getMessageResult.getBufferTotalSize());
+                    brokerController.getBrokerStatsManager().incBrokerGetNums(getMessageResult.getMessageCount());
+                    // decodeMsgList() skips null or undecodable buffers, so the list can be empty even on
+                    // FOUND; get(size() - 1) would then throw IndexOutOfBoundsException.
+                    if (!foundList.isEmpty()) {
+                        brokerController.getBrokerStatsManager().recordDiskFallBehindTime(group, topic, queueId,
+                                brokerController.getMessageStore().now() - foundList.get(foundList.size() - 1).getStoreTimestamp());
+                    }
+                    break;
+                case NO_MATCHED_MESSAGE:
+                    pullStatus = PullStatus.NO_MATCHED_MSG;
+                    POP_LOGGER.warn("no matched message. GetMessageStatus={}, topic={}, groupId={}, requestOffset={}",
+                            getMessageResult.getStatus(), topic, group, offset);
+                    break;
+                case NO_MESSAGE_IN_QUEUE:
+                    pullStatus = PullStatus.NO_NEW_MSG;
+                    POP_LOGGER.warn("no new message. GetMessageStatus={}, topic={}, groupId={}, requestOffset={}",
+                            getMessageResult.getStatus(), topic, group, offset);
+                    break;
+                case MESSAGE_WAS_REMOVING:
+                case NO_MATCHED_LOGIC_QUEUE:
+                case OFFSET_FOUND_NULL:
+                case OFFSET_OVERFLOW_BADLY:
+                case OFFSET_OVERFLOW_ONE:
+                case OFFSET_TOO_SMALL:
+                    pullStatus = PullStatus.OFFSET_ILLEGAL;
+                    POP_LOGGER.warn("offset illegal. GetMessageStatus={}, topic={}, groupId={}, requestOffset={}",
+                            getMessageResult.getStatus(), topic, group, offset);
+                    break;
+                default:
+                    assert false;
+                    break;
+            }
+
+            return new PullResult(pullStatus, getMessageResult.getNextBeginOffset(), getMessageResult.getMinOffset(),
+                    getMessageResult.getMaxOffset(), foundList);
+
+        } else {
+            POP_LOGGER.error("get message from store return null. topic={}, groupId={}, requestOffset={}", topic, group, offset);
+            return null;
+        }
+    }
+
+    private List<MessageExt> decodeMsgList(GetMessageResult getMessageResult) {
+        List<MessageExt> foundList = new ArrayList<>();
+        try {
+            List<ByteBuffer> messageBufferList = getMessageResult.getMessageBufferList();
+            if (messageBufferList != null) {
+                for (int i = 0; i < messageBufferList.size(); i++) {
+                    ByteBuffer bb = messageBufferList.get(i);
+                    if (bb == null) {
+                        POP_LOGGER.error("bb is null {}", getMessageResult);
+                        continue;
+                    }
+                    MessageExt msgExt = MessageDecoder.decode(bb);
+                    if (msgExt == null) {
+                        POP_LOGGER.error("decode msgExt is null {}", getMessageResult);
+                        continue;
+                    }
+                    // use CQ offset, not offset in Message
+                    msgExt.setQueueOffset(getMessageResult.getMessageQueueOffset().get(i));
+                    foundList.add(msgExt);
+                }
+            }
+        } finally {
+            getMessageResult.release();
+        }
+
+        return foundList;
+    }
+
+    /**
+     * Scans this service's revive queue from the committed revive offset + 1 and merges the messages into
+     * {@code consumeReviveObj.map}. A CK message adds a checkpoint; an ACK message sets the bit for its ack
+     * offset in the matching checkpoint's bit map. The old offset and the largest deliver time seen
+     * ({@code endTime}) are stored in {@code consumeReviveObj}. Scanning stops on a slave, when no more
+     * messages are returned, or once {@code reviveScanTime} milliseconds have passed.
+     */
+    private void consumeReviveMessage(ConsumeReviveObj consumeReviveObj) {
+        HashMap<String, PopCheckPoint> map = consumeReviveObj.map;
+        long startScanTime = System.currentTimeMillis();
+        long endTime = 0;
+        long oldOffset = brokerController.getConsumerOffsetManager().queryOffset(PopAckConstants.REVIVE_GROUP, reviveTopic, queueId);
+        consumeReviveObj.oldOffset = oldOffset;
+        POP_LOGGER.info("reviveQueueId={}, old offset is {} ", queueId, oldOffset);
+        long offset = oldOffset + 1;
+        // Read batches until the queue is exhausted, the broker is no longer master, or the scan times out.
+        while (true) {
+            if (!checkAndSetMaster()) {
+                POP_LOGGER.info("slave skip scan , revive topic={}, reviveQueueId={}", reviveTopic, queueId);
+                break;
+            }
+            List<MessageExt> messageExts = getReviveMessage(offset, queueId);
+            if (messageExts == null || messageExts.isEmpty()) {
+                break;
+            }
+            if (System.currentTimeMillis() - startScanTime > brokerController.getBrokerConfig().getReviveScanTime()) {
+                POP_LOGGER.info("reviveQueueId={}, scan timeout  ", queueId);
+                break;
+            }
+            for (MessageExt messageExt : messageExts) {
+                if (PopAckConstants.CK_TAG.equals(messageExt.getTags())) {
+                    String raw = new String(messageExt.getBody(), DataConverter.charset);
+                    if (brokerController.getBrokerConfig().isEnablePopLog()) {
+                        POP_LOGGER.info("reviveQueueId={},find ck, offset:{}, raw : {}", messageExt.getQueueId(), messageExt.getQueueOffset(), raw);
+                    }
+                    PopCheckPoint point = JSON.parseObject(raw, PopCheckPoint.class);
+                    // parseObject may return null (e.g. for an empty body); skip it like an incomplete CK.
+                    if (point == null || point.getTopic() == null || point.getCId() == null) {
+                        continue;
+                    }
+                    map.put(reviveMergeKey(point.getTopic(), point.getCId(), point.getQueueId(), point.getStartOffset(), point.getPopTime()), point);
+                    point.setReviveOffset(messageExt.getQueueOffset());
+                } else if (PopAckConstants.ACK_TAG.equals(messageExt.getTags())) {
+                    String raw = new String(messageExt.getBody(), DataConverter.charset);
+                    if (brokerController.getBrokerConfig().isEnablePopLog()) {
+                        POP_LOGGER.info("reviveQueueId={},find ack, offset:{}, raw : {}", messageExt.getQueueId(), messageExt.getQueueOffset(), raw);
+                    }
+                    AckMsg ackMsg = JSON.parseObject(raw, AckMsg.class);
+                    if (ackMsg == null) {
+                        continue;
+                    }
+                    PopCheckPoint point = map.get(reviveMergeKey(ackMsg.getTopic(), ackMsg.getConsumerGroup(), ackMsg.getQueueId(), ackMsg.getStartOffset(), ackMsg.getPopTime()));
+                    if (point == null) {
+                        continue;
+                    }
+                    int indexOfAck = point.indexOfAck(ackMsg.getAckOffset());
+                    if (indexOfAck > -1) {
+                        point.setBitMap(DataConverter.setBit(point.getBitMap(), indexOfAck, true));
+                    } else {
+                        POP_LOGGER.error("invalid ack index, {}, {}", ackMsg, point);
+                    }
+                }
+                long deliverTime = MsgUtil.getMessageDeliverTime(this.brokerController, messageExt);
+                if (deliverTime > endTime) {
+                    endTime = deliverTime;
+                }
+            }
+            offset = offset + messageExts.size();
+        }
+        consumeReviveObj.endTime = endTime;
+    }
+
+    /**
+     * Builds the key that pairs a CK with its ACKs. The parts are joined with '@' so that adjacent fields
+     * cannot run together: without a separator, queueId 1 + startOffset 23 and queueId 12 + startOffset 3
+     * (same topic, group and popTime) both produce "...123...", and one checkpoint overwrites the other.
+     * NOTE(review): assumes '@' is not a legal character in topic/group names - confirm.
+     */
+    private static String reviveMergeKey(String topic, String group, Object queueId, Object startOffset, Object popTime) {
+        return topic + '@' + group + '@' + queueId + '@' + startOffset + '@' + popTime;
+    }
+
+    /**
+     * Walks the scanned checkpoints in the order returned by {@code genSortList()} and hands each one that
+     * is due to {@code reviveMsgFromCk}. Processing stops at the first checkpoint whose revive time is within
+     * {@code ackTimeInterval + SECOND} of the scan's {@code endTime}. Checkpoints whose normal topic or
+     * subscription group no longer exists are skipped, but the revive offset still moves past them. If the
+     * offset advanced, it is committed (master only) and stored in {@code consumeReviveObj.newOffset}.
+     */
+    private void mergeAndRevive(ConsumeReviveObj consumeReviveObj) throws Throwable {
+        ArrayList<PopCheckPoint> sortList = consumeReviveObj.genSortList();
+        POP_LOGGER.info("reviveQueueId={},ck listSize={}", queueId, sortList.size());
+        if (sortList.size() != 0) {
+            POP_LOGGER.info("reviveQueueId={}, 1st ck, startOffset={}, reviveOffset={} ; last ck, startOffset={}, reviveOffset={}", queueId, sortList.get(0).getStartOffset(),
+                    sortList.get(0).getReviveOffset(), sortList.get(sortList.size() - 1).getStartOffset(), sortList.get(sortList.size() - 1).getReviveOffset());
+        }
+        long newOffset = consumeReviveObj.oldOffset;
+        for (PopCheckPoint popCheckPoint : sortList) {
+            if (!checkAndSetMaster()) {
+                POP_LOGGER.info("slave skip ck process , revive topic={}, reviveQueueId={}", reviveTopic, queueId);
+                break;
+            }
+            // Not yet due relative to the newest deliver time seen in this scan; stop here.
+            if (consumeReviveObj.endTime - popCheckPoint.getReviveTime() <= (PopAckConstants.ackTimeInterval + PopAckConstants.SECOND)) {
+                break;
+            }
+
+            // check normal topic, skip ck , if normal topic is not exist
+            String normalTopic = KeyBuilder.parseNormalTopic(popCheckPoint.getTopic(), popCheckPoint.getCId());
+            if (brokerController.getTopicConfigManager().selectTopicConfig(normalTopic) == null) {
+                // Log the topic that was actually looked up, not the (possibly retry) CK topic.
+                POP_LOGGER.warn("reviveQueueId={},can not get normal topic {} , then continue ", queueId, normalTopic);
+                newOffset = popCheckPoint.getReviveOffset();
+                continue;
+            }
+            if (null == brokerController.getSubscriptionGroupManager().findSubscriptionGroupConfig(popCheckPoint.getCId())) {
+                POP_LOGGER.warn("reviveQueueId={},can not get cid {} , then continue ", queueId, popCheckPoint.getCId());
+                newOffset = popCheckPoint.getReviveOffset();
+                continue;
+            }
+
+            reviveMsgFromCk(popCheckPoint);
+
+            newOffset = popCheckPoint.getReviveOffset();
+        }
+        if (newOffset > consumeReviveObj.oldOffset) {
+            if (!checkAndSetMaster()) {
+                POP_LOGGER.info("slave skip commit, revive topic={}, reviveQueueId={}", reviveTopic, queueId);
+                return;
+            }
+            brokerController.getConsumerOffsetManager().commitOffset(PopAckConstants.LOCAL_HOST, PopAckConstants.REVIVE_GROUP, reviveTopic, queueId, newOffset);
+        }
+        consumeReviveObj.newOffset = newOffset;
+    }
+
+    private void reviveMsgFromCk(PopCheckPoint popCheckPoint) throws Throwable {
+        for (int j = 0; j < popCheckPoint.getNum(); j++) {

Review comment:
       
   1. In pop mode, for each message pulled by the client, the broker saves a checkpoint (CK) message and waits for the corresponding ACK.
   
   2. If the ACK is not received in time (because of a client exception, or a timeout caused by network problems), PopReviveService reads the CK message from the revive topic and checks whether the message is still unacknowledged after the timeout. If it is, the message is put into the retry topic.
   
   ![image](https://user-images.githubusercontent.com/14222167/128729678-c5962d21-a0f5-4a5e-b388-70d0f5778b78.png)
   
   3. When the client pulls messages, it also pulls from the retry topic with some probability.
   ![image](https://user-images.githubusercontent.com/14222167/128730163-14362248-0f33-472e-b299-a10e8ea61ad0.png)
   
   




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@rocketmq.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org