You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@rocketmq.apache.org by GitBox <gi...@apache.org> on 2022/01/06 14:51:34 UTC

[GitHub] [rocketmq] RongtongJin commented on a change in pull request #3458: [ISSUE #3449] Delayed message supports asynchronous delivery

RongtongJin commented on a change in pull request #3458:
URL: https://github.com/apache/rocketmq/pull/3458#discussion_r779591109



##########
File path: store/src/main/java/org/apache/rocketmq/store/schedule/ScheduleMessageService.java
##########
@@ -66,10 +72,23 @@
     private int deliverThreadPoolNums = Runtime.getRuntime().availableProcessors();
     private MessageStore writeMessageStore;
     private int maxDelayLevel;
+    private boolean enableAsyncDeliver = false;
+    private int maxPendingLimit;
+    private int maxResendNum2Blocked;
+    private ScheduledExecutorService handleExecutorService;
+    private final Map<Integer /* level */, LinkedBlockingQueue<PutResultProcess>> deliverPendingTable =
+        new ConcurrentHashMap<>(32);
 
     public ScheduleMessageService(final DefaultMessageStore defaultMessageStore) {
         this.defaultMessageStore = defaultMessageStore;
         this.writeMessageStore = defaultMessageStore;
+        if (defaultMessageStore != null) {
+            this.enableAsyncDeliver = defaultMessageStore.getMessageStoreConfig().isEnableScheduleAsyncDeliver();
+            this.maxPendingLimit = ScheduleMessageService.this.defaultMessageStore.getMessageStoreConfig()
+                .getScheduleAsyncDeliverMaxPendingLimit();
+            this.maxResendNum2Blocked = ScheduleMessageService.this.defaultMessageStore.getMessageStoreConfig()
+                .getScheduleAsyncDeliverMaxResendNum2Blocked();
+        }

Review comment:
       We cannot change these parameters at runtime with the updateBrokerConfig command if maxPendingLimit and maxResendNum2Blocked are stored as instance fields, because they are copied from MessageStoreConfig only once, in the constructor.

##########
File path: store/src/main/java/org/apache/rocketmq/store/schedule/ScheduleMessageService.java
##########
@@ -308,158 +387,408 @@ public void executeOnTimeup() {
                 ScheduleMessageService.this.defaultMessageStore.findConsumeQueue(TopicValidator.RMQ_SYS_SCHEDULE_TOPIC,
                     delayLevel2QueueId(delayLevel));
 
-            long failScheduleOffset = offset;
+            if (cq == null) {
+                this.scheduleNextTimerTask(this.offset, DELAY_FOR_A_WHILE);
+                return;
+            }
 
-            if (cq != null) {
-                SelectMappedBufferResult bufferCQ = cq.getIndexBuffer(this.offset);
-                if (bufferCQ != null) {
-                    try {
-                        long nextOffset = offset;
-                        int i = 0;
-                        ConsumeQueueExt.CqExtUnit cqExtUnit = new ConsumeQueueExt.CqExtUnit();
-                        for (; i < bufferCQ.getSize(); i += ConsumeQueue.CQ_STORE_UNIT_SIZE) {
-                            long offsetPy = bufferCQ.getByteBuffer().getLong();
-                            int sizePy = bufferCQ.getByteBuffer().getInt();
-                            long tagsCode = bufferCQ.getByteBuffer().getLong();
-
-                            if (cq.isExtAddr(tagsCode)) {
-                                if (cq.getExt(tagsCode, cqExtUnit)) {
-                                    tagsCode = cqExtUnit.getTagsCode();
-                                } else {
-                                    //can't find ext content.So re compute tags code.
-                                    log.error("[BUG] can't find consume queue extend file content!addr={}, offsetPy={}, sizePy={}",
-                                        tagsCode, offsetPy, sizePy);
-                                    long msgStoreTime = defaultMessageStore.getCommitLog().pickupStoreTimestamp(offsetPy, sizePy);
-                                    tagsCode = computeDeliverTimestamp(delayLevel, msgStoreTime);
-                                }
-                            }
+            SelectMappedBufferResult bufferCQ = cq.getIndexBuffer(this.offset);
+            if (bufferCQ == null) {
+                long resetOffset;
+                if ((resetOffset = cq.getMinOffsetInQueue()) > this.offset) {
+                    log.error("schedule CQ offset invalid. offset={}, cqMinOffset={}, queueId={}",
+                        this.offset, resetOffset, cq.getQueueId());
+                } else if ((resetOffset = cq.getMaxOffsetInQueue()) < this.offset) {
+                    log.error("schedule CQ offset invalid. offset={}, cqMaxOffset={}, queueId={}",
+                        this.offset, resetOffset, cq.getQueueId());
+                } else {
+                    resetOffset = this.offset;
+                }
 
-                            long now = System.currentTimeMillis();
-                            long deliverTimestamp = this.correctDeliverTimestamp(now, tagsCode);
-
-                            nextOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE);
-
-                            long countdown = deliverTimestamp - now;
-
-                            if (countdown <= 0) {
-                                MessageExt msgExt =
-                                    ScheduleMessageService.this.defaultMessageStore.lookMessageByOffset(
-                                        offsetPy, sizePy);
-
-                                if (msgExt != null) {
-                                    try {
-                                        MessageExtBrokerInner msgInner = this.messageTimeup(msgExt);
-                                        if (TopicValidator.RMQ_SYS_TRANS_HALF_TOPIC.equals(msgInner.getTopic())) {
-                                            log.error("[BUG] the real topic of schedule msg is {}, discard the msg. msg={}",
-                                                msgInner.getTopic(), msgInner);
-                                            continue;
-                                        }
-                                        PutMessageResult putMessageResult =
-                                            ScheduleMessageService.this.writeMessageStore
-                                                .putMessage(msgInner);
-
-                                        if (putMessageResult != null
-                                            && putMessageResult.getPutMessageStatus() == PutMessageStatus.PUT_OK) {
-                                            if (ScheduleMessageService.this.defaultMessageStore.getMessageStoreConfig().isEnableScheduleMessageStats()) {
-                                                ScheduleMessageService.this.defaultMessageStore.getBrokerStatsManager().incQueueGetNums(MixAll.SCHEDULE_CONSUMER_GROUP, TopicValidator.RMQ_SYS_SCHEDULE_TOPIC, delayLevel - 1, putMessageResult.getAppendMessageResult().getMsgNum());
-                                                ScheduleMessageService.this.defaultMessageStore.getBrokerStatsManager().incQueueGetSize(MixAll.SCHEDULE_CONSUMER_GROUP, TopicValidator.RMQ_SYS_SCHEDULE_TOPIC, delayLevel - 1, putMessageResult.getAppendMessageResult().getWroteBytes());
-                                                ScheduleMessageService.this.defaultMessageStore.getBrokerStatsManager().incGroupGetNums(MixAll.SCHEDULE_CONSUMER_GROUP, TopicValidator.RMQ_SYS_SCHEDULE_TOPIC, putMessageResult.getAppendMessageResult().getMsgNum());
-                                                ScheduleMessageService.this.defaultMessageStore.getBrokerStatsManager().incGroupGetSize(MixAll.SCHEDULE_CONSUMER_GROUP, TopicValidator.RMQ_SYS_SCHEDULE_TOPIC, putMessageResult.getAppendMessageResult().getWroteBytes());
-                                                ScheduleMessageService.this.defaultMessageStore.getBrokerStatsManager().incTopicPutNums(msgInner.getTopic(), putMessageResult.getAppendMessageResult().getMsgNum(), 1);
-                                                ScheduleMessageService.this.defaultMessageStore.getBrokerStatsManager().incTopicPutSize(msgInner.getTopic(),
-                                                    putMessageResult.getAppendMessageResult().getWroteBytes());
-                                                ScheduleMessageService.this.defaultMessageStore.getBrokerStatsManager().incBrokerPutNums(putMessageResult.getAppendMessageResult().getMsgNum());
-                                            }
-                                            continue;
-                                        } else {
-                                            // XXX: warn and notify me
-                                            log.error(
-                                                "ScheduleMessageService, a message time up, but reput it failed, topic: {} msgId {}",
-                                                msgExt.getTopic(), msgExt.getMsgId());
-                                            ScheduleMessageService.this.deliverExecutorService.schedule(
-                                                new DeliverDelayedMessageTimerTask(this.delayLevel,
-                                                    nextOffset), DELAY_FOR_A_PERIOD, TimeUnit.MILLISECONDS);
-                                            ScheduleMessageService.this.updateOffset(this.delayLevel,
-                                                nextOffset);
-                                            return;
-                                        }
-                                    } catch (Exception e) {
-                                        /*
-                                         * XXX: warn and notify me
-                                         */
-                                        log.error(
-                                            "ScheduleMessageService, messageTimeup execute error, drop it. msgExt={}, nextOffset={}, offsetPy={}, sizePy={}", msgExt, nextOffset, offsetPy, sizePy, e);
-                                    }
-                                }
-                            } else {
-                                ScheduleMessageService.this.deliverExecutorService.schedule(
-                                    new DeliverDelayedMessageTimerTask(this.delayLevel, nextOffset),
-                                    countdown, TimeUnit.MILLISECONDS);
-                                ScheduleMessageService.this.updateOffset(this.delayLevel, nextOffset);
-                                return;
-                            }
-                        } // end of for
+                this.scheduleNextTimerTask(resetOffset, DELAY_FOR_A_WHILE);
+                return;
+            }
+
+            long nextOffset = this.offset;
+            try {
+                int i = 0;
+                ConsumeQueueExt.CqExtUnit cqExtUnit = new ConsumeQueueExt.CqExtUnit();
+                for (; i < bufferCQ.getSize() && isStarted(); i += ConsumeQueue.CQ_STORE_UNIT_SIZE) {
+                    long offsetPy = bufferCQ.getByteBuffer().getLong();
+                    int sizePy = bufferCQ.getByteBuffer().getInt();
+                    long tagsCode = bufferCQ.getByteBuffer().getLong();
+
+                    if (cq.isExtAddr(tagsCode)) {
+                        if (cq.getExt(tagsCode, cqExtUnit)) {
+                            tagsCode = cqExtUnit.getTagsCode();
+                        } else {
+                            //can't find ext content.So re compute tags code.
+                            log.error("[BUG] can't find consume queue extend file content!addr={}, offsetPy={}, sizePy={}",
+                                tagsCode, offsetPy, sizePy);
+                            long msgStoreTime = defaultMessageStore.getCommitLog().pickupStoreTimestamp(offsetPy, sizePy);
+                            tagsCode = computeDeliverTimestamp(delayLevel, msgStoreTime);
+                        }
+                    }
 
-                        nextOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE);
-                        ScheduleMessageService.this.deliverExecutorService.schedule(new DeliverDelayedMessageTimerTask(
-                            this.delayLevel, nextOffset), DELAY_FOR_A_WHILE, TimeUnit.MILLISECONDS);
-                        ScheduleMessageService.this.updateOffset(this.delayLevel, nextOffset);
+                    long now = System.currentTimeMillis();
+                    long deliverTimestamp = this.correctDeliverTimestamp(now, tagsCode);
+                    nextOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE);
+
+                    long countdown = deliverTimestamp - now;
+                    if (countdown > 0) {
+                        this.scheduleNextTimerTask(nextOffset, DELAY_FOR_A_WHILE);
                         return;
-                    } finally {
+                    }
+
+                    MessageExt msgExt = ScheduleMessageService.this.defaultMessageStore.lookMessageByOffset(offsetPy, sizePy);
+                    if (msgExt == null) {
+                        continue;
+                    }
 
-                        bufferCQ.release();
+                    MessageExtBrokerInner msgInner = ScheduleMessageService.this.messageTimeup(msgExt);
+                    if (TopicValidator.RMQ_SYS_TRANS_HALF_TOPIC.equals(msgInner.getTopic())) {
+                        log.error("[BUG] the real topic of schedule msg is {}, discard the msg. msg={}",
+                            msgInner.getTopic(), msgInner);
+                        continue;
                     }
-                } // end of if (bufferCQ != null)
-                else {
-
-                    long cqMinOffset = cq.getMinOffsetInQueue();
-                    long cqMaxOffset = cq.getMaxOffsetInQueue();
-                    if (offset < cqMinOffset) {
-                        failScheduleOffset = cqMinOffset;
-                        log.error("schedule CQ offset invalid. offset={}, cqMinOffset={}, cqMaxOffset={}, queueId={}",
-                            offset, cqMinOffset, cqMaxOffset, cq.getQueueId());
+
+                    boolean deliverSuc;
+                    if (ScheduleMessageService.this.enableAsyncDeliver) {
+                        deliverSuc = this.asyncDeliver(msgInner, msgExt.getMsgId(), offset, offsetPy, sizePy);
+                    } else {
+                        deliverSuc = this.syncDeliver(msgInner, msgExt.getMsgId(), offset, offsetPy, sizePy);
                     }
 
-                    if (offset > cqMaxOffset) {
-                        failScheduleOffset = cqMaxOffset;
-                        log.error("schedule CQ offset invalid. offset={}, cqMinOffset={}, cqMaxOffset={}, queueId={}",
-                            offset, cqMinOffset, cqMaxOffset, cq.getQueueId());
+                    if (!deliverSuc) {
+                        this.scheduleNextTimerTask(nextOffset, DELAY_FOR_A_WHILE);
+                        return;
                     }
                 }
-            } // end of if (cq != null)
 
-            ScheduleMessageService.this.deliverExecutorService.schedule(new DeliverDelayedMessageTimerTask(this.delayLevel,
-                failScheduleOffset), DELAY_FOR_A_WHILE, TimeUnit.MILLISECONDS);
+                nextOffset = this.offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE);
+            } catch (Exception e) {
+                log.error("ScheduleMessageService, messageTimeup execute error, offset = {}", nextOffset, e);
+            } finally {
+                bufferCQ.release();
+            }
+
+            this.scheduleNextTimerTask(nextOffset, DELAY_FOR_A_WHILE);
+        }
+
+        public void scheduleNextTimerTask(long offset, long delay) {
+            ScheduleMessageService.this.deliverExecutorService.schedule(new DeliverDelayedMessageTimerTask(
+                this.delayLevel, offset), delay, TimeUnit.MILLISECONDS);
+        }
+
+        private boolean syncDeliver(MessageExtBrokerInner msgInner, String msgId, long offset, long offsetPy,
+            int sizePy) {
+            PutResultProcess resultProcess = deliverMessage(msgInner, msgId, offset, offsetPy, sizePy, false);
+            PutMessageResult result = resultProcess.get();
+            boolean sendStatus = result != null && result.getPutMessageStatus() == PutMessageStatus.PUT_OK;
+            if (sendStatus) {
+                ScheduleMessageService.this.updateOffset(this.delayLevel, resultProcess.getNextOffset());
+            }
+            return sendStatus;
+        }
+
+        private boolean asyncDeliver(MessageExtBrokerInner msgInner, String msgId, long offset, long offsetPy,
+            int sizePy) {
+            Queue<PutResultProcess> processesQueue = ScheduleMessageService.this.deliverPendingTable.get(this.delayLevel);
+
+            //Flow Control
+            int currentPendingNum = processesQueue.size();
+            if (currentPendingNum > ScheduleMessageService.this.maxPendingLimit) {
+                log.warn("Asynchronous deliver triggers flow control, " +
+                    "currentPendingNum={}, maxPendingLimit={}", currentPendingNum, maxPendingLimit);
+                return false;
+            }
+
+            //Blocked
+            PutResultProcess firstProcess = processesQueue.peek();
+            if (firstProcess != null && firstProcess.need2Blocked()) {
+                log.warn("Asynchronous deliver block. info={}", firstProcess.toString());
+                return false;
+            }
+
+            PutResultProcess resultProcess = deliverMessage(msgInner, msgId, offset, offsetPy, sizePy, true);
+            processesQueue.add(resultProcess);
+            return true;
+        }
+
+        private PutResultProcess deliverMessage(MessageExtBrokerInner msgInner, String msgId, long offset,
+            long offsetPy, int sizePy, boolean autoResend) {
+            CompletableFuture<PutMessageResult> future =
+                ScheduleMessageService.this.writeMessageStore.asyncPutMessage(msgInner);
+            return new PutResultProcess()
+                .setTopic(msgInner.getTopic())
+                .setDelayLevel(this.delayLevel)
+                .setOffset(offset)
+                .setPhysicOffset(offsetPy)
+                .setPhysicSize(sizePy)
+                .setMsgId(msgId)
+                .setAutoResend(autoResend)
+                .setFuture(future)
+                .thenProcess();
         }
+    }
 
-        private MessageExtBrokerInner messageTimeup(MessageExt msgExt) {
-            MessageExtBrokerInner msgInner = new MessageExtBrokerInner();
-            msgInner.setBody(msgExt.getBody());
-            msgInner.setFlag(msgExt.getFlag());
-            MessageAccessor.setProperties(msgInner, msgExt.getProperties());
+    public class HandlePutResultTask implements Runnable {
+        private final int delayLevel;
 
-            TopicFilterType topicFilterType = MessageExt.parseTopicFilterType(msgInner.getSysFlag());
-            long tagsCodeValue =
-                MessageExtBrokerInner.tagsString2tagsCode(topicFilterType, msgInner.getTags());
-            msgInner.setTagsCode(tagsCodeValue);
-            msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgExt.getProperties()));
+        public HandlePutResultTask(int delayLevel) {
+            this.delayLevel = delayLevel;
+        }
 
-            msgInner.setSysFlag(msgExt.getSysFlag());
-            msgInner.setBornTimestamp(msgExt.getBornTimestamp());
-            msgInner.setBornHost(msgExt.getBornHost());
-            msgInner.setStoreHost(msgExt.getStoreHost());
-            msgInner.setReconsumeTimes(msgExt.getReconsumeTimes());
+        @Override
+        public void run() {
+            LinkedBlockingQueue<PutResultProcess> pendingQueue =
+                ScheduleMessageService.this.deliverPendingTable.get(this.delayLevel);
+
+            PutResultProcess putResultProcess;
+            while ((putResultProcess = pendingQueue.peek()) != null) {
+                try {
+                    switch (putResultProcess.getStatus()) {
+                        case SUCCESS:
+                            ScheduleMessageService.this.updateOffset(this.delayLevel, putResultProcess.getNextOffset());
+                            pendingQueue.remove();
+                            break;
+                        case RUNNING:
+                            break;
+                        case EXCEPTION:
+                            if (!isStarted()) {
+                                log.warn("HandlePutResultTask shutdown, info={}", putResultProcess.toString());
+                                return;
+                            }
+                            log.warn("putResultProcess error, info={}", putResultProcess.toString());
+                            putResultProcess.onException();
+                            break;
+                        case SKIP:
+                            log.warn("putResultProcess skip, info={}", putResultProcess.toString());
+                            pendingQueue.remove();
+                            break;
+                    }
+                } catch (Exception e) {
+                    log.error("HandlePutResultTask exception. info={}", putResultProcess.toString(), e);
+                    putResultProcess.onException();
+                }
+            }
 
-            msgInner.setWaitStoreMsgOK(false);
-            MessageAccessor.clearProperty(msgInner, MessageConst.PROPERTY_DELAY_TIME_LEVEL);
+            if (isStarted()) {
+                ScheduleMessageService.this.handleExecutorService
+                    .schedule(new HandlePutResultTask(this.delayLevel), DELAY_FOR_A_SLEEP, TimeUnit.MILLISECONDS);
+            }
+        }
+    }
 
-            msgInner.setTopic(msgInner.getProperty(MessageConst.PROPERTY_REAL_TOPIC));
+    public class PutResultProcess {
+        private String topic;
+        private long offset;
+        private long physicOffset;
+        private int physicSize;
+        private int delayLevel;
+        private String msgId;
+        private boolean autoResend = false;
+        private CompletableFuture<PutMessageResult> future;
+
+        private volatile int resendCount = 0;
+        private volatile ProcessStatus status = ProcessStatus.RUNNING;
+
+        public PutResultProcess setTopic(String topic) {
+            this.topic = topic;
+            return this;
+        }
 
-            String queueIdStr = msgInner.getProperty(MessageConst.PROPERTY_REAL_QUEUE_ID);
-            int queueId = Integer.parseInt(queueIdStr);
-            msgInner.setQueueId(queueId);
+        public PutResultProcess setOffset(long offset) {
+            this.offset = offset;
+            return this;
+        }
 
-            return msgInner;
+        public PutResultProcess setPhysicOffset(long physicOffset) {
+            this.physicOffset = physicOffset;
+            return this;
         }
+
+        public PutResultProcess setPhysicSize(int physicSize) {
+            this.physicSize = physicSize;
+            return this;
+        }
+
+        public PutResultProcess setDelayLevel(int delayLevel) {
+            this.delayLevel = delayLevel;
+            return this;
+        }
+
+        public PutResultProcess setMsgId(String msgId) {
+            this.msgId = msgId;
+            return this;
+        }
+
+        public PutResultProcess setAutoResend(boolean autoResend) {
+            this.autoResend = autoResend;
+            return this;
+        }
+
+        public PutResultProcess setFuture(CompletableFuture<PutMessageResult> future) {
+            this.future = future;
+            return this;
+        }
+
+        public String getTopic() {
+            return topic;
+        }
+
+        public long getOffset() {
+            return offset;
+        }
+
+        public long getNextOffset() {
+            return offset + 1;
+        }
+
+        public long getPhysicOffset() {
+            return physicOffset;
+        }
+
+        public int getPhysicSize() {
+            return physicSize;
+        }
+
+        public Integer getDelayLevel() {
+            return delayLevel;
+        }
+
+        public String getMsgId() {
+            return msgId;
+        }
+
+        public boolean isAutoResend() {
+            return autoResend;
+        }
+
+        public CompletableFuture<PutMessageResult> getFuture() {
+            return future;
+        }
+
+        public int getResendCount() {
+            return resendCount;
+        }
+
+        public PutResultProcess thenProcess() {
+            this.future.thenAccept(result -> {
+                this.handleResult(result);
+            });
+
+            this.future.exceptionally(e -> {
+                log.error("ScheduleMessageService put message exceptionally, info: {}",
+                    PutResultProcess.this.toString(), e);
+
+                onException();
+                return null;
+            });
+            return this;
+        }
+
+        private void handleResult(PutMessageResult result) {
+            if (result != null && result.getPutMessageStatus() == PutMessageStatus.PUT_OK) {
+                onSuccess(result);
+            } else {
+                log.warn("ScheduleMessageService put message failed. info: {}.", result);
+                onException();
+            }
+        }
+
+        public void onSuccess(PutMessageResult result) {
+            this.status = ProcessStatus.SUCCESS;
+            if (ScheduleMessageService.this.defaultMessageStore.getMessageStoreConfig().isEnableScheduleMessageStats()) {
+                ScheduleMessageService.this.defaultMessageStore.getBrokerStatsManager().incQueueGetNums(MixAll.SCHEDULE_CONSUMER_GROUP, TopicValidator.RMQ_SYS_SCHEDULE_TOPIC, delayLevel - 1, result.getAppendMessageResult().getMsgNum());
+                ScheduleMessageService.this.defaultMessageStore.getBrokerStatsManager().incQueueGetSize(MixAll.SCHEDULE_CONSUMER_GROUP, TopicValidator.RMQ_SYS_SCHEDULE_TOPIC, delayLevel - 1, result.getAppendMessageResult().getWroteBytes());
+                ScheduleMessageService.this.defaultMessageStore.getBrokerStatsManager().incGroupGetNums(MixAll.SCHEDULE_CONSUMER_GROUP, TopicValidator.RMQ_SYS_SCHEDULE_TOPIC, result.getAppendMessageResult().getMsgNum());
+                ScheduleMessageService.this.defaultMessageStore.getBrokerStatsManager().incGroupGetSize(MixAll.SCHEDULE_CONSUMER_GROUP, TopicValidator.RMQ_SYS_SCHEDULE_TOPIC, result.getAppendMessageResult().getWroteBytes());
+                ScheduleMessageService.this.defaultMessageStore.getBrokerStatsManager().incTopicPutNums(this.topic, result.getAppendMessageResult().getMsgNum(), 1);
+                ScheduleMessageService.this.defaultMessageStore.getBrokerStatsManager().incTopicPutSize(this.topic, result.getAppendMessageResult().getWroteBytes());
+                ScheduleMessageService.this.defaultMessageStore.getBrokerStatsManager().incBrokerPutNums(result.getAppendMessageResult().getMsgNum());
+            }
+        }
+
+        public void onException() {
+            log.warn("ScheduleMessageService onException, info: {}", this.toString());
+            if (this.autoResend) {
+                this.resend();
+            } else {
+                this.status = ProcessStatus.SKIP;
+            }
+        }
+
+        public ProcessStatus getStatus() {
+            return this.status;
+        }
+
+        public PutMessageResult get() {
+            try {
+                return this.future.get();
+            } catch (InterruptedException | ExecutionException e) {
+                return new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, null);
+            }
+        }
+
+        private void resend() {
+            log.info("Resend message, info: {}", this.toString());
+
+            // Gradually increase the resend interval.
+            try {
+                Thread.sleep(Math.min(this.resendCount++ * 100, 60 * 1000));
+            } catch (InterruptedException e) {
+                e.printStackTrace();
+            }
+
+            try {
+                MessageExt msgExt = ScheduleMessageService.this.defaultMessageStore.lookMessageByOffset(this.physicOffset, this.physicSize);
+                if (msgExt == null) {
+                    log.warn("ScheduleMessageService resend not found message. info: {}", this.toString());
+                    this.status = need2Skip() ? ProcessStatus.SKIP : ProcessStatus.EXCEPTION;
+                    return;
+                }
+
+                MessageExtBrokerInner msgInner = ScheduleMessageService.this.messageTimeup(msgExt);
+                PutMessageResult result = ScheduleMessageService.this.writeMessageStore.putMessage(msgInner);
+                this.handleResult(result);
+                if (result != null && result.getPutMessageStatus() == PutMessageStatus.PUT_OK) {
+                    log.info("Resend message success, info: {}", this.toString());
+                }
+            } catch (Exception e) {
+                this.status = ProcessStatus.EXCEPTION;
+                log.error("Resend message error, info: {}", this.toString(), e);
+            }
+        }
+
+        public boolean need2Blocked() {
+            return this.resendCount > ScheduleMessageService.this.maxResendNum2Blocked;
+        }
+
+        public boolean need2Skip() {
+            return this.resendCount > ScheduleMessageService.this.maxResendNum2Blocked * 2;
+        }
+
+        @Override
+        public String toString() {
+            return "PutResultProcess{" +
+                "topic='" + topic + '\'' +
+                ", offset=" + offset +
+                ", physicOffset=" + physicOffset +
+                ", physicSize=" + physicSize +
+                ", delayLevel=" + delayLevel +
+                ", msgId='" + msgId + '\'' +
+                ", autoResend=" + autoResend +
+                ", resendCount=" + resendCount +
+                ", status=" + status +
+                '}';
+        }
+    }
+
+    public enum ProcessStatus {
+        RUNNING,
+        SUCCESS,
+        EXCEPTION,
+        SKIP,
     }

Review comment:
       It would be better to add comments explaining the meaning of each state and how the message is handled in each case. For example, if the status is ProcessStatus.SKIP, will the message be discarded?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@rocketmq.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org