You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by zh...@apache.org on 2022/12/01 08:10:12 UTC

[rocketmq] 01/03: Add popMessage AddressableMessageQueue interface

This is an automated email from the ASF dual-hosted git repository.

zhouxzhan pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git

commit a4e9d19d2d81066d45e57d0115124bd52c7bc55b
Author: zhouxiang <zh...@alibaba-inc.com>
AuthorDate: Thu Dec 1 15:22:44 2022 +0800

    Add popMessage AddressableMessageQueue interface
---
 .../proxy/processor/ConsumerProcessor.java         | 40 +++++++++++++++++-----
 1 file changed, 31 insertions(+), 9 deletions(-)

diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ConsumerProcessor.java b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ConsumerProcessor.java
index eb6f8ea2d..37c2e54d6 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ConsumerProcessor.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ConsumerProcessor.java
@@ -89,7 +89,29 @@ public class ConsumerProcessor extends AbstractProcessor {
             if (messageQueue == null) {
                 throw new ProxyException(ProxyExceptionCode.FORBIDDEN, "no readable queue");
             }
+            return popMessage(ctx, messageQueue, consumerGroup, topic, maxMsgNums, invisibleTime, pollTime, initMode, subscriptionData, fifo, popMessageResultFilter, timeoutMillis);
+        }  catch (Throwable t) {
+            future.completeExceptionally(t);
+        }
+        return future;
+    }
 
+    public CompletableFuture<PopResult> popMessage(
+        ProxyContext ctx,
+        AddressableMessageQueue messageQueue,
+        String consumerGroup,
+        String topic,
+        int maxMsgNums,
+        long invisibleTime,
+        long pollTime,
+        int initMode,
+        SubscriptionData subscriptionData,
+        boolean fifo,
+        PopMessageResultFilter popMessageResultFilter,
+        long timeoutMillis
+    ) {
+        CompletableFuture<PopResult> future = new CompletableFuture<>();
+        try {
             if (maxMsgNums > ProxyUtils.MAX_MSG_NUMS_FOR_POP_REQUEST) {
                 log.warn("change maxNums from {} to {} for pop request, with info: topic:{}, group:{}",
                     maxMsgNums, ProxyUtils.MAX_MSG_NUMS_FOR_POP_REQUEST, topic, consumerGroup);
@@ -109,10 +131,10 @@ public class ConsumerProcessor extends AbstractProcessor {
             requestHeader.setOrder(fifo);
 
             future = this.serviceManager.getMessageService().popMessage(
-                ctx,
-                messageQueue,
-                requestHeader,
-                timeoutMillis)
+                    ctx,
+                    messageQueue,
+                    requestHeader,
+                    timeoutMillis)
                 .thenApplyAsync(popResult -> {
                     if (PopStatus.FOUND.equals(popResult.getPopStatus()) &&
                         popResult.getMsgFoundList() != null &&
@@ -218,11 +240,11 @@ public class ConsumerProcessor extends AbstractProcessor {
             long commitLogOffset = handle.getCommitLogOffset();
 
             future = this.serviceManager.getMessageService().changeInvisibleTime(
-                ctx,
-                handle,
-                messageId,
-                changeInvisibleTimeRequestHeader,
-                timeoutMillis)
+                    ctx,
+                    handle,
+                    messageId,
+                    changeInvisibleTimeRequestHeader,
+                    timeoutMillis)
                 .thenApplyAsync(ackResult -> {
                     if (StringUtils.isNotBlank(ackResult.getExtraInfo())) {
                         AckResult result = new AckResult();