You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by zh...@apache.org on 2022/12/01 08:10:14 UTC
[rocketmq] 03/03: Add notification in MQClientAPIExt
This is an automated email from the ASF dual-hosted git repository.
zhouxzhan pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
commit 9b1b62515156cd649cc128c4bf629b9c8b181e8b
Author: zhouxiang <zh...@alibaba-inc.com>
AuthorDate: Thu Dec 1 15:24:27 2022 +0800
Add notification in MQClientAPIExt
---
.../proxy/service/mqclient/MQClientAPIExt.java | 29 ++++++++++++++++++++++
1 file changed, 29 insertions(+)
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/service/mqclient/MQClientAPIExt.java b/proxy/src/main/java/org/apache/rocketmq/proxy/service/mqclient/MQClientAPIExt.java
index 6c29213f8..f0c04bcc9 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/service/mqclient/MQClientAPIExt.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/service/mqclient/MQClientAPIExt.java
@@ -65,6 +65,8 @@ import org.apache.rocketmq.remoting.protocol.header.GetMaxOffsetRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.GetMaxOffsetResponseHeader;
import org.apache.rocketmq.remoting.protocol.header.GetMinOffsetRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.GetMinOffsetResponseHeader;
+import org.apache.rocketmq.remoting.protocol.header.NotificationRequestHeader;
+import org.apache.rocketmq.remoting.protocol.header.NotificationResponseHeader;
import org.apache.rocketmq.remoting.protocol.header.PopMessageRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.PullMessageRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.QueryConsumerOffsetRequestHeader;
@@ -590,6 +592,33 @@ public class MQClientAPIExt extends MQClientAPIImpl {
return future;
}
+ public CompletableFuture<Boolean> notification(String brokerAddr, NotificationRequestHeader requestHeader,
+ long timeoutMillis) {
+ CompletableFuture<Boolean> future = new CompletableFuture<>();
+ RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.NOTIFICATION, requestHeader);
+ try {
+ this.getRemotingClient().invokeAsync(brokerAddr, request, timeoutMillis, responseFuture -> {
+ RemotingCommand response = responseFuture.getResponseCommand();
+ if (response != null) {
+ if (response.getCode() == ResponseCode.SUCCESS) {
+ try {
+ NotificationResponseHeader responseHeader = (NotificationResponseHeader) response.decodeCommandCustomHeader(NotificationResponseHeader.class);
+ future.complete(responseHeader.isHasMsg());
+ } catch (Throwable t) {
+ future.completeExceptionally(t);
+ }
+ }
+ future.completeExceptionally(new MQBrokerException(response.getCode(), response.getRemark()));
+ } else {
+ future.completeExceptionally(processNullResponseErr(responseFuture));
+ }
+ });
+ } catch (Throwable t) {
+ future.completeExceptionally(t);
+ }
+ return future;
+ }
+
public CompletableFuture<RemotingCommand> invoke(String brokerAddr, RemotingCommand request, long timeoutMillis) {
CompletableFuture<RemotingCommand> future = new CompletableFuture<>();
try {