Posted to dev@rocketmq.apache.org by GitBox <gi...@apache.org> on 2021/11/03 14:34:58 UTC

[GitHub] [rocketmq] Git-Yang opened a new pull request #3458: [ISSUE #3449] Delayed message supports asynchronous delivery

Git-Yang opened a new pull request #3458:
URL: https://github.com/apache/rocketmq/pull/3458


   ## What is the purpose of the change
   
   #3449 
   
   ## Brief changelog
   - Configure whether to enable asynchronous delivery
   - Configure the maximum number of pending requests for message delivery
   - Construct the PutResultProcess class to manage each asynchronous delivery request.
   - Construct HandlePutResultTask to process the result of a single level delivery request.
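The changelog items above can be read as a bounded pending queue whose head is drained in order. A minimal sketch of that idea follows, under stated assumptions: `AsyncDeliverySketch`, `Pending`, `asyncDeliver`, `handlePutResults` and `pendingCount` are hypothetical names for illustration, a `CompletableFuture<Boolean>` stands in for the store's put result, and none of this is the PR's actual `PutResultProcess` or `HandlePutResultTask` code.

```java
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical, simplified model of asynchronous delivery with flow control.
class AsyncDeliverySketch {
    // One in-flight request: the queue offset it belongs to and its (assumed) put result.
    static final class Pending {
        final long offset;
        final CompletableFuture<Boolean> future;

        Pending(long offset, CompletableFuture<Boolean> future) {
            this.offset = offset;
            this.future = future;
        }
    }

    private final int maxPendingLimit;                       // assumed config value
    private final Queue<Pending> pendingQueue = new LinkedBlockingQueue<>();
    private long nextOffset = 0;                             // next offset still to be confirmed

    AsyncDeliverySketch(int maxPendingLimit) {
        this.maxPendingLimit = maxPendingLimit;
    }

    // Returns false when flow control rejects the request; the caller retries later.
    boolean asyncDeliver(long offset, CompletableFuture<Boolean> putFuture) {
        if (pendingQueue.size() > maxPendingLimit) {
            return false;
        }
        pendingQueue.add(new Pending(offset, putFuture));
        return true;
    }

    // Removes finished, successful requests from the head, strictly in order.
    // Stops at the first request that is unfinished or failed, so the offset never skips one.
    long handlePutResults() {
        Pending head;
        while ((head = pendingQueue.peek()) != null) {
            if (!head.future.isDone()) {
                break;
            }
            boolean ok = !head.future.isCompletedExceptionally()
                && Boolean.TRUE.equals(head.future.getNow(false));
            if (!ok) {
                break; // a real implementation would resend or block here
            }
            pendingQueue.remove();
            nextOffset = head.offset + 1;
        }
        return nextOffset;
    }

    int pendingCount() {
        return pendingQueue.size();
    }

    public static void main(String[] args) {
        AsyncDeliverySketch sketch = new AsyncDeliverySketch(2);
        CompletableFuture<Boolean> first = new CompletableFuture<>();
        CompletableFuture<Boolean> second = new CompletableFuture<>();
        sketch.asyncDeliver(0, first);
        sketch.asyncDeliver(1, second);
        second.complete(true); // completes out of order
        System.out.println("next offset = " + sketch.handlePutResults()); // head still pending
        first.complete(true);
        System.out.println("next offset = " + sketch.handlePutResults()); // both confirmed
    }
}
```

Draining only from the head keeps the confirmed offset monotonic even when later requests finish first, which is presumably why a per-level queue is used rather than independent callbacks.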
   
   ## Verifying this change
   
   - (todo) Add test statistics to verify the accuracy of message delivery.
   - (todo) Compare asynchronous delivery with synchronous delivery and provide a performance test report.
   
   Follow this checklist to help us incorporate your contribution quickly and easily. Notice, `it would be helpful if you could finish the following 5 checklist(the last one is not necessary)before request the community to review your PR`.
   
   - [x] Make sure there is a [Github issue](https://github.com/apache/rocketmq/issues) filed for the change (usually before you start working on it). Trivial changes like typos do not require a Github issue. Your pull request should address just this issue, without pulling in other changes - one PR resolves one issue. 
   - [x] Format the pull request title like `[ISSUE #123] Fix UnknownException when host config not exist`. Each commit in the pull request should have a meaningful subject line and body.
   - [x] Write a pull request description that is detailed enough to understand what the pull request does, how, and why.
   - [x] Write necessary unit-test(over 80% coverage) to verify your logic correction, more mock a little better when cross module dependency exist. If the new feature or significant change is committed, please remember to add integration-test in [test module](https://github.com/apache/rocketmq/tree/master/test).
   - [x] Run `mvn -B clean apache-rat:check findbugs:findbugs checkstyle:checkstyle` to make sure basic checks pass. Run `mvn clean install -DskipITs` to make sure unit-test pass. Run `mvn clean test-compile failsafe:integration-test`  to make sure integration-test pass.
   - [ ] If this contribution is large, please file an [Apache Individual Contributor License Agreement](http://www.apache.org/licenses/#clas).
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@rocketmq.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [rocketmq] Git-Yang commented on a change in pull request #3458: [ISSUE #3449] Delayed message supports asynchronous delivery

Posted by GitBox <gi...@apache.org>.
Git-Yang commented on a change in pull request #3458:
URL: https://github.com/apache/rocketmq/pull/3458#discussion_r762662505



##########
File path: store/src/main/java/org/apache/rocketmq/store/schedule/ScheduleMessageService.java
##########
@@ -431,35 +492,414 @@ public void executeOnTimeup() {
             ScheduleMessageService.this.deliverExecutorService.schedule(new DeliverDelayedMessageTimerTask(this.delayLevel,
                 failScheduleOffset), DELAY_FOR_A_WHILE, TimeUnit.MILLISECONDS);
         }
+    }
+
+    class AsyncDeliverDelayedMessageTimerTask implements Runnable {
+        private final int delayLevel;
+        private final long offset;
+
+        public AsyncDeliverDelayedMessageTimerTask(int delayLevel, long offset) {
+            this.delayLevel = delayLevel;
+            this.offset = offset;
+        }
+
+        @Override
+        public void run() {
+            try {
+                if (isStarted()) {
+                    this.executeOnTimeup();
+                }
+            } catch (Exception e) {
+                log.error("ScheduleMessageService, executeOnTimeup exception", e);
+                this.toNextTask(this.offset);
+            }
+        }
+
+        private long correctDeliverTimestamp(final long now, final long deliverTimestamp) {
+            long result = deliverTimestamp;
+            long maxTimestamp = now + ScheduleMessageService.this.delayLevelTable.get(this.delayLevel);
+            if (deliverTimestamp > maxTimestamp) {
+                result = now;
+            }
+            return result;
+        }
+
+        public void executeOnTimeup() {
+            ConsumeQueue cq = ScheduleMessageService.this.defaultMessageStore.findConsumeQueue(
+                    TopicValidator.RMQ_SYS_SCHEDULE_TOPIC, delayLevel2QueueId(delayLevel));
+
+            if (cq == null) {
+                this.toNextTask(this.offset);
+                return;
+            }
+
+            SelectMappedBufferResult bufferCQ = cq.getIndexBuffer(this.offset);
+            if (bufferCQ == null) {
+                long cqMinOffset = cq.getMinOffsetInQueue();
+                if (this.offset < cqMinOffset) {
+                    log.error("schedule CQ offset invalid. offset={}, cqMinOffset={}, queueId={}",
+                            this.offset, cqMinOffset, cq.getQueueId());
+                    this.toNextTask(cqMinOffset);
+                    return;
+                }
+
+                long cqMaxOffset = cq.getMaxOffsetInQueue();
+                if (this.offset > cqMaxOffset) {
+                    log.error("schedule CQ offset invalid. offset={}, cqMaxOffset={}, queueId={}",
+                            this.offset, cqMaxOffset, cq.getQueueId());
+                    this.toNextTask(cqMaxOffset);
+                    return;
+                }
+
+                this.toNextTask(this.offset);
+                return;
+            }
+
+            try {
+                long currentOffset;
+                int i = 0;
+                ConsumeQueueExt.CqExtUnit cqExtUnit = new ConsumeQueueExt.CqExtUnit();
+                for (; i < bufferCQ.getSize() && isStarted(); i += ConsumeQueue.CQ_STORE_UNIT_SIZE) {
+                    long offsetPy = bufferCQ.getByteBuffer().getLong();
+                    int sizePy = bufferCQ.getByteBuffer().getInt();
+                    long tagsCode = bufferCQ.getByteBuffer().getLong();
+
+                    if (cq.isExtAddr(tagsCode)) {
+                        if (cq.getExt(tagsCode, cqExtUnit)) {
+                            tagsCode = cqExtUnit.getTagsCode();
+                        } else {
+                            //can't find ext content.So re compute tags code.
+                            log.error("[BUG] can't find consume queue extend file content!addr={}, offsetPy={}, sizePy={}",
+                                    tagsCode, offsetPy, sizePy);
+                            long msgStoreTime = defaultMessageStore.getCommitLog().pickupStoreTimestamp(offsetPy, sizePy);
+                            tagsCode = computeDeliverTimestamp(delayLevel, msgStoreTime);
+                        }
+                    }
+
+                    long now = System.currentTimeMillis();
+                    long deliverTimestamp = this.correctDeliverTimestamp(now, tagsCode);
+                    currentOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE);
+
+                    long countdown = deliverTimestamp - now;
+                    if (countdown > 0) {
+                        this.toNextTask(currentOffset);
+                        return;
+                    }
+
+                    MessageExt msgExt = ScheduleMessageService.this.defaultMessageStore.lookMessageByOffset(offsetPy, sizePy);
+                    if (msgExt == null) {
+                        continue;
+                    }
+
+                    try {
+                        MessageExtBrokerInner msgInner = ScheduleMessageService.this.messageTimeup(msgExt);
+                        if (TopicValidator.RMQ_SYS_TRANS_HALF_TOPIC.equals(msgInner.getTopic())) {
+                            log.error("[BUG] the real topic of schedule msg is {}, discard the msg. msg={}",
+                                    msgInner.getTopic(), msgInner);
+                            continue;
+                        }
+
+                        // Asynchronous send flow control.
+                        int maxPendingLimit = ScheduleMessageService.this.defaultMessageStore
+                                .getMessageStoreConfig().getScheduleAsyncDeliverMaxPendingLimit();
+                        int currentPendingNum = this.getDeliverPendingQueue().size();
+                        if (currentPendingNum > maxPendingLimit) {
+                            log.warn("Asynchronous sending triggers flow control, " +
+                                    "currentPendingNum={}, maxPendingLimit={}", currentPendingNum, maxPendingLimit);
+                            this.toNextTask(currentOffset);
+                            return;
+                        }
+
+                        // Handling abnormal blocking.
+                        PutResultProcess firstProcess = this.getDeliverPendingQueue().peek();
+                        if (firstProcess != null && firstProcess.need2Wait()) {
+                            log.warn("Asynchronous sending block. info={}", firstProcess.toString());
+                            this.toNextTask(currentOffset);
+                            return;
+                        }
+
+                        CompletableFuture<PutMessageResult> future =
+                                ScheduleMessageService.this.writeMessageStore.asyncPutMessage(msgInner);
+                        PutResultProcess putResultProcess = new PutResultProcess()
+                                .setTopic(msgInner.getTopic())
+                                .setDelayLevel(this.delayLevel)
+                                .setOffset(currentOffset)
+                                .setPhysicOffset(offsetPy)
+                                .setPhysicSize(sizePy)
+                                .setMsgId(msgExt.getMsgId())
+                                .setAutoResend(true)
+                                .setFuture(future)
+                                .thenProcess();
+                        this.getDeliverPendingQueue().add(putResultProcess);
+                        continue;
+                    } catch (Exception e) {
+                        log.error("ScheduleMessageService, messageTimeup execute error, drop it. msgExt={}, nextOffset={}, offsetPy={}, sizePy={}", msgExt, currentOffset, offsetPy, sizePy, e);
+                    }
+                }
+
+                long nextOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE);
+                this.toNextTask(nextOffset);
+                return;
+            } finally {
+                bufferCQ.release();
+            }
+        }
+
+        public void toNextTask(long offset) {

Review comment:
       Agree with your suggestion, thanks







[GitHub] [rocketmq] Git-Yang commented on a change in pull request #3458: [ISSUE #3449] Delayed message supports asynchronous delivery

Posted by GitBox <gi...@apache.org>.
Git-Yang commented on a change in pull request #3458:
URL: https://github.com/apache/rocketmq/pull/3458#discussion_r788578573



##########
File path: store/src/main/java/org/apache/rocketmq/store/schedule/ScheduleMessageService.java
##########
@@ -308,158 +381,431 @@ public void executeOnTimeup() {
                 ScheduleMessageService.this.defaultMessageStore.findConsumeQueue(TopicValidator.RMQ_SYS_SCHEDULE_TOPIC,
                     delayLevel2QueueId(delayLevel));
 
-            long failScheduleOffset = offset;
+            if (cq == null) {
+                this.scheduleNextTimerTask(this.offset, DELAY_FOR_A_WHILE);
+                return;
+            }
 
-            if (cq != null) {
-                SelectMappedBufferResult bufferCQ = cq.getIndexBuffer(this.offset);
-                if (bufferCQ != null) {
-                    try {
-                        long nextOffset = offset;
-                        int i = 0;
-                        ConsumeQueueExt.CqExtUnit cqExtUnit = new ConsumeQueueExt.CqExtUnit();
-                        for (; i < bufferCQ.getSize(); i += ConsumeQueue.CQ_STORE_UNIT_SIZE) {
-                            long offsetPy = bufferCQ.getByteBuffer().getLong();
-                            int sizePy = bufferCQ.getByteBuffer().getInt();
-                            long tagsCode = bufferCQ.getByteBuffer().getLong();
-
-                            if (cq.isExtAddr(tagsCode)) {
-                                if (cq.getExt(tagsCode, cqExtUnit)) {
-                                    tagsCode = cqExtUnit.getTagsCode();
-                                } else {
-                                    //can't find ext content.So re compute tags code.
-                                    log.error("[BUG] can't find consume queue extend file content!addr={}, offsetPy={}, sizePy={}",
-                                        tagsCode, offsetPy, sizePy);
-                                    long msgStoreTime = defaultMessageStore.getCommitLog().pickupStoreTimestamp(offsetPy, sizePy);
-                                    tagsCode = computeDeliverTimestamp(delayLevel, msgStoreTime);
-                                }
-                            }
+            SelectMappedBufferResult bufferCQ = cq.getIndexBuffer(this.offset);
+            if (bufferCQ == null) {
+                long resetOffset;
+                if ((resetOffset = cq.getMinOffsetInQueue()) > this.offset) {
+                    log.error("schedule CQ offset invalid. offset={}, cqMinOffset={}, queueId={}",
+                        this.offset, resetOffset, cq.getQueueId());
+                } else if ((resetOffset = cq.getMaxOffsetInQueue()) < this.offset) {
+                    log.error("schedule CQ offset invalid. offset={}, cqMaxOffset={}, queueId={}",
+                        this.offset, resetOffset, cq.getQueueId());
+                } else {
+                    resetOffset = this.offset;
+                }
 
-                            long now = System.currentTimeMillis();
-                            long deliverTimestamp = this.correctDeliverTimestamp(now, tagsCode);
-
-                            nextOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE);
-
-                            long countdown = deliverTimestamp - now;
-
-                            if (countdown <= 0) {
-                                MessageExt msgExt =
-                                    ScheduleMessageService.this.defaultMessageStore.lookMessageByOffset(
-                                        offsetPy, sizePy);
-
-                                if (msgExt != null) {
-                                    try {
-                                        MessageExtBrokerInner msgInner = this.messageTimeup(msgExt);
-                                        if (TopicValidator.RMQ_SYS_TRANS_HALF_TOPIC.equals(msgInner.getTopic())) {
-                                            log.error("[BUG] the real topic of schedule msg is {}, discard the msg. msg={}",
-                                                msgInner.getTopic(), msgInner);
-                                            continue;
-                                        }
-                                        PutMessageResult putMessageResult =
-                                            ScheduleMessageService.this.writeMessageStore
-                                                .putMessage(msgInner);
-
-                                        if (putMessageResult != null
-                                            && putMessageResult.getPutMessageStatus() == PutMessageStatus.PUT_OK) {
-                                            if (ScheduleMessageService.this.defaultMessageStore.getMessageStoreConfig().isEnableScheduleMessageStats()) {
-                                                ScheduleMessageService.this.defaultMessageStore.getBrokerStatsManager().incQueueGetNums(MixAll.SCHEDULE_CONSUMER_GROUP, TopicValidator.RMQ_SYS_SCHEDULE_TOPIC, delayLevel - 1, putMessageResult.getAppendMessageResult().getMsgNum());
-                                                ScheduleMessageService.this.defaultMessageStore.getBrokerStatsManager().incQueueGetSize(MixAll.SCHEDULE_CONSUMER_GROUP, TopicValidator.RMQ_SYS_SCHEDULE_TOPIC, delayLevel - 1, putMessageResult.getAppendMessageResult().getWroteBytes());
-                                                ScheduleMessageService.this.defaultMessageStore.getBrokerStatsManager().incGroupGetNums(MixAll.SCHEDULE_CONSUMER_GROUP, TopicValidator.RMQ_SYS_SCHEDULE_TOPIC, putMessageResult.getAppendMessageResult().getMsgNum());
-                                                ScheduleMessageService.this.defaultMessageStore.getBrokerStatsManager().incGroupGetSize(MixAll.SCHEDULE_CONSUMER_GROUP, TopicValidator.RMQ_SYS_SCHEDULE_TOPIC, putMessageResult.getAppendMessageResult().getWroteBytes());
-                                                ScheduleMessageService.this.defaultMessageStore.getBrokerStatsManager().incTopicPutNums(msgInner.getTopic(), putMessageResult.getAppendMessageResult().getMsgNum(), 1);
-                                                ScheduleMessageService.this.defaultMessageStore.getBrokerStatsManager().incTopicPutSize(msgInner.getTopic(),
-                                                    putMessageResult.getAppendMessageResult().getWroteBytes());
-                                                ScheduleMessageService.this.defaultMessageStore.getBrokerStatsManager().incBrokerPutNums(putMessageResult.getAppendMessageResult().getMsgNum());
-                                            }
-                                            continue;
-                                        } else {
-                                            // XXX: warn and notify me
-                                            log.error(
-                                                "ScheduleMessageService, a message time up, but reput it failed, topic: {} msgId {}",
-                                                msgExt.getTopic(), msgExt.getMsgId());
-                                            ScheduleMessageService.this.deliverExecutorService.schedule(
-                                                new DeliverDelayedMessageTimerTask(this.delayLevel,
-                                                    nextOffset), DELAY_FOR_A_PERIOD, TimeUnit.MILLISECONDS);
-                                            ScheduleMessageService.this.updateOffset(this.delayLevel,
-                                                nextOffset);
-                                            return;
-                                        }
-                                    } catch (Exception e) {
-                                        /*
-                                         * XXX: warn and notify me
-                                         */
-                                        log.error(
-                                            "ScheduleMessageService, messageTimeup execute error, drop it. msgExt={}, nextOffset={}, offsetPy={}, sizePy={}", msgExt, nextOffset, offsetPy, sizePy, e);
-                                    }
-                                }
-                            } else {
-                                ScheduleMessageService.this.deliverExecutorService.schedule(
-                                    new DeliverDelayedMessageTimerTask(this.delayLevel, nextOffset),
-                                    countdown, TimeUnit.MILLISECONDS);
-                                ScheduleMessageService.this.updateOffset(this.delayLevel, nextOffset);
-                                return;
-                            }
-                        } // end of for
+                this.scheduleNextTimerTask(resetOffset, DELAY_FOR_A_WHILE);
+                return;
+            }
 
-                        nextOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE);
-                        ScheduleMessageService.this.deliverExecutorService.schedule(new DeliverDelayedMessageTimerTask(
-                            this.delayLevel, nextOffset), DELAY_FOR_A_WHILE, TimeUnit.MILLISECONDS);
-                        ScheduleMessageService.this.updateOffset(this.delayLevel, nextOffset);
+            long nextOffset = this.offset;
+            try {
+                int i = 0;
+                ConsumeQueueExt.CqExtUnit cqExtUnit = new ConsumeQueueExt.CqExtUnit();
+                for (; i < bufferCQ.getSize() && isStarted(); i += ConsumeQueue.CQ_STORE_UNIT_SIZE) {
+                    long offsetPy = bufferCQ.getByteBuffer().getLong();
+                    int sizePy = bufferCQ.getByteBuffer().getInt();
+                    long tagsCode = bufferCQ.getByteBuffer().getLong();
+
+                    if (cq.isExtAddr(tagsCode)) {
+                        if (cq.getExt(tagsCode, cqExtUnit)) {
+                            tagsCode = cqExtUnit.getTagsCode();
+                        } else {
+                            //can't find ext content.So re compute tags code.
+                            log.error("[BUG] can't find consume queue extend file content!addr={}, offsetPy={}, sizePy={}",
+                                tagsCode, offsetPy, sizePy);
+                            long msgStoreTime = defaultMessageStore.getCommitLog().pickupStoreTimestamp(offsetPy, sizePy);
+                            tagsCode = computeDeliverTimestamp(delayLevel, msgStoreTime);
+                        }
+                    }
+
+                    long now = System.currentTimeMillis();
+                    long deliverTimestamp = this.correctDeliverTimestamp(now, tagsCode);
+                    nextOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE);
+
+                    long countdown = deliverTimestamp - now;
+                    if (countdown > 0) {
+                        this.scheduleNextTimerTask(nextOffset, DELAY_FOR_A_WHILE);
                         return;
-                    } finally {
+                    }
 
-                        bufferCQ.release();
+                    MessageExt msgExt = ScheduleMessageService.this.defaultMessageStore.lookMessageByOffset(offsetPy, sizePy);
+                    if (msgExt == null) {
+                        continue;
                     }
-                } // end of if (bufferCQ != null)
-                else {
-
-                    long cqMinOffset = cq.getMinOffsetInQueue();
-                    long cqMaxOffset = cq.getMaxOffsetInQueue();
-                    if (offset < cqMinOffset) {
-                        failScheduleOffset = cqMinOffset;
-                        log.error("schedule CQ offset invalid. offset={}, cqMinOffset={}, cqMaxOffset={}, queueId={}",
-                            offset, cqMinOffset, cqMaxOffset, cq.getQueueId());
+
+                    MessageExtBrokerInner msgInner = ScheduleMessageService.this.messageTimeup(msgExt);
+                    if (TopicValidator.RMQ_SYS_TRANS_HALF_TOPIC.equals(msgInner.getTopic())) {
+                        log.error("[BUG] the real topic of schedule msg is {}, discard the msg. msg={}",
+                            msgInner.getTopic(), msgInner);
+                        continue;
+                    }
+
+                    boolean deliverSuc;
+                    if (ScheduleMessageService.this.enableAsyncDeliver) {
+                        deliverSuc = this.asyncDeliver(msgInner, msgExt.getMsgId(), offset, offsetPy, sizePy);
+                    } else {
+                        deliverSuc = this.syncDeliver(msgInner, msgExt.getMsgId(), offset, offsetPy, sizePy);
                     }
 
-                    if (offset > cqMaxOffset) {
-                        failScheduleOffset = cqMaxOffset;
-                        log.error("schedule CQ offset invalid. offset={}, cqMinOffset={}, cqMaxOffset={}, queueId={}",
-                            offset, cqMinOffset, cqMaxOffset, cq.getQueueId());
+                    if (!deliverSuc) {
+                        this.scheduleNextTimerTask(nextOffset, DELAY_FOR_A_WHILE);
+                        return;
                     }
                 }
-            } // end of if (cq != null)
 
-            ScheduleMessageService.this.deliverExecutorService.schedule(new DeliverDelayedMessageTimerTask(this.delayLevel,
-                failScheduleOffset), DELAY_FOR_A_WHILE, TimeUnit.MILLISECONDS);
+                nextOffset = this.offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE);
+            } catch (Exception e) {
+                log.error("ScheduleMessageService, messageTimeup execute error, offset = {}", nextOffset, e);
+            } finally {
+                bufferCQ.release();
+            }
+
+            this.scheduleNextTimerTask(nextOffset, DELAY_FOR_A_WHILE);
         }
 
-        private MessageExtBrokerInner messageTimeup(MessageExt msgExt) {
-            MessageExtBrokerInner msgInner = new MessageExtBrokerInner();
-            msgInner.setBody(msgExt.getBody());
-            msgInner.setFlag(msgExt.getFlag());
-            MessageAccessor.setProperties(msgInner, msgExt.getProperties());
+        public void scheduleNextTimerTask(long offset, long delay) {
+            ScheduleMessageService.this.deliverExecutorService.schedule(new DeliverDelayedMessageTimerTask(
+                this.delayLevel, offset), delay, TimeUnit.MILLISECONDS);
+        }
 
-            TopicFilterType topicFilterType = MessageExt.parseTopicFilterType(msgInner.getSysFlag());
-            long tagsCodeValue =
-                MessageExtBrokerInner.tagsString2tagsCode(topicFilterType, msgInner.getTags());
-            msgInner.setTagsCode(tagsCodeValue);
-            msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgExt.getProperties()));
+        private boolean syncDeliver(MessageExtBrokerInner msgInner, String msgId, long offset, long offsetPy,
+            int sizePy) {
+            PutResultProcess resultProcess = deliverMessage(msgInner, msgId, offset, offsetPy, sizePy, false);
+            PutMessageResult result = resultProcess.get();
+            boolean sendStatus = result != null && result.getPutMessageStatus() == PutMessageStatus.PUT_OK;
+            if (sendStatus) {
+                ScheduleMessageService.this.updateOffset(this.delayLevel, resultProcess.getNextOffset());
+            }
+            return sendStatus;
+        }
 
-            msgInner.setSysFlag(msgExt.getSysFlag());
-            msgInner.setBornTimestamp(msgExt.getBornTimestamp());
-            msgInner.setBornHost(msgExt.getBornHost());
-            msgInner.setStoreHost(msgExt.getStoreHost());
-            msgInner.setReconsumeTimes(msgExt.getReconsumeTimes());
+        private boolean asyncDeliver(MessageExtBrokerInner msgInner, String msgId, long offset, long offsetPy,
+            int sizePy) {
+            Queue<PutResultProcess> processesQueue = ScheduleMessageService.this.deliverPendingTable.get(this.delayLevel);
+
+            //Flow Control
+            int currentPendingNum = processesQueue.size();
+            int maxPendingLimit = ScheduleMessageService.this.defaultMessageStore.getMessageStoreConfig()
+                .getScheduleAsyncDeliverMaxPendingLimit();
+            if (currentPendingNum > maxPendingLimit) {
+                log.warn("Asynchronous deliver triggers flow control, " +
+                    "currentPendingNum={}, maxPendingLimit={}", currentPendingNum, maxPendingLimit);
+                return false;
+            }
 
-            msgInner.setWaitStoreMsgOK(false);
-            MessageAccessor.clearProperty(msgInner, MessageConst.PROPERTY_DELAY_TIME_LEVEL);
+            //Blocked
+            PutResultProcess firstProcess = processesQueue.peek();
+            if (firstProcess != null && firstProcess.need2Blocked()) {
+                log.warn("Asynchronous deliver block. info={}", firstProcess.toString());
+                return false;
+            }
 
-            msgInner.setTopic(msgInner.getProperty(MessageConst.PROPERTY_REAL_TOPIC));
+            PutResultProcess resultProcess = deliverMessage(msgInner, msgId, offset, offsetPy, sizePy, true);
+            processesQueue.add(resultProcess);
+            return true;
+        }
 
-            String queueIdStr = msgInner.getProperty(MessageConst.PROPERTY_REAL_QUEUE_ID);
-            int queueId = Integer.parseInt(queueIdStr);
-            msgInner.setQueueId(queueId);
+        private PutResultProcess deliverMessage(MessageExtBrokerInner msgInner, String msgId, long offset,
+            long offsetPy, int sizePy, boolean autoResend) {
+            CompletableFuture<PutMessageResult> future =
+                ScheduleMessageService.this.writeMessageStore.asyncPutMessage(msgInner);
+            return new PutResultProcess()
+                .setTopic(msgInner.getTopic())
+                .setDelayLevel(this.delayLevel)
+                .setOffset(offset)
+                .setPhysicOffset(offsetPy)
+                .setPhysicSize(sizePy)
+                .setMsgId(msgId)
+                .setAutoResend(autoResend)
+                .setFuture(future)
+                .thenProcess();
+        }
+    }
+
+    public class HandlePutResultTask implements Runnable {
+        private final int delayLevel;
 
-            return msgInner;
+        public HandlePutResultTask(int delayLevel) {
+            this.delayLevel = delayLevel;
         }
+
+        @Override
+        public void run() {
+            LinkedBlockingQueue<PutResultProcess> pendingQueue =
+                ScheduleMessageService.this.deliverPendingTable.get(this.delayLevel);
+
+            PutResultProcess putResultProcess;
+            while ((putResultProcess = pendingQueue.peek()) != null) {
+                try {

Review comment:
       I will set deliverThreadPoolNums to the number of delay levels to solve this problem.
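The comment does not quote the problem being solved. Assuming it concerns several delay levels competing for the same delivery threads, sizing the scheduled pool to the number of levels might look like the sketch below. `DeliverPoolSizing`, `countDelayLevels` and `newDeliverExecutor` are hypothetical names, and the level string is just an example in the space-separated style of the broker's delay-level setting.

```java
import java.util.concurrent.ScheduledThreadPoolExecutor;

// Hypothetical helper: one core thread per configured delay level.
class DeliverPoolSizing {
    // Counts space-separated levels such as "1s 5s 10s".
    static int countDelayLevels(String levelString) {
        String trimmed = levelString.trim();
        return trimmed.isEmpty() ? 0 : trimmed.split("\\s+").length;
    }

    // Builds a scheduled pool with as many core threads as there are levels (at least one).
    static ScheduledThreadPoolExecutor newDeliverExecutor(String levelString) {
        int levels = Math.max(1, countDelayLevels(levelString));
        return new ScheduledThreadPoolExecutor(levels);
    }

    public static void main(String[] args) {
        ScheduledThreadPoolExecutor executor = newDeliverExecutor("1s 5s 10s 30s");
        System.out.println("core threads = " + executor.getCorePoolSize());
        executor.shutdown();
    }
}
```

With one thread per level, a slow or blocked level cannot starve the timer tasks of the others, at the cost of a few mostly idle threads.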







[GitHub] [rocketmq] Git-Yang commented on pull request #3458: [ISSUE #3449] Delayed message supports asynchronous delivery

Posted by GitBox <gi...@apache.org>.
Git-Yang commented on pull request #3458:
URL: https://github.com/apache/rocketmq/pull/3458#issuecomment-965998740


   ### Test Results
   - 10 million messages were sent by a script (level=4), each carrying a unique auto-incrementing key (0~9999999).
   - The broker service was restarted while the messages were being delivered.
   - Test results:
      - No data loss (totalCount=9999999 because the query for key=19 failed).
      - No data duplication.
      - Delivery TPS: 23,000+.
   ![image](https://user-images.githubusercontent.com/30995057/141241356-10bf4f00-0250-45e6-9223-bf69c1122cac.png)
   ![image](https://user-images.githubusercontent.com/30995057/141241460-01500731-d14a-4904-8c78-e7d9e200f725.png)
   ![image](https://user-images.githubusercontent.com/30995057/141241633-f3dfedc9-8934-421c-9b1e-d41234cdffb8.png)
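A check of this kind (every key in a known range seen exactly once) could be scripted roughly as follows. This is only a sketch: `DeliveryKeyCheck` and `check` are hypothetical names, the consumed keys are assumed to be already collected into an array, and it is not the script used for the results above.

```java
import java.util.BitSet;

// Hypothetical verifier for "no loss, no duplication" over keys in [0, expected).
class DeliveryKeyCheck {
    // Returns {missing, duplicates} for the consumed keys.
    static long[] check(int expected, int[] consumedKeys) {
        BitSet seen = new BitSet(expected);
        long duplicates = 0;
        for (int key : consumedKeys) {
            if (key < 0 || key >= expected) {
                throw new IllegalArgumentException("key out of range: " + key);
            }
            if (seen.get(key)) {
                duplicates++;       // key delivered more than once
            } else {
                seen.set(key);      // first time this key is seen
            }
        }
        long missing = expected - seen.cardinality();
        return new long[] {missing, duplicates};
    }

    public static void main(String[] args) {
        // Key 2 is missing and key 1 appears twice.
        long[] result = check(5, new int[] {0, 1, 1, 3, 4});
        System.out.println("missing=" + result[0] + ", duplicates=" + result[1]);
    }
}
```

A bit set keeps memory small even for ten million keys, since it needs one bit per expected key rather than a hash entry.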
   
   
   





[GitHub] [rocketmq] Git-Yang commented on a change in pull request #3458: [ISSUE #3449] Delayed message supports asynchronous delivery

Posted by GitBox <gi...@apache.org>.
Git-Yang commented on a change in pull request #3458:
URL: https://github.com/apache/rocketmq/pull/3458#discussion_r779983206



##########
File path: store/src/main/java/org/apache/rocketmq/store/schedule/ScheduleMessageService.java
##########
@@ -308,158 +387,408 @@ public void executeOnTimeup() {
                 ScheduleMessageService.this.defaultMessageStore.findConsumeQueue(TopicValidator.RMQ_SYS_SCHEDULE_TOPIC,
                     delayLevel2QueueId(delayLevel));
 
-            long failScheduleOffset = offset;
+            if (cq == null) {
+                this.scheduleNextTimerTask(this.offset, DELAY_FOR_A_WHILE);
+                return;
+            }
 
-            if (cq != null) {
-                SelectMappedBufferResult bufferCQ = cq.getIndexBuffer(this.offset);
-                if (bufferCQ != null) {
-                    try {
-                        long nextOffset = offset;
-                        int i = 0;
-                        ConsumeQueueExt.CqExtUnit cqExtUnit = new ConsumeQueueExt.CqExtUnit();
-                        for (; i < bufferCQ.getSize(); i += ConsumeQueue.CQ_STORE_UNIT_SIZE) {
-                            long offsetPy = bufferCQ.getByteBuffer().getLong();
-                            int sizePy = bufferCQ.getByteBuffer().getInt();
-                            long tagsCode = bufferCQ.getByteBuffer().getLong();
-
-                            if (cq.isExtAddr(tagsCode)) {
-                                if (cq.getExt(tagsCode, cqExtUnit)) {
-                                    tagsCode = cqExtUnit.getTagsCode();
-                                } else {
-                                    //can't find ext content.So re compute tags code.
-                                    log.error("[BUG] can't find consume queue extend file content!addr={}, offsetPy={}, sizePy={}",
-                                        tagsCode, offsetPy, sizePy);
-                                    long msgStoreTime = defaultMessageStore.getCommitLog().pickupStoreTimestamp(offsetPy, sizePy);
-                                    tagsCode = computeDeliverTimestamp(delayLevel, msgStoreTime);
-                                }
-                            }
+            SelectMappedBufferResult bufferCQ = cq.getIndexBuffer(this.offset);
+            if (bufferCQ == null) {
+                long resetOffset;
+                if ((resetOffset = cq.getMinOffsetInQueue()) > this.offset) {
+                    log.error("schedule CQ offset invalid. offset={}, cqMinOffset={}, queueId={}",
+                        this.offset, resetOffset, cq.getQueueId());
+                } else if ((resetOffset = cq.getMaxOffsetInQueue()) < this.offset) {
+                    log.error("schedule CQ offset invalid. offset={}, cqMaxOffset={}, queueId={}",
+                        this.offset, resetOffset, cq.getQueueId());
+                } else {
+                    resetOffset = this.offset;
+                }
 
-                            long now = System.currentTimeMillis();
-                            long deliverTimestamp = this.correctDeliverTimestamp(now, tagsCode);
-
-                            nextOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE);
-
-                            long countdown = deliverTimestamp - now;
-
-                            if (countdown <= 0) {
-                                MessageExt msgExt =
-                                    ScheduleMessageService.this.defaultMessageStore.lookMessageByOffset(
-                                        offsetPy, sizePy);
-
-                                if (msgExt != null) {
-                                    try {
-                                        MessageExtBrokerInner msgInner = this.messageTimeup(msgExt);
-                                        if (TopicValidator.RMQ_SYS_TRANS_HALF_TOPIC.equals(msgInner.getTopic())) {
-                                            log.error("[BUG] the real topic of schedule msg is {}, discard the msg. msg={}",
-                                                msgInner.getTopic(), msgInner);
-                                            continue;
-                                        }
-                                        PutMessageResult putMessageResult =
-                                            ScheduleMessageService.this.writeMessageStore
-                                                .putMessage(msgInner);
-
-                                        if (putMessageResult != null
-                                            && putMessageResult.getPutMessageStatus() == PutMessageStatus.PUT_OK) {
-                                            if (ScheduleMessageService.this.defaultMessageStore.getMessageStoreConfig().isEnableScheduleMessageStats()) {
-                                                ScheduleMessageService.this.defaultMessageStore.getBrokerStatsManager().incQueueGetNums(MixAll.SCHEDULE_CONSUMER_GROUP, TopicValidator.RMQ_SYS_SCHEDULE_TOPIC, delayLevel - 1, putMessageResult.getAppendMessageResult().getMsgNum());
-                                                ScheduleMessageService.this.defaultMessageStore.getBrokerStatsManager().incQueueGetSize(MixAll.SCHEDULE_CONSUMER_GROUP, TopicValidator.RMQ_SYS_SCHEDULE_TOPIC, delayLevel - 1, putMessageResult.getAppendMessageResult().getWroteBytes());
-                                                ScheduleMessageService.this.defaultMessageStore.getBrokerStatsManager().incGroupGetNums(MixAll.SCHEDULE_CONSUMER_GROUP, TopicValidator.RMQ_SYS_SCHEDULE_TOPIC, putMessageResult.getAppendMessageResult().getMsgNum());
-                                                ScheduleMessageService.this.defaultMessageStore.getBrokerStatsManager().incGroupGetSize(MixAll.SCHEDULE_CONSUMER_GROUP, TopicValidator.RMQ_SYS_SCHEDULE_TOPIC, putMessageResult.getAppendMessageResult().getWroteBytes());
-                                                ScheduleMessageService.this.defaultMessageStore.getBrokerStatsManager().incTopicPutNums(msgInner.getTopic(), putMessageResult.getAppendMessageResult().getMsgNum(), 1);
-                                                ScheduleMessageService.this.defaultMessageStore.getBrokerStatsManager().incTopicPutSize(msgInner.getTopic(),
-                                                    putMessageResult.getAppendMessageResult().getWroteBytes());
-                                                ScheduleMessageService.this.defaultMessageStore.getBrokerStatsManager().incBrokerPutNums(putMessageResult.getAppendMessageResult().getMsgNum());
-                                            }
-                                            continue;
-                                        } else {
-                                            // XXX: warn and notify me
-                                            log.error(
-                                                "ScheduleMessageService, a message time up, but reput it failed, topic: {} msgId {}",
-                                                msgExt.getTopic(), msgExt.getMsgId());
-                                            ScheduleMessageService.this.deliverExecutorService.schedule(
-                                                new DeliverDelayedMessageTimerTask(this.delayLevel,
-                                                    nextOffset), DELAY_FOR_A_PERIOD, TimeUnit.MILLISECONDS);
-                                            ScheduleMessageService.this.updateOffset(this.delayLevel,
-                                                nextOffset);
-                                            return;
-                                        }
-                                    } catch (Exception e) {
-                                        /*
-                                         * XXX: warn and notify me
-                                         */
-                                        log.error(
-                                            "ScheduleMessageService, messageTimeup execute error, drop it. msgExt={}, nextOffset={}, offsetPy={}, sizePy={}", msgExt, nextOffset, offsetPy, sizePy, e);
-                                    }
-                                }
-                            } else {
-                                ScheduleMessageService.this.deliverExecutorService.schedule(
-                                    new DeliverDelayedMessageTimerTask(this.delayLevel, nextOffset),
-                                    countdown, TimeUnit.MILLISECONDS);
-                                ScheduleMessageService.this.updateOffset(this.delayLevel, nextOffset);
-                                return;
-                            }
-                        } // end of for
+                this.scheduleNextTimerTask(resetOffset, DELAY_FOR_A_WHILE);
+                return;
+            }
+
+            long nextOffset = this.offset;
+            try {
+                int i = 0;
+                ConsumeQueueExt.CqExtUnit cqExtUnit = new ConsumeQueueExt.CqExtUnit();
+                for (; i < bufferCQ.getSize() && isStarted(); i += ConsumeQueue.CQ_STORE_UNIT_SIZE) {
+                    long offsetPy = bufferCQ.getByteBuffer().getLong();
+                    int sizePy = bufferCQ.getByteBuffer().getInt();
+                    long tagsCode = bufferCQ.getByteBuffer().getLong();
+
+                    if (cq.isExtAddr(tagsCode)) {
+                        if (cq.getExt(tagsCode, cqExtUnit)) {
+                            tagsCode = cqExtUnit.getTagsCode();
+                        } else {
+                            //can't find ext content.So re compute tags code.
+                            log.error("[BUG] can't find consume queue extend file content!addr={}, offsetPy={}, sizePy={}",
+                                tagsCode, offsetPy, sizePy);
+                            long msgStoreTime = defaultMessageStore.getCommitLog().pickupStoreTimestamp(offsetPy, sizePy);
+                            tagsCode = computeDeliverTimestamp(delayLevel, msgStoreTime);
+                        }
+                    }
 
-                        nextOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE);
-                        ScheduleMessageService.this.deliverExecutorService.schedule(new DeliverDelayedMessageTimerTask(
-                            this.delayLevel, nextOffset), DELAY_FOR_A_WHILE, TimeUnit.MILLISECONDS);
-                        ScheduleMessageService.this.updateOffset(this.delayLevel, nextOffset);
+                    long now = System.currentTimeMillis();
+                    long deliverTimestamp = this.correctDeliverTimestamp(now, tagsCode);
+                    nextOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE);
+
+                    long countdown = deliverTimestamp - now;
+                    if (countdown > 0) {
+                        this.scheduleNextTimerTask(nextOffset, DELAY_FOR_A_WHILE);
                         return;
-                    } finally {
+                    }
+
+                    MessageExt msgExt = ScheduleMessageService.this.defaultMessageStore.lookMessageByOffset(offsetPy, sizePy);
+                    if (msgExt == null) {
+                        continue;
+                    }
 
-                        bufferCQ.release();
+                    MessageExtBrokerInner msgInner = ScheduleMessageService.this.messageTimeup(msgExt);
+                    if (TopicValidator.RMQ_SYS_TRANS_HALF_TOPIC.equals(msgInner.getTopic())) {
+                        log.error("[BUG] the real topic of schedule msg is {}, discard the msg. msg={}",
+                            msgInner.getTopic(), msgInner);
+                        continue;
                     }
-                } // end of if (bufferCQ != null)
-                else {
-
-                    long cqMinOffset = cq.getMinOffsetInQueue();
-                    long cqMaxOffset = cq.getMaxOffsetInQueue();
-                    if (offset < cqMinOffset) {
-                        failScheduleOffset = cqMinOffset;
-                        log.error("schedule CQ offset invalid. offset={}, cqMinOffset={}, cqMaxOffset={}, queueId={}",
-                            offset, cqMinOffset, cqMaxOffset, cq.getQueueId());
+
+                    boolean deliverSuc;
+                    if (ScheduleMessageService.this.enableAsyncDeliver) {
+                        deliverSuc = this.asyncDeliver(msgInner, msgExt.getMsgId(), offset, offsetPy, sizePy);
+                    } else {
+                        deliverSuc = this.syncDeliver(msgInner, msgExt.getMsgId(), offset, offsetPy, sizePy);
                     }
 
-                    if (offset > cqMaxOffset) {
-                        failScheduleOffset = cqMaxOffset;
-                        log.error("schedule CQ offset invalid. offset={}, cqMinOffset={}, cqMaxOffset={}, queueId={}",
-                            offset, cqMinOffset, cqMaxOffset, cq.getQueueId());
+                    if (!deliverSuc) {
+                        this.scheduleNextTimerTask(nextOffset, DELAY_FOR_A_WHILE);
+                        return;
                     }
                 }
-            } // end of if (cq != null)
 
-            ScheduleMessageService.this.deliverExecutorService.schedule(new DeliverDelayedMessageTimerTask(this.delayLevel,
-                failScheduleOffset), DELAY_FOR_A_WHILE, TimeUnit.MILLISECONDS);
+                nextOffset = this.offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE);
+            } catch (Exception e) {
+                log.error("ScheduleMessageService, messageTimeup execute error, offset = {}", nextOffset, e);
+            } finally {
+                bufferCQ.release();
+            }
+
+            this.scheduleNextTimerTask(nextOffset, DELAY_FOR_A_WHILE);
+        }
+
+        public void scheduleNextTimerTask(long offset, long delay) {
+            ScheduleMessageService.this.deliverExecutorService.schedule(new DeliverDelayedMessageTimerTask(
+                this.delayLevel, offset), delay, TimeUnit.MILLISECONDS);
+        }
+
+        private boolean syncDeliver(MessageExtBrokerInner msgInner, String msgId, long offset, long offsetPy,
+            int sizePy) {
+            PutResultProcess resultProcess = deliverMessage(msgInner, msgId, offset, offsetPy, sizePy, false);
+            PutMessageResult result = resultProcess.get();
+            boolean sendStatus = result != null && result.getPutMessageStatus() == PutMessageStatus.PUT_OK;
+            if (sendStatus) {
+                ScheduleMessageService.this.updateOffset(this.delayLevel, resultProcess.getNextOffset());
+            }
+            return sendStatus;
+        }
+
+        private boolean asyncDeliver(MessageExtBrokerInner msgInner, String msgId, long offset, long offsetPy,
+            int sizePy) {
+            Queue<PutResultProcess> processesQueue = ScheduleMessageService.this.deliverPendingTable.get(this.delayLevel);
+
+            //Flow Control
+            int currentPendingNum = processesQueue.size();
+            if (currentPendingNum > ScheduleMessageService.this.maxPendingLimit) {
+                log.warn("Asynchronous deliver triggers flow control, " +
+                    "currentPendingNum={}, maxPendingLimit={}", currentPendingNum, maxPendingLimit);
+                return false;
+            }
+
+            //Blocked
+            PutResultProcess firstProcess = processesQueue.peek();
+            if (firstProcess != null && firstProcess.need2Blocked()) {
+                log.warn("Asynchronous deliver block. info={}", firstProcess.toString());
+                return false;
+            }
+
+            PutResultProcess resultProcess = deliverMessage(msgInner, msgId, offset, offsetPy, sizePy, true);
+            processesQueue.add(resultProcess);
+            return true;
+        }
+
+        private PutResultProcess deliverMessage(MessageExtBrokerInner msgInner, String msgId, long offset,
+            long offsetPy, int sizePy, boolean autoResend) {
+            CompletableFuture<PutMessageResult> future =
+                ScheduleMessageService.this.writeMessageStore.asyncPutMessage(msgInner);
+            return new PutResultProcess()
+                .setTopic(msgInner.getTopic())
+                .setDelayLevel(this.delayLevel)
+                .setOffset(offset)
+                .setPhysicOffset(offsetPy)
+                .setPhysicSize(sizePy)
+                .setMsgId(msgId)
+                .setAutoResend(autoResend)
+                .setFuture(future)
+                .thenProcess();
         }
+    }
 
-        private MessageExtBrokerInner messageTimeup(MessageExt msgExt) {
-            MessageExtBrokerInner msgInner = new MessageExtBrokerInner();
-            msgInner.setBody(msgExt.getBody());
-            msgInner.setFlag(msgExt.getFlag());
-            MessageAccessor.setProperties(msgInner, msgExt.getProperties());
+    public class HandlePutResultTask implements Runnable {
+        private final int delayLevel;
 
-            TopicFilterType topicFilterType = MessageExt.parseTopicFilterType(msgInner.getSysFlag());
-            long tagsCodeValue =
-                MessageExtBrokerInner.tagsString2tagsCode(topicFilterType, msgInner.getTags());
-            msgInner.setTagsCode(tagsCodeValue);
-            msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgExt.getProperties()));
+        public HandlePutResultTask(int delayLevel) {
+            this.delayLevel = delayLevel;
+        }
 
-            msgInner.setSysFlag(msgExt.getSysFlag());
-            msgInner.setBornTimestamp(msgExt.getBornTimestamp());
-            msgInner.setBornHost(msgExt.getBornHost());
-            msgInner.setStoreHost(msgExt.getStoreHost());
-            msgInner.setReconsumeTimes(msgExt.getReconsumeTimes());
+        @Override
+        public void run() {
+            LinkedBlockingQueue<PutResultProcess> pendingQueue =
+                ScheduleMessageService.this.deliverPendingTable.get(this.delayLevel);
+
+            PutResultProcess putResultProcess;
+            while ((putResultProcess = pendingQueue.peek()) != null) {
+                try {
+                    switch (putResultProcess.getStatus()) {
+                        case SUCCESS:
+                            ScheduleMessageService.this.updateOffset(this.delayLevel, putResultProcess.getNextOffset());
+                            pendingQueue.remove();
+                            break;
+                        case RUNNING:
+                            break;
+                        case EXCEPTION:
+                            if (!isStarted()) {
+                                log.warn("HandlePutResultTask shutdown, info={}", putResultProcess.toString());
+                                return;
+                            }
+                            log.warn("putResultProcess error, info={}", putResultProcess.toString());
+                            putResultProcess.onException();
+                            break;
+                        case SKIP:
+                            log.warn("putResultProcess skip, info={}", putResultProcess.toString());
+                            pendingQueue.remove();
+                            break;
+                    }
+                } catch (Exception e) {
+                    log.error("HandlePutResultTask exception. info={}", putResultProcess.toString(), e);
+                    putResultProcess.onException();
+                }
+            }
 
-            msgInner.setWaitStoreMsgOK(false);
-            MessageAccessor.clearProperty(msgInner, MessageConst.PROPERTY_DELAY_TIME_LEVEL);
+            if (isStarted()) {
+                ScheduleMessageService.this.handleExecutorService
+                    .schedule(new HandlePutResultTask(this.delayLevel), DELAY_FOR_A_SLEEP, TimeUnit.MILLISECONDS);
+            }
+        }
+    }
 
-            msgInner.setTopic(msgInner.getProperty(MessageConst.PROPERTY_REAL_TOPIC));
+    public class PutResultProcess {
+        private String topic;
+        private long offset;
+        private long physicOffset;
+        private int physicSize;
+        private int delayLevel;
+        private String msgId;
+        private boolean autoResend = false;
+        private CompletableFuture<PutMessageResult> future;
+
+        private volatile int resendCount = 0;
+        private volatile ProcessStatus status = ProcessStatus.RUNNING;
+
+        public PutResultProcess setTopic(String topic) {
+            this.topic = topic;
+            return this;
+        }
 
-            String queueIdStr = msgInner.getProperty(MessageConst.PROPERTY_REAL_QUEUE_ID);
-            int queueId = Integer.parseInt(queueIdStr);
-            msgInner.setQueueId(queueId);
+        public PutResultProcess setOffset(long offset) {
+            this.offset = offset;
+            return this;
+        }
 
-            return msgInner;
+        public PutResultProcess setPhysicOffset(long physicOffset) {
+            this.physicOffset = physicOffset;
+            return this;
         }
+
+        public PutResultProcess setPhysicSize(int physicSize) {
+            this.physicSize = physicSize;
+            return this;
+        }
+
+        public PutResultProcess setDelayLevel(int delayLevel) {
+            this.delayLevel = delayLevel;
+            return this;
+        }
+
+        public PutResultProcess setMsgId(String msgId) {
+            this.msgId = msgId;
+            return this;
+        }
+
+        public PutResultProcess setAutoResend(boolean autoResend) {
+            this.autoResend = autoResend;
+            return this;
+        }
+
+        public PutResultProcess setFuture(CompletableFuture<PutMessageResult> future) {
+            this.future = future;
+            return this;
+        }
+
+        public String getTopic() {
+            return topic;
+        }
+
+        public long getOffset() {
+            return offset;
+        }
+
+        public long getNextOffset() {
+            return offset + 1;
+        }
+
+        public long getPhysicOffset() {
+            return physicOffset;
+        }
+
+        public int getPhysicSize() {
+            return physicSize;
+        }
+
+        public Integer getDelayLevel() {
+            return delayLevel;
+        }
+
+        public String getMsgId() {
+            return msgId;
+        }
+
+        public boolean isAutoResend() {
+            return autoResend;
+        }
+
+        public CompletableFuture<PutMessageResult> getFuture() {
+            return future;
+        }
+
+        public int getResendCount() {
+            return resendCount;
+        }
+
+        public PutResultProcess thenProcess() {
+            this.future.thenAccept(result -> {
+                this.handleResult(result);
+            });
+
+            this.future.exceptionally(e -> {
+                log.error("ScheduleMessageService put message exceptionally, info: {}",
+                    PutResultProcess.this.toString(), e);
+
+                onException();
+                return null;
+            });
+            return this;
+        }
+
+        private void handleResult(PutMessageResult result) {
+            if (result != null && result.getPutMessageStatus() == PutMessageStatus.PUT_OK) {
+                onSuccess(result);
+            } else {
+                log.warn("ScheduleMessageService put message failed. info: {}.", result);
+                onException();
+            }
+        }
+
+        public void onSuccess(PutMessageResult result) {
+            this.status = ProcessStatus.SUCCESS;
+            if (ScheduleMessageService.this.defaultMessageStore.getMessageStoreConfig().isEnableScheduleMessageStats()) {
+                ScheduleMessageService.this.defaultMessageStore.getBrokerStatsManager().incQueueGetNums(MixAll.SCHEDULE_CONSUMER_GROUP, TopicValidator.RMQ_SYS_SCHEDULE_TOPIC, delayLevel - 1, result.getAppendMessageResult().getMsgNum());
+                ScheduleMessageService.this.defaultMessageStore.getBrokerStatsManager().incQueueGetSize(MixAll.SCHEDULE_CONSUMER_GROUP, TopicValidator.RMQ_SYS_SCHEDULE_TOPIC, delayLevel - 1, result.getAppendMessageResult().getWroteBytes());
+                ScheduleMessageService.this.defaultMessageStore.getBrokerStatsManager().incGroupGetNums(MixAll.SCHEDULE_CONSUMER_GROUP, TopicValidator.RMQ_SYS_SCHEDULE_TOPIC, result.getAppendMessageResult().getMsgNum());
+                ScheduleMessageService.this.defaultMessageStore.getBrokerStatsManager().incGroupGetSize(MixAll.SCHEDULE_CONSUMER_GROUP, TopicValidator.RMQ_SYS_SCHEDULE_TOPIC, result.getAppendMessageResult().getWroteBytes());
+                ScheduleMessageService.this.defaultMessageStore.getBrokerStatsManager().incTopicPutNums(this.topic, result.getAppendMessageResult().getMsgNum(), 1);
+                ScheduleMessageService.this.defaultMessageStore.getBrokerStatsManager().incTopicPutSize(this.topic, result.getAppendMessageResult().getWroteBytes());
+                ScheduleMessageService.this.defaultMessageStore.getBrokerStatsManager().incBrokerPutNums(result.getAppendMessageResult().getMsgNum());
+            }
+        }
+
+        public void onException() {
+            log.warn("ScheduleMessageService onException, info: {}", this.toString());
+            if (this.autoResend) {
+                this.resend();
+            } else {
+                this.status = ProcessStatus.SKIP;
+            }
+        }
+
+        public ProcessStatus getStatus() {
+            return this.status;
+        }
+
+        public PutMessageResult get() {
+            try {
+                return this.future.get();
+            } catch (InterruptedException | ExecutionException e) {
+                return new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, null);
+            }
+        }
+
+        private void resend() {
+            log.info("Resend message, info: {}", this.toString());
+
+            // Gradually increase the resend interval.
+            try {
+                Thread.sleep(Math.min(this.resendCount++ * 100, 60 * 1000));
+            } catch (InterruptedException e) {
+                e.printStackTrace();
+            }
+
+            try {
+                MessageExt msgExt = ScheduleMessageService.this.defaultMessageStore.lookMessageByOffset(this.physicOffset, this.physicSize);
+                if (msgExt == null) {
+                    log.warn("ScheduleMessageService resend not found message. info: {}", this.toString());
+                    this.status = need2Skip() ? ProcessStatus.SKIP : ProcessStatus.EXCEPTION;
+                    return;
+                }
+
+                MessageExtBrokerInner msgInner = ScheduleMessageService.this.messageTimeup(msgExt);
+                PutMessageResult result = ScheduleMessageService.this.writeMessageStore.putMessage(msgInner);
+                this.handleResult(result);
+                if (result != null && result.getPutMessageStatus() == PutMessageStatus.PUT_OK) {
+                    log.info("Resend message success, info: {}", this.toString());
+                }
+            } catch (Exception e) {
+                this.status = ProcessStatus.EXCEPTION;
+                log.error("Resend message error, info: {}", this.toString(), e);
+            }
+        }
+
+        public boolean need2Blocked() {
+            return this.resendCount > ScheduleMessageService.this.maxResendNum2Blocked;
+        }
+
+        public boolean need2Skip() {
+            return this.resendCount > ScheduleMessageService.this.maxResendNum2Blocked * 2;
+        }
+
+        @Override
+        public String toString() {
+            return "PutResultProcess{" +
+                "topic='" + topic + '\'' +
+                ", offset=" + offset +
+                ", physicOffset=" + physicOffset +
+                ", physicSize=" + physicSize +
+                ", delayLevel=" + delayLevel +
+                ", msgId='" + msgId + '\'' +
+                ", autoResend=" + autoResend +
+                ", resendCount=" + resendCount +
+                ", status=" + status +
+                '}';
+        }
+    }
+
+    public enum ProcessStatus {
+        RUNNING,
+        SUCCESS,
+        EXCEPTION,
+        SKIP,
     }

Review comment:
       OK, I will add a comment describing this.
   In some extreme cases, message delivery is skipped to avoid blocking. For example:
   after a delivery fails, it is retried. If the message no longer exists at that point (it has expired), the delivery is skipped so that the entire delay level is not blocked and made unavailable.
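
The skip rule can be illustrated with a small, self-contained sketch. It imitates the `need2Skip()` check from the diff but is not the PR code: the class `SkipOnMissingSketch` and the `lookup` parameter (a stand-in for reading the message back from the store) are hypothetical, and the backoff sleep and the actual re-put are left out.

```java
import java.util.function.LongFunction;

class SkipOnMissingSketch {
    enum Status { RUNNING, SUCCESS, EXCEPTION, SKIP }

    private final long physicOffset;
    private final int maxResendNum2Blocked;
    private int resendCount = 0;
    private Status status = Status.RUNNING;

    SkipOnMissingSketch(long physicOffset, int maxResendNum2Blocked) {
        this.physicOffset = physicOffset;
        this.maxResendNum2Blocked = maxResendNum2Blocked;
    }

    // Same threshold as need2Skip() in the diff: twice the blocking limit.
    boolean need2Skip() {
        return resendCount > maxResendNum2Blocked * 2;
    }

    // One retry attempt. `lookup` is a hypothetical stand-in that returns
    // null when the message no longer exists (for example, it has expired).
    Status resend(LongFunction<String> lookup) {
        resendCount++;
        String message = lookup.apply(physicOffset);
        if (message == null) {
            // Keep retrying for a while, then give up so the level is not blocked forever.
            status = need2Skip() ? Status.SKIP : Status.EXCEPTION;
            return status;
        }
        // The real code would re-put the message and inspect the put result here.
        status = Status.SUCCESS;
        return status;
    }
}
```

With a limit of 1, the first two retries of a missing message end in `EXCEPTION` (to be retried again) and the third ends in `SKIP`, which lets the handler remove the entry from the pending queue and move on.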







[GitHub] [rocketmq] HScarb commented on a change in pull request #3458: [ISSUE #3449] Delayed message supports asynchronous delivery

Posted by GitBox <gi...@apache.org>.
HScarb commented on a change in pull request #3458:
URL: https://github.com/apache/rocketmq/pull/3458#discussion_r761696476



##########
File path: store/src/main/java/org/apache/rocketmq/store/schedule/ScheduleMessageService.java
##########
@@ -431,35 +492,414 @@ public void executeOnTimeup() {
             ScheduleMessageService.this.deliverExecutorService.schedule(new DeliverDelayedMessageTimerTask(this.delayLevel,
                 failScheduleOffset), DELAY_FOR_A_WHILE, TimeUnit.MILLISECONDS);
         }
+    }
+
+    class AsyncDeliverDelayedMessageTimerTask implements Runnable {
+        private final int delayLevel;
+        private final long offset;
+
+        public AsyncDeliverDelayedMessageTimerTask(int delayLevel, long offset) {
+            this.delayLevel = delayLevel;
+            this.offset = offset;
+        }
+
+        @Override
+        public void run() {
+            try {
+                if (isStarted()) {
+                    this.executeOnTimeup();
+                }
+            } catch (Exception e) {
+                log.error("ScheduleMessageService, executeOnTimeup exception", e);
+                this.toNextTask(this.offset);
+            }
+        }
+
+        private long correctDeliverTimestamp(final long now, final long deliverTimestamp) {
+            long result = deliverTimestamp;
+            long maxTimestamp = now + ScheduleMessageService.this.delayLevelTable.get(this.delayLevel);
+            if (deliverTimestamp > maxTimestamp) {
+                result = now;
+            }
+            return result;
+        }
+
+        public void executeOnTimeup() {
+            ConsumeQueue cq = ScheduleMessageService.this.defaultMessageStore.findConsumeQueue(
+                    TopicValidator.RMQ_SYS_SCHEDULE_TOPIC, delayLevel2QueueId(delayLevel));
+
+            if (cq == null) {
+                this.toNextTask(this.offset);
+                return;
+            }
+
+            SelectMappedBufferResult bufferCQ = cq.getIndexBuffer(this.offset);
+            if (bufferCQ == null) {
+                long cqMinOffset = cq.getMinOffsetInQueue();
+                if (this.offset < cqMinOffset) {
+                    log.error("schedule CQ offset invalid. offset={}, cqMinOffset={}, queueId={}",
+                            this.offset, cqMinOffset, cq.getQueueId());
+                    this.toNextTask(cqMinOffset);
+                    return;
+                }
+
+                long cqMaxOffset = cq.getMaxOffsetInQueue();
+                if (this.offset > cqMaxOffset) {
+                    log.error("schedule CQ offset invalid. offset={}, cqMaxOffset={}, queueId={}",
+                            this.offset, cqMaxOffset, cq.getQueueId());
+                    this.toNextTask(cqMaxOffset);
+                    return;
+                }
+
+                this.toNextTask(this.offset);
+                return;
+            }
+
+            try {
+                long currentOffset;
+                int i = 0;
+                ConsumeQueueExt.CqExtUnit cqExtUnit = new ConsumeQueueExt.CqExtUnit();
+                for (; i < bufferCQ.getSize() && isStarted(); i += ConsumeQueue.CQ_STORE_UNIT_SIZE) {
+                    long offsetPy = bufferCQ.getByteBuffer().getLong();
+                    int sizePy = bufferCQ.getByteBuffer().getInt();
+                    long tagsCode = bufferCQ.getByteBuffer().getLong();
+
+                    if (cq.isExtAddr(tagsCode)) {
+                        if (cq.getExt(tagsCode, cqExtUnit)) {
+                            tagsCode = cqExtUnit.getTagsCode();
+                        } else {
+                            //can't find ext content.So re compute tags code.
+                            log.error("[BUG] can't find consume queue extend file content!addr={}, offsetPy={}, sizePy={}",
+                                    tagsCode, offsetPy, sizePy);
+                            long msgStoreTime = defaultMessageStore.getCommitLog().pickupStoreTimestamp(offsetPy, sizePy);
+                            tagsCode = computeDeliverTimestamp(delayLevel, msgStoreTime);
+                        }
+                    }
+
+                    long now = System.currentTimeMillis();
+                    long deliverTimestamp = this.correctDeliverTimestamp(now, tagsCode);
+                    currentOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE);
+
+                    long countdown = deliverTimestamp - now;
+                    if (countdown > 0) {
+                        this.toNextTask(currentOffset);
+                        return;
+                    }
+
+                    MessageExt msgExt = ScheduleMessageService.this.defaultMessageStore.lookMessageByOffset(offsetPy, sizePy);
+                    if (msgExt == null) {
+                        continue;
+                    }
+
+                    try {
+                        MessageExtBrokerInner msgInner = ScheduleMessageService.this.messageTimeup(msgExt);
+                        if (TopicValidator.RMQ_SYS_TRANS_HALF_TOPIC.equals(msgInner.getTopic())) {
+                            log.error("[BUG] the real topic of schedule msg is {}, discard the msg. msg={}",
+                                    msgInner.getTopic(), msgInner);
+                            continue;
+                        }
+
+                        // Asynchronous send flow control.
+                        int maxPendingLimit = ScheduleMessageService.this.defaultMessageStore
+                                .getMessageStoreConfig().getScheduleAsyncDeliverMaxPendingLimit();
+                        int currentPendingNum = this.getDeliverPendingQueue().size();
+                        if (currentPendingNum > maxPendingLimit) {
+                            log.warn("Asynchronous sending triggers flow control, " +
+                                    "currentPendingNum={}, maxPendingLimit={}", currentPendingNum, maxPendingLimit);
+                            this.toNextTask(currentOffset);
+                            return;
+                        }
+
+                        // Handling abnormal blocking.
+                        PutResultProcess firstProcess = this.getDeliverPendingQueue().peek();
+                        if (firstProcess != null && firstProcess.need2Wait()) {
+                            log.warn("Asynchronous sending block. info={}", firstProcess.toString());
+                            this.toNextTask(currentOffset);
+                            return;
+                        }
+
+                        CompletableFuture<PutMessageResult> future =
+                                ScheduleMessageService.this.writeMessageStore.asyncPutMessage(msgInner);
+                        PutResultProcess putResultProcess = new PutResultProcess()
+                                .setTopic(msgInner.getTopic())
+                                .setDelayLevel(this.delayLevel)
+                                .setOffset(currentOffset)
+                                .setPhysicOffset(offsetPy)
+                                .setPhysicSize(sizePy)
+                                .setMsgId(msgExt.getMsgId())
+                                .setAutoResend(true)
+                                .setFuture(future)
+                                .thenProcess();
+                        this.getDeliverPendingQueue().add(putResultProcess);
+                        continue;
+                    } catch (Exception e) {
+                        log.error("ScheduleMessageService, messageTimeup execute error, drop it. msgExt={}, nextOffset={}, offsetPy={}, sizePy={}", msgExt, currentOffset, offsetPy, sizePy, e);
+                    }
+                }
+
+                long nextOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE);
+                this.toNextTask(nextOffset);
+                return;
+            } finally {
+                bufferCQ.release();
+            }
+        }
+
+        public void toNextTask(long offset) {

Review comment:
       The method name `toNextTask` is ambiguous; `scheduleNextTimerTask` would be clearer.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@rocketmq.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [rocketmq] coveralls edited a comment on pull request #3458: [ISSUE #3449] Delayed message supports asynchronous delivery

Posted by GitBox <gi...@apache.org>.
coveralls edited a comment on pull request #3458:
URL: https://github.com/apache/rocketmq/pull/3458#issuecomment-960253698


   
   [![Coverage Status](https://coveralls.io/builds/45241813/badge)](https://coveralls.io/builds/45241813)
   
   Coverage increased (+0.1%) to 56.252% when pulling **40878c880e8ce51b3737b18e69250158b4097ede on Git-Yang:develop-schedule_v2** into **75557b8700bef395002a6b5b7c2bb083c120b632 on apache:develop**.
   





[GitHub] [rocketmq] Git-Yang edited a comment on pull request #3458: [ISSUE #3449] Delayed message supports asynchronous delivery

Posted by GitBox <gi...@apache.org>.
Git-Yang edited a comment on pull request #3458:
URL: https://github.com/apache/rocketmq/pull/3458#issuecomment-965392009


   Depends on issue https://github.com/apache/rocketmq/issues/3473





[GitHub] [rocketmq] Git-Yang commented on pull request #3458: [ISSUE #3449] Delayed message supports asynchronous delivery

Posted by GitBox <gi...@apache.org>.
Git-Yang commented on pull request #3458:
URL: https://github.com/apache/rocketmq/pull/3458#issuecomment-959495469


   Depends on pull request https://github.com/apache/rocketmq/pull/3287





[GitHub] [rocketmq] coveralls edited a comment on pull request #3458: [ISSUE #3449] Delayed message supports asynchronous delivery

Posted by GitBox <gi...@apache.org>.
coveralls edited a comment on pull request #3458:
URL: https://github.com/apache/rocketmq/pull/3458#issuecomment-960253698


   
   [![Coverage Status](https://coveralls.io/builds/44502615/badge)](https://coveralls.io/builds/44502615)
   
   Coverage decreased (-0.2%) to 54.848% when pulling **dc39d4ead2b939b57ce63db07b61b315a8dbaf03 on Git-Yang:develop-schedule_v2** into **87b0be00012dff86727766081d4243eeee280699 on apache:develop**.
   





[GitHub] [rocketmq] RongtongJin commented on a change in pull request #3458: [ISSUE #3449] Delayed message supports asynchronous delivery

Posted by GitBox <gi...@apache.org>.
RongtongJin commented on a change in pull request #3458:
URL: https://github.com/apache/rocketmq/pull/3458#discussion_r779591109



##########
File path: store/src/main/java/org/apache/rocketmq/store/schedule/ScheduleMessageService.java
##########
@@ -66,10 +72,23 @@
     private int deliverThreadPoolNums = Runtime.getRuntime().availableProcessors();
     private MessageStore writeMessageStore;
     private int maxDelayLevel;
+    private boolean enableAsyncDeliver = false;
+    private int maxPendingLimit;
+    private int maxResendNum2Blocked;
+    private ScheduledExecutorService handleExecutorService;
+    private final Map<Integer /* level */, LinkedBlockingQueue<PutResultProcess>> deliverPendingTable =
+        new ConcurrentHashMap<>(32);
 
     public ScheduleMessageService(final DefaultMessageStore defaultMessageStore) {
         this.defaultMessageStore = defaultMessageStore;
         this.writeMessageStore = defaultMessageStore;
+        if (defaultMessageStore != null) {
+            this.enableAsyncDeliver = defaultMessageStore.getMessageStoreConfig().isEnableScheduleAsyncDeliver();
+            this.maxPendingLimit = ScheduleMessageService.this.defaultMessageStore.getMessageStoreConfig()
+                .getScheduleAsyncDeliverMaxPendingLimit();
+            this.maxResendNum2Blocked = ScheduleMessageService.this.defaultMessageStore.getMessageStoreConfig()
+                .getScheduleAsyncDeliverMaxResendNum2Blocked();
+        }

Review comment:
       If `maxPendingLimit` and `maxResendNum2Blocked` are copied into instance fields in the constructor, changing them at runtime with the `updateBrokerConfig` command has no effect, because the service never re-reads the config.
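
       A minimal sketch of the concern raised in this comment, not the PR's actual code. `DemoStoreConfig`, `CachedLimitChecker`, and `LiveLimitChecker` are hypothetical names introduced for illustration, and the limit value of 100 is an arbitrary demo value, not a RocketMQ default. It contrasts copying a limit into a field once with reading it from the config object on every check:

```java
// Hypothetical stand-in for a config object whose values can change at runtime.
class DemoStoreConfig {
    private volatile int maxPendingLimit = 100; // arbitrary demo value

    int getMaxPendingLimit() {
        return maxPendingLimit;
    }

    void setMaxPendingLimit(int maxPendingLimit) {
        this.maxPendingLimit = maxPendingLimit;
    }
}

// Copies the limit once at construction: later config updates are never seen.
class CachedLimitChecker {
    private final int maxPendingLimit;

    CachedLimitChecker(DemoStoreConfig config) {
        this.maxPendingLimit = config.getMaxPendingLimit();
    }

    boolean overLimit(int pendingNum) {
        return pendingNum > maxPendingLimit;
    }
}

// Reads the limit through the config on each check: updates apply immediately.
class LiveLimitChecker {
    private final DemoStoreConfig config;

    LiveLimitChecker(DemoStoreConfig config) {
        this.config = config;
    }

    boolean overLimit(int pendingNum) {
        return pendingNum > config.getMaxPendingLimit();
    }
}

class RuntimeConfigDemo {
    public static void main(String[] args) {
        DemoStoreConfig config = new DemoStoreConfig();
        CachedLimitChecker cached = new CachedLimitChecker(config);
        LiveLimitChecker live = new LiveLimitChecker(config);

        // Simulate a runtime config update lowering the limit.
        config.setMaxPendingLimit(5);

        System.out.println("cached over limit: " + cached.overLimit(50));
        System.out.println("live over limit: " + live.overLimit(50));
    }
}
```

       In this sketch only the checker that reads through the config object reacts to the lowered limit; the one that cached the value keeps comparing against the original 100.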

##########
File path: store/src/main/java/org/apache/rocketmq/store/schedule/ScheduleMessageService.java
##########
@@ -308,158 +387,408 @@ public void executeOnTimeup() {
                 ScheduleMessageService.this.defaultMessageStore.findConsumeQueue(TopicValidator.RMQ_SYS_SCHEDULE_TOPIC,
                     delayLevel2QueueId(delayLevel));
 
-            long failScheduleOffset = offset;
+            if (cq == null) {
+                this.scheduleNextTimerTask(this.offset, DELAY_FOR_A_WHILE);
+                return;
+            }
 
-            if (cq != null) {
-                SelectMappedBufferResult bufferCQ = cq.getIndexBuffer(this.offset);
-                if (bufferCQ != null) {
-                    try {
-                        long nextOffset = offset;
-                        int i = 0;
-                        ConsumeQueueExt.CqExtUnit cqExtUnit = new ConsumeQueueExt.CqExtUnit();
-                        for (; i < bufferCQ.getSize(); i += ConsumeQueue.CQ_STORE_UNIT_SIZE) {
-                            long offsetPy = bufferCQ.getByteBuffer().getLong();
-                            int sizePy = bufferCQ.getByteBuffer().getInt();
-                            long tagsCode = bufferCQ.getByteBuffer().getLong();
-
-                            if (cq.isExtAddr(tagsCode)) {
-                                if (cq.getExt(tagsCode, cqExtUnit)) {
-                                    tagsCode = cqExtUnit.getTagsCode();
-                                } else {
-                                    //can't find ext content.So re compute tags code.
-                                    log.error("[BUG] can't find consume queue extend file content!addr={}, offsetPy={}, sizePy={}",
-                                        tagsCode, offsetPy, sizePy);
-                                    long msgStoreTime = defaultMessageStore.getCommitLog().pickupStoreTimestamp(offsetPy, sizePy);
-                                    tagsCode = computeDeliverTimestamp(delayLevel, msgStoreTime);
-                                }
-                            }
+            SelectMappedBufferResult bufferCQ = cq.getIndexBuffer(this.offset);
+            if (bufferCQ == null) {
+                long resetOffset;
+                if ((resetOffset = cq.getMinOffsetInQueue()) > this.offset) {
+                    log.error("schedule CQ offset invalid. offset={}, cqMinOffset={}, queueId={}",
+                        this.offset, resetOffset, cq.getQueueId());
+                } else if ((resetOffset = cq.getMaxOffsetInQueue()) < this.offset) {
+                    log.error("schedule CQ offset invalid. offset={}, cqMaxOffset={}, queueId={}",
+                        this.offset, resetOffset, cq.getQueueId());
+                } else {
+                    resetOffset = this.offset;
+                }
 
-                            long now = System.currentTimeMillis();
-                            long deliverTimestamp = this.correctDeliverTimestamp(now, tagsCode);
-
-                            nextOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE);
-
-                            long countdown = deliverTimestamp - now;
-
-                            if (countdown <= 0) {
-                                MessageExt msgExt =
-                                    ScheduleMessageService.this.defaultMessageStore.lookMessageByOffset(
-                                        offsetPy, sizePy);
-
-                                if (msgExt != null) {
-                                    try {
-                                        MessageExtBrokerInner msgInner = this.messageTimeup(msgExt);
-                                        if (TopicValidator.RMQ_SYS_TRANS_HALF_TOPIC.equals(msgInner.getTopic())) {
-                                            log.error("[BUG] the real topic of schedule msg is {}, discard the msg. msg={}",
-                                                msgInner.getTopic(), msgInner);
-                                            continue;
-                                        }
-                                        PutMessageResult putMessageResult =
-                                            ScheduleMessageService.this.writeMessageStore
-                                                .putMessage(msgInner);
-
-                                        if (putMessageResult != null
-                                            && putMessageResult.getPutMessageStatus() == PutMessageStatus.PUT_OK) {
-                                            if (ScheduleMessageService.this.defaultMessageStore.getMessageStoreConfig().isEnableScheduleMessageStats()) {
-                                                ScheduleMessageService.this.defaultMessageStore.getBrokerStatsManager().incQueueGetNums(MixAll.SCHEDULE_CONSUMER_GROUP, TopicValidator.RMQ_SYS_SCHEDULE_TOPIC, delayLevel - 1, putMessageResult.getAppendMessageResult().getMsgNum());
-                                                ScheduleMessageService.this.defaultMessageStore.getBrokerStatsManager().incQueueGetSize(MixAll.SCHEDULE_CONSUMER_GROUP, TopicValidator.RMQ_SYS_SCHEDULE_TOPIC, delayLevel - 1, putMessageResult.getAppendMessageResult().getWroteBytes());
-                                                ScheduleMessageService.this.defaultMessageStore.getBrokerStatsManager().incGroupGetNums(MixAll.SCHEDULE_CONSUMER_GROUP, TopicValidator.RMQ_SYS_SCHEDULE_TOPIC, putMessageResult.getAppendMessageResult().getMsgNum());
-                                                ScheduleMessageService.this.defaultMessageStore.getBrokerStatsManager().incGroupGetSize(MixAll.SCHEDULE_CONSUMER_GROUP, TopicValidator.RMQ_SYS_SCHEDULE_TOPIC, putMessageResult.getAppendMessageResult().getWroteBytes());
-                                                ScheduleMessageService.this.defaultMessageStore.getBrokerStatsManager().incTopicPutNums(msgInner.getTopic(), putMessageResult.getAppendMessageResult().getMsgNum(), 1);
-                                                ScheduleMessageService.this.defaultMessageStore.getBrokerStatsManager().incTopicPutSize(msgInner.getTopic(),
-                                                    putMessageResult.getAppendMessageResult().getWroteBytes());
-                                                ScheduleMessageService.this.defaultMessageStore.getBrokerStatsManager().incBrokerPutNums(putMessageResult.getAppendMessageResult().getMsgNum());
-                                            }
-                                            continue;
-                                        } else {
-                                            // XXX: warn and notify me
-                                            log.error(
-                                                "ScheduleMessageService, a message time up, but reput it failed, topic: {} msgId {}",
-                                                msgExt.getTopic(), msgExt.getMsgId());
-                                            ScheduleMessageService.this.deliverExecutorService.schedule(
-                                                new DeliverDelayedMessageTimerTask(this.delayLevel,
-                                                    nextOffset), DELAY_FOR_A_PERIOD, TimeUnit.MILLISECONDS);
-                                            ScheduleMessageService.this.updateOffset(this.delayLevel,
-                                                nextOffset);
-                                            return;
-                                        }
-                                    } catch (Exception e) {
-                                        /*
-                                         * XXX: warn and notify me
-                                         */
-                                        log.error(
-                                            "ScheduleMessageService, messageTimeup execute error, drop it. msgExt={}, nextOffset={}, offsetPy={}, sizePy={}", msgExt, nextOffset, offsetPy, sizePy, e);
-                                    }
-                                }
-                            } else {
-                                ScheduleMessageService.this.deliverExecutorService.schedule(
-                                    new DeliverDelayedMessageTimerTask(this.delayLevel, nextOffset),
-                                    countdown, TimeUnit.MILLISECONDS);
-                                ScheduleMessageService.this.updateOffset(this.delayLevel, nextOffset);
-                                return;
-                            }
-                        } // end of for
+                this.scheduleNextTimerTask(resetOffset, DELAY_FOR_A_WHILE);
+                return;
+            }
+
+            long nextOffset = this.offset;
+            try {
+                int i = 0;
+                ConsumeQueueExt.CqExtUnit cqExtUnit = new ConsumeQueueExt.CqExtUnit();
+                for (; i < bufferCQ.getSize() && isStarted(); i += ConsumeQueue.CQ_STORE_UNIT_SIZE) {
+                    long offsetPy = bufferCQ.getByteBuffer().getLong();
+                    int sizePy = bufferCQ.getByteBuffer().getInt();
+                    long tagsCode = bufferCQ.getByteBuffer().getLong();
+
+                    if (cq.isExtAddr(tagsCode)) {
+                        if (cq.getExt(tagsCode, cqExtUnit)) {
+                            tagsCode = cqExtUnit.getTagsCode();
+                        } else {
+                            //can't find ext content.So re compute tags code.
+                            log.error("[BUG] can't find consume queue extend file content!addr={}, offsetPy={}, sizePy={}",
+                                tagsCode, offsetPy, sizePy);
+                            long msgStoreTime = defaultMessageStore.getCommitLog().pickupStoreTimestamp(offsetPy, sizePy);
+                            tagsCode = computeDeliverTimestamp(delayLevel, msgStoreTime);
+                        }
+                    }
 
-                        nextOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE);
-                        ScheduleMessageService.this.deliverExecutorService.schedule(new DeliverDelayedMessageTimerTask(
-                            this.delayLevel, nextOffset), DELAY_FOR_A_WHILE, TimeUnit.MILLISECONDS);
-                        ScheduleMessageService.this.updateOffset(this.delayLevel, nextOffset);
+                    long now = System.currentTimeMillis();
+                    long deliverTimestamp = this.correctDeliverTimestamp(now, tagsCode);
+                    nextOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE);
+
+                    long countdown = deliverTimestamp - now;
+                    if (countdown > 0) {
+                        this.scheduleNextTimerTask(nextOffset, DELAY_FOR_A_WHILE);
                         return;
-                    } finally {
+                    }
+
+                    MessageExt msgExt = ScheduleMessageService.this.defaultMessageStore.lookMessageByOffset(offsetPy, sizePy);
+                    if (msgExt == null) {
+                        continue;
+                    }
 
-                        bufferCQ.release();
+                    MessageExtBrokerInner msgInner = ScheduleMessageService.this.messageTimeup(msgExt);
+                    if (TopicValidator.RMQ_SYS_TRANS_HALF_TOPIC.equals(msgInner.getTopic())) {
+                        log.error("[BUG] the real topic of schedule msg is {}, discard the msg. msg={}",
+                            msgInner.getTopic(), msgInner);
+                        continue;
                     }
-                } // end of if (bufferCQ != null)
-                else {
-
-                    long cqMinOffset = cq.getMinOffsetInQueue();
-                    long cqMaxOffset = cq.getMaxOffsetInQueue();
-                    if (offset < cqMinOffset) {
-                        failScheduleOffset = cqMinOffset;
-                        log.error("schedule CQ offset invalid. offset={}, cqMinOffset={}, cqMaxOffset={}, queueId={}",
-                            offset, cqMinOffset, cqMaxOffset, cq.getQueueId());
+
+                    boolean deliverSuc;
+                    if (ScheduleMessageService.this.enableAsyncDeliver) {
+                        deliverSuc = this.asyncDeliver(msgInner, msgExt.getMsgId(), offset, offsetPy, sizePy);
+                    } else {
+                        deliverSuc = this.syncDeliver(msgInner, msgExt.getMsgId(), offset, offsetPy, sizePy);
                     }
 
-                    if (offset > cqMaxOffset) {
-                        failScheduleOffset = cqMaxOffset;
-                        log.error("schedule CQ offset invalid. offset={}, cqMinOffset={}, cqMaxOffset={}, queueId={}",
-                            offset, cqMinOffset, cqMaxOffset, cq.getQueueId());
+                    if (!deliverSuc) {
+                        this.scheduleNextTimerTask(nextOffset, DELAY_FOR_A_WHILE);
+                        return;
                     }
                 }
-            } // end of if (cq != null)
 
-            ScheduleMessageService.this.deliverExecutorService.schedule(new DeliverDelayedMessageTimerTask(this.delayLevel,
-                failScheduleOffset), DELAY_FOR_A_WHILE, TimeUnit.MILLISECONDS);
+                nextOffset = this.offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE);
+            } catch (Exception e) {
+                log.error("ScheduleMessageService, messageTimeup execute error, offset = {}", nextOffset, e);
+            } finally {
+                bufferCQ.release();
+            }
+
+            this.scheduleNextTimerTask(nextOffset, DELAY_FOR_A_WHILE);
+        }
+
+        public void scheduleNextTimerTask(long offset, long delay) {
+            ScheduleMessageService.this.deliverExecutorService.schedule(new DeliverDelayedMessageTimerTask(
+                this.delayLevel, offset), delay, TimeUnit.MILLISECONDS);
+        }
+
+        private boolean syncDeliver(MessageExtBrokerInner msgInner, String msgId, long offset, long offsetPy,
+            int sizePy) {
+            PutResultProcess resultProcess = deliverMessage(msgInner, msgId, offset, offsetPy, sizePy, false);
+            PutMessageResult result = resultProcess.get();
+            boolean sendStatus = result != null && result.getPutMessageStatus() == PutMessageStatus.PUT_OK;
+            if (sendStatus) {
+                ScheduleMessageService.this.updateOffset(this.delayLevel, resultProcess.getNextOffset());
+            }
+            return sendStatus;
+        }
+
+        private boolean asyncDeliver(MessageExtBrokerInner msgInner, String msgId, long offset, long offsetPy,
+            int sizePy) {
+            Queue<PutResultProcess> processesQueue = ScheduleMessageService.this.deliverPendingTable.get(this.delayLevel);
+
+            //Flow Control
+            int currentPendingNum = processesQueue.size();
+            if (currentPendingNum > ScheduleMessageService.this.maxPendingLimit) {
+                log.warn("Asynchronous deliver triggers flow control, " +
+                    "currentPendingNum={}, maxPendingLimit={}", currentPendingNum, maxPendingLimit);
+                return false;
+            }
+
+            //Blocked
+            PutResultProcess firstProcess = processesQueue.peek();
+            if (firstProcess != null && firstProcess.need2Blocked()) {
+                log.warn("Asynchronous deliver block. info={}", firstProcess.toString());
+                return false;
+            }
+
+            PutResultProcess resultProcess = deliverMessage(msgInner, msgId, offset, offsetPy, sizePy, true);
+            processesQueue.add(resultProcess);
+            return true;
+        }
+
+        private PutResultProcess deliverMessage(MessageExtBrokerInner msgInner, String msgId, long offset,
+            long offsetPy, int sizePy, boolean autoResend) {
+            CompletableFuture<PutMessageResult> future =
+                ScheduleMessageService.this.writeMessageStore.asyncPutMessage(msgInner);
+            return new PutResultProcess()
+                .setTopic(msgInner.getTopic())
+                .setDelayLevel(this.delayLevel)
+                .setOffset(offset)
+                .setPhysicOffset(offsetPy)
+                .setPhysicSize(sizePy)
+                .setMsgId(msgId)
+                .setAutoResend(autoResend)
+                .setFuture(future)
+                .thenProcess();
         }
+    }
 
-        private MessageExtBrokerInner messageTimeup(MessageExt msgExt) {
-            MessageExtBrokerInner msgInner = new MessageExtBrokerInner();
-            msgInner.setBody(msgExt.getBody());
-            msgInner.setFlag(msgExt.getFlag());
-            MessageAccessor.setProperties(msgInner, msgExt.getProperties());
+    public class HandlePutResultTask implements Runnable {
+        private final int delayLevel;
 
-            TopicFilterType topicFilterType = MessageExt.parseTopicFilterType(msgInner.getSysFlag());
-            long tagsCodeValue =
-                MessageExtBrokerInner.tagsString2tagsCode(topicFilterType, msgInner.getTags());
-            msgInner.setTagsCode(tagsCodeValue);
-            msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgExt.getProperties()));
+        public HandlePutResultTask(int delayLevel) {
+            this.delayLevel = delayLevel;
+        }
 
-            msgInner.setSysFlag(msgExt.getSysFlag());
-            msgInner.setBornTimestamp(msgExt.getBornTimestamp());
-            msgInner.setBornHost(msgExt.getBornHost());
-            msgInner.setStoreHost(msgExt.getStoreHost());
-            msgInner.setReconsumeTimes(msgExt.getReconsumeTimes());
+        @Override
+        public void run() {
+            LinkedBlockingQueue<PutResultProcess> pendingQueue =
+                ScheduleMessageService.this.deliverPendingTable.get(this.delayLevel);
+
+            PutResultProcess putResultProcess;
+            while ((putResultProcess = pendingQueue.peek()) != null) {
+                try {
+                    switch (putResultProcess.getStatus()) {
+                        case SUCCESS:
+                            ScheduleMessageService.this.updateOffset(this.delayLevel, putResultProcess.getNextOffset());
+                            pendingQueue.remove();
+                            break;
+                        case RUNNING:
+                            break;
+                        case EXCEPTION:
+                            if (!isStarted()) {
+                                log.warn("HandlePutResultTask shutdown, info={}", putResultProcess.toString());
+                                return;
+                            }
+                            log.warn("putResultProcess error, info={}", putResultProcess.toString());
+                            putResultProcess.onException();
+                            break;
+                        case SKIP:
+                            log.warn("putResultProcess skip, info={}", putResultProcess.toString());
+                            pendingQueue.remove();
+                            break;
+                    }
+                } catch (Exception e) {
+                    log.error("HandlePutResultTask exception. info={}", putResultProcess.toString(), e);
+                    putResultProcess.onException();
+                }
+            }
 
-            msgInner.setWaitStoreMsgOK(false);
-            MessageAccessor.clearProperty(msgInner, MessageConst.PROPERTY_DELAY_TIME_LEVEL);
+            if (isStarted()) {
+                ScheduleMessageService.this.handleExecutorService
+                    .schedule(new HandlePutResultTask(this.delayLevel), DELAY_FOR_A_SLEEP, TimeUnit.MILLISECONDS);
+            }
+        }
+    }
 
-            msgInner.setTopic(msgInner.getProperty(MessageConst.PROPERTY_REAL_TOPIC));
+    public class PutResultProcess {
+        private String topic;
+        private long offset;
+        private long physicOffset;
+        private int physicSize;
+        private int delayLevel;
+        private String msgId;
+        private boolean autoResend = false;
+        private CompletableFuture<PutMessageResult> future;
+
+        private volatile int resendCount = 0;
+        private volatile ProcessStatus status = ProcessStatus.RUNNING;
+
+        public PutResultProcess setTopic(String topic) {
+            this.topic = topic;
+            return this;
+        }
 
-            String queueIdStr = msgInner.getProperty(MessageConst.PROPERTY_REAL_QUEUE_ID);
-            int queueId = Integer.parseInt(queueIdStr);
-            msgInner.setQueueId(queueId);
+        public PutResultProcess setOffset(long offset) {
+            this.offset = offset;
+            return this;
+        }
 
-            return msgInner;
+        public PutResultProcess setPhysicOffset(long physicOffset) {
+            this.physicOffset = physicOffset;
+            return this;
         }
+
+        public PutResultProcess setPhysicSize(int physicSize) {
+            this.physicSize = physicSize;
+            return this;
+        }
+
+        public PutResultProcess setDelayLevel(int delayLevel) {
+            this.delayLevel = delayLevel;
+            return this;
+        }
+
+        public PutResultProcess setMsgId(String msgId) {
+            this.msgId = msgId;
+            return this;
+        }
+
+        public PutResultProcess setAutoResend(boolean autoResend) {
+            this.autoResend = autoResend;
+            return this;
+        }
+
+        public PutResultProcess setFuture(CompletableFuture<PutMessageResult> future) {
+            this.future = future;
+            return this;
+        }
+
+        public String getTopic() {
+            return topic;
+        }
+
+        public long getOffset() {
+            return offset;
+        }
+
+        public long getNextOffset() {
+            return offset + 1;
+        }
+
+        public long getPhysicOffset() {
+            return physicOffset;
+        }
+
+        public int getPhysicSize() {
+            return physicSize;
+        }
+
+        public Integer getDelayLevel() {
+            return delayLevel;
+        }
+
+        public String getMsgId() {
+            return msgId;
+        }
+
+        public boolean isAutoResend() {
+            return autoResend;
+        }
+
+        public CompletableFuture<PutMessageResult> getFuture() {
+            return future;
+        }
+
+        public int getResendCount() {
+            return resendCount;
+        }
+
+        public PutResultProcess thenProcess() {
+            this.future.thenAccept(result -> {
+                this.handleResult(result);
+            });
+
+            this.future.exceptionally(e -> {
+                log.error("ScheduleMessageService put message exceptionally, info: {}",
+                    PutResultProcess.this.toString(), e);
+
+                onException();
+                return null;
+            });
+            return this;
+        }
+
+        private void handleResult(PutMessageResult result) {
+            if (result != null && result.getPutMessageStatus() == PutMessageStatus.PUT_OK) {
+                onSuccess(result);
+            } else {
+                log.warn("ScheduleMessageService put message failed. info: {}.", result);
+                onException();
+            }
+        }
+
+        public void onSuccess(PutMessageResult result) {
+            this.status = ProcessStatus.SUCCESS;
+            if (ScheduleMessageService.this.defaultMessageStore.getMessageStoreConfig().isEnableScheduleMessageStats()) {
+                ScheduleMessageService.this.defaultMessageStore.getBrokerStatsManager().incQueueGetNums(MixAll.SCHEDULE_CONSUMER_GROUP, TopicValidator.RMQ_SYS_SCHEDULE_TOPIC, delayLevel - 1, result.getAppendMessageResult().getMsgNum());
+                ScheduleMessageService.this.defaultMessageStore.getBrokerStatsManager().incQueueGetSize(MixAll.SCHEDULE_CONSUMER_GROUP, TopicValidator.RMQ_SYS_SCHEDULE_TOPIC, delayLevel - 1, result.getAppendMessageResult().getWroteBytes());
+                ScheduleMessageService.this.defaultMessageStore.getBrokerStatsManager().incGroupGetNums(MixAll.SCHEDULE_CONSUMER_GROUP, TopicValidator.RMQ_SYS_SCHEDULE_TOPIC, result.getAppendMessageResult().getMsgNum());
+                ScheduleMessageService.this.defaultMessageStore.getBrokerStatsManager().incGroupGetSize(MixAll.SCHEDULE_CONSUMER_GROUP, TopicValidator.RMQ_SYS_SCHEDULE_TOPIC, result.getAppendMessageResult().getWroteBytes());
+                ScheduleMessageService.this.defaultMessageStore.getBrokerStatsManager().incTopicPutNums(this.topic, result.getAppendMessageResult().getMsgNum(), 1);
+                ScheduleMessageService.this.defaultMessageStore.getBrokerStatsManager().incTopicPutSize(this.topic, result.getAppendMessageResult().getWroteBytes());
+                ScheduleMessageService.this.defaultMessageStore.getBrokerStatsManager().incBrokerPutNums(result.getAppendMessageResult().getMsgNum());
+            }
+        }
+
+        public void onException() {
+            log.warn("ScheduleMessageService onException, info: {}", this.toString());
+            if (this.autoResend) {
+                this.resend();
+            } else {
+                this.status = ProcessStatus.SKIP;
+            }
+        }
+
+        public ProcessStatus getStatus() {
+            return this.status;
+        }
+
+        public PutMessageResult get() {
+            try {
+                return this.future.get();
+            } catch (InterruptedException | ExecutionException e) {
+                return new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, null);
+            }
+        }
+
+        private void resend() {
+            log.info("Resend message, info: {}", this.toString());
+
+            // Gradually increase the resend interval.
+            try {
+                Thread.sleep(Math.min(this.resendCount++ * 100, 60 * 1000));
+            } catch (InterruptedException e) {
+                e.printStackTrace();
+            }
+
+            try {
+                MessageExt msgExt = ScheduleMessageService.this.defaultMessageStore.lookMessageByOffset(this.physicOffset, this.physicSize);
+                if (msgExt == null) {
+                    log.warn("ScheduleMessageService resend not found message. info: {}", this.toString());
+                    this.status = need2Skip() ? ProcessStatus.SKIP : ProcessStatus.EXCEPTION;
+                    return;
+                }
+
+                MessageExtBrokerInner msgInner = ScheduleMessageService.this.messageTimeup(msgExt);
+                PutMessageResult result = ScheduleMessageService.this.writeMessageStore.putMessage(msgInner);
+                this.handleResult(result);
+                if (result != null && result.getPutMessageStatus() == PutMessageStatus.PUT_OK) {
+                    log.info("Resend message success, info: {}", this.toString());
+                }
+            } catch (Exception e) {
+                this.status = ProcessStatus.EXCEPTION;
+                log.error("Resend message error, info: {}", this.toString(), e);
+            }
+        }
+
+        public boolean need2Blocked() {
+            return this.resendCount > ScheduleMessageService.this.maxResendNum2Blocked;
+        }
+
+        public boolean need2Skip() {
+            return this.resendCount > ScheduleMessageService.this.maxResendNum2Blocked * 2;
+        }
+
+        @Override
+        public String toString() {
+            return "PutResultProcess{" +
+                "topic='" + topic + '\'' +
+                ", offset=" + offset +
+                ", physicOffset=" + physicOffset +
+                ", physicSize=" + physicSize +
+                ", delayLevel=" + delayLevel +
+                ", msgId='" + msgId + '\'' +
+                ", autoResend=" + autoResend +
+                ", resendCount=" + resendCount +
+                ", status=" + status +
+                '}';
+        }
+    }
+
+    public enum ProcessStatus {
+        RUNNING,
+        SUCCESS,
+        EXCEPTION,
+        SKIP,
     }

Review comment:
       It would be better to add comments explaining the meaning of each state and the resulting message handling. For example, if the status is ProcessStatus.SKIP, will the message be discarded?
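
As a starting point for such comments, the states could be documented roughly as below. This is only one reading of the hunk quoted above, not something confirmed by the author. In particular, whether SKIP ends up discarding the message depends on how HandlePutResultTask reacts to it, which this hunk does not show. `ProcessStatusSketch` and `isTerminal` are hypothetical names used for illustration.

```java
public class ProcessStatusSketch {
    /** Hypothetical, documented copy of the enum in the diff; the comments are an interpretation. */
    public enum ProcessStatus {
        /** Initial state: the asynchronous put was issued and no result has been handled yet. */
        RUNNING,
        /** The put returned PUT_OK (set in onSuccess). */
        SUCCESS,
        /** A resend threw, or the message was not found and the skip threshold is not reached yet. */
        EXCEPTION,
        /** Given up: autoResend is off, or the message was not found after too many resends (need2Skip). */
        SKIP
    }

    /** Hypothetical helper: true when, under this reading, no further resend is expected. */
    public static boolean isTerminal(ProcessStatus status) {
        return status == ProcessStatus.SUCCESS || status == ProcessStatus.SKIP;
    }
}
```

With the states written down like this, a reader can see at a glance that RUNNING and EXCEPTION are the two states that may still change.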




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@rocketmq.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [rocketmq] Git-Yang commented on pull request #3458: [ISSUE #3449] Delayed message supports asynchronous delivery

Posted by GitBox <gi...@apache.org>.
Git-Yang commented on pull request #3458:
URL: https://github.com/apache/rocketmq/pull/3458#issuecomment-1010953614


   @duhenglucky @areyouok Please help review the code.





[GitHub] [rocketmq] coveralls edited a comment on pull request #3458: [ISSUE #3449] Delayed message supports asynchronous delivery

Posted by GitBox <gi...@apache.org>.
coveralls edited a comment on pull request #3458:
URL: https://github.com/apache/rocketmq/pull/3458#issuecomment-960253698


   
   [![Coverage Status](https://coveralls.io/builds/45442886/badge)](https://coveralls.io/builds/45442886)
   
   Coverage decreased (-2.9%) to 53.219% when pulling **31de6b6ac13cf0f685faace29edc3c7f45824280 on Git-Yang:develop-schedule_v2** into **75557b8700bef395002a6b5b7c2bb083c120b632 on apache:develop**.
   





[GitHub] [rocketmq] codecov-commenter edited a comment on pull request #3458: [ISSUE #3449] Delayed message supports asynchronous delivery

Posted by GitBox <gi...@apache.org>.
codecov-commenter edited a comment on pull request #3458:
URL: https://github.com/apache/rocketmq/pull/3458#issuecomment-1001403041


   # [Codecov](https://codecov.io/gh/apache/rocketmq/pull/3458?src=pr&el=h1&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) Report
   > Merging [#3458](https://codecov.io/gh/apache/rocketmq/pull/3458?src=pr&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (31de6b6) into [develop](https://codecov.io/gh/apache/rocketmq/commit/8cb14687f3946c95092631d9f9d1447024a741de?el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (8cb1468) will **decrease** coverage by `2.43%`.
   > The diff coverage is `58.58%`.
   
   [![Impacted file tree graph](https://codecov.io/gh/apache/rocketmq/pull/3458/graphs/tree.svg?width=650&height=150&src=pr&token=4w0sxP1wZv&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)](https://codecov.io/gh/apache/rocketmq/pull/3458?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)
   
   ```diff
   @@              Coverage Diff              @@
   ##             develop    #3458      +/-   ##
   =============================================
   - Coverage      49.67%   47.24%   -2.44%     
   - Complexity      4718     5038     +320     
   =============================================
     Files            555      628      +73     
     Lines          36822    41582    +4760     
     Branches        4853     5401     +548     
   =============================================
   + Hits           18291    19645    +1354     
   - Misses         16231    19500    +3269     
   - Partials        2300     2437     +137     
   ```
   
   
   | [Impacted Files](https://codecov.io/gh/apache/rocketmq/pull/3458?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | Coverage Δ | |
   |---|---|---|
   | [...ache/rocketmq/store/config/MessageStoreConfig.java](https://codecov.io/gh/apache/rocketmq/pull/3458/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c3RvcmUvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL3JvY2tldG1xL3N0b3JlL2NvbmZpZy9NZXNzYWdlU3RvcmVDb25maWcuamF2YQ==) | `60.39% <50.00%> (+0.25%)` | :arrow_up: |
   | [...ocketmq/store/schedule/ScheduleMessageService.java](https://codecov.io/gh/apache/rocketmq/pull/3458/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c3RvcmUvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL3JvY2tldG1xL3N0b3JlL3NjaGVkdWxlL1NjaGVkdWxlTWVzc2FnZVNlcnZpY2UuamF2YQ==) | `66.83% <58.98%> (-7.01%)` | :arrow_down: |
   | [...rocketmq/broker/filtersrv/FilterServerManager.java](https://codecov.io/gh/apache/rocketmq/pull/3458/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-YnJva2VyL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9yb2NrZXRtcS9icm9rZXIvZmlsdGVyc3J2L0ZpbHRlclNlcnZlck1hbmFnZXIuamF2YQ==) | `20.00% <0.00%> (-14.29%)` | :arrow_down: |
   | [...tmq/logappender/log4j2/RocketmqLog4j2Appender.java](https://codecov.io/gh/apache/rocketmq/pull/3458/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-bG9nYXBwZW5kZXIvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL3JvY2tldG1xL2xvZ2FwcGVuZGVyL2xvZzRqMi9Sb2NrZXRtcUxvZzRqMkFwcGVuZGVyLmphdmE=) | `35.00% <0.00%> (-10.00%)` | :arrow_down: |
   | [...in/java/org/apache/rocketmq/test/util/MQAdmin.java](https://codecov.io/gh/apache/rocketmq/pull/3458/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-dGVzdC9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcm9ja2V0bXEvdGVzdC91dGlsL01RQWRtaW4uamF2YQ==) | `38.88% <0.00%> (-5.56%)` | :arrow_down: |
   | [...org/apache/rocketmq/store/ha/WaitNotifyObject.java](https://codecov.io/gh/apache/rocketmq/pull/3458/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c3RvcmUvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL3JvY2tldG1xL3N0b3JlL2hhL1dhaXROb3RpZnlPYmplY3QuamF2YQ==) | `66.07% <0.00%> (-5.36%)` | :arrow_down: |
   | [...apache/rocketmq/remoting/netty/ResponseFuture.java](https://codecov.io/gh/apache/rocketmq/pull/3458/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cmVtb3Rpbmcvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL3JvY2tldG1xL3JlbW90aW5nL25ldHR5L1Jlc3BvbnNlRnV0dXJlLmphdmE=) | `85.00% <0.00%> (-5.00%)` | :arrow_down: |
   | [...rocketmq/remoting/netty/NettyRemotingAbstract.java](https://codecov.io/gh/apache/rocketmq/pull/3458/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cmVtb3Rpbmcvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL3JvY2tldG1xL3JlbW90aW5nL25ldHR5L05ldHR5UmVtb3RpbmdBYnN0cmFjdC5qYXZh) | `46.88% <0.00%> (-4.03%)` | :arrow_down: |
   | [...he/rocketmq/client/impl/consumer/ProcessQueue.java](https://codecov.io/gh/apache/rocketmq/pull/3458/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-Y2xpZW50L3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9yb2NrZXRtcS9jbGllbnQvaW1wbC9jb25zdW1lci9Qcm9jZXNzUXVldWUuamF2YQ==) | `60.55% <0.00%> (-0.85%)` | :arrow_down: |
   | [...n/java/org/apache/rocketmq/store/ha/HAService.java](https://codecov.io/gh/apache/rocketmq/pull/3458/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c3RvcmUvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL3JvY2tldG1xL3N0b3JlL2hhL0hBU2VydmljZS5qYXZh) | `71.28% <0.00%> (-0.67%)` | :arrow_down: |
   | ... and [97 more](https://codecov.io/gh/apache/rocketmq/pull/3458/diff?src=pr&el=tree-more&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | |
   
   ------
   
   [Continue to review full report at Codecov](https://codecov.io/gh/apache/rocketmq/pull/3458?src=pr&el=continue&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation).
   > **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)
   > `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
   > Powered by [Codecov](https://codecov.io/gh/apache/rocketmq/pull/3458?src=pr&el=footer&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation). Last update [8cb1468...31de6b6](https://codecov.io/gh/apache/rocketmq/pull/3458?src=pr&el=lastupdated&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation).
   





[GitHub] [rocketmq] coveralls edited a comment on pull request #3458: [ISSUE #3449] Delayed message supports asynchronous delivery

Posted by GitBox <gi...@apache.org>.
coveralls edited a comment on pull request #3458:
URL: https://github.com/apache/rocketmq/pull/3458#issuecomment-960253698


   
   [![Coverage Status](https://coveralls.io/builds/44010481/badge)](https://coveralls.io/builds/44010481)
   
   Coverage decreased (-0.2%) to 54.337% when pulling **f2b5755ef4daf29e92ced895245ddd9124fe8ed2 on Git-Yang:develop-schedule_v2** into **2385f0446832d4e799c16d280a88cafc548fab39 on apache:develop**.
   





[GitHub] [rocketmq] Git-Yang commented on pull request #3458: [ISSUE #3449] Delayed message supports asynchronous delivery

Posted by GitBox <gi...@apache.org>.
Git-Yang commented on pull request #3458:
URL: https://github.com/apache/rocketmq/pull/3458#issuecomment-959502033


   AsyncDeliverDelayedMessageTimerTask implements both the synchronous and asynchronous strategies, but DeliverDelayedMessageTimerTask is still retained here.





[GitHub] [rocketmq] Git-Yang removed a comment on pull request #3458: [ISSUE #3449] Delayed message supports asynchronous delivery

Posted by GitBox <gi...@apache.org>.
Git-Yang removed a comment on pull request #3458:
URL: https://github.com/apache/rocketmq/pull/3458#issuecomment-959502033


   AsyncDeliverDelayedMessageTimerTask implements both the synchronous and asynchronous strategies, but DeliverDelayedMessageTimerTask is still retained here.





[GitHub] [rocketmq] Git-Yang edited a comment on pull request #3458: [ISSUE #3449] Delayed message supports asynchronous delivery

Posted by GitBox <gi...@apache.org>.
Git-Yang edited a comment on pull request #3458:
URL: https://github.com/apache/rocketmq/pull/3458#issuecomment-965998740


   ### Test Results
   - 10 million (1000w) messages were sent by a script (level=4); each message was given a unique auto-incrementing key (0~9999999).
   - The broker service was restarted during message delivery.
   - Test results:
      - No data loss (totalCount=9999999, because the query for key=19 failed).
      - No data duplication.
      - Delivery TPS: 22,000+ (2.2w+).
   ![image](https://user-images.githubusercontent.com/30995057/141241356-10bf4f00-0250-45e6-9223-bf69c1122cac.png)
   ![image](https://user-images.githubusercontent.com/30995057/141241460-01500731-d14a-4904-8c78-e7d9e200f725.png)
   ![image](https://user-images.githubusercontent.com/30995057/141241633-f3dfedc9-8934-421c-9b1e-d41234cdffb8.png)
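
The loss and duplication checks above can be sketched as follows. This is an illustrative sketch, not the script used for the reported test: it assumes the consumed keys have already been collected into an `int[]` and are non-negative; `DeliveryCheckSketch`, `missingKeys`, and `duplicateKeys` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.BitSet;
import java.util.List;

public class DeliveryCheckSketch {
    /** Returns the keys in [0, expected) that never appear in consumedKeys (lost messages). */
    public static List<Integer> missingKeys(int[] consumedKeys, int expected) {
        BitSet seen = new BitSet(expected);
        for (int key : consumedKeys) {
            seen.set(key);
        }
        List<Integer> missing = new ArrayList<>();
        for (int k = seen.nextClearBit(0); k < expected; k = seen.nextClearBit(k + 1)) {
            missing.add(k);
        }
        return missing;
    }

    /** Returns the keys that appear more than once in consumedKeys (duplicated messages). */
    public static List<Integer> duplicateKeys(int[] consumedKeys, int expected) {
        BitSet seen = new BitSet(expected);
        BitSet duplicated = new BitSet(expected);
        for (int key : consumedKeys) {
            if (seen.get(key)) {
                duplicated.set(key);
            }
            seen.set(key);
        }
        List<Integer> result = new ArrayList<>();
        for (int k = duplicated.nextSetBit(0); k >= 0; k = duplicated.nextSetBit(k + 1)) {
            result.add(k);
        }
        return result;
    }
}
```

A `BitSet` keeps memory small for 10 million sequential keys; an empty result from both methods would correspond to "no data loss" and "no data duplication".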
   
   
   





[GitHub] [rocketmq] duhenglucky commented on a change in pull request #3458: [ISSUE #3449] Delayed message supports asynchronous delivery

Posted by GitBox <gi...@apache.org>.
duhenglucky commented on a change in pull request #3458:
URL: https://github.com/apache/rocketmq/pull/3458#discussion_r788540984



##########
File path: store/src/test/java/org/apache/rocketmq/store/ScheduleMessageServiceTest.java
##########
@@ -71,14 +91,104 @@ public void testCorrectDelayOffset_whenInit() throws Exception {
 
     }
 
-    private MessageStore buildMessageStore() throws Exception {
+    private MessageStoreConfig buildMessageStoreConfig() throws Exception {
         MessageStoreConfig messageStoreConfig = new MessageStoreConfig();
         messageStoreConfig.setMappedFileSizeCommitLog(1024 * 1024 * 10);
         messageStoreConfig.setMappedFileSizeConsumeQueue(1024 * 1024 * 10);
         messageStoreConfig.setMaxHashSlotNum(10000);
         messageStoreConfig.setMaxIndexNum(100 * 100);
         messageStoreConfig.setFlushDiskType(FlushDiskType.SYNC_FLUSH);
         messageStoreConfig.setFlushIntervalConsumeQueue(1);
-        return new DefaultMessageStore(messageStoreConfig, new BrokerStatsManager("simpleTest", true), null, new BrokerConfig());
+        return messageStoreConfig;
+    }
+
+    @Test
+    public void testHandlePutResultTask() throws Exception {
+        DefaultMessageStore messageStore = mock(DefaultMessageStore.class);
+        MessageStoreConfig config = buildMessageStoreConfig();
+        config.setEnableScheduleMessageStats(false);
+        config.setEnableScheduleAsyncDeliver(true);
+        when(messageStore.getMessageStoreConfig()).thenReturn(config);
+        ScheduleMessageService scheduleMessageService = new ScheduleMessageService(messageStore);
+        scheduleMessageService.parseDelayLevel();
+
+        Field field = scheduleMessageService.getClass().getDeclaredField("deliverPendingTable");
+        field.setAccessible(true);
+        Map<Integer /* level */, LinkedBlockingQueue<ScheduleMessageService.PutResultProcess>> deliverPendingTable =
+            (Map<Integer, LinkedBlockingQueue<ScheduleMessageService.PutResultProcess>>) field.get(scheduleMessageService);
+
+        field = scheduleMessageService.getClass().getDeclaredField("offsetTable");
+        field.setAccessible(true);
+        ConcurrentMap<Integer /* level */, Long/* offset */> offsetTable =
+            (ConcurrentMap<Integer /* level */, Long/* offset */>) field.get(scheduleMessageService);
+        for (int i = 1; i <= scheduleMessageService.getMaxDelayLevel(); i++) {
+            offsetTable.put(i, 0L);
+        }
+
+        int deliverThreadPoolNums = Runtime.getRuntime().availableProcessors();

Review comment:
       Would it be better to set _deliverThreadPoolNums_ to the number of delay levels?







[GitHub] [rocketmq] Git-Yang merged pull request #3458: [ISSUE #3449] Delayed message supports asynchronous delivery

Posted by GitBox <gi...@apache.org>.
Git-Yang merged pull request #3458:
URL: https://github.com/apache/rocketmq/pull/3458


   





[GitHub] [rocketmq] coveralls commented on pull request #3458: [ISSUE #3449] Delayed message supports asynchronous delivery

Posted by GitBox <gi...@apache.org>.
coveralls commented on pull request #3458:
URL: https://github.com/apache/rocketmq/pull/3458#issuecomment-960253698


   
   [![Coverage Status](https://coveralls.io/builds/43998472/badge)](https://coveralls.io/builds/43998472)
   
   Coverage decreased (-0.3%) to 54.184% when pulling **e087dab2728e1d905735f591e229cadccc52ec2d on Git-Yang:develop-schedule_v2** into **2385f0446832d4e799c16d280a88cafc548fab39 on apache:develop**.
   





[GitHub] [rocketmq] coveralls edited a comment on pull request #3458: [ISSUE #3449] Delayed message supports asynchronous delivery

Posted by GitBox <gi...@apache.org>.
coveralls edited a comment on pull request #3458:
URL: https://github.com/apache/rocketmq/pull/3458#issuecomment-960253698


   
   [![Coverage Status](https://coveralls.io/builds/44004599/badge)](https://coveralls.io/builds/44004599)
   
   Coverage decreased (-0.3%) to 54.238% when pulling **be4bde35c06400daba1041a196fc9ae87c8e3b37 on Git-Yang:develop-schedule_v2** into **2385f0446832d4e799c16d280a88cafc548fab39 on apache:develop**.
   





[GitHub] [rocketmq] coveralls edited a comment on pull request #3458: [ISSUE #3449] Delayed message supports asynchronous delivery

Posted by GitBox <gi...@apache.org>.
coveralls edited a comment on pull request #3458:
URL: https://github.com/apache/rocketmq/pull/3458#issuecomment-960253698


   
   [![Coverage Status](https://coveralls.io/builds/44069101/badge)](https://coveralls.io/builds/44069101)
   
   Coverage decreased (-0.2%) to 54.308% when pulling **c92a0ef5d1b6f9407c0d5a1b043d06c1e3091443 on Git-Yang:develop-schedule_v2** into **c6aeb782e59587c610ff8293462f028993fb8206 on apache:develop**.
   





[GitHub] [rocketmq] coveralls edited a comment on pull request #3458: [ISSUE #3449] Delayed message supports asynchronous delivery

Posted by GitBox <gi...@apache.org>.
coveralls edited a comment on pull request #3458:
URL: https://github.com/apache/rocketmq/pull/3458#issuecomment-960253698


   
   [![Coverage Status](https://coveralls.io/builds/45243554/badge)](https://coveralls.io/builds/45243554)
   
   Coverage increased (+0.1%) to 56.257% when pulling **200f76adba46c0d31d3f0d827ca471ca432e5e58 on Git-Yang:develop-schedule_v2** into **75557b8700bef395002a6b5b7c2bb083c120b632 on apache:develop**.
   





[GitHub] [rocketmq] areyouok commented on a change in pull request #3458: [ISSUE #3449] Delayed message supports asynchronous delivery

Posted by GitBox <gi...@apache.org>.
areyouok commented on a change in pull request #3458:
URL: https://github.com/apache/rocketmq/pull/3458#discussion_r783712530



##########
File path: store/src/main/java/org/apache/rocketmq/store/schedule/ScheduleMessageService.java
##########
@@ -148,7 +167,29 @@ public void run() {
 
     public void shutdown() {
         if (this.started.compareAndSet(true, false) && null != this.deliverExecutorService) {
-            this.deliverExecutorService.shutdownNow();
+            this.deliverExecutorService.shutdown();
+            try {
+                this.deliverExecutorService.awaitTermination(WAIT_FOR_SHUTDOWN, TimeUnit.MILLISECONDS);
+            } catch (InterruptedException e) {
+                log.error("deliverExecutorService awaitTermination error", e);
+            }
+
+            if (this.handleExecutorService != null) {
+                this.handleExecutorService.shutdown();
+                try {
+                    this.handleExecutorService.awaitTermination(WAIT_FOR_SHUTDOWN, TimeUnit.MILLISECONDS);
+                } catch (InterruptedException e) {
+                    log.error("handleExecutorService awaitTermination error", e);
+                }
+            }
+
+            if (this.deliverPendingTable != null) {
+                for (int i = 1; i <= this.deliverPendingTable.size(); i++) {
+                    log.warn("deliverPendingTable level: {}, size: {}", i, this.deliverPendingTable.get(i).size());

Review comment:
       Should this be logged only when size() > 0?
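
A minimal sketch of that suggestion, assuming the pending table is a map from delay level to a queue of pending requests. `PendingLogSketch` and `nonEmptyLevels` are hypothetical names, and the method returns the log lines instead of calling a logger so it stays self-contained. Iterating over the entries rather than indexing `1..size()` also avoids a null lookup if a level is missing from the map.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Queue;

public class PendingLogSketch {
    /** Builds one log line per delay level whose pending queue is non-empty. */
    public static <Q extends Queue<?>> List<String> nonEmptyLevels(Map<Integer, Q> pendingTable) {
        List<String> lines = new ArrayList<>();
        for (Map.Entry<Integer, Q> entry : pendingTable.entrySet()) {
            Q queue = entry.getValue();
            // Skip levels with nothing pending so shutdown logs stay quiet in the normal case.
            if (queue != null && !queue.isEmpty()) {
                lines.add("deliverPendingTable level: " + entry.getKey() + ", size: " + queue.size());
            }
        }
        return lines;
    }
}
```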







[GitHub] [rocketmq] HScarb commented on a change in pull request #3458: [ISSUE #3449] Delayed message supports asynchronous delivery

Posted by GitBox <gi...@apache.org>.
HScarb commented on a change in pull request #3458:
URL: https://github.com/apache/rocketmq/pull/3458#discussion_r761696102



##########
File path: store/src/main/java/org/apache/rocketmq/store/schedule/ScheduleMessageService.java
##########
@@ -431,35 +492,414 @@ public void executeOnTimeup() {
             ScheduleMessageService.this.deliverExecutorService.schedule(new DeliverDelayedMessageTimerTask(this.delayLevel,
                 failScheduleOffset), DELAY_FOR_A_WHILE, TimeUnit.MILLISECONDS);
         }
+    }
+
+    class AsyncDeliverDelayedMessageTimerTask implements Runnable {

Review comment:
       `AsyncDeliverDelayedMessageTimerTask` is similar to `DeliverDelayedMessageTimerTask`; consider extending `DeliverDelayedMessageTimerTask`.







[GitHub] [rocketmq] dongeforever commented on a change in pull request #3458: [ISSUE #3449] Delayed message supports asynchronous delivery

Posted by GitBox <gi...@apache.org>.
dongeforever commented on a change in pull request #3458:
URL: https://github.com/apache/rocketmq/pull/3458#discussion_r788502037



##########
File path: store/src/main/java/org/apache/rocketmq/store/schedule/ScheduleMessageService.java
##########
@@ -308,158 +381,431 @@ public void executeOnTimeup() {
                 ScheduleMessageService.this.defaultMessageStore.findConsumeQueue(TopicValidator.RMQ_SYS_SCHEDULE_TOPIC,
                     delayLevel2QueueId(delayLevel));
 
-            long failScheduleOffset = offset;
+            if (cq == null) {
+                this.scheduleNextTimerTask(this.offset, DELAY_FOR_A_WHILE);
+                return;
+            }
 
-            if (cq != null) {
-                SelectMappedBufferResult bufferCQ = cq.getIndexBuffer(this.offset);
-                if (bufferCQ != null) {
-                    try {
-                        long nextOffset = offset;
-                        int i = 0;
-                        ConsumeQueueExt.CqExtUnit cqExtUnit = new ConsumeQueueExt.CqExtUnit();
-                        for (; i < bufferCQ.getSize(); i += ConsumeQueue.CQ_STORE_UNIT_SIZE) {
-                            long offsetPy = bufferCQ.getByteBuffer().getLong();
-                            int sizePy = bufferCQ.getByteBuffer().getInt();
-                            long tagsCode = bufferCQ.getByteBuffer().getLong();
-
-                            if (cq.isExtAddr(tagsCode)) {
-                                if (cq.getExt(tagsCode, cqExtUnit)) {
-                                    tagsCode = cqExtUnit.getTagsCode();
-                                } else {
-                                    //can't find ext content.So re compute tags code.
-                                    log.error("[BUG] can't find consume queue extend file content!addr={}, offsetPy={}, sizePy={}",
-                                        tagsCode, offsetPy, sizePy);
-                                    long msgStoreTime = defaultMessageStore.getCommitLog().pickupStoreTimestamp(offsetPy, sizePy);
-                                    tagsCode = computeDeliverTimestamp(delayLevel, msgStoreTime);
-                                }
-                            }
+            SelectMappedBufferResult bufferCQ = cq.getIndexBuffer(this.offset);
+            if (bufferCQ == null) {
+                long resetOffset;
+                if ((resetOffset = cq.getMinOffsetInQueue()) > this.offset) {
+                    log.error("schedule CQ offset invalid. offset={}, cqMinOffset={}, queueId={}",
+                        this.offset, resetOffset, cq.getQueueId());
+                } else if ((resetOffset = cq.getMaxOffsetInQueue()) < this.offset) {
+                    log.error("schedule CQ offset invalid. offset={}, cqMaxOffset={}, queueId={}",
+                        this.offset, resetOffset, cq.getQueueId());
+                } else {
+                    resetOffset = this.offset;
+                }
 
-                            long now = System.currentTimeMillis();
-                            long deliverTimestamp = this.correctDeliverTimestamp(now, tagsCode);
-
-                            nextOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE);
-
-                            long countdown = deliverTimestamp - now;
-
-                            if (countdown <= 0) {
-                                MessageExt msgExt =
-                                    ScheduleMessageService.this.defaultMessageStore.lookMessageByOffset(
-                                        offsetPy, sizePy);
-
-                                if (msgExt != null) {
-                                    try {
-                                        MessageExtBrokerInner msgInner = this.messageTimeup(msgExt);
-                                        if (TopicValidator.RMQ_SYS_TRANS_HALF_TOPIC.equals(msgInner.getTopic())) {
-                                            log.error("[BUG] the real topic of schedule msg is {}, discard the msg. msg={}",
-                                                msgInner.getTopic(), msgInner);
-                                            continue;
-                                        }
-                                        PutMessageResult putMessageResult =
-                                            ScheduleMessageService.this.writeMessageStore
-                                                .putMessage(msgInner);
-
-                                        if (putMessageResult != null
-                                            && putMessageResult.getPutMessageStatus() == PutMessageStatus.PUT_OK) {
-                                            if (ScheduleMessageService.this.defaultMessageStore.getMessageStoreConfig().isEnableScheduleMessageStats()) {
-                                                ScheduleMessageService.this.defaultMessageStore.getBrokerStatsManager().incQueueGetNums(MixAll.SCHEDULE_CONSUMER_GROUP, TopicValidator.RMQ_SYS_SCHEDULE_TOPIC, delayLevel - 1, putMessageResult.getAppendMessageResult().getMsgNum());
-                                                ScheduleMessageService.this.defaultMessageStore.getBrokerStatsManager().incQueueGetSize(MixAll.SCHEDULE_CONSUMER_GROUP, TopicValidator.RMQ_SYS_SCHEDULE_TOPIC, delayLevel - 1, putMessageResult.getAppendMessageResult().getWroteBytes());
-                                                ScheduleMessageService.this.defaultMessageStore.getBrokerStatsManager().incGroupGetNums(MixAll.SCHEDULE_CONSUMER_GROUP, TopicValidator.RMQ_SYS_SCHEDULE_TOPIC, putMessageResult.getAppendMessageResult().getMsgNum());
-                                                ScheduleMessageService.this.defaultMessageStore.getBrokerStatsManager().incGroupGetSize(MixAll.SCHEDULE_CONSUMER_GROUP, TopicValidator.RMQ_SYS_SCHEDULE_TOPIC, putMessageResult.getAppendMessageResult().getWroteBytes());
-                                                ScheduleMessageService.this.defaultMessageStore.getBrokerStatsManager().incTopicPutNums(msgInner.getTopic(), putMessageResult.getAppendMessageResult().getMsgNum(), 1);
-                                                ScheduleMessageService.this.defaultMessageStore.getBrokerStatsManager().incTopicPutSize(msgInner.getTopic(),
-                                                    putMessageResult.getAppendMessageResult().getWroteBytes());
-                                                ScheduleMessageService.this.defaultMessageStore.getBrokerStatsManager().incBrokerPutNums(putMessageResult.getAppendMessageResult().getMsgNum());
-                                            }
-                                            continue;
-                                        } else {
-                                            // XXX: warn and notify me
-                                            log.error(
-                                                "ScheduleMessageService, a message time up, but reput it failed, topic: {} msgId {}",
-                                                msgExt.getTopic(), msgExt.getMsgId());
-                                            ScheduleMessageService.this.deliverExecutorService.schedule(
-                                                new DeliverDelayedMessageTimerTask(this.delayLevel,
-                                                    nextOffset), DELAY_FOR_A_PERIOD, TimeUnit.MILLISECONDS);
-                                            ScheduleMessageService.this.updateOffset(this.delayLevel,
-                                                nextOffset);
-                                            return;
-                                        }
-                                    } catch (Exception e) {
-                                        /*
-                                         * XXX: warn and notify me
-                                         */
-                                        log.error(
-                                            "ScheduleMessageService, messageTimeup execute error, drop it. msgExt={}, nextOffset={}, offsetPy={}, sizePy={}", msgExt, nextOffset, offsetPy, sizePy, e);
-                                    }
-                                }
-                            } else {
-                                ScheduleMessageService.this.deliverExecutorService.schedule(
-                                    new DeliverDelayedMessageTimerTask(this.delayLevel, nextOffset),
-                                    countdown, TimeUnit.MILLISECONDS);
-                                ScheduleMessageService.this.updateOffset(this.delayLevel, nextOffset);
-                                return;
-                            }
-                        } // end of for
+                this.scheduleNextTimerTask(resetOffset, DELAY_FOR_A_WHILE);
+                return;
+            }
 
-                        nextOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE);
-                        ScheduleMessageService.this.deliverExecutorService.schedule(new DeliverDelayedMessageTimerTask(
-                            this.delayLevel, nextOffset), DELAY_FOR_A_WHILE, TimeUnit.MILLISECONDS);
-                        ScheduleMessageService.this.updateOffset(this.delayLevel, nextOffset);
+            long nextOffset = this.offset;
+            try {
+                int i = 0;
+                ConsumeQueueExt.CqExtUnit cqExtUnit = new ConsumeQueueExt.CqExtUnit();
+                for (; i < bufferCQ.getSize() && isStarted(); i += ConsumeQueue.CQ_STORE_UNIT_SIZE) {
+                    long offsetPy = bufferCQ.getByteBuffer().getLong();
+                    int sizePy = bufferCQ.getByteBuffer().getInt();
+                    long tagsCode = bufferCQ.getByteBuffer().getLong();
+
+                    if (cq.isExtAddr(tagsCode)) {
+                        if (cq.getExt(tagsCode, cqExtUnit)) {
+                            tagsCode = cqExtUnit.getTagsCode();
+                        } else {
+                            //can't find ext content.So re compute tags code.
+                            log.error("[BUG] can't find consume queue extend file content!addr={}, offsetPy={}, sizePy={}",
+                                tagsCode, offsetPy, sizePy);
+                            long msgStoreTime = defaultMessageStore.getCommitLog().pickupStoreTimestamp(offsetPy, sizePy);
+                            tagsCode = computeDeliverTimestamp(delayLevel, msgStoreTime);
+                        }
+                    }
+
+                    long now = System.currentTimeMillis();
+                    long deliverTimestamp = this.correctDeliverTimestamp(now, tagsCode);
+                    nextOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE);
+
+                    long countdown = deliverTimestamp - now;
+                    if (countdown > 0) {
+                        this.scheduleNextTimerTask(nextOffset, DELAY_FOR_A_WHILE);
                         return;
-                    } finally {
+                    }
 
-                        bufferCQ.release();
+                    MessageExt msgExt = ScheduleMessageService.this.defaultMessageStore.lookMessageByOffset(offsetPy, sizePy);
+                    if (msgExt == null) {
+                        continue;
                     }
-                } // end of if (bufferCQ != null)
-                else {
-
-                    long cqMinOffset = cq.getMinOffsetInQueue();
-                    long cqMaxOffset = cq.getMaxOffsetInQueue();
-                    if (offset < cqMinOffset) {
-                        failScheduleOffset = cqMinOffset;
-                        log.error("schedule CQ offset invalid. offset={}, cqMinOffset={}, cqMaxOffset={}, queueId={}",
-                            offset, cqMinOffset, cqMaxOffset, cq.getQueueId());
+
+                    MessageExtBrokerInner msgInner = ScheduleMessageService.this.messageTimeup(msgExt);
+                    if (TopicValidator.RMQ_SYS_TRANS_HALF_TOPIC.equals(msgInner.getTopic())) {
+                        log.error("[BUG] the real topic of schedule msg is {}, discard the msg. msg={}",
+                            msgInner.getTopic(), msgInner);
+                        continue;
+                    }
+
+                    boolean deliverSuc;
+                    if (ScheduleMessageService.this.enableAsyncDeliver) {
+                        deliverSuc = this.asyncDeliver(msgInner, msgExt.getMsgId(), offset, offsetPy, sizePy);
+                    } else {
+                        deliverSuc = this.syncDeliver(msgInner, msgExt.getMsgId(), offset, offsetPy, sizePy);
                     }
 
-                    if (offset > cqMaxOffset) {
-                        failScheduleOffset = cqMaxOffset;
-                        log.error("schedule CQ offset invalid. offset={}, cqMinOffset={}, cqMaxOffset={}, queueId={}",
-                            offset, cqMinOffset, cqMaxOffset, cq.getQueueId());
+                    if (!deliverSuc) {
+                        this.scheduleNextTimerTask(nextOffset, DELAY_FOR_A_WHILE);
+                        return;
                     }
                 }
-            } // end of if (cq != null)
 
-            ScheduleMessageService.this.deliverExecutorService.schedule(new DeliverDelayedMessageTimerTask(this.delayLevel,
-                failScheduleOffset), DELAY_FOR_A_WHILE, TimeUnit.MILLISECONDS);
+                nextOffset = this.offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE);
+            } catch (Exception e) {
+                log.error("ScheduleMessageService, messageTimeup execute error, offset = {}", nextOffset, e);
+            } finally {
+                bufferCQ.release();
+            }
+
+            this.scheduleNextTimerTask(nextOffset, DELAY_FOR_A_WHILE);
         }
 
-        private MessageExtBrokerInner messageTimeup(MessageExt msgExt) {
-            MessageExtBrokerInner msgInner = new MessageExtBrokerInner();
-            msgInner.setBody(msgExt.getBody());
-            msgInner.setFlag(msgExt.getFlag());
-            MessageAccessor.setProperties(msgInner, msgExt.getProperties());
+        public void scheduleNextTimerTask(long offset, long delay) {
+            ScheduleMessageService.this.deliverExecutorService.schedule(new DeliverDelayedMessageTimerTask(
+                this.delayLevel, offset), delay, TimeUnit.MILLISECONDS);
+        }
 
-            TopicFilterType topicFilterType = MessageExt.parseTopicFilterType(msgInner.getSysFlag());
-            long tagsCodeValue =
-                MessageExtBrokerInner.tagsString2tagsCode(topicFilterType, msgInner.getTags());
-            msgInner.setTagsCode(tagsCodeValue);
-            msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgExt.getProperties()));
+        private boolean syncDeliver(MessageExtBrokerInner msgInner, String msgId, long offset, long offsetPy,
+            int sizePy) {
+            PutResultProcess resultProcess = deliverMessage(msgInner, msgId, offset, offsetPy, sizePy, false);
+            PutMessageResult result = resultProcess.get();
+            boolean sendStatus = result != null && result.getPutMessageStatus() == PutMessageStatus.PUT_OK;
+            if (sendStatus) {
+                ScheduleMessageService.this.updateOffset(this.delayLevel, resultProcess.getNextOffset());
+            }
+            return sendStatus;
+        }
 
-            msgInner.setSysFlag(msgExt.getSysFlag());
-            msgInner.setBornTimestamp(msgExt.getBornTimestamp());
-            msgInner.setBornHost(msgExt.getBornHost());
-            msgInner.setStoreHost(msgExt.getStoreHost());
-            msgInner.setReconsumeTimes(msgExt.getReconsumeTimes());
+        private boolean asyncDeliver(MessageExtBrokerInner msgInner, String msgId, long offset, long offsetPy,
+            int sizePy) {
+            Queue<PutResultProcess> processesQueue = ScheduleMessageService.this.deliverPendingTable.get(this.delayLevel);
+
+            //Flow Control
+            int currentPendingNum = processesQueue.size();
+            int maxPendingLimit = ScheduleMessageService.this.defaultMessageStore.getMessageStoreConfig()
+                .getScheduleAsyncDeliverMaxPendingLimit();
+            if (currentPendingNum > maxPendingLimit) {
+                log.warn("Asynchronous deliver triggers flow control, " +
+                    "currentPendingNum={}, maxPendingLimit={}", currentPendingNum, maxPendingLimit);
+                return false;
+            }
 
-            msgInner.setWaitStoreMsgOK(false);
-            MessageAccessor.clearProperty(msgInner, MessageConst.PROPERTY_DELAY_TIME_LEVEL);
+            //Blocked
+            PutResultProcess firstProcess = processesQueue.peek();
+            if (firstProcess != null && firstProcess.need2Blocked()) {
+                log.warn("Asynchronous deliver block. info={}", firstProcess.toString());
+                return false;
+            }
 
-            msgInner.setTopic(msgInner.getProperty(MessageConst.PROPERTY_REAL_TOPIC));
+            PutResultProcess resultProcess = deliverMessage(msgInner, msgId, offset, offsetPy, sizePy, true);
+            processesQueue.add(resultProcess);
+            return true;
+        }
 
-            String queueIdStr = msgInner.getProperty(MessageConst.PROPERTY_REAL_QUEUE_ID);
-            int queueId = Integer.parseInt(queueIdStr);
-            msgInner.setQueueId(queueId);
+        private PutResultProcess deliverMessage(MessageExtBrokerInner msgInner, String msgId, long offset,
+            long offsetPy, int sizePy, boolean autoResend) {
+            CompletableFuture<PutMessageResult> future =
+                ScheduleMessageService.this.writeMessageStore.asyncPutMessage(msgInner);
+            return new PutResultProcess()
+                .setTopic(msgInner.getTopic())
+                .setDelayLevel(this.delayLevel)
+                .setOffset(offset)
+                .setPhysicOffset(offsetPy)
+                .setPhysicSize(sizePy)
+                .setMsgId(msgId)
+                .setAutoResend(autoResend)
+                .setFuture(future)
+                .thenProcess();
+        }
+    }
+
+    public class HandlePutResultTask implements Runnable {
+        private final int delayLevel;
 
-            return msgInner;
+        public HandlePutResultTask(int delayLevel) {
+            this.delayLevel = delayLevel;
         }
+
+        @Override
+        public void run() {
+            LinkedBlockingQueue<PutResultProcess> pendingQueue =
+                ScheduleMessageService.this.deliverPendingTable.get(this.delayLevel);
+
+            PutResultProcess putResultProcess;
+            while ((putResultProcess = pendingQueue.peek()) != null) {
+                try {

Review comment:
       If the number of handler threads is smaller than the number of delay levels, this loop may starve the other queues.
   
   It would be better to break out of the loop after handling a fixed number of messages in a row.
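
A rough sketch of a bounded loop, under stated assumptions: `BoundedDrainSketch`, `drainBounded` and the `maxBatch` cap are hypothetical, and the queue element type is generic rather than `PutResultProcess`. Each run handles at most `maxBatch` completed entries and then returns, so a handler thread shared across levels can move on to another queue.

```java
import java.util.Queue;
import java.util.function.Predicate;

// Hypothetical illustration of capping the work done per run of a handler task.
class BoundedDrainSketch {

    // Removes completed entries from the head of the queue, at most maxBatch per call.
    // Assumes a single consumer per queue, so peek() followed by remove() sees the same head.
    static <T> int drainBounded(Queue<T> pendingQueue, Predicate<T> isDone, int maxBatch) {
        int handled = 0;
        T head;
        while (handled < maxBatch && (head = pendingQueue.peek()) != null) {
            if (!isDone.test(head)) {
                break; // head is still in flight; yield instead of spinning on it
            }
            pendingQueue.remove();
            handled++;
        }
        return handled;
    }
}
```

The caller would reschedule the task after each bounded run. The cap trades a little per-level throughput for fairness across levels.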







[GitHub] [rocketmq] Git-Yang commented on pull request #3458: [ISSUE #3449] Delayed message supports asynchronous delivery

Posted by GitBox <gi...@apache.org>.
Git-Yang commented on pull request #3458:
URL: https://github.com/apache/rocketmq/pull/3458#issuecomment-959495469









[GitHub] [rocketmq] coveralls edited a comment on pull request #3458: [ISSUE #3449] Delayed message supports asynchronous delivery

Posted by GitBox <gi...@apache.org>.
coveralls edited a comment on pull request #3458:
URL: https://github.com/apache/rocketmq/pull/3458#issuecomment-960253698


   
   [![Coverage Status](https://coveralls.io/builds/45121054/badge)](https://coveralls.io/builds/45121054)
   
   Coverage increased (+0.08%) to 56.239% when pulling **687691408cd370ff2432914cfed7b55889ca0f78 on Git-Yang:develop-schedule_v2** into **8cb14687f3946c95092631d9f9d1447024a741de on apache:develop**.
   





[GitHub] [rocketmq] codecov-commenter edited a comment on pull request #3458: [ISSUE #3449] Delayed message supports asynchronous delivery

Posted by GitBox <gi...@apache.org>.
codecov-commenter edited a comment on pull request #3458:
URL: https://github.com/apache/rocketmq/pull/3458#issuecomment-1001403041


   # [Codecov](https://codecov.io/gh/apache/rocketmq/pull/3458?src=pr&el=h1&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) Report
   > Merging [#3458](https://codecov.io/gh/apache/rocketmq/pull/3458?src=pr&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (31de6b6) into [develop](https://codecov.io/gh/apache/rocketmq/commit/8cb14687f3946c95092631d9f9d1447024a741de?el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (8cb1468) will **decrease** coverage by `2.43%`.
   > The diff coverage is `58.58%`.
   
   [![Impacted file tree graph](https://codecov.io/gh/apache/rocketmq/pull/3458/graphs/tree.svg?width=650&height=150&src=pr&token=4w0sxP1wZv&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)](https://codecov.io/gh/apache/rocketmq/pull/3458?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)
   
   ```diff
   @@              Coverage Diff              @@
   ##             develop    #3458      +/-   ##
   =============================================
   - Coverage      49.67%   47.24%   -2.44%     
   - Complexity      4718     5038     +320     
   =============================================
     Files            555      628      +73     
     Lines          36822    41582    +4760     
     Branches        4853     5401     +548     
   =============================================
   + Hits           18291    19645    +1354     
   - Misses         16231    19500    +3269     
   - Partials        2300     2437     +137     
   ```
   
   
   | [Impacted Files](https://codecov.io/gh/apache/rocketmq/pull/3458?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | Coverage Δ | |
   |---|---|---|
   | [...ache/rocketmq/store/config/MessageStoreConfig.java](https://codecov.io/gh/apache/rocketmq/pull/3458/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c3RvcmUvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL3JvY2tldG1xL3N0b3JlL2NvbmZpZy9NZXNzYWdlU3RvcmVDb25maWcuamF2YQ==) | `60.39% <50.00%> (+0.25%)` | :arrow_up: |
   | [...ocketmq/store/schedule/ScheduleMessageService.java](https://codecov.io/gh/apache/rocketmq/pull/3458/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c3RvcmUvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL3JvY2tldG1xL3N0b3JlL3NjaGVkdWxlL1NjaGVkdWxlTWVzc2FnZVNlcnZpY2UuamF2YQ==) | `66.83% <58.98%> (-7.01%)` | :arrow_down: |
   | [...rocketmq/broker/filtersrv/FilterServerManager.java](https://codecov.io/gh/apache/rocketmq/pull/3458/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-YnJva2VyL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9yb2NrZXRtcS9icm9rZXIvZmlsdGVyc3J2L0ZpbHRlclNlcnZlck1hbmFnZXIuamF2YQ==) | `20.00% <0.00%> (-14.29%)` | :arrow_down: |
   | [...tmq/logappender/log4j2/RocketmqLog4j2Appender.java](https://codecov.io/gh/apache/rocketmq/pull/3458/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-bG9nYXBwZW5kZXIvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL3JvY2tldG1xL2xvZ2FwcGVuZGVyL2xvZzRqMi9Sb2NrZXRtcUxvZzRqMkFwcGVuZGVyLmphdmE=) | `35.00% <0.00%> (-10.00%)` | :arrow_down: |
   | [...in/java/org/apache/rocketmq/test/util/MQAdmin.java](https://codecov.io/gh/apache/rocketmq/pull/3458/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-dGVzdC9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcm9ja2V0bXEvdGVzdC91dGlsL01RQWRtaW4uamF2YQ==) | `38.88% <0.00%> (-5.56%)` | :arrow_down: |
   | [...org/apache/rocketmq/store/ha/WaitNotifyObject.java](https://codecov.io/gh/apache/rocketmq/pull/3458/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c3RvcmUvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL3JvY2tldG1xL3N0b3JlL2hhL1dhaXROb3RpZnlPYmplY3QuamF2YQ==) | `66.07% <0.00%> (-5.36%)` | :arrow_down: |
   | [...apache/rocketmq/remoting/netty/ResponseFuture.java](https://codecov.io/gh/apache/rocketmq/pull/3458/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cmVtb3Rpbmcvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL3JvY2tldG1xL3JlbW90aW5nL25ldHR5L1Jlc3BvbnNlRnV0dXJlLmphdmE=) | `85.00% <0.00%> (-5.00%)` | :arrow_down: |
   | [...rocketmq/remoting/netty/NettyRemotingAbstract.java](https://codecov.io/gh/apache/rocketmq/pull/3458/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cmVtb3Rpbmcvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL3JvY2tldG1xL3JlbW90aW5nL25ldHR5L05ldHR5UmVtb3RpbmdBYnN0cmFjdC5qYXZh) | `46.88% <0.00%> (-4.03%)` | :arrow_down: |
   | [...he/rocketmq/client/impl/consumer/ProcessQueue.java](https://codecov.io/gh/apache/rocketmq/pull/3458/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-Y2xpZW50L3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9yb2NrZXRtcS9jbGllbnQvaW1wbC9jb25zdW1lci9Qcm9jZXNzUXVldWUuamF2YQ==) | `60.55% <0.00%> (-0.85%)` | :arrow_down: |
   | [...n/java/org/apache/rocketmq/store/ha/HAService.java](https://codecov.io/gh/apache/rocketmq/pull/3458/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c3RvcmUvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL3JvY2tldG1xL3N0b3JlL2hhL0hBU2VydmljZS5qYXZh) | `71.28% <0.00%> (-0.67%)` | :arrow_down: |
   | ... and [97 more](https://codecov.io/gh/apache/rocketmq/pull/3458/diff?src=pr&el=tree-more&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | |
   
   ------
   
   [Continue to review full report at Codecov](https://codecov.io/gh/apache/rocketmq/pull/3458?src=pr&el=continue&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation).
   > **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)
   > `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
   > Powered by [Codecov](https://codecov.io/gh/apache/rocketmq/pull/3458?src=pr&el=footer&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation). Last update [8cb1468...31de6b6](https://codecov.io/gh/apache/rocketmq/pull/3458?src=pr&el=lastupdated&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation).
   





[GitHub] [rocketmq] coveralls edited a comment on pull request #3458: [ISSUE #3449] Delayed message supports asynchronous delivery

Posted by GitBox <gi...@apache.org>.
coveralls edited a comment on pull request #3458:
URL: https://github.com/apache/rocketmq/pull/3458#issuecomment-960253698


   
   [![Coverage Status](https://coveralls.io/builds/45442886/badge)](https://coveralls.io/builds/45442886)
   
   Coverage decreased (-2.9%) to 53.219% when pulling **31de6b6ac13cf0f685faace29edc3c7f45824280 on Git-Yang:develop-schedule_v2** into **75557b8700bef395002a6b5b7c2bb083c120b632 on apache:develop**.
   





[GitHub] [rocketmq] Git-Yang commented on a change in pull request #3458: [ISSUE #3449] Delayed message supports asynchronous delivery

Posted by GitBox <gi...@apache.org>.
Git-Yang commented on a change in pull request #3458:
URL: https://github.com/apache/rocketmq/pull/3458#discussion_r779982208



##########
File path: store/src/main/java/org/apache/rocketmq/store/schedule/ScheduleMessageService.java
##########
@@ -66,10 +72,23 @@
     private int deliverThreadPoolNums = Runtime.getRuntime().availableProcessors();
     private MessageStore writeMessageStore;
     private int maxDelayLevel;
+    private boolean enableAsyncDeliver = false;
+    private int maxPendingLimit;
+    private int maxResendNum2Blocked;
+    private ScheduledExecutorService handleExecutorService;
+    private final Map<Integer /* level */, LinkedBlockingQueue<PutResultProcess>> deliverPendingTable =
+        new ConcurrentHashMap<>(32);
 
     public ScheduleMessageService(final DefaultMessageStore defaultMessageStore) {
         this.defaultMessageStore = defaultMessageStore;
         this.writeMessageStore = defaultMessageStore;
+        if (defaultMessageStore != null) {
+            this.enableAsyncDeliver = defaultMessageStore.getMessageStoreConfig().isEnableScheduleAsyncDeliver();
+            this.maxPendingLimit = ScheduleMessageService.this.defaultMessageStore.getMessageStoreConfig()
+                .getScheduleAsyncDeliverMaxPendingLimit();
+            this.maxResendNum2Blocked = ScheduleMessageService.this.defaultMessageStore.getMessageStoreConfig()
+                .getScheduleAsyncDeliverMaxResendNum2Blocked();
+        }

Review comment:
       Yes, I will optimize it.







[GitHub] [rocketmq] Git-Yang commented on a change in pull request #3458: [ISSUE #3449] Delayed message supports asynchronous delivery

Posted by GitBox <gi...@apache.org>.
Git-Yang commented on a change in pull request #3458:
URL: https://github.com/apache/rocketmq/pull/3458#discussion_r788577145



##########
File path: store/src/test/java/org/apache/rocketmq/store/ScheduleMessageServiceTest.java
##########
@@ -71,14 +91,104 @@ public void testCorrectDelayOffset_whenInit() throws Exception {
 
     }
 
-    private MessageStore buildMessageStore() throws Exception {
+    private MessageStoreConfig buildMessageStoreConfig() throws Exception {
         MessageStoreConfig messageStoreConfig = new MessageStoreConfig();
         messageStoreConfig.setMappedFileSizeCommitLog(1024 * 1024 * 10);
         messageStoreConfig.setMappedFileSizeConsumeQueue(1024 * 1024 * 10);
         messageStoreConfig.setMaxHashSlotNum(10000);
         messageStoreConfig.setMaxIndexNum(100 * 100);
         messageStoreConfig.setFlushDiskType(FlushDiskType.SYNC_FLUSH);
         messageStoreConfig.setFlushIntervalConsumeQueue(1);
-        return new DefaultMessageStore(messageStoreConfig, new BrokerStatsManager("simpleTest", true), null, new BrokerConfig());
+        return messageStoreConfig;
+    }
+
+    @Test
+    public void testHandlePutResultTask() throws Exception {
+        DefaultMessageStore messageStore = mock(DefaultMessageStore.class);
+        MessageStoreConfig config = buildMessageStoreConfig();
+        config.setEnableScheduleMessageStats(false);
+        config.setEnableScheduleAsyncDeliver(true);
+        when(messageStore.getMessageStoreConfig()).thenReturn(config);
+        ScheduleMessageService scheduleMessageService = new ScheduleMessageService(messageStore);
+        scheduleMessageService.parseDelayLevel();
+
+        Field field = scheduleMessageService.getClass().getDeclaredField("deliverPendingTable");
+        field.setAccessible(true);
+        Map<Integer /* level */, LinkedBlockingQueue<ScheduleMessageService.PutResultProcess>> deliverPendingTable =
+            (Map<Integer, LinkedBlockingQueue<ScheduleMessageService.PutResultProcess>>) field.get(scheduleMessageService);
+
+        field = scheduleMessageService.getClass().getDeclaredField("offsetTable");
+        field.setAccessible(true);
+        ConcurrentMap<Integer /* level */, Long/* offset */> offsetTable =
+            (ConcurrentMap<Integer /* level */, Long/* offset */>) field.get(scheduleMessageService);
+        for (int i = 1; i <= scheduleMessageService.getMaxDelayLevel(); i++) {
+            offsetTable.put(i, 0L);
+        }
+
+        int deliverThreadPoolNums = Runtime.getRuntime().availableProcessors();

Review comment:
       Yes, it seems more appropriate to use the delay level here.







[GitHub] [rocketmq] coveralls commented on pull request #3458: [ISSUE #3449] Delayed message supports asynchronous delivery

Posted by GitBox <gi...@apache.org>.
coveralls commented on pull request #3458:
URL: https://github.com/apache/rocketmq/pull/3458#issuecomment-960253698


   
   [![Coverage Status](https://coveralls.io/builds/43998472/badge)](https://coveralls.io/builds/43998472)
   
   Coverage decreased (-0.3%) to 54.184% when pulling **e087dab2728e1d905735f591e229cadccc52ec2d on Git-Yang:develop-schedule_v2** into **2385f0446832d4e799c16d280a88cafc548fab39 on apache:develop**.
   





[GitHub] [rocketmq] Git-Yang commented on pull request #3458: [ISSUE #3449] Delayed message supports asynchronous delivery

Posted by GitBox <gi...@apache.org>.
Git-Yang commented on pull request #3458:
URL: https://github.com/apache/rocketmq/pull/3458#issuecomment-959495469









[GitHub] [rocketmq] HScarb commented on a change in pull request #3458: [ISSUE #3449] Delayed message supports asynchronous delivery

Posted by GitBox <gi...@apache.org>.
HScarb commented on a change in pull request #3458:
URL: https://github.com/apache/rocketmq/pull/3458#discussion_r761700175



##########
File path: store/src/main/java/org/apache/rocketmq/store/schedule/ScheduleMessageService.java
##########
@@ -431,35 +492,414 @@ public void executeOnTimeup() {
             ScheduleMessageService.this.deliverExecutorService.schedule(new DeliverDelayedMessageTimerTask(this.delayLevel,
                 failScheduleOffset), DELAY_FOR_A_WHILE, TimeUnit.MILLISECONDS);
         }
+    }
+
+    class AsyncDeliverDelayedMessageTimerTask implements Runnable {
+        private final int delayLevel;
+        private final long offset;
+
+        public AsyncDeliverDelayedMessageTimerTask(int delayLevel, long offset) {
+            this.delayLevel = delayLevel;
+            this.offset = offset;
+        }
+
+        @Override
+        public void run() {
+            try {
+                if (isStarted()) {
+                    this.executeOnTimeup();
+                }
+            } catch (Exception e) {
+                log.error("ScheduleMessageService, executeOnTimeup exception", e);
+                this.toNextTask(this.offset);
+            }
+        }
+
+        private long correctDeliverTimestamp(final long now, final long deliverTimestamp) {
+            long result = deliverTimestamp;
+            long maxTimestamp = now + ScheduleMessageService.this.delayLevelTable.get(this.delayLevel);
+            if (deliverTimestamp > maxTimestamp) {
+                result = now;
+            }
+            return result;
+        }
+
+        public void executeOnTimeup() {
+            ConsumeQueue cq = ScheduleMessageService.this.defaultMessageStore.findConsumeQueue(
+                    TopicValidator.RMQ_SYS_SCHEDULE_TOPIC, delayLevel2QueueId(delayLevel));
+
+            if (cq == null) {
+                this.toNextTask(this.offset);
+                return;
+            }
+
+            SelectMappedBufferResult bufferCQ = cq.getIndexBuffer(this.offset);
+            if (bufferCQ == null) {
+                long cqMinOffset = cq.getMinOffsetInQueue();
+                if (this.offset < cqMinOffset) {
+                    log.error("schedule CQ offset invalid. offset={}, cqMinOffset={}, queueId={}",
+                            this.offset, cqMinOffset, cq.getQueueId());
+                    this.toNextTask(cqMinOffset);
+                    return;
+                }
+
+                long cqMaxOffset = cq.getMaxOffsetInQueue();
+                if (this.offset > cqMaxOffset) {
+                    log.error("schedule CQ offset invalid. offset={}, cqMaxOffset={}, queueId={}",
+                            this.offset, cqMaxOffset, cq.getQueueId());
+                    this.toNextTask(cqMaxOffset);
+                    return;
+                }
+
+                this.toNextTask(this.offset);
+                return;
+            }
+
+            try {
+                long currentOffset;
+                int i = 0;
+                ConsumeQueueExt.CqExtUnit cqExtUnit = new ConsumeQueueExt.CqExtUnit();
+                for (; i < bufferCQ.getSize() && isStarted(); i += ConsumeQueue.CQ_STORE_UNIT_SIZE) {
+                    long offsetPy = bufferCQ.getByteBuffer().getLong();
+                    int sizePy = bufferCQ.getByteBuffer().getInt();
+                    long tagsCode = bufferCQ.getByteBuffer().getLong();
+
+                    if (cq.isExtAddr(tagsCode)) {
+                        if (cq.getExt(tagsCode, cqExtUnit)) {
+                            tagsCode = cqExtUnit.getTagsCode();
+                        } else {
+                            //can't find ext content.So re compute tags code.
+                            log.error("[BUG] can't find consume queue extend file content!addr={}, offsetPy={}, sizePy={}",
+                                    tagsCode, offsetPy, sizePy);
+                            long msgStoreTime = defaultMessageStore.getCommitLog().pickupStoreTimestamp(offsetPy, sizePy);
+                            tagsCode = computeDeliverTimestamp(delayLevel, msgStoreTime);
+                        }
+                    }
+
+                    long now = System.currentTimeMillis();
+                    long deliverTimestamp = this.correctDeliverTimestamp(now, tagsCode);
+                    currentOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE);
+
+                    long countdown = deliverTimestamp - now;
+                    if (countdown > 0) {
+                        this.toNextTask(currentOffset);
+                        return;
+                    }
+
+                    MessageExt msgExt = ScheduleMessageService.this.defaultMessageStore.lookMessageByOffset(offsetPy, sizePy);
+                    if (msgExt == null) {
+                        continue;
+                    }
+
+                    try {
+                        MessageExtBrokerInner msgInner = ScheduleMessageService.this.messageTimeup(msgExt);
+                        if (TopicValidator.RMQ_SYS_TRANS_HALF_TOPIC.equals(msgInner.getTopic())) {
+                            log.error("[BUG] the real topic of schedule msg is {}, discard the msg. msg={}",
+                                    msgInner.getTopic(), msgInner);
+                            continue;
+                        }
+
+                        // Asynchronous send flow control.
+                        int maxPendingLimit = ScheduleMessageService.this.defaultMessageStore
+                                .getMessageStoreConfig().getScheduleAsyncDeliverMaxPendingLimit();
+                        int currentPendingNum = this.getDeliverPendingQueue().size();
+                        if (currentPendingNum > maxPendingLimit) {
+                            log.warn("Asynchronous sending triggers flow control, " +
+                                    "currentPendingNum={}, maxPendingLimit={}", currentPendingNum, maxPendingLimit);
+                            this.toNextTask(currentOffset);
+                            return;
+                        }
+
+                        // Handling abnormal blocking.
+                        PutResultProcess firstProcess = this.getDeliverPendingQueue().peek();
+                        if (firstProcess != null && firstProcess.need2Wait()) {
+                            log.warn("Asynchronous sending block. info={}", firstProcess.toString());
+                            this.toNextTask(currentOffset);
+                            return;
+                        }
+
+                        CompletableFuture<PutMessageResult> future =
+                                ScheduleMessageService.this.writeMessageStore.asyncPutMessage(msgInner);
+                        PutResultProcess putResultProcess = new PutResultProcess()
+                                .setTopic(msgInner.getTopic())
+                                .setDelayLevel(this.delayLevel)
+                                .setOffset(currentOffset)
+                                .setPhysicOffset(offsetPy)
+                                .setPhysicSize(sizePy)
+                                .setMsgId(msgExt.getMsgId())
+                                .setAutoResend(true)
+                                .setFuture(future)
+                                .thenProcess();
+                        this.getDeliverPendingQueue().add(putResultProcess);
+                        continue;
+                    } catch (Exception e) {
+                        log.error("ScheduleMessageService, messageTimeup execute error, drop it. msgExt={}, nextOffset={}, offsetPy={}, sizePy={}", msgExt, currentOffset, offsetPy, sizePy, e);
+                    }
+                }
+
+                long nextOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE);
+                this.toNextTask(nextOffset);
+                return;
+            } finally {
+                bufferCQ.release();
+            }
+        }
+
+        public void toNextTask(long offset) {
+            ScheduleMessageService.this.deliverExecutorService.schedule(new AsyncDeliverDelayedMessageTimerTask(
+                    this.delayLevel, offset), DELAY_FOR_A_WHILE, TimeUnit.MILLISECONDS);
+        }
+
+        public LinkedBlockingQueue<PutResultProcess> getDeliverPendingQueue() {
+            return ScheduleMessageService.this.deliverPendingTable.get(this.delayLevel);
+        }
+    }
+
+    public class HandlePutResultTask implements Runnable {
+        private final int delayLevel;
+
+        public HandlePutResultTask(int delayLevel) {
+            this.delayLevel = delayLevel;
+        }
+
+        @Override
+        public void run() {
+            LinkedBlockingQueue<PutResultProcess> pendingQueue =
+                ScheduleMessageService.this.deliverPendingTable.get(this.delayLevel);
+
+            PutResultProcess putResultProcess;
+            while ((putResultProcess = pendingQueue.peek()) != null) {
+                try {
+                    if (putResultProcess.isSuccess()) {
+                        ScheduleMessageService.this.updateOffset(this.delayLevel, putResultProcess.getNextOffset());
+                        pendingQueue.remove();
+                        continue;
+                    } else if (putResultProcess.isCancelled()) {
+                        putResultProcess.onException();
+                    }
+                } catch (Exception e) {
+                    log.error("ScanPutResultTask error", e);
+                    putResultProcess.onException();
+                }
+
+                try {
+                    Thread.sleep(DELAY_FOR_A_SLEEP);

Review comment:
       May I ask why the loop sleeps for 0.01s here?
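
For readers following this thread: the quoted loop only ever inspects the head of the per-level pending queue, so a short pause between re-checks keeps it from spinning while the head write is still in flight. That is one plausible reading of the sleep, not the author's stated reason. A minimal sketch of the head-of-queue polling pattern follows. `PendingQueueSketch`, `Pending`, `drainHead`, `pollMillis` and `maxPolls` are hypothetical names used for illustration and are not part of RocketMQ.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.LinkedBlockingQueue;

class PendingQueueSketch {
    /** Hypothetical stand-in for PutResultProcess: one in-flight write plus the offset to commit on success. */
    static final class Pending {
        final long nextOffset;
        final CompletableFuture<Boolean> future;

        Pending(long nextOffset, CompletableFuture<Boolean> future) {
            this.nextOffset = nextOffset;
            this.future = future;
        }
    }

    /**
     * Removes successfully completed entries from the head of the queue, in order,
     * and returns the last committed offset. If the head is not yet successful,
     * the method sleeps pollMillis and re-checks, giving up after maxPolls re-checks.
     */
    static long drainHead(LinkedBlockingQueue<Pending> queue, long committed,
                          long pollMillis, int maxPolls) throws InterruptedException {
        int polls = 0;
        Pending head;
        while ((head = queue.peek()) != null) {
            boolean success = head.future.isDone()
                && !head.future.isCompletedExceptionally()
                && Boolean.TRUE.equals(head.future.getNow(Boolean.FALSE));
            if (success) {
                committed = head.nextOffset; // advance only past confirmed writes
                queue.remove();
                continue;
            }
            if (++polls > maxPolls) {
                break; // head still pending or failed: leave it for a later pass
            }
            Thread.sleep(pollMillis); // short pause instead of a busy spin
        }
        return committed;
    }
}
```

With entries for offsets 1, 2 and 3 where only the second is unfinished, `drainHead` commits offset 1 and stops, because a later result must not advance the offset past an unconfirmed write.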







[GitHub] [rocketmq] Git-Yang commented on pull request #3458: [ISSUE #3449] Delayed message supports asynchronous delivery

Posted by GitBox <gi...@apache.org>.
Git-Yang commented on pull request #3458:
URL: https://github.com/apache/rocketmq/pull/3458#issuecomment-965392009


   Depends on issue [#3473](https://github.com/apache/rocketmq/issues/3473)





[GitHub] [rocketmq] coveralls edited a comment on pull request #3458: [ISSUE #3449] Delayed message supports asynchronous delivery

Posted by GitBox <gi...@apache.org>.
coveralls edited a comment on pull request #3458:
URL: https://github.com/apache/rocketmq/pull/3458#issuecomment-960253698


   
   [![Coverage Status](https://coveralls.io/builds/44169251/badge)](https://coveralls.io/builds/44169251)
   
   Coverage decreased (-0.3%) to 54.272% when pulling **ada5e0dafd22092f8a91bfc518737743128cd259 on Git-Yang:develop-schedule_v2** into **b35f3e4459e8cf32116bb15133aaa27d5ff63cfb on apache:develop**.
   





[GitHub] [rocketmq] codecov-commenter commented on pull request #3458: [ISSUE #3449] Delayed message supports asynchronous delivery

Posted by GitBox <gi...@apache.org>.
codecov-commenter commented on pull request #3458:
URL: https://github.com/apache/rocketmq/pull/3458#issuecomment-1001403041


   # [Codecov](https://codecov.io/gh/apache/rocketmq/pull/3458?src=pr&el=h1&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) Report
   > Merging [#3458](https://codecov.io/gh/apache/rocketmq/pull/3458?src=pr&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (200f76a) into [develop](https://codecov.io/gh/apache/rocketmq/commit/8cb14687f3946c95092631d9f9d1447024a741de?el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (8cb1468) will **increase** coverage by `0.07%`.
   > The diff coverage is `61.56%`.
   
   [![Impacted file tree graph](https://codecov.io/gh/apache/rocketmq/pull/3458/graphs/tree.svg?width=650&height=150&src=pr&token=4w0sxP1wZv&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)](https://codecov.io/gh/apache/rocketmq/pull/3458?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)
   
   ```diff
   @@              Coverage Diff              @@
   ##             develop    #3458      +/-   ##
   =============================================
   + Coverage      49.67%   49.75%   +0.07%     
   - Complexity      4718     4738      +20     
   =============================================
     Files            555      555              
     Lines          36822    37008     +186     
     Branches        4853     4878      +25     
   =============================================
   + Hits           18291    18412     +121     
   - Misses         16231    16281      +50     
   - Partials        2300     2315      +15     
   ```
   
   
   | [Impacted Files](https://codecov.io/gh/apache/rocketmq/pull/3458?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | Coverage Δ | |
   |---|---|---|
   | [...ocketmq/store/schedule/ScheduleMessageService.java](https://codecov.io/gh/apache/rocketmq/pull/3458/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c3RvcmUvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL3JvY2tldG1xL3N0b3JlL3NjaGVkdWxlL1NjaGVkdWxlTWVzc2FnZVNlcnZpY2UuamF2YQ==) | `68.18% <61.32%> (-5.66%)` | :arrow_down: |
   | [...ache/rocketmq/store/config/MessageStoreConfig.java](https://codecov.io/gh/apache/rocketmq/pull/3458/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c3RvcmUvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL3JvY2tldG1xL3N0b3JlL2NvbmZpZy9NZXNzYWdlU3RvcmVDb25maWcuamF2YQ==) | `61.05% <66.66%> (+0.91%)` | :arrow_up: |
   | [...in/java/org/apache/rocketmq/test/util/MQAdmin.java](https://codecov.io/gh/apache/rocketmq/pull/3458/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-dGVzdC9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcm9ja2V0bXEvdGVzdC91dGlsL01RQWRtaW4uamF2YQ==) | `38.88% <0.00%> (-5.56%)` | :arrow_down: |
   | [...org/apache/rocketmq/store/ha/WaitNotifyObject.java](https://codecov.io/gh/apache/rocketmq/pull/3458/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c3RvcmUvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL3JvY2tldG1xL3N0b3JlL2hhL1dhaXROb3RpZnlPYmplY3QuamF2YQ==) | `66.07% <0.00%> (-5.36%)` | :arrow_down: |
   | [...apache/rocketmq/remoting/netty/ResponseFuture.java](https://codecov.io/gh/apache/rocketmq/pull/3458/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cmVtb3Rpbmcvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL3JvY2tldG1xL3JlbW90aW5nL25ldHR5L1Jlc3BvbnNlRnV0dXJlLmphdmE=) | `85.00% <0.00%> (-5.00%)` | :arrow_down: |
   | [...rocketmq/remoting/netty/NettyRemotingAbstract.java](https://codecov.io/gh/apache/rocketmq/pull/3458/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cmVtb3Rpbmcvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL3JvY2tldG1xL3JlbW90aW5nL25ldHR5L05ldHR5UmVtb3RpbmdBYnN0cmFjdC5qYXZh) | `46.88% <0.00%> (-4.03%)` | :arrow_down: |
   | [...ava/org/apache/rocketmq/filter/util/BitsArray.java](https://codecov.io/gh/apache/rocketmq/pull/3458/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-ZmlsdGVyL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9yb2NrZXRtcS9maWx0ZXIvdXRpbC9CaXRzQXJyYXkuamF2YQ==) | `58.11% <0.00%> (-1.71%)` | :arrow_down: |
   | [...he/rocketmq/client/impl/consumer/ProcessQueue.java](https://codecov.io/gh/apache/rocketmq/pull/3458/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-Y2xpZW50L3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9yb2NrZXRtcS9jbGllbnQvaW1wbC9jb25zdW1lci9Qcm9jZXNzUXVldWUuamF2YQ==) | `60.55% <0.00%> (-0.85%)` | :arrow_down: |
   | [...n/java/org/apache/rocketmq/store/ha/HAService.java](https://codecov.io/gh/apache/rocketmq/pull/3458/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c3RvcmUvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL3JvY2tldG1xL3N0b3JlL2hhL0hBU2VydmljZS5qYXZh) | `71.28% <0.00%> (-0.67%)` | :arrow_down: |
   | [...ent/impl/consumer/DefaultLitePullConsumerImpl.java](https://codecov.io/gh/apache/rocketmq/pull/3458/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-Y2xpZW50L3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9yb2NrZXRtcS9jbGllbnQvaW1wbC9jb25zdW1lci9EZWZhdWx0TGl0ZVB1bGxDb25zdW1lckltcGwuamF2YQ==) | `67.99% <0.00%> (-0.52%)` | :arrow_down: |
   | ... and [15 more](https://codecov.io/gh/apache/rocketmq/pull/3458/diff?src=pr&el=tree-more&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | |
   
   ------
   
   [Continue to review full report at Codecov](https://codecov.io/gh/apache/rocketmq/pull/3458?src=pr&el=continue&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation).
   > **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)
   > `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
   > Powered by [Codecov](https://codecov.io/gh/apache/rocketmq/pull/3458?src=pr&el=footer&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation). Last update [8cb1468...200f76a](https://codecov.io/gh/apache/rocketmq/pull/3458?src=pr&el=lastupdated&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation).
   

