You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by "HScarb (via GitHub)" <gi...@apache.org> on 2023/05/02 08:46:33 UTC
[GitHub] [rocketmq] HScarb opened a new pull request, #6682: [ISSUE #6681] fix: fix pop retry message notification
HScarb opened a new pull request, #6682:
URL: https://github.com/apache/rocketmq/pull/6682
<!-- Please make sure the target branch is right. In most cases, the target branch should be `develop`. -->
### Which Issue(s) This PR Fixes
<!-- Please ensure that the related issue has already been created, and [link this pull request to that issue using keywords](<https://docs.github.com/en/issues/tracking-your-work-with-issues/linking-a-pull-request-to-an-issue#linking-a-pull-request-to-an-issue-using-a-keyword>) to ensure automatic closure. -->
Fixes #6681
### Brief Description
<!-- Write a brief description for your pull request to help the maintainer understand the reasons behind your changes. -->
After `ReputMessageService` dispatch a message from a pop retry topic, notify all consumers who subscribe to any queue of this topic.
### How Did You Test This Change?
<!-- In order to ensure the code quality of Apache RocketMQ, we expect every pull request to have undergone thorough testing. -->
1. Create a topic `TopicTest`.
2. Start a pop push consumer using the example code `PopConsumer.java` to subscribe `TopicTest`, set popBatchSize to 1, and when receiving a message, print it and return `RECONSUME_LATER`.
3. Publish a message to `TopicTest`.
4. Wait and see the consumer's output
The consumer consume retry message immediately
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: commits-unsubscribe@rocketmq.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [rocketmq] HScarb commented on pull request #6682: [ISSUE #6681] fix: fix pop retry message notification
Posted by "HScarb (via GitHub)" <gi...@apache.org>.
HScarb commented on PR #6682:
URL: https://github.com/apache/rocketmq/pull/6682#issuecomment-1535649327
> Pop retry message has already been notified in PopReviveService#reviveRetry
>
> ```
> brokerController.getPopMessageProcessor().notifyMessageArriving(
> KeyBuilder.parseNormalTopic(popCheckPoint.getTopic(), popCheckPoint.getCId()),
> popCheckPoint.getCId(),
> -1
> );
> brokerController.getNotificationProcessor().notifyMessageArriving(
> KeyBuilder.parseNormalTopic(popCheckPoint.getTopic(), popCheckPoint.getCId()), -1);
> ```
In `PopReviveService#reviveRetry`, it only notifies consumers who subscribe for all queues (queue ID is -1).
But in the case which `popShareQueueNum >= consumerNum - 1`, the consumer will only subscribe to some of the queues. https://github.com/apache/rocketmq/blob/6f6032e9eb812d42a67bea3cdb02cf4ef6e7f6c3/broker/src/main/java/org/apache/rocketmq/broker/processor/QueryAssignmentProcessor.java#L236-L270
In this case, pollingMap only contains polling requests of specific queues, and notifying -1 queueID will not wake up the polling requests.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: commits-unsubscribe@rocketmq.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [rocketmq] codecov-commenter commented on pull request #6682: [ISSUE #6681] fix: fix pop retry message notification
Posted by "codecov-commenter (via GitHub)" <gi...@apache.org>.
codecov-commenter commented on PR #6682:
URL: https://github.com/apache/rocketmq/pull/6682#issuecomment-1579114680
## [Codecov](https://app.codecov.io/gh/apache/rocketmq/pull/6682?src=pr&el=h1&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) Report
> Merging [#6682](https://app.codecov.io/gh/apache/rocketmq/pull/6682?src=pr&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) (b823d35) into [develop](https://app.codecov.io/gh/apache/rocketmq/commit/bee5077bcb77411f103aafb2220184f59db2c95e?el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) (bee5077) will **decrease** coverage by `0.03%`.
> The diff coverage is `7.40%`.
```diff
@@ Coverage Diff @@
## develop #6682 +/- ##
=============================================
- Coverage 42.65% 42.62% -0.03%
+ Complexity 9093 9086 -7
=============================================
Files 1121 1121
Lines 79775 79795 +20
Branches 10409 10415 +6
=============================================
- Hits 34027 34015 -12
- Misses 41462 41500 +38
+ Partials 4286 4280 -6
```
| [Impacted Files](https://app.codecov.io/gh/apache/rocketmq/pull/6682?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) | Coverage Δ | |
|---|---|---|
| [...rocketmq/broker/processor/PopMessageProcessor.java](https://app.codecov.io/gh/apache/rocketmq/pull/6682?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache#diff-YnJva2VyL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9yb2NrZXRtcS9icm9rZXIvcHJvY2Vzc29yL1BvcE1lc3NhZ2VQcm9jZXNzb3IuamF2YQ==) | `43.82% <0.00%> (-0.71%)` | :arrow_down: |
| [...he/rocketmq/broker/processor/PopReviveService.java](https://app.codecov.io/gh/apache/rocketmq/pull/6682?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache#diff-YnJva2VyL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9yb2NrZXRtcS9icm9rZXIvcHJvY2Vzc29yL1BvcFJldml2ZVNlcnZpY2UuamF2YQ==) | `36.29% <0.00%> (-0.18%)` | :arrow_down: |
| [...in/java/org/apache/rocketmq/common/KeyBuilder.java](https://app.codecov.io/gh/apache/rocketmq/pull/6682?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache#diff-Y29tbW9uL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9yb2NrZXRtcS9jb21tb24vS2V5QnVpbGRlci5qYXZh) | `0.00% <0.00%> (ø)` | |
| [...etmq/broker/longpolling/PopLongPollingService.java](https://app.codecov.io/gh/apache/rocketmq/pull/6682?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache#diff-YnJva2VyL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9yb2NrZXRtcS9icm9rZXIvbG9uZ3BvbGxpbmcvUG9wTG9uZ1BvbGxpbmdTZXJ2aWNlLmphdmE=) | `17.15% <8.69%> (-2.53%)` | :arrow_down: |
... and [31 files with indirect coverage changes](https://app.codecov.io/gh/apache/rocketmq/pull/6682/indirect-changes?src=pr&el=tree-more&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache)
:mega: We’re building smart automated test selection to slash your CI/CD build times. [Learn more](https://about.codecov.io/iterative-testing/?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache)
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: commits-unsubscribe@rocketmq.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [rocketmq] HScarb commented on a diff in pull request #6682: [ISSUE #6681] fix: fix pop retry message notification
Posted by "HScarb (via GitHub)" <gi...@apache.org>.
HScarb commented on code in PR #6682:
URL: https://github.com/apache/rocketmq/pull/6682#discussion_r1186871361
##########
broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java:
##########
@@ -734,15 +767,13 @@ private int polling(final Channel channel, RemotingCommand remotingCommand,
if (requestHeader.getPollTime() <= 0 || this.popLongPollingService.isStopped()) {
return NOT_POLLING;
}
- ConcurrentHashMap<String, Byte> cids = topicCidMap.get(requestHeader.getTopic());
- if (cids == null) {
- cids = new ConcurrentHashMap<>();
- ConcurrentHashMap<String, Byte> old = topicCidMap.putIfAbsent(requestHeader.getTopic(), cids);
- if (old != null) {
- cids = old;
- }
- }
- cids.putIfAbsent(requestHeader.getConsumerGroup(), Byte.MIN_VALUE);
+ topicCidMap.computeIfAbsent(requestHeader.getTopic(), k -> new ConcurrentHashMap<>())
+ .putIfAbsent(requestHeader.getConsumerGroup(), Byte.MIN_VALUE);
+ final String popRetryTopic
+ = KeyBuilder.buildPopRetryTopic(requestHeader.getTopic(), requestHeader.getConsumerGroup());
+ topicCidMap.computeIfAbsent(popRetryTopic, k -> new ConcurrentHashMap<>())
Review Comment:
But I didn't notice `notifyMessageArriving` is called in `PopReviveService#reviveRetry`.
It looks like it would be more appropriate to call `notifyRetryMessageArriving` in `PopReviveService#reviveRetry`.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: commits-unsubscribe@rocketmq.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [rocketmq] xdkxlk commented on pull request #6682: [ISSUE #6681] fix: fix pop retry message notification
Posted by "xdkxlk (via GitHub)" <gi...@apache.org>.
xdkxlk commented on PR #6682:
URL: https://github.com/apache/rocketmq/pull/6682#issuecomment-1534009981
Pop retry message has already been notified in PopReviveService#reviveRetry
```
brokerController.getPopMessageProcessor().notifyMessageArriving(
KeyBuilder.parseNormalTopic(popCheckPoint.getTopic(), popCheckPoint.getCId()),
popCheckPoint.getCId(),
-1
);
brokerController.getNotificationProcessor().notifyMessageArriving(
KeyBuilder.parseNormalTopic(popCheckPoint.getTopic(), popCheckPoint.getCId()), -1);
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: commits-unsubscribe@rocketmq.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [rocketmq] xdkxlk commented on a diff in pull request #6682: [ISSUE #6681] fix: fix pop retry message notification
Posted by "xdkxlk (via GitHub)" <gi...@apache.org>.
xdkxlk commented on code in PR #6682:
URL: https://github.com/apache/rocketmq/pull/6682#discussion_r1186599900
##########
broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java:
##########
@@ -734,15 +767,13 @@ private int polling(final Channel channel, RemotingCommand remotingCommand,
if (requestHeader.getPollTime() <= 0 || this.popLongPollingService.isStopped()) {
return NOT_POLLING;
}
- ConcurrentHashMap<String, Byte> cids = topicCidMap.get(requestHeader.getTopic());
- if (cids == null) {
- cids = new ConcurrentHashMap<>();
- ConcurrentHashMap<String, Byte> old = topicCidMap.putIfAbsent(requestHeader.getTopic(), cids);
- if (old != null) {
- cids = old;
- }
- }
- cids.putIfAbsent(requestHeader.getConsumerGroup(), Byte.MIN_VALUE);
+ topicCidMap.computeIfAbsent(requestHeader.getTopic(), k -> new ConcurrentHashMap<>())
+ .putIfAbsent(requestHeader.getConsumerGroup(), Byte.MIN_VALUE);
+ final String popRetryTopic
+ = KeyBuilder.buildPopRetryTopic(requestHeader.getTopic(), requestHeader.getConsumerGroup());
+ topicCidMap.computeIfAbsent(popRetryTopic, k -> new ConcurrentHashMap<>())
Review Comment:
Why we need put popRetryTopic into topicCidMap? In notifyRetryMessageArriving, the topic has been changed to normal topic.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: commits-unsubscribe@rocketmq.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [rocketmq] HScarb commented on pull request #6682: [ISSUE #6681] fix: fix pop retry message notification
Posted by "HScarb (via GitHub)" <gi...@apache.org>.
HScarb commented on PR #6682:
URL: https://github.com/apache/rocketmq/pull/6682#issuecomment-1579758182
@xdkxlk PTAL
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: commits-unsubscribe@rocketmq.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [rocketmq] xdkxlk commented on pull request #6682: [ISSUE #6681] fix: fix pop retry message notification
Posted by "xdkxlk (via GitHub)" <gi...@apache.org>.
xdkxlk commented on PR #6682:
URL: https://github.com/apache/rocketmq/pull/6682#issuecomment-1537000449
> > Pop retry message has already been notified in PopReviveService#reviveRetry
> > ```
> > brokerController.getPopMessageProcessor().notifyMessageArriving(
> > KeyBuilder.parseNormalTopic(popCheckPoint.getTopic(), popCheckPoint.getCId()),
> > popCheckPoint.getCId(),
> > -1
> > );
> > brokerController.getNotificationProcessor().notifyMessageArriving(
> > KeyBuilder.parseNormalTopic(popCheckPoint.getTopic(), popCheckPoint.getCId()), -1);
> > ```
>
> In `PopReviveService#reviveRetry`, it only notifies consumers who subscribe for all queues (queue ID is -1).
>
> But in the case which `popShareQueueNum >= consumerNum - 1`, the consumer will only subscribe to some of the queues.
>
> https://github.com/apache/rocketmq/blob/6f6032e9eb812d42a67bea3cdb02cf4ef6e7f6c3/broker/src/main/java/org/apache/rocketmq/broker/processor/QueryAssignmentProcessor.java#L236-L270
>
>
> In this case, pollingMap only contains polling requests of specific queues, and notifying -1 queueID will not wake up the polling requests.
> ![image](https://user-images.githubusercontent.com/10664298/236371473-69353f1a-a1bc-4e2d-aefa-f56414346d6d.png)
> The polling requests will finally be waked up until they reach the expiration time.
Got it
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: commits-unsubscribe@rocketmq.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [rocketmq] HScarb commented on a diff in pull request #6682: [ISSUE #6681] fix: fix pop retry message notification
Posted by "HScarb (via GitHub)" <gi...@apache.org>.
HScarb commented on code in PR #6682:
URL: https://github.com/apache/rocketmq/pull/6682#discussion_r1186870849
##########
broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java:
##########
@@ -734,15 +767,13 @@ private int polling(final Channel channel, RemotingCommand remotingCommand,
if (requestHeader.getPollTime() <= 0 || this.popLongPollingService.isStopped()) {
return NOT_POLLING;
}
- ConcurrentHashMap<String, Byte> cids = topicCidMap.get(requestHeader.getTopic());
- if (cids == null) {
- cids = new ConcurrentHashMap<>();
- ConcurrentHashMap<String, Byte> old = topicCidMap.putIfAbsent(requestHeader.getTopic(), cids);
- if (old != null) {
- cids = old;
- }
- }
- cids.putIfAbsent(requestHeader.getConsumerGroup(), Byte.MIN_VALUE);
+ topicCidMap.computeIfAbsent(requestHeader.getTopic(), k -> new ConcurrentHashMap<>())
+ .putIfAbsent(requestHeader.getConsumerGroup(), Byte.MIN_VALUE);
+ final String popRetryTopic
+ = KeyBuilder.buildPopRetryTopic(requestHeader.getTopic(), requestHeader.getConsumerGroup());
+ topicCidMap.computeIfAbsent(popRetryTopic, k -> new ConcurrentHashMap<>())
Review Comment:
Because `notifyMessageArriving(final String toic, final int queueId)` is called by ReputMessageService, the topic won't be changed to normal topic.
![image](https://user-images.githubusercontent.com/10664298/236685934-e797e1ae-bbf9-4e6e-9d0a-c6d377d1f43a.png)
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: commits-unsubscribe@rocketmq.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org