You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by zh...@apache.org on 2023/01/11 08:09:38 UTC

[rocketmq] 01/02: [ISSUE #5847] Fix wake up in NotificationProcessor

This is an automated email from the ASF dual-hosted git repository.

zhouxzhan pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git

commit 0fea748f8ba5595049867651479325eac85d7af1
Author: zhouxiang <zh...@alibaba-inc.com>
AuthorDate: Tue Jan 10 16:07:28 2023 +0800

    [ISSUE #5847] Fix wake up in NotificationProcessor
---
 .../broker/longpolling/NotificationRequest.java    |  2 +-
 .../broker/processor/NotificationProcessor.java    | 62 ++++++++++++++--------
 2 files changed, 42 insertions(+), 22 deletions(-)

diff --git a/broker/src/main/java/org/apache/rocketmq/broker/longpolling/NotificationRequest.java b/broker/src/main/java/org/apache/rocketmq/broker/longpolling/NotificationRequest.java
index fdae88128..2ff9a73a4 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/longpolling/NotificationRequest.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/longpolling/NotificationRequest.java
@@ -43,7 +43,7 @@ public class NotificationRequest {
     }
 
     public boolean isTimeout() {
-        return System.currentTimeMillis() > (expired - 3000);
+        return System.currentTimeMillis() > (expired - 500);
     }
 
     public boolean complete() {
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/NotificationProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/NotificationProcessor.java
index 0b580df0f..b5e611a98 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/NotificationProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/NotificationProcessor.java
@@ -63,7 +63,7 @@ public class NotificationProcessor implements NettyRequestProcessor {
                         break;
                     }
                     try {
-                        Thread.sleep(2000L);
+                        Thread.sleep(200L);
                         Collection<ArrayBlockingQueue<NotificationRequest>> pops = pollingMap.values();
                         for (ArrayBlockingQueue<NotificationRequest> popQ : pops) {
                             NotificationRequest tmPopRequest = popQ.peek();
@@ -73,15 +73,9 @@ public class NotificationProcessor implements NettyRequestProcessor {
                                     if (tmPopRequest == null) {
                                         break;
                                     }
-                                    if (!tmPopRequest.isTimeout()) {
-                                        POP_LOGGER.info("not timeout , but wakeUp Notification in advance: {}", tmPopRequest);
-                                        wakeUp(tmPopRequest, false);
-                                        break;
-                                    } else {
-                                        POP_LOGGER.info("timeout , wakeUp Notification : {}", tmPopRequest);
-                                        wakeUp(tmPopRequest, false);
-                                        tmPopRequest = popQ.peek();
-                                    }
+                                    POP_LOGGER.info("timeout , wakeUp Notification : {}", tmPopRequest);
+                                    wakeUp(tmPopRequest, false);
+                                    tmPopRequest = popQ.peek();
                                 } else {
                                     break;
                                 }
@@ -146,17 +140,26 @@ public class NotificationProcessor implements NettyRequestProcessor {
             return;
         }
         Runnable run = () -> {
-            final RemotingCommand response = NotificationProcessor.this.responseNotification(request.getChannel(), hasMsg);
-            if (response != null) {
-                response.setOpaque(request.getRemotingCommand().getOpaque());
-                response.markResponseType();
-                NettyRemotingAbstract.writeResponse(request.getChannel(), request.getRemotingCommand(), response, future ->  {
-                    if (!future.isSuccess()) {
-                        POP_LOGGER.error("ProcessRequestWrapper response to {} failed", request.getChannel().remoteAddress(), future.cause());
-                        POP_LOGGER.error(request.toString());
-                        POP_LOGGER.error(response.toString());
-                    }
-                });
+            try {
+                final RemotingCommand response;
+                if (hasMsg) {
+                    response = NotificationProcessor.this.responseNotification(request.getChannel(), true);
+                } else {
+                    response = NotificationProcessor.this.processRequest(request.getChannel(), request.getRemotingCommand());
+                }
+                if (response != null) {
+                    response.setOpaque(request.getRemotingCommand().getOpaque());
+                    response.markResponseType();
+                    NettyRemotingAbstract.writeResponse(request.getChannel(), request.getRemotingCommand(), response, future -> {
+                        if (!future.isSuccess()) {
+                            POP_LOGGER.error("ProcessRequestWrapper response to {} failed", request.getChannel().remoteAddress(), future.cause());
+                            POP_LOGGER.error(request.toString());
+                            POP_LOGGER.error(response.toString());
+                        }
+                    });
+                }
+            } catch (RemotingCommandException e) {
+                POP_LOGGER.error("ExecuteRequestWhenWakeup run", e);
             }
         };
         this.brokerController.getPullMessageExecutor().submit(new RequestTask(run, request.getChannel(), request.getRemotingCommand()));
@@ -228,6 +231,9 @@ public class NotificationProcessor implements NettyRequestProcessor {
                 for (int i = 0; i < retryTopicConfig.getReadQueueNums(); i++) {
                     int queueId = (randomQ + i) % retryTopicConfig.getReadQueueNums();
                     hasMsg = hasMsgFromQueue(true, requestHeader, queueId);
+                    if (hasMsg) {
+                        break;
+                    }
                 }
             }
         }
@@ -244,6 +250,20 @@ public class NotificationProcessor implements NettyRequestProcessor {
             int queueId = requestHeader.getQueueId();
             hasMsg = hasMsgFromQueue(false, requestHeader, queueId);
         }
+        // if no message was found and the retry topic was not checked first, check the retry topic now
+        if (!needRetry && !hasMsg) {
+            TopicConfig retryTopicConfig =
+                this.brokerController.getTopicConfigManager().selectTopicConfig(KeyBuilder.buildPopRetryTopic(requestHeader.getTopic(), requestHeader.getConsumerGroup()));
+            if (retryTopicConfig != null) {
+                for (int i = 0; i < retryTopicConfig.getReadQueueNums(); i++) {
+                    int queueId = (randomQ + i) % retryTopicConfig.getReadQueueNums();
+                    hasMsg = hasMsgFromQueue(true, requestHeader, queueId);
+                    if (hasMsg) {
+                        break;
+                    }
+                }
+            }
+        }
 
         if (!hasMsg) {
             if (polling(channel, request, requestHeader)) {