You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by aa...@apache.org on 2022/07/14 11:20:04 UTC

[rocketmq-clients] branch master updated: Add requestId into client exception (#53)

This is an automated email from the ASF dual-hosted git repository.

aaronai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git


The following commit(s) were added to refs/heads/master by this push:
     new a49efa2  Add requestId into client exception (#53)
a49efa2 is described below

commit a49efa2ad4fb8ed73b257563e32d21698d68d13f
Author: Aaron Ai <ya...@gmail.com>
AuthorDate: Thu Jul 14 19:19:59 2022 +0800

    Add requestId into client exception (#53)
---
 .../rocketmq/client/apis/ClientException.java      |  6 ++++
 .../client/java/exception/BadRequestException.java |  4 +--
 .../client/java/exception/ForbiddenException.java  |  4 +--
 .../java/exception/InternalErrorException.java     |  8 ++---
 .../client/java/exception/NotFoundException.java   |  8 ++---
 .../java/exception/PayloadTooLargeException.java   |  4 +--
 .../java/exception/ProxyTimeoutException.java      |  4 +--
 .../RequestHeaderFieldsTooLargeException.java      |  4 +--
 .../java/exception/TooManyRequestsException.java   |  4 +--
 .../java/exception/UnauthorizedException.java      |  4 +--
 .../java/exception/UnsupportedException.java       |  4 +--
 .../rocketmq/client/java/impl/ClientImpl.java      | 17 +++++----
 .../java/impl/consumer/FifoConsumeService.java     | 15 +-------
 .../java/impl/consumer/ProcessQueueImpl.java       | 30 +++++++++++-----
 .../java/impl/consumer/PushConsumerImpl.java       |  4 +--
 .../java/impl/consumer/ReceiveMessageResult.java   | 22 +++++-------
 .../java/impl/consumer/SimpleConsumerImpl.java     | 42 ++++++++++++----------
 .../client/java/impl/producer/ProducerImpl.java    |  6 ++--
 .../client/java/impl/producer/SendReceiptImpl.java | 33 +++++++++--------
 .../client/java/route/TopicRouteDataResult.java    | 29 +++++++++------
 .../apache/rocketmq/client/java/tool/TestBase.java |  3 +-
 21 files changed, 135 insertions(+), 120 deletions(-)

diff --git a/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/ClientException.java b/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/ClientException.java
index e622ce1..5e518de 100644
--- a/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/ClientException.java
+++ b/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/ClientException.java
@@ -55,6 +55,12 @@ public class ClientException extends Exception {
         putMetadata(RESPONSE_CODE_KEY, String.valueOf(responseCode));
     }
 
+    public ClientException(int responseCode, String requestId, String message) {
+        this(responseCode, message);
+        putMetadata(RESPONSE_CODE_KEY, String.valueOf(responseCode));
+        putMetadata(REQUEST_ID_KEY, requestId);
+    }
+
     @SuppressWarnings("SameParameterValue")
     protected void putMetadata(String key, String value) {
         context.put(key, value);
diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/exception/BadRequestException.java b/java/client/src/main/java/org/apache/rocketmq/client/java/exception/BadRequestException.java
index 386c916..3d32bad 100644
--- a/java/client/src/main/java/org/apache/rocketmq/client/java/exception/BadRequestException.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/exception/BadRequestException.java
@@ -23,7 +23,7 @@ import org.apache.rocketmq.client.apis.ClientException;
  * Generic exception for bad request, indicating that the required fields of headers are missing.
  */
 public class BadRequestException extends ClientException {
-    public BadRequestException(int responseCode, String message) {
-        super(responseCode, message);
+    public BadRequestException(int responseCode, String requestId, String message) {
+        super(responseCode, requestId, message);
     }
 }
diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/exception/ForbiddenException.java b/java/client/src/main/java/org/apache/rocketmq/client/java/exception/ForbiddenException.java
index 49eb280..6f115f3 100644
--- a/java/client/src/main/java/org/apache/rocketmq/client/java/exception/ForbiddenException.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/exception/ForbiddenException.java
@@ -23,7 +23,7 @@ import org.apache.rocketmq.client.apis.ClientException;
  * Generic exception for the case that user does not have permission to access/operation the resource.
  */
 public class ForbiddenException extends ClientException {
-    public ForbiddenException(int responseCode, String message) {
-        super(responseCode, message);
+    public ForbiddenException(int responseCode, String requestId, String message) {
+        super(responseCode, requestId, message);
     }
 }
diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/exception/InternalErrorException.java b/java/client/src/main/java/org/apache/rocketmq/client/java/exception/InternalErrorException.java
index f42d369..baba366 100644
--- a/java/client/src/main/java/org/apache/rocketmq/client/java/exception/InternalErrorException.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/exception/InternalErrorException.java
@@ -24,11 +24,11 @@ import org.apache.rocketmq.client.apis.ClientException;
  * fulfilling the request.
  */
 public class InternalErrorException extends ClientException {
-    public InternalErrorException(int responseCode, String message) {
-        super(responseCode, message);
-    }
-
     public InternalErrorException(Throwable cause) {
         super(cause);
     }
+
+    public InternalErrorException(int responseCode, String requestId, String message) {
+        super(responseCode, requestId, message);
+    }
 }
\ No newline at end of file
diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/exception/NotFoundException.java b/java/client/src/main/java/org/apache/rocketmq/client/java/exception/NotFoundException.java
index 6ca06ca..fef114b 100644
--- a/java/client/src/main/java/org/apache/rocketmq/client/java/exception/NotFoundException.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/exception/NotFoundException.java
@@ -23,10 +23,6 @@ import org.apache.rocketmq.client.apis.ClientException;
  * Generic exception for resource not found.
  */
 public class NotFoundException extends ClientException {
-    public NotFoundException(int responseCode, String message) {
-        super(responseCode, message);
-    }
-
     public NotFoundException(String message) {
         super(message);
     }
@@ -34,4 +30,8 @@ public class NotFoundException extends ClientException {
     public NotFoundException(Throwable t) {
         super(t);
     }
+
+    public NotFoundException(int responseCode, String requestId, String message) {
+        super(responseCode, requestId, message);
+    }
 }
diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/exception/PayloadTooLargeException.java b/java/client/src/main/java/org/apache/rocketmq/client/java/exception/PayloadTooLargeException.java
index 6dd8358..f514697 100644
--- a/java/client/src/main/java/org/apache/rocketmq/client/java/exception/PayloadTooLargeException.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/exception/PayloadTooLargeException.java
@@ -23,7 +23,7 @@ import org.apache.rocketmq.client.apis.ClientException;
  * Generic exception represents that the request entity is larger than the limits defined by the server.
  */
 public class PayloadTooLargeException extends ClientException {
-    public PayloadTooLargeException(int responseCode, String message) {
-        super(responseCode, message);
+    public PayloadTooLargeException(int responseCode, String requestId, String message) {
+        super(responseCode, requestId, message);
     }
 }
diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/exception/ProxyTimeoutException.java b/java/client/src/main/java/org/apache/rocketmq/client/java/exception/ProxyTimeoutException.java
index 2a8abbf..6212f3a 100644
--- a/java/client/src/main/java/org/apache/rocketmq/client/java/exception/ProxyTimeoutException.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/exception/ProxyTimeoutException.java
@@ -24,7 +24,7 @@ import org.apache.rocketmq.client.apis.ClientException;
  * in time from its upstream servers.
  */
 public class ProxyTimeoutException extends ClientException {
-    public ProxyTimeoutException(int responseCode, String message) {
-        super(responseCode, message);
+    public ProxyTimeoutException(int responseCode, String requestId, String message) {
+        super(responseCode, requestId, message);
     }
 }
diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/exception/RequestHeaderFieldsTooLargeException.java b/java/client/src/main/java/org/apache/rocketmq/client/java/exception/RequestHeaderFieldsTooLargeException.java
index e2813f1..4b396e6 100644
--- a/java/client/src/main/java/org/apache/rocketmq/client/java/exception/RequestHeaderFieldsTooLargeException.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/exception/RequestHeaderFieldsTooLargeException.java
@@ -24,7 +24,7 @@ import org.apache.rocketmq.client.apis.ClientException;
  * too large. The request may be resubmitted after reducing the size of the request header fields.
  */
 public class RequestHeaderFieldsTooLargeException extends ClientException {
-    public RequestHeaderFieldsTooLargeException(int responseCode, String message) {
-        super(responseCode, message);
+    public RequestHeaderFieldsTooLargeException(int responseCode, String requestId, String message) {
+        super(responseCode, requestId, message);
     }
 }
\ No newline at end of file
diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/exception/TooManyRequestsException.java b/java/client/src/main/java/org/apache/rocketmq/client/java/exception/TooManyRequestsException.java
index b4fd82f..8e00433 100644
--- a/java/client/src/main/java/org/apache/rocketmq/client/java/exception/TooManyRequestsException.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/exception/TooManyRequestsException.java
@@ -25,7 +25,7 @@ import org.apache.rocketmq.client.apis.ClientException;
  * <p>Requests are throttled.
  */
 public class TooManyRequestsException extends ClientException {
-    public TooManyRequestsException(int responseCode, String message) {
-        super(responseCode, message);
+    public TooManyRequestsException(int responseCode, String requestId, String message) {
+        super(responseCode, requestId, message);
     }
 }
diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/exception/UnauthorizedException.java b/java/client/src/main/java/org/apache/rocketmq/client/java/exception/UnauthorizedException.java
index 361170e..35707c9 100644
--- a/java/client/src/main/java/org/apache/rocketmq/client/java/exception/UnauthorizedException.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/exception/UnauthorizedException.java
@@ -24,7 +24,7 @@ import org.apache.rocketmq.client.apis.ClientException;
  * resource.
  */
 public class UnauthorizedException extends ClientException {
-    public UnauthorizedException(int responseCode, String message) {
-        super(responseCode, message);
+    public UnauthorizedException(int responseCode, String requestId, String message) {
+        super(responseCode, requestId, message);
     }
 }
diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/exception/UnsupportedException.java b/java/client/src/main/java/org/apache/rocketmq/client/java/exception/UnsupportedException.java
index 8fe578c..212c5eb 100644
--- a/java/client/src/main/java/org/apache/rocketmq/client/java/exception/UnsupportedException.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/exception/UnsupportedException.java
@@ -23,7 +23,7 @@ import org.apache.rocketmq.client.apis.ClientException;
  * Generic exception for unsupported exception.
  */
 public class UnsupportedException extends ClientException {
-    public UnsupportedException(int responseCode, String message) {
-        super(responseCode, message);
+    public UnsupportedException(int responseCode, String requestId, String message) {
+        super(responseCode, requestId, message);
     }
 }
diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientImpl.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientImpl.java
index 093c02d..044b6aa 100644
--- a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientImpl.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientImpl.java
@@ -82,7 +82,6 @@ import org.apache.rocketmq.client.java.misc.ExecutorServices;
 import org.apache.rocketmq.client.java.misc.ThreadFactoryImpl;
 import org.apache.rocketmq.client.java.misc.Utilities;
 import org.apache.rocketmq.client.java.route.Endpoints;
-import org.apache.rocketmq.client.java.route.TopicRouteData;
 import org.apache.rocketmq.client.java.route.TopicRouteDataResult;
 import org.apache.rocketmq.client.java.rpc.InvocationContext;
 import org.apache.rocketmq.client.java.rpc.Signature;
@@ -99,7 +98,7 @@ public abstract class ClientImpl extends AbstractIdleService implements Client,
 
     protected volatile ClientManager clientManager;
     protected final ClientConfiguration clientConfiguration;
-    protected final Endpoints accessEndpoints;
+    protected final Endpoints endpoints;
     protected final Set<String> topics;
     // Thread-safe set.
     protected final Set<Endpoints> isolated;
@@ -128,7 +127,7 @@ public abstract class ClientImpl extends AbstractIdleService implements Client,
 
     public ClientImpl(ClientConfiguration clientConfiguration, Set<String> topics) {
         this.clientConfiguration = checkNotNull(clientConfiguration, "clientConfiguration should not be null");
-        this.accessEndpoints = new Endpoints(clientConfiguration.getEndpoints());
+        this.endpoints = new Endpoints(clientConfiguration.getEndpoints());
         this.topics = topics;
         // Generate client id firstly.
         this.clientId = Utilities.genClientId();
@@ -652,20 +651,20 @@ public abstract class ClientImpl extends AbstractIdleService implements Client,
         try {
             Resource topicResource = Resource.newBuilder().setName(topic).build();
             final QueryRouteRequest request = QueryRouteRequest.newBuilder().setTopic(topicResource)
-                .setEndpoints(accessEndpoints.toProtobuf()).build();
+                .setEndpoints(endpoints.toProtobuf()).build();
             final Metadata metadata = sign();
             final ListenableFuture<InvocationContext<QueryRouteResponse>> contextFuture =
-                clientManager.queryRoute(accessEndpoints, metadata, request, clientConfiguration.getRequestTimeout());
-            return Futures.transform(contextFuture, context -> {
-                final QueryRouteResponse response = context.getResp();
+                clientManager.queryRoute(endpoints, metadata, request, clientConfiguration.getRequestTimeout());
+            return Futures.transform(contextFuture, ctx -> {
+                final QueryRouteResponse response = ctx.getResp();
                 final Status status = response.getStatus();
                 final Code code = status.getCode();
                 if (Code.OK != code) {
                     LOGGER.error("Exception raised while fetch topic route from remote, topic={}, " +
                             "clientId={}, endpoints={}, code={}, status message=[{}]", topic, clientId,
-                        accessEndpoints, code, status.getMessage());
+                        endpoints, code, status.getMessage());
                 }
-                return new TopicRouteDataResult(new TopicRouteData(response.getMessageQueuesList()), status);
+                return new TopicRouteDataResult(ctx);
             }, MoreExecutors.directExecutor());
         } catch (Throwable t) {
             return Futures.immediateFailedFuture(t);
diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/FifoConsumeService.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/FifoConsumeService.java
index c8c783e..6dd8c9a 100644
--- a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/FifoConsumeService.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/FifoConsumeService.java
@@ -17,7 +17,6 @@
 
 package org.apache.rocketmq.client.java.impl.consumer;
 
-import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.MoreExecutors;
@@ -36,7 +35,6 @@ import org.apache.rocketmq.client.java.route.MessageQueueImpl;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-@SuppressWarnings("NullableProblems")
 class FifoConsumeService extends ConsumeService {
     private static final Logger LOGGER = LoggerFactory.getLogger(FifoConsumeService.class);
 
@@ -69,18 +67,7 @@ class FifoConsumeService extends ConsumeService {
         final ListenableFuture<ConsumeResult> future0 = consume(next);
         final ListenableFuture<Void> future = Futures.transformAsync(future0, result -> pq.eraseFifoMessage(next,
             result), MoreExecutors.directExecutor());
-        Futures.addCallback(future, new FutureCallback<Void>() {
-            @Override
-            public void onSuccess(Void ignore) {
-                consumeIteratively(pq, iterator);
-            }
-
-            @Override
-            public void onFailure(Throwable t) {
-                // Should never reach here.
-                LOGGER.error("[Bug] Exception raised in fifo message erasing task, clientId={}", clientId, t);
-            }
-        }, MoreExecutors.directExecutor());
+        future.addListener(() -> consumeIteratively(pq, iterator), MoreExecutors.directExecutor());
     }
 
     @Override
diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ProcessQueueImpl.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ProcessQueueImpl.java
index bdc6a6d..dd4cd17 100644
--- a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ProcessQueueImpl.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ProcessQueueImpl.java
@@ -44,6 +44,7 @@ import java.util.stream.Collectors;
 import org.apache.rocketmq.client.apis.consumer.ConsumeResult;
 import org.apache.rocketmq.client.apis.consumer.FilterExpression;
 import org.apache.rocketmq.client.apis.message.MessageId;
+import org.apache.rocketmq.client.java.exception.BadRequestException;
 import org.apache.rocketmq.client.java.hook.MessageHookPoints;
 import org.apache.rocketmq.client.java.hook.MessageHookPointsStatus;
 import org.apache.rocketmq.client.java.message.MessageCommon;
@@ -472,6 +473,7 @@ class ProcessQueueImpl implements ProcessQueue {
         return future;
     }
 
+
     private ListenableFuture<Void> forwardToDeadLetterQueue(final MessageViewImpl messageView) {
         final SettableFuture<Void> future = SettableFuture.create();
         forwardToDeadLetterQueue(messageView, 1, future);
@@ -490,25 +492,27 @@ class ProcessQueueImpl implements ProcessQueue {
             @Override
             public void onSuccess(InvocationContext<ForwardMessageToDeadLetterQueueResponse> context) {
                 final ForwardMessageToDeadLetterQueueResponse resp = context.getResp();
+                final String requestId = context.getRpcContext().getRequestId();
                 final Status status = resp.getStatus();
                 final Code code = status.getCode();
                 // Log failure and retry later.
                 if (!Code.OK.equals(code)) {
                     LOGGER.error("Failed to forward message to dead letter queue, would attempt to re-forward later," +
-                            " clientId={}, consumerGroup={} messageId={}, attempt={}, mq={}, endpoints={}, code={}, "
-                            + "status message={}", clientId, consumerGroup, messageId, attempt, mq, endpoints, code,
-                        status.getMessage());
+                            " clientId={}, consumerGroup={} messageId={}, attempt={}, mq={}, endpoints={}, "
+                            + "requestId={}, code={}, status message={}", clientId, consumerGroup, messageId, attempt,
+                        mq, endpoints, requestId, code, status.getMessage());
                     forwardToDeadLetterQueue(messageView, 1 + attempt, future0);
                     return;
                 }
                 // Log retries.
                 if (1 < attempt) {
                     LOGGER.info("Re-forward message to dead letter queue successfully, clientId={}, consumerGroup={}, "
-                            + "attempt={}, messageId={}, mq={}, endpoints={}", clientId, consumerGroup, attempt,
-                        messageId, mq, endpoints);
+                            + "attempt={}, messageId={}, mq={}, endpoints={}, requestId={}", clientId, consumerGroup,
+                        attempt, messageId, mq, endpoints, requestId);
                 } else {
                     LOGGER.debug("Forward message to dead letter queue successfully, clientId={}, consumerGroup={}, "
-                        + "messageId={}, mq={}, endpoints={}", clientId, consumerGroup, messageId, mq, endpoints);
+                            + "messageId={}, mq={}, endpoints={}, requestId={}", clientId, consumerGroup, messageId, mq,
+                        endpoints, requestId);
                 }
                 // Set result if message is forwarded successfully.
                 future0.setFuture(Futures.immediateVoidFuture());
@@ -570,6 +574,14 @@ class ProcessQueueImpl implements ProcessQueue {
                 final String requestId = context.getRpcContext().getRequestId();
                 final Status status = resp.getStatus();
                 final Code code = status.getCode();
+                if (Code.INVALID_RECEIPT_HANDLE.equals(code)) {
+                    LOGGER.error("Failed to ack fifo message due to the invalid receipt handle, forgive to retry, "
+                            + "clientId={}, consumerGroup={} messageId={}, attempt={}, mq={}, endpoints={}, "
+                            + "requestId={}, status message=[{}]", clientId, consumerGroup, messageId, attempt, mq,
+                        endpoints, context.getRpcContext().getRequestId(), status.getMessage());
+                    future0.setException(new BadRequestException(code.getNumber(), requestId, status.getMessage()));
+                    return;
+                }
                 // Log failure and retry later.
                 if (!Code.OK.equals(code)) {
                     LOGGER.error("Failed to ack fifo message, would attempt to re-ack later, clientId={}, "
@@ -582,11 +594,11 @@ class ProcessQueueImpl implements ProcessQueue {
                 // Log retries.
                 if (1 < attempt) {
                     LOGGER.info("Re-ack fifo message successfully, clientId={}, consumerGroup={}, attempt={}, "
-                            + "messageId={}, mq={}, endpoints={}", clientId, consumerGroup, attempt,
-                        messageId, mq, endpoints);
+                            + "messageId={}, mq={}, endpoints={}, requestId={}", clientId, consumerGroup, attempt,
+                        messageId, mq, endpoints, requestId);
                 } else {
                     LOGGER.debug("Ack fifo message successfully, clientId={}, consumerGroup={}, messageId={}, mq={}, "
-                        + "endpoints={}", clientId, consumerGroup, messageId, mq, endpoints);
+                        + "endpoints={}, requestId={}", clientId, consumerGroup, messageId, mq, endpoints, requestId);
                 }
                 // Set result if FIFO message is acknowledged successfully.
                 future0.setFuture(Futures.immediateVoidFuture());
diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/PushConsumerImpl.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/PushConsumerImpl.java
index 7688b4b..f0cd53a 100644
--- a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/PushConsumerImpl.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/PushConsumerImpl.java
@@ -124,7 +124,7 @@ class PushConsumerImpl extends ConsumerImpl implements PushConsumer, MessageCach
         super(clientConfiguration, consumerGroup, subscriptionExpressions.keySet());
         this.clientConfiguration = clientConfiguration;
         Resource groupResource = new Resource(consumerGroup);
-        this.pushConsumerSettings = new PushConsumerSettings(clientId, accessEndpoints, groupResource,
+        this.pushConsumerSettings = new PushConsumerSettings(clientId, endpoints, groupResource,
             clientConfiguration.getRequestTimeout(), subscriptionExpressions);
         this.consumerGroup = consumerGroup;
         this.subscriptionExpressions = subscriptionExpressions;
@@ -260,7 +260,7 @@ class PushConsumerImpl extends ConsumerImpl implements PushConsumer, MessageCach
     private QueryAssignmentRequest wrapQueryAssignmentRequest(String topic) {
         apache.rocketmq.v2.Resource topicResource = apache.rocketmq.v2.Resource.newBuilder().setName(topic).build();
         return QueryAssignmentRequest.newBuilder().setTopic(topicResource)
-            .setEndpoints(accessEndpoints.toProtobuf()).setGroup(getProtobufGroup()).build();
+            .setEndpoints(endpoints.toProtobuf()).setGroup(getProtobufGroup()).build();
     }
 
     private ListenableFuture<Assignments> queryAssignment(final String topic) {
diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ReceiveMessageResult.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ReceiveMessageResult.java
index 58c2206..a4d5993 100644
--- a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ReceiveMessageResult.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ReceiveMessageResult.java
@@ -36,14 +36,12 @@ import org.apache.rocketmq.client.java.route.Endpoints;
 
 public class ReceiveMessageResult {
     private final Endpoints endpoints;
-    private final String requestId;
     private final ClientException exception;
 
     private final List<MessageViewImpl> messages;
 
     public ReceiveMessageResult(Endpoints endpoints, String requestId, Status status, List<MessageViewImpl> messages) {
         this.endpoints = endpoints;
-        this.requestId = requestId;
         final Code code = status.getCode();
         switch (code) {
             case OK:
@@ -56,31 +54,31 @@ public class ReceiveMessageResult {
             case ILLEGAL_FILTER_EXPRESSION:
             case ILLEGAL_INVISIBLE_TIME:
             case CLIENT_ID_REQUIRED:
-                this.exception = new BadRequestException(code.getNumber(), status.getMessage());
+                this.exception = new BadRequestException(code.getNumber(), requestId, status.getMessage());
                 break;
             case UNAUTHORIZED:
-                this.exception = new UnauthorizedException(code.getNumber(), status.getMessage());
+                this.exception = new UnauthorizedException(code.getNumber(), requestId, status.getMessage());
                 break;
             case FORBIDDEN:
-                this.exception = new ForbiddenException(code.getNumber(), status.getMessage());
+                this.exception = new ForbiddenException(code.getNumber(), requestId, status.getMessage());
                 break;
             case NOT_FOUND:
             case TOPIC_NOT_FOUND:
             case CONSUMER_GROUP_NOT_FOUND:
-                this.exception = new NotFoundException(code.getNumber(), status.getMessage());
+                this.exception = new NotFoundException(code.getNumber(), requestId, status.getMessage());
                 break;
             case TOO_MANY_REQUESTS:
-                this.exception = new TooManyRequestsException(code.getNumber(), status.getMessage());
+                this.exception = new TooManyRequestsException(code.getNumber(), requestId, status.getMessage());
                 break;
             case INTERNAL_ERROR:
             case INTERNAL_SERVER_ERROR:
-                this.exception = new InternalErrorException(code.getNumber(), status.getMessage());
+                this.exception = new InternalErrorException(code.getNumber(), requestId, status.getMessage());
                 break;
             case PROXY_TIMEOUT:
-                this.exception = new ProxyTimeoutException(code.getNumber(), status.getMessage());
+                this.exception = new ProxyTimeoutException(code.getNumber(), requestId, status.getMessage());
                 break;
             default:
-                this.exception = new UnsupportedException(code.getNumber(), status.getMessage());
+                this.exception = new UnsupportedException(code.getNumber(), requestId, status.getMessage());
         }
         this.messages = messages;
     }
@@ -112,8 +110,4 @@ public class ReceiveMessageResult {
     public List<MessageViewImpl> getMessages() {
         return messages;
     }
-
-    public String getRequestId() {
-        return requestId;
-    }
 }
diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/SimpleConsumerImpl.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/SimpleConsumerImpl.java
index f5029f1..f7a9998 100644
--- a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/SimpleConsumerImpl.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/SimpleConsumerImpl.java
@@ -80,7 +80,7 @@ class SimpleConsumerImpl extends ConsumerImpl implements SimpleConsumer {
         Map<String, FilterExpression> subscriptionExpressions) {
         super(clientConfiguration, consumerGroup, subscriptionExpressions.keySet());
         Resource groupResource = new Resource(consumerGroup);
-        this.simpleConsumerSettings = new SimpleConsumerSettings(clientId, accessEndpoints, groupResource,
+        this.simpleConsumerSettings = new SimpleConsumerSettings(clientId, endpoints, groupResource,
             clientConfiguration.getRequestTimeout(), awaitDuration, subscriptionExpressions);
         this.consumerGroup = consumerGroup;
         this.awaitDuration = awaitDuration;
@@ -242,8 +242,9 @@ class SimpleConsumerImpl extends ConsumerImpl implements SimpleConsumer {
         }
         MessageViewImpl impl = (MessageViewImpl) messageView;
         final ListenableFuture<InvocationContext<AckMessageResponse>> future = ackMessage(impl);
-        return Futures.transformAsync(future, context -> {
-            final AckMessageResponse resp = context.getResp();
+        return Futures.transformAsync(future, ctx -> {
+            final String requestId = ctx.getRpcContext().getRequestId();
+            final AckMessageResponse resp = ctx.getResp();
             final Status status = resp.getStatus();
             final Code code = status.getCode();
             switch (code) {
@@ -254,23 +255,23 @@ class SimpleConsumerImpl extends ConsumerImpl implements SimpleConsumer {
                 case ILLEGAL_CONSUMER_GROUP:
                 case INVALID_RECEIPT_HANDLE:
                 case CLIENT_ID_REQUIRED:
-                    throw new BadRequestException(code.getNumber(), status.getMessage());
+                    throw new BadRequestException(code.getNumber(), requestId, status.getMessage());
                 case UNAUTHORIZED:
-                    throw new UnauthorizedException(code.getNumber(), status.getMessage());
+                    throw new UnauthorizedException(code.getNumber(), requestId, status.getMessage());
                 case FORBIDDEN:
-                    throw new ForbiddenException(code.getNumber(), status.getMessage());
+                    throw new ForbiddenException(code.getNumber(), requestId, status.getMessage());
                 case NOT_FOUND:
                 case TOPIC_NOT_FOUND:
-                    throw new NotFoundException(code.getNumber(), status.getMessage());
+                    throw new NotFoundException(code.getNumber(), requestId, status.getMessage());
                 case TOO_MANY_REQUESTS:
-                    throw new TooManyRequestsException(code.getNumber(), status.getMessage());
+                    throw new TooManyRequestsException(code.getNumber(), requestId, status.getMessage());
                 case INTERNAL_ERROR:
                 case INTERNAL_SERVER_ERROR:
-                    throw new InternalErrorException(code.getNumber(), status.getMessage());
+                    throw new InternalErrorException(code.getNumber(), requestId, status.getMessage());
                 case PROXY_TIMEOUT:
-                    throw new ProxyTimeoutException(code.getNumber(), status.getMessage());
+                    throw new ProxyTimeoutException(code.getNumber(), requestId, status.getMessage());
                 default:
-                    throw new UnsupportedException(code.getNumber(), status.getMessage());
+                    throw new UnsupportedException(code.getNumber(), requestId, status.getMessage());
             }
         }, clientCallbackExecutor);
     }
@@ -304,12 +305,15 @@ class SimpleConsumerImpl extends ConsumerImpl implements SimpleConsumer {
         MessageViewImpl impl = (MessageViewImpl) messageView;
         final ListenableFuture<InvocationContext<ChangeInvisibleDurationResponse>> future =
             changeInvisibleDuration(impl, invisibleDuration);
-        return Futures.transformAsync(future, context -> {
-            final ChangeInvisibleDurationResponse resp = context.getResp();
+        return Futures.transformAsync(future, ctx -> {
+            final ChangeInvisibleDurationResponse resp = ctx.getResp();
+            final String requestId = ctx.getRpcContext().getRequestId();
             // Refresh receipt handle manually.
             impl.setReceiptHandle(resp.getReceiptHandle());
             final Status status = resp.getStatus();
             final Code code = status.getCode();
+            final int codeNumber = code.getNumber();
+            final String statusMessage = status.getMessage();
             switch (code) {
                 case OK:
                     return Futures.immediateVoidFuture();
@@ -319,20 +323,22 @@ class SimpleConsumerImpl extends ConsumerImpl implements SimpleConsumer {
                 case ILLEGAL_INVISIBLE_TIME:
                 case INVALID_RECEIPT_HANDLE:
                 case CLIENT_ID_REQUIRED:
-                    throw new BadRequestException(code.getNumber(), status.getMessage());
+                    throw new BadRequestException(codeNumber, requestId, statusMessage);
                 case UNAUTHORIZED:
-                    throw new UnauthorizedException(code.getNumber(), status.getMessage());
+                    throw new UnauthorizedException(codeNumber, requestId, statusMessage);
                 case NOT_FOUND:
                 case TOPIC_NOT_FOUND:
+                    // Throw here: falling through would misreport a missing resource as throttling.
+                    throw new NotFoundException(codeNumber, requestId, statusMessage);
                 case TOO_MANY_REQUESTS:
-                    throw new TooManyRequestsException(code.getNumber(), status.getMessage());
+                    throw new TooManyRequestsException(codeNumber, requestId, statusMessage);
                 case INTERNAL_ERROR:
                 case INTERNAL_SERVER_ERROR:
-                    throw new InternalErrorException(code.getNumber(), status.getMessage());
+                    throw new InternalErrorException(codeNumber, requestId, statusMessage);
                 case PROXY_TIMEOUT:
-                    throw new ProxyTimeoutException(code.getNumber(), status.getMessage());
+                    throw new ProxyTimeoutException(codeNumber, requestId, statusMessage);
                 default:
-                    throw new UnsupportedException(code.getNumber(), status.getMessage());
+                    throw new UnsupportedException(codeNumber, requestId, statusMessage);
             }
         }, MoreExecutors.directExecutor());
     }
diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/ProducerImpl.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/ProducerImpl.java
index af192af..0764ed4 100644
--- a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/ProducerImpl.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/ProducerImpl.java
@@ -100,7 +100,7 @@ class ProducerImpl extends ClientImpl implements Producer {
         TransactionChecker checker) {
         super(clientConfiguration, topics);
         ExponentialBackoffRetryPolicy retryPolicy = ExponentialBackoffRetryPolicy.immediatelyRetryPolicy(maxAttempts);
-        this.producerSettings = new ProducerSettings(clientId, accessEndpoints, retryPolicy,
+        this.producerSettings = new ProducerSettings(clientId, endpoints, retryPolicy,
             clientConfiguration.getRequestTimeout(), topics.stream().map(Resource::new).collect(Collectors.toSet()));
         this.checker = checker;
 
@@ -451,9 +451,9 @@ class ProducerImpl extends ClientImpl implements Producer {
             clientManager.sendMessage(endpoints, metadata, request, clientConfiguration.getRequestTimeout());
 
         final ListenableFuture<List<SendReceiptImpl>> attemptFuture = Futures.transformAsync(responseFuture,
-            response -> {
+            ctx -> {
                 final SettableFuture<List<SendReceiptImpl>> future0 = SettableFuture.create();
-                future0.set(SendReceiptImpl.processSendResponse(messageQueue, response.getResp()));
+                future0.set(SendReceiptImpl.processRespContext(messageQueue, ctx));
                 return future0;
             }, MoreExecutors.directExecutor());
 
diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/SendReceiptImpl.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/SendReceiptImpl.java
index 400b2bc..f6f27f6 100644
--- a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/SendReceiptImpl.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/SendReceiptImpl.java
@@ -40,6 +40,7 @@ import org.apache.rocketmq.client.java.exception.UnsupportedException;
 import org.apache.rocketmq.client.java.message.MessageIdCodec;
 import org.apache.rocketmq.client.java.route.Endpoints;
 import org.apache.rocketmq.client.java.route.MessageQueueImpl;
+import org.apache.rocketmq.client.java.rpc.InvocationContext;
 
 public class SendReceiptImpl implements SendReceipt {
     private final MessageId messageId;
@@ -76,14 +77,18 @@ public class SendReceiptImpl implements SendReceipt {
         return offset;
     }
 
-    public static List<SendReceiptImpl> processSendResponse(MessageQueueImpl mq,
-        SendMessageResponse response) throws ClientException {
-        final Status status = response.getStatus();
+    public static List<SendReceiptImpl> processRespContext(MessageQueueImpl mq,
+        InvocationContext<SendMessageResponse> ctx) throws ClientException {
+        final String requestId = ctx.getRpcContext().getRequestId();
+        final SendMessageResponse resp = ctx.getResp();
+        final Status status = resp.getStatus();
         List<SendReceiptImpl> sendReceipts = new ArrayList<>();
-        final List<SendResultEntry> entries = response.getEntriesList();
+        final List<SendResultEntry> entries = resp.getEntriesList();
         for (SendResultEntry entry : entries) {
             final Status entryStatus = entry.getStatus();
             final Code code = entryStatus.getCode();
+            final int codeNumber = code.getNumber();
+            final String statusMessage = entryStatus.getMessage(); // message belonging to this entry's code
             switch (code) {
                 case OK:
                     final MessageId messageId = MessageIdCodec.getInstance().decode(entry.getMessageId());
@@ -102,33 +107,33 @@ public class SendReceiptImpl implements SendReceipt {
                 case MESSAGE_PROPERTY_CONFLICT_WITH_TYPE:
                 case MESSAGE_CORRUPTED:
                 case CLIENT_ID_REQUIRED:
-                    throw new BadRequestException(code.getNumber(), status.getMessage());
+                    throw new BadRequestException(codeNumber, requestId, statusMessage);
                 case UNAUTHORIZED:
-                    throw new UnauthorizedException(code.getNumber(), status.getMessage());
+                    throw new UnauthorizedException(codeNumber, requestId, statusMessage);
                 case FORBIDDEN:
-                    throw new ForbiddenException(code.getNumber(), status.getMessage());
+                    throw new ForbiddenException(codeNumber, requestId, statusMessage);
                 case NOT_FOUND:
                 case TOPIC_NOT_FOUND:
-                    throw new NotFoundException(code.getNumber(), status.getMessage());
+                    throw new NotFoundException(codeNumber, requestId, statusMessage);
                 case PAYLOAD_TOO_LARGE:
                 case MESSAGE_BODY_TOO_LARGE:
-                    throw new PayloadTooLargeException(code.getNumber(), status.getMessage());
+                    throw new PayloadTooLargeException(codeNumber, requestId, statusMessage);
                 case TOO_MANY_REQUESTS:
-                    throw new TooManyRequestsException(code.getNumber(), status.getMessage());
+                    throw new TooManyRequestsException(codeNumber, requestId, statusMessage);
                 case REQUEST_HEADER_FIELDS_TOO_LARGE:
                 case MESSAGE_PROPERTIES_TOO_LARGE:
-                    throw new RequestHeaderFieldsTooLargeException(code.getNumber(), status.getMessage());
+                    throw new RequestHeaderFieldsTooLargeException(codeNumber, requestId, statusMessage);
                 case INTERNAL_ERROR:
                 case INTERNAL_SERVER_ERROR:
                 case HA_NOT_AVAILABLE:
-                    throw new InternalErrorException(code.getNumber(), status.getMessage());
+                    throw new InternalErrorException(codeNumber, requestId, statusMessage);
                 case PROXY_TIMEOUT:
                 case MASTER_PERSISTENCE_TIMEOUT:
                 case SLAVE_PERSISTENCE_TIMEOUT:
-                    throw new ProxyTimeoutException(code.getNumber(), status.getMessage());
+                    throw new ProxyTimeoutException(codeNumber, requestId, statusMessage);
                 case UNSUPPORTED:
                 default:
-                    throw new UnsupportedException(code.getNumber(), status.getMessage());
+                    throw new UnsupportedException(codeNumber, requestId, statusMessage);
             }
         }
         return sendReceipts;
diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/route/TopicRouteDataResult.java b/java/client/src/main/java/org/apache/rocketmq/client/java/route/TopicRouteDataResult.java
index 29ce773..8c9b75f 100644
--- a/java/client/src/main/java/org/apache/rocketmq/client/java/route/TopicRouteDataResult.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/route/TopicRouteDataResult.java
@@ -17,12 +17,13 @@
 
 package org.apache.rocketmq.client.java.route;
 
-import static com.google.common.base.Preconditions.checkNotNull;
-
 import apache.rocketmq.v2.Code;
+import apache.rocketmq.v2.MessageQueue;
+import apache.rocketmq.v2.QueryRouteResponse;
 import apache.rocketmq.v2.Status;
 import com.google.common.base.MoreObjects;
 import com.google.common.base.Objects;
+import java.util.List;
 import javax.annotation.concurrent.Immutable;
 import org.apache.rocketmq.client.apis.ClientException;
 import org.apache.rocketmq.client.java.exception.BadRequestException;
@@ -31,6 +32,7 @@ import org.apache.rocketmq.client.java.exception.NotFoundException;
 import org.apache.rocketmq.client.java.exception.ProxyTimeoutException;
 import org.apache.rocketmq.client.java.exception.TooManyRequestsException;
 import org.apache.rocketmq.client.java.exception.UnsupportedException;
+import org.apache.rocketmq.client.java.rpc.InvocationContext;
 
 /**
  * Result topic route data fetched from remote.
@@ -40,9 +42,16 @@ public class TopicRouteDataResult {
     private final TopicRouteData topicRouteData;
     private final ClientException exception;
 
-    public TopicRouteDataResult(TopicRouteData topicRouteData, Status status) {
-        this.topicRouteData = checkNotNull(topicRouteData, "topicRouteData should not be null");
+    public TopicRouteDataResult(InvocationContext<QueryRouteResponse> ctx) {
+        final QueryRouteResponse resp = ctx.getResp();
+        final String requestId = ctx.getRpcContext().getRequestId();
+        final List<MessageQueue> messageQueuesList = resp.getMessageQueuesList();
+        final TopicRouteData topicRouteData = new TopicRouteData(messageQueuesList);
+        final Status status = resp.getStatus();
+        this.topicRouteData = topicRouteData;
         final Code code = status.getCode();
+        final int codeNumber = code.getNumber();
+        final String statusMessage = status.getMessage();
         switch (code) {
             case OK:
                 this.exception = null;
@@ -51,24 +60,24 @@ public class TopicRouteDataResult {
             case ILLEGAL_ACCESS_POINT:
             case ILLEGAL_TOPIC:
             case CLIENT_ID_REQUIRED:
-                this.exception = new BadRequestException(code.getNumber(), status.getMessage());
+                this.exception = new BadRequestException(codeNumber, requestId, statusMessage);
                 break;
             case NOT_FOUND:
             case TOPIC_NOT_FOUND:
-                this.exception = new NotFoundException(code.getNumber(), status.getMessage());
+                this.exception = new NotFoundException(codeNumber, requestId, statusMessage);
                 break;
             case TOO_MANY_REQUESTS:
-                this.exception = new TooManyRequestsException(code.getNumber(), status.getMessage());
+                this.exception = new TooManyRequestsException(codeNumber, requestId, statusMessage);
                 break;
             case INTERNAL_ERROR:
             case INTERNAL_SERVER_ERROR:
-                this.exception = new InternalErrorException(code.getNumber(), status.getMessage());
+                this.exception = new InternalErrorException(codeNumber, requestId, statusMessage);
                 break;
             case PROXY_TIMEOUT:
-                this.exception = new ProxyTimeoutException(code.getNumber(), status.getMessage());
+                this.exception = new ProxyTimeoutException(codeNumber, requestId, statusMessage);
                 break;
             default:
-                this.exception = new UnsupportedException(code.getNumber(), status.getMessage());
+                this.exception = new UnsupportedException(codeNumber, requestId, statusMessage);
         }
     }
 
diff --git a/java/client/src/test/java/org/apache/rocketmq/client/java/tool/TestBase.java b/java/client/src/test/java/org/apache/rocketmq/client/java/tool/TestBase.java
index f3bcbb2..3aa6d80 100644
--- a/java/client/src/test/java/org/apache/rocketmq/client/java/tool/TestBase.java
+++ b/java/client/src/test/java/org/apache/rocketmq/client/java/tool/TestBase.java
@@ -361,8 +361,7 @@ public class TestBase {
         MessageQueueImpl mq) throws ExecutionException, InterruptedException, ClientException {
         final ListenableFuture<InvocationContext<SendMessageResponse>> future =
             okSendMessageResponseFutureWithSingleEntry();
-        final SendMessageResponse response = future.get().getResp();
-        final List<SendReceiptImpl> receipts = SendReceiptImpl.processSendResponse(mq, response);
+        final List<SendReceiptImpl> receipts = SendReceiptImpl.processRespContext(mq, future.get());
         return receipts.iterator().next();
     }
 }