You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@rocketmq.apache.org by GitBox <gi...@apache.org> on 2022/01/20 08:31:54 UTC

[GitHub] [rocketmq] dongeforever commented on a change in pull request #3458: [ISSUE #3449] Delayed message supports asynchronous delivery

dongeforever commented on a change in pull request #3458:
URL: https://github.com/apache/rocketmq/pull/3458#discussion_r788502037



##########
File path: store/src/main/java/org/apache/rocketmq/store/schedule/ScheduleMessageService.java
##########
@@ -308,158 +381,431 @@ public void executeOnTimeup() {
                 ScheduleMessageService.this.defaultMessageStore.findConsumeQueue(TopicValidator.RMQ_SYS_SCHEDULE_TOPIC,
                     delayLevel2QueueId(delayLevel));
 
-            long failScheduleOffset = offset;
+            if (cq == null) {
+                this.scheduleNextTimerTask(this.offset, DELAY_FOR_A_WHILE);
+                return;
+            }
 
-            if (cq != null) {
-                SelectMappedBufferResult bufferCQ = cq.getIndexBuffer(this.offset);
-                if (bufferCQ != null) {
-                    try {
-                        long nextOffset = offset;
-                        int i = 0;
-                        ConsumeQueueExt.CqExtUnit cqExtUnit = new ConsumeQueueExt.CqExtUnit();
-                        for (; i < bufferCQ.getSize(); i += ConsumeQueue.CQ_STORE_UNIT_SIZE) {
-                            long offsetPy = bufferCQ.getByteBuffer().getLong();
-                            int sizePy = bufferCQ.getByteBuffer().getInt();
-                            long tagsCode = bufferCQ.getByteBuffer().getLong();
-
-                            if (cq.isExtAddr(tagsCode)) {
-                                if (cq.getExt(tagsCode, cqExtUnit)) {
-                                    tagsCode = cqExtUnit.getTagsCode();
-                                } else {
-                                    //can't find ext content.So re compute tags code.
-                                    log.error("[BUG] can't find consume queue extend file content!addr={}, offsetPy={}, sizePy={}",
-                                        tagsCode, offsetPy, sizePy);
-                                    long msgStoreTime = defaultMessageStore.getCommitLog().pickupStoreTimestamp(offsetPy, sizePy);
-                                    tagsCode = computeDeliverTimestamp(delayLevel, msgStoreTime);
-                                }
-                            }
+            SelectMappedBufferResult bufferCQ = cq.getIndexBuffer(this.offset);
+            if (bufferCQ == null) {
+                long resetOffset;
+                if ((resetOffset = cq.getMinOffsetInQueue()) > this.offset) {
+                    log.error("schedule CQ offset invalid. offset={}, cqMinOffset={}, queueId={}",
+                        this.offset, resetOffset, cq.getQueueId());
+                } else if ((resetOffset = cq.getMaxOffsetInQueue()) < this.offset) {
+                    log.error("schedule CQ offset invalid. offset={}, cqMaxOffset={}, queueId={}",
+                        this.offset, resetOffset, cq.getQueueId());
+                } else {
+                    resetOffset = this.offset;
+                }
 
-                            long now = System.currentTimeMillis();
-                            long deliverTimestamp = this.correctDeliverTimestamp(now, tagsCode);
-
-                            nextOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE);
-
-                            long countdown = deliverTimestamp - now;
-
-                            if (countdown <= 0) {
-                                MessageExt msgExt =
-                                    ScheduleMessageService.this.defaultMessageStore.lookMessageByOffset(
-                                        offsetPy, sizePy);
-
-                                if (msgExt != null) {
-                                    try {
-                                        MessageExtBrokerInner msgInner = this.messageTimeup(msgExt);
-                                        if (TopicValidator.RMQ_SYS_TRANS_HALF_TOPIC.equals(msgInner.getTopic())) {
-                                            log.error("[BUG] the real topic of schedule msg is {}, discard the msg. msg={}",
-                                                msgInner.getTopic(), msgInner);
-                                            continue;
-                                        }
-                                        PutMessageResult putMessageResult =
-                                            ScheduleMessageService.this.writeMessageStore
-                                                .putMessage(msgInner);
-
-                                        if (putMessageResult != null
-                                            && putMessageResult.getPutMessageStatus() == PutMessageStatus.PUT_OK) {
-                                            if (ScheduleMessageService.this.defaultMessageStore.getMessageStoreConfig().isEnableScheduleMessageStats()) {
-                                                ScheduleMessageService.this.defaultMessageStore.getBrokerStatsManager().incQueueGetNums(MixAll.SCHEDULE_CONSUMER_GROUP, TopicValidator.RMQ_SYS_SCHEDULE_TOPIC, delayLevel - 1, putMessageResult.getAppendMessageResult().getMsgNum());
-                                                ScheduleMessageService.this.defaultMessageStore.getBrokerStatsManager().incQueueGetSize(MixAll.SCHEDULE_CONSUMER_GROUP, TopicValidator.RMQ_SYS_SCHEDULE_TOPIC, delayLevel - 1, putMessageResult.getAppendMessageResult().getWroteBytes());
-                                                ScheduleMessageService.this.defaultMessageStore.getBrokerStatsManager().incGroupGetNums(MixAll.SCHEDULE_CONSUMER_GROUP, TopicValidator.RMQ_SYS_SCHEDULE_TOPIC, putMessageResult.getAppendMessageResult().getMsgNum());
-                                                ScheduleMessageService.this.defaultMessageStore.getBrokerStatsManager().incGroupGetSize(MixAll.SCHEDULE_CONSUMER_GROUP, TopicValidator.RMQ_SYS_SCHEDULE_TOPIC, putMessageResult.getAppendMessageResult().getWroteBytes());
-                                                ScheduleMessageService.this.defaultMessageStore.getBrokerStatsManager().incTopicPutNums(msgInner.getTopic(), putMessageResult.getAppendMessageResult().getMsgNum(), 1);
-                                                ScheduleMessageService.this.defaultMessageStore.getBrokerStatsManager().incTopicPutSize(msgInner.getTopic(),
-                                                    putMessageResult.getAppendMessageResult().getWroteBytes());
-                                                ScheduleMessageService.this.defaultMessageStore.getBrokerStatsManager().incBrokerPutNums(putMessageResult.getAppendMessageResult().getMsgNum());
-                                            }
-                                            continue;
-                                        } else {
-                                            // XXX: warn and notify me
-                                            log.error(
-                                                "ScheduleMessageService, a message time up, but reput it failed, topic: {} msgId {}",
-                                                msgExt.getTopic(), msgExt.getMsgId());
-                                            ScheduleMessageService.this.deliverExecutorService.schedule(
-                                                new DeliverDelayedMessageTimerTask(this.delayLevel,
-                                                    nextOffset), DELAY_FOR_A_PERIOD, TimeUnit.MILLISECONDS);
-                                            ScheduleMessageService.this.updateOffset(this.delayLevel,
-                                                nextOffset);
-                                            return;
-                                        }
-                                    } catch (Exception e) {
-                                        /*
-                                         * XXX: warn and notify me
-                                         */
-                                        log.error(
-                                            "ScheduleMessageService, messageTimeup execute error, drop it. msgExt={}, nextOffset={}, offsetPy={}, sizePy={}", msgExt, nextOffset, offsetPy, sizePy, e);
-                                    }
-                                }
-                            } else {
-                                ScheduleMessageService.this.deliverExecutorService.schedule(
-                                    new DeliverDelayedMessageTimerTask(this.delayLevel, nextOffset),
-                                    countdown, TimeUnit.MILLISECONDS);
-                                ScheduleMessageService.this.updateOffset(this.delayLevel, nextOffset);
-                                return;
-                            }
-                        } // end of for
+                this.scheduleNextTimerTask(resetOffset, DELAY_FOR_A_WHILE);
+                return;
+            }
 
-                        nextOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE);
-                        ScheduleMessageService.this.deliverExecutorService.schedule(new DeliverDelayedMessageTimerTask(
-                            this.delayLevel, nextOffset), DELAY_FOR_A_WHILE, TimeUnit.MILLISECONDS);
-                        ScheduleMessageService.this.updateOffset(this.delayLevel, nextOffset);
+            long nextOffset = this.offset;
+            try {
+                int i = 0;
+                ConsumeQueueExt.CqExtUnit cqExtUnit = new ConsumeQueueExt.CqExtUnit();
+                for (; i < bufferCQ.getSize() && isStarted(); i += ConsumeQueue.CQ_STORE_UNIT_SIZE) {
+                    long offsetPy = bufferCQ.getByteBuffer().getLong();
+                    int sizePy = bufferCQ.getByteBuffer().getInt();
+                    long tagsCode = bufferCQ.getByteBuffer().getLong();
+
+                    if (cq.isExtAddr(tagsCode)) {
+                        if (cq.getExt(tagsCode, cqExtUnit)) {
+                            tagsCode = cqExtUnit.getTagsCode();
+                        } else {
+                            //can't find ext content.So re compute tags code.
+                            log.error("[BUG] can't find consume queue extend file content!addr={}, offsetPy={}, sizePy={}",
+                                tagsCode, offsetPy, sizePy);
+                            long msgStoreTime = defaultMessageStore.getCommitLog().pickupStoreTimestamp(offsetPy, sizePy);
+                            tagsCode = computeDeliverTimestamp(delayLevel, msgStoreTime);
+                        }
+                    }
+
+                    long now = System.currentTimeMillis();
+                    long deliverTimestamp = this.correctDeliverTimestamp(now, tagsCode);
+                    nextOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE);
+
+                    long countdown = deliverTimestamp - now;
+                    if (countdown > 0) {
+                        this.scheduleNextTimerTask(nextOffset, DELAY_FOR_A_WHILE);
                         return;
-                    } finally {
+                    }
 
-                        bufferCQ.release();
+                    MessageExt msgExt = ScheduleMessageService.this.defaultMessageStore.lookMessageByOffset(offsetPy, sizePy);
+                    if (msgExt == null) {
+                        continue;
                     }
-                } // end of if (bufferCQ != null)
-                else {
-
-                    long cqMinOffset = cq.getMinOffsetInQueue();
-                    long cqMaxOffset = cq.getMaxOffsetInQueue();
-                    if (offset < cqMinOffset) {
-                        failScheduleOffset = cqMinOffset;
-                        log.error("schedule CQ offset invalid. offset={}, cqMinOffset={}, cqMaxOffset={}, queueId={}",
-                            offset, cqMinOffset, cqMaxOffset, cq.getQueueId());
+
+                    MessageExtBrokerInner msgInner = ScheduleMessageService.this.messageTimeup(msgExt);
+                    if (TopicValidator.RMQ_SYS_TRANS_HALF_TOPIC.equals(msgInner.getTopic())) {
+                        log.error("[BUG] the real topic of schedule msg is {}, discard the msg. msg={}",
+                            msgInner.getTopic(), msgInner);
+                        continue;
+                    }
+
+                    boolean deliverSuc;
+                    if (ScheduleMessageService.this.enableAsyncDeliver) {
+                        deliverSuc = this.asyncDeliver(msgInner, msgExt.getMsgId(), offset, offsetPy, sizePy);
+                    } else {
+                        deliverSuc = this.syncDeliver(msgInner, msgExt.getMsgId(), offset, offsetPy, sizePy);
                     }
 
-                    if (offset > cqMaxOffset) {
-                        failScheduleOffset = cqMaxOffset;
-                        log.error("schedule CQ offset invalid. offset={}, cqMinOffset={}, cqMaxOffset={}, queueId={}",
-                            offset, cqMinOffset, cqMaxOffset, cq.getQueueId());
+                    if (!deliverSuc) {
+                        this.scheduleNextTimerTask(nextOffset, DELAY_FOR_A_WHILE);
+                        return;
                     }
                 }
-            } // end of if (cq != null)
 
-            ScheduleMessageService.this.deliverExecutorService.schedule(new DeliverDelayedMessageTimerTask(this.delayLevel,
-                failScheduleOffset), DELAY_FOR_A_WHILE, TimeUnit.MILLISECONDS);
+                nextOffset = this.offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE);
+            } catch (Exception e) {
+                log.error("ScheduleMessageService, messageTimeup execute error, offset = {}", nextOffset, e);
+            } finally {
+                bufferCQ.release();
+            }
+
+            this.scheduleNextTimerTask(nextOffset, DELAY_FOR_A_WHILE);
         }
 
-        private MessageExtBrokerInner messageTimeup(MessageExt msgExt) {
-            MessageExtBrokerInner msgInner = new MessageExtBrokerInner();
-            msgInner.setBody(msgExt.getBody());
-            msgInner.setFlag(msgExt.getFlag());
-            MessageAccessor.setProperties(msgInner, msgExt.getProperties());
+        public void scheduleNextTimerTask(long offset, long delay) {
+            ScheduleMessageService.this.deliverExecutorService.schedule(new DeliverDelayedMessageTimerTask(
+                this.delayLevel, offset), delay, TimeUnit.MILLISECONDS);
+        }
 
-            TopicFilterType topicFilterType = MessageExt.parseTopicFilterType(msgInner.getSysFlag());
-            long tagsCodeValue =
-                MessageExtBrokerInner.tagsString2tagsCode(topicFilterType, msgInner.getTags());
-            msgInner.setTagsCode(tagsCodeValue);
-            msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgExt.getProperties()));
+        private boolean syncDeliver(MessageExtBrokerInner msgInner, String msgId, long offset, long offsetPy,
+            int sizePy) {
+            PutResultProcess resultProcess = deliverMessage(msgInner, msgId, offset, offsetPy, sizePy, false);
+            PutMessageResult result = resultProcess.get();
+            boolean sendStatus = result != null && result.getPutMessageStatus() == PutMessageStatus.PUT_OK;
+            if (sendStatus) {
+                ScheduleMessageService.this.updateOffset(this.delayLevel, resultProcess.getNextOffset());
+            }
+            return sendStatus;
+        }
 
-            msgInner.setSysFlag(msgExt.getSysFlag());
-            msgInner.setBornTimestamp(msgExt.getBornTimestamp());
-            msgInner.setBornHost(msgExt.getBornHost());
-            msgInner.setStoreHost(msgExt.getStoreHost());
-            msgInner.setReconsumeTimes(msgExt.getReconsumeTimes());
+        private boolean asyncDeliver(MessageExtBrokerInner msgInner, String msgId, long offset, long offsetPy,
+            int sizePy) {
+            Queue<PutResultProcess> processesQueue = ScheduleMessageService.this.deliverPendingTable.get(this.delayLevel);
+
+            //Flow Control
+            int currentPendingNum = processesQueue.size();
+            int maxPendingLimit = ScheduleMessageService.this.defaultMessageStore.getMessageStoreConfig()
+                .getScheduleAsyncDeliverMaxPendingLimit();
+            if (currentPendingNum > maxPendingLimit) {
+                log.warn("Asynchronous deliver triggers flow control, " +
+                    "currentPendingNum={}, maxPendingLimit={}", currentPendingNum, maxPendingLimit);
+                return false;
+            }
 
-            msgInner.setWaitStoreMsgOK(false);
-            MessageAccessor.clearProperty(msgInner, MessageConst.PROPERTY_DELAY_TIME_LEVEL);
+            //Blocked
+            PutResultProcess firstProcess = processesQueue.peek();
+            if (firstProcess != null && firstProcess.need2Blocked()) {
+                log.warn("Asynchronous deliver block. info={}", firstProcess.toString());
+                return false;
+            }
 
-            msgInner.setTopic(msgInner.getProperty(MessageConst.PROPERTY_REAL_TOPIC));
+            PutResultProcess resultProcess = deliverMessage(msgInner, msgId, offset, offsetPy, sizePy, true);
+            processesQueue.add(resultProcess);
+            return true;
+        }
 
-            String queueIdStr = msgInner.getProperty(MessageConst.PROPERTY_REAL_QUEUE_ID);
-            int queueId = Integer.parseInt(queueIdStr);
-            msgInner.setQueueId(queueId);
+        private PutResultProcess deliverMessage(MessageExtBrokerInner msgInner, String msgId, long offset,
+            long offsetPy, int sizePy, boolean autoResend) {
+            CompletableFuture<PutMessageResult> future =
+                ScheduleMessageService.this.writeMessageStore.asyncPutMessage(msgInner);
+            return new PutResultProcess()
+                .setTopic(msgInner.getTopic())
+                .setDelayLevel(this.delayLevel)
+                .setOffset(offset)
+                .setPhysicOffset(offsetPy)
+                .setPhysicSize(sizePy)
+                .setMsgId(msgId)
+                .setAutoResend(autoResend)
+                .setFuture(future)
+                .thenProcess();
+        }
+    }
+
+    public class HandlePutResultTask implements Runnable {
+        private final int delayLevel;
 
-            return msgInner;
+        public HandlePutResultTask(int delayLevel) {
+            this.delayLevel = delayLevel;
         }
+
+        @Override
+        public void run() {
+            LinkedBlockingQueue<PutResultProcess> pendingQueue =
+                ScheduleMessageService.this.deliverPendingTable.get(this.delayLevel);
+
+            PutResultProcess putResultProcess;
+            while ((putResultProcess = pendingQueue.peek()) != null) {
+                try {

Review comment:
       If the number of handler threads is less than the number of delay levels, this loop may starve the other levels' queues.
   
   It would be better to break out of the loop after handling a bounded number of messages in a row.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@rocketmq.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org