You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by zh...@apache.org on 2022/12/01 08:10:14 UTC

[rocketmq] 03/03: Add notification in MQClientAPIExt

This is an automated email from the ASF dual-hosted git repository.

zhouxzhan pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git

commit 9b1b62515156cd649cc128c4bf629b9c8b181e8b
Author: zhouxiang <zh...@alibaba-inc.com>
AuthorDate: Thu Dec 1 15:24:27 2022 +0800

    Add notification in MQClientAPIExt
---
 .../proxy/service/mqclient/MQClientAPIExt.java     | 29 ++++++++++++++++++++++
 1 file changed, 29 insertions(+)

diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/service/mqclient/MQClientAPIExt.java b/proxy/src/main/java/org/apache/rocketmq/proxy/service/mqclient/MQClientAPIExt.java
index 6c29213f8..f0c04bcc9 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/service/mqclient/MQClientAPIExt.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/service/mqclient/MQClientAPIExt.java
@@ -65,6 +65,8 @@ import org.apache.rocketmq.remoting.protocol.header.GetMaxOffsetRequestHeader;
 import org.apache.rocketmq.remoting.protocol.header.GetMaxOffsetResponseHeader;
 import org.apache.rocketmq.remoting.protocol.header.GetMinOffsetRequestHeader;
 import org.apache.rocketmq.remoting.protocol.header.GetMinOffsetResponseHeader;
+import org.apache.rocketmq.remoting.protocol.header.NotificationRequestHeader;
+import org.apache.rocketmq.remoting.protocol.header.NotificationResponseHeader;
 import org.apache.rocketmq.remoting.protocol.header.PopMessageRequestHeader;
 import org.apache.rocketmq.remoting.protocol.header.PullMessageRequestHeader;
 import org.apache.rocketmq.remoting.protocol.header.QueryConsumerOffsetRequestHeader;
@@ -590,6 +592,33 @@ public class MQClientAPIExt extends MQClientAPIImpl {
         return future;
     }
 
+    public CompletableFuture<Boolean> notification(String brokerAddr, NotificationRequestHeader requestHeader,
+        long timeoutMillis) {
+        CompletableFuture<Boolean> future = new CompletableFuture<>();
+        RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.NOTIFICATION, requestHeader);
+        try {
+            this.getRemotingClient().invokeAsync(brokerAddr, request, timeoutMillis, responseFuture -> {
+                RemotingCommand response = responseFuture.getResponseCommand();
+                if (response != null) {
+                    if (response.getCode() == ResponseCode.SUCCESS) {
+                        try {
+                            NotificationResponseHeader responseHeader = (NotificationResponseHeader) response.decodeCommandCustomHeader(NotificationResponseHeader.class);
+                            future.complete(responseHeader.isHasMsg());
+                        } catch (Throwable t) {
+                            future.completeExceptionally(t);
+                        }
+                    }
+                    future.completeExceptionally(new MQBrokerException(response.getCode(), response.getRemark()));
+                } else {
+                    future.completeExceptionally(processNullResponseErr(responseFuture));
+                }
+            });
+        } catch (Throwable t) {
+            future.completeExceptionally(t);
+        }
+        return future;
+    }
+
     public CompletableFuture<RemotingCommand> invoke(String brokerAddr, RemotingCommand request, long timeoutMillis) {
         CompletableFuture<RemotingCommand> future = new CompletableFuture<>();
         try {