Posted to commits@rocketmq.apache.org by "HScarb (via GitHub)" <gi...@apache.org> on 2023/05/02 08:46:33 UTC

[GitHub] [rocketmq] HScarb opened a new pull request, #6682: [ISSUE #6681] fix: fix pop retry message notification

HScarb opened a new pull request, #6682:
URL: https://github.com/apache/rocketmq/pull/6682

   
   ### Which Issue(s) This PR Fixes
   
   
   
   Fixes #6681 
   
   ### Brief Description
   
   
   After `ReputMessageService` dispatches a message from a pop retry topic, notify all consumers that subscribe to any queue of this topic.
   
   ### How Did You Test This Change?
   
   
   1. Create a topic `TopicTest`.
   2. Start a pop push consumer using the example code `PopConsumer.java`, subscribe it to `TopicTest`, set `popBatchSize` to 1, and on receiving a message, print it and return `RECONSUME_LATER`.
   3. Publish a message to `TopicTest`.
   4. Wait and watch the consumer's output.
   
   The consumer consumes the retry message immediately.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@rocketmq.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [rocketmq] HScarb commented on pull request #6682: [ISSUE #6681] fix: fix pop retry message notification

Posted by "HScarb (via GitHub)" <gi...@apache.org>.
HScarb commented on PR #6682:
URL: https://github.com/apache/rocketmq/pull/6682#issuecomment-1535649327

   > Pop retry message has already been notified in PopReviveService#reviveRetry
   > 
   > ```
   > brokerController.getPopMessageProcessor().notifyMessageArriving(
   >     KeyBuilder.parseNormalTopic(popCheckPoint.getTopic(), popCheckPoint.getCId()),
   >     popCheckPoint.getCId(),
   >     -1
   > );
   > brokerController.getNotificationProcessor().notifyMessageArriving(
   >     KeyBuilder.parseNormalTopic(popCheckPoint.getTopic(), popCheckPoint.getCId()), -1);
   > ```
   
   In `PopReviveService#reviveRetry`, only consumers that subscribe to all queues (queue ID -1) are notified.
   
   But in the case where `popShareQueueNum >= consumerNum - 1`, the consumer subscribes to only some of the queues.
   
   https://github.com/apache/rocketmq/blob/6f6032e9eb812d42a67bea3cdb02cf4ef6e7f6c3/broker/src/main/java/org/apache/rocketmq/broker/processor/QueryAssignmentProcessor.java#L236-L270
   
   In this case, `pollingMap` contains only polling requests for specific queues, and notifying queue ID -1 will not wake them up.
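
The key mismatch described above can be illustrated with a minimal, single-threaded sketch. It assumes parked requests are stored under a key built from topic, consumer group, and queue ID; the class `PollingKeySketch`, its methods, and the `@` separator are hypothetical stand-ins for illustration, not the broker's actual code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class PollingKeySketch {
    // Hypothetical stand-in for the broker's polling map: key -> parked request ids.
    static final Map<String, List<String>> pollingMap = new ConcurrentHashMap<>();

    // Assumed key layout: topic, group, and queue id joined by '@'.
    static String key(String topic, String group, int queueId) {
        return topic + "@" + group + "@" + queueId;
    }

    // Parks a long-polling request under the key of the queue it polls.
    static void park(String topic, String group, int queueId, String requestId) {
        pollingMap.computeIfAbsent(key(topic, group, queueId), k -> new ArrayList<>()).add(requestId);
    }

    // Wakes (removes and returns) only the requests parked under exactly this key.
    static List<String> notifyQueue(String topic, String group, int queueId) {
        List<String> woken = pollingMap.remove(key(topic, group, queueId));
        return woken == null ? new ArrayList<>() : woken;
    }

    // Wakes requests parked under the all-queues key (-1) and under every specific queue.
    static List<String> notifyAllQueues(String topic, String group, int queueCount) {
        List<String> woken = new ArrayList<>(notifyQueue(topic, group, -1));
        for (int q = 0; q < queueCount; q++) {
            woken.addAll(notifyQueue(topic, group, q));
        }
        return woken;
    }

    public static void main(String[] args) {
        park("TopicTest", "groupA", 2, "req-1");
        // Notifying queue id -1 misses the request parked on queue 2.
        System.out.println(notifyQueue("TopicTest", "groupA", -1).size());    // 0
        // Notifying every queue of the topic reaches it.
        System.out.println(notifyAllQueues("TopicTest", "groupA", 4).size()); // 1
    }
}
```

In this sketch a request parked on queue 2 stays parked after a notification for queue ID -1, and is woken only when every queue of the topic is notified.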
   




[GitHub] [rocketmq] codecov-commenter commented on pull request #6682: [ISSUE #6681] fix: fix pop retry message notification

Posted by "codecov-commenter (via GitHub)" <gi...@apache.org>.
codecov-commenter commented on PR #6682:
URL: https://github.com/apache/rocketmq/pull/6682#issuecomment-1579114680

   ## [Codecov](https://app.codecov.io/gh/apache/rocketmq/pull/6682?src=pr&el=h1&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) Report
   > Merging [#6682](https://app.codecov.io/gh/apache/rocketmq/pull/6682?src=pr&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) (b823d35) into [develop](https://app.codecov.io/gh/apache/rocketmq/commit/bee5077bcb77411f103aafb2220184f59db2c95e?el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) (bee5077) will **decrease** coverage by `0.03%`.
   > The diff coverage is `7.40%`.
   
   ```diff
   @@              Coverage Diff              @@
   ##             develop    #6682      +/-   ##
   =============================================
   - Coverage      42.65%   42.62%   -0.03%     
   + Complexity      9093     9086       -7     
   =============================================
     Files           1121     1121              
     Lines          79775    79795      +20     
     Branches       10409    10415       +6     
   =============================================
   - Hits           34027    34015      -12     
   - Misses         41462    41500      +38     
   + Partials        4286     4280       -6     
   ```
   
   
   | [Impacted Files](https://app.codecov.io/gh/apache/rocketmq/pull/6682?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) | Coverage Δ | |
   |---|---|---|
   | [...rocketmq/broker/processor/PopMessageProcessor.java](https://app.codecov.io/gh/apache/rocketmq/pull/6682?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache#diff-YnJva2VyL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9yb2NrZXRtcS9icm9rZXIvcHJvY2Vzc29yL1BvcE1lc3NhZ2VQcm9jZXNzb3IuamF2YQ==) | `43.82% <0.00%> (-0.71%)` | :arrow_down: |
   | [...he/rocketmq/broker/processor/PopReviveService.java](https://app.codecov.io/gh/apache/rocketmq/pull/6682?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache#diff-YnJva2VyL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9yb2NrZXRtcS9icm9rZXIvcHJvY2Vzc29yL1BvcFJldml2ZVNlcnZpY2UuamF2YQ==) | `36.29% <0.00%> (-0.18%)` | :arrow_down: |
   | [...in/java/org/apache/rocketmq/common/KeyBuilder.java](https://app.codecov.io/gh/apache/rocketmq/pull/6682?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache#diff-Y29tbW9uL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9yb2NrZXRtcS9jb21tb24vS2V5QnVpbGRlci5qYXZh) | `0.00% <0.00%> (ø)` | |
   | [...etmq/broker/longpolling/PopLongPollingService.java](https://app.codecov.io/gh/apache/rocketmq/pull/6682?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache#diff-YnJva2VyL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9yb2NrZXRtcS9icm9rZXIvbG9uZ3BvbGxpbmcvUG9wTG9uZ1BvbGxpbmdTZXJ2aWNlLmphdmE=) | `17.15% <8.69%> (-2.53%)` | :arrow_down: |
   
   ... and [31 files with indirect coverage changes](https://app.codecov.io/gh/apache/rocketmq/pull/6682/indirect-changes?src=pr&el=tree-more&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache)
   
   




[GitHub] [rocketmq] HScarb commented on a diff in pull request #6682: [ISSUE #6681] fix: fix pop retry message notification

Posted by "HScarb (via GitHub)" <gi...@apache.org>.
HScarb commented on code in PR #6682:
URL: https://github.com/apache/rocketmq/pull/6682#discussion_r1186871361


##########
broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java:
##########
@@ -734,15 +767,13 @@ private int polling(final Channel channel, RemotingCommand remotingCommand,
         if (requestHeader.getPollTime() <= 0 || this.popLongPollingService.isStopped()) {
             return NOT_POLLING;
         }
-        ConcurrentHashMap<String, Byte> cids = topicCidMap.get(requestHeader.getTopic());
-        if (cids == null) {
-            cids = new ConcurrentHashMap<>();
-            ConcurrentHashMap<String, Byte> old = topicCidMap.putIfAbsent(requestHeader.getTopic(), cids);
-            if (old != null) {
-                cids = old;
-            }
-        }
-        cids.putIfAbsent(requestHeader.getConsumerGroup(), Byte.MIN_VALUE);
+        topicCidMap.computeIfAbsent(requestHeader.getTopic(), k -> new ConcurrentHashMap<>())
+            .putIfAbsent(requestHeader.getConsumerGroup(), Byte.MIN_VALUE);
+        final String popRetryTopic
+            = KeyBuilder.buildPopRetryTopic(requestHeader.getTopic(), requestHeader.getConsumerGroup());
+        topicCidMap.computeIfAbsent(popRetryTopic, k -> new ConcurrentHashMap<>())

Review Comment:
   But I didn't notice `notifyMessageArriving` is called in `PopReviveService#reviveRetry`. 
   It looks like it would be more appropriate to call `notifyRetryMessageArriving` in `PopReviveService#reviveRetry`.





[GitHub] [rocketmq] xdkxlk commented on pull request #6682: [ISSUE #6681] fix: fix pop retry message notification

Posted by "xdkxlk (via GitHub)" <gi...@apache.org>.
xdkxlk commented on PR #6682:
URL: https://github.com/apache/rocketmq/pull/6682#issuecomment-1534009981

   Pop retry message has already been notified in PopReviveService#reviveRetry
   ```
   brokerController.getPopMessageProcessor().notifyMessageArriving(
       KeyBuilder.parseNormalTopic(popCheckPoint.getTopic(), popCheckPoint.getCId()),
       popCheckPoint.getCId(),
       -1
   );
   brokerController.getNotificationProcessor().notifyMessageArriving(
       KeyBuilder.parseNormalTopic(popCheckPoint.getTopic(), popCheckPoint.getCId()), -1);
   ```




[GitHub] [rocketmq] xdkxlk commented on a diff in pull request #6682: [ISSUE #6681] fix: fix pop retry message notification

Posted by "xdkxlk (via GitHub)" <gi...@apache.org>.
xdkxlk commented on code in PR #6682:
URL: https://github.com/apache/rocketmq/pull/6682#discussion_r1186599900


##########
broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java:
##########
@@ -734,15 +767,13 @@ private int polling(final Channel channel, RemotingCommand remotingCommand,
         if (requestHeader.getPollTime() <= 0 || this.popLongPollingService.isStopped()) {
             return NOT_POLLING;
         }
-        ConcurrentHashMap<String, Byte> cids = topicCidMap.get(requestHeader.getTopic());
-        if (cids == null) {
-            cids = new ConcurrentHashMap<>();
-            ConcurrentHashMap<String, Byte> old = topicCidMap.putIfAbsent(requestHeader.getTopic(), cids);
-            if (old != null) {
-                cids = old;
-            }
-        }
-        cids.putIfAbsent(requestHeader.getConsumerGroup(), Byte.MIN_VALUE);
+        topicCidMap.computeIfAbsent(requestHeader.getTopic(), k -> new ConcurrentHashMap<>())
+            .putIfAbsent(requestHeader.getConsumerGroup(), Byte.MIN_VALUE);
+        final String popRetryTopic
+            = KeyBuilder.buildPopRetryTopic(requestHeader.getTopic(), requestHeader.getConsumerGroup());
+        topicCidMap.computeIfAbsent(popRetryTopic, k -> new ConcurrentHashMap<>())

Review Comment:
   Why do we need to put `popRetryTopic` into `topicCidMap`? In `notifyRetryMessageArriving`, the topic has already been changed to the normal topic.





[GitHub] [rocketmq] HScarb commented on pull request #6682: [ISSUE #6681] fix: fix pop retry message notification

Posted by "HScarb (via GitHub)" <gi...@apache.org>.
HScarb commented on PR #6682:
URL: https://github.com/apache/rocketmq/pull/6682#issuecomment-1579758182

   @xdkxlk PTAL




[GitHub] [rocketmq] xdkxlk commented on pull request #6682: [ISSUE #6681] fix: fix pop retry message notification

Posted by "xdkxlk (via GitHub)" <gi...@apache.org>.
xdkxlk commented on PR #6682:
URL: https://github.com/apache/rocketmq/pull/6682#issuecomment-1537000449

   > > Pop retry message has already been notified in PopReviveService#reviveRetry
   > > ```
   > > brokerController.getPopMessageProcessor().notifyMessageArriving(
   > >     KeyBuilder.parseNormalTopic(popCheckPoint.getTopic(), popCheckPoint.getCId()),
   > >     popCheckPoint.getCId(),
   > >     -1
   > > );
   > > brokerController.getNotificationProcessor().notifyMessageArriving(
   > >     KeyBuilder.parseNormalTopic(popCheckPoint.getTopic(), popCheckPoint.getCId()), -1);
   > > ```
   > 
   > In `PopReviveService#reviveRetry`, only consumers that subscribe to all queues (queue ID -1) are notified.
   > 
   > But in the case where `popShareQueueNum >= consumerNum - 1`, the consumer subscribes to only some of the queues.
   > 
   > https://github.com/apache/rocketmq/blob/6f6032e9eb812d42a67bea3cdb02cf4ef6e7f6c3/broker/src/main/java/org/apache/rocketmq/broker/processor/QueryAssignmentProcessor.java#L236-L270
   > 
   > 
   > In this case, pollingMap only contains polling requests of specific queues, and notifying -1 queueID will not wake up the polling requests.
   > ![image](https://user-images.githubusercontent.com/10664298/236371473-69353f1a-a1bc-4e2d-aefa-f56414346d6d.png)
   > The polling requests will not be woken up until they reach the expiration time.
   
   Got it




[GitHub] [rocketmq] HScarb commented on a diff in pull request #6682: [ISSUE #6681] fix: fix pop retry message notification

Posted by "HScarb (via GitHub)" <gi...@apache.org>.
HScarb commented on code in PR #6682:
URL: https://github.com/apache/rocketmq/pull/6682#discussion_r1186870849


##########
broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java:
##########
@@ -734,15 +767,13 @@ private int polling(final Channel channel, RemotingCommand remotingCommand,
         if (requestHeader.getPollTime() <= 0 || this.popLongPollingService.isStopped()) {
             return NOT_POLLING;
         }
-        ConcurrentHashMap<String, Byte> cids = topicCidMap.get(requestHeader.getTopic());
-        if (cids == null) {
-            cids = new ConcurrentHashMap<>();
-            ConcurrentHashMap<String, Byte> old = topicCidMap.putIfAbsent(requestHeader.getTopic(), cids);
-            if (old != null) {
-                cids = old;
-            }
-        }
-        cids.putIfAbsent(requestHeader.getConsumerGroup(), Byte.MIN_VALUE);
+        topicCidMap.computeIfAbsent(requestHeader.getTopic(), k -> new ConcurrentHashMap<>())
+            .putIfAbsent(requestHeader.getConsumerGroup(), Byte.MIN_VALUE);
+        final String popRetryTopic
+            = KeyBuilder.buildPopRetryTopic(requestHeader.getTopic(), requestHeader.getConsumerGroup());
+        topicCidMap.computeIfAbsent(popRetryTopic, k -> new ConcurrentHashMap<>())

Review Comment:
   Because `notifyMessageArriving(final String topic, final int queueId)` is called by `ReputMessageService`, the topic won't be changed to the normal topic.
   ![image](https://user-images.githubusercontent.com/10664298/236685934-e797e1ae-bbf9-4e6e-9d0a-c6d377d1f43a.png)
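
To make the point above concrete, here is a minimal sketch of why a registry keyed by topic name needs an entry for the retry topic when the notification arrives with the retry name unchanged. The naming scheme (`%RETRY%` + group + `_` + topic) is an assumption made for illustration, and `RetryTopicNameSketch` with all of its members is hypothetical rather than RocketMQ's `KeyBuilder` or `PopMessageProcessor`.

```java
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

public class RetryTopicNameSketch {
    // Assumed retry-topic prefix, used here for illustration only.
    static final String RETRY_PREFIX = "%RETRY%";

    // Hypothetical stand-in for topicCidMap: topic name -> consumer groups polling it.
    static final Map<String, Set<String>> topicCidMap = new ConcurrentHashMap<>();

    // Builds the retry-topic name for a (topic, group) pair under the assumed scheme.
    static String buildRetryTopic(String topic, String group) {
        return RETRY_PREFIX + group + "_" + topic;
    }

    // Maps a retry-topic name back to the normal topic; other names pass through unchanged.
    static String toNormalTopic(String topic, String group) {
        String head = RETRY_PREFIX + group + "_";
        return topic.startsWith(head) ? topic.substring(head.length()) : topic;
    }

    // Records that a group is polling the given topic name.
    static void register(String topic, String group) {
        topicCidMap.computeIfAbsent(topic, k -> ConcurrentHashMap.newKeySet()).add(group);
    }

    // True when at least one group is registered under exactly this topic name.
    static boolean hasSubscribers(String topic) {
        return topicCidMap.containsKey(topic);
    }

    public static void main(String[] args) {
        String retry = buildRetryTopic("TopicTest", "groupA");
        register("TopicTest", "groupA");
        // A notification carrying the retry name finds nothing under the normal name's entry.
        System.out.println(hasSubscribers(retry)); // false
        register(retry, "groupA");
        System.out.println(hasSubscribers(retry)); // true
        System.out.println(toNormalTopic(retry, "groupA")); // TopicTest
    }
}
```

Registering both names, as the hunk above does with `computeIfAbsent`, lets a lookup by the retry name succeed without first translating it back to the normal topic.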
   


