You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@rocketmq.apache.org by GitBox <gi...@apache.org> on 2021/12/06 02:18:07 UTC

[GitHub] [rocketmq] Git-Yang commented on a change in pull request #3458: [ISSUE #3449] Delayed message supports asynchronous delivery

Git-Yang commented on a change in pull request #3458:
URL: https://github.com/apache/rocketmq/pull/3458#discussion_r762662505



##########
File path: store/src/main/java/org/apache/rocketmq/store/schedule/ScheduleMessageService.java
##########
@@ -431,35 +492,414 @@ public void executeOnTimeup() {
             ScheduleMessageService.this.deliverExecutorService.schedule(new DeliverDelayedMessageTimerTask(this.delayLevel,
                 failScheduleOffset), DELAY_FOR_A_WHILE, TimeUnit.MILLISECONDS);
         }
+    }
+
+    class AsyncDeliverDelayedMessageTimerTask implements Runnable {
+        private final int delayLevel;
+        private final long offset;
+
+        /**
+         * Creates a task bound to one delay level and one starting offset.
+         *
+         * @param delayLevel delay level this task works on
+         * @param offset     offset this task starts reading from
+         */
+        public AsyncDeliverDelayedMessageTimerTask(int delayLevel, long offset) {
+            this.offset = offset;
+            this.delayLevel = delayLevel;
+        }
+
+        /**
+         * Runs {@link #executeOnTimeup()} when the service reports itself as started.
+         * Any exception escaping that call is logged and the task is handed to
+         * {@code toNextTask} with its unchanged offset.
+         */
+        @Override
+        public void run() {
+            try {
+                // Nothing to do while the service is not started.
+                if (!isStarted()) {
+                    return;
+                }
+                this.executeOnTimeup();
+            } catch (Exception e) {
+                log.error("ScheduleMessageService, executeOnTimeup exception", e);
+                this.toNextTask(this.offset);
+            }
+        }
+
+        private long correctDeliverTimestamp(final long now, final long deliverTimestamp) {
+            long result = deliverTimestamp;
+            long maxTimestamp = now + ScheduleMessageService.this.delayLevelTable.get(this.delayLevel);
+            if (deliverTimestamp > maxTimestamp) {
+                result = now;
+            }
+            return result;
+        }
+
+        /**
+         * Scans the schedule consume queue of this task's delay level starting at
+         * {@code offset} and asynchronously re-puts every message whose deliver time has
+         * been reached.
+         *
+         * <p>Every exit path hands a follow-up offset to {@code toNextTask} (its body is not
+         * visible here): the unchanged offset when the queue or buffer is unavailable, a
+         * corrected offset when the requested one is outside the queue range, the current
+         * entry's offset when that entry is not due yet or flow control / blocking kicks in,
+         * and the offset derived from the loop index once the loop ends.
+         */
+        public void executeOnTimeup() {
+            ConsumeQueue cq = ScheduleMessageService.this.defaultMessageStore.findConsumeQueue(
+                    TopicValidator.RMQ_SYS_SCHEDULE_TOPIC, delayLevel2QueueId(delayLevel));
+
+            // No consume queue for this delay level: retry later with the same offset.
+            if (cq == null) {
+                this.toNextTask(this.offset);
+                return;
+            }
+
+            SelectMappedBufferResult bufferCQ = cq.getIndexBuffer(this.offset);
+            if (bufferCQ == null) {
+                // No buffer at this offset: clamp the offset into [min, max] if it is out of range.
+                long cqMinOffset = cq.getMinOffsetInQueue();
+                if (this.offset < cqMinOffset) {
+                    log.error("schedule CQ offset invalid. offset={}, cqMinOffset={}, queueId={}",
+                            this.offset, cqMinOffset, cq.getQueueId());
+                    this.toNextTask(cqMinOffset);
+                    return;
+                }
+
+                long cqMaxOffset = cq.getMaxOffsetInQueue();
+                if (this.offset > cqMaxOffset) {
+                    log.error("schedule CQ offset invalid. offset={}, cqMaxOffset={}, queueId={}",
+                            this.offset, cqMaxOffset, cq.getQueueId());
+                    this.toNextTask(cqMaxOffset);
+                    return;
+                }
+
+                // Offset is within range but nothing is readable: retry with the same offset.
+                this.toNextTask(this.offset);
+                return;
+            }
+
+            try {
+                long currentOffset;
+                int i = 0;
+                ConsumeQueueExt.CqExtUnit cqExtUnit = new ConsumeQueueExt.CqExtUnit();
+                // Walk the buffer one consume-queue unit at a time; stop early if the service stops.
+                for (; i < bufferCQ.getSize() && isStarted(); i += ConsumeQueue.CQ_STORE_UNIT_SIZE) {
+                    // Each unit is read sequentially as: long, int, long.
+                    long offsetPy = bufferCQ.getByteBuffer().getLong();
+                    int sizePy = bufferCQ.getByteBuffer().getInt();
+                    long tagsCode = bufferCQ.getByteBuffer().getLong();
+
+                    // tagsCode may be an address into the ext file rather than the value itself.
+                    if (cq.isExtAddr(tagsCode)) {
+                        if (cq.getExt(tagsCode, cqExtUnit)) {
+                            tagsCode = cqExtUnit.getTagsCode();
+                        } else {
+                            // Can't find the ext content, so recompute the tags code.
+                            log.error("[BUG] can't find consume queue extend file content!addr={}, offsetPy={}, sizePy={}",
+                                    tagsCode, offsetPy, sizePy);
+                            long msgStoreTime = defaultMessageStore.getCommitLog().pickupStoreTimestamp(offsetPy, sizePy);
+                            tagsCode = computeDeliverTimestamp(delayLevel, msgStoreTime);
+                        }
+                    }
+
+                    // tagsCode is used as the deliver timestamp from here on.
+                    long now = System.currentTimeMillis();
+                    long deliverTimestamp = this.correctDeliverTimestamp(now, tagsCode);
+                    currentOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE);
+
+                    // Entry is not due yet: stop scanning and resume from this entry later.
+                    long countdown = deliverTimestamp - now;
+                    if (countdown > 0) {
+                        this.toNextTask(currentOffset);
+                        return;
+                    }
+
+                    // Message could not be loaded: skip this entry.
+                    MessageExt msgExt = ScheduleMessageService.this.defaultMessageStore.lookMessageByOffset(offsetPy, sizePy);
+                    if (msgExt == null) {
+                        continue;
+                    }
+
+                    try {
+                        MessageExtBrokerInner msgInner = ScheduleMessageService.this.messageTimeup(msgExt);
+                        // A scheduled message targeting the transaction half topic is discarded.
+                        if (TopicValidator.RMQ_SYS_TRANS_HALF_TOPIC.equals(msgInner.getTopic())) {
+                            log.error("[BUG] the real topic of schedule msg is {}, discard the msg. msg={}",
+                                    msgInner.getTopic(), msgInner);
+                            continue;
+                        }
+
+                        // Asynchronous send flow control.
+                        // When more results are pending than the configured limit, back off and
+                        // resume from the current entry.
+                        int maxPendingLimit = ScheduleMessageService.this.defaultMessageStore
+                                .getMessageStoreConfig().getScheduleAsyncDeliverMaxPendingLimit();
+                        int currentPendingNum = this.getDeliverPendingQueue().size();
+                        if (currentPendingNum > maxPendingLimit) {
+                            log.warn("Asynchronous sending triggers flow control, " +
+                                    "currentPendingNum={}, maxPendingLimit={}", currentPendingNum, maxPendingLimit);
+                            this.toNextTask(currentOffset);
+                            return;
+                        }
+
+                        // Handling abnormal blocking.
+                        // If the head of the pending queue reports it must be waited for, do not
+                        // submit more; resume from the current entry later.
+                        PutResultProcess firstProcess = this.getDeliverPendingQueue().peek();
+                        if (firstProcess != null && firstProcess.need2Wait()) {
+                            log.warn("Asynchronous sending block. info={}", firstProcess.toString());
+                            this.toNextTask(currentOffset);
+                            return;
+                        }
+
+                        // Submit the put asynchronously and track its result in the pending queue.
+                        CompletableFuture<PutMessageResult> future =
+                                ScheduleMessageService.this.writeMessageStore.asyncPutMessage(msgInner);
+                        PutResultProcess putResultProcess = new PutResultProcess()
+                                .setTopic(msgInner.getTopic())
+                                .setDelayLevel(this.delayLevel)
+                                .setOffset(currentOffset)
+                                .setPhysicOffset(offsetPy)
+                                .setPhysicSize(sizePy)
+                                .setMsgId(msgExt.getMsgId())
+                                .setAutoResend(true)
+                                .setFuture(future)
+                                .thenProcess();
+                        this.getDeliverPendingQueue().add(putResultProcess);
+                        continue;
+                    } catch (Exception e) {
+                        // The failing entry is only logged; the loop then moves on to the next entry.
+                        log.error("ScheduleMessageService, messageTimeup execute error, drop it. msgExt={}, nextOffset={}, offsetPy={}, sizePy={}", msgExt, currentOffset, offsetPy, sizePy, e);
+                    }
+                }
+
+                // Loop ended (buffer exhausted or service stopped): continue from the entry after
+                // the last one visited.
+                long nextOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE);
+                this.toNextTask(nextOffset);
+                return;
+            } finally {
+                // Always release the buffer obtained from getIndexBuffer, whatever the exit path.
+                bufferCQ.release();
+            }
+        }
+
+        public void toNextTask(long offset) {

Review comment:
       I agree with your suggestion, thanks.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@rocketmq.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org