You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by zh...@apache.org on 2023/01/11 08:09:38 UTC
[rocketmq] 01/02: [ISSUE #5847] Fix wake up in NotificationProcessor
This is an automated email from the ASF dual-hosted git repository.
zhouxzhan pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
commit 0fea748f8ba5595049867651479325eac85d7af1
Author: zhouxiang <zh...@alibaba-inc.com>
AuthorDate: Tue Jan 10 16:07:28 2023 +0800
[ISSUE #5847] Fix wake up in NotificationProcessor
---
.../broker/longpolling/NotificationRequest.java | 2 +-
.../broker/processor/NotificationProcessor.java | 62 ++++++++++++++--------
2 files changed, 42 insertions(+), 22 deletions(-)
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/longpolling/NotificationRequest.java b/broker/src/main/java/org/apache/rocketmq/broker/longpolling/NotificationRequest.java
index fdae88128..2ff9a73a4 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/longpolling/NotificationRequest.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/longpolling/NotificationRequest.java
@@ -43,7 +43,7 @@ public class NotificationRequest {
}
public boolean isTimeout() {
- return System.currentTimeMillis() > (expired - 3000);
+ return System.currentTimeMillis() > (expired - 500);
}
public boolean complete() {
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/NotificationProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/NotificationProcessor.java
index 0b580df0f..b5e611a98 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/NotificationProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/NotificationProcessor.java
@@ -63,7 +63,7 @@ public class NotificationProcessor implements NettyRequestProcessor {
break;
}
try {
- Thread.sleep(2000L);
+ Thread.sleep(200L);
Collection<ArrayBlockingQueue<NotificationRequest>> pops = pollingMap.values();
for (ArrayBlockingQueue<NotificationRequest> popQ : pops) {
NotificationRequest tmPopRequest = popQ.peek();
@@ -73,15 +73,9 @@ public class NotificationProcessor implements NettyRequestProcessor {
if (tmPopRequest == null) {
break;
}
- if (!tmPopRequest.isTimeout()) {
- POP_LOGGER.info("not timeout , but wakeUp Notification in advance: {}", tmPopRequest);
- wakeUp(tmPopRequest, false);
- break;
- } else {
- POP_LOGGER.info("timeout , wakeUp Notification : {}", tmPopRequest);
- wakeUp(tmPopRequest, false);
- tmPopRequest = popQ.peek();
- }
+ POP_LOGGER.info("timeout , wakeUp Notification : {}", tmPopRequest);
+ wakeUp(tmPopRequest, false);
+ tmPopRequest = popQ.peek();
} else {
break;
}
@@ -146,17 +140,26 @@ public class NotificationProcessor implements NettyRequestProcessor {
return;
}
Runnable run = () -> {
- final RemotingCommand response = NotificationProcessor.this.responseNotification(request.getChannel(), hasMsg);
- if (response != null) {
- response.setOpaque(request.getRemotingCommand().getOpaque());
- response.markResponseType();
- NettyRemotingAbstract.writeResponse(request.getChannel(), request.getRemotingCommand(), response, future -> {
- if (!future.isSuccess()) {
- POP_LOGGER.error("ProcessRequestWrapper response to {} failed", request.getChannel().remoteAddress(), future.cause());
- POP_LOGGER.error(request.toString());
- POP_LOGGER.error(response.toString());
- }
- });
+ try {
+ final RemotingCommand response;
+ if (hasMsg) {
+ response = NotificationProcessor.this.responseNotification(request.getChannel(), true);
+ } else {
+ response = NotificationProcessor.this.processRequest(request.getChannel(), request.getRemotingCommand());
+ }
+ if (response != null) {
+ response.setOpaque(request.getRemotingCommand().getOpaque());
+ response.markResponseType();
+ NettyRemotingAbstract.writeResponse(request.getChannel(), request.getRemotingCommand(), response, future -> {
+ if (!future.isSuccess()) {
+ POP_LOGGER.error("ProcessRequestWrapper response to {} failed", request.getChannel().remoteAddress(), future.cause());
+ POP_LOGGER.error(request.toString());
+ POP_LOGGER.error(response.toString());
+ }
+ });
+ }
+ } catch (RemotingCommandException e) {
+ POP_LOGGER.error("ExecuteRequestWhenWakeup run", e);
}
};
this.brokerController.getPullMessageExecutor().submit(new RequestTask(run, request.getChannel(), request.getRemotingCommand()));
@@ -228,6 +231,9 @@ public class NotificationProcessor implements NettyRequestProcessor {
for (int i = 0; i < retryTopicConfig.getReadQueueNums(); i++) {
int queueId = (randomQ + i) % retryTopicConfig.getReadQueueNums();
hasMsg = hasMsgFromQueue(true, requestHeader, queueId);
+ if (hasMsg) {
+ break;
+ }
}
}
}
@@ -244,6 +250,20 @@ public class NotificationProcessor implements NettyRequestProcessor {
int queueId = requestHeader.getQueueId();
hasMsg = hasMsgFromQueue(false, requestHeader, queueId);
}
+ // if it has message, fetch retry again
+ if (!needRetry && !hasMsg) {
+ TopicConfig retryTopicConfig =
+ this.brokerController.getTopicConfigManager().selectTopicConfig(KeyBuilder.buildPopRetryTopic(requestHeader.getTopic(), requestHeader.getConsumerGroup()));
+ if (retryTopicConfig != null) {
+ for (int i = 0; i < retryTopicConfig.getReadQueueNums(); i++) {
+ int queueId = (randomQ + i) % retryTopicConfig.getReadQueueNums();
+ hasMsg = hasMsgFromQueue(true, requestHeader, queueId);
+ if (hasMsg) {
+ break;
+ }
+ }
+ }
+ }
if (!hasMsg) {
if (polling(channel, request, requestHeader)) {