You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by GitBox <gi...@apache.org> on 2022/12/22 06:52:54 UTC

[GitHub] [rocketmq] lizhimins commented on a diff in pull request #5755: [ISSUE #5754] [RIP-57] Add asynchronous interfaces to MessageStore

lizhimins commented on code in PR #5755:
URL: https://github.com/apache/rocketmq/pull/5755#discussion_r1055124280


##########
broker/src/main/java/org/apache/rocketmq/broker/processor/DefaultPullMessageResultHandler.java:
##########
@@ -80,12 +84,31 @@ public RemotingCommand handle(final GetMessageResult getMessageResult,
         final boolean brokerAllowSuspend,
         final MessageFilter messageFilter,
         RemotingCommand response) {
-
         PullMessageProcessor processor = brokerController.getPullMessageProcessor();
-        processor.updateBroadcastPulledOffset(requestHeader.getTopic(), requestHeader.getConsumerGroup(),
-            requestHeader.getQueueId(), requestHeader, channel, response, getMessageResult.getNextBeginOffset());
+        final String clientAddress = RemotingHelper.parseChannelRemoteAddr(channel);
+        TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(requestHeader.getTopic());
+        processor.composeResponseHeader(requestHeader, getMessageResult, topicConfig.getTopicSysFlag(),
+            subscriptionGroupConfig, response, clientAddress);
+        try {
+            processor.executeConsumeMessageHookBefore(request, requestHeader, getMessageResult, brokerAllowSuspend, response.getCode());
+        } catch (AbortProcessException e) {
+            response.setCode(e.getResponseCode());
+            response.setRemark(e.getErrorMessage());
+            return response;
+        }
 
+        //rewrite the response for the

Review Comment:
   注释不全



##########
broker/src/main/java/org/apache/rocketmq/broker/failover/EscapeBridge.java:
##########
@@ -283,6 +282,28 @@ public MessageExt getMessage(String topic, long offset, int queueId, String brok
         }
     }
 
+    public CompletableFuture<MessageExt> getMessageAsync(String topic, long offset, int queueId, String brokerName, boolean deCompressBody) {

Review Comment:
   应该把同步接口也用异步的去实现



##########
broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java:
##########
@@ -1269,6 +1269,44 @@ public PullResult pullMessageFromSpecificBroker(String brokerName, String broker
         return pullResultExt;
     }
 
+    public CompletableFuture<PullResult> pullMessageFromSpecificBrokerAsync(String brokerName, String brokerAddr,
+                                                    String consumerGroup, String topic, int queueId, long offset,
+                                                    int maxNums,
+                                                    long timeoutMillis) throws RemotingException, InterruptedException {

Review Comment:
   codestyle is strange



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@rocketmq.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org