You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by zh...@apache.org on 2023/01/16 07:57:38 UTC
[rocketmq] 01/01: [ISSUE #5847] Refector logic in NotificationProcessor
This is an automated email from the ASF dual-hosted git repository.
zhouxzhan pushed a commit to branch feature/notification
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
commit cc33e4eb2af4907c063f5d83c4006550d1880160
Author: zhouxiang <zh...@alibaba-inc.com>
AuthorDate: Mon Jan 16 11:42:36 2023 +0800
[ISSUE #5847] Refector logic in NotificationProcessor
* Fix hasMsg logic in NotificationProcessor
* Add notify in PopReviveService and order ack situation
* Refector wakeup logic
---
.../apache/rocketmq/broker/BrokerController.java | 4 ++
.../broker/processor/AckMessageProcessor.java | 11 ++-
.../broker/processor/NotificationProcessor.java | 81 +++++++++-------------
.../broker/processor/PopMessageProcessor.java | 1 +
.../broker/processor/PopReviveService.java | 2 +
5 files changed, 49 insertions(+), 50 deletions(-)
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
index eb9f629b5..8b3dbdfa9 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
@@ -1228,6 +1228,10 @@ public class BrokerController {
return popMessageProcessor;
}
+ public NotificationProcessor getNotificationProcessor() {
+ return notificationProcessor;
+ }
+
public TimerMessageStore getTimerMessageStore() {
return timerMessageStore;
}
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/AckMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/AckMessageProcessor.java
index 80f06aed0..7b366cb8b 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/AckMessageProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AckMessageProcessor.java
@@ -141,6 +141,7 @@ public class AckMessageProcessor implements NettyRequestProcessor {
ackMsg.setBrokerName(ExtraInfoUtil.getBrokerName(extraInfo));
int rqId = ExtraInfoUtil.getReviveQid(extraInfo);
+ long invisibleTime = ExtraInfoUtil.getInvisibleTime(extraInfo);
this.brokerController.getBrokerStatsManager().incBrokerAckNums(1);
this.brokerController.getBrokerStatsManager().incGroupAckNums(requestHeader.getConsumerGroup(), requestHeader.getTopic(), 1);
@@ -171,8 +172,12 @@ public class AckMessageProcessor implements NettyRequestProcessor {
requestHeader.getConsumerGroup(), requestHeader.getTopic(),
requestHeader.getQueueId(),
nextOffset);
- this.brokerController.getPopMessageProcessor().notifyMessageArriving(requestHeader.getTopic(), requestHeader.getConsumerGroup(),
- requestHeader.getQueueId());
+ if (!this.brokerController.getConsumerOrderInfoManager().checkBlock(requestHeader.getTopic(),
+ requestHeader.getConsumerGroup(), requestHeader.getQueueId(), invisibleTime)) {
+ this.brokerController.getPopMessageProcessor().notifyMessageArriving(requestHeader.getTopic(), requestHeader.getConsumerGroup(),
+ requestHeader.getQueueId());
+ this.brokerController.getNotificationProcessor().notifyMessageArriving(requestHeader.getTopic(), requestHeader.getQueueId());
+ }
} else if (nextOffset == -1) {
String errorInfo = String.format("offset is illegal, key:%s, old:%d, commit:%d, next:%d, %s",
lockKey, oldOffset, requestHeader.getOffset(), nextOffset, channel.remoteAddress());
@@ -201,7 +206,7 @@ public class AckMessageProcessor implements NettyRequestProcessor {
msgInner.setBornTimestamp(System.currentTimeMillis());
msgInner.setBornHost(this.brokerController.getStoreHost());
msgInner.setStoreHost(this.brokerController.getStoreHost());
- msgInner.setDeliverTimeMs(ExtraInfoUtil.getPopTime(extraInfo) + ExtraInfoUtil.getInvisibleTime(extraInfo));
+ msgInner.setDeliverTimeMs(ExtraInfoUtil.getPopTime(extraInfo) + invisibleTime);
msgInner.getProperties().put(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX, PopMessageProcessor.genAckUniqueId(ackMsg));
msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties()));
PutMessageResult putMessageResult = this.brokerController.getEscapeBridge().putMessageToSpecificQueue(msgInner);
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/NotificationProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/NotificationProcessor.java
index 332843b9f..e10d72328 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/NotificationProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/NotificationProcessor.java
@@ -74,7 +74,7 @@ public class NotificationProcessor implements NettyRequestProcessor {
break;
}
POP_LOGGER.info("timeout , wakeUp Notification : {}", tmPopRequest);
- wakeUp(tmPopRequest, false);
+ wakeUp(tmPopRequest);
tmPopRequest = popQ.peek();
} else {
break;
@@ -111,28 +111,25 @@ public class NotificationProcessor implements NettyRequestProcessor {
}
public void notifyMessageArriving(final String topic, final int queueId) {
- ArrayBlockingQueue<NotificationRequest> remotingCommands = pollingMap.get(KeyBuilder.buildPollingNotificationKey(topic, -1));
- if (remotingCommands != null) {
- List<NotificationRequest> c = new ArrayList<>();
- remotingCommands.drainTo(c);
- for (NotificationRequest notificationRequest : c) {
- POP_LOGGER.info("new msg arrive , wakeUp : {}", notificationRequest);
- wakeUp(notificationRequest, true);
-
- }
+ notifyMessageArrivingForQueue(topic, -1);
+ if (queueId > 0) {
+ notifyMessageArrivingForQueue(topic, queueId);
}
- remotingCommands = pollingMap.get(KeyBuilder.buildPollingNotificationKey(topic, queueId));
+ }
+
+ public void notifyMessageArrivingForQueue(final String topic, final int queueId) {
+ ArrayBlockingQueue<NotificationRequest> remotingCommands = pollingMap.get(KeyBuilder.buildPollingNotificationKey(topic, queueId));
if (remotingCommands != null) {
List<NotificationRequest> c = new ArrayList<>();
remotingCommands.drainTo(c);
for (NotificationRequest notificationRequest : c) {
POP_LOGGER.info("new msg arrive , wakeUp : {}", notificationRequest);
- wakeUp(notificationRequest, true);
+ wakeUp(notificationRequest);
}
}
}
- private void wakeUp(final NotificationRequest request, final boolean hasMsg) {
+ private void wakeUp(final NotificationRequest request) {
if (request == null || !request.complete()) {
return;
}
@@ -142,11 +139,7 @@ public class NotificationProcessor implements NettyRequestProcessor {
Runnable run = () -> {
try {
final RemotingCommand response;
- if (hasMsg) {
- response = NotificationProcessor.this.responseNotification(request.getChannel(), true);
- } else {
- response = NotificationProcessor.this.processRequest(request.getChannel(), request.getRemotingCommand());
- }
+ response = NotificationProcessor.this.processRequest(request.getChannel(), request.getRemotingCommand());
if (response != null) {
response.setOpaque(request.getRemotingCommand().getOpaque());
response.markResponseType();
@@ -165,14 +158,6 @@ public class NotificationProcessor implements NettyRequestProcessor {
this.brokerController.getPullMessageExecutor().submit(new RequestTask(run, request.getChannel(), request.getRemotingCommand()));
}
- public RemotingCommand responseNotification(final Channel channel, boolean hasMsg) {
- RemotingCommand response = RemotingCommand.createResponseCommand(NotificationResponseHeader.class);
- final NotificationResponseHeader responseHeader = (NotificationResponseHeader) response.readCustomHeader();
- responseHeader.setHasMsg(hasMsg);
- response.setCode(ResponseCode.SUCCESS);
- return response;
- }
-
private RemotingCommand processRequest(final Channel channel, RemotingCommand request)
throws RemotingCommandException {
RemotingCommand response = RemotingCommand.createResponseCommand(NotificationResponseHeader.class);
@@ -237,31 +222,33 @@ public class NotificationProcessor implements NettyRequestProcessor {
}
}
}
- if (!hasMsg && requestHeader.getQueueId() < 0) {
- // read all queue
- for (int i = 0; i < topicConfig.getReadQueueNums(); i++) {
- int queueId = (randomQ + i) % topicConfig.getReadQueueNums();
- hasMsg = hasMsgFromQueue(false, requestHeader, queueId);
- if (hasMsg) {
- break;
- }
- }
- } else {
- int queueId = requestHeader.getQueueId();
- hasMsg = hasMsgFromQueue(false, requestHeader, queueId);
- }
- // if it doesn't have message, fetch retry again
- if (!needRetry && !hasMsg) {
- TopicConfig retryTopicConfig =
- this.brokerController.getTopicConfigManager().selectTopicConfig(KeyBuilder.buildPopRetryTopic(requestHeader.getTopic(), requestHeader.getConsumerGroup()));
- if (retryTopicConfig != null) {
- for (int i = 0; i < retryTopicConfig.getReadQueueNums(); i++) {
- int queueId = (randomQ + i) % retryTopicConfig.getReadQueueNums();
- hasMsg = hasMsgFromQueue(true, requestHeader, queueId);
+ if (!hasMsg) {
+ if (requestHeader.getQueueId() < 0) {
+ // read all queue
+ for (int i = 0; i < topicConfig.getReadQueueNums(); i++) {
+ int queueId = (randomQ + i) % topicConfig.getReadQueueNums();
+ hasMsg = hasMsgFromQueue(false, requestHeader, queueId);
if (hasMsg) {
break;
}
}
+ } else {
+ int queueId = requestHeader.getQueueId();
+ hasMsg = hasMsgFromQueue(false, requestHeader, queueId);
+ }
+ // if it doesn't have message, fetch retry again
+ if (!needRetry && !hasMsg) {
+ TopicConfig retryTopicConfig =
+ this.brokerController.getTopicConfigManager().selectTopicConfig(KeyBuilder.buildPopRetryTopic(requestHeader.getTopic(), requestHeader.getConsumerGroup()));
+ if (retryTopicConfig != null) {
+ for (int i = 0; i < retryTopicConfig.getReadQueueNums(); i++) {
+ int queueId = (randomQ + i) % retryTopicConfig.getReadQueueNums();
+ hasMsg = hasMsgFromQueue(true, requestHeader, queueId);
+ if (hasMsg) {
+ break;
+ }
+ }
+ }
}
}
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java
index 2bea535f4..e2d51857e 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java
@@ -181,6 +181,7 @@ public class PopMessageProcessor implements NettyRequestProcessor {
// notify pop queue
notifySuccess = this.brokerController.getPopMessageProcessor().notifyMessageArriving(topic, group, queueId);
}
+ this.brokerController.getNotificationProcessor().notifyMessageArriving(topic, queueId);
if (this.brokerController.getBrokerConfig().isEnablePopLog()) {
POP_LOGGER.info("notify long polling request. topic:{}, group:{}, queueId:{}, success:{}",
topic, group, queueId, notifySuccess);
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopReviveService.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopReviveService.java
index 52b848b07..d0e9dbc36 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopReviveService.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopReviveService.java
@@ -140,6 +140,8 @@ public class PopReviveService extends ServiceThread {
popCheckPoint.getCId(),
-1
);
+ brokerController.getNotificationProcessor().notifyMessageArriving(
+ KeyBuilder.parseNormalTopic(popCheckPoint.getTopic(), popCheckPoint.getCId()), -1);
}
return true;
}