You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by aa...@apache.org on 2022/07/06 12:08:32 UTC

[rocketmq-clients] branch java updated (fd7eb2c -> ceceb12)

This is an automated email from the ASF dual-hosted git repository.

aaronai pushed a change to branch java
in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git


    from fd7eb2c  Java: fix compile issue on JDK17
     add 15709fe  Docs: polish the java part (#23)
     add 339044b  Docs: fix typo
     add bd3f083  Make the CPP codebase compiles on popular platforms Mac/Windows/Linux (#24)
     add 8042fef  Java: bugfix about forgetting to start consumer service during client startup
     add 9b8e5b5  Java: fix opposite judgement for delay consumption (#25)
     add 7572bee  Golang: Initial commit (#19)
     add 4232830  Java: adapt for the latest protocol (#27)
     add 75279e1  Golang: add github action (#28)
     add a471703  Polish docs
     new ceceb12  Java: Add invocation context

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .asf.yaml                                          |    9 +-
 .github/workflows/cpp_build.yml                    |   11 +-
 .github/workflows/csharp_build.yml                 |    4 +-
 .github/workflows/golang_build.yml                 |   20 +
 .github/workflows/java_build.yml                   |    2 +-
 README.md                                          |    1 +
 cpp/.bazelrc                                       |    4 +-
 cpp/.bazelversion                                  |    2 +-
 cpp/api/rocketmq/ErrorCode.h                       |  107 +-
 cpp/api/rocketmq/Tracing.h                         |   10 +-
 cpp/bazel/rocketmq_deps.bzl                        |   55 +-
 cpp/proto/apache/rocketmq/v2/definition.proto      |  178 +-
 cpp/proto/apache/rocketmq/v2/service.proto         |    7 +
 cpp/src/main/cpp/base/ErrorCategory.cpp            |   29 +-
 cpp/src/main/cpp/base/tests/RetryPolicyTest.cpp    |   14 +-
 cpp/src/main/cpp/client/ClientManagerImpl.cpp      |  515 ++-
 .../main/cpp/client/ReceiveMessageStreamReader.cpp |   54 +-
 cpp/src/main/cpp/client/RpcClientImpl.cpp          |    4 +-
 cpp/src/main/cpp/client/TelemetryBidiReactor.cpp   |    4 +-
 cpp/src/main/cpp/client/include/ClientConfig.h     |    7 +-
 cpp/src/main/cpp/client/include/RpcClient.h        |    7 +-
 cpp/src/main/cpp/client/include/RpcClientImpl.h    |    7 +-
 cpp/src/main/cpp/rocketmq/ProcessQueueImpl.cpp     |    4 +-
 cpp/src/main/cpp/rocketmq/ProducerImpl.cpp         |   13 +-
 cpp/src/main/cpp/rocketmq/PushConsumerImpl.cpp     |    2 +-
 cpp/src/main/cpp/rocketmq/include/ClientImpl.h     |    6 +
 cpp/src/main/cpp/stats/PublishStats.cpp            |    2 +-
 docs/artwork/unified_apis_and_spec.png             |  Bin 119393 -> 627146 bytes
 golang/README.md                                   |   31 +
 golang/broker.go                                   |  364 +++
 golang/broker_options.go                           |  112 +
 .../cpp/base/ErrorCode.cpp => golang/config.go     |   16 +-
 golang/conn.go                                     |  201 ++
 golang/conn_options.go                             |  189 ++
 golang/consumer.go                                 |  153 +
 .../BrokerData.cpp => golang/consumer_options.go   |   62 +-
 .../credentials/credentials.go                     |   13 +-
 golang/example/consumer/main.go                    |   60 +
 golang/example/producer/main.go                    |   69 +
 golang/go.mod                                      |   28 +
 golang/go.sum                                      |  207 ++
 .../main/cpp/base/Tracing.cpp => golang/message.go |   22 +-
 .../AdminServer.h => golang/metadata/metadata.go   |   49 +-
 golang/ns.go                                       |  179 ++
 golang/ns_options.go                               |  105 +
 golang/pkg/grpc/middleware/zaplog/zaplog.go        |   56 +
 .../TransactionImpl.cpp => golang/pkg/net/net.go   |   56 +-
 .../LinkedElement.java => golang/pkg/os/host.go    |   18 +-
 .../State.h => golang/pkg/ticker/ticker.go         |   34 +-
 golang/pkg/zaplog/ctxzap/ctxzap.go                 |   64 +
 golang/pkg/zaplog/logger.go                        |   58 +
 golang/pkg/zaplog/options.go                       |   88 +
 .../MetadataConstants.h => golang/producer.go      |   73 +-
 golang/producer_options.go                         |   55 +
 golang/protocol/v1/admin.pb.go                     |  246 ++
 golang/protocol/v1/definition.pb.go                | 1407 ++++++++
 golang/protocol/v1/service.pb.go                   | 3342 ++++++++++++++++++++
 java/README.md                                     |   85 +-
 .../rocketmq/client/apis/ClientException.java      |   24 +-
 .../rocketmq/client/java/impl/ClientImpl.java      |   22 +-
 .../rocketmq/client/java/impl/ClientManager.java   |   48 +-
 .../client/java/impl/ClientManagerImpl.java        |   71 +-
 .../client/java/impl/consumer/ConsumeService.java  |    9 +-
 .../client/java/impl/consumer/ConsumerImpl.java    |   41 +-
 .../java/impl/consumer/FifoConsumeService.java     |   14 +
 .../java/impl/consumer/ProcessQueueImpl.java       |   46 +-
 .../java/impl/consumer/PushConsumerImpl.java       |   31 +-
 .../java/impl/consumer/PushConsumerSettings.java   |    4 +-
 .../java/impl/consumer/ReceiveMessageResult.java   |    8 +-
 .../java/impl/consumer/SimpleConsumerImpl.java     |   19 +-
 .../java/impl/consumer/SimpleConsumerSettings.java |    4 +-
 .../java/impl/consumer/StandardConsumeService.java |   14 +
 .../client/java/impl/producer/ProducerImpl.java    |   21 +-
 .../rocketmq/client/java/message/MessageImpl.java  |    2 +-
 .../client/java/message/MessageViewImpl.java       |   16 +-
 .../client/java/message/PublishingMessageImpl.java |    5 +
 .../rocketmq/client/java/metrics/MessageMeter.java |   13 +-
 .../rocketmq/client/java/misc/LinkedElement.java   |   11 +
 .../rocketmq/client/java/misc/LinkedIterator.java  |   11 +
 .../InvocationContext.java}                        |   36 +-
 .../apache/rocketmq/client/java/rpc/RpcClient.java |   51 +-
 .../rocketmq/client/java/rpc/RpcClientImpl.java    |  100 +-
 .../java/{impl/Client.java => rpc/RpcContext.java} |   35 +-
 .../apache/rocketmq/client/java/rpc/Signature.java |    2 +-
 .../apache/rocketmq/client/java/rpc/TLSHelper.java |    7 +-
 .../java/impl/consumer/ProcessQueueImplTest.java   |   15 +-
 .../java/impl/consumer/PushConsumerImplTest.java   |    6 +-
 .../java/impl/consumer/SimpleConsumerImplTest.java |   13 +-
 .../java/impl/producer/ProducerImplTest.java       |   21 +-
 .../apache/rocketmq/client/java/tool/TestBase.java |   97 +-
 java/{README.md => example.md}                     |   50 +-
 91 files changed, 8520 insertions(+), 867 deletions(-)
 create mode 100644 .github/workflows/golang_build.yml
 create mode 100644 golang/README.md
 create mode 100644 golang/broker.go
 create mode 100644 golang/broker_options.go
 copy cpp/src/main/cpp/base/ErrorCode.cpp => golang/config.go (65%)
 create mode 100644 golang/conn.go
 create mode 100644 golang/conn_options.go
 create mode 100644 golang/consumer.go
 copy cpp/src/main/cpp/remoting/BrokerData.cpp => golang/consumer_options.go (52%)
 copy cpp/src/main/cpp/base/ErrorCode.cpp => golang/credentials/credentials.go (74%)
 create mode 100644 golang/example/consumer/main.go
 create mode 100644 golang/example/producer/main.go
 create mode 100644 golang/go.mod
 create mode 100644 golang/go.sum
 rename cpp/src/main/cpp/base/Tracing.cpp => golang/message.go (74%)
 copy cpp/api/rocketmq/AdminServer.h => golang/metadata/metadata.go (61%)
 create mode 100644 golang/ns.go
 create mode 100644 golang/ns_options.go
 create mode 100644 golang/pkg/grpc/middleware/zaplog/zaplog.go
 copy cpp/src/main/cpp/rocketmq/TransactionImpl.cpp => golang/pkg/net/net.go (53%)
 copy java/client/src/main/java/org/apache/rocketmq/client/java/misc/LinkedElement.java => golang/pkg/os/host.go (83%)
 copy cpp/api/rocketmq/State.h => golang/pkg/ticker/ticker.go (70%)
 create mode 100644 golang/pkg/zaplog/ctxzap/ctxzap.go
 create mode 100644 golang/pkg/zaplog/logger.go
 create mode 100644 golang/pkg/zaplog/options.go
 copy cpp/src/main/cpp/base/include/MetadataConstants.h => golang/producer.go (52%)
 create mode 100644 golang/producer_options.go
 create mode 100644 golang/protocol/v1/admin.pb.go
 create mode 100644 golang/protocol/v1/definition.pb.go
 create mode 100644 golang/protocol/v1/service.pb.go
 copy java/client/src/main/java/org/apache/rocketmq/client/java/{misc/LinkedIterator.java => rpc/InvocationContext.java} (59%)
 copy java/client/src/main/java/org/apache/rocketmq/client/java/{impl/Client.java => rpc/RpcContext.java} (62%)
 copy java/{README.md => example.md} (58%)


[rocketmq-clients] 01/01: Java: Add invocation context

Posted by aa...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

aaronai pushed a commit to branch java
in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git

commit ceceb124cadee9080356bf06ac02c52b4ba69596
Author: Aaron Ai <ya...@gmail.com>
AuthorDate: Wed Jul 6 20:08:02 2022 +0800

    Java: Add invocation context
---
 .../rocketmq/client/java/impl/ClientImpl.java      |  19 ++--
 .../rocketmq/client/java/impl/ClientManager.java   |  48 +++++-----
 .../client/java/impl/ClientManagerImpl.java        |  71 ++++++---------
 .../client/java/impl/consumer/ConsumerImpl.java    |  41 +++++----
 .../java/impl/consumer/ProcessQueueImpl.java       |  33 ++++---
 .../java/impl/consumer/PushConsumerImpl.java       |  27 +++---
 .../java/impl/consumer/ReceiveMessageResult.java   |   8 +-
 .../java/impl/consumer/SimpleConsumerImpl.java     |  19 ++--
 .../client/java/impl/producer/ProducerImpl.java    |  21 +++--
 .../client/java/rpc/InvocationContext.java         |  46 ++++++++++
 .../apache/rocketmq/client/java/rpc/RpcClient.java |  51 ++++++-----
 .../rocketmq/client/java/rpc/RpcClientImpl.java    | 100 ++++++++++++++-------
 .../rocketmq/client/java/rpc/RpcContext.java       |  39 ++++++++
 .../apache/rocketmq/client/java/rpc/Signature.java |   2 +-
 .../apache/rocketmq/client/java/rpc/TLSHelper.java |   7 +-
 .../java/impl/consumer/ProcessQueueImplTest.java   |  15 ++--
 .../java/impl/consumer/PushConsumerImplTest.java   |   6 +-
 .../java/impl/consumer/SimpleConsumerImplTest.java |  13 +--
 .../java/impl/producer/ProducerImplTest.java       |  21 +++--
 .../apache/rocketmq/client/java/tool/TestBase.java |  97 +++++++++-----------
 20 files changed, 409 insertions(+), 275 deletions(-)

diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientImpl.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientImpl.java
index a4b144d..590bbee 100644
--- a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientImpl.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientImpl.java
@@ -81,6 +81,7 @@ import org.apache.rocketmq.client.java.misc.Utilities;
 import org.apache.rocketmq.client.java.route.Endpoints;
 import org.apache.rocketmq.client.java.route.TopicRouteData;
 import org.apache.rocketmq.client.java.route.TopicRouteDataResult;
+import org.apache.rocketmq.client.java.rpc.InvocationContext;
 import org.apache.rocketmq.client.java.rpc.Signature;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -555,7 +556,7 @@ public abstract class ClientImpl extends AbstractIdleService implements Client,
     /**
      * Real-time signature generation
      */
-    protected Metadata sign() throws UnsupportedEncodingException, NoSuchAlgorithmException, InvalidKeyException {
+    protected Metadata sign() throws NoSuchAlgorithmException, InvalidKeyException {
         return Signature.sign(clientConfiguration, clientId);
     }
 
@@ -568,11 +569,12 @@ public abstract class ClientImpl extends AbstractIdleService implements Client,
     private void doHeartbeat(HeartbeatRequest request, final Endpoints endpoints) {
         try {
             Metadata metadata = sign();
-            final ListenableFuture<HeartbeatResponse> future = clientManager
+            final ListenableFuture<InvocationContext<HeartbeatResponse>> future = clientManager
                 .heartbeat(endpoints, metadata, request, clientConfiguration.getRequestTimeout());
-            Futures.addCallback(future, new FutureCallback<HeartbeatResponse>() {
+            Futures.addCallback(future, new FutureCallback<InvocationContext<HeartbeatResponse>>() {
                 @Override
-                public void onSuccess(HeartbeatResponse response) {
+                public void onSuccess(InvocationContext<HeartbeatResponse> context) {
+                    final HeartbeatResponse response = context.getResp();
                     final Status status = response.getStatus();
                     final Code code = status.getCode();
                     if (Code.OK != code) {
@@ -612,15 +614,15 @@ public abstract class ClientImpl extends AbstractIdleService implements Client,
     }
 
     private ListenableFuture<TopicRouteDataResult> fetchTopicRoute(final String topic) {
-        final SettableFuture<TopicRouteDataResult> future = SettableFuture.create();
         try {
             Resource topicResource = Resource.newBuilder().setName(topic).build();
             final QueryRouteRequest request = QueryRouteRequest.newBuilder().setTopic(topicResource)
                 .setEndpoints(accessEndpoints.toProtobuf()).build();
             final Metadata metadata = sign();
-            final ListenableFuture<QueryRouteResponse> responseFuture =
+            final ListenableFuture<InvocationContext<QueryRouteResponse>> contextFuture =
                 clientManager.queryRoute(accessEndpoints, metadata, request, clientConfiguration.getRequestTimeout());
-            return Futures.transform(responseFuture, response -> {
+            return Futures.transform(contextFuture, context -> {
+                final QueryRouteResponse response = context.getResp();
                 final Status status = response.getStatus();
                 final Code code = status.getCode();
                 if (Code.OK != code) {
@@ -631,8 +633,7 @@ public abstract class ClientImpl extends AbstractIdleService implements Client,
                 return new TopicRouteDataResult(new TopicRouteData(response.getMessageQueuesList()), status);
             }, MoreExecutors.directExecutor());
         } catch (Throwable t) {
-            future.setException(t);
-            return future;
+            return Futures.immediateFailedFuture(t);
         }
     }
 
diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientManager.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientManager.java
index 9f40a6d..9901ac9 100644
--- a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientManager.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientManager.java
@@ -46,6 +46,7 @@ import java.util.Iterator;
 import java.util.concurrent.ScheduledExecutorService;
 import org.apache.rocketmq.client.apis.ClientException;
 import org.apache.rocketmq.client.java.route.Endpoints;
+import org.apache.rocketmq.client.java.rpc.InvocationContext;
 
 /**
  * Client manager supplies a series of unified APIs to execute remote procedure calls for each {@link Client}.
@@ -90,9 +91,10 @@ public interface ClientManager {
      * @param metadata  gRPC request header metadata.
      * @param request   query route request.
      * @param duration  request max duration.
-     * @return response future of the topic route.
+     * @return invocation of response future.
      */
-    ListenableFuture<QueryRouteResponse> queryRoute(Endpoints endpoints, Metadata metadata, QueryRouteRequest request,
+    ListenableFuture<InvocationContext<QueryRouteResponse>> queryRoute(Endpoints endpoints, Metadata metadata,
+        QueryRouteRequest request,
         Duration duration);
 
     /**
@@ -102,9 +104,10 @@ public interface ClientManager {
      * @param metadata  gRPC request header metadata.
      * @param request   heartbeat request.
      * @param duration  request max duration.
-     * @return response future of heartbeat.
+     * @return invocation of response future.
      */
-    ListenableFuture<HeartbeatResponse> heartbeat(Endpoints endpoints, Metadata metadata, HeartbeatRequest request,
+    ListenableFuture<InvocationContext<HeartbeatResponse>> heartbeat(Endpoints endpoints, Metadata metadata,
+        HeartbeatRequest request,
         Duration duration);
 
     /**
@@ -114,9 +117,9 @@ public interface ClientManager {
      * @param metadata  gRPC request header metadata.
      * @param request   send message request.
      * @param duration  request max duration.
-     * @return response future of the sending message.
+     * @return invocation of response future.
      */
-    ListenableFuture<SendMessageResponse> sendMessage(Endpoints endpoints, Metadata metadata,
+    ListenableFuture<InvocationContext<SendMessageResponse>> sendMessage(Endpoints endpoints, Metadata metadata,
         SendMessageRequest request, Duration duration);
 
     /**
@@ -126,9 +129,9 @@ public interface ClientManager {
      * @param metadata  gRPC request header metadata.
      * @param request   query assignment request.
      * @param duration  request max duration.
-     * @return response future of query assignment.
+     * @return invocation of response future.
      */
-    ListenableFuture<QueryAssignmentResponse> queryAssignment(Endpoints endpoints, Metadata metadata,
+    ListenableFuture<InvocationContext<QueryAssignmentResponse>> queryAssignment(Endpoints endpoints, Metadata metadata,
         QueryAssignmentRequest request, Duration duration);
 
     /**
@@ -136,9 +139,10 @@ public interface ClientManager {
      *
      * @param endpoints requested endpoints.
      * @param metadata  gRPC request header metadata.
+     * @return invocation of response future.
      */
-    ListenableFuture<Iterator<ReceiveMessageResponse>> receiveMessage(Endpoints endpoints, Metadata metadata,
-        ReceiveMessageRequest request, Duration duration);
+    ListenableFuture<InvocationContext<Iterator<ReceiveMessageResponse>>> receiveMessage(Endpoints endpoints,
+        Metadata metadata, ReceiveMessageRequest request, Duration duration);
 
     /**
      * Ack message asynchronously after the success of consumption, the method ensures no throwable.
@@ -147,10 +151,10 @@ public interface ClientManager {
      * @param metadata  gRPC request header metadata.
      * @param request   ack message request.
      * @param duration  request max duration.
-     * @return response future of ack message.
+     * @return invocation of response future.
      */
-    ListenableFuture<AckMessageResponse> ackMessage(Endpoints endpoints, Metadata metadata, AckMessageRequest request,
-        Duration duration);
+    ListenableFuture<InvocationContext<AckMessageResponse>> ackMessage(Endpoints endpoints, Metadata metadata,
+        AckMessageRequest request, Duration duration);
 
     /**
      * Nack message asynchronously after the failure of consumption, the method ensures no throwable.
@@ -159,10 +163,10 @@ public interface ClientManager {
      * @param metadata  gRPC request header metadata.
      * @param request   nack message request.
      * @param duration  request max duration.
-     * @return response future of nack message.
+     * @return invocation of response future.
      */
-    ListenableFuture<ChangeInvisibleDurationResponse> changeInvisibleDuration(Endpoints endpoints, Metadata metadata,
-        ChangeInvisibleDurationRequest request, Duration duration);
+    ListenableFuture<InvocationContext<ChangeInvisibleDurationResponse>> changeInvisibleDuration(Endpoints endpoints,
+        Metadata metadata, ChangeInvisibleDurationRequest request, Duration duration);
 
     /**
      * Send a message to the dead letter queue asynchronously, the method ensures no throwable.
@@ -171,9 +175,9 @@ public interface ClientManager {
      * @param metadata  gRPC request header metadata.
      * @param request   request of sending a message to DLQ.
      * @param duration  request max duration.
-     * @return response future of sending a message to DLQ.
+     * @return invocation of response future.
      */
-    ListenableFuture<ForwardMessageToDeadLetterQueueResponse> forwardMessageToDeadLetterQueue(
+    ListenableFuture<InvocationContext<ForwardMessageToDeadLetterQueueResponse>> forwardMessageToDeadLetterQueue(
         Endpoints endpoints, Metadata metadata, ForwardMessageToDeadLetterQueueRequest request, Duration duration);
 
     /**
@@ -183,9 +187,9 @@ public interface ClientManager {
      * @param metadata  gRPC request header metadata.
      * @param request   end transaction request.
      * @param duration  request max duration.
-     * @return response future of submitting transaction resolution.
+     * @return invocation of response future.
      */
-    ListenableFuture<EndTransactionResponse> endTransaction(Endpoints endpoints, Metadata metadata,
+    ListenableFuture<InvocationContext<EndTransactionResponse>> endTransaction(Endpoints endpoints, Metadata metadata,
         EndTransactionRequest request, Duration duration);
 
     /**
@@ -198,8 +202,8 @@ public interface ClientManager {
      * @return response future of notification of client termination.
      */
     @SuppressWarnings("UnusedReturnValue")
-    ListenableFuture<NotifyClientTerminationResponse> notifyClientTermination(Endpoints endpoints, Metadata metadata,
-        NotifyClientTerminationRequest request, Duration duration);
+    ListenableFuture<InvocationContext<NotifyClientTerminationResponse>> notifyClientTermination(Endpoints endpoints,
+        Metadata metadata, NotifyClientTerminationRequest request, Duration duration);
 
     StreamObserver<TelemetryCommand> telemetry(Endpoints endpoints, Metadata metadata,
         Duration duration, StreamObserver<TelemetryCommand> responseObserver) throws ClientException;
diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientManagerImpl.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientManagerImpl.java
index 4d33ab0..337edcc 100644
--- a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientManagerImpl.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientManagerImpl.java
@@ -39,8 +39,8 @@ import apache.rocketmq.v2.SendMessageRequest;
 import apache.rocketmq.v2.SendMessageResponse;
 import apache.rocketmq.v2.TelemetryCommand;
 import com.google.common.util.concurrent.AbstractIdleService;
+import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.SettableFuture;
 import com.google.errorprone.annotations.concurrent.GuardedBy;
 import io.grpc.Metadata;
 import io.grpc.stub.StreamObserver;
@@ -65,6 +65,7 @@ import org.apache.rocketmq.client.java.misc.ExecutorServices;
 import org.apache.rocketmq.client.java.misc.MetadataUtils;
 import org.apache.rocketmq.client.java.misc.ThreadFactoryImpl;
 import org.apache.rocketmq.client.java.route.Endpoints;
+import org.apache.rocketmq.client.java.rpc.InvocationContext;
 import org.apache.rocketmq.client.java.rpc.RpcClient;
 import org.apache.rocketmq.client.java.rpc.RpcClientImpl;
 import org.slf4j.Logger;
@@ -235,132 +236,112 @@ public class ClientManagerImpl extends AbstractIdleService implements ClientMana
     }
 
     @Override
-    public ListenableFuture<QueryRouteResponse> queryRoute(Endpoints endpoints, Metadata metadata,
+    public ListenableFuture<InvocationContext<QueryRouteResponse>> queryRoute(Endpoints endpoints, Metadata metadata,
         QueryRouteRequest request, Duration duration) {
         try {
             final RpcClient rpcClient = getRpcClient(endpoints);
             return rpcClient.queryRoute(metadata, request, asyncWorker, duration);
         } catch (Throwable t) {
-            final SettableFuture<QueryRouteResponse> future = SettableFuture.create();
-            future.setException(t);
-            return future;
+            return Futures.immediateFailedFuture(t);
         }
     }
 
     @Override
-    public ListenableFuture<HeartbeatResponse> heartbeat(Endpoints endpoints, Metadata metadata,
+    public ListenableFuture<InvocationContext<HeartbeatResponse>> heartbeat(Endpoints endpoints, Metadata metadata,
         HeartbeatRequest request, Duration duration) {
         try {
             final RpcClient rpcClient = getRpcClient(endpoints);
             return rpcClient.heartbeat(metadata, request, asyncWorker, duration);
         } catch (Throwable t) {
-            final SettableFuture<HeartbeatResponse> future = SettableFuture.create();
-            future.setException(t);
-            return future;
+            return Futures.immediateFailedFuture(t);
         }
     }
 
     @Override
-    public ListenableFuture<SendMessageResponse> sendMessage(Endpoints endpoints, Metadata metadata,
+    public ListenableFuture<InvocationContext<SendMessageResponse>> sendMessage(Endpoints endpoints, Metadata metadata,
         SendMessageRequest request, Duration duration) {
         try {
             final RpcClient rpcClient = getRpcClient(endpoints);
             return rpcClient.sendMessage(metadata, request, asyncWorker, duration);
         } catch (Throwable t) {
-            final SettableFuture<SendMessageResponse> future = SettableFuture.create();
-            future.setException(t);
-            return future;
+            return Futures.immediateFailedFuture(t);
         }
     }
 
     @Override
-    public ListenableFuture<QueryAssignmentResponse> queryAssignment(Endpoints endpoints, Metadata metadata,
-        QueryAssignmentRequest request, Duration duration) {
+    public ListenableFuture<InvocationContext<QueryAssignmentResponse>> queryAssignment(Endpoints endpoints,
+        Metadata metadata, QueryAssignmentRequest request, Duration duration) {
         try {
             final RpcClient rpcClient = getRpcClient(endpoints);
             return rpcClient.queryAssignment(metadata, request, asyncWorker, duration);
         } catch (Throwable t) {
-            final SettableFuture<QueryAssignmentResponse> future = SettableFuture.create();
-            future.setException(t);
-            return future;
+            return Futures.immediateFailedFuture(t);
         }
     }
 
     @Override
-    public ListenableFuture<Iterator<ReceiveMessageResponse>> receiveMessage(Endpoints endpoints, Metadata metadata,
-        ReceiveMessageRequest request, Duration duration) {
+    public ListenableFuture<InvocationContext<Iterator<ReceiveMessageResponse>>> receiveMessage(Endpoints endpoints,
+        Metadata metadata, ReceiveMessageRequest request, Duration duration) {
         try {
             final RpcClient rpcClient = getRpcClient(endpoints);
             return rpcClient.receiveMessage(metadata, request, asyncWorker, duration);
         } catch (Throwable t) {
-            SettableFuture<Iterator<ReceiveMessageResponse>> future = SettableFuture.create();
-            future.setException(t);
-            return future;
+            return Futures.immediateFailedFuture(t);
         }
     }
 
     @Override
-    public ListenableFuture<AckMessageResponse> ackMessage(Endpoints endpoints, Metadata metadata,
+    public ListenableFuture<InvocationContext<AckMessageResponse>> ackMessage(Endpoints endpoints, Metadata metadata,
         AckMessageRequest request, Duration duration) {
         try {
             final RpcClient rpcClient = getRpcClient(endpoints);
             return rpcClient.ackMessage(metadata, request, asyncWorker, duration);
         } catch (Throwable t) {
-            final SettableFuture<AckMessageResponse> future = SettableFuture.create();
-            future.setException(t);
-            return future;
+            return Futures.immediateFailedFuture(t);
         }
     }
 
     @Override
-    public ListenableFuture<ChangeInvisibleDurationResponse> changeInvisibleDuration(Endpoints endpoints,
-        Metadata metadata, ChangeInvisibleDurationRequest request, Duration duration) {
+    public ListenableFuture<InvocationContext<ChangeInvisibleDurationResponse>> changeInvisibleDuration(
+        Endpoints endpoints, Metadata metadata, ChangeInvisibleDurationRequest request, Duration duration) {
         try {
             final RpcClient rpcClient = getRpcClient(endpoints);
             return rpcClient.changeInvisibleDuration(metadata, request, asyncWorker, duration);
         } catch (Throwable t) {
-            final SettableFuture<ChangeInvisibleDurationResponse> future = SettableFuture.create();
-            future.setException(t);
-            return future;
+            return Futures.immediateFailedFuture(t);
         }
     }
 
     @Override
-    public ListenableFuture<ForwardMessageToDeadLetterQueueResponse> forwardMessageToDeadLetterQueue(
+    public ListenableFuture<InvocationContext<ForwardMessageToDeadLetterQueueResponse>> forwardMessageToDeadLetterQueue(
         Endpoints endpoints, Metadata metadata, ForwardMessageToDeadLetterQueueRequest request, Duration duration) {
         try {
             final RpcClient rpcClient = getRpcClient(endpoints);
             return rpcClient.forwardMessageToDeadLetterQueue(metadata, request, asyncWorker, duration);
         } catch (Throwable t) {
-            final SettableFuture<ForwardMessageToDeadLetterQueueResponse> future = SettableFuture.create();
-            future.setException(t);
-            return future;
+            return Futures.immediateFailedFuture(t);
         }
     }
 
     @Override
-    public ListenableFuture<EndTransactionResponse> endTransaction(Endpoints endpoints, Metadata metadata,
-        EndTransactionRequest request, Duration duration) {
+    public ListenableFuture<InvocationContext<EndTransactionResponse>> endTransaction(Endpoints endpoints,
+        Metadata metadata, EndTransactionRequest request, Duration duration) {
         try {
             final RpcClient rpcClient = getRpcClient(endpoints);
             return rpcClient.endTransaction(metadata, request, asyncWorker, duration);
         } catch (Throwable t) {
-            SettableFuture<EndTransactionResponse> future = SettableFuture.create();
-            future.setException(t);
-            return future;
+            return Futures.immediateFailedFuture(t);
         }
     }
 
     @Override
-    public ListenableFuture<NotifyClientTerminationResponse> notifyClientTermination(
+    public ListenableFuture<InvocationContext<NotifyClientTerminationResponse>> notifyClientTermination(
         Endpoints endpoints, Metadata metadata, NotifyClientTerminationRequest request, Duration duration) {
         try {
             final RpcClient rpcClient = getRpcClient(endpoints);
             return rpcClient.notifyClientTermination(metadata, request, asyncWorker, duration);
         } catch (Throwable t) {
-            final SettableFuture<NotifyClientTerminationResponse> future = SettableFuture.create();
-            future.setException(t);
-            return future;
+            return Futures.immediateFailedFuture(t);
         }
     }
 
diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ConsumerImpl.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ConsumerImpl.java
index 539b876..a8b04ec 100644
--- a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ConsumerImpl.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ConsumerImpl.java
@@ -56,6 +56,7 @@ import org.apache.rocketmq.client.java.message.MessageCommon;
 import org.apache.rocketmq.client.java.message.MessageViewImpl;
 import org.apache.rocketmq.client.java.route.Endpoints;
 import org.apache.rocketmq.client.java.route.MessageQueueImpl;
+import org.apache.rocketmq.client.java.rpc.InvocationContext;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -71,16 +72,18 @@ abstract class ConsumerImpl extends ClientImpl {
     }
 
     @SuppressWarnings("SameParameterValue")
-    protected ListenableFuture<ReceiveMessageResult> receiveMessage(ReceiveMessageRequest request, MessageQueueImpl mq,
-        Duration timeout) {
+    protected ListenableFuture<ReceiveMessageResult> receiveMessage(ReceiveMessageRequest request,
+        MessageQueueImpl mq, Duration timeout) {
         List<MessageViewImpl> messages = new ArrayList<>();
         final SettableFuture<ReceiveMessageResult> future0 = SettableFuture.create();
         try {
             Metadata metadata = sign();
             final Endpoints endpoints = mq.getBroker().getEndpoints();
-            final ListenableFuture<Iterator<ReceiveMessageResponse>> future = clientManager.receiveMessage(endpoints,
-                metadata, request, timeout);
-            return Futures.transform(future, it -> {
+            final ListenableFuture<InvocationContext<Iterator<ReceiveMessageResponse>>> future =
+                clientManager.receiveMessage(endpoints,
+                    metadata, request, timeout);
+            return Futures.transform(future, context -> {
+                final Iterator<ReceiveMessageResponse> it = context.getResp();
                 // Null here means status not set yet.
                 Status status = null;
                 Timestamp deliveryTimestampFromRemote = null;
@@ -106,7 +109,7 @@ abstract class ConsumerImpl extends ClientImpl {
                     final MessageViewImpl view = MessageViewImpl.fromProtobuf(message, mq, deliveryTimestampFromRemote);
                     messages.add(view);
                 }
-                return new ReceiveMessageResult(endpoints, status, messages);
+                return new ReceiveMessageResult(endpoints, context.getRpcContext().getRequestId(), status, messages);
             }, MoreExecutors.directExecutor());
         } catch (Throwable t) {
             future0.setException(t);
@@ -134,9 +137,9 @@ abstract class ConsumerImpl extends ClientImpl {
 
     }
 
-    public ListenableFuture<AckMessageResponse> ackMessage(MessageViewImpl messageView) {
+    public ListenableFuture<InvocationContext<AckMessageResponse>> ackMessage(MessageViewImpl messageView) {
         final Endpoints endpoints = messageView.getEndpoints();
-        ListenableFuture<AckMessageResponse> future;
+        ListenableFuture<InvocationContext<AckMessageResponse>> future;
 
         final Stopwatch stopwatch = Stopwatch.createStarted();
         final List<MessageCommon> messageCommons = Collections.singletonList(messageView.getMessageCommon());
@@ -146,11 +149,14 @@ abstract class ConsumerImpl extends ClientImpl {
             final Metadata metadata = sign();
             future = clientManager.ackMessage(endpoints, metadata, request, clientConfiguration.getRequestTimeout());
         } catch (Throwable t) {
-            return Futures.immediateFailedFuture(t);
+            final SettableFuture<InvocationContext<AckMessageResponse>> future0 = SettableFuture.create();
+            future0.setException(t);
+            future = future0;
         }
-        Futures.addCallback(future, new FutureCallback<AckMessageResponse>() {
+        Futures.addCallback(future, new FutureCallback<InvocationContext<AckMessageResponse>>() {
             @Override
-            public void onSuccess(AckMessageResponse response) {
+            public void onSuccess(InvocationContext<AckMessageResponse> context) {
+                final AckMessageResponse response = context.getResp();
                 final Status status = response.getStatus();
                 final Code code = status.getCode();
                 final Duration duration = stopwatch.elapsed();
@@ -168,10 +174,10 @@ abstract class ConsumerImpl extends ClientImpl {
         return future;
     }
 
-    public ListenableFuture<ChangeInvisibleDurationResponse> changeInvisibleDuration(MessageViewImpl messageView,
-        Duration invisibleDuration) {
+    public ListenableFuture<InvocationContext<ChangeInvisibleDurationResponse>> changeInvisibleDuration(
+        MessageViewImpl messageView, Duration invisibleDuration) {
         final Endpoints endpoints = messageView.getEndpoints();
-        ListenableFuture<ChangeInvisibleDurationResponse> future;
+        ListenableFuture<InvocationContext<ChangeInvisibleDurationResponse>> future;
 
         final Stopwatch stopwatch = Stopwatch.createStarted();
         final List<MessageCommon> messageCommons = Collections.singletonList(messageView.getMessageCommon());
@@ -182,14 +188,15 @@ abstract class ConsumerImpl extends ClientImpl {
             future = clientManager.changeInvisibleDuration(endpoints, metadata, request,
                 clientConfiguration.getRequestTimeout());
         } catch (Throwable t) {
-            final SettableFuture<ChangeInvisibleDurationResponse> future0 = SettableFuture.create();
+            final SettableFuture<InvocationContext<ChangeInvisibleDurationResponse>> future0 = SettableFuture.create();
             future0.setException(t);
             future = future0;
         }
         final MessageId messageId = messageView.getMessageId();
-        Futures.addCallback(future, new FutureCallback<ChangeInvisibleDurationResponse>() {
+        Futures.addCallback(future, new FutureCallback<InvocationContext<ChangeInvisibleDurationResponse>>() {
             @Override
-            public void onSuccess(ChangeInvisibleDurationResponse response) {
+            public void onSuccess(InvocationContext<ChangeInvisibleDurationResponse> context) {
+                final ChangeInvisibleDurationResponse response = context.getResp();
                 final Status status = response.getStatus();
                 final Code code = status.getCode();
                 final Duration duration = stopwatch.elapsed();
diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ProcessQueueImpl.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ProcessQueueImpl.java
index f36012f..c914dce 100644
--- a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ProcessQueueImpl.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ProcessQueueImpl.java
@@ -51,6 +51,7 @@ import org.apache.rocketmq.client.java.message.MessageViewImpl;
 import org.apache.rocketmq.client.java.retry.RetryPolicy;
 import org.apache.rocketmq.client.java.route.Endpoints;
 import org.apache.rocketmq.client.java.route.MessageQueueImpl;
+import org.apache.rocketmq.client.java.rpc.InvocationContext;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -387,11 +388,12 @@ class ProcessQueueImpl implements ProcessQueue {
         final String consumerGroup = consumer.getConsumerGroup();
         final MessageId messageId = messageView.getMessageId();
         final Endpoints endpoints = messageView.getEndpoints();
-        final ListenableFuture<AckMessageResponse> future = consumer.ackMessage(messageView);
-        Futures.addCallback(future, new FutureCallback<AckMessageResponse>() {
+        final ListenableFuture<InvocationContext<AckMessageResponse>> future = consumer.ackMessage(messageView);
+        Futures.addCallback(future, new FutureCallback<InvocationContext<AckMessageResponse>>() {
             @Override
-            public void onSuccess(AckMessageResponse response) {
-                final Status status = response.getStatus();
+            public void onSuccess(InvocationContext<AckMessageResponse> context) {
+                final AckMessageResponse resp = context.getResp();
+                final Status status = resp.getStatus();
                 final Code code = status.getCode();
                 if (Code.OK.equals(code)) {
                     LOGGER.debug("Ack message successfully, clientId={}, consumerGroup={}, messageId={}, mq={}, "
@@ -478,16 +480,17 @@ class ProcessQueueImpl implements ProcessQueue {
 
     private void forwardToDeadLetterQueue(final MessageViewImpl messageView, final int attempt,
         final SettableFuture<Void> future0) {
-        final ListenableFuture<ForwardMessageToDeadLetterQueueResponse> future =
+        final ListenableFuture<InvocationContext<ForwardMessageToDeadLetterQueueResponse>> future =
             consumer.forwardMessageToDeadLetterQueue(messageView);
         final String clientId = consumer.getClientId();
         final String consumerGroup = consumer.getConsumerGroup();
         final MessageId messageId = messageView.getMessageId();
         final Endpoints endpoints = messageView.getEndpoints();
-        Futures.addCallback(future, new FutureCallback<ForwardMessageToDeadLetterQueueResponse>() {
+        Futures.addCallback(future, new FutureCallback<InvocationContext<ForwardMessageToDeadLetterQueueResponse>>() {
             @Override
-            public void onSuccess(ForwardMessageToDeadLetterQueueResponse response) {
-                final Status status = response.getStatus();
+            public void onSuccess(InvocationContext<ForwardMessageToDeadLetterQueueResponse> context) {
+                final ForwardMessageToDeadLetterQueueResponse resp = context.getResp();
+                final Status status = resp.getStatus();
                 final Code code = status.getCode();
                 // Log failure and retry later.
                 if (!Code.OK.equals(code)) {
@@ -559,17 +562,19 @@ class ProcessQueueImpl implements ProcessQueue {
         final String consumerGroup = consumer.getConsumerGroup();
         final MessageId messageId = messageView.getMessageId();
         final Endpoints endpoints = messageView.getEndpoints();
-        final ListenableFuture<AckMessageResponse> future = consumer.ackMessage(messageView);
-        Futures.addCallback(future, new FutureCallback<AckMessageResponse>() {
+        final ListenableFuture<InvocationContext<AckMessageResponse>> future = consumer.ackMessage(messageView);
+        Futures.addCallback(future, new FutureCallback<InvocationContext<AckMessageResponse>>() {
             @Override
-            public void onSuccess(AckMessageResponse response) {
-                final Status status = response.getStatus();
+            public void onSuccess(InvocationContext<AckMessageResponse> context) {
+                final AckMessageResponse resp = context.getResp();
+                final String requestId = context.getRpcContext().getRequestId();
+                final Status status = resp.getStatus();
                 final Code code = status.getCode();
                 // Log failure and retry later.
                 if (!Code.OK.equals(code)) {
                     LOGGER.error("Failed to ack fifo message, would attempt to re-ack later, clientId={}, "
-                            + "consumerGroup={}, attempt={}, messageId={}, mq={}, code={}, endpoints={}, status "
-                            + "message=[{}]", clientId, consumerGroup, attempt, messageId, mq, code,
+                            + "consumerGroup={}, attempt={}, messageId={}, mq={}, code={}, requestId={}, endpoints={}, "
+                            + "status message=[{}]", clientId, consumerGroup, attempt, messageId, mq, code, requestId,
                         endpoints, status.getMessage());
                     ackFifoMessageLater(messageView, 1 + attempt, future0);
                     return;
diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/PushConsumerImpl.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/PushConsumerImpl.java
index 4a80377..41648bd 100644
--- a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/PushConsumerImpl.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/PushConsumerImpl.java
@@ -71,6 +71,7 @@ import org.apache.rocketmq.client.java.retry.RetryPolicy;
 import org.apache.rocketmq.client.java.route.Endpoints;
 import org.apache.rocketmq.client.java.route.MessageQueueImpl;
 import org.apache.rocketmq.client.java.route.TopicRouteDataResult;
+import org.apache.rocketmq.client.java.rpc.InvocationContext;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -264,15 +265,16 @@ class PushConsumerImpl extends ConsumerImpl implements PushConsumer, MessageCach
 
     private ListenableFuture<Assignments> queryAssignment(final String topic) {
         final ListenableFuture<Endpoints> future = pickEndpointsToQueryAssignments(topic);
-        final ListenableFuture<QueryAssignmentResponse> responseFuture =
+        final ListenableFuture<InvocationContext<QueryAssignmentResponse>> responseFuture =
             Futures.transformAsync(future, endpoints -> {
                 final Metadata metadata = sign();
                 final QueryAssignmentRequest request = wrapQueryAssignmentRequest(topic);
                 return clientManager.queryAssignment(endpoints, metadata, request,
                     clientConfiguration.getRequestTimeout());
             }, MoreExecutors.directExecutor());
-        return Futures.transformAsync(responseFuture, response -> {
-            final Status status = response.getStatus();
+        return Futures.transformAsync(responseFuture, context -> {
+            final QueryAssignmentResponse resp = context.getResp();
+            final Status status = resp.getStatus();
             final Code code = status.getCode();
             if (!Code.OK.equals(code)) {
                 final String message = String.format("Failed to query assignment, code=%d, status message=[{%s}]",
@@ -280,7 +282,7 @@ class PushConsumerImpl extends ConsumerImpl implements PushConsumer, MessageCach
                 throw new RuntimeException(message);
             }
             SettableFuture<Assignments> future0 = SettableFuture.create();
-            final List<Assignment> assignmentList = response.getAssignmentsList().stream().map(assignment ->
+            final List<Assignment> assignmentList = resp.getAssignmentsList().stream().map(assignment ->
                 new Assignment(new MessageQueueImpl(assignment.getMessageQueue()))).collect(Collectors.toList());
             final Assignments assignments = new Assignments(assignmentList);
             future0.set(assignments);
@@ -398,7 +400,7 @@ class PushConsumerImpl extends ConsumerImpl implements PushConsumer, MessageCach
                             cacheAssignments.put(topic, latest);
                             return;
                         }
-                        LOGGER.info("Assignments of topic={} remains the same, assignments={}, clientId={}", topic,
+                        LOGGER.debug("Assignments of topic={} remains the same, assignments={}, clientId={}", topic,
                             existed, clientId);
                         // Process queue may be dropped, need to be synchronized anyway.
                         syncProcessQueue(topic, latest, filterExpression);
@@ -509,7 +511,7 @@ class PushConsumerImpl extends ConsumerImpl implements PushConsumer, MessageCach
             .setMaxDeliveryAttempts(getRetryPolicy().getMaxAttempts()).build();
     }
 
-    public ListenableFuture<ForwardMessageToDeadLetterQueueResponse> forwardMessageToDeadLetterQueue(
+    public ListenableFuture<InvocationContext<ForwardMessageToDeadLetterQueueResponse>> forwardMessageToDeadLetterQueue(
         final MessageViewImpl messageView) {
         // Intercept before forwarding message to DLQ.
         final Stopwatch stopwatch = Stopwatch.createStarted();
@@ -517,7 +519,7 @@ class PushConsumerImpl extends ConsumerImpl implements PushConsumer, MessageCach
         doBefore(MessageHookPoints.FORWARD_TO_DLQ, messageCommons);
 
         final Endpoints endpoints = messageView.getEndpoints();
-        ListenableFuture<ForwardMessageToDeadLetterQueueResponse> future;
+        ListenableFuture<InvocationContext<ForwardMessageToDeadLetterQueueResponse>> future;
         try {
             final ForwardMessageToDeadLetterQueueRequest request =
                 wrapForwardMessageToDeadLetterQueueRequest(messageView);
@@ -525,15 +527,14 @@ class PushConsumerImpl extends ConsumerImpl implements PushConsumer, MessageCach
             future = clientManager.forwardMessageToDeadLetterQueue(endpoints, metadata, request,
                 clientConfiguration.getRequestTimeout());
         } catch (Throwable t) {
-            final SettableFuture<ForwardMessageToDeadLetterQueueResponse> future0 = SettableFuture.create();
-            future0.setException(t);
-            future = future0;
+            future = Futures.immediateFailedFuture(t);
         }
-        Futures.addCallback(future, new FutureCallback<ForwardMessageToDeadLetterQueueResponse>() {
+        Futures.addCallback(future, new FutureCallback<InvocationContext<ForwardMessageToDeadLetterQueueResponse>>() {
             @Override
-            public void onSuccess(ForwardMessageToDeadLetterQueueResponse response) {
+            public void onSuccess(InvocationContext<ForwardMessageToDeadLetterQueueResponse> context) {
+                final ForwardMessageToDeadLetterQueueResponse resp = context.getResp();
                 final Duration duration = stopwatch.elapsed();
-                MessageHookPointsStatus messageHookPointsStatus = Code.OK.equals(response.getStatus().getCode()) ?
+                MessageHookPointsStatus messageHookPointsStatus = Code.OK.equals(resp.getStatus().getCode()) ?
                     MessageHookPointsStatus.OK : MessageHookPointsStatus.ERROR;
                 // Intercept after forwarding message to DLQ.
                 doAfter(MessageHookPoints.FORWARD_TO_DLQ, messageCommons, duration, messageHookPointsStatus);
diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ReceiveMessageResult.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ReceiveMessageResult.java
index 09b8d12..79d15a8 100644
--- a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ReceiveMessageResult.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ReceiveMessageResult.java
@@ -36,12 +36,14 @@ import org.apache.rocketmq.client.java.route.Endpoints;
 
 public class ReceiveMessageResult {
     private final Endpoints endpoints;
+    private final String requestId;
     private final ClientException exception;
 
     private final List<MessageViewImpl> messages;
 
-    public ReceiveMessageResult(Endpoints endpoints, Status status, List<MessageViewImpl> messages) {
+    public ReceiveMessageResult(Endpoints endpoints, String requestId, Status status, List<MessageViewImpl> messages) {
         this.endpoints = endpoints;
+        this.requestId = requestId;
         final Code code = status.getCode();
         switch (code) {
             case OK:
@@ -108,4 +110,8 @@ public class ReceiveMessageResult {
     public List<MessageViewImpl> getMessages() {
         return messages;
     }
+
+    public String getRequestId() {
+        return requestId;
+    }
 }
diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/SimpleConsumerImpl.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/SimpleConsumerImpl.java
index b0ccfd9..77750e1 100644
--- a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/SimpleConsumerImpl.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/SimpleConsumerImpl.java
@@ -56,6 +56,7 @@ import org.apache.rocketmq.client.java.message.MessageViewImpl;
 import org.apache.rocketmq.client.java.message.protocol.Resource;
 import org.apache.rocketmq.client.java.route.MessageQueueImpl;
 import org.apache.rocketmq.client.java.route.TopicRouteDataResult;
+import org.apache.rocketmq.client.java.rpc.InvocationContext;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -240,9 +241,10 @@ class SimpleConsumerImpl extends ConsumerImpl implements SimpleConsumer {
             return future0;
         }
         MessageViewImpl impl = (MessageViewImpl) messageView;
-        final ListenableFuture<AckMessageResponse> future = ackMessage(impl);
-        return Futures.transformAsync(future, response -> {
-            final Status status = response.getStatus();
+        final ListenableFuture<InvocationContext<AckMessageResponse>> future = ackMessage(impl);
+        return Futures.transformAsync(future, context -> {
+            final AckMessageResponse resp = context.getResp();
+            final Status status = resp.getStatus();
             final Code code = status.getCode();
             switch (code) {
                 case OK:
@@ -300,12 +302,13 @@ class SimpleConsumerImpl extends ConsumerImpl implements SimpleConsumer {
             return future0;
         }
         MessageViewImpl impl = (MessageViewImpl) messageView;
-        final ListenableFuture<ChangeInvisibleDurationResponse> future = changeInvisibleDuration(impl,
-            invisibleDuration);
-        return Futures.transformAsync(future, response -> {
+        final ListenableFuture<InvocationContext<ChangeInvisibleDurationResponse>> future =
+            changeInvisibleDuration(impl, invisibleDuration);
+        return Futures.transformAsync(future, context -> {
+            final ChangeInvisibleDurationResponse resp = context.getResp();
             // Refresh receipt handle manually.
-            impl.setReceiptHandle(response.getReceiptHandle());
-            final Status status = response.getStatus();
+            impl.setReceiptHandle(resp.getReceiptHandle());
+            final Status status = resp.getStatus();
             final Code code = status.getCode();
             switch (code) {
                 case OK:
diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/ProducerImpl.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/ProducerImpl.java
index 6434cca..427d318 100644
--- a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/ProducerImpl.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/ProducerImpl.java
@@ -72,6 +72,7 @@ import org.apache.rocketmq.client.java.retry.RetryPolicy;
 import org.apache.rocketmq.client.java.route.Endpoints;
 import org.apache.rocketmq.client.java.route.MessageQueueImpl;
 import org.apache.rocketmq.client.java.route.TopicRouteDataResult;
+import org.apache.rocketmq.client.java.rpc.InvocationContext;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -280,13 +281,14 @@ class ProducerImpl extends ClientImpl implements Producer {
             MessageHookPoints.COMMIT_TRANSACTION : MessageHookPoints.ROLLBACK_TRANSACTION;
         doBefore(messageHookPoints, messageCommons);
 
-        final ListenableFuture<EndTransactionResponse> future =
+        final ListenableFuture<InvocationContext<EndTransactionResponse>> future =
             clientManager.endTransaction(endpoints, metadata, request, requestTimeout);
-        Futures.addCallback(future, new FutureCallback<EndTransactionResponse>() {
+        Futures.addCallback(future, new FutureCallback<InvocationContext<EndTransactionResponse>>() {
             @Override
-            public void onSuccess(EndTransactionResponse result) {
+            public void onSuccess(InvocationContext<EndTransactionResponse> context) {
                 final Duration duration = stopwatch.elapsed();
-                final Status status = result.getStatus();
+                final EndTransactionResponse resp = context.getResp();
+                final Status status = resp.getStatus();
                 final Code code = status.getCode();
                 MessageHookPointsStatus messageHookPointsStatus = Code.OK.equals(code) ? MessageHookPointsStatus.OK :
                     MessageHookPointsStatus.ERROR;
@@ -299,8 +301,9 @@ class ProducerImpl extends ClientImpl implements Producer {
                 doAfter(messageHookPoints, messageCommons, duration, MessageHookPointsStatus.ERROR);
             }
         }, MoreExecutors.directExecutor());
-        final EndTransactionResponse response = handleClientFuture(future);
-        final Status status = response.getStatus();
+        final InvocationContext<EndTransactionResponse> context = handleClientFuture(future);
+        final EndTransactionResponse resp = context.getResp();
+        final Status status = resp.getStatus();
         final Code code = status.getCode();
         if (!Code.OK.equals(code)) {
             throw new ClientException(code.getNumber(), status.getMessage());
@@ -442,13 +445,13 @@ class ProducerImpl extends ClientImpl implements Producer {
         final Endpoints endpoints = messageQueue.getBroker().getEndpoints();
         final SendMessageRequest request = wrapSendMessageRequest(messages);
 
-        final ListenableFuture<SendMessageResponse> responseFuture = clientManager.sendMessage(endpoints, metadata,
-            request, clientConfiguration.getRequestTimeout());
+        final ListenableFuture<InvocationContext<SendMessageResponse>> responseFuture =
+            clientManager.sendMessage(endpoints, metadata, request, clientConfiguration.getRequestTimeout());
 
         final ListenableFuture<List<SendReceiptImpl>> attemptFuture = Futures.transformAsync(responseFuture,
             response -> {
                 final SettableFuture<List<SendReceiptImpl>> future0 = SettableFuture.create();
-                future0.set(SendReceiptImpl.processSendResponse(messageQueue, response));
+                future0.set(SendReceiptImpl.processSendResponse(messageQueue, response.getResp()));
                 return future0;
             }, MoreExecutors.directExecutor());
 
diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/rpc/InvocationContext.java b/java/client/src/main/java/org/apache/rocketmq/client/java/rpc/InvocationContext.java
new file mode 100644
index 0000000..eb5f487
--- /dev/null
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/rpc/InvocationContext.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.client.java.rpc;
+
+import com.google.common.base.MoreObjects;
+
+public class InvocationContext<T> {
+    private final T t;
+    private final RpcContext rpcContext;
+
+    public InvocationContext(T t, RpcContext rpcContext) {
+        this.t = t;
+        this.rpcContext = rpcContext;
+    }
+
+    public T getResp() {
+        return t;
+    }
+
+    public RpcContext getRpcContext() {
+        return rpcContext;
+    }
+
+    @Override
+    public String toString() {
+        return MoreObjects.toStringHelper(this)
+            .add("resp", t)
+            .add("rpcContext", rpcContext)
+            .toString();
+    }
+}
diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/rpc/RpcClient.java b/java/client/src/main/java/org/apache/rocketmq/client/java/rpc/RpcClient.java
index 415d228..70744f3 100644
--- a/java/client/src/main/java/org/apache/rocketmq/client/java/rpc/RpcClient.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/rpc/RpcClient.java
@@ -72,9 +72,10 @@ public interface RpcClient {
      * @param request  query route request.
      * @param executor gRPC asynchronous executor.
      * @param duration request max duration.
-     * @return response future of topic route.
+     * @return future of the invocation context wrapping the topic route response.
      */
-    ListenableFuture<QueryRouteResponse> queryRoute(Metadata metadata, QueryRouteRequest request, Executor executor,
+    ListenableFuture<InvocationContext<QueryRouteResponse>> queryRoute(Metadata metadata, QueryRouteRequest request,
+        Executor executor,
         Duration duration);
 
     /**
@@ -84,9 +85,10 @@ public interface RpcClient {
      * @param request  heart beat request.
      * @param executor gRPC asynchronous executor.
      * @param duration request max duration.
-     * @return response future of heart beat.
+     * @return future of the invocation context wrapping the heartbeat response.
      */
-    ListenableFuture<HeartbeatResponse> heartbeat(Metadata metadata, HeartbeatRequest request, Executor executor,
+    ListenableFuture<InvocationContext<HeartbeatResponse>> heartbeat(Metadata metadata, HeartbeatRequest request,
+        Executor executor,
         Duration duration);
 
     /**
@@ -96,10 +98,10 @@ public interface RpcClient {
      * @param request  send message request.
      * @param executor gRPC asynchronous executor.
      * @param duration request max duration.
-     * @return response future of sending message.
+     * @return future of the invocation context wrapping the send message response.
      */
-    ListenableFuture<SendMessageResponse> sendMessage(Metadata metadata, SendMessageRequest request, Executor executor,
-        Duration duration);
+    ListenableFuture<InvocationContext<SendMessageResponse>> sendMessage(Metadata metadata,
+        SendMessageRequest request, Executor executor, Duration duration);
 
     /**
      * Query assignment asynchronously.
@@ -108,10 +110,10 @@ public interface RpcClient {
      * @param request  query assignment request.
      * @param executor gRPC asynchronous executor.
      * @param duration request max duration.
-     * @return response future of query assignment.
+     * @return future of the invocation context wrapping the query assignment response.
      */
-    ListenableFuture<QueryAssignmentResponse> queryAssignment(Metadata metadata, QueryAssignmentRequest request,
-        Executor executor, Duration duration);
+    ListenableFuture<InvocationContext<QueryAssignmentResponse>> queryAssignment(Metadata metadata,
+        QueryAssignmentRequest request, Executor executor, Duration duration);
 
     /**
      * Receiving message asynchronously from server.
@@ -119,9 +121,10 @@ public interface RpcClient {
      * @param metadata gRPC request header metadata.
      * @param request  receiving message request.
      * @param executor gRPC asynchronous executor.
+     * @return future of the invocation context wrapping the iterator of receive message responses.
      */
-    ListenableFuture<Iterator<ReceiveMessageResponse>> receiveMessage(Metadata metadata, ReceiveMessageRequest request,
-        ExecutorService executor, Duration duration);
+    ListenableFuture<InvocationContext<Iterator<ReceiveMessageResponse>>> receiveMessage(Metadata metadata,
+        ReceiveMessageRequest request, ExecutorService executor, Duration duration);
 
     /**
      * Ack message asynchronously after success of consumption.
@@ -130,10 +133,10 @@ public interface RpcClient {
      * @param request  ack message request.
      * @param executor gRPC asynchronous executor.
      * @param duration request max duration.
-     * @return response future of ack message.
+     * @return future of the invocation context wrapping the ack message response.
      */
-    ListenableFuture<AckMessageResponse> ackMessage(Metadata metadata, AckMessageRequest request, Executor executor,
-        Duration duration);
+    ListenableFuture<InvocationContext<AckMessageResponse>> ackMessage(Metadata metadata, AckMessageRequest request,
+        Executor executor, Duration duration);
 
     /**
      * Change message invisible duration.
@@ -142,9 +145,9 @@ public interface RpcClient {
      * @param request  change invisible duration request.
      * @param executor gRPC asynchronous executor.
      * @param duration request max duration.
-     * @return response future of change message invisible duration.
+     * @return future of the invocation context wrapping the response.
      */
-    ListenableFuture<ChangeInvisibleDurationResponse> changeInvisibleDuration(Metadata metadata,
+    ListenableFuture<InvocationContext<ChangeInvisibleDurationResponse>> changeInvisibleDuration(Metadata metadata,
         ChangeInvisibleDurationRequest request, Executor executor, Duration duration);
 
     /**
@@ -154,9 +157,9 @@ public interface RpcClient {
      * @param request  request of sending message to DLQ.
      * @param executor gRPC asynchronous executor.
      * @param duration request max duration.
-     * @return response future of sending message to DLQ.
+     * @return future of the invocation context wrapping the response.
      */
-    ListenableFuture<ForwardMessageToDeadLetterQueueResponse> forwardMessageToDeadLetterQueue(
+    ListenableFuture<InvocationContext<ForwardMessageToDeadLetterQueueResponse>> forwardMessageToDeadLetterQueue(
         Metadata metadata, ForwardMessageToDeadLetterQueueRequest request, Executor executor, Duration duration);
 
     /**
@@ -166,10 +169,10 @@ public interface RpcClient {
      * @param request  end transaction request.
      * @param executor gRPC asynchronous executor.
      * @param duration request max duration.
-     * @return response future of submitting transaction resolution.
+     * @return future of the invocation context wrapping the response.
      */
-    ListenableFuture<EndTransactionResponse> endTransaction(Metadata metadata, EndTransactionRequest request,
-        Executor executor, Duration duration);
+    ListenableFuture<InvocationContext<EndTransactionResponse>> endTransaction(Metadata metadata,
+        EndTransactionRequest request, Executor executor, Duration duration);
 
     /**
      * Asynchronously notify server that client is terminated.
@@ -178,9 +181,9 @@ public interface RpcClient {
      * @param request  notify client termination request.
      * @param executor gRPC asynchronous executor.
      * @param duration request max duration.
-     * @return response future of notification of client termination.
+     * @return future of the invocation context wrapping the response.
      */
-    ListenableFuture<NotifyClientTerminationResponse> notifyClientTermination(Metadata metadata,
+    ListenableFuture<InvocationContext<NotifyClientTerminationResponse>> notifyClientTermination(Metadata metadata,
         NotifyClientTerminationRequest request, Executor executor, Duration duration);
 
     StreamObserver<TelemetryCommand> telemetry(Metadata metadata, Executor executor, Duration duration,
diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/rpc/RpcClientImpl.java b/java/client/src/main/java/org/apache/rocketmq/client/java/rpc/RpcClientImpl.java
index 15b94aa..ffba65e 100644
--- a/java/client/src/main/java/org/apache/rocketmq/client/java/rpc/RpcClientImpl.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/rpc/RpcClientImpl.java
@@ -39,6 +39,7 @@ import apache.rocketmq.v2.ReceiveMessageResponse;
 import apache.rocketmq.v2.SendMessageRequest;
 import apache.rocketmq.v2.SendMessageResponse;
 import apache.rocketmq.v2.TelemetryCommand;
+import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.MoreExecutors;
 import io.grpc.ClientInterceptor;
@@ -62,10 +63,12 @@ import java.util.concurrent.TimeUnit;
 import javax.net.ssl.SSLException;
 import org.apache.rocketmq.client.java.route.Endpoints;
 
+@SuppressWarnings("UnstableApiUsage")
 public class RpcClientImpl implements RpcClient {
     private static final Duration KEEP_ALIVE_DURATION = Duration.ofSeconds(30);
     private static final int GRPC_MAX_MESSAGE_SIZE = Integer.MAX_VALUE;
 
+    private final Endpoints endpoints;
     private final ManagedChannel channel;
     private final MessagingServiceGrpc.MessagingServiceFutureStub futureStub;
     private final MessagingServiceGrpc.MessagingServiceBlockingStub blockingStub;
@@ -75,6 +78,7 @@ public class RpcClientImpl implements RpcClient {
 
     @SuppressWarnings("deprecation")
     public RpcClientImpl(Endpoints endpoints) throws SSLException {
+        this.endpoints = endpoints;
         final SslContextBuilder builder = GrpcSslContexts.forClient();
         builder.trustManager(InsecureTrustManagerFactory.INSTANCE);
         SslContext sslContext = builder.build();
@@ -102,6 +106,14 @@ public class RpcClientImpl implements RpcClient {
         this.activityNanoTime = System.nanoTime();
     }
 
+    private <T> ListenableFuture<InvocationContext<T>> wrapInvocationContext(ListenableFuture<T> future,
+        Metadata header) {
+        return Futures.transformAsync(future, response -> {
+            final RpcContext rpcContext = new RpcContext(endpoints, header);
+            return Futures.immediateFuture(new InvocationContext<>(response, rpcContext));
+        }, MoreExecutors.directExecutor());
+    }
+
     @Override
     public Duration idleDuration() {
         return Duration.ofNanos(System.nanoTime() - activityNanoTime);
@@ -113,85 +125,105 @@ public class RpcClientImpl implements RpcClient {
     }
 
     @Override
-    public ListenableFuture<QueryRouteResponse> queryRoute(Metadata metadata, QueryRouteRequest request,
-        Executor executor, Duration duration) {
+    public ListenableFuture<InvocationContext<QueryRouteResponse>> queryRoute(Metadata metadata,
+        QueryRouteRequest request, Executor executor, Duration duration) {
         this.activityNanoTime = System.nanoTime();
-        return futureStub.withInterceptors(MetadataUtils.newAttachHeadersInterceptor(metadata)).withExecutor(executor)
+        final ListenableFuture<QueryRouteResponse> future = futureStub
+            .withInterceptors(MetadataUtils.newAttachHeadersInterceptor(metadata)).withExecutor(executor)
             .withDeadlineAfter(duration.toNanos(), TimeUnit.NANOSECONDS).queryRoute(request);
+        return wrapInvocationContext(future, metadata);
     }
 
     @Override
-    public ListenableFuture<HeartbeatResponse> heartbeat(Metadata metadata, HeartbeatRequest request, Executor executor,
-        Duration duration) {
+    public ListenableFuture<InvocationContext<HeartbeatResponse>> heartbeat(Metadata metadata, HeartbeatRequest request,
+        Executor executor, Duration duration) {
         this.activityNanoTime = System.nanoTime();
-        return futureStub.withInterceptors(MetadataUtils.newAttachHeadersInterceptor(metadata)).withExecutor(executor)
-            .withDeadlineAfter(duration.toNanos(), TimeUnit.NANOSECONDS).heartbeat(request);
+        final ListenableFuture<HeartbeatResponse> future =
+            futureStub.withInterceptors(MetadataUtils.newAttachHeadersInterceptor(metadata)).withExecutor(executor)
+                .withDeadlineAfter(duration.toNanos(), TimeUnit.NANOSECONDS).heartbeat(request);
+        return wrapInvocationContext(future, metadata);
     }
 
     @Override
-    public ListenableFuture<SendMessageResponse> sendMessage(Metadata metadata, SendMessageRequest request,
-        Executor executor, Duration duration) {
+    public ListenableFuture<InvocationContext<SendMessageResponse>> sendMessage(Metadata metadata,
+        SendMessageRequest request, Executor executor, Duration duration) {
         this.activityNanoTime = System.nanoTime();
-        return futureStub.withInterceptors(MetadataUtils.newAttachHeadersInterceptor(metadata)).withExecutor(executor)
-            .withDeadlineAfter(duration.toNanos(), TimeUnit.NANOSECONDS).sendMessage(request);
+        final ListenableFuture<SendMessageResponse> future =
+            futureStub.withInterceptors(MetadataUtils.newAttachHeadersInterceptor(metadata)).withExecutor(executor)
+                .withDeadlineAfter(duration.toNanos(), TimeUnit.NANOSECONDS).sendMessage(request);
+        return wrapInvocationContext(future, metadata);
     }
 
     @Override
-    public ListenableFuture<QueryAssignmentResponse> queryAssignment(Metadata metadata, QueryAssignmentRequest request,
-        Executor executor, Duration duration) {
+    public ListenableFuture<InvocationContext<QueryAssignmentResponse>> queryAssignment(Metadata metadata,
+        QueryAssignmentRequest request, Executor executor, Duration duration) {
         this.activityNanoTime = System.nanoTime();
-        return futureStub.withInterceptors(MetadataUtils.newAttachHeadersInterceptor(metadata)).withExecutor(executor)
-            .withDeadlineAfter(duration.toNanos(), TimeUnit.NANOSECONDS).queryAssignment(request);
+        final ListenableFuture<QueryAssignmentResponse> future =
+            futureStub.withInterceptors(MetadataUtils.newAttachHeadersInterceptor(metadata)).withExecutor(executor)
+                .withDeadlineAfter(duration.toNanos(), TimeUnit.NANOSECONDS).queryAssignment(request);
+        return wrapInvocationContext(future, metadata);
     }
 
     @Override
-    public ListenableFuture<Iterator<ReceiveMessageResponse>> receiveMessage(Metadata metadata,
+    public ListenableFuture<InvocationContext<Iterator<ReceiveMessageResponse>>> receiveMessage(Metadata metadata,
         ReceiveMessageRequest request, ExecutorService executor, Duration duration) {
         this.activityNanoTime = System.nanoTime();
         final Callable<Iterator<ReceiveMessageResponse>> callable = () -> blockingStub
             .withInterceptors(MetadataUtils.newAttachHeadersInterceptor(metadata)).withExecutor(executor)
             .withDeadlineAfter(duration.toNanos(), TimeUnit.NANOSECONDS).receiveMessage(request);
-        return MoreExecutors.listeningDecorator(executor).submit(callable);
+        final ListenableFuture<Iterator<ReceiveMessageResponse>> future =
+            MoreExecutors.listeningDecorator(executor).submit(callable);
+        return wrapInvocationContext(future, metadata);
     }
 
     @Override
-    public ListenableFuture<AckMessageResponse> ackMessage(Metadata metadata, AckMessageRequest request,
-        Executor executor, Duration duration) {
+    public ListenableFuture<InvocationContext<AckMessageResponse>> ackMessage(Metadata metadata,
+        AckMessageRequest request, Executor executor, Duration duration) {
         this.activityNanoTime = System.nanoTime();
-        return futureStub.withInterceptors(MetadataUtils.newAttachHeadersInterceptor(metadata)).withExecutor(executor)
-            .withDeadlineAfter(duration.toNanos(), TimeUnit.NANOSECONDS).ackMessage(request);
+        final ListenableFuture<AckMessageResponse> future =
+            futureStub.withInterceptors(MetadataUtils.newAttachHeadersInterceptor(metadata)).withExecutor(executor)
+                .withDeadlineAfter(duration.toNanos(), TimeUnit.NANOSECONDS).ackMessage(request);
+        return wrapInvocationContext(future, metadata);
     }
 
     @Override
-    public ListenableFuture<ChangeInvisibleDurationResponse> changeInvisibleDuration(Metadata metadata,
-        ChangeInvisibleDurationRequest request, Executor executor, Duration duration) {
+    public ListenableFuture<InvocationContext<ChangeInvisibleDurationResponse>> changeInvisibleDuration(
+        Metadata metadata, ChangeInvisibleDurationRequest request, Executor executor, Duration duration) {
         this.activityNanoTime = System.nanoTime();
-        return futureStub.withInterceptors(MetadataUtils.newAttachHeadersInterceptor(metadata)).withExecutor(executor)
-            .withDeadlineAfter(duration.toNanos(), TimeUnit.NANOSECONDS).changeInvisibleDuration(request);
+        final ListenableFuture<ChangeInvisibleDurationResponse> future =
+            futureStub.withInterceptors(MetadataUtils.newAttachHeadersInterceptor(metadata)).withExecutor(executor)
+                .withDeadlineAfter(duration.toNanos(), TimeUnit.NANOSECONDS).changeInvisibleDuration(request);
+        return wrapInvocationContext(future, metadata);
     }
 
     @Override
-    public ListenableFuture<ForwardMessageToDeadLetterQueueResponse> forwardMessageToDeadLetterQueue(
+    public ListenableFuture<InvocationContext<ForwardMessageToDeadLetterQueueResponse>> forwardMessageToDeadLetterQueue(
         Metadata metadata, ForwardMessageToDeadLetterQueueRequest request, Executor executor, Duration duration) {
         this.activityNanoTime = System.nanoTime();
-        return futureStub.withInterceptors(MetadataUtils.newAttachHeadersInterceptor(metadata)).withExecutor(executor)
+        final ListenableFuture<ForwardMessageToDeadLetterQueueResponse> future = futureStub
+            .withInterceptors(MetadataUtils.newAttachHeadersInterceptor(metadata)).withExecutor(executor)
             .withDeadlineAfter(duration.toNanos(), TimeUnit.NANOSECONDS).forwardMessageToDeadLetterQueue(request);
+        return wrapInvocationContext(future, metadata);
     }
 
     @Override
-    public ListenableFuture<EndTransactionResponse> endTransaction(Metadata metadata, EndTransactionRequest request,
-        Executor executor, Duration duration) {
+    public ListenableFuture<InvocationContext<EndTransactionResponse>> endTransaction(Metadata metadata,
+        EndTransactionRequest request, Executor executor, Duration duration) {
         this.activityNanoTime = System.nanoTime();
-        return futureStub.withInterceptors(MetadataUtils.newAttachHeadersInterceptor(metadata)).withExecutor(executor)
-            .withDeadlineAfter(duration.toNanos(), TimeUnit.NANOSECONDS).endTransaction(request);
+        final ListenableFuture<EndTransactionResponse> future =
+            futureStub.withInterceptors(MetadataUtils.newAttachHeadersInterceptor(metadata)).withExecutor(executor)
+                .withDeadlineAfter(duration.toNanos(), TimeUnit.NANOSECONDS).endTransaction(request);
+        return wrapInvocationContext(future, metadata);
     }
 
     @Override
-    public ListenableFuture<NotifyClientTerminationResponse> notifyClientTermination(
+    public ListenableFuture<InvocationContext<NotifyClientTerminationResponse>> notifyClientTermination(
         Metadata metadata, NotifyClientTerminationRequest request, Executor executor, Duration duration) {
         this.activityNanoTime = System.nanoTime();
-        return futureStub.withInterceptors(MetadataUtils.newAttachHeadersInterceptor(metadata)).withExecutor(executor)
-            .withDeadlineAfter(duration.toNanos(), TimeUnit.NANOSECONDS).notifyClientTermination(request);
+        final ListenableFuture<NotifyClientTerminationResponse> future =
+            futureStub.withInterceptors(MetadataUtils.newAttachHeadersInterceptor(metadata)).withExecutor(executor)
+                .withDeadlineAfter(duration.toNanos(), TimeUnit.NANOSECONDS).notifyClientTermination(request);
+        return wrapInvocationContext(future, metadata);
     }
 
     @Override
diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/rpc/RpcContext.java b/java/client/src/main/java/org/apache/rocketmq/client/java/rpc/RpcContext.java
new file mode 100644
index 0000000..5def58b
--- /dev/null
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/rpc/RpcContext.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.client.java.rpc;
+
+import io.grpc.Metadata;
+import org.apache.rocketmq.client.java.route.Endpoints;
+
+public class RpcContext {
+    private final Endpoints endpoints;
+    private final Metadata header;
+
+    public RpcContext(Endpoints endpoints, Metadata header) {
+        this.endpoints = endpoints;
+        this.header = header;
+    }
+
+    public String getRequestId() {
+        return header.get(Metadata.Key.of(Signature.REQUEST_ID_KEY, Metadata.ASCII_STRING_MARSHALLER));
+    }
+
+    public Endpoints getEndpoints() {
+        return endpoints;
+    }
+}
diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/rpc/Signature.java b/java/client/src/main/java/org/apache/rocketmq/client/java/rpc/Signature.java
index 33b9f76..5c0a2cb 100644
--- a/java/client/src/main/java/org/apache/rocketmq/client/java/rpc/Signature.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/rpc/Signature.java
@@ -53,7 +53,7 @@ public class Signature {
     private Signature() {
     }
 
-    public static Metadata sign(ClientConfiguration config, String clientId) throws UnsupportedEncodingException,
+    public static Metadata sign(ClientConfiguration config, String clientId) throws
         NoSuchAlgorithmException, InvalidKeyException {
         Metadata metadata = new Metadata();
 
diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/rpc/TLSHelper.java b/java/client/src/main/java/org/apache/rocketmq/client/java/rpc/TLSHelper.java
index ca2f720..9b60aa6 100644
--- a/java/client/src/main/java/org/apache/rocketmq/client/java/rpc/TLSHelper.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/rpc/TLSHelper.java
@@ -31,11 +31,10 @@ public class TLSHelper {
     private TLSHelper() {
     }
 
-    public static String sign(String accessSecret, String dateTime) throws UnsupportedEncodingException,
-            NoSuchAlgorithmException,
-            InvalidKeyException {
+    public static String sign(String accessSecret, String dateTime) throws NoSuchAlgorithmException,
+        InvalidKeyException {
         SecretKeySpec signingKey = new SecretKeySpec(accessSecret.getBytes(StandardCharsets.UTF_8),
-                HMAC_SHA1_ALGORITHM);
+            HMAC_SHA1_ALGORITHM);
         Mac mac;
         mac = Mac.getInstance(HMAC_SHA1_ALGORITHM);
         mac.init(signingKey);
diff --git a/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/ProcessQueueImplTest.java b/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/ProcessQueueImplTest.java
index 401ce38..5ef44bc 100644
--- a/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/ProcessQueueImplTest.java
+++ b/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/ProcessQueueImplTest.java
@@ -44,8 +44,10 @@ import java.util.concurrent.atomic.AtomicLong;
 import org.apache.rocketmq.client.apis.consumer.ConsumeResult;
 import org.apache.rocketmq.client.apis.consumer.FilterExpression;
 import org.apache.rocketmq.client.java.message.MessageViewImpl;
+import org.apache.rocketmq.client.java.misc.RequestIdGenerator;
 import org.apache.rocketmq.client.java.retry.RetryPolicy;
 import org.apache.rocketmq.client.java.route.MessageQueueImpl;
+import org.apache.rocketmq.client.java.rpc.InvocationContext;
 import org.apache.rocketmq.client.java.tool.TestBase;
 import org.junit.Before;
 import org.junit.Test;
@@ -117,7 +119,7 @@ public class ProcessQueueImplTest extends TestBase {
         when(pushConsumer.getRetryPolicy()).thenReturn(retryPolicy);
         when(pushConsumerSettings.isFifo()).thenReturn(false);
         when(pushConsumer.changeInvisibleDuration(any(MessageViewImpl.class), any(Duration.class)))
-            .thenReturn(okChangeInvisibleDurationFuture());
+            .thenReturn(okChangeInvisibleDurationCtxFuture());
         processQueue.cacheMessages(messageViewList);
         verify(pushConsumer, times(1))
             .changeInvisibleDuration(any(MessageViewImpl.class), any(Duration.class));
@@ -148,7 +150,8 @@ public class ProcessQueueImplTest extends TestBase {
         List<MessageViewImpl> messageViewList = new ArrayList<>();
         final MessageViewImpl messageView = fakeMessageViewImpl();
         messageViewList.add(messageView);
-        ReceiveMessageResult receiveMessageResult = new ReceiveMessageResult(fakeEndpoints(), status, messageViewList);
+        ReceiveMessageResult receiveMessageResult = new ReceiveMessageResult(fakeEndpoints(),
+            RequestIdGenerator.getInstance().next(), status, messageViewList);
         SettableFuture<ReceiveMessageResult> future0 = SettableFuture.create();
         future0.set(receiveMessageResult);
         when(pushConsumer.receiveMessage(any(ReceiveMessageRequest.class), any(MessageQueueImpl.class),
@@ -178,7 +181,7 @@ public class ProcessQueueImplTest extends TestBase {
         assertEquals(cachedMessageCount, processQueue.cachedMessagesCount());
         assertEquals(1, processQueue.inflightMessagesCount());
 
-        final ListenableFuture<AckMessageResponse> future = okAckMessageResponseFuture();
+        final ListenableFuture<InvocationContext<AckMessageResponse>> future = okAckMessageResponseFuture();
         when(pushConsumer.ackMessage(any(MessageViewImpl.class))).thenReturn(future);
         processQueue.eraseMessage(optionalMessageView.get(), ConsumeResult.SUCCESS);
         future.addListener(() -> verify(pushConsumer, times(1))
@@ -221,7 +224,7 @@ public class ProcessQueueImplTest extends TestBase {
         final MessageViewImpl messageView = fakeMessageViewImpl(2, false);
         messageViewList.add(messageView);
         processQueue.cacheMessages(messageViewList);
-        ListenableFuture<AckMessageResponse> future0 = okAckMessageResponseFuture();
+        ListenableFuture<InvocationContext<AckMessageResponse>> future0 = okAckMessageResponseFuture();
         when(pushConsumer.ackMessage(any(MessageViewImpl.class))).thenReturn(future0);
         when(pushConsumer.getRetryPolicy()).thenReturn(retryPolicy);
         when(retryPolicy.getMaxAttempts()).thenReturn(1);
@@ -237,7 +240,7 @@ public class ProcessQueueImplTest extends TestBase {
         final MessageViewImpl messageView = fakeMessageViewImpl(2, false);
         messageViewList.add(messageView);
         processQueue.cacheMessages(messageViewList);
-        ListenableFuture<ForwardMessageToDeadLetterQueueResponse> future0 =
+        ListenableFuture<InvocationContext<ForwardMessageToDeadLetterQueueResponse>> future0 =
             okForwardMessageToDeadLetterQueueResponseFuture();
         when(pushConsumer.forwardMessageToDeadLetterQueue(any(MessageViewImpl.class))).thenReturn(future0);
         when(pushConsumer.getRetryPolicy()).thenReturn(retryPolicy);
@@ -254,7 +257,7 @@ public class ProcessQueueImplTest extends TestBase {
         final MessageViewImpl messageView = fakeMessageViewImpl(2, false);
         messageViewList.add(messageView);
         processQueue.cacheMessages(messageViewList);
-        ListenableFuture<AckMessageResponse> future0 = okAckMessageResponseFuture();
+        ListenableFuture<InvocationContext<AckMessageResponse>> future0 = okAckMessageResponseFuture();
         when(pushConsumer.ackMessage(any(MessageViewImpl.class))).thenReturn(future0);
         when(pushConsumer.getRetryPolicy()).thenReturn(retryPolicy);
         when(retryPolicy.getMaxAttempts()).thenReturn(2);
diff --git a/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/PushConsumerImplTest.java b/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/PushConsumerImplTest.java
index 54e507c..a7b5357 100644
--- a/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/PushConsumerImplTest.java
+++ b/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/PushConsumerImplTest.java
@@ -124,7 +124,8 @@ public class PushConsumerImplTest extends TestBase {
             any(QueryRouteRequest.class), any(Duration.class));
         verify(clientManager, atLeast(1)).queryAssignment(any(Endpoints.class),
             any(Metadata.class), any(QueryAssignmentRequest.class), any(Duration.class));
-        Assert.assertEquals(okQueryAssignmentResponseFuture().get().getAssignmentsCount(), pushConsumer.getQueueSize());
+        Assert.assertEquals(okQueryAssignmentResponseFuture().get().getResp().getAssignmentsCount(),
+            pushConsumer.getQueueSize());
         when(clientManager.queryAssignment(any(Endpoints.class), any(Metadata.class), any(QueryAssignmentRequest.class),
             any(Duration.class))).thenReturn(okEmptyQueryAssignmentResponseFuture());
         pushConsumer.scanAssignments();
@@ -148,7 +149,8 @@ public class PushConsumerImplTest extends TestBase {
 
     @Test
     public void testSubscribeWithSubscriptionOverwriting() throws ClientException {
-        pushConsumer = new PushConsumerImpl(clientConfiguration, FAKE_GROUP_0, subscriptionExpressions, messageListener,
+        pushConsumer = new PushConsumerImpl(clientConfiguration, FAKE_GROUP_0, subscriptionExpressions,
+            messageListener,
             maxCacheMessageCount, maxCacheMessageSizeInBytes, consumptionThreadCount);
         start(pushConsumer);
         final FilterExpression filterExpression = new FilterExpression(FAKE_TAG_0);
diff --git a/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/SimpleConsumerImplTest.java b/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/SimpleConsumerImplTest.java
index 9568a81..88445d1 100644
--- a/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/SimpleConsumerImplTest.java
+++ b/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/SimpleConsumerImplTest.java
@@ -63,6 +63,7 @@ import org.apache.rocketmq.client.java.impl.TelemetrySession;
 import org.apache.rocketmq.client.java.message.MessageViewImpl;
 import org.apache.rocketmq.client.java.misc.ThreadFactoryImpl;
 import org.apache.rocketmq.client.java.route.Endpoints;
+import org.apache.rocketmq.client.java.rpc.InvocationContext;
 import org.apache.rocketmq.client.java.tool.TestBase;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -87,7 +88,7 @@ public class SimpleConsumerImplTest extends TestBase {
     private SimpleConsumerImpl simpleConsumer;
 
     private void start(SimpleConsumerImpl simpleConsumer) throws ClientException {
-        SettableFuture<QueryRouteResponse> future0 = SettableFuture.create();
+        SettableFuture<InvocationContext<QueryRouteResponse>> future0 = SettableFuture.create();
         Status status = Status.newBuilder().setCode(Code.OK).build();
         List<MessageQueue> messageQueueList = new ArrayList<>();
         MessageQueue mq = MessageQueue.newBuilder().setTopic(Resource.newBuilder().setName(FAKE_TOPIC_0))
@@ -97,7 +98,9 @@ public class SimpleConsumerImplTest extends TestBase {
         messageQueueList.add(mq);
         QueryRouteResponse response = QueryRouteResponse.newBuilder().setStatus(status)
             .addAllMessageQueues(messageQueueList).build();
-        future0.set(response);
+        final InvocationContext<QueryRouteResponse> invocationContext = new InvocationContext<>(response,
+            fakeRpcContext());
+        future0.set(invocationContext);
         when(clientManager.queryRoute(any(Endpoints.class), any(Metadata.class), any(QueryRouteRequest.class),
             any(Duration.class)))
             .thenReturn(future0);
@@ -180,7 +183,7 @@ public class SimpleConsumerImplTest extends TestBase {
         simpleConsumer = new SimpleConsumerImpl(clientConfiguration, FAKE_GROUP_0, awaitDuration, subExpressions);
         start(simpleConsumer);
         int receivedMessageCount = 16;
-        final ListenableFuture<Iterator<ReceiveMessageResponse>> future =
+        final ListenableFuture<InvocationContext<Iterator<ReceiveMessageResponse>>> future =
             okReceiveMessageResponsesFuture(FAKE_TOPIC_0, receivedMessageCount);
         when(clientManager.receiveMessage(any(Endpoints.class), any(Metadata.class), any(ReceiveMessageRequest.class),
             any(Duration.class))).thenReturn(future);
@@ -197,7 +200,7 @@ public class SimpleConsumerImplTest extends TestBase {
         start(simpleConsumer);
         try {
             final MessageViewImpl messageView = fakeMessageViewImpl();
-            final ListenableFuture<AckMessageResponse> future = okAckMessageResponseFuture();
+            final ListenableFuture<InvocationContext<AckMessageResponse>> future = okAckMessageResponseFuture();
             when(clientManager.ackMessage(any(Endpoints.class), any(Metadata.class), any(AckMessageRequest.class),
                 any(Duration.class))).thenReturn(future);
             simpleConsumer.ack(messageView);
@@ -212,7 +215,7 @@ public class SimpleConsumerImplTest extends TestBase {
         start(simpleConsumer);
         try {
             final MessageViewImpl messageView = fakeMessageViewImpl();
-            final ListenableFuture<ChangeInvisibleDurationResponse> future =
+            final ListenableFuture<InvocationContext<ChangeInvisibleDurationResponse>> future =
                 okChangeInvisibleDurationResponseFuture(FAKE_RECEIPT_HANDLE_1);
             when(clientManager.changeInvisibleDuration(any(Endpoints.class), any(Metadata.class),
                 any(ChangeInvisibleDurationRequest.class), any(Duration.class))).thenReturn(future);
diff --git a/java/client/src/test/java/org/apache/rocketmq/client/java/impl/producer/ProducerImplTest.java b/java/client/src/test/java/org/apache/rocketmq/client/java/impl/producer/ProducerImplTest.java
index 907a6fc..619538e 100644
--- a/java/client/src/test/java/org/apache/rocketmq/client/java/impl/producer/ProducerImplTest.java
+++ b/java/client/src/test/java/org/apache/rocketmq/client/java/impl/producer/ProducerImplTest.java
@@ -61,6 +61,7 @@ import org.apache.rocketmq.client.java.impl.ClientManagerRegistry;
 import org.apache.rocketmq.client.java.impl.TelemetrySession;
 import org.apache.rocketmq.client.java.misc.ThreadFactoryImpl;
 import org.apache.rocketmq.client.java.route.Endpoints;
+import org.apache.rocketmq.client.java.rpc.InvocationContext;
 import org.apache.rocketmq.client.java.tool.TestBase;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -94,7 +95,7 @@ public class ProducerImplTest extends TestBase {
         null);
 
     private void start(ProducerImpl producer) throws ClientException {
-        SettableFuture<QueryRouteResponse> future0 = SettableFuture.create();
+        SettableFuture<InvocationContext<QueryRouteResponse>> future0 = SettableFuture.create();
         Status status = Status.newBuilder().setCode(Code.OK).build();
         List<MessageQueue> messageQueueList = new ArrayList<>();
         MessageQueue mq = MessageQueue.newBuilder().setTopic(Resource.newBuilder().setName(FAKE_TOPIC_0))
@@ -102,9 +103,11 @@ public class ProducerImplTest extends TestBase {
             .setBroker(Broker.newBuilder().setName(FAKE_BROKER_NAME_0).setEndpoints(fakePbEndpoints0()))
             .setId(0).build();
         messageQueueList.add(mq);
-        QueryRouteResponse response = QueryRouteResponse.newBuilder().setStatus(status)
+        QueryRouteResponse resp = QueryRouteResponse.newBuilder().setStatus(status)
             .addAllMessageQueues(messageQueueList).build();
-        future0.set(response);
+        final InvocationContext<QueryRouteResponse> invocationContext =
+            new InvocationContext<>(resp, fakeRpcContext());
+        future0.set(invocationContext);
         when(clientManager.queryRoute(any(Endpoints.class), any(Metadata.class), any(QueryRouteRequest.class),
             any(Duration.class)))
             .thenReturn(future0);
@@ -144,10 +147,11 @@ public class ProducerImplTest extends TestBase {
         verify(clientManager, times(1)).telemetry(any(Endpoints.class), any(Metadata.class),
             any(Duration.class), any(TelemetrySession.class));
         final Message message = fakeMessage(FAKE_TOPIC_0);
-        final ListenableFuture<SendMessageResponse> future = okSendMessageResponseFutureWithSingleEntry();
+        final ListenableFuture<InvocationContext<SendMessageResponse>> future =
+            okSendMessageResponseFutureWithSingleEntry();
         when(clientManager.sendMessage(any(Endpoints.class), any(Metadata.class), any(SendMessageRequest.class),
             any(Duration.class))).thenReturn(future);
-        final SendMessageResponse response = future.get();
+        final SendMessageResponse response = future.get().getResp();
         assertEquals(1, response.getEntriesCount());
         final apache.rocketmq.v2.SendResultEntry receipt = response.getEntriesList().iterator().next();
         final SendReceipt sendReceipt = producer.send(message);
@@ -163,10 +167,11 @@ public class ProducerImplTest extends TestBase {
         verify(clientManager, never()).telemetry(any(Endpoints.class), any(Metadata.class), any(Duration.class),
             any(TelemetrySession.class));
         final Message message = fakeMessage(FAKE_TOPIC_0);
-        final ListenableFuture<SendMessageResponse> future = okSendMessageResponseFutureWithSingleEntry();
+        final ListenableFuture<InvocationContext<SendMessageResponse>> future =
+            okSendMessageResponseFutureWithSingleEntry();
         when(clientManager.sendMessage(any(Endpoints.class), any(Metadata.class), any(SendMessageRequest.class),
             any(Duration.class))).thenReturn(future);
-        final SendMessageResponse response = future.get();
+        final SendMessageResponse response = future.get().getResp();
         assertEquals(1, response.getEntriesCount());
         final SendReceipt sendReceipt = producerWithoutTopicBinding.send(message);
         verify(clientManager, times(1)).queryRoute(any(Endpoints.class), any(Metadata.class),
@@ -185,7 +190,7 @@ public class ProducerImplTest extends TestBase {
             any(QueryRouteRequest.class), any(Duration.class));
         verify(clientManager, times(1)).telemetry(any(Endpoints.class), any(Metadata.class), any(Duration.class),
             any(TelemetrySession.class));
-        final ListenableFuture<SendMessageResponse> future = failureSendMessageResponseFuture();
+        final ListenableFuture<InvocationContext<SendMessageResponse>> future = failureSendMessageResponseFuture();
         when(clientManager.sendMessage(any(Endpoints.class), any(Metadata.class), any(SendMessageRequest.class),
             any(Duration.class))).thenReturn(future);
         Message message0 = fakeMessage(FAKE_TOPIC_0);
diff --git a/java/client/src/test/java/org/apache/rocketmq/client/java/tool/TestBase.java b/java/client/src/test/java/org/apache/rocketmq/client/java/tool/TestBase.java
index 212cb06..f3bcbb2 100644
--- a/java/client/src/test/java/org/apache/rocketmq/client/java/tool/TestBase.java
+++ b/java/client/src/test/java/org/apache/rocketmq/client/java/tool/TestBase.java
@@ -39,9 +39,11 @@ import apache.rocketmq.v2.SendMessageResponse;
 import apache.rocketmq.v2.SendResultEntry;
 import apache.rocketmq.v2.Status;
 import apache.rocketmq.v2.SystemProperties;
+import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.SettableFuture;
 import com.google.protobuf.ByteString;
+import io.grpc.Metadata;
 import java.nio.charset.StandardCharsets;
 import java.time.Duration;
 import java.util.ArrayList;
@@ -72,6 +74,8 @@ import org.apache.rocketmq.client.java.retry.CustomizedBackoffRetryPolicy;
 import org.apache.rocketmq.client.java.retry.ExponentialBackoffRetryPolicy;
 import org.apache.rocketmq.client.java.route.Endpoints;
 import org.apache.rocketmq.client.java.route.MessageQueueImpl;
+import org.apache.rocketmq.client.java.rpc.InvocationContext;
+import org.apache.rocketmq.client.java.rpc.RpcContext;
 
 public class TestBase {
     protected static final String FAKE_CLIENT_ID = "mbp@29848@cno0nhxy";
@@ -137,6 +141,10 @@ public class TestBase {
         return new Endpoints(fakePbEndpoints0());
     }
 
+    protected RpcContext fakeRpcContext() {
+        return new RpcContext(fakeEndpoints(), new Metadata());
+    }
+
     protected Message fakeMessage(String topic) {
         return new MessageBuilderImpl().setTopic(topic).setBody(RandomUtils.nextBytes(1)).build();
     }
@@ -207,40 +215,35 @@ public class TestBase {
             .setPermission(Permission.READ_WRITE).build();
     }
 
-    protected ListenableFuture<QueryRouteResponse> okQueryRouteResponseFuture() {
-        SettableFuture<QueryRouteResponse> future = SettableFuture.create();
+    protected ListenableFuture<InvocationContext<QueryRouteResponse>> okQueryRouteResponseFuture() {
         Status status = Status.newBuilder().setCode(Code.OK).build();
-        final QueryRouteResponse response =
+        final QueryRouteResponse resp =
             QueryRouteResponse.newBuilder().setStatus(status).addMessageQueues(fakePbMessageQueue0()).build();
-        future.set(response);
-        return future;
+        return Futures.immediateFuture(new InvocationContext<>(resp, fakeRpcContext()));
     }
 
-    protected ListenableFuture<ChangeInvisibleDurationResponse> okChangeInvisibleDurationFuture() {
-        SettableFuture<ChangeInvisibleDurationResponse> future = SettableFuture.create();
+    protected ListenableFuture<InvocationContext<ChangeInvisibleDurationResponse>>
+        okChangeInvisibleDurationCtxFuture() {
         Status status = Status.newBuilder().setCode(Code.OK).build();
-        final ChangeInvisibleDurationResponse response =
+        final ChangeInvisibleDurationResponse resp =
             ChangeInvisibleDurationResponse.newBuilder().setStatus(status).build();
-        future.set(response);
-        return future;
+        return Futures.immediateFuture(new InvocationContext<>(resp, fakeRpcContext()));
     }
 
-    protected ListenableFuture<QueryAssignmentResponse> okQueryAssignmentResponseFuture() {
+    protected ListenableFuture<InvocationContext<QueryAssignmentResponse>> okQueryAssignmentResponseFuture() {
         final SettableFuture<QueryAssignmentResponse> future = SettableFuture.create();
         final Status status = Status.newBuilder().setCode(Code.OK).build();
         Assignment assignment = Assignment.newBuilder().setMessageQueue(fakePbMessageQueue0()).build();
-        QueryAssignmentResponse response = QueryAssignmentResponse.newBuilder().setStatus(status)
+        QueryAssignmentResponse resp = QueryAssignmentResponse.newBuilder().setStatus(status)
             .addAssignments(assignment).build();
-        future.set(response);
-        return future;
+        return Futures.immediateFuture(new InvocationContext<>(resp, fakeRpcContext()));
     }
 
-    protected ListenableFuture<QueryAssignmentResponse> okEmptyQueryAssignmentResponseFuture() {
+    protected ListenableFuture<InvocationContext<QueryAssignmentResponse>> okEmptyQueryAssignmentResponseFuture() {
         final SettableFuture<QueryAssignmentResponse> future = SettableFuture.create();
         final Status status = Status.newBuilder().setCode(Code.OK).build();
-        final QueryAssignmentResponse response = QueryAssignmentResponse.newBuilder().setStatus(status).build();
-        future.set(response);
-        return future;
+        final QueryAssignmentResponse resp = QueryAssignmentResponse.newBuilder().setStatus(status).build();
+        return Futures.immediateFuture(new InvocationContext<>(resp, fakeRpcContext()));
     }
 
     protected Map<String, FilterExpression> createSubscriptionExpressions(String topic) {
@@ -250,53 +253,44 @@ public class TestBase {
         return map;
     }
 
-    protected ListenableFuture<AckMessageResponse> okAckMessageResponseFuture() {
+    protected ListenableFuture<InvocationContext<AckMessageResponse>> okAckMessageResponseFuture() {
         final Status status = Status.newBuilder().setCode(Code.OK).build();
-        SettableFuture<AckMessageResponse> future0 = SettableFuture.create();
-        final AckMessageResponse response = AckMessageResponse.newBuilder().setStatus(status).build();
-        future0.set(response);
-        return future0;
+        final AckMessageResponse resp = AckMessageResponse.newBuilder().setStatus(status).build();
+        return Futures.immediateFuture(new InvocationContext<>(resp, fakeRpcContext()));
     }
 
-    protected ListenableFuture<ChangeInvisibleDurationResponse> okChangeInvisibleDurationResponseFuture(
-        String receiptHandle) {
+    protected ListenableFuture<InvocationContext<ChangeInvisibleDurationResponse>>
+        okChangeInvisibleDurationResponseFuture(String receiptHandle) {
         final Status status = Status.newBuilder().setCode(Code.OK).build();
         SettableFuture<ChangeInvisibleDurationResponse> future = SettableFuture.create();
-        ChangeInvisibleDurationResponse response = ChangeInvisibleDurationResponse.newBuilder().setStatus(status)
+        ChangeInvisibleDurationResponse resp = ChangeInvisibleDurationResponse.newBuilder().setStatus(status)
             .setReceiptHandle(receiptHandle).build();
-        future.set(response);
-        return future;
+        return Futures.immediateFuture(new InvocationContext<>(resp, fakeRpcContext()));
     }
 
-    protected ListenableFuture<ForwardMessageToDeadLetterQueueResponse>
-        okForwardMessageToDeadLetterQueueResponseFuture() {
+    protected ListenableFuture<InvocationContext<ForwardMessageToDeadLetterQueueResponse>>
+    okForwardMessageToDeadLetterQueueResponseFuture() {
         final Status status = Status.newBuilder().setCode(Code.OK).build();
-        SettableFuture<ForwardMessageToDeadLetterQueueResponse> future0 = SettableFuture.create();
-        final ForwardMessageToDeadLetterQueueResponse response =
+        final ForwardMessageToDeadLetterQueueResponse resp =
             ForwardMessageToDeadLetterQueueResponse.newBuilder().setStatus(status).build();
-        future0.set(response);
-        return future0;
+        return Futures.immediateFuture(new InvocationContext<>(resp, fakeRpcContext()));
     }
 
-    protected ListenableFuture<SendMessageResponse> okSendMessageResponseFutureWithSingleEntry() {
+    protected ListenableFuture<InvocationContext<SendMessageResponse>> okSendMessageResponseFutureWithSingleEntry() {
         final Status status = Status.newBuilder().setCode(Code.OK).build();
-        SettableFuture<SendMessageResponse> future0 = SettableFuture.create();
         final String messageId = MessageIdCodec.getInstance().nextMessageId().toString();
         SendResultEntry entry = SendResultEntry.newBuilder().setMessageId(messageId)
             .setTransactionId(FAKE_TRANSACTION_ID).setStatus(status).setOffset(1).build();
-        SendMessageResponse response = SendMessageResponse.newBuilder().setStatus(status).addEntries(entry).build();
-        future0.set(response);
-        return future0;
+        SendMessageResponse resp = SendMessageResponse.newBuilder().setStatus(status).addEntries(entry).build();
+        return Futures.immediateFuture(new InvocationContext<>(resp, fakeRpcContext()));
     }
 
-    protected ListenableFuture<SendMessageResponse> failureSendMessageResponseFuture() {
+    protected ListenableFuture<InvocationContext<SendMessageResponse>> failureSendMessageResponseFuture() {
         final Status status = Status.newBuilder().setCode(Code.FORBIDDEN).build();
-        SettableFuture<SendMessageResponse> future0 = SettableFuture.create();
         SendResultEntry sendResultEntry = SendResultEntry.newBuilder().setStatus(status).setStatus(status).build();
-        SendMessageResponse response = SendMessageResponse.newBuilder().setStatus(status)
+        SendMessageResponse resp = SendMessageResponse.newBuilder().setStatus(status)
             .addEntries(sendResultEntry).build();
-        future0.set(response);
-        return future0;
+        return Futures.immediateFuture(new InvocationContext<>(resp, fakeRpcContext()));
     }
 
     protected ListenableFuture<SendMessageResponse> okBatchSendMessageResponseFuture() {
@@ -326,11 +320,9 @@ public class TestBase {
             .setSystemProperties(systemProperties).build();
     }
 
-    protected ListenableFuture<Iterator<ReceiveMessageResponse>> okReceiveMessageResponsesFuture(String topic,
-        int messageCount) {
+    protected ListenableFuture<InvocationContext<Iterator<ReceiveMessageResponse>>> okReceiveMessageResponsesFuture(
+        String topic, int messageCount) {
         final Status status = Status.newBuilder().setCode(Code.OK).build();
-        SettableFuture<Iterator<ReceiveMessageResponse>> future = SettableFuture.create();
-
         final apache.rocketmq.v2.Message message = fakePbMessage(topic);
         List<ReceiveMessageResponse> responses = new ArrayList<>();
         ReceiveMessageResponse statusResponse = ReceiveMessageResponse.newBuilder().setStatus(status).build();
@@ -339,9 +331,7 @@ public class TestBase {
             ReceiveMessageResponse messageResponse = ReceiveMessageResponse.newBuilder().setMessage(message).build();
             responses.add(messageResponse);
         }
-
-        future.set(responses.iterator());
-        return future;
+        return Futures.immediateFuture(new InvocationContext<>(responses.iterator(), fakeRpcContext()));
     }
 
     protected ListenableFuture<EndTransactionResponse> okEndTransactionResponseFuture() {
@@ -369,8 +359,9 @@ public class TestBase {
 
     protected SendReceiptImpl fakeSendReceiptImpl(
         MessageQueueImpl mq) throws ExecutionException, InterruptedException, ClientException {
-        final ListenableFuture<SendMessageResponse> future = okSendMessageResponseFutureWithSingleEntry();
-        final SendMessageResponse response = future.get();
+        final ListenableFuture<InvocationContext<SendMessageResponse>> future =
+            okSendMessageResponseFutureWithSingleEntry();
+        final SendMessageResponse response = future.get().getResp();
         final List<SendReceiptImpl> receipts = SendReceiptImpl.processSendResponse(mq, response);
         return receipts.iterator().next();
     }