Posted to commits@rocketmq.apache.org by GitBox <gi...@apache.org> on 2022/12/21 15:18:50 UTC

[GitHub] [rocketmq] ShadowySpirits opened a new pull request, #5755: [ISSUE #5754] [RIP-57] Add asynchronous interfaces to MessageStore

ShadowySpirits opened a new pull request, #5755:
URL: https://github.com/apache/rocketmq/pull/5755

   link to #5754


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@rocketmq.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [rocketmq] lizhimins commented on a diff in pull request #5755: [ISSUE #5754] [RIP-57] Add asynchronous interfaces to MessageStore

Posted by GitBox <gi...@apache.org>.
lizhimins commented on code in PR #5755:
URL: https://github.com/apache/rocketmq/pull/5755#discussion_r1055124280


##########
broker/src/main/java/org/apache/rocketmq/broker/processor/DefaultPullMessageResultHandler.java:
##########
@@ -80,12 +84,31 @@ public RemotingCommand handle(final GetMessageResult getMessageResult,
         final boolean brokerAllowSuspend,
         final MessageFilter messageFilter,
         RemotingCommand response) {
-
         PullMessageProcessor processor = brokerController.getPullMessageProcessor();
-        processor.updateBroadcastPulledOffset(requestHeader.getTopic(), requestHeader.getConsumerGroup(),
-            requestHeader.getQueueId(), requestHeader, channel, response, getMessageResult.getNextBeginOffset());
+        final String clientAddress = RemotingHelper.parseChannelRemoteAddr(channel);
+        TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(requestHeader.getTopic());
+        processor.composeResponseHeader(requestHeader, getMessageResult, topicConfig.getTopicSysFlag(),
+            subscriptionGroupConfig, response, clientAddress);
+        try {
+            processor.executeConsumeMessageHookBefore(request, requestHeader, getMessageResult, brokerAllowSuspend, response.getCode());
+        } catch (AbortProcessException e) {
+            response.setCode(e.getResponseCode());
+            response.setRemark(e.getErrorMessage());
+            return response;
+        }
 
+        //rewrite the response for the

Review Comment:
   The comment is incomplete.



##########
broker/src/main/java/org/apache/rocketmq/broker/failover/EscapeBridge.java:
##########
@@ -283,6 +282,28 @@ public MessageExt getMessage(String topic, long offset, int queueId, String brok
         }
     }
 
+    public CompletableFuture<MessageExt> getMessageAsync(String topic, long offset, int queueId, String brokerName, boolean deCompressBody) {

Review Comment:
   The synchronous interface should also be implemented on top of the asynchronous one.
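
   One way to read this suggestion, as a minimal sketch rather than the PR's actual code: keep the lookup logic only in the async method and let the sync method block on it. `MessageBridgeSketch`, its in-memory map, and the simplified single-argument signatures are all hypothetical stand-ins, not `EscapeBridge` itself.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for a bridge that exposes both a sync and an async read.
class MessageBridgeSketch {
    // Assumption: an in-memory map replaces the real message store.
    private final Map<Long, String> messages = new ConcurrentHashMap<>();

    void put(long offset, String body) {
        messages.put(offset, body);
    }

    // Asynchronous variant: the single place where the lookup logic lives.
    CompletableFuture<String> getMessageAsync(long offset) {
        return CompletableFuture.supplyAsync(() -> messages.get(offset));
    }

    // Synchronous variant: delegates to the async one and blocks for the result.
    String getMessage(long offset) {
        return getMessageAsync(offset).join();
    }

    public static void main(String[] args) {
        MessageBridgeSketch bridge = new MessageBridgeSketch();
        bridge.put(42L, "hello");
        System.out.println(bridge.getMessage(42L)); // found
        System.out.println(bridge.getMessage(7L));  // missing offset yields null
    }
}
```

   With this shape there is one code path to maintain; the trade-off is that the sync caller blocks a thread in `join()`, and real code would likely want a timeout.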



##########
broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java:
##########
@@ -1269,6 +1269,44 @@ public PullResult pullMessageFromSpecificBroker(String brokerName, String broker
         return pullResultExt;
     }
 
+    public CompletableFuture<PullResult> pullMessageFromSpecificBrokerAsync(String brokerName, String brokerAddr,
+                                                    String consumerGroup, String topic, int queueId, long offset,
+                                                    int maxNums,
+                                                    long timeoutMillis) throws RemotingException, InterruptedException {

Review Comment:
   The code style here looks strange.





[GitHub] [rocketmq] aaron-ai commented on a diff in pull request #5755: [ISSUE #5754] [RIP-57] Add asynchronous interfaces to MessageStore

Posted by GitBox <gi...@apache.org>.
aaron-ai commented on code in PR #5755:
URL: https://github.com/apache/rocketmq/pull/5755#discussion_r1055334069


##########
broker/src/main/java/org/apache/rocketmq/broker/processor/PopReviveService.java:
##########
@@ -122,7 +125,8 @@ private void reviveRetry(PopCheckPoint popCheckPoint, MessageExt messageExt) thr
         }
         if (putMessageResult.getAppendMessageResult() == null ||
             putMessageResult.getAppendMessageResult().getStatus() != AppendMessageStatus.PUT_OK) {
-            throw new Exception("reviveQueueId=" + queueId + ", revive error, msg is: " + msgInner);
+            POP_LOGGER.error("reviveQueueId=" + queueId + ", revive error, msg is: " + msgInner);

Review Comment:
   Use `{}` placeholders to print the arguments instead of string concatenation.
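
   A minimal sketch of what the placeholder form looks like at the call site. `PlaceholderLoggerSketch` is a hypothetical stand-in that only mimics SLF4J-style `{}` substitution so the example runs without a logging library; it is not the `POP_LOGGER` implementation.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for an SLF4J-style logger; it only mimics "{}" substitution.
class PlaceholderLoggerSketch {
    final List<String> lines = new ArrayList<>();

    // Replaces each "{}" in order with the string form of the matching argument.
    static String format(String pattern, Object... args) {
        StringBuilder sb = new StringBuilder();
        int from = 0;
        for (Object arg : args) {
            int at = pattern.indexOf("{}", from);
            if (at < 0) {
                break; // more arguments than placeholders: ignore the extras
            }
            sb.append(pattern, from, at).append(arg);
            from = at + 2;
        }
        return sb.append(pattern.substring(from)).toString();
    }

    void error(String pattern, Object... args) {
        lines.add("ERROR " + format(pattern, args));
    }

    public static void main(String[] args) {
        PlaceholderLoggerSketch log = new PlaceholderLoggerSketch();
        int queueId = 3;
        String msgInner = "msg-1";
        // Placeholder form: no string concatenation at the call site.
        log.error("reviveQueueId={}, revive error, msg is: {}", queueId, msgInner);
        System.out.println(log.lines.get(0));
    }
}
```

   With a real logging framework, the placeholder form also lets the logger skip building the message when the level is disabled.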





[GitHub] [rocketmq] aaron-ai commented on a diff in pull request #5755: [ISSUE #5754] [RIP-57] Add asynchronous interfaces to MessageStore

Posted by GitBox <gi...@apache.org>.
aaron-ai commented on code in PR #5755:
URL: https://github.com/apache/rocketmq/pull/5755#discussion_r1055325703


##########
broker/src/main/java/org/apache/rocketmq/broker/failover/EscapeBridge.java:
##########
@@ -283,6 +282,28 @@ public MessageExt getMessage(String topic, long offset, int queueId, String brok
         }
     }
 
+    public CompletableFuture<MessageExt> getMessageAsync(String topic, long offset, int queueId, String brokerName, boolean deCompressBody) {
+        MessageStore messageStore = brokerController.getMessageStoreByBrokerName(brokerName);
+        if (messageStore != null) {
+            return messageStore.getMessageAsync(innerConsumerGroupName, topic, queueId, offset, 1, null)
+                .thenApply(result -> {
+                    if (result == null) {
+                        LOG.warn("getMessageResult is null , innerConsumerGroupName {}, topic {}, offset {}, queueId {}", innerConsumerGroupName, topic, offset, queueId);
+                        return null;
+                    }
+                    List<MessageExt> list = decodeMsgList(result, deCompressBody);
+                    if (list == null || list.isEmpty()) {
+                        LOG.warn("Can not get msg , topic {}, offset {}, queueId {}, result is {}", topic, offset, queueId, result);
+                        return null;
+                    } else {
+                        return list.get(0);

Review Comment:
   The `else` is redundant; maybe we could follow the [Return Early Pattern](https://medium.com/swlh/return-early-pattern-3d18a41bba8).
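
   A minimal sketch of the early-return shape for the list check above. `ReturnEarlySketch` and `firstOrNull` are hypothetical names, and `String` stands in for `MessageExt`.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

class ReturnEarlySketch {
    // Guard clause returns first; the happy path follows without an else branch.
    static String firstOrNull(List<String> list) {
        if (list == null || list.isEmpty()) {
            return null;
        }
        return list.get(0);
    }

    public static void main(String[] args) {
        System.out.println(firstOrNull(Arrays.asList("a", "b")));
        System.out.println(firstOrNull(Collections.emptyList()));
    }
}
```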





[GitHub] [rocketmq] xdkxlk commented on a diff in pull request #5755: [ISSUE #5754] [RIP-57] Add asynchronous interfaces to MessageStore

Posted by GitBox <gi...@apache.org>.
xdkxlk commented on code in PR #5755:
URL: https://github.com/apache/rocketmq/pull/5755#discussion_r1055140306


##########
broker/src/main/java/org/apache/rocketmq/broker/processor/PopReviveService.java:
##########
@@ -451,30 +465,79 @@ protected void mergeAndRevive(ConsumeReviveObj consumeReviveObj) throws Throwabl
             }
             this.brokerController.getConsumerOffsetManager().commitOffset(PopAckConstants.LOCAL_HOST, PopAckConstants.REVIVE_GROUP, reviveTopic, queueId, newOffset);
         }
+        reviveOffset = newOffset;
         consumeReviveObj.newOffset = newOffset;
     }
 
-    private void reviveMsgFromCk(PopCheckPoint popCheckPoint) throws Throwable {
+    private void reviveMsgFromCk(PopCheckPoint popCheckPoint) {
+        if (!shouldRunPopRevive) {
+            POP_LOGGER.info("slave skip retry , revive topic={}, reviveQueueId={}", reviveTopic, queueId);
+            return;
+        }
+        inflightReviveRequestMap.put(popCheckPoint, new Pair<>(System.currentTimeMillis(), false));
+        List<CompletableFuture<Pair<Long, Boolean>>> futureList = new ArrayList<>(popCheckPoint.getNum());
         for (int j = 0; j < popCheckPoint.getNum(); j++) {
             if (DataConverter.getBit(popCheckPoint.getBitMap(), j)) {
                 continue;
             }
 
             // retry msg
             long msgOffset = popCheckPoint.ackOffsetByIndex((byte) j);
-            MessageExt messageExt = getBizMessage(popCheckPoint.getTopic(), msgOffset, popCheckPoint.getQueueId(), popCheckPoint.getBrokerName());
-            if (messageExt == null) {
-                POP_LOGGER.warn("reviveQueueId={},can not get biz msg topic is {}, offset is {} , then continue ",
-                    queueId, popCheckPoint.getTopic(), msgOffset);
-                continue;
-            }
-            //skip ck from last epoch
-            if (popCheckPoint.getPopTime() < messageExt.getStoreTimestamp()) {
-                POP_LOGGER.warn("reviveQueueId={},skip ck from last epoch {}", queueId, popCheckPoint);
-                continue;
-            }
-            reviveRetry(popCheckPoint, messageExt);
+            CompletableFuture<Pair<Long, Boolean>> future = getBizMessage(popCheckPoint.getTopic(), msgOffset, popCheckPoint.getQueueId(), popCheckPoint.getBrokerName())
+                .thenApply(messageExt -> {
+                    if (messageExt == null) {
+                        POP_LOGGER.warn("reviveQueueId={}, can not get biz msg topic is {}, offset is {}, then continue",
+                            queueId, popCheckPoint.getTopic(), msgOffset);
+                        return new Pair<>(msgOffset, true);
+                    }
+                    //skip ck from last epoch
+                    if (popCheckPoint.getPopTime() < messageExt.getStoreTimestamp()) {
+                        POP_LOGGER.warn("reviveQueueId={}, skip ck from last epoch {}", queueId, popCheckPoint);
+                        return new Pair<>(msgOffset, false);

Review Comment:
   In my opinion, this should return `new Pair<>(msgOffset, true)` to skip the ck from the last epoch.





[GitHub] [rocketmq] aaron-ai commented on a diff in pull request #5755: [ISSUE #5754] [RIP-57] Add asynchronous interfaces to MessageStore

Posted by GitBox <gi...@apache.org>.
aaron-ai commented on code in PR #5755:
URL: https://github.com/apache/rocketmq/pull/5755#discussion_r1055335199


##########
store/src/main/java/org/apache/rocketmq/store/MessageStore.java:
##########
@@ -137,6 +140,9 @@ GetMessageResult getMessage(final String group, final String topic, final int qu
     GetMessageResult getMessage(final String group, final String topic, final int queueId,
         final long offset, final int maxMsgNums, final int maxTotalMsgSize, final MessageFilter messageFilter);
 
+    CompletableFuture<GetMessageResult> getMessageAsync(final String group, final String topic, final int queueId,

Review Comment:
   Please add comments for this interface method.





[GitHub] [rocketmq] aaron-ai merged pull request #5755: [ISSUE #5754] [RIP-57] Add asynchronous interfaces to MessageStore

Posted by GitBox <gi...@apache.org>.
aaron-ai merged PR #5755:
URL: https://github.com/apache/rocketmq/pull/5755




[GitHub] [rocketmq] codecov-commenter commented on pull request #5755: [ISSUE #5754] [RIP-57] Add asynchronous interfaces to MessageStore

Posted by GitBox <gi...@apache.org>.
codecov-commenter commented on PR #5755:
URL: https://github.com/apache/rocketmq/pull/5755#issuecomment-1363577441

   # [Codecov](https://codecov.io/gh/apache/rocketmq/pull/5755?src=pr&el=h1&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) Report
   > Merging [#5755](https://codecov.io/gh/apache/rocketmq/pull/5755?src=pr&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (979a4af) into [develop](https://codecov.io/gh/apache/rocketmq/commit/646d04f42524636ad09364af2a67fa8ab8b6b8fa?el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (646d04f) will **decrease** coverage by `0.00%`.
   > The diff coverage is `39.24%`.
   
   ```diff
   @@              Coverage Diff              @@
   ##             develop    #5755      +/-   ##
   =============================================
   - Coverage      42.45%   42.45%   -0.01%     
   - Complexity      8215     8239      +24     
   =============================================
     Files           1060     1060              
     Lines          73193    73316     +123     
     Branches        9598     9597       -1     
   =============================================
   + Hits           31071    31123      +52     
   - Misses         38221    38282      +61     
   - Partials        3901     3911      +10     
   ```
   
   
   | [Impacted Files](https://codecov.io/gh/apache/rocketmq/pull/5755?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | Coverage Δ | |
   |---|---|---|
   | [...org/apache/rocketmq/broker/out/BrokerOuterAPI.java](https://codecov.io/gh/apache/rocketmq/pull/5755/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-YnJva2VyL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9yb2NrZXRtcS9icm9rZXIvb3V0L0Jyb2tlck91dGVyQVBJLmphdmE=) | `16.55% <0.00%> (-0.82%)` | :arrow_down: |
   | [...org/apache/rocketmq/store/DefaultMessageStore.java](https://codecov.io/gh/apache/rocketmq/pull/5755/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c3RvcmUvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL3JvY2tldG1xL3N0b3JlL0RlZmF1bHRNZXNzYWdlU3RvcmUuamF2YQ==) | `52.56% <0.00%> (-0.21%)` | :arrow_down: |
   | [...n/java/org/apache/rocketmq/store/MessageStore.java](https://codecov.io/gh/apache/rocketmq/pull/5755/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c3RvcmUvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL3JvY2tldG1xL3N0b3JlL01lc3NhZ2VTdG9yZS5qYXZh) | `0.00% <ø> (ø)` | |
   | [...ketmq/store/plugin/AbstractPluginMessageStore.java](https://codecov.io/gh/apache/rocketmq/pull/5755/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c3RvcmUvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL3JvY2tldG1xL3N0b3JlL3BsdWdpbi9BYnN0cmFjdFBsdWdpbk1lc3NhZ2VTdG9yZS5qYXZh) | `0.00% <0.00%> (ø)` | |
   | [...a/org/apache/rocketmq/store/pop/PopCheckPoint.java](https://codecov.io/gh/apache/rocketmq/pull/5755/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c3RvcmUvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL3JvY2tldG1xL3N0b3JlL3BvcC9Qb3BDaGVja1BvaW50LmphdmE=) | `0.00% <0.00%> (ø)` | |
   | [...he/rocketmq/broker/processor/PopReviveService.java](https://codecov.io/gh/apache/rocketmq/pull/5755/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-YnJva2VyL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9yb2NrZXRtcS9icm9rZXIvcHJvY2Vzc29yL1BvcFJldml2ZVNlcnZpY2UuamF2YQ==) | `40.05% <29.85%> (-0.26%)` | :arrow_down: |
   | [...rocketmq/broker/processor/PopMessageProcessor.java](https://codecov.io/gh/apache/rocketmq/pull/5755/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-YnJva2VyL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9yb2NrZXRtcS9icm9rZXIvcHJvY2Vzc29yL1BvcE1lc3NhZ2VQcm9jZXNzb3IuamF2YQ==) | `36.60% <43.39%> (-0.20%)` | :arrow_down: |
   | [...ocketmq/broker/processor/PullMessageProcessor.java](https://codecov.io/gh/apache/rocketmq/pull/5755/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-YnJva2VyL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9yb2NrZXRtcS9icm9rZXIvcHJvY2Vzc29yL1B1bGxNZXNzYWdlUHJvY2Vzc29yLmphdmE=) | `28.75% <48.71%> (+1.07%)` | :arrow_up: |
   | [.../apache/rocketmq/broker/failover/EscapeBridge.java](https://codecov.io/gh/apache/rocketmq/pull/5755/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-YnJva2VyL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9yb2NrZXRtcS9icm9rZXIvZmFpbG92ZXIvRXNjYXBlQnJpZGdlLmphdmE=) | `41.66% <50.00%> (+8.13%)` | :arrow_up: |
   | [...ker/processor/DefaultPullMessageResultHandler.java](https://codecov.io/gh/apache/rocketmq/pull/5755/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-YnJva2VyL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9yb2NrZXRtcS9icm9rZXIvcHJvY2Vzc29yL0RlZmF1bHRQdWxsTWVzc2FnZVJlc3VsdEhhbmRsZXIuamF2YQ==) | `38.88% <55.55%> (+1.67%)` | :arrow_up: |
   | ... and [30 more](https://codecov.io/gh/apache/rocketmq/pull/5755/diff?src=pr&el=tree-more&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | |
   

