You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by GitBox <gi...@apache.org> on 2022/12/22 06:47:24 UTC

[GitHub] [rocketmq] xdkxlk commented on a diff in pull request #5755: [ISSUE #5754] [RIP-57] Add asynchronous interfaces to MessageStore

xdkxlk commented on code in PR #5755:
URL: https://github.com/apache/rocketmq/pull/5755#discussion_r1055140306


##########
broker/src/main/java/org/apache/rocketmq/broker/processor/PopReviveService.java:
##########
@@ -451,30 +465,79 @@ protected void mergeAndRevive(ConsumeReviveObj consumeReviveObj) throws Throwabl
             }
             this.brokerController.getConsumerOffsetManager().commitOffset(PopAckConstants.LOCAL_HOST, PopAckConstants.REVIVE_GROUP, reviveTopic, queueId, newOffset);
         }
+        reviveOffset = newOffset;
         consumeReviveObj.newOffset = newOffset;
     }
 
-    private void reviveMsgFromCk(PopCheckPoint popCheckPoint) throws Throwable {
+    private void reviveMsgFromCk(PopCheckPoint popCheckPoint) {
+        if (!shouldRunPopRevive) {
+            POP_LOGGER.info("slave skip retry , revive topic={}, reviveQueueId={}", reviveTopic, queueId);
+            return;
+        }
+        inflightReviveRequestMap.put(popCheckPoint, new Pair<>(System.currentTimeMillis(), false));
+        List<CompletableFuture<Pair<Long, Boolean>>> futureList = new ArrayList<>(popCheckPoint.getNum());
         for (int j = 0; j < popCheckPoint.getNum(); j++) {
             if (DataConverter.getBit(popCheckPoint.getBitMap(), j)) {
                 continue;
             }
 
             // retry msg
             long msgOffset = popCheckPoint.ackOffsetByIndex((byte) j);
-            MessageExt messageExt = getBizMessage(popCheckPoint.getTopic(), msgOffset, popCheckPoint.getQueueId(), popCheckPoint.getBrokerName());
-            if (messageExt == null) {
-                POP_LOGGER.warn("reviveQueueId={},can not get biz msg topic is {}, offset is {} , then continue ",
-                    queueId, popCheckPoint.getTopic(), msgOffset);
-                continue;
-            }
-            //skip ck from last epoch
-            if (popCheckPoint.getPopTime() < messageExt.getStoreTimestamp()) {
-                POP_LOGGER.warn("reviveQueueId={},skip ck from last epoch {}", queueId, popCheckPoint);
-                continue;
-            }
-            reviveRetry(popCheckPoint, messageExt);
+            CompletableFuture<Pair<Long, Boolean>> future = getBizMessage(popCheckPoint.getTopic(), msgOffset, popCheckPoint.getQueueId(), popCheckPoint.getBrokerName())
+                .thenApply(messageExt -> {
+                    if (messageExt == null) {
+                        POP_LOGGER.warn("reviveQueueId={}, can not get biz msg topic is {}, offset is {}, then continue",
+                            queueId, popCheckPoint.getTopic(), msgOffset);
+                        return new Pair<>(msgOffset, true);
+                    }
+                    //skip ck from last epoch
+                    if (popCheckPoint.getPopTime() < messageExt.getStoreTimestamp()) {
+                        POP_LOGGER.warn("reviveQueueId={}, skip ck from last epoch {}", queueId, popCheckPoint);
+                        return new Pair<>(msgOffset, false);

Review Comment:
   In my opinion, this branch should return `new Pair<>(msgOffset, true)` so that the ck (checkpoint) from the last epoch is skipped.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@rocketmq.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org