Posted to commits@rocketmq.apache.org by GitBox <gi...@apache.org> on 2022/12/21 15:18:50 UTC
[GitHub] [rocketmq] ShadowySpirits opened a new pull request, #5755: [ISSUE #5754] [RIP-57] Add asynchronous interfaces to MessageStore
ShadowySpirits opened a new pull request, #5755:
URL: https://github.com/apache/rocketmq/pull/5755
link to #5754
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: commits-unsubscribe@rocketmq.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [rocketmq] lizhimins commented on a diff in pull request #5755: [ISSUE #5754] [RIP-57] Add asynchronous interfaces to MessageStore
Posted by GitBox <gi...@apache.org>.
lizhimins commented on code in PR #5755:
URL: https://github.com/apache/rocketmq/pull/5755#discussion_r1055124280
##########
broker/src/main/java/org/apache/rocketmq/broker/processor/DefaultPullMessageResultHandler.java:
##########
@@ -80,12 +84,31 @@ public RemotingCommand handle(final GetMessageResult getMessageResult,
final boolean brokerAllowSuspend,
final MessageFilter messageFilter,
RemotingCommand response) {
-
PullMessageProcessor processor = brokerController.getPullMessageProcessor();
- processor.updateBroadcastPulledOffset(requestHeader.getTopic(), requestHeader.getConsumerGroup(),
- requestHeader.getQueueId(), requestHeader, channel, response, getMessageResult.getNextBeginOffset());
+ final String clientAddress = RemotingHelper.parseChannelRemoteAddr(channel);
+ TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(requestHeader.getTopic());
+ processor.composeResponseHeader(requestHeader, getMessageResult, topicConfig.getTopicSysFlag(),
+ subscriptionGroupConfig, response, clientAddress);
+ try {
+ processor.executeConsumeMessageHookBefore(request, requestHeader, getMessageResult, brokerAllowSuspend, response.getCode());
+ } catch (AbortProcessException e) {
+ response.setCode(e.getResponseCode());
+ response.setRemark(e.getErrorMessage());
+ return response;
+ }
+ //rewrite the response for the
Review Comment:
The comment is incomplete.
##########
broker/src/main/java/org/apache/rocketmq/broker/failover/EscapeBridge.java:
##########
@@ -283,6 +282,28 @@ public MessageExt getMessage(String topic, long offset, int queueId, String brok
}
}
+ public CompletableFuture<MessageExt> getMessageAsync(String topic, long offset, int queueId, String brokerName, boolean deCompressBody) {
Review Comment:
The synchronous interface should also be implemented on top of the asynchronous one.
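A minimal sketch of that suggestion, assuming a hypothetical `MessageLookup` class with `String` message bodies (none of these names come from the PR): the asynchronous method holds the only copy of the lookup logic, and the synchronous method simply blocks on it.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for a message lookup; not the RocketMQ EscapeBridge API.
class MessageLookup {
    private final Map<Long, String> messagesByOffset = new ConcurrentHashMap<>();

    void put(long offset, String body) {
        messagesByOffset.put(offset, body);
    }

    // Asynchronous variant: the single place that holds the lookup logic.
    CompletableFuture<String> getMessageAsync(long offset) {
        return CompletableFuture.supplyAsync(() -> messagesByOffset.get(offset));
    }

    // Synchronous variant: delegates to the asynchronous one and blocks for the result.
    String getMessage(long offset) {
        return getMessageAsync(offset).join();
    }
}

class SyncOverAsyncDemo {
    public static void main(String[] args) {
        MessageLookup lookup = new MessageLookup();
        lookup.put(42L, "hello");
        System.out.println(lookup.getMessage(42L)); // stored body
        System.out.println(lookup.getMessage(7L));  // nothing stored at this offset
    }
}
```

Whether blocking with `join()` is acceptable depends on the calling thread; a real implementation would probably also want a timeout, which this sketch leaves out.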
##########
broker/src/main/java/org/apache/rocketmq/broker/out/BrokerOuterAPI.java:
##########
@@ -1269,6 +1269,44 @@ public PullResult pullMessageFromSpecificBroker(String brokerName, String broker
return pullResultExt;
}
+ public CompletableFuture<PullResult> pullMessageFromSpecificBrokerAsync(String brokerName, String brokerAddr,
+ String consumerGroup, String topic, int queueId, long offset,
+ int maxNums,
+ long timeoutMillis) throws RemotingException, InterruptedException {
Review Comment:
The code style here is strange.
[GitHub] [rocketmq] aaron-ai commented on a diff in pull request #5755: [ISSUE #5754] [RIP-57] Add asynchronous interfaces to MessageStore
Posted by GitBox <gi...@apache.org>.
aaron-ai commented on code in PR #5755:
URL: https://github.com/apache/rocketmq/pull/5755#discussion_r1055334069
##########
broker/src/main/java/org/apache/rocketmq/broker/processor/PopReviveService.java:
##########
@@ -122,7 +125,8 @@ private void reviveRetry(PopCheckPoint popCheckPoint, MessageExt messageExt) thr
}
if (putMessageResult.getAppendMessageResult() == null ||
putMessageResult.getAppendMessageResult().getStatus() != AppendMessageStatus.PUT_OK) {
- throw new Exception("reviveQueueId=" + queueId + ", revive error, msg is: " + msgInner);
+ POP_LOGGER.error("reviveQueueId=" + queueId + ", revive error, msg is: " + msgInner);
Review Comment:
Use `{}` to print the argument.
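To illustrate the point, here is a minimal sketch using a hypothetical `TinyLogger` that only mimics SLF4J-style `{}` substitution (it is not the logger used in the PR). With placeholders, the message string is built only when the level is enabled, whereas `+` concatenation always builds it before the call.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for an SLF4J-style logger; it only mimics "{}" substitution.
class TinyLogger {
    final List<String> lines = new ArrayList<>();
    boolean errorEnabled = true;

    // Replaces each "{}" with the next argument, only when the level is enabled.
    void error(String pattern, Object... args) {
        if (!errorEnabled) {
            return; // no string is built for a disabled level
        }
        StringBuilder sb = new StringBuilder();
        int from = 0;
        for (Object arg : args) {
            int at = pattern.indexOf("{}", from);
            if (at < 0) {
                break; // more arguments than placeholders: ignore the rest
            }
            sb.append(pattern, from, at).append(arg);
            from = at + 2;
        }
        sb.append(pattern.substring(from));
        lines.add(sb.toString());
    }
}

class PlaceholderLoggingDemo {
    public static void main(String[] args) {
        TinyLogger log = new TinyLogger();
        int queueId = 3;            // illustrative values only
        String msgInner = "msg-1";
        log.error("reviveQueueId={}, revive error, msg is: {}", queueId, msgInner);
        System.out.println(log.lines.get(0));
    }
}
```

With a real SLF4J-style logger the call site would look the same as the `log.error(...)` line above; only the logger type differs.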
[GitHub] [rocketmq] aaron-ai commented on a diff in pull request #5755: [ISSUE #5754] [RIP-57] Add asynchronous interfaces to MessageStore
Posted by GitBox <gi...@apache.org>.
aaron-ai commented on code in PR #5755:
URL: https://github.com/apache/rocketmq/pull/5755#discussion_r1055325703
##########
broker/src/main/java/org/apache/rocketmq/broker/failover/EscapeBridge.java:
##########
@@ -283,6 +282,28 @@ public MessageExt getMessage(String topic, long offset, int queueId, String brok
}
}
+ public CompletableFuture<MessageExt> getMessageAsync(String topic, long offset, int queueId, String brokerName, boolean deCompressBody) {
+ MessageStore messageStore = brokerController.getMessageStoreByBrokerName(brokerName);
+ if (messageStore != null) {
+ return messageStore.getMessageAsync(innerConsumerGroupName, topic, queueId, offset, 1, null)
+ .thenApply(result -> {
+ if (result == null) {
+ LOG.warn("getMessageResult is null , innerConsumerGroupName {}, topic {}, offset {}, queueId {}", innerConsumerGroupName, topic, offset, queueId);
+ return null;
+ }
+ List<MessageExt> list = decodeMsgList(result, deCompressBody);
+ if (list == null || list.isEmpty()) {
+ LOG.warn("Can not get msg , topic {}, offset {}, queueId {}, result is {}", topic, offset, queueId, result);
+ return null;
+ } else {
+ return list.get(0);
Review Comment:
Redundant `else`; maybe we could follow the [Return Early Pattern](https://medium.com/swlh/return-early-pattern-3d18a41bba8).
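A minimal sketch of the pattern, using a hypothetical `firstOrNull` helper rather than the actual `EscapeBridge` code: once the guard clause has returned, the remaining path needs no `else`.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

class ReturnEarlyDemo {
    // Returns the first element, or null when there is nothing to return.
    static <T> T firstOrNull(List<T> list) {
        if (list == null || list.isEmpty()) {
            return null; // guard clause exits early
        }
        // No else needed: reaching this line already implies a non-empty list.
        return list.get(0);
    }

    public static void main(String[] args) {
        System.out.println(firstOrNull(Arrays.asList("a", "b")));
        System.out.println(firstOrNull(Collections.<String>emptyList()));
    }
}
```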
[GitHub] [rocketmq] xdkxlk commented on a diff in pull request #5755: [ISSUE #5754] [RIP-57] Add asynchronous interfaces to MessageStore
Posted by GitBox <gi...@apache.org>.
xdkxlk commented on code in PR #5755:
URL: https://github.com/apache/rocketmq/pull/5755#discussion_r1055140306
##########
broker/src/main/java/org/apache/rocketmq/broker/processor/PopReviveService.java:
##########
@@ -451,30 +465,79 @@ protected void mergeAndRevive(ConsumeReviveObj consumeReviveObj) throws Throwabl
}
this.brokerController.getConsumerOffsetManager().commitOffset(PopAckConstants.LOCAL_HOST, PopAckConstants.REVIVE_GROUP, reviveTopic, queueId, newOffset);
}
+ reviveOffset = newOffset;
consumeReviveObj.newOffset = newOffset;
}
- private void reviveMsgFromCk(PopCheckPoint popCheckPoint) throws Throwable {
+ private void reviveMsgFromCk(PopCheckPoint popCheckPoint) {
+ if (!shouldRunPopRevive) {
+ POP_LOGGER.info("slave skip retry , revive topic={}, reviveQueueId={}", reviveTopic, queueId);
+ return;
+ }
+ inflightReviveRequestMap.put(popCheckPoint, new Pair<>(System.currentTimeMillis(), false));
+ List<CompletableFuture<Pair<Long, Boolean>>> futureList = new ArrayList<>(popCheckPoint.getNum());
for (int j = 0; j < popCheckPoint.getNum(); j++) {
if (DataConverter.getBit(popCheckPoint.getBitMap(), j)) {
continue;
}
// retry msg
long msgOffset = popCheckPoint.ackOffsetByIndex((byte) j);
- MessageExt messageExt = getBizMessage(popCheckPoint.getTopic(), msgOffset, popCheckPoint.getQueueId(), popCheckPoint.getBrokerName());
- if (messageExt == null) {
- POP_LOGGER.warn("reviveQueueId={},can not get biz msg topic is {}, offset is {} , then continue ",
- queueId, popCheckPoint.getTopic(), msgOffset);
- continue;
- }
- //skip ck from last epoch
- if (popCheckPoint.getPopTime() < messageExt.getStoreTimestamp()) {
- POP_LOGGER.warn("reviveQueueId={},skip ck from last epoch {}", queueId, popCheckPoint);
- continue;
- }
- reviveRetry(popCheckPoint, messageExt);
+ CompletableFuture<Pair<Long, Boolean>> future = getBizMessage(popCheckPoint.getTopic(), msgOffset, popCheckPoint.getQueueId(), popCheckPoint.getBrokerName())
+ .thenApply(messageExt -> {
+ if (messageExt == null) {
+ POP_LOGGER.warn("reviveQueueId={}, can not get biz msg topic is {}, offset is {}, then continue",
+ queueId, popCheckPoint.getTopic(), msgOffset);
+ return new Pair<>(msgOffset, true);
+ }
+ //skip ck from last epoch
+ if (popCheckPoint.getPopTime() < messageExt.getStoreTimestamp()) {
+ POP_LOGGER.warn("reviveQueueId={}, skip ck from last epoch {}", queueId, popCheckPoint);
+ return new Pair<>(msgOffset, false);
Review Comment:
In my opinion, this should return `new Pair<>(msgOffset, true)` so that a ck from the last epoch is skipped.
[GitHub] [rocketmq] aaron-ai commented on a diff in pull request #5755: [ISSUE #5754] [RIP-57] Add asynchronous interfaces to MessageStore
Posted by GitBox <gi...@apache.org>.
aaron-ai commented on code in PR #5755:
URL: https://github.com/apache/rocketmq/pull/5755#discussion_r1055335199
##########
store/src/main/java/org/apache/rocketmq/store/MessageStore.java:
##########
@@ -137,6 +140,9 @@ GetMessageResult getMessage(final String group, final String topic, final int qu
GetMessageResult getMessage(final String group, final String topic, final int queueId,
final long offset, final int maxMsgNums, final int maxTotalMsgSize, final MessageFilter messageFilter);
+ CompletableFuture<GetMessageResult> getMessageAsync(final String group, final String topic, final int queueId,
Review Comment:
Add comments for this interface method.
[GitHub] [rocketmq] aaron-ai merged pull request #5755: [ISSUE #5754] [RIP-57] Add asynchronous interfaces to MessageStore
Posted by GitBox <gi...@apache.org>.
aaron-ai merged PR #5755:
URL: https://github.com/apache/rocketmq/pull/5755
[GitHub] [rocketmq] codecov-commenter commented on pull request #5755: [ISSUE #5754] [RIP-57] Add asynchronous interfaces to MessageStore
Posted by GitBox <gi...@apache.org>.
codecov-commenter commented on PR #5755:
URL: https://github.com/apache/rocketmq/pull/5755#issuecomment-1363577441
# [Codecov](https://codecov.io/gh/apache/rocketmq/pull/5755?src=pr&el=h1&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) Report
> Merging [#5755](https://codecov.io/gh/apache/rocketmq/pull/5755?src=pr&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (979a4af) into [develop](https://codecov.io/gh/apache/rocketmq/commit/646d04f42524636ad09364af2a67fa8ab8b6b8fa?el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (646d04f) will **decrease** coverage by `0.00%`.
> The diff coverage is `39.24%`.
```diff
@@ Coverage Diff @@
## develop #5755 +/- ##
=============================================
- Coverage 42.45% 42.45% -0.01%
- Complexity 8215 8239 +24
=============================================
Files 1060 1060
Lines 73193 73316 +123
Branches 9598 9597 -1
=============================================
+ Hits 31071 31123 +52
- Misses 38221 38282 +61
- Partials 3901 3911 +10
```
| [Impacted Files](https://codecov.io/gh/apache/rocketmq/pull/5755?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | Coverage Δ | |
|---|---|---|
| [...org/apache/rocketmq/broker/out/BrokerOuterAPI.java](https://codecov.io/gh/apache/rocketmq/pull/5755/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-YnJva2VyL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9yb2NrZXRtcS9icm9rZXIvb3V0L0Jyb2tlck91dGVyQVBJLmphdmE=) | `16.55% <0.00%> (-0.82%)` | :arrow_down: |
| [...org/apache/rocketmq/store/DefaultMessageStore.java](https://codecov.io/gh/apache/rocketmq/pull/5755/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c3RvcmUvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL3JvY2tldG1xL3N0b3JlL0RlZmF1bHRNZXNzYWdlU3RvcmUuamF2YQ==) | `52.56% <0.00%> (-0.21%)` | :arrow_down: |
| [...n/java/org/apache/rocketmq/store/MessageStore.java](https://codecov.io/gh/apache/rocketmq/pull/5755/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c3RvcmUvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL3JvY2tldG1xL3N0b3JlL01lc3NhZ2VTdG9yZS5qYXZh) | `0.00% <ø> (ø)` | |
| [...ketmq/store/plugin/AbstractPluginMessageStore.java](https://codecov.io/gh/apache/rocketmq/pull/5755/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c3RvcmUvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL3JvY2tldG1xL3N0b3JlL3BsdWdpbi9BYnN0cmFjdFBsdWdpbk1lc3NhZ2VTdG9yZS5qYXZh) | `0.00% <0.00%> (ø)` | |
| [...a/org/apache/rocketmq/store/pop/PopCheckPoint.java](https://codecov.io/gh/apache/rocketmq/pull/5755/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c3RvcmUvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL3JvY2tldG1xL3N0b3JlL3BvcC9Qb3BDaGVja1BvaW50LmphdmE=) | `0.00% <0.00%> (ø)` | |
| [...he/rocketmq/broker/processor/PopReviveService.java](https://codecov.io/gh/apache/rocketmq/pull/5755/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-YnJva2VyL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9yb2NrZXRtcS9icm9rZXIvcHJvY2Vzc29yL1BvcFJldml2ZVNlcnZpY2UuamF2YQ==) | `40.05% <29.85%> (-0.26%)` | :arrow_down: |
| [...rocketmq/broker/processor/PopMessageProcessor.java](https://codecov.io/gh/apache/rocketmq/pull/5755/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-YnJva2VyL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9yb2NrZXRtcS9icm9rZXIvcHJvY2Vzc29yL1BvcE1lc3NhZ2VQcm9jZXNzb3IuamF2YQ==) | `36.60% <43.39%> (-0.20%)` | :arrow_down: |
| [...ocketmq/broker/processor/PullMessageProcessor.java](https://codecov.io/gh/apache/rocketmq/pull/5755/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-YnJva2VyL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9yb2NrZXRtcS9icm9rZXIvcHJvY2Vzc29yL1B1bGxNZXNzYWdlUHJvY2Vzc29yLmphdmE=) | `28.75% <48.71%> (+1.07%)` | :arrow_up: |
| [.../apache/rocketmq/broker/failover/EscapeBridge.java](https://codecov.io/gh/apache/rocketmq/pull/5755/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-YnJva2VyL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9yb2NrZXRtcS9icm9rZXIvZmFpbG92ZXIvRXNjYXBlQnJpZGdlLmphdmE=) | `41.66% <50.00%> (+8.13%)` | :arrow_up: |
| [...ker/processor/DefaultPullMessageResultHandler.java](https://codecov.io/gh/apache/rocketmq/pull/5755/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-YnJva2VyL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9yb2NrZXRtcS9icm9rZXIvcHJvY2Vzc29yL0RlZmF1bHRQdWxsTWVzc2FnZVJlc3VsdEhhbmRsZXIuamF2YQ==) | `38.88% <55.55%> (+1.67%)` | :arrow_up: |
| ... and [30 more](https://codecov.io/gh/apache/rocketmq/pull/5755/diff?src=pr&el=tree-more&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | |