You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by zh...@apache.org on 2022/12/01 08:10:12 UTC
[rocketmq] 01/03: Add popMessage AddressableMessageQueue interface
This is an automated email from the ASF dual-hosted git repository.
zhouxzhan pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
commit a4e9d19d2d81066d45e57d0115124bd52c7bc55b
Author: zhouxiang <zh...@alibaba-inc.com>
AuthorDate: Thu Dec 1 15:22:44 2022 +0800
Add popMessage AddressableMessageQueue interface
---
.../proxy/processor/ConsumerProcessor.java | 40 +++++++++++++++++-----
1 file changed, 31 insertions(+), 9 deletions(-)
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ConsumerProcessor.java b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ConsumerProcessor.java
index eb6f8ea2d..37c2e54d6 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ConsumerProcessor.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ConsumerProcessor.java
@@ -89,7 +89,29 @@ public class ConsumerProcessor extends AbstractProcessor {
if (messageQueue == null) {
throw new ProxyException(ProxyExceptionCode.FORBIDDEN, "no readable queue");
}
+ return popMessage(ctx, messageQueue, consumerGroup, topic, maxMsgNums, invisibleTime, pollTime, initMode, subscriptionData, fifo, popMessageResultFilter, timeoutMillis);
+ } catch (Throwable t) {
+ future.completeExceptionally(t);
+ }
+ return future;
+ }
+ public CompletableFuture<PopResult> popMessage(
+ ProxyContext ctx,
+ AddressableMessageQueue messageQueue,
+ String consumerGroup,
+ String topic,
+ int maxMsgNums,
+ long invisibleTime,
+ long pollTime,
+ int initMode,
+ SubscriptionData subscriptionData,
+ boolean fifo,
+ PopMessageResultFilter popMessageResultFilter,
+ long timeoutMillis
+ ) {
+ CompletableFuture<PopResult> future = new CompletableFuture<>();
+ try {
if (maxMsgNums > ProxyUtils.MAX_MSG_NUMS_FOR_POP_REQUEST) {
log.warn("change maxNums from {} to {} for pop request, with info: topic:{}, group:{}",
maxMsgNums, ProxyUtils.MAX_MSG_NUMS_FOR_POP_REQUEST, topic, consumerGroup);
@@ -109,10 +131,10 @@ public class ConsumerProcessor extends AbstractProcessor {
requestHeader.setOrder(fifo);
future = this.serviceManager.getMessageService().popMessage(
- ctx,
- messageQueue,
- requestHeader,
- timeoutMillis)
+ ctx,
+ messageQueue,
+ requestHeader,
+ timeoutMillis)
.thenApplyAsync(popResult -> {
if (PopStatus.FOUND.equals(popResult.getPopStatus()) &&
popResult.getMsgFoundList() != null &&
@@ -218,11 +240,11 @@ public class ConsumerProcessor extends AbstractProcessor {
long commitLogOffset = handle.getCommitLogOffset();
future = this.serviceManager.getMessageService().changeInvisibleTime(
- ctx,
- handle,
- messageId,
- changeInvisibleTimeRequestHeader,
- timeoutMillis)
+ ctx,
+ handle,
+ messageId,
+ changeInvisibleTimeRequestHeader,
+ timeoutMillis)
.thenApplyAsync(ackResult -> {
if (StringUtils.isNotBlank(ackResult.getExtraInfo())) {
AckResult result = new AckResult();