You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by zh...@apache.org on 2023/01/16 07:57:38 UTC

[rocketmq] 01/01: [ISSUE #5847] Refactor logic in NotificationProcessor

This is an automated email from the ASF dual-hosted git repository.

zhouxzhan pushed a commit to branch feature/notification
in repository https://gitbox.apache.org/repos/asf/rocketmq.git

commit cc33e4eb2af4907c063f5d83c4006550d1880160
Author: zhouxiang <zh...@alibaba-inc.com>
AuthorDate: Mon Jan 16 11:42:36 2023 +0800

    [ISSUE #5847] Refactor logic in NotificationProcessor
    
    * Fix hasMsg logic in NotificationProcessor
    * Add notify in PopReviveService and order ack situation
    * Refactor wakeup logic
---
 .../apache/rocketmq/broker/BrokerController.java   |  4 ++
 .../broker/processor/AckMessageProcessor.java      | 11 ++-
 .../broker/processor/NotificationProcessor.java    | 81 +++++++++-------------
 .../broker/processor/PopMessageProcessor.java      |  1 +
 .../broker/processor/PopReviveService.java         |  2 +
 5 files changed, 49 insertions(+), 50 deletions(-)

diff --git a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
index eb9f629b5..8b3dbdfa9 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
@@ -1228,6 +1228,10 @@ public class BrokerController {
         return popMessageProcessor;
     }
 
+    public NotificationProcessor getNotificationProcessor() {
+        return notificationProcessor;
+    }
+
     public TimerMessageStore getTimerMessageStore() {
         return timerMessageStore;
     }
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/AckMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/AckMessageProcessor.java
index 80f06aed0..7b366cb8b 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/AckMessageProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AckMessageProcessor.java
@@ -141,6 +141,7 @@ public class AckMessageProcessor implements NettyRequestProcessor {
         ackMsg.setBrokerName(ExtraInfoUtil.getBrokerName(extraInfo));
 
         int rqId = ExtraInfoUtil.getReviveQid(extraInfo);
+        long invisibleTime = ExtraInfoUtil.getInvisibleTime(extraInfo);
 
         this.brokerController.getBrokerStatsManager().incBrokerAckNums(1);
         this.brokerController.getBrokerStatsManager().incGroupAckNums(requestHeader.getConsumerGroup(), requestHeader.getTopic(), 1);
@@ -171,8 +172,12 @@ public class AckMessageProcessor implements NettyRequestProcessor {
                         requestHeader.getConsumerGroup(), requestHeader.getTopic(),
                         requestHeader.getQueueId(),
                         nextOffset);
-                    this.brokerController.getPopMessageProcessor().notifyMessageArriving(requestHeader.getTopic(), requestHeader.getConsumerGroup(),
-                        requestHeader.getQueueId());
+                    if (!this.brokerController.getConsumerOrderInfoManager().checkBlock(requestHeader.getTopic(),
+                        requestHeader.getConsumerGroup(), requestHeader.getQueueId(), invisibleTime)) {
+                        this.brokerController.getPopMessageProcessor().notifyMessageArriving(requestHeader.getTopic(), requestHeader.getConsumerGroup(),
+                            requestHeader.getQueueId());
+                        this.brokerController.getNotificationProcessor().notifyMessageArriving(requestHeader.getTopic(), requestHeader.getQueueId());
+                    }
                 } else if (nextOffset == -1) {
                     String errorInfo = String.format("offset is illegal, key:%s, old:%d, commit:%d, next:%d, %s",
                         lockKey, oldOffset, requestHeader.getOffset(), nextOffset, channel.remoteAddress());
@@ -201,7 +206,7 @@ public class AckMessageProcessor implements NettyRequestProcessor {
         msgInner.setBornTimestamp(System.currentTimeMillis());
         msgInner.setBornHost(this.brokerController.getStoreHost());
         msgInner.setStoreHost(this.brokerController.getStoreHost());
-        msgInner.setDeliverTimeMs(ExtraInfoUtil.getPopTime(extraInfo) + ExtraInfoUtil.getInvisibleTime(extraInfo));
+        msgInner.setDeliverTimeMs(ExtraInfoUtil.getPopTime(extraInfo) + invisibleTime);
         msgInner.getProperties().put(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX, PopMessageProcessor.genAckUniqueId(ackMsg));
         msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties()));
         PutMessageResult putMessageResult = this.brokerController.getEscapeBridge().putMessageToSpecificQueue(msgInner);
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/NotificationProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/NotificationProcessor.java
index 332843b9f..e10d72328 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/NotificationProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/NotificationProcessor.java
@@ -74,7 +74,7 @@ public class NotificationProcessor implements NettyRequestProcessor {
                                         break;
                                     }
                                     POP_LOGGER.info("timeout , wakeUp Notification : {}", tmPopRequest);
-                                    wakeUp(tmPopRequest, false);
+                                    wakeUp(tmPopRequest);
                                     tmPopRequest = popQ.peek();
                                 } else {
                                     break;
@@ -111,28 +111,25 @@ public class NotificationProcessor implements NettyRequestProcessor {
     }
 
     public void notifyMessageArriving(final String topic, final int queueId) {
-        ArrayBlockingQueue<NotificationRequest> remotingCommands = pollingMap.get(KeyBuilder.buildPollingNotificationKey(topic, -1));
-        if (remotingCommands != null) {
-            List<NotificationRequest> c = new ArrayList<>();
-            remotingCommands.drainTo(c);
-            for (NotificationRequest notificationRequest : c) {
-                POP_LOGGER.info("new msg arrive , wakeUp : {}", notificationRequest);
-                wakeUp(notificationRequest, true);
-
-            }
+        notifyMessageArrivingForQueue(topic, -1);
+        if (queueId > 0) {
+            notifyMessageArrivingForQueue(topic, queueId);
         }
-        remotingCommands = pollingMap.get(KeyBuilder.buildPollingNotificationKey(topic, queueId));
+    }
+
+    public void notifyMessageArrivingForQueue(final String topic, final int queueId) {
+        ArrayBlockingQueue<NotificationRequest> remotingCommands = pollingMap.get(KeyBuilder.buildPollingNotificationKey(topic, queueId));
         if (remotingCommands != null) {
             List<NotificationRequest> c = new ArrayList<>();
             remotingCommands.drainTo(c);
             for (NotificationRequest notificationRequest : c) {
                 POP_LOGGER.info("new msg arrive , wakeUp : {}", notificationRequest);
-                wakeUp(notificationRequest, true);
+                wakeUp(notificationRequest);
             }
         }
     }
 
-    private void wakeUp(final NotificationRequest request, final boolean hasMsg) {
+    private void wakeUp(final NotificationRequest request) {
         if (request == null || !request.complete()) {
             return;
         }
@@ -142,11 +139,7 @@ public class NotificationProcessor implements NettyRequestProcessor {
         Runnable run = () -> {
             try {
                 final RemotingCommand response;
-                if (hasMsg) {
-                    response = NotificationProcessor.this.responseNotification(request.getChannel(), true);
-                } else {
-                    response = NotificationProcessor.this.processRequest(request.getChannel(), request.getRemotingCommand());
-                }
+                response = NotificationProcessor.this.processRequest(request.getChannel(), request.getRemotingCommand());
                 if (response != null) {
                     response.setOpaque(request.getRemotingCommand().getOpaque());
                     response.markResponseType();
@@ -165,14 +158,6 @@ public class NotificationProcessor implements NettyRequestProcessor {
         this.brokerController.getPullMessageExecutor().submit(new RequestTask(run, request.getChannel(), request.getRemotingCommand()));
     }
 
-    public RemotingCommand responseNotification(final Channel channel, boolean hasMsg) {
-        RemotingCommand response = RemotingCommand.createResponseCommand(NotificationResponseHeader.class);
-        final NotificationResponseHeader responseHeader = (NotificationResponseHeader) response.readCustomHeader();
-        responseHeader.setHasMsg(hasMsg);
-        response.setCode(ResponseCode.SUCCESS);
-        return response;
-    }
-
     private RemotingCommand processRequest(final Channel channel, RemotingCommand request)
         throws RemotingCommandException {
         RemotingCommand response = RemotingCommand.createResponseCommand(NotificationResponseHeader.class);
@@ -237,31 +222,33 @@ public class NotificationProcessor implements NettyRequestProcessor {
                 }
             }
         }
-        if (!hasMsg && requestHeader.getQueueId() < 0) {
-            // read all queue
-            for (int i = 0; i < topicConfig.getReadQueueNums(); i++) {
-                int queueId = (randomQ + i) % topicConfig.getReadQueueNums();
-                hasMsg = hasMsgFromQueue(false, requestHeader, queueId);
-                if (hasMsg) {
-                    break;
-                }
-            }
-        } else {
-            int queueId = requestHeader.getQueueId();
-            hasMsg = hasMsgFromQueue(false, requestHeader, queueId);
-        }
-        // if it doesn't have message, fetch retry again
-        if (!needRetry && !hasMsg) {
-            TopicConfig retryTopicConfig =
-                this.brokerController.getTopicConfigManager().selectTopicConfig(KeyBuilder.buildPopRetryTopic(requestHeader.getTopic(), requestHeader.getConsumerGroup()));
-            if (retryTopicConfig != null) {
-                for (int i = 0; i < retryTopicConfig.getReadQueueNums(); i++) {
-                    int queueId = (randomQ + i) % retryTopicConfig.getReadQueueNums();
-                    hasMsg = hasMsgFromQueue(true, requestHeader, queueId);
+        if (!hasMsg) {
+            if (requestHeader.getQueueId() < 0) {
+                // read all queue
+                for (int i = 0; i < topicConfig.getReadQueueNums(); i++) {
+                    int queueId = (randomQ + i) % topicConfig.getReadQueueNums();
+                    hasMsg = hasMsgFromQueue(false, requestHeader, queueId);
                     if (hasMsg) {
                         break;
                     }
                 }
+            } else {
+                int queueId = requestHeader.getQueueId();
+                hasMsg = hasMsgFromQueue(false, requestHeader, queueId);
+            }
+            // if it doesn't have message, fetch retry again
+            if (!needRetry && !hasMsg) {
+                TopicConfig retryTopicConfig =
+                    this.brokerController.getTopicConfigManager().selectTopicConfig(KeyBuilder.buildPopRetryTopic(requestHeader.getTopic(), requestHeader.getConsumerGroup()));
+                if (retryTopicConfig != null) {
+                    for (int i = 0; i < retryTopicConfig.getReadQueueNums(); i++) {
+                        int queueId = (randomQ + i) % retryTopicConfig.getReadQueueNums();
+                        hasMsg = hasMsgFromQueue(true, requestHeader, queueId);
+                        if (hasMsg) {
+                            break;
+                        }
+                    }
+                }
             }
         }
 
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java
index 2bea535f4..e2d51857e 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java
@@ -181,6 +181,7 @@ public class PopMessageProcessor implements NettyRequestProcessor {
                 // notify pop queue
                 notifySuccess = this.brokerController.getPopMessageProcessor().notifyMessageArriving(topic, group, queueId);
             }
+            this.brokerController.getNotificationProcessor().notifyMessageArriving(topic, queueId);
             if (this.brokerController.getBrokerConfig().isEnablePopLog()) {
                 POP_LOGGER.info("notify long polling request. topic:{}, group:{}, queueId:{}, success:{}",
                     topic, group, queueId, notifySuccess);
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopReviveService.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopReviveService.java
index 52b848b07..d0e9dbc36 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopReviveService.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopReviveService.java
@@ -140,6 +140,8 @@ public class PopReviveService extends ServiceThread {
                 popCheckPoint.getCId(),
                 -1
             );
+            brokerController.getNotificationProcessor().notifyMessageArriving(
+                KeyBuilder.parseNormalTopic(popCheckPoint.getTopic(), popCheckPoint.getCId()), -1);
         }
         return true;
     }