You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by aa...@apache.org on 2022/07/18 10:54:43 UTC

[rocketmq-clients] branch master updated: Java: use independent clientManager for each client (#55)

This is an automated email from the ASF dual-hosted git repository.

aaronai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git


The following commit(s) were added to refs/heads/master by this push:
     new e20e51a  Java: use independent clientManager for each client (#55)
e20e51a is described below

commit e20e51a2ce0fe1588dd5aa70b54a58019f906ce6
Author: Aaron Ai <ya...@gmail.com>
AuthorDate: Mon Jul 18 18:54:38 2022 +0800

    Java: use independent clientManager for each client (#55)
    
    * Add more tests
    
    * Bugfix: forgot to update ProducerSettings#topics
    
    * Check maxMessageNum before receiving message
    
    * Polish code
---
 .../rocketmq/client/java/impl/ClientImpl.java      |  31 +-
 .../rocketmq/client/java/impl/ClientManager.java   |  73 ++-
 .../client/java/impl/ClientManagerImpl.java        |  85 +---
 .../client/java/impl/ClientManagerRegistry.java    |  95 ----
 .../client/java/impl/consumer/ConsumerImpl.java    |  32 +-
 .../java/impl/consumer/ProcessQueueImpl.java       |  53 +--
 .../java/impl/consumer/PushConsumerImpl.java       |  79 +++-
 .../java/impl/consumer/SimpleConsumerImpl.java     |  82 ++--
 .../client/java/impl/producer/ProducerImpl.java    |  24 +-
 .../java/impl/producer/ProducerSettings.java       |  15 +-
 .../client/java/impl/producer/SendReceiptImpl.java |  12 +-
 .../client/java/route/TopicRouteDataResult.java    |  12 +-
 .../java/rpc/{RpcContext.java => Context.java}     |   4 +-
 .../apache/rocketmq/client/java/rpc/RpcClient.java |  20 +-
 .../rocketmq/client/java/rpc/RpcClientImpl.java    |  27 +-
 .../{InvocationContext.java => RpcInvocation.java} |  16 +-
 .../client/java/impl/ClientManagerImplTest.java    |   2 +-
 .../java/impl/consumer/ProcessQueueImplTest.java   |  10 +-
 .../java/impl/consumer/PushConsumerImplTest.java   | 149 ++----
 .../java/impl/consumer/SimpleConsumerImplTest.java | 515 +++++++++++++++------
 .../java/impl/producer/ProducerImplTest.java       |  26 +-
 .../apache/rocketmq/client/java/tool/TestBase.java |  86 ++--
 java/pom.xml                                       |   2 +-
 23 files changed, 749 insertions(+), 701 deletions(-)

diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientImpl.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientImpl.java
index 044b6aa..2367265 100644
--- a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientImpl.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientImpl.java
@@ -83,7 +83,7 @@ import org.apache.rocketmq.client.java.misc.ThreadFactoryImpl;
 import org.apache.rocketmq.client.java.misc.Utilities;
 import org.apache.rocketmq.client.java.route.Endpoints;
 import org.apache.rocketmq.client.java.route.TopicRouteDataResult;
-import org.apache.rocketmq.client.java.rpc.InvocationContext;
+import org.apache.rocketmq.client.java.rpc.RpcInvocation;
 import org.apache.rocketmq.client.java.rpc.Signature;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -145,6 +145,8 @@ public abstract class ClientImpl extends AbstractIdleService implements Client,
         this.messageInterceptors = new ArrayList<>();
         this.messageInterceptorsLock = new ReentrantReadWriteLock();
 
+        this.clientManager = new ClientManagerImpl(this);
+
         this.clientCallbackExecutor = new ThreadPoolExecutor(
             Runtime.getRuntime().availableProcessors(),
             Runtime.getRuntime().availableProcessors(),
@@ -174,8 +176,6 @@ public abstract class ClientImpl extends AbstractIdleService implements Client,
     @Override
     protected void startUp() throws Exception {
         LOGGER.info("Begin to start the rocketmq client, clientId={}", clientId);
-        // Register client after client id generation.
-        this.clientManager = ClientManagerRegistry.getInstance().registerClient(this);
         // Fetch topic route from remote.
         LOGGER.info("Begin to fetch topic(s) route data from remote during client startup, clientId={}, topics={}",
             clientId, topics);
@@ -228,7 +228,7 @@ public abstract class ClientImpl extends AbstractIdleService implements Client,
         LOGGER.info("Begin to release telemetry sessions, clientId={}", clientId);
         releaseTelemetrySessions();
         LOGGER.info("Release telemetry sessions successfully, clientId={}", clientId);
-        ClientManagerRegistry.getInstance().unregisterClient(this);
+        clientManager.stopAsync().awaitTerminated();
         clientCallbackExecutor.shutdown();
         if (!ExecutorServices.awaitTerminated(clientCallbackExecutor)) {
             LOGGER.error("[Bug] Timeout to shutdown the client callback executor, clientId={}", clientId);
@@ -595,7 +595,7 @@ public abstract class ClientImpl extends AbstractIdleService implements Client,
     }
 
     /**
-     * Send heartbeat data to appointed endpoint
+     * Send heartbeat data to the appointed endpoint
      *
      * @param request   heartbeat data request
      * @param endpoints endpoint to send heartbeat data
@@ -603,12 +603,12 @@ public abstract class ClientImpl extends AbstractIdleService implements Client,
     private void doHeartbeat(HeartbeatRequest request, final Endpoints endpoints) {
         try {
             Metadata metadata = sign();
-            final ListenableFuture<InvocationContext<HeartbeatResponse>> future = clientManager
+            final ListenableFuture<RpcInvocation<HeartbeatResponse>> future = clientManager
                 .heartbeat(endpoints, metadata, request, clientConfiguration.getRequestTimeout());
-            Futures.addCallback(future, new FutureCallback<InvocationContext<HeartbeatResponse>>() {
+            Futures.addCallback(future, new FutureCallback<RpcInvocation<HeartbeatResponse>>() {
                 @Override
-                public void onSuccess(InvocationContext<HeartbeatResponse> context) {
-                    final HeartbeatResponse response = context.getResp();
+                public void onSuccess(RpcInvocation<HeartbeatResponse> inv) {
+                    final HeartbeatResponse response = inv.getResponse();
                     final Status status = response.getStatus();
                     final Code code = status.getCode();
                     if (Code.OK != code) {
@@ -653,18 +653,19 @@ public abstract class ClientImpl extends AbstractIdleService implements Client,
             final QueryRouteRequest request = QueryRouteRequest.newBuilder().setTopic(topicResource)
                 .setEndpoints(endpoints.toProtobuf()).build();
             final Metadata metadata = sign();
-            final ListenableFuture<InvocationContext<QueryRouteResponse>> contextFuture =
+            final ListenableFuture<RpcInvocation<QueryRouteResponse>> future =
                 clientManager.queryRoute(endpoints, metadata, request, clientConfiguration.getRequestTimeout());
-            return Futures.transform(contextFuture, ctx -> {
-                final QueryRouteResponse response = ctx.getResp();
+            return Futures.transform(future, invocation -> {
+                final QueryRouteResponse response = invocation.getResponse();
+                final String requestId = invocation.getContext().getRequestId();
                 final Status status = response.getStatus();
                 final Code code = status.getCode();
                 if (Code.OK != code) {
                     LOGGER.error("Exception raised while fetch topic route from remote, topic={}, " +
-                            "clientId={}, endpoints={}, code={}, status message=[{}]", topic, clientId,
-                        endpoints, code, status.getMessage());
+                            "clientId={}, requestId={}, endpoints={}, code={}, status message=[{}]", topic, clientId,
+                        requestId, endpoints, code, status.getMessage());
                 }
-                return new TopicRouteDataResult(ctx);
+                return new TopicRouteDataResult(invocation);
             }, MoreExecutors.directExecutor());
         } catch (Throwable t) {
             return Futures.immediateFailedFuture(t);
diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientManager.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientManager.java
index b61e315..060f0d7 100644
--- a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientManager.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientManager.java
@@ -38,6 +38,7 @@ import apache.rocketmq.v2.ReceiveMessageResponse;
 import apache.rocketmq.v2.SendMessageRequest;
 import apache.rocketmq.v2.SendMessageResponse;
 import apache.rocketmq.v2.TelemetryCommand;
+import com.google.common.util.concurrent.AbstractIdleService;
 import com.google.common.util.concurrent.ListenableFuture;
 import io.grpc.Metadata;
 import io.grpc.stub.StreamObserver;
@@ -46,7 +47,7 @@ import java.util.Iterator;
 import java.util.concurrent.ScheduledExecutorService;
 import org.apache.rocketmq.client.apis.ClientException;
 import org.apache.rocketmq.client.java.route.Endpoints;
-import org.apache.rocketmq.client.java.rpc.InvocationContext;
+import org.apache.rocketmq.client.java.rpc.RpcInvocation;
 
 /**
  * Client manager supplies a series of unified APIs to execute remote procedure calls for each {@link Client}.
@@ -55,34 +56,13 @@ import org.apache.rocketmq.client.java.rpc.InvocationContext;
  * once {@link Client} is shut down, it must be unregistered by the client manager. The client manager holds the
  * connections and underlying threads, which are shared by all registered clients.
  */
-public interface ClientManager {
-    /**
-     * Register client.
-     *
-     * @param client client.
-     */
-    void registerClient(Client client);
-
-    /**
-     * Unregister client.
-     *
-     * @param client client.
-     */
-    void unregisterClient(Client client);
-
-    /**
-     * Returns {@code true} if manager contains no {@link Client}.
-     *
-     * @return {@code true} if this map contains no {@link Client}.
-     */
-    boolean isEmpty();
-
+public abstract class ClientManager extends AbstractIdleService {
     /**
      * Provide for the client to share the scheduler.
      *
      * @return shared scheduler.
      */
-    ScheduledExecutorService getScheduler();
+    public abstract ScheduledExecutorService getScheduler();
 
     /**
      * Query topic route asynchronously, the method ensures no throwable.
@@ -93,9 +73,8 @@ public interface ClientManager {
      * @param duration  request max duration.
      * @return invocation of response future.
      */
-    ListenableFuture<InvocationContext<QueryRouteResponse>> queryRoute(Endpoints endpoints, Metadata metadata,
-        QueryRouteRequest request,
-        Duration duration);
+    public abstract ListenableFuture<RpcInvocation<QueryRouteResponse>> queryRoute(Endpoints endpoints,
+        Metadata metadata, QueryRouteRequest request, Duration duration);
 
     /**
      * Heart beat asynchronously, the method ensures no throwable.
@@ -106,9 +85,8 @@ public interface ClientManager {
      * @param duration  request max duration.
      * @return invocation of response future.
      */
-    ListenableFuture<InvocationContext<HeartbeatResponse>> heartbeat(Endpoints endpoints, Metadata metadata,
-        HeartbeatRequest request,
-        Duration duration);
+    public abstract ListenableFuture<RpcInvocation<HeartbeatResponse>> heartbeat(Endpoints endpoints,
+        Metadata metadata, HeartbeatRequest request, Duration duration);
 
     /**
      * Send message asynchronously, the method ensures no throwable.
@@ -119,8 +97,8 @@ public interface ClientManager {
      * @param duration  request max duration.
      * @return invocation of response future.
      */
-    ListenableFuture<InvocationContext<SendMessageResponse>> sendMessage(Endpoints endpoints, Metadata metadata,
-        SendMessageRequest request, Duration duration);
+    public abstract ListenableFuture<RpcInvocation<SendMessageResponse>> sendMessage(Endpoints endpoints,
+        Metadata metadata, SendMessageRequest request, Duration duration);
 
     /**
      * Query assignment asynchronously, the method ensures no throwable.
@@ -131,8 +109,8 @@ public interface ClientManager {
      * @param duration  request max duration.
      * @return invocation of response future.
      */
-    ListenableFuture<InvocationContext<QueryAssignmentResponse>> queryAssignment(Endpoints endpoints, Metadata metadata,
-        QueryAssignmentRequest request, Duration duration);
+    public abstract ListenableFuture<RpcInvocation<QueryAssignmentResponse>> queryAssignment(Endpoints endpoints,
+        Metadata metadata, QueryAssignmentRequest request, Duration duration);
 
     /**
      * Receiving messages asynchronously from the server, the method ensures no throwable.
@@ -141,8 +119,8 @@ public interface ClientManager {
      * @param metadata  gRPC request header metadata.
      * @return invocation of response future.
      */
-    ListenableFuture<InvocationContext<Iterator<ReceiveMessageResponse>>> receiveMessage(Endpoints endpoints,
-        Metadata metadata, ReceiveMessageRequest request, Duration duration);
+    public abstract ListenableFuture<RpcInvocation<Iterator<ReceiveMessageResponse>>> receiveMessage(
+        Endpoints endpoints, Metadata metadata, ReceiveMessageRequest request, Duration duration);
 
     /**
      * Ack message asynchronously after the success of consumption, the method ensures no throwable.
@@ -153,8 +131,8 @@ public interface ClientManager {
      * @param duration  request max duration.
      * @return invocation of response future.
      */
-    ListenableFuture<InvocationContext<AckMessageResponse>> ackMessage(Endpoints endpoints, Metadata metadata,
-        AckMessageRequest request, Duration duration);
+    public abstract ListenableFuture<RpcInvocation<AckMessageResponse>> ackMessage(Endpoints endpoints,
+        Metadata metadata, AckMessageRequest request, Duration duration);
 
     /**
      * Nack message asynchronously after the failure of consumption, the method ensures no throwable.
@@ -165,8 +143,8 @@ public interface ClientManager {
      * @param duration  request max duration.
      * @return invocation of response future.
      */
-    ListenableFuture<InvocationContext<ChangeInvisibleDurationResponse>> changeInvisibleDuration(Endpoints endpoints,
-        Metadata metadata, ChangeInvisibleDurationRequest request, Duration duration);
+    public abstract ListenableFuture<RpcInvocation<ChangeInvisibleDurationResponse>> changeInvisibleDuration(
+        Endpoints endpoints, Metadata metadata, ChangeInvisibleDurationRequest request, Duration duration);
 
     /**
      * Send a message to the dead letter queue asynchronously, the method ensures no throwable.
@@ -177,8 +155,9 @@ public interface ClientManager {
      * @param duration  request max duration.
      * @return invocation of response future.
      */
-    ListenableFuture<InvocationContext<ForwardMessageToDeadLetterQueueResponse>> forwardMessageToDeadLetterQueue(
-        Endpoints endpoints, Metadata metadata, ForwardMessageToDeadLetterQueueRequest request, Duration duration);
+    public abstract ListenableFuture<RpcInvocation<ForwardMessageToDeadLetterQueueResponse>>
+    forwardMessageToDeadLetterQueue(Endpoints endpoints, Metadata metadata,
+        ForwardMessageToDeadLetterQueueRequest request, Duration duration);
 
     /**
      * Submit transaction resolution asynchronously, the method ensures no throwable.
@@ -189,8 +168,8 @@ public interface ClientManager {
      * @param duration  request max duration.
      * @return invocation of response future.
      */
-    ListenableFuture<InvocationContext<EndTransactionResponse>> endTransaction(Endpoints endpoints, Metadata metadata,
-        EndTransactionRequest request, Duration duration);
+    public abstract ListenableFuture<RpcInvocation<EndTransactionResponse>> endTransaction(Endpoints endpoints,
+        Metadata metadata, EndTransactionRequest request, Duration duration);
 
     /**
      * Asynchronously notify the server that client is terminated, the method ensures no throwable.
@@ -202,8 +181,8 @@ public interface ClientManager {
      * @return response future of notification of client termination.
      */
     @SuppressWarnings("UnusedReturnValue")
-    ListenableFuture<InvocationContext<NotifyClientTerminationResponse>> notifyClientTermination(Endpoints endpoints,
-        Metadata metadata, NotifyClientTerminationRequest request, Duration duration);
+    public abstract ListenableFuture<RpcInvocation<NotifyClientTerminationResponse>> notifyClientTermination(
+        Endpoints endpoints, Metadata metadata, NotifyClientTerminationRequest request, Duration duration);
 
     /**
      * Establish telemetry session stream to server.
@@ -215,6 +194,6 @@ public interface ClientManager {
      * @return request observer.
      * @throws ClientException if failed to establish telemetry session stream.
      */
-    StreamObserver<TelemetryCommand> telemetry(Endpoints endpoints, Metadata metadata,
+    public abstract StreamObserver<TelemetryCommand> telemetry(Endpoints endpoints, Metadata metadata,
         Duration duration, StreamObserver<TelemetryCommand> responseObserver) throws ClientException;
 }
diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientManagerImpl.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientManagerImpl.java
index b564634..a070aa5 100644
--- a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientManagerImpl.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientManagerImpl.java
@@ -38,7 +38,6 @@ import apache.rocketmq.v2.ReceiveMessageResponse;
 import apache.rocketmq.v2.SendMessageRequest;
 import apache.rocketmq.v2.SendMessageResponse;
 import apache.rocketmq.v2.TelemetryCommand;
-import com.google.common.util.concurrent.AbstractIdleService;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.errorprone.annotations.concurrent.GuardedBy;
@@ -49,8 +48,6 @@ import java.time.Duration;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ScheduledExecutorService;
@@ -62,19 +59,18 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
 import javax.net.ssl.SSLException;
 import org.apache.rocketmq.client.apis.ClientException;
 import org.apache.rocketmq.client.java.misc.ExecutorServices;
-import org.apache.rocketmq.client.java.misc.MetadataUtils;
 import org.apache.rocketmq.client.java.misc.ThreadFactoryImpl;
 import org.apache.rocketmq.client.java.route.Endpoints;
-import org.apache.rocketmq.client.java.rpc.InvocationContext;
 import org.apache.rocketmq.client.java.rpc.RpcClient;
 import org.apache.rocketmq.client.java.rpc.RpcClientImpl;
+import org.apache.rocketmq.client.java.rpc.RpcInvocation;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
  * @see ClientManager
  */
-public class ClientManagerImpl extends AbstractIdleService implements ClientManager {
+public class ClientManagerImpl extends ClientManager {
     public static final Duration RPC_CLIENT_MAX_IDLE_DURATION = Duration.ofMinutes(30);
 
     public static final Duration RPC_CLIENT_IDLE_CHECK_INITIAL_DELAY = Duration.ofSeconds(5);
@@ -91,15 +87,11 @@ public class ClientManagerImpl extends AbstractIdleService implements ClientMana
 
     private static final Logger LOGGER = LoggerFactory.getLogger(ClientManagerImpl.class);
 
+    private final Client client;
     @GuardedBy("rpcClientTableLock")
     private final Map<Endpoints, RpcClient> rpcClientTable;
     private final ReadWriteLock rpcClientTableLock;
 
-    /**
-     * Contains all client, key is {@link ClientImpl#clientId}.
-     */
-    private final ConcurrentMap<String, Client> clientTable;
-
     /**
      * In charge of all scheduled tasks.
      */
@@ -110,12 +102,10 @@ public class ClientManagerImpl extends AbstractIdleService implements ClientMana
      */
     private final ExecutorService asyncWorker;
 
-    public ClientManagerImpl() {
+    public ClientManagerImpl(Client client) {
+        this.client = client;
         this.rpcClientTable = new HashMap<>();
         this.rpcClientTableLock = new ReentrantReadWriteLock();
-
-        this.clientTable = new ConcurrentHashMap<>();
-
         this.scheduler = new ScheduledThreadPoolExecutor(
             Runtime.getRuntime().availableProcessors(),
             new ThreadFactoryImpl("ClientScheduler"));
@@ -129,21 +119,6 @@ public class ClientManagerImpl extends AbstractIdleService implements ClientMana
             new ThreadFactoryImpl("ClientAsyncWorker"));
     }
 
-    @Override
-    public void registerClient(Client client) {
-        clientTable.put(client.clientId(), client);
-    }
-
-    @Override
-    public void unregisterClient(Client client) {
-        clientTable.remove(client.clientId());
-    }
-
-    @Override
-    public boolean isEmpty() {
-        return clientTable.isEmpty();
-    }
-
     /**
      * It is well-founded that a {@link RpcClient} is deprecated if it is idle for a long time, so it is essential to
      * clear it.
@@ -172,30 +147,6 @@ public class ClientManagerImpl extends AbstractIdleService implements ClientMana
         }
     }
 
-    private void doHeartbeat() {
-        for (Client client : clientTable.values()) {
-            client.doHeartbeat();
-        }
-    }
-
-    private void doStats() {
-        LOGGER.info("Start to log stats for a new round, clientVersion={}, clientWrapperVersion={}",
-            MetadataUtils.getVersion(), MetadataUtils.getWrapperVersion());
-        for (Client client : clientTable.values()) {
-            client.doStats();
-        }
-    }
-
-    private void syncSettings() {
-        clientTable.values().forEach(client -> {
-            try {
-                client.syncSettings();
-            } catch (Throwable t) {
-                LOGGER.error("Failed to announce settings, clientId={}", client.clientId(), t);
-            }
-        });
-    }
-
     /**
      * Return the RPC client by remote {@link Endpoints}, would create the client automatically if it does not exist.
      *
@@ -236,7 +187,7 @@ public class ClientManagerImpl extends AbstractIdleService implements ClientMana
     }
 
     @Override
-    public ListenableFuture<InvocationContext<QueryRouteResponse>> queryRoute(Endpoints endpoints, Metadata metadata,
+    public ListenableFuture<RpcInvocation<QueryRouteResponse>> queryRoute(Endpoints endpoints, Metadata metadata,
         QueryRouteRequest request, Duration duration) {
         try {
             final RpcClient rpcClient = getRpcClient(endpoints);
@@ -247,7 +198,7 @@ public class ClientManagerImpl extends AbstractIdleService implements ClientMana
     }
 
     @Override
-    public ListenableFuture<InvocationContext<HeartbeatResponse>> heartbeat(Endpoints endpoints, Metadata metadata,
+    public ListenableFuture<RpcInvocation<HeartbeatResponse>> heartbeat(Endpoints endpoints, Metadata metadata,
         HeartbeatRequest request, Duration duration) {
         try {
             final RpcClient rpcClient = getRpcClient(endpoints);
@@ -258,7 +209,7 @@ public class ClientManagerImpl extends AbstractIdleService implements ClientMana
     }
 
     @Override
-    public ListenableFuture<InvocationContext<SendMessageResponse>> sendMessage(Endpoints endpoints, Metadata metadata,
+    public ListenableFuture<RpcInvocation<SendMessageResponse>> sendMessage(Endpoints endpoints, Metadata metadata,
         SendMessageRequest request, Duration duration) {
         try {
             final RpcClient rpcClient = getRpcClient(endpoints);
@@ -269,7 +220,7 @@ public class ClientManagerImpl extends AbstractIdleService implements ClientMana
     }
 
     @Override
-    public ListenableFuture<InvocationContext<QueryAssignmentResponse>> queryAssignment(Endpoints endpoints,
+    public ListenableFuture<RpcInvocation<QueryAssignmentResponse>> queryAssignment(Endpoints endpoints,
         Metadata metadata, QueryAssignmentRequest request, Duration duration) {
         try {
             final RpcClient rpcClient = getRpcClient(endpoints);
@@ -280,7 +231,7 @@ public class ClientManagerImpl extends AbstractIdleService implements ClientMana
     }
 
     @Override
-    public ListenableFuture<InvocationContext<Iterator<ReceiveMessageResponse>>> receiveMessage(Endpoints endpoints,
+    public ListenableFuture<RpcInvocation<Iterator<ReceiveMessageResponse>>> receiveMessage(Endpoints endpoints,
         Metadata metadata, ReceiveMessageRequest request, Duration duration) {
         try {
             final RpcClient rpcClient = getRpcClient(endpoints);
@@ -291,7 +242,7 @@ public class ClientManagerImpl extends AbstractIdleService implements ClientMana
     }
 
     @Override
-    public ListenableFuture<InvocationContext<AckMessageResponse>> ackMessage(Endpoints endpoints, Metadata metadata,
+    public ListenableFuture<RpcInvocation<AckMessageResponse>> ackMessage(Endpoints endpoints, Metadata metadata,
         AckMessageRequest request, Duration duration) {
         try {
             final RpcClient rpcClient = getRpcClient(endpoints);
@@ -302,7 +253,7 @@ public class ClientManagerImpl extends AbstractIdleService implements ClientMana
     }
 
     @Override
-    public ListenableFuture<InvocationContext<ChangeInvisibleDurationResponse>> changeInvisibleDuration(
+    public ListenableFuture<RpcInvocation<ChangeInvisibleDurationResponse>> changeInvisibleDuration(
         Endpoints endpoints, Metadata metadata, ChangeInvisibleDurationRequest request, Duration duration) {
         try {
             final RpcClient rpcClient = getRpcClient(endpoints);
@@ -313,7 +264,7 @@ public class ClientManagerImpl extends AbstractIdleService implements ClientMana
     }
 
     @Override
-    public ListenableFuture<InvocationContext<ForwardMessageToDeadLetterQueueResponse>> forwardMessageToDeadLetterQueue(
+    public ListenableFuture<RpcInvocation<ForwardMessageToDeadLetterQueueResponse>> forwardMessageToDeadLetterQueue(
         Endpoints endpoints, Metadata metadata, ForwardMessageToDeadLetterQueueRequest request, Duration duration) {
         try {
             final RpcClient rpcClient = getRpcClient(endpoints);
@@ -324,7 +275,7 @@ public class ClientManagerImpl extends AbstractIdleService implements ClientMana
     }
 
     @Override
-    public ListenableFuture<InvocationContext<EndTransactionResponse>> endTransaction(Endpoints endpoints,
+    public ListenableFuture<RpcInvocation<EndTransactionResponse>> endTransaction(Endpoints endpoints,
         Metadata metadata, EndTransactionRequest request, Duration duration) {
         try {
             final RpcClient rpcClient = getRpcClient(endpoints);
@@ -335,7 +286,7 @@ public class ClientManagerImpl extends AbstractIdleService implements ClientMana
     }
 
     @Override
-    public ListenableFuture<InvocationContext<NotifyClientTerminationResponse>> notifyClientTermination(
+    public ListenableFuture<RpcInvocation<NotifyClientTerminationResponse>> notifyClientTermination(
         Endpoints endpoints, Metadata metadata, NotifyClientTerminationRequest request, Duration duration) {
         try {
             final RpcClient rpcClient = getRpcClient(endpoints);
@@ -376,7 +327,7 @@ public class ClientManagerImpl extends AbstractIdleService implements ClientMana
         scheduler.scheduleWithFixedDelay(
             () -> {
                 try {
-                    doHeartbeat();
+                    client.doHeartbeat();
                 } catch (Throwable t) {
                     LOGGER.error("Exception raised while heartbeat.", t);
                 }
@@ -389,7 +340,7 @@ public class ClientManagerImpl extends AbstractIdleService implements ClientMana
         scheduler.scheduleWithFixedDelay(
             () -> {
                 try {
-                    doStats();
+                    client.doStats();
                 } catch (Throwable t) {
                     LOGGER.error("Exception raised while log stats.", t);
                 }
@@ -402,7 +353,7 @@ public class ClientManagerImpl extends AbstractIdleService implements ClientMana
         scheduler.scheduleWithFixedDelay(
             () -> {
                 try {
-                    syncSettings();
+                    client.syncSettings();
                 } catch (Throwable t) {
                     LOGGER.error("Exception raised during the setting announcement.", t);
                 }
diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientManagerRegistry.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientManagerRegistry.java
deleted file mode 100644
index 80299da..0000000
--- a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientManagerRegistry.java
+++ /dev/null
@@ -1,95 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.rocketmq.client.java.impl;
-
-import com.google.errorprone.annotations.concurrent.GuardedBy;
-import java.util.HashSet;
-import java.util.Set;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
-import javax.annotation.concurrent.ThreadSafe;
-
-@ThreadSafe
-public class ClientManagerRegistry {
-    private static final ClientManagerRegistry INSTANCE = new ClientManagerRegistry();
-
-    @GuardedBy("clientIdsLock")
-    private final Set<String> clientIds = new HashSet<>();
-    private final Lock clientIdsLock = new ReentrantLock();
-
-    private volatile ClientManagerImpl singletonClientManager = null;
-
-    private ClientManagerRegistry() {
-    }
-
-    public static ClientManagerRegistry getInstance() {
-        return INSTANCE;
-    }
-
-    /**
-     * Register {@link Client} to the appointed manager by manager id, start the manager if it is created newly.
-     *
-     * <p>Different clients would share the same {@link ClientManager} if they have the same manager id.
-     *
-     * @param client the client to register.
-     * @return the client manager which is started.
-     */
-    public ClientManager registerClient(Client client) {
-        clientIdsLock.lock();
-        try {
-            if (null == singletonClientManager) {
-                final ClientManagerImpl clientManager = new ClientManagerImpl();
-                clientManager.startAsync().awaitRunning();
-                singletonClientManager = clientManager;
-            }
-            clientIds.add(client.clientId());
-            singletonClientManager.registerClient(client);
-            return singletonClientManager;
-        } finally {
-            clientIdsLock.unlock();
-        }
-    }
-
-    /**
-     * Unregister {@link Client} to the appointed manager by message-id, shutdown the manager if no client
-     * registered in it.
-     *
-     * @param client client to unregister.
-     * @return {@link ClientManager} is removed or not.
-     */
-    @SuppressWarnings("UnusedReturnValue")
-    public boolean unregisterClient(Client client) {
-        ClientManagerImpl clientManager = null;
-        clientIdsLock.lock();
-        try {
-            clientIds.remove(client.clientId());
-            singletonClientManager.unregisterClient(client);
-            if (clientIds.isEmpty()) {
-                clientManager = singletonClientManager;
-                singletonClientManager = null;
-            }
-        } finally {
-            clientIdsLock.unlock();
-        }
-        // No need to hold the lock here.
-        if (null != clientManager) {
-            clientManager.stopAsync().awaitTerminated();
-        }
-        return null != clientManager;
-    }
-}
diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ConsumerImpl.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ConsumerImpl.java
index 2f50f2a..89687f8 100644
--- a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ConsumerImpl.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ConsumerImpl.java
@@ -57,7 +57,7 @@ import org.apache.rocketmq.client.java.message.MessageCommon;
 import org.apache.rocketmq.client.java.message.MessageViewImpl;
 import org.apache.rocketmq.client.java.route.Endpoints;
 import org.apache.rocketmq.client.java.route.MessageQueueImpl;
-import org.apache.rocketmq.client.java.rpc.InvocationContext;
+import org.apache.rocketmq.client.java.rpc.RpcInvocation;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -80,11 +80,11 @@ abstract class ConsumerImpl extends ClientImpl {
         try {
             Metadata metadata = sign();
             final Endpoints endpoints = mq.getBroker().getEndpoints();
-            final ListenableFuture<InvocationContext<Iterator<ReceiveMessageResponse>>> future =
+            final ListenableFuture<RpcInvocation<Iterator<ReceiveMessageResponse>>> future =
                 clientManager.receiveMessage(endpoints,
                     metadata, request, timeout);
             return Futures.transform(future, context -> {
-                final Iterator<ReceiveMessageResponse> it = context.getResp();
+                final Iterator<ReceiveMessageResponse> it = context.getResponse();
                 Status status = Status.newBuilder().setCode(Code.INTERNAL_SERVER_ERROR)
                     .setMessage("status was not set by server")
                     .build();
@@ -111,7 +111,7 @@ abstract class ConsumerImpl extends ClientImpl {
                     final MessageViewImpl view = MessageViewImpl.fromProtobuf(message, mq, deliveryTimestampFromRemote);
                     messages.add(view);
                 }
-                return new ReceiveMessageResult(endpoints, context.getRpcContext().getRequestId(), status, messages);
+                return new ReceiveMessageResult(endpoints, context.getContext().getRequestId(), status, messages);
             }, MoreExecutors.directExecutor());
         } catch (Throwable t) {
             return Futures.immediateFailedFuture(t);
@@ -138,9 +138,9 @@ abstract class ConsumerImpl extends ClientImpl {
 
     }
 
-    public ListenableFuture<InvocationContext<AckMessageResponse>> ackMessage(MessageViewImpl messageView) {
+    protected ListenableFuture<RpcInvocation<AckMessageResponse>> ackMessage(MessageViewImpl messageView) {
         final Endpoints endpoints = messageView.getEndpoints();
-        ListenableFuture<InvocationContext<AckMessageResponse>> future;
+        ListenableFuture<RpcInvocation<AckMessageResponse>> future;
 
         final Stopwatch stopwatch = Stopwatch.createStarted();
         final List<MessageCommon> messageCommons = Collections.singletonList(messageView.getMessageCommon());
@@ -150,14 +150,14 @@ abstract class ConsumerImpl extends ClientImpl {
             final Metadata metadata = sign();
             future = clientManager.ackMessage(endpoints, metadata, request, clientConfiguration.getRequestTimeout());
         } catch (Throwable t) {
-            final SettableFuture<InvocationContext<AckMessageResponse>> future0 = SettableFuture.create();
+            final SettableFuture<RpcInvocation<AckMessageResponse>> future0 = SettableFuture.create();
             future0.setException(t);
             future = future0;
         }
-        Futures.addCallback(future, new FutureCallback<InvocationContext<AckMessageResponse>>() {
+        Futures.addCallback(future, new FutureCallback<RpcInvocation<AckMessageResponse>>() {
             @Override
-            public void onSuccess(InvocationContext<AckMessageResponse> context) {
-                final AckMessageResponse response = context.getResp();
+            public void onSuccess(RpcInvocation<AckMessageResponse> invocation) {
+                final AckMessageResponse response = invocation.getResponse();
                 final Status status = response.getStatus();
                 final Code code = status.getCode();
                 final Duration duration = stopwatch.elapsed();
@@ -175,10 +175,10 @@ abstract class ConsumerImpl extends ClientImpl {
         return future;
     }
 
-    public ListenableFuture<InvocationContext<ChangeInvisibleDurationResponse>> changeInvisibleDuration(
+    public ListenableFuture<RpcInvocation<ChangeInvisibleDurationResponse>> changeInvisibleDuration(
         MessageViewImpl messageView, Duration invisibleDuration) {
         final Endpoints endpoints = messageView.getEndpoints();
-        ListenableFuture<InvocationContext<ChangeInvisibleDurationResponse>> future;
+        ListenableFuture<RpcInvocation<ChangeInvisibleDurationResponse>> future;
 
         final Stopwatch stopwatch = Stopwatch.createStarted();
         final List<MessageCommon> messageCommons = Collections.singletonList(messageView.getMessageCommon());
@@ -189,15 +189,15 @@ abstract class ConsumerImpl extends ClientImpl {
             future = clientManager.changeInvisibleDuration(endpoints, metadata, request,
                 clientConfiguration.getRequestTimeout());
         } catch (Throwable t) {
-            final SettableFuture<InvocationContext<ChangeInvisibleDurationResponse>> future0 = SettableFuture.create();
+            final SettableFuture<RpcInvocation<ChangeInvisibleDurationResponse>> future0 = SettableFuture.create();
             future0.setException(t);
             future = future0;
         }
         final MessageId messageId = messageView.getMessageId();
-        Futures.addCallback(future, new FutureCallback<InvocationContext<ChangeInvisibleDurationResponse>>() {
+        Futures.addCallback(future, new FutureCallback<RpcInvocation<ChangeInvisibleDurationResponse>>() {
             @Override
-            public void onSuccess(InvocationContext<ChangeInvisibleDurationResponse> context) {
-                final ChangeInvisibleDurationResponse response = context.getResp();
+            public void onSuccess(RpcInvocation<ChangeInvisibleDurationResponse> invocation) {
+                final ChangeInvisibleDurationResponse response = invocation.getResponse();
                 final Status status = response.getStatus();
                 final Code code = status.getCode();
                 final Duration duration = stopwatch.elapsed();
diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ProcessQueueImpl.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ProcessQueueImpl.java
index dd4cd17..ccd0899 100644
--- a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ProcessQueueImpl.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ProcessQueueImpl.java
@@ -52,7 +52,7 @@ import org.apache.rocketmq.client.java.message.MessageViewImpl;
 import org.apache.rocketmq.client.java.retry.RetryPolicy;
 import org.apache.rocketmq.client.java.route.Endpoints;
 import org.apache.rocketmq.client.java.route.MessageQueueImpl;
-import org.apache.rocketmq.client.java.rpc.InvocationContext;
+import org.apache.rocketmq.client.java.rpc.RpcInvocation;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -282,16 +282,16 @@ class ProcessQueueImpl implements ProcessQueue {
         final long actualMessagesQuantity = this.cachedMessagesCount();
         if (cacheMessageCountThresholdPerQueue <= actualMessagesQuantity) {
             LOGGER.warn("Process queue total cached messages quantity exceeds the threshold, threshold={}, actual={}," +
-                    " mq={}, clientId={}",
-                cacheMessageCountThresholdPerQueue, actualMessagesQuantity, mq, consumer.clientId());
+                    " mq={}, clientId={}", cacheMessageCountThresholdPerQueue, actualMessagesQuantity, mq,
+                consumer.clientId());
             return true;
         }
         final int cacheMessageBytesThresholdPerQueue = consumer.cacheMessageBytesThresholdPerQueue();
         final long actualCachedMessagesBytes = this.cachedMessageBytes();
         if (cacheMessageBytesThresholdPerQueue <= actualCachedMessagesBytes) {
             LOGGER.warn("Process queue total cached messages memory exceeds the threshold, threshold={} bytes," +
-                    " actual={} bytes, mq={}, clientId={}",
-                cacheMessageBytesThresholdPerQueue, actualCachedMessagesBytes, mq, consumer.clientId());
+                    " actual={} bytes, mq={}, clientId={}", cacheMessageBytesThresholdPerQueue,
+                actualCachedMessagesBytes, mq, consumer.clientId());
             return true;
         }
         return false;
@@ -389,21 +389,22 @@ class ProcessQueueImpl implements ProcessQueue {
         final String consumerGroup = consumer.getConsumerGroup();
         final MessageId messageId = messageView.getMessageId();
         final Endpoints endpoints = messageView.getEndpoints();
-        final ListenableFuture<InvocationContext<AckMessageResponse>> future = consumer.ackMessage(messageView);
-        Futures.addCallback(future, new FutureCallback<InvocationContext<AckMessageResponse>>() {
+        final ListenableFuture<RpcInvocation<AckMessageResponse>> future = consumer.ackMessage(messageView);
+        Futures.addCallback(future, new FutureCallback<RpcInvocation<AckMessageResponse>>() {
             @Override
-            public void onSuccess(InvocationContext<AckMessageResponse> context) {
-                final AckMessageResponse resp = context.getResp();
-                final Status status = resp.getStatus();
+            public void onSuccess(RpcInvocation<AckMessageResponse> invocation) {
+                final String requestId = invocation.getContext().getRequestId();
+                final AckMessageResponse response = invocation.getResponse();
+                final Status status = response.getStatus();
                 final Code code = status.getCode();
                 if (Code.OK.equals(code)) {
                     LOGGER.debug("Ack message successfully, clientId={}, consumerGroup={}, messageId={}, mq={}, "
-                        + "endpoints={}", clientId, consumerGroup, messageId, mq, endpoints);
+                        + "endpoints={}, requestId={}", clientId, consumerGroup, messageId, mq, endpoints, requestId);
                     return;
                 }
                 LOGGER.error("Failed to ack message, clientId={}, consumerGroup={}, messageId={}, mq={}, "
-                        + "endpoints={}, code={}, status message={}", clientId, consumerGroup, messageId, mq,
-                    endpoints, code, status.getMessage());
+                        + "endpoints={}, requestId={}, code={}, status message={}", clientId, consumerGroup, messageId,
+                    mq, endpoints, requestId, code, status.getMessage());
             }
 
             @Override
@@ -482,18 +483,18 @@ class ProcessQueueImpl implements ProcessQueue {
 
     private void forwardToDeadLetterQueue(final MessageViewImpl messageView, final int attempt,
         final SettableFuture<Void> future0) {
-        final ListenableFuture<InvocationContext<ForwardMessageToDeadLetterQueueResponse>> future =
+        final ListenableFuture<RpcInvocation<ForwardMessageToDeadLetterQueueResponse>> future =
             consumer.forwardMessageToDeadLetterQueue(messageView);
         final String clientId = consumer.clientId();
         final String consumerGroup = consumer.getConsumerGroup();
         final MessageId messageId = messageView.getMessageId();
         final Endpoints endpoints = messageView.getEndpoints();
-        Futures.addCallback(future, new FutureCallback<InvocationContext<ForwardMessageToDeadLetterQueueResponse>>() {
+        Futures.addCallback(future, new FutureCallback<RpcInvocation<ForwardMessageToDeadLetterQueueResponse>>() {
             @Override
-            public void onSuccess(InvocationContext<ForwardMessageToDeadLetterQueueResponse> context) {
-                final ForwardMessageToDeadLetterQueueResponse resp = context.getResp();
-                final String requestId = context.getRpcContext().getRequestId();
-                final Status status = resp.getStatus();
+            public void onSuccess(RpcInvocation<ForwardMessageToDeadLetterQueueResponse> invocation) {
+                final ForwardMessageToDeadLetterQueueResponse response = invocation.getResponse();
+                final String requestId = invocation.getContext().getRequestId();
+                final Status status = response.getStatus();
                 final Code code = status.getCode();
                 // Log failure and retry later.
                 if (!Code.OK.equals(code)) {
@@ -566,19 +567,19 @@ class ProcessQueueImpl implements ProcessQueue {
         final String consumerGroup = consumer.getConsumerGroup();
         final MessageId messageId = messageView.getMessageId();
         final Endpoints endpoints = messageView.getEndpoints();
-        final ListenableFuture<InvocationContext<AckMessageResponse>> future = consumer.ackMessage(messageView);
-        Futures.addCallback(future, new FutureCallback<InvocationContext<AckMessageResponse>>() {
+        final ListenableFuture<RpcInvocation<AckMessageResponse>> future = consumer.ackMessage(messageView);
+        Futures.addCallback(future, new FutureCallback<RpcInvocation<AckMessageResponse>>() {
             @Override
-            public void onSuccess(InvocationContext<AckMessageResponse> context) {
-                final AckMessageResponse resp = context.getResp();
-                final String requestId = context.getRpcContext().getRequestId();
-                final Status status = resp.getStatus();
+            public void onSuccess(RpcInvocation<AckMessageResponse> invocation) {
+                final AckMessageResponse response = invocation.getResponse();
+                final String requestId = invocation.getContext().getRequestId();
+                final Status status = response.getStatus();
                 final Code code = status.getCode();
                 if (Code.INVALID_RECEIPT_HANDLE.equals(code)) {
                     LOGGER.error("Failed to ack fifo message due to the invalid receipt handle, forgive to retry, "
                             + "clientId={}, consumerGroup={} messageId={}, attempt={}, mq={}, endpoints={}, "
                             + "requestId={}, status message=[{}]", clientId, consumerGroup, messageId, attempt, mq,
-                        endpoints, context.getRpcContext().getRequestId(), status.getMessage());
+                        endpoints, requestId, status.getMessage());
                     future0.setException(new BadRequestException(code.getNumber(), requestId, status.getMessage()));
                     return;
                 }
diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/PushConsumerImpl.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/PushConsumerImpl.java
index f0cd53a..2c18a92 100644
--- a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/PushConsumerImpl.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/PushConsumerImpl.java
@@ -27,12 +27,12 @@ import apache.rocketmq.v2.Status;
 import apache.rocketmq.v2.TelemetryCommand;
 import apache.rocketmq.v2.VerifyMessageCommand;
 import apache.rocketmq.v2.VerifyMessageResult;
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Stopwatch;
 import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.MoreExecutors;
-import com.google.common.util.concurrent.SettableFuture;
 import io.grpc.Metadata;
 import java.time.Duration;
 import java.util.Collections;
@@ -58,6 +58,13 @@ import org.apache.rocketmq.client.apis.consumer.FilterExpression;
 import org.apache.rocketmq.client.apis.consumer.MessageListener;
 import org.apache.rocketmq.client.apis.consumer.PushConsumer;
 import org.apache.rocketmq.client.apis.message.MessageId;
+import org.apache.rocketmq.client.java.exception.BadRequestException;
+import org.apache.rocketmq.client.java.exception.ForbiddenException;
+import org.apache.rocketmq.client.java.exception.InternalErrorException;
+import org.apache.rocketmq.client.java.exception.NotFoundException;
+import org.apache.rocketmq.client.java.exception.ProxyTimeoutException;
+import org.apache.rocketmq.client.java.exception.TooManyRequestsException;
+import org.apache.rocketmq.client.java.exception.UnsupportedException;
 import org.apache.rocketmq.client.java.hook.MessageHookPoints;
 import org.apache.rocketmq.client.java.hook.MessageHookPointsStatus;
 import org.apache.rocketmq.client.java.impl.ClientSettings;
@@ -71,7 +78,7 @@ import org.apache.rocketmq.client.java.retry.RetryPolicy;
 import org.apache.rocketmq.client.java.route.Endpoints;
 import org.apache.rocketmq.client.java.route.MessageQueueImpl;
 import org.apache.rocketmq.client.java.route.TopicRouteDataResult;
-import org.apache.rocketmq.client.java.rpc.InvocationContext;
+import org.apache.rocketmq.client.java.rpc.RpcInvocation;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -263,30 +270,49 @@ class PushConsumerImpl extends ConsumerImpl implements PushConsumer, MessageCach
             .setEndpoints(endpoints.toProtobuf()).setGroup(getProtobufGroup()).build();
     }
 
-    private ListenableFuture<Assignments> queryAssignment(final String topic) {
+    ListenableFuture<Assignments> queryAssignment(final String topic) {
         final ListenableFuture<Endpoints> future = pickEndpointsToQueryAssignments(topic);
-        final ListenableFuture<InvocationContext<QueryAssignmentResponse>> responseFuture =
+        final ListenableFuture<RpcInvocation<QueryAssignmentResponse>> responseFuture =
             Futures.transformAsync(future, endpoints -> {
                 final Metadata metadata = sign();
                 final QueryAssignmentRequest request = wrapQueryAssignmentRequest(topic);
-                return clientManager.queryAssignment(endpoints, metadata, request,
-                    clientConfiguration.getRequestTimeout());
+                final Duration requestTimeout = clientConfiguration.getRequestTimeout();
+                return clientManager.queryAssignment(endpoints, metadata, request, requestTimeout);
             }, MoreExecutors.directExecutor());
         return Futures.transformAsync(responseFuture, context -> {
-            final QueryAssignmentResponse resp = context.getResp();
-            final Status status = resp.getStatus();
+            final QueryAssignmentResponse response = context.getResponse();
+            final Status status = response.getStatus();
             final Code code = status.getCode();
-            if (!Code.OK.equals(code)) {
-                final String message = String.format("Failed to query assignment, code=%d, status message=[{%s}]",
-                    code.getNumber(), status.getMessage());
-                throw new RuntimeException(message);
+            final int codeNumber = code.getNumber();
+            final String requestId = context.getContext().getRequestId();
+            final String statusMessage = status.getMessage();
+            switch (code) {
+                case OK:
+                    break;
+                case BAD_REQUEST:
+                case ILLEGAL_ACCESS_POINT:
+                case ILLEGAL_TOPIC:
+                case CLIENT_ID_REQUIRED:
+                    throw new BadRequestException(codeNumber, requestId, statusMessage);
+                case FORBIDDEN:
+                    throw new ForbiddenException(codeNumber, requestId, statusMessage);
+                case NOT_FOUND:
+                case TOPIC_NOT_FOUND:
+                    throw new NotFoundException(codeNumber, requestId, statusMessage);
+                case TOO_MANY_REQUESTS:
+                    throw new TooManyRequestsException(codeNumber, requestId, statusMessage);
+                case INTERNAL_ERROR:
+                case INTERNAL_SERVER_ERROR:
+                    throw new InternalErrorException(codeNumber, requestId, statusMessage);
+                case PROXY_TIMEOUT:
+                    throw new ProxyTimeoutException(codeNumber, requestId, statusMessage);
+                default:
+                    throw new UnsupportedException(codeNumber, requestId, statusMessage);
             }
-            SettableFuture<Assignments> future0 = SettableFuture.create();
-            final List<Assignment> assignmentList = resp.getAssignmentsList().stream().map(assignment ->
+            final List<Assignment> assignmentList = response.getAssignmentsList().stream().map(assignment ->
                 new Assignment(new MessageQueueImpl(assignment.getMessageQueue()))).collect(Collectors.toList());
             final Assignments assignments = new Assignments(assignmentList);
-            future0.set(assignments);
-            return future0;
+            return Futures.immediateFuture(assignments);
         }, MoreExecutors.directExecutor());
     }
 
@@ -314,7 +340,7 @@ class PushConsumerImpl extends ConsumerImpl implements PushConsumer, MessageCach
      * @param filterExpression filter expression of topic.
      * @return optional process queue.
      */
-    private Optional<ProcessQueue> createProcessQueue(MessageQueueImpl mq, final FilterExpression filterExpression) {
+    protected Optional<ProcessQueue> createProcessQueue(MessageQueueImpl mq, final FilterExpression filterExpression) {
         final ProcessQueueImpl processQueue = new ProcessQueueImpl(this, mq, filterExpression);
         final ProcessQueue previous = processQueueTable.putIfAbsent(mq, processQueue);
         if (null != previous) {
@@ -328,7 +354,9 @@ class PushConsumerImpl extends ConsumerImpl implements PushConsumer, MessageCach
         return HeartbeatRequest.newBuilder().setGroup(getProtobufGroup()).build();
     }
 
-    private void syncProcessQueue(String topic, Assignments assignments, FilterExpression filterExpression) {
+
+    @VisibleForTesting
+    void syncProcessQueue(String topic, Assignments assignments, FilterExpression filterExpression) {
         Set<MessageQueueImpl> latest = new HashSet<>();
 
         final List<Assignment> assignmentList = assignments.getAssignmentList();
@@ -372,7 +400,8 @@ class PushConsumerImpl extends ConsumerImpl implements PushConsumer, MessageCach
         }
     }
 
-    public void scanAssignments() {
+    @VisibleForTesting
+    void scanAssignments() {
         try {
             LOGGER.debug("Start to scan assignments periodically, clientId={}", clientId);
             for (Map.Entry<String, FilterExpression> entry : subscriptionExpressions.entrySet()) {
@@ -511,7 +540,7 @@ class PushConsumerImpl extends ConsumerImpl implements PushConsumer, MessageCach
             .setMaxDeliveryAttempts(getRetryPolicy().getMaxAttempts()).build();
     }
 
-    public ListenableFuture<InvocationContext<ForwardMessageToDeadLetterQueueResponse>> forwardMessageToDeadLetterQueue(
+    public ListenableFuture<RpcInvocation<ForwardMessageToDeadLetterQueueResponse>> forwardMessageToDeadLetterQueue(
         final MessageViewImpl messageView) {
         // Intercept before forwarding message to DLQ.
         final Stopwatch stopwatch = Stopwatch.createStarted();
@@ -519,7 +548,7 @@ class PushConsumerImpl extends ConsumerImpl implements PushConsumer, MessageCach
         doBefore(MessageHookPoints.FORWARD_TO_DLQ, messageCommons);
 
         final Endpoints endpoints = messageView.getEndpoints();
-        ListenableFuture<InvocationContext<ForwardMessageToDeadLetterQueueResponse>> future;
+        ListenableFuture<RpcInvocation<ForwardMessageToDeadLetterQueueResponse>> future;
         try {
             final ForwardMessageToDeadLetterQueueRequest request =
                 wrapForwardMessageToDeadLetterQueueRequest(messageView);
@@ -529,12 +558,12 @@ class PushConsumerImpl extends ConsumerImpl implements PushConsumer, MessageCach
         } catch (Throwable t) {
             future = Futures.immediateFailedFuture(t);
         }
-        Futures.addCallback(future, new FutureCallback<InvocationContext<ForwardMessageToDeadLetterQueueResponse>>() {
+        Futures.addCallback(future, new FutureCallback<RpcInvocation<ForwardMessageToDeadLetterQueueResponse>>() {
             @Override
-            public void onSuccess(InvocationContext<ForwardMessageToDeadLetterQueueResponse> context) {
-                final ForwardMessageToDeadLetterQueueResponse resp = context.getResp();
+            public void onSuccess(RpcInvocation<ForwardMessageToDeadLetterQueueResponse> invocation) {
+                final ForwardMessageToDeadLetterQueueResponse response = invocation.getResponse();
                 final Duration duration = stopwatch.elapsed();
-                MessageHookPointsStatus messageHookPointsStatus = Code.OK.equals(resp.getStatus().getCode()) ?
+                MessageHookPointsStatus messageHookPointsStatus = Code.OK.equals(response.getStatus().getCode()) ?
                     MessageHookPointsStatus.OK : MessageHookPointsStatus.ERROR;
                 // Intercept after forwarding message to DLQ.
                 doAfter(MessageHookPoints.FORWARD_TO_DLQ, messageCommons, duration, messageHookPointsStatus);
diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/SimpleConsumerImpl.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/SimpleConsumerImpl.java
index f7a9998..6eb780e 100644
--- a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/SimpleConsumerImpl.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/SimpleConsumerImpl.java
@@ -56,7 +56,7 @@ import org.apache.rocketmq.client.java.message.MessageViewImpl;
 import org.apache.rocketmq.client.java.message.protocol.Resource;
 import org.apache.rocketmq.client.java.route.MessageQueueImpl;
 import org.apache.rocketmq.client.java.route.TopicRouteDataResult;
-import org.apache.rocketmq.client.java.rpc.InvocationContext;
+import org.apache.rocketmq.client.java.rpc.RpcInvocation;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -142,6 +142,7 @@ class SimpleConsumerImpl extends ConsumerImpl implements SimpleConsumer {
      */
     @Override
     public SimpleConsumer unsubscribe(String topic) {
+        // Check consumer status.
         if (!this.isRunning()) {
             LOGGER.error("Unable to remove subscription because simple consumer is not running, state={}, "
                 + "clientId={}", this.state(), clientId);
@@ -178,21 +179,22 @@ class SimpleConsumerImpl extends ConsumerImpl implements SimpleConsumer {
     }
 
     public ListenableFuture<List<MessageView>> receive0(int maxMessageNum, Duration invisibleDuration) {
-        SettableFuture<List<MessageView>> future = SettableFuture.create();
         if (!this.isRunning()) {
             LOGGER.error("Unable to receive message because simple consumer is not running, state={}, clientId={}",
                 this.state(), clientId);
-            future.setException(new IllegalStateException("Simple consumer is not running now"));
-            return future;
+            final IllegalStateException e = new IllegalStateException("Simple consumer is not running now");
+            return Futures.immediateFailedFuture(e);
+        }
+        if (maxMessageNum <= 0) {
+            final IllegalArgumentException e = new IllegalArgumentException("maxMessageNum must be greater than 0");
+            return Futures.immediateFailedFuture(e);
         }
         final HashMap<String, FilterExpression> copy = new HashMap<>(subscriptionExpressions);
         final ArrayList<String> topics = new ArrayList<>(copy.keySet());
         // All topic is subscribed.
         if (topics.isEmpty()) {
-            final IllegalArgumentException exception = new IllegalArgumentException("There is no topic to receive "
-                + "message");
-            future.setException(exception);
-            return future;
+            final IllegalArgumentException e = new IllegalArgumentException("There is no topic to receive message");
+            return Futures.immediateFailedFuture(e);
         }
         final String topic = topics.get(IntMath.mod(topicIndex.getAndIncrement(), topics.size()));
         final FilterExpression filterExpression = copy.get(topic);
@@ -226,27 +228,27 @@ class SimpleConsumerImpl extends ConsumerImpl implements SimpleConsumer {
     }
 
     private ListenableFuture<Void> ack0(MessageView messageView) {
-        SettableFuture<Void> future0 = SettableFuture.create();
         // Check consumer status.
         if (!this.isRunning()) {
             LOGGER.error("Unable to ack message because simple consumer is not running, state={}, clientId={}",
                 this.state(), clientId);
-            future0.setException(new IllegalStateException("Simple consumer is not running now"));
-            return future0;
+            final IllegalStateException e = new IllegalStateException("Simple consumer is not running now");
+            return Futures.immediateFailedFuture(e);
         }
         if (!(messageView instanceof MessageViewImpl)) {
             final IllegalArgumentException exception = new IllegalArgumentException("Failed downcasting for "
                 + "messageView");
-            future0.setException(exception);
-            return future0;
+            return Futures.immediateFailedFuture(exception);
         }
         MessageViewImpl impl = (MessageViewImpl) messageView;
-        final ListenableFuture<InvocationContext<AckMessageResponse>> future = ackMessage(impl);
-        return Futures.transformAsync(future, ctx -> {
-            final String requestId = ctx.getRpcContext().getRequestId();
-            final AckMessageResponse resp = ctx.getResp();
-            final Status status = resp.getStatus();
+        final ListenableFuture<RpcInvocation<AckMessageResponse>> future = ackMessage(impl);
+        return Futures.transformAsync(future, invocation -> {
+            final String requestId = invocation.getContext().getRequestId();
+            final AckMessageResponse response = invocation.getResponse();
+            final Status status = response.getStatus();
             final Code code = status.getCode();
+            final int codeNumber = code.getNumber();
+            final String statusMessage = status.getMessage();
             switch (code) {
                 case OK:
                     return Futures.immediateVoidFuture();
@@ -255,23 +257,23 @@ class SimpleConsumerImpl extends ConsumerImpl implements SimpleConsumer {
                 case ILLEGAL_CONSUMER_GROUP:
                 case INVALID_RECEIPT_HANDLE:
                 case CLIENT_ID_REQUIRED:
-                    throw new BadRequestException(code.getNumber(), requestId, status.getMessage());
+                    throw new BadRequestException(codeNumber, requestId, statusMessage);
                 case UNAUTHORIZED:
-                    throw new UnauthorizedException(code.getNumber(), requestId, status.getMessage());
+                    throw new UnauthorizedException(codeNumber, requestId, statusMessage);
                 case FORBIDDEN:
-                    throw new ForbiddenException(code.getNumber(), requestId, status.getMessage());
+                    throw new ForbiddenException(codeNumber, requestId, statusMessage);
                 case NOT_FOUND:
                 case TOPIC_NOT_FOUND:
-                    throw new NotFoundException(code.getNumber(), requestId, status.getMessage());
+                    throw new NotFoundException(codeNumber, requestId, statusMessage);
                 case TOO_MANY_REQUESTS:
-                    throw new TooManyRequestsException(code.getNumber(), requestId, status.getMessage());
+                    throw new TooManyRequestsException(codeNumber, requestId, statusMessage);
                 case INTERNAL_ERROR:
                 case INTERNAL_SERVER_ERROR:
-                    throw new InternalErrorException(code.getNumber(), requestId, status.getMessage());
+                    throw new InternalErrorException(codeNumber, requestId, statusMessage);
                 case PROXY_TIMEOUT:
-                    throw new ProxyTimeoutException(code.getNumber(), requestId, status.getMessage());
+                    throw new ProxyTimeoutException(codeNumber, requestId, statusMessage);
                 default:
-                    throw new UnsupportedException(code.getNumber(), requestId, status.getMessage());
+                    throw new UnsupportedException(codeNumber, requestId, statusMessage);
             }
         }, clientCallbackExecutor);
     }
@@ -295,22 +297,27 @@ class SimpleConsumerImpl extends ConsumerImpl implements SimpleConsumer {
     }
 
     public ListenableFuture<Void> changeInvisibleDuration0(MessageView messageView, Duration invisibleDuration) {
-        SettableFuture<Void> future0 = SettableFuture.create();
+        // Check consumer status.
+        if (!this.isRunning()) {
+            LOGGER.error("Unable to change invisible duration because simple consumer is not running, state={}, "
+                + "clientId={}", this.state(), clientId);
+            final IllegalStateException e = new IllegalStateException("Simple consumer is not running now");
+            return Futures.immediateFailedFuture(e);
+        }
         if (!(messageView instanceof MessageViewImpl)) {
             final IllegalArgumentException exception = new IllegalArgumentException("Failed downcasting for "
                 + "messageView");
-            future0.setException(exception);
-            return future0;
+            return Futures.immediateFailedFuture(exception);
         }
         MessageViewImpl impl = (MessageViewImpl) messageView;
-        final ListenableFuture<InvocationContext<ChangeInvisibleDurationResponse>> future =
+        final ListenableFuture<RpcInvocation<ChangeInvisibleDurationResponse>> future =
             changeInvisibleDuration(impl, invisibleDuration);
-        return Futures.transformAsync(future, ctx -> {
-            final ChangeInvisibleDurationResponse resp = ctx.getResp();
-            final String requestId = ctx.getRpcContext().getRequestId();
+        return Futures.transformAsync(future, invocation -> {
+            final ChangeInvisibleDurationResponse response = invocation.getResponse();
+            final String requestId = invocation.getContext().getRequestId();
             // Refresh receipt handle manually.
-            impl.setReceiptHandle(resp.getReceiptHandle());
-            final Status status = resp.getStatus();
+            impl.setReceiptHandle(response.getReceiptHandle());
+            final Status status = response.getStatus();
             final Code code = status.getCode();
             final int codeNumber = code.getNumber();
             final String statusMessage = status.getMessage();
@@ -328,6 +335,7 @@ class SimpleConsumerImpl extends ConsumerImpl implements SimpleConsumer {
                     throw new UnauthorizedException(codeNumber, requestId, statusMessage);
                 case NOT_FOUND:
                 case TOPIC_NOT_FOUND:
+                    throw new NotFoundException(codeNumber, requestId, statusMessage);
                 case TOO_MANY_REQUESTS:
                     throw new TooManyRequestsException(codeNumber, requestId, statusMessage);
                 case INTERNAL_ERROR:
@@ -351,10 +359,6 @@ class SimpleConsumerImpl extends ConsumerImpl implements SimpleConsumer {
         return simpleConsumerSettings;
     }
 
-    protected SimpleConsumerSettings getSimpleConsumerSettings() {
-        return simpleConsumerSettings;
-    }
-
     public void onTopicRouteDataResultUpdate0(String topic, TopicRouteDataResult topicRouteDataResult) {
         final SubscriptionLoadBalancer subscriptionLoadBalancer =
             new SubscriptionLoadBalancer(topicRouteDataResult);
diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/ProducerImpl.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/ProducerImpl.java
index 0764ed4..05be6b7 100644
--- a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/ProducerImpl.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/ProducerImpl.java
@@ -68,13 +68,12 @@ import org.apache.rocketmq.client.java.message.MessageCommon;
 import org.apache.rocketmq.client.java.message.MessageType;
 import org.apache.rocketmq.client.java.message.MessageViewImpl;
 import org.apache.rocketmq.client.java.message.PublishingMessageImpl;
-import org.apache.rocketmq.client.java.message.protocol.Resource;
 import org.apache.rocketmq.client.java.retry.ExponentialBackoffRetryPolicy;
 import org.apache.rocketmq.client.java.retry.RetryPolicy;
 import org.apache.rocketmq.client.java.route.Endpoints;
 import org.apache.rocketmq.client.java.route.MessageQueueImpl;
 import org.apache.rocketmq.client.java.route.TopicRouteDataResult;
-import org.apache.rocketmq.client.java.rpc.InvocationContext;
+import org.apache.rocketmq.client.java.rpc.RpcInvocation;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -101,9 +100,8 @@ class ProducerImpl extends ClientImpl implements Producer {
         super(clientConfiguration, topics);
         ExponentialBackoffRetryPolicy retryPolicy = ExponentialBackoffRetryPolicy.immediatelyRetryPolicy(maxAttempts);
         this.producerSettings = new ProducerSettings(clientId, endpoints, retryPolicy,
-            clientConfiguration.getRequestTimeout(), topics.stream().map(Resource::new).collect(Collectors.toSet()));
+            clientConfiguration.getRequestTimeout(), topics);
         this.checker = checker;
-
         this.publishingRouteDataResultCache = new ConcurrentHashMap<>();
     }
 
@@ -283,14 +281,14 @@ class ProducerImpl extends ClientImpl implements Producer {
             MessageHookPoints.COMMIT_TRANSACTION : MessageHookPoints.ROLLBACK_TRANSACTION;
         doBefore(messageHookPoints, messageCommons);
 
-        final ListenableFuture<InvocationContext<EndTransactionResponse>> future =
+        final ListenableFuture<RpcInvocation<EndTransactionResponse>> future =
             clientManager.endTransaction(endpoints, metadata, request, requestTimeout);
-        Futures.addCallback(future, new FutureCallback<InvocationContext<EndTransactionResponse>>() {
+        Futures.addCallback(future, new FutureCallback<RpcInvocation<EndTransactionResponse>>() {
             @Override
-            public void onSuccess(InvocationContext<EndTransactionResponse> context) {
+            public void onSuccess(RpcInvocation<EndTransactionResponse> invocation) {
                 final Duration duration = stopwatch.elapsed();
-                final EndTransactionResponse resp = context.getResp();
-                final Status status = resp.getStatus();
+                final EndTransactionResponse response = invocation.getResponse();
+                final Status status = response.getStatus();
                 final Code code = status.getCode();
                 MessageHookPointsStatus messageHookPointsStatus = Code.OK.equals(code) ? MessageHookPointsStatus.OK :
                     MessageHookPointsStatus.ERROR;
@@ -303,9 +301,9 @@ class ProducerImpl extends ClientImpl implements Producer {
                 doAfter(messageHookPoints, messageCommons, duration, MessageHookPointsStatus.ERROR);
             }
         }, MoreExecutors.directExecutor());
-        final InvocationContext<EndTransactionResponse> context = handleClientFuture(future);
-        final EndTransactionResponse resp = context.getResp();
-        final Status status = resp.getStatus();
+        final RpcInvocation<EndTransactionResponse> invocation = handleClientFuture(future);
+        final EndTransactionResponse response = invocation.getResponse();
+        final Status status = response.getStatus();
         final Code code = status.getCode();
         if (!Code.OK.equals(code)) {
             throw new ClientException(code.getNumber(), status.getMessage());
@@ -447,7 +445,7 @@ class ProducerImpl extends ClientImpl implements Producer {
         final Endpoints endpoints = messageQueue.getBroker().getEndpoints();
         final SendMessageRequest request = wrapSendMessageRequest(messages);
 
-        final ListenableFuture<InvocationContext<SendMessageResponse>> responseFuture =
+        final ListenableFuture<RpcInvocation<SendMessageResponse>> responseFuture =
             clientManager.sendMessage(endpoints, metadata, request, clientConfiguration.getRequestTimeout());
 
         final ListenableFuture<List<SendReceiptImpl>> attemptFuture = Futures.transformAsync(responseFuture,
diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/ProducerSettings.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/ProducerSettings.java
index 81e6e8b..f1eef34 100644
--- a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/ProducerSettings.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/ProducerSettings.java
@@ -18,6 +18,7 @@
 package org.apache.rocketmq.client.java.impl.producer;
 
 import apache.rocketmq.v2.Publishing;
+import apache.rocketmq.v2.Resource;
 import apache.rocketmq.v2.Settings;
 import com.google.common.base.MoreObjects;
 import com.google.common.util.concurrent.Futures;
@@ -28,7 +29,6 @@ import java.util.stream.Collectors;
 import org.apache.rocketmq.client.java.impl.ClientSettings;
 import org.apache.rocketmq.client.java.impl.ClientType;
 import org.apache.rocketmq.client.java.impl.UserAgent;
-import org.apache.rocketmq.client.java.message.protocol.Resource;
 import org.apache.rocketmq.client.java.retry.ExponentialBackoffRetryPolicy;
 import org.apache.rocketmq.client.java.route.Endpoints;
 import org.slf4j.Logger;
@@ -37,16 +37,16 @@ import org.slf4j.LoggerFactory;
 public class ProducerSettings extends ClientSettings {
     private static final Logger LOGGER = LoggerFactory.getLogger(ProducerSettings.class);
 
-    private final Set<Resource> topics;
+    private final Set<String> topics;
     /**
      * If message body size exceeds the threshold, it would be compressed for convenience of transport.
      */
     private volatile int maxBodySizeBytes = 4 * 1024 * 1024;
     private volatile boolean validateMessageType = true;
 
-    public ProducerSettings(String clientId, Endpoints accessPoint,
-        ExponentialBackoffRetryPolicy exponentialBackoffRetryPolicy, Duration requestTimeout, Set<Resource> topics) {
-        super(clientId, ClientType.PRODUCER, accessPoint, exponentialBackoffRetryPolicy, requestTimeout);
+    public ProducerSettings(String clientId, Endpoints accessPoint, ExponentialBackoffRetryPolicy retryPolicy,
+        Duration requestTimeout, Set<String> topics) {
+        super(clientId, ClientType.PRODUCER, accessPoint, retryPolicy, requestTimeout);
         this.topics = topics;
     }
 
@@ -60,8 +60,9 @@ public class ProducerSettings extends ClientSettings {
 
     @Override
     public Settings toProtobuf() {
-        final Publishing publishing = Publishing.newBuilder().addAllTopics(topics.stream().map(Resource::toProtobuf)
-            .collect(Collectors.toList())).build();
+        final Publishing publishing = Publishing.newBuilder()
+            .addAllTopics(topics.stream().map(name -> Resource.newBuilder().setName(name).build())
+                .collect(Collectors.toList())).build();
         final Settings.Builder builder = Settings.newBuilder()
             .setAccessPoint(accessPoint.toProtobuf()).setClientType(clientType.toProtobuf())
             .setRequestTimeout(Durations.fromNanos(requestTimeout.toNanos())).setPublishing(publishing);
diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/SendReceiptImpl.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/SendReceiptImpl.java
index f6f27f6..519475a 100644
--- a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/SendReceiptImpl.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/SendReceiptImpl.java
@@ -40,7 +40,7 @@ import org.apache.rocketmq.client.java.exception.UnsupportedException;
 import org.apache.rocketmq.client.java.message.MessageIdCodec;
 import org.apache.rocketmq.client.java.route.Endpoints;
 import org.apache.rocketmq.client.java.route.MessageQueueImpl;
-import org.apache.rocketmq.client.java.rpc.InvocationContext;
+import org.apache.rocketmq.client.java.rpc.RpcInvocation;
 
 public class SendReceiptImpl implements SendReceipt {
     private final MessageId messageId;
@@ -78,12 +78,12 @@ public class SendReceiptImpl implements SendReceipt {
     }
 
     public static List<SendReceiptImpl> processRespContext(MessageQueueImpl mq,
-        InvocationContext<SendMessageResponse> ctx) throws ClientException {
-        final String requestId = ctx.getRpcContext().getRequestId();
-        final SendMessageResponse resp = ctx.getResp();
-        final Status status = resp.getStatus();
+        RpcInvocation<SendMessageResponse> invocation) throws ClientException {
+        final String requestId = invocation.getContext().getRequestId();
+        final SendMessageResponse response = invocation.getResponse();
+        final Status status = response.getStatus();
         List<SendReceiptImpl> sendReceipts = new ArrayList<>();
-        final List<SendResultEntry> entries = resp.getEntriesList();
+        final List<SendResultEntry> entries = response.getEntriesList();
         for (SendResultEntry entry : entries) {
             final Status entryStatus = entry.getStatus();
             final Code code = entryStatus.getCode();
diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/route/TopicRouteDataResult.java b/java/client/src/main/java/org/apache/rocketmq/client/java/route/TopicRouteDataResult.java
index 8c9b75f..94adee4 100644
--- a/java/client/src/main/java/org/apache/rocketmq/client/java/route/TopicRouteDataResult.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/route/TopicRouteDataResult.java
@@ -32,7 +32,7 @@ import org.apache.rocketmq.client.java.exception.NotFoundException;
 import org.apache.rocketmq.client.java.exception.ProxyTimeoutException;
 import org.apache.rocketmq.client.java.exception.TooManyRequestsException;
 import org.apache.rocketmq.client.java.exception.UnsupportedException;
-import org.apache.rocketmq.client.java.rpc.InvocationContext;
+import org.apache.rocketmq.client.java.rpc.RpcInvocation;
 
 /**
  * Result topic route data fetched from remote.
@@ -42,12 +42,12 @@ public class TopicRouteDataResult {
     private final TopicRouteData topicRouteData;
     private final ClientException exception;
 
-    public TopicRouteDataResult(InvocationContext<QueryRouteResponse> ctx) {
-        final QueryRouteResponse resp = ctx.getResp();
-        final String requestId = ctx.getRpcContext().getRequestId();
-        final List<MessageQueue> messageQueuesList = resp.getMessageQueuesList();
+    public TopicRouteDataResult(RpcInvocation<QueryRouteResponse> invocation) {
+        final QueryRouteResponse response = invocation.getResponse();
+        final String requestId = invocation.getContext().getRequestId();
+        final List<MessageQueue> messageQueuesList = response.getMessageQueuesList();
         final TopicRouteData topicRouteData = new TopicRouteData(messageQueuesList);
-        final Status status = resp.getStatus();
+        final Status status = response.getStatus();
         this.topicRouteData = topicRouteData;
         final Code code = status.getCode();
         final int codeNumber = code.getNumber();
diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/rpc/RpcContext.java b/java/client/src/main/java/org/apache/rocketmq/client/java/rpc/Context.java
similarity index 93%
rename from java/client/src/main/java/org/apache/rocketmq/client/java/rpc/RpcContext.java
rename to java/client/src/main/java/org/apache/rocketmq/client/java/rpc/Context.java
index 5def58b..721bad6 100644
--- a/java/client/src/main/java/org/apache/rocketmq/client/java/rpc/RpcContext.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/rpc/Context.java
@@ -20,11 +20,11 @@ package org.apache.rocketmq.client.java.rpc;
 import io.grpc.Metadata;
 import org.apache.rocketmq.client.java.route.Endpoints;
 
-public class RpcContext {
+public class Context {
     private final Endpoints endpoints;
     private final Metadata header;
 
-    public RpcContext(Endpoints endpoints, Metadata header) {
+    public Context(Endpoints endpoints, Metadata header) {
         this.endpoints = endpoints;
         this.header = header;
     }
diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/rpc/RpcClient.java b/java/client/src/main/java/org/apache/rocketmq/client/java/rpc/RpcClient.java
index 70744f3..7979b4e 100644
--- a/java/client/src/main/java/org/apache/rocketmq/client/java/rpc/RpcClient.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/rpc/RpcClient.java
@@ -74,7 +74,7 @@ public interface RpcClient {
      * @param duration request max duration.
      * @return invocation of response future.
      */
-    ListenableFuture<InvocationContext<QueryRouteResponse>> queryRoute(Metadata metadata, QueryRouteRequest request,
+    ListenableFuture<RpcInvocation<QueryRouteResponse>> queryRoute(Metadata metadata, QueryRouteRequest request,
         Executor executor,
         Duration duration);
 
@@ -87,7 +87,7 @@ public interface RpcClient {
      * @param duration request max duration.
      * @return invocation of response future.
      */
-    ListenableFuture<InvocationContext<HeartbeatResponse>> heartbeat(Metadata metadata, HeartbeatRequest request,
+    ListenableFuture<RpcInvocation<HeartbeatResponse>> heartbeat(Metadata metadata, HeartbeatRequest request,
         Executor executor,
         Duration duration);
 
@@ -100,7 +100,7 @@ public interface RpcClient {
      * @param duration request max duration.
      * @return invocation of response future.
      */
-    ListenableFuture<InvocationContext<SendMessageResponse>> sendMessage(Metadata metadata,
+    ListenableFuture<RpcInvocation<SendMessageResponse>> sendMessage(Metadata metadata,
         SendMessageRequest request, Executor executor, Duration duration);
 
     /**
@@ -112,7 +112,7 @@ public interface RpcClient {
      * @param duration request max duration.
      * @return invocation of response future.
      */
-    ListenableFuture<InvocationContext<QueryAssignmentResponse>> queryAssignment(Metadata metadata,
+    ListenableFuture<RpcInvocation<QueryAssignmentResponse>> queryAssignment(Metadata metadata,
         QueryAssignmentRequest request, Executor executor, Duration duration);
 
     /**
@@ -123,7 +123,7 @@ public interface RpcClient {
      * @param executor gRPC asynchronous executor.
      * @return invocation of response future.
      */
-    ListenableFuture<InvocationContext<Iterator<ReceiveMessageResponse>>> receiveMessage(Metadata metadata,
+    ListenableFuture<RpcInvocation<Iterator<ReceiveMessageResponse>>> receiveMessage(Metadata metadata,
         ReceiveMessageRequest request, ExecutorService executor, Duration duration);
 
     /**
@@ -135,7 +135,7 @@ public interface RpcClient {
      * @param duration request max duration.
      * @return invocation of response future.
      */
-    ListenableFuture<InvocationContext<AckMessageResponse>> ackMessage(Metadata metadata, AckMessageRequest request,
+    ListenableFuture<RpcInvocation<AckMessageResponse>> ackMessage(Metadata metadata, AckMessageRequest request,
         Executor executor, Duration duration);
 
     /**
@@ -147,7 +147,7 @@ public interface RpcClient {
      * @param duration request max duration.
      * @return invocation of response future.
      */
-    ListenableFuture<InvocationContext<ChangeInvisibleDurationResponse>> changeInvisibleDuration(Metadata metadata,
+    ListenableFuture<RpcInvocation<ChangeInvisibleDurationResponse>> changeInvisibleDuration(Metadata metadata,
         ChangeInvisibleDurationRequest request, Executor executor, Duration duration);
 
     /**
@@ -159,7 +159,7 @@ public interface RpcClient {
      * @param duration request max duration.
      * @return invocation of response future.
      */
-    ListenableFuture<InvocationContext<ForwardMessageToDeadLetterQueueResponse>> forwardMessageToDeadLetterQueue(
+    ListenableFuture<RpcInvocation<ForwardMessageToDeadLetterQueueResponse>> forwardMessageToDeadLetterQueue(
         Metadata metadata, ForwardMessageToDeadLetterQueueRequest request, Executor executor, Duration duration);
 
     /**
@@ -171,7 +171,7 @@ public interface RpcClient {
      * @param duration request max duration.
      * @return invocation of response future.
      */
-    ListenableFuture<InvocationContext<EndTransactionResponse>> endTransaction(Metadata metadata,
+    ListenableFuture<RpcInvocation<EndTransactionResponse>> endTransaction(Metadata metadata,
         EndTransactionRequest request, Executor executor, Duration duration);
 
     /**
@@ -183,7 +183,7 @@ public interface RpcClient {
      * @param duration request max duration.
      * @return invocation of response future.
      */
-    ListenableFuture<InvocationContext<NotifyClientTerminationResponse>> notifyClientTermination(Metadata metadata,
+    ListenableFuture<RpcInvocation<NotifyClientTerminationResponse>> notifyClientTermination(Metadata metadata,
         NotifyClientTerminationRequest request, Executor executor, Duration duration);
 
     StreamObserver<TelemetryCommand> telemetry(Metadata metadata, Executor executor, Duration duration,
diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/rpc/RpcClientImpl.java b/java/client/src/main/java/org/apache/rocketmq/client/java/rpc/RpcClientImpl.java
index ffba65e..0578a2d 100644
--- a/java/client/src/main/java/org/apache/rocketmq/client/java/rpc/RpcClientImpl.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/rpc/RpcClientImpl.java
@@ -91,7 +91,6 @@ public class RpcClientImpl implements RpcClient {
                 .intercept(LoggingInterceptor.getInstance())
                 .sslContext(sslContext);
         // Disable grpc's auto-retry here.
         channelBuilder.disableRetry();
-
         final List<InetSocketAddress> socketAddresses = endpoints.toSocketAddresses();
         if (null != socketAddresses) {
@@ -106,11 +105,11 @@ public class RpcClientImpl implements RpcClient {
         this.activityNanoTime = System.nanoTime();
     }
 
-    private <T> ListenableFuture<InvocationContext<T>> wrapInvocationContext(ListenableFuture<T> future,
+    private <T> ListenableFuture<RpcInvocation<T>> wrapInvocationContext(ListenableFuture<T> future,
         Metadata header) {
         return Futures.transformAsync(future, response -> {
-            final RpcContext rpcContext = new RpcContext(endpoints, header);
-            return Futures.immediateFuture(new InvocationContext<>(response, rpcContext));
+            final Context context = new Context(endpoints, header);
+            return Futures.immediateFuture(new RpcInvocation<>(response, context));
         }, MoreExecutors.directExecutor());
     }
 
@@ -125,7 +124,7 @@ public class RpcClientImpl implements RpcClient {
     }
 
     @Override
-    public ListenableFuture<InvocationContext<QueryRouteResponse>> queryRoute(Metadata metadata,
+    public ListenableFuture<RpcInvocation<QueryRouteResponse>> queryRoute(Metadata metadata,
         QueryRouteRequest request, Executor executor, Duration duration) {
         this.activityNanoTime = System.nanoTime();
         final ListenableFuture<QueryRouteResponse> future = futureStub
@@ -135,7 +134,7 @@ public class RpcClientImpl implements RpcClient {
     }
 
     @Override
-    public ListenableFuture<InvocationContext<HeartbeatResponse>> heartbeat(Metadata metadata, HeartbeatRequest request,
+    public ListenableFuture<RpcInvocation<HeartbeatResponse>> heartbeat(Metadata metadata, HeartbeatRequest request,
         Executor executor, Duration duration) {
         this.activityNanoTime = System.nanoTime();
         final ListenableFuture<HeartbeatResponse> future =
@@ -145,7 +144,7 @@ public class RpcClientImpl implements RpcClient {
     }
 
     @Override
-    public ListenableFuture<InvocationContext<SendMessageResponse>> sendMessage(Metadata metadata,
+    public ListenableFuture<RpcInvocation<SendMessageResponse>> sendMessage(Metadata metadata,
         SendMessageRequest request, Executor executor, Duration duration) {
         this.activityNanoTime = System.nanoTime();
         final ListenableFuture<SendMessageResponse> future =
@@ -155,7 +154,7 @@ public class RpcClientImpl implements RpcClient {
     }
 
     @Override
-    public ListenableFuture<InvocationContext<QueryAssignmentResponse>> queryAssignment(Metadata metadata,
+    public ListenableFuture<RpcInvocation<QueryAssignmentResponse>> queryAssignment(Metadata metadata,
         QueryAssignmentRequest request, Executor executor, Duration duration) {
         this.activityNanoTime = System.nanoTime();
         final ListenableFuture<QueryAssignmentResponse> future =
@@ -165,7 +164,7 @@ public class RpcClientImpl implements RpcClient {
     }
 
     @Override
-    public ListenableFuture<InvocationContext<Iterator<ReceiveMessageResponse>>> receiveMessage(Metadata metadata,
+    public ListenableFuture<RpcInvocation<Iterator<ReceiveMessageResponse>>> receiveMessage(Metadata metadata,
         ReceiveMessageRequest request, ExecutorService executor, Duration duration) {
         this.activityNanoTime = System.nanoTime();
         final Callable<Iterator<ReceiveMessageResponse>> callable = () -> blockingStub
@@ -177,7 +176,7 @@ public class RpcClientImpl implements RpcClient {
     }
 
     @Override
-    public ListenableFuture<InvocationContext<AckMessageResponse>> ackMessage(Metadata metadata,
+    public ListenableFuture<RpcInvocation<AckMessageResponse>> ackMessage(Metadata metadata,
         AckMessageRequest request, Executor executor, Duration duration) {
         this.activityNanoTime = System.nanoTime();
         final ListenableFuture<AckMessageResponse> future =
@@ -187,7 +186,7 @@ public class RpcClientImpl implements RpcClient {
     }
 
     @Override
-    public ListenableFuture<InvocationContext<ChangeInvisibleDurationResponse>> changeInvisibleDuration(
+    public ListenableFuture<RpcInvocation<ChangeInvisibleDurationResponse>> changeInvisibleDuration(
         Metadata metadata, ChangeInvisibleDurationRequest request, Executor executor, Duration duration) {
         this.activityNanoTime = System.nanoTime();
         final ListenableFuture<ChangeInvisibleDurationResponse> future =
@@ -197,7 +196,7 @@ public class RpcClientImpl implements RpcClient {
     }
 
     @Override
-    public ListenableFuture<InvocationContext<ForwardMessageToDeadLetterQueueResponse>> forwardMessageToDeadLetterQueue(
+    public ListenableFuture<RpcInvocation<ForwardMessageToDeadLetterQueueResponse>> forwardMessageToDeadLetterQueue(
         Metadata metadata, ForwardMessageToDeadLetterQueueRequest request, Executor executor, Duration duration) {
         this.activityNanoTime = System.nanoTime();
         final ListenableFuture<ForwardMessageToDeadLetterQueueResponse> future = futureStub
@@ -207,7 +206,7 @@ public class RpcClientImpl implements RpcClient {
     }
 
     @Override
-    public ListenableFuture<InvocationContext<EndTransactionResponse>> endTransaction(Metadata metadata,
+    public ListenableFuture<RpcInvocation<EndTransactionResponse>> endTransaction(Metadata metadata,
         EndTransactionRequest request, Executor executor, Duration duration) {
         this.activityNanoTime = System.nanoTime();
         final ListenableFuture<EndTransactionResponse> future =
@@ -217,7 +216,7 @@ public class RpcClientImpl implements RpcClient {
     }
 
     @Override
-    public ListenableFuture<InvocationContext<NotifyClientTerminationResponse>> notifyClientTermination(
+    public ListenableFuture<RpcInvocation<NotifyClientTerminationResponse>> notifyClientTermination(
         Metadata metadata, NotifyClientTerminationRequest request, Executor executor, Duration duration) {
         this.activityNanoTime = System.nanoTime();
         final ListenableFuture<NotifyClientTerminationResponse> future =
diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/rpc/InvocationContext.java b/java/client/src/main/java/org/apache/rocketmq/client/java/rpc/RpcInvocation.java
similarity index 78%
rename from java/client/src/main/java/org/apache/rocketmq/client/java/rpc/InvocationContext.java
rename to java/client/src/main/java/org/apache/rocketmq/client/java/rpc/RpcInvocation.java
index eb5f487..4f2d6a1 100644
--- a/java/client/src/main/java/org/apache/rocketmq/client/java/rpc/InvocationContext.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/rpc/RpcInvocation.java
@@ -19,28 +19,28 @@ package org.apache.rocketmq.client.java.rpc;
 
 import com.google.common.base.MoreObjects;
 
-public class InvocationContext<T> {
+public class RpcInvocation<T> {
     private final T t;
-    private final RpcContext rpcContext;
+    private final Context context;
 
-    public InvocationContext(T t, RpcContext rpcContext) {
+    public RpcInvocation(T t, Context context) {
         this.t = t;
-        this.rpcContext = rpcContext;
+        this.context = context;
     }
 
-    public T getResp() {
+    public T getResponse() {
         return t;
     }
 
-    public RpcContext getRpcContext() {
-        return rpcContext;
+    public Context getContext() {
+        return context;
     }
 
     @Override
     public String toString() {
         return MoreObjects.toStringHelper(this)
             .add("resp", t)
-            .add("rpcContext", rpcContext)
+            .add("context", context)
             .toString();
     }
 }
diff --git a/java/client/src/test/java/org/apache/rocketmq/client/java/impl/ClientManagerImplTest.java b/java/client/src/test/java/org/apache/rocketmq/client/java/impl/ClientManagerImplTest.java
index f0b7f3e..270f09b 100644
--- a/java/client/src/test/java/org/apache/rocketmq/client/java/impl/ClientManagerImplTest.java
+++ b/java/client/src/test/java/org/apache/rocketmq/client/java/impl/ClientManagerImplTest.java
@@ -33,7 +33,7 @@ import org.apache.rocketmq.client.java.tool.TestBase;
 import org.junit.Test;
 
 public class ClientManagerImplTest extends TestBase {
-    private final ClientManagerImpl clientManager = new ClientManagerImpl();
+    private final ClientManagerImpl clientManager = new ClientManagerImpl(null);
 
     @Test
     public void testQueryRoute() {
diff --git a/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/ProcessQueueImplTest.java b/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/ProcessQueueImplTest.java
index 5ef44bc..3cd91b9 100644
--- a/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/ProcessQueueImplTest.java
+++ b/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/ProcessQueueImplTest.java
@@ -47,7 +47,7 @@ import org.apache.rocketmq.client.java.message.MessageViewImpl;
 import org.apache.rocketmq.client.java.misc.RequestIdGenerator;
 import org.apache.rocketmq.client.java.retry.RetryPolicy;
 import org.apache.rocketmq.client.java.route.MessageQueueImpl;
-import org.apache.rocketmq.client.java.rpc.InvocationContext;
+import org.apache.rocketmq.client.java.rpc.RpcInvocation;
 import org.apache.rocketmq.client.java.tool.TestBase;
 import org.junit.Before;
 import org.junit.Test;
@@ -181,7 +181,7 @@ public class ProcessQueueImplTest extends TestBase {
         assertEquals(cachedMessageCount, processQueue.cachedMessagesCount());
         assertEquals(1, processQueue.inflightMessagesCount());
 
-        final ListenableFuture<InvocationContext<AckMessageResponse>> future = okAckMessageResponseFuture();
+        final ListenableFuture<RpcInvocation<AckMessageResponse>> future = okAckMessageResponseFuture();
         when(pushConsumer.ackMessage(any(MessageViewImpl.class))).thenReturn(future);
         processQueue.eraseMessage(optionalMessageView.get(), ConsumeResult.SUCCESS);
         future.addListener(() -> verify(pushConsumer, times(1))
@@ -224,7 +224,7 @@ public class ProcessQueueImplTest extends TestBase {
         final MessageViewImpl messageView = fakeMessageViewImpl(2, false);
         messageViewList.add(messageView);
         processQueue.cacheMessages(messageViewList);
-        ListenableFuture<InvocationContext<AckMessageResponse>> future0 = okAckMessageResponseFuture();
+        ListenableFuture<RpcInvocation<AckMessageResponse>> future0 = okAckMessageResponseFuture();
         when(pushConsumer.ackMessage(any(MessageViewImpl.class))).thenReturn(future0);
         when(pushConsumer.getRetryPolicy()).thenReturn(retryPolicy);
         when(retryPolicy.getMaxAttempts()).thenReturn(1);
@@ -240,7 +240,7 @@ public class ProcessQueueImplTest extends TestBase {
         final MessageViewImpl messageView = fakeMessageViewImpl(2, false);
         messageViewList.add(messageView);
         processQueue.cacheMessages(messageViewList);
-        ListenableFuture<InvocationContext<ForwardMessageToDeadLetterQueueResponse>> future0 =
+        ListenableFuture<RpcInvocation<ForwardMessageToDeadLetterQueueResponse>> future0 =
             okForwardMessageToDeadLetterQueueResponseFuture();
         when(pushConsumer.forwardMessageToDeadLetterQueue(any(MessageViewImpl.class))).thenReturn(future0);
         when(pushConsumer.getRetryPolicy()).thenReturn(retryPolicy);
@@ -257,7 +257,7 @@ public class ProcessQueueImplTest extends TestBase {
         final MessageViewImpl messageView = fakeMessageViewImpl(2, false);
         messageViewList.add(messageView);
         processQueue.cacheMessages(messageViewList);
-        ListenableFuture<InvocationContext<AckMessageResponse>> future0 = okAckMessageResponseFuture();
+        ListenableFuture<RpcInvocation<AckMessageResponse>> future0 = okAckMessageResponseFuture();
         when(pushConsumer.ackMessage(any(MessageViewImpl.class))).thenReturn(future0);
         when(pushConsumer.getRetryPolicy()).thenReturn(retryPolicy);
         when(retryPolicy.getMaxAttempts()).thenReturn(2);
diff --git a/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/PushConsumerImplTest.java b/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/PushConsumerImplTest.java
index 8b88702..ba65c05 100644
--- a/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/PushConsumerImplTest.java
+++ b/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/PushConsumerImplTest.java
@@ -17,58 +17,32 @@
 
 package org.apache.rocketmq.client.java.impl.consumer;
 
+import static org.junit.Assert.assertEquals;
 import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.Mockito.atLeast;
-import static org.mockito.Mockito.doNothing;
-import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
 
-import apache.rocketmq.v2.CustomizedBackoff;
-import apache.rocketmq.v2.QueryAssignmentRequest;
-import apache.rocketmq.v2.QueryRouteRequest;
-import apache.rocketmq.v2.RetryPolicy;
-import apache.rocketmq.v2.Settings;
-import apache.rocketmq.v2.Subscription;
-import apache.rocketmq.v2.TelemetryCommand;
-import com.google.common.util.concurrent.Service;
-import com.google.protobuf.util.Durations;
-import io.grpc.Metadata;
-import io.grpc.stub.StreamObserver;
-import java.time.Duration;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import java.util.Collections;
 import java.util.Map;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ScheduledThreadPoolExecutor;
 import org.apache.rocketmq.client.apis.ClientConfiguration;
 import org.apache.rocketmq.client.apis.ClientException;
 import org.apache.rocketmq.client.apis.consumer.ConsumeResult;
 import org.apache.rocketmq.client.apis.consumer.FilterExpression;
 import org.apache.rocketmq.client.apis.consumer.MessageListener;
-import org.apache.rocketmq.client.java.impl.ClientManagerImpl;
-import org.apache.rocketmq.client.java.impl.ClientManagerRegistry;
-import org.apache.rocketmq.client.java.impl.ClientSessionImpl;
-import org.apache.rocketmq.client.java.misc.ThreadFactoryImpl;
-import org.apache.rocketmq.client.java.route.Endpoints;
+import org.apache.rocketmq.client.java.route.MessageQueueImpl;
 import org.apache.rocketmq.client.java.tool.TestBase;
-import org.junit.Assert;
 import org.junit.Test;
 import org.junit.runner.RunWith;
-import org.mockito.InjectMocks;
-import org.mockito.Mock;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Mockito;
+import org.mockito.Spy;
 import org.mockito.junit.MockitoJUnitRunner;
 
 @RunWith(MockitoJUnitRunner.class)
 public class PushConsumerImplTest extends TestBase {
-    @Mock
-    private ClientManagerImpl clientManager;
-
-    @Mock
-    private StreamObserver<TelemetryCommand> telemetryRequestObserver;
-
-    @SuppressWarnings("unused")
-    @InjectMocks
-    private ClientManagerRegistry clientManagerRegistry = ClientManagerRegistry.getInstance();
-
     private final Map<String, FilterExpression> subscriptionExpressions = createSubscriptionExpressions(FAKE_TOPIC_0);
 
     private final MessageListener messageListener = messageView -> ConsumeResult.SUCCESS;
@@ -80,81 +54,56 @@ public class PushConsumerImplTest extends TestBase {
     private final ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder()
         .setEndpoints(FAKE_ACCESS_POINT).build();
 
-    private PushConsumerImpl pushConsumer;
+    @Spy
+    private final PushConsumerImpl pushConsumer = new PushConsumerImpl(clientConfiguration, FAKE_GROUP_0,
+        subscriptionExpressions, messageListener, maxCacheMessageCount, maxCacheMessageSizeInBytes,
+        consumptionThreadCount);
 
-    private void start(PushConsumerImpl pushConsumer) throws ClientException {
-        when(clientManager.queryRoute(any(Endpoints.class), any(Metadata.class), any(QueryRouteRequest.class),
-            any(Duration.class)))
-            .thenReturn(okQueryRouteResponseFuture());
-        when(clientManager.telemetry(any(Endpoints.class), any(Metadata.class), any(Duration.class),
-            any(ClientSessionImpl.class)))
-            .thenReturn(telemetryRequestObserver);
-        final ScheduledThreadPoolExecutor scheduler = new ScheduledThreadPoolExecutor(1, new ThreadFactoryImpl(
-            "TestScheduler"));
-        when(clientManager.getScheduler()).thenReturn(scheduler);
-        doNothing().when(telemetryRequestObserver).onNext(any(TelemetryCommand.class));
 
-        CustomizedBackoff customizedBackoff = CustomizedBackoff.newBuilder()
-            .addNext(Durations.fromNanos(Duration.ofSeconds(3).toNanos())).build();
-        RetryPolicy retryPolicy = RetryPolicy.newBuilder().setMaxAttempts(17).setCustomizedBackoff(customizedBackoff)
-            .build();
-        Subscription subscription = Subscription.newBuilder().build();
-        Settings settings = Settings.newBuilder().setSubscription(subscription).setBackoffPolicy(retryPolicy).build();
-        final Service service = pushConsumer.startAsync();
-        pushConsumer.getPushConsumerSettings().applySettingsCommand(settings);
-        service.awaitRunning();
+    @Test(expected = IllegalStateException.class)
+    public void testSubscribeBeforeStartup() throws ClientException {
+        pushConsumer.subscribe(FAKE_TOPIC_0, new FilterExpression(FAKE_TOPIC_0));
     }
 
-    private void shutdown(PushConsumerImpl pushConsumer) {
-        final Service clientManagerService = mock(Service.class);
-        when(clientManager.stopAsync()).thenReturn(clientManagerService);
-        doNothing().when(clientManagerService).awaitTerminated();
-        pushConsumer.stopAsync().awaitTerminated();
+    @Test(expected = IllegalStateException.class)
+    public void testUnsubscribeBeforeStartup() {
+        pushConsumer.unsubscribe(FAKE_TOPIC_0);
     }
 
     @Test
-    public void testScanAssignment() throws ExecutionException, InterruptedException, ClientException {
-        pushConsumer = new PushConsumerImpl(clientConfiguration, FAKE_GROUP_0, subscriptionExpressions, messageListener,
-            maxCacheMessageCount, maxCacheMessageSizeInBytes, consumptionThreadCount);
-        start(pushConsumer);
-        when(clientManager.queryAssignment(any(Endpoints.class), any(Metadata.class), any(QueryAssignmentRequest.class),
-            any(Duration.class))).thenReturn(okQueryAssignmentResponseFuture());
-        pushConsumer.scanAssignments();
-        verify(clientManager, atLeast(1)).queryRoute(any(Endpoints.class), any(Metadata.class),
-            any(QueryRouteRequest.class), any(Duration.class));
-        verify(clientManager, atLeast(1)).queryAssignment(any(Endpoints.class),
-            any(Metadata.class), any(QueryAssignmentRequest.class), any(Duration.class));
-        Assert.assertEquals(okQueryAssignmentResponseFuture().get().getResp().getAssignmentsCount(),
-            pushConsumer.getQueueSize());
-        when(clientManager.queryAssignment(any(Endpoints.class), any(Metadata.class), any(QueryAssignmentRequest.class),
-            any(Duration.class))).thenReturn(okEmptyQueryAssignmentResponseFuture());
-        pushConsumer.scanAssignments();
-        Assert.assertEquals(0, pushConsumer.getQueueSize());
-        shutdown(pushConsumer);
+    public void testQueryAssignment() {
+        final PushConsumerImpl mockedPushConsumer = Mockito.mock(PushConsumerImpl.class);
+        mockedPushConsumer.queryAssignment(FAKE_TOPIC_0);
     }
 
-    @Test(expected = IllegalStateException.class)
-    public void testSubscribeWithoutStart() throws ClientException {
-        pushConsumer = new PushConsumerImpl(clientConfiguration, FAKE_GROUP_0, subscriptionExpressions, messageListener,
-            maxCacheMessageCount, maxCacheMessageSizeInBytes, consumptionThreadCount);
-        pushConsumer.subscribe(FAKE_TOPIC_0, new FilterExpression(FAKE_TOPIC_0));
-    }
-
-    @Test(expected = IllegalStateException.class)
-    public void testUnsubscribeWithoutStart() {
-        pushConsumer = new PushConsumerImpl(clientConfiguration, FAKE_GROUP_0, subscriptionExpressions, messageListener,
-            maxCacheMessageCount, maxCacheMessageSizeInBytes, consumptionThreadCount);
-        pushConsumer.unsubscribe(FAKE_TOPIC_0);
+    @Test
+    public void testScanAssignments() {
+        final MessageQueueImpl messageQueue = fakeMessageQueueImpl0();
+        final Assignment assignment = new Assignment(messageQueue);
+        final Assignments assignments = new Assignments(Collections.singletonList(assignment));
+        final ListenableFuture<Assignments> assignmentsFuture = Futures.immediateFuture(assignments);
+        Mockito.when(pushConsumer.queryAssignment(FAKE_TOPIC_0)).thenReturn(assignmentsFuture);
+        pushConsumer.scanAssignments();
+        final ArgumentCaptor<String> topicCaptor0 = ArgumentCaptor.forClass(String.class);
+        verify(pushConsumer, times(1))
+            .syncProcessQueue(topicCaptor0.capture(), any(Assignments.class), any(FilterExpression.class));
+        final String topic0 = topicCaptor0.getValue();
+        assertEquals(topic0, FAKE_TOPIC_0);
+        pushConsumer.scanAssignments();
+        final ArgumentCaptor<String> topicCaptor1 = ArgumentCaptor.forClass(String.class);
+        verify(pushConsumer, times(2))
+            .syncProcessQueue(topicCaptor1.capture(), any(Assignments.class), any(FilterExpression.class));
+        final String topic1 = topicCaptor1.getValue();
+        assertEquals(topic1, FAKE_TOPIC_0);
     }
 
     @Test
-    public void testSubscribeWithSubscriptionOverwriting() throws ClientException {
-        pushConsumer = new PushConsumerImpl(clientConfiguration, FAKE_GROUP_0, subscriptionExpressions,
-            messageListener,
-            maxCacheMessageCount, maxCacheMessageSizeInBytes, consumptionThreadCount);
-        start(pushConsumer);
-        final FilterExpression filterExpression = new FilterExpression(FAKE_TAG_0);
-        pushConsumer.subscribe(FAKE_TOPIC_0, filterExpression);
-        shutdown(pushConsumer);
+    public void testScanAssignmentsWithoutResults() {
+        final Assignments assignments = new Assignments(Collections.emptyList());
+        final ListenableFuture<Assignments> assignmentsFuture = Futures.immediateFuture(assignments);
+        Mockito.when(pushConsumer.queryAssignment(FAKE_TOPIC_0)).thenReturn(assignmentsFuture);
+        pushConsumer.scanAssignments();
+        verify(pushConsumer, never()).syncProcessQueue(any(String.class), any(Assignments.class),
+            any(FilterExpression.class));
     }
 }
\ No newline at end of file
diff --git a/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/SimpleConsumerImplTest.java b/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/SimpleConsumerImplTest.java
index e6ede4e..b67fd19 100644
--- a/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/SimpleConsumerImplTest.java
+++ b/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/SimpleConsumerImplTest.java
@@ -17,68 +17,43 @@
 
 package org.apache.rocketmq.client.java.impl.consumer;
 
-import static org.junit.Assert.assertEquals;
-import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.Mockito.doNothing;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 import static org.mockito.Mockito.when;
 
-import apache.rocketmq.v2.AckMessageRequest;
 import apache.rocketmq.v2.AckMessageResponse;
-import apache.rocketmq.v2.Broker;
-import apache.rocketmq.v2.ChangeInvisibleDurationRequest;
 import apache.rocketmq.v2.ChangeInvisibleDurationResponse;
 import apache.rocketmq.v2.Code;
-import apache.rocketmq.v2.MessageQueue;
-import apache.rocketmq.v2.Permission;
-import apache.rocketmq.v2.QueryRouteRequest;
-import apache.rocketmq.v2.QueryRouteResponse;
-import apache.rocketmq.v2.ReceiveMessageRequest;
-import apache.rocketmq.v2.ReceiveMessageResponse;
-import apache.rocketmq.v2.Resource;
-import apache.rocketmq.v2.Settings;
-import apache.rocketmq.v2.Status;
-import apache.rocketmq.v2.Subscription;
-import apache.rocketmq.v2.TelemetryCommand;
 import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.Service;
-import com.google.common.util.concurrent.SettableFuture;
-import io.grpc.Metadata;
-import io.grpc.stub.StreamObserver;
 import java.time.Duration;
-import java.util.ArrayList;
-import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
 import org.apache.rocketmq.client.apis.ClientConfiguration;
 import org.apache.rocketmq.client.apis.ClientException;
 import org.apache.rocketmq.client.apis.consumer.FilterExpression;
 import org.apache.rocketmq.client.apis.message.MessageView;
-import org.apache.rocketmq.client.java.impl.ClientManagerImpl;
-import org.apache.rocketmq.client.java.impl.ClientManagerRegistry;
-import org.apache.rocketmq.client.java.impl.ClientSessionImpl;
+import org.apache.rocketmq.client.java.exception.BadRequestException;
+import org.apache.rocketmq.client.java.exception.ForbiddenException;
+import org.apache.rocketmq.client.java.exception.InternalErrorException;
+import org.apache.rocketmq.client.java.exception.NotFoundException;
+import org.apache.rocketmq.client.java.exception.ProxyTimeoutException;
+import org.apache.rocketmq.client.java.exception.TooManyRequestsException;
+import org.apache.rocketmq.client.java.exception.UnauthorizedException;
+import org.apache.rocketmq.client.java.exception.UnsupportedException;
 import org.apache.rocketmq.client.java.message.MessageViewImpl;
-import org.apache.rocketmq.client.java.misc.ThreadFactoryImpl;
-import org.apache.rocketmq.client.java.route.Endpoints;
-import org.apache.rocketmq.client.java.rpc.InvocationContext;
+import org.apache.rocketmq.client.java.rpc.RpcInvocation;
 import org.apache.rocketmq.client.java.tool.TestBase;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.mockito.InjectMocks;
-import org.mockito.Mock;
+import org.mockito.Mockito;
 import org.mockito.junit.MockitoJUnitRunner;
 
 @RunWith(MockitoJUnitRunner.class)
 public class SimpleConsumerImplTest extends TestBase {
-    @Mock
-    private ClientManagerImpl clientManager;
-    @Mock
-    private StreamObserver<TelemetryCommand> telemetryRequestObserver;
     @InjectMocks
-    private ClientManagerRegistry clientManagerRegistry = ClientManagerRegistry.getInstance();
     private final ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder()
         .setEndpoints(FAKE_ACCESS_POINT).build();
 
@@ -87,45 +62,6 @@ public class SimpleConsumerImplTest extends TestBase {
 
     private SimpleConsumerImpl simpleConsumer;
 
-    private void start(SimpleConsumerImpl simpleConsumer) throws ClientException {
-        SettableFuture<InvocationContext<QueryRouteResponse>> future0 = SettableFuture.create();
-        Status status = Status.newBuilder().setCode(Code.OK).build();
-        List<MessageQueue> messageQueueList = new ArrayList<>();
-        MessageQueue mq = MessageQueue.newBuilder().setTopic(Resource.newBuilder().setName(FAKE_TOPIC_0))
-            .setPermission(Permission.READ_WRITE)
-            .setBroker(Broker.newBuilder().setName(FAKE_BROKER_NAME_0).setEndpoints(fakePbEndpoints0())).setId(0)
-            .build();
-        messageQueueList.add(mq);
-        QueryRouteResponse response = QueryRouteResponse.newBuilder().setStatus(status)
-            .addAllMessageQueues(messageQueueList).build();
-        final InvocationContext<QueryRouteResponse> invocationContext = new InvocationContext<>(response,
-            fakeRpcContext());
-        future0.set(invocationContext);
-        when(clientManager.queryRoute(any(Endpoints.class), any(Metadata.class), any(QueryRouteRequest.class),
-            any(Duration.class)))
-            .thenReturn(future0);
-        when(clientManager.telemetry(any(Endpoints.class), any(Metadata.class), any(Duration.class),
-            any(ClientSessionImpl.class)))
-            .thenReturn(telemetryRequestObserver);
-        final ScheduledThreadPoolExecutor scheduler = new ScheduledThreadPoolExecutor(1,
-            new ThreadFactoryImpl("TestScheduler"));
-        when(clientManager.getScheduler()).thenReturn(scheduler);
-        doNothing().when(telemetryRequestObserver).onNext(any(TelemetryCommand.class));
-
-        Subscription subscription = Subscription.newBuilder().build();
-        Settings settings = Settings.newBuilder().setSubscription(subscription).build();
-        final Service service = simpleConsumer.startAsync();
-        simpleConsumer.getSimpleConsumerSettings().applySettingsCommand(settings);
-        service.awaitRunning();
-    }
-
-    private void shutdown(SimpleConsumerImpl simpleConsumer) {
-        final Service clientManagerService = mock(Service.class);
-        when(clientManager.stopAsync()).thenReturn(clientManagerService);
-        doNothing().when(clientManagerService).awaitTerminated();
-        simpleConsumer.stopAsync().awaitTerminated();
-    }
-
     @Test(expected = IllegalStateException.class)
     public void testReceiveWithoutStart() throws ClientException {
         simpleConsumer = new SimpleConsumerImpl(clientConfiguration, FAKE_GROUP_0, awaitDuration, subExpressions);
@@ -151,78 +87,375 @@ public class SimpleConsumerImplTest extends TestBase {
     }
 
     @Test
-    public void testStartAndShutdown() throws ClientException {
-        simpleConsumer = new SimpleConsumerImpl(clientConfiguration, FAKE_GROUP_0, awaitDuration, subExpressions);
-        start(simpleConsumer);
-        shutdown(simpleConsumer);
-    }
-
-    @Test
-    public void testSubscribeWithSubscriptionOverwriting() throws ClientException {
-        simpleConsumer = new SimpleConsumerImpl(clientConfiguration, FAKE_GROUP_0, awaitDuration, subExpressions);
-        start(simpleConsumer);
-        final FilterExpression filterExpression = new FilterExpression(FAKE_TAG_0);
-        simpleConsumer.subscribe(FAKE_TOPIC_0, filterExpression);
-        shutdown(simpleConsumer);
-    }
-
-    @Test(expected = IllegalArgumentException.class)
-    public void testReceiveWithAllTopicsAreUnsubscribed() throws ClientException {
-        simpleConsumer = new SimpleConsumerImpl(clientConfiguration, FAKE_GROUP_0, awaitDuration, subExpressions);
-        start(simpleConsumer);
-        simpleConsumer.unsubscribe(FAKE_TOPIC_0);
+    public void testReceiveAsyncWithZeroMaxMessageNum() throws InterruptedException {
+        simpleConsumer = Mockito.spy(new SimpleConsumerImpl(clientConfiguration, FAKE_GROUP_0, awaitDuration,
+            subExpressions));
+        when(simpleConsumer.isRunning()).thenReturn(true);
+        final CompletableFuture<List<MessageView>> future = simpleConsumer.receiveAsync(0,
+            Duration.ofSeconds(3));
         try {
-            simpleConsumer.receive(1, Duration.ofSeconds(1));
-        } finally {
-            shutdown(simpleConsumer);
+            future.get();
+            fail();
+        } catch (ExecutionException e) {
+            assertTrue(e.getCause() instanceof IllegalArgumentException);
         }
     }
 
     @Test
-    public void testReceiveMessageSuccess() throws ClientException {
-        simpleConsumer = new SimpleConsumerImpl(clientConfiguration, FAKE_GROUP_0, awaitDuration, subExpressions);
-        start(simpleConsumer);
-        int receivedMessageCount = 16;
-        final ListenableFuture<InvocationContext<Iterator<ReceiveMessageResponse>>> future =
-            okReceiveMessageResponsesFuture(FAKE_TOPIC_0, receivedMessageCount);
-        when(clientManager.receiveMessage(any(Endpoints.class), any(Metadata.class), any(ReceiveMessageRequest.class),
-            any(Duration.class))).thenReturn(future);
-        final List<MessageView> messageViews = simpleConsumer.receive(1, Duration.ofSeconds(1));
-        verify(clientManager, times(1)).receiveMessage(any(Endpoints.class),
-            any(Metadata.class), any(ReceiveMessageRequest.class), any(Duration.class));
-        assertEquals(receivedMessageCount, messageViews.size());
-        shutdown(simpleConsumer);
-    }
-
-    @Test
-    public void testAckMessageSuccess() throws ClientException {
-        simpleConsumer = new SimpleConsumerImpl(clientConfiguration, FAKE_GROUP_0, awaitDuration, subExpressions);
-        start(simpleConsumer);
-        try {
-            final MessageViewImpl messageView = fakeMessageViewImpl();
-            final ListenableFuture<InvocationContext<AckMessageResponse>> future = okAckMessageResponseFuture();
-            when(clientManager.ackMessage(any(Endpoints.class), any(Metadata.class), any(AckMessageRequest.class),
-                any(Duration.class))).thenReturn(future);
-            simpleConsumer.ack(messageView);
-        } finally {
-            shutdown(simpleConsumer);
+    public void testAckAsync() throws ExecutionException, InterruptedException {
+        simpleConsumer = Mockito.spy(new SimpleConsumerImpl(clientConfiguration, FAKE_GROUP_0, awaitDuration,
+            subExpressions));
+        when(simpleConsumer.isRunning()).thenReturn(true);
+        final MessageViewImpl messageView = fakeMessageViewImpl(false);
+        {
+            when(simpleConsumer.ackMessage(messageView)).thenReturn(okAckMessageResponseFuture());
+            final CompletableFuture<Void> future = simpleConsumer.ackAsync(messageView);
+            future.get();
+        }
+        {
+            when(simpleConsumer.ackMessage(messageView)).thenReturn(ackMessageResponseFuture(Code.BAD_REQUEST));
+            final CompletableFuture<Void> future = simpleConsumer.ackAsync(messageView);
+            try {
+                future.get();
+                fail();
+            } catch (ExecutionException e) {
+                assertTrue(e.getCause() instanceof BadRequestException);
+            }
+        }
+        {
+            when(simpleConsumer.ackMessage(messageView)).thenReturn(ackMessageResponseFuture(Code.ILLEGAL_TOPIC));
+            final CompletableFuture<Void> future = simpleConsumer.ackAsync(messageView);
+            try {
+                future.get();
+                fail();
+            } catch (ExecutionException e) {
+                assertTrue(e.getCause() instanceof BadRequestException);
+            }
+        }
+        {
+            final ListenableFuture<RpcInvocation<AckMessageResponse>> respFuture =
+                ackMessageResponseFuture(Code.ILLEGAL_CONSUMER_GROUP);
+            when(simpleConsumer.ackMessage(messageView)).thenReturn(respFuture);
+            final CompletableFuture<Void> future = simpleConsumer.ackAsync(messageView);
+            try {
+                future.get();
+                fail();
+            } catch (ExecutionException e) {
+                assertTrue(e.getCause() instanceof BadRequestException);
+            }
+        }
+        {
+            final ListenableFuture<RpcInvocation<AckMessageResponse>> respFuture =
+                ackMessageResponseFuture(Code.INVALID_RECEIPT_HANDLE);
+            when(simpleConsumer.ackMessage(messageView)).thenReturn(respFuture);
+            final CompletableFuture<Void> future = simpleConsumer.ackAsync(messageView);
+            try {
+                future.get();
+                fail();
+            } catch (ExecutionException e) {
+                assertTrue(e.getCause() instanceof BadRequestException);
+            }
+        }
+        {
+            when(simpleConsumer.ackMessage(messageView)).thenReturn(ackMessageResponseFuture(Code.CLIENT_ID_REQUIRED));
+            final CompletableFuture<Void> future = simpleConsumer.ackAsync(messageView);
+            try {
+                future.get();
+                fail();
+            } catch (ExecutionException e) {
+                assertTrue(e.getCause() instanceof BadRequestException);
+            }
+        }
+        {
+            when(simpleConsumer.ackMessage(messageView)).thenReturn(ackMessageResponseFuture(Code.UNAUTHORIZED));
+            final CompletableFuture<Void> future = simpleConsumer.ackAsync(messageView);
+            try {
+                future.get();
+                fail();
+            } catch (ExecutionException e) {
+                assertTrue(e.getCause() instanceof UnauthorizedException);
+            }
+        }
+        {
+            when(simpleConsumer.ackMessage(messageView)).thenReturn(ackMessageResponseFuture(Code.FORBIDDEN));
+            final CompletableFuture<Void> future = simpleConsumer.ackAsync(messageView);
+            try {
+                future.get();
+                fail();
+            } catch (ExecutionException e) {
+                assertTrue(e.getCause() instanceof ForbiddenException);
+            }
+        }
+        {
+            when(simpleConsumer.ackMessage(messageView)).thenReturn(ackMessageResponseFuture(Code.NOT_FOUND));
+            final CompletableFuture<Void> future = simpleConsumer.ackAsync(messageView);
+            try {
+                future.get();
+                fail();
+            } catch (ExecutionException e) {
+                assertTrue(e.getCause() instanceof NotFoundException);
+            }
+        }
+        {
+            when(simpleConsumer.ackMessage(messageView)).thenReturn(ackMessageResponseFuture(Code.TOPIC_NOT_FOUND));
+            final CompletableFuture<Void> future = simpleConsumer.ackAsync(messageView);
+            try {
+                future.get();
+                fail();
+            } catch (ExecutionException e) {
+                assertTrue(e.getCause() instanceof NotFoundException);
+            }
+        }
+        {
+            when(simpleConsumer.ackMessage(messageView)).thenReturn(ackMessageResponseFuture(Code.TOO_MANY_REQUESTS));
+            final CompletableFuture<Void> future = simpleConsumer.ackAsync(messageView);
+            try {
+                future.get();
+                fail();
+            } catch (ExecutionException e) {
+                assertTrue(e.getCause() instanceof TooManyRequestsException);
+            }
+        }
+        {
+            when(simpleConsumer.ackMessage(messageView)).thenReturn(ackMessageResponseFuture(Code.INTERNAL_ERROR));
+            final CompletableFuture<Void> future = simpleConsumer.ackAsync(messageView);
+            try {
+                future.get();
+                fail();
+            } catch (ExecutionException e) {
+                assertTrue(e.getCause() instanceof InternalErrorException);
+            }
+        }
+        {
+            final ListenableFuture<RpcInvocation<AckMessageResponse>> respFuture =
+                ackMessageResponseFuture(Code.INTERNAL_SERVER_ERROR);
+            when(simpleConsumer.ackMessage(messageView)).thenReturn(respFuture);
+            final CompletableFuture<Void> future = simpleConsumer.ackAsync(messageView);
+            try {
+                future.get();
+                fail();
+            } catch (ExecutionException e) {
+                assertTrue(e.getCause() instanceof InternalErrorException);
+            }
+        }
+        {
+            when(simpleConsumer.ackMessage(messageView)).thenReturn(ackMessageResponseFuture(Code.PROXY_TIMEOUT));
+            final CompletableFuture<Void> future = simpleConsumer.ackAsync(messageView);
+            try {
+                future.get();
+                fail();
+            } catch (ExecutionException e) {
+                assertTrue(e.getCause() instanceof ProxyTimeoutException);
+            }
+        }
+        {
+            when(simpleConsumer.ackMessage(messageView)).thenReturn(ackMessageResponseFuture(Code.UNSUPPORTED));
+            final CompletableFuture<Void> future = simpleConsumer.ackAsync(messageView);
+            try {
+                future.get();
+                fail();
+            } catch (ExecutionException e) {
+                assertTrue(e.getCause() instanceof UnsupportedException);
+            }
         }
     }
 
     @Test
-    public void testChangeInvisibleDurationSuccess() throws ClientException {
-        simpleConsumer = new SimpleConsumerImpl(clientConfiguration, FAKE_GROUP_0, awaitDuration, subExpressions);
-        start(simpleConsumer);
-        try {
-            final MessageViewImpl messageView = fakeMessageViewImpl();
-            final ListenableFuture<InvocationContext<ChangeInvisibleDurationResponse>> future =
-                okChangeInvisibleDurationResponseFuture(FAKE_RECEIPT_HANDLE_1);
-            when(clientManager.changeInvisibleDuration(any(Endpoints.class), any(Metadata.class),
-                any(ChangeInvisibleDurationRequest.class), any(Duration.class))).thenReturn(future);
-            simpleConsumer.changeInvisibleDuration0(messageView, Duration.ofSeconds(3));
-            assertEquals(FAKE_RECEIPT_HANDLE_1, messageView.getReceiptHandle());
-        } finally {
-            shutdown(simpleConsumer);
+    public void testChangeInvisibleDurationAsync() throws ExecutionException, InterruptedException {
+        simpleConsumer = Mockito.spy(new SimpleConsumerImpl(clientConfiguration, FAKE_GROUP_0, awaitDuration,
+            subExpressions));
+        when(simpleConsumer.isRunning()).thenReturn(true);
+        final MessageViewImpl messageView = fakeMessageViewImpl(false);
+        final Duration duration = Duration.ofSeconds(3);
+        {
+            final ListenableFuture<RpcInvocation<ChangeInvisibleDurationResponse>> respFuture =
+                okChangeInvisibleDurationCtxFuture();
+            when(simpleConsumer.changeInvisibleDuration(messageView, duration)).thenReturn(respFuture);
+            final CompletableFuture<Void> future = simpleConsumer.changeInvisibleDurationAsync(messageView,
+                duration);
+            future.get();
+        }
+        {
+            final ListenableFuture<RpcInvocation<ChangeInvisibleDurationResponse>> respFuture =
+                changInvisibleDurationCtxFuture(Code.BAD_REQUEST);
+            when(simpleConsumer.changeInvisibleDuration(messageView, duration)).thenReturn(respFuture);
+            final CompletableFuture<Void> future = simpleConsumer.changeInvisibleDurationAsync(messageView,
+                duration);
+            try {
+                future.get();
+                fail();
+            } catch (ExecutionException e) {
+                assertTrue(e.getCause() instanceof BadRequestException);
+            }
+        }
+        {
+            final ListenableFuture<RpcInvocation<ChangeInvisibleDurationResponse>> respFuture =
+                changInvisibleDurationCtxFuture(Code.ILLEGAL_TOPIC);
+            when(simpleConsumer.changeInvisibleDuration(messageView, duration)).thenReturn(respFuture);
+            final CompletableFuture<Void> future = simpleConsumer.changeInvisibleDurationAsync(messageView,
+                duration);
+            try {
+                future.get();
+                fail();
+            } catch (ExecutionException e) {
+                assertTrue(e.getCause() instanceof BadRequestException);
+            }
+        }
+        {
+            final ListenableFuture<RpcInvocation<ChangeInvisibleDurationResponse>> respFuture =
+                changInvisibleDurationCtxFuture(Code.ILLEGAL_CONSUMER_GROUP);
+            when(simpleConsumer.changeInvisibleDuration(messageView, duration)).thenReturn(respFuture);
+            final CompletableFuture<Void> future = simpleConsumer.changeInvisibleDurationAsync(messageView,
+                duration);
+            try {
+                future.get();
+                fail();
+            } catch (ExecutionException e) {
+                assertTrue(e.getCause() instanceof BadRequestException);
+            }
+        }
+        {
+            final ListenableFuture<RpcInvocation<ChangeInvisibleDurationResponse>> respFuture =
+                changInvisibleDurationCtxFuture(Code.ILLEGAL_INVISIBLE_TIME);
+            when(simpleConsumer.changeInvisibleDuration(messageView, duration)).thenReturn(respFuture);
+            final CompletableFuture<Void> future = simpleConsumer.changeInvisibleDurationAsync(messageView,
+                duration);
+            try {
+                future.get();
+                fail();
+            } catch (ExecutionException e) {
+                assertTrue(e.getCause() instanceof BadRequestException);
+            }
+        }
+        {
+            final ListenableFuture<RpcInvocation<ChangeInvisibleDurationResponse>> respFuture =
+                changInvisibleDurationCtxFuture(Code.INVALID_RECEIPT_HANDLE);
+            when(simpleConsumer.changeInvisibleDuration(messageView, duration)).thenReturn(respFuture);
+            final CompletableFuture<Void> future = simpleConsumer.changeInvisibleDurationAsync(messageView,
+                duration);
+            try {
+                future.get();
+                fail();
+            } catch (ExecutionException e) {
+                assertTrue(e.getCause() instanceof BadRequestException);
+            }
+        }
+        {
+            final ListenableFuture<RpcInvocation<ChangeInvisibleDurationResponse>> respFuture =
+                changInvisibleDurationCtxFuture(Code.CLIENT_ID_REQUIRED);
+            when(simpleConsumer.changeInvisibleDuration(messageView, duration)).thenReturn(respFuture);
+            final CompletableFuture<Void> future = simpleConsumer.changeInvisibleDurationAsync(messageView,
+                duration);
+            try {
+                future.get();
+                fail();
+            } catch (ExecutionException e) {
+                assertTrue(e.getCause() instanceof BadRequestException);
+            }
+        }
+        {
+            final ListenableFuture<RpcInvocation<ChangeInvisibleDurationResponse>> respFuture =
+                changInvisibleDurationCtxFuture(Code.UNAUTHORIZED);
+            when(simpleConsumer.changeInvisibleDuration(messageView, duration)).thenReturn(respFuture);
+            final CompletableFuture<Void> future = simpleConsumer.changeInvisibleDurationAsync(messageView,
+                duration);
+            try {
+                future.get();
+                fail();
+            } catch (ExecutionException e) {
+                assertTrue(e.getCause() instanceof UnauthorizedException);
+            }
+        }
+        {
+            final ListenableFuture<RpcInvocation<ChangeInvisibleDurationResponse>> respFuture =
+                changInvisibleDurationCtxFuture(Code.NOT_FOUND);
+            when(simpleConsumer.changeInvisibleDuration(messageView, duration)).thenReturn(respFuture);
+            final CompletableFuture<Void> future = simpleConsumer.changeInvisibleDurationAsync(messageView,
+                duration);
+            try {
+                future.get();
+                fail();
+            } catch (ExecutionException e) {
+                assertTrue(e.getCause() instanceof NotFoundException);
+            }
+        }
+        {
+            final ListenableFuture<RpcInvocation<ChangeInvisibleDurationResponse>> respFuture =
+                changInvisibleDurationCtxFuture(Code.TOPIC_NOT_FOUND);
+            when(simpleConsumer.changeInvisibleDuration(messageView, duration)).thenReturn(respFuture);
+            final CompletableFuture<Void> future = simpleConsumer.changeInvisibleDurationAsync(messageView,
+                duration);
+            try {
+                future.get();
+                fail();
+            } catch (ExecutionException e) {
+                assertTrue(e.getCause() instanceof NotFoundException);
+            }
+        }
+        {
+            final ListenableFuture<RpcInvocation<ChangeInvisibleDurationResponse>> respFuture =
+                changInvisibleDurationCtxFuture(Code.TOO_MANY_REQUESTS);
+            when(simpleConsumer.changeInvisibleDuration(messageView, duration)).thenReturn(respFuture);
+            final CompletableFuture<Void> future = simpleConsumer.changeInvisibleDurationAsync(messageView,
+                duration);
+            try {
+                future.get();
+                fail();
+            } catch (ExecutionException e) {
+                assertTrue(e.getCause() instanceof TooManyRequestsException);
+            }
+        }
+        {
+            final ListenableFuture<RpcInvocation<ChangeInvisibleDurationResponse>> respFuture =
+                changInvisibleDurationCtxFuture(Code.INTERNAL_ERROR);
+            when(simpleConsumer.changeInvisibleDuration(messageView, duration)).thenReturn(respFuture);
+            final CompletableFuture<Void> future = simpleConsumer.changeInvisibleDurationAsync(messageView,
+                duration);
+            try {
+                future.get();
+                fail();
+            } catch (ExecutionException e) {
+                assertTrue(e.getCause() instanceof InternalErrorException);
+            }
+        }
+        {
+            final ListenableFuture<RpcInvocation<ChangeInvisibleDurationResponse>> respFuture =
+                changInvisibleDurationCtxFuture(Code.INTERNAL_SERVER_ERROR);
+            when(simpleConsumer.changeInvisibleDuration(messageView, duration)).thenReturn(respFuture);
+            final CompletableFuture<Void> future = simpleConsumer.changeInvisibleDurationAsync(messageView,
+                duration);
+            try {
+                future.get();
+                fail();
+            } catch (ExecutionException e) {
+                assertTrue(e.getCause() instanceof InternalErrorException);
+            }
+        }
+        {
+            final ListenableFuture<RpcInvocation<ChangeInvisibleDurationResponse>> respFuture =
+                changInvisibleDurationCtxFuture(Code.PROXY_TIMEOUT);
+            when(simpleConsumer.changeInvisibleDuration(messageView, duration)).thenReturn(respFuture);
+            final CompletableFuture<Void> future = simpleConsumer.changeInvisibleDurationAsync(messageView,
+                duration);
+            try {
+                future.get();
+                fail();
+            } catch (ExecutionException e) {
+                assertTrue(e.getCause() instanceof ProxyTimeoutException);
+            }
+        }
+        {
+            final ListenableFuture<RpcInvocation<ChangeInvisibleDurationResponse>> respFuture =
+                changInvisibleDurationCtxFuture(Code.UNSUPPORTED);
+            when(simpleConsumer.changeInvisibleDuration(messageView, duration)).thenReturn(respFuture);
+            final CompletableFuture<Void> future = simpleConsumer.changeInvisibleDurationAsync(messageView,
+                duration);
+            try {
+                future.get();
+                fail();
+            } catch (ExecutionException e) {
+                assertTrue(e.getCause() instanceof UnsupportedException);
+            }
         }
     }
 }
\ No newline at end of file
diff --git a/java/client/src/test/java/org/apache/rocketmq/client/java/impl/producer/ProducerImplTest.java b/java/client/src/test/java/org/apache/rocketmq/client/java/impl/producer/ProducerImplTest.java
index f069b52..45e66a5 100644
--- a/java/client/src/test/java/org/apache/rocketmq/client/java/impl/producer/ProducerImplTest.java
+++ b/java/client/src/test/java/org/apache/rocketmq/client/java/impl/producer/ProducerImplTest.java
@@ -57,11 +57,10 @@ import org.apache.rocketmq.client.apis.ClientException;
 import org.apache.rocketmq.client.apis.message.Message;
 import org.apache.rocketmq.client.apis.producer.SendReceipt;
 import org.apache.rocketmq.client.java.impl.ClientManagerImpl;
-import org.apache.rocketmq.client.java.impl.ClientManagerRegistry;
 import org.apache.rocketmq.client.java.impl.ClientSessionImpl;
 import org.apache.rocketmq.client.java.misc.ThreadFactoryImpl;
 import org.apache.rocketmq.client.java.route.Endpoints;
-import org.apache.rocketmq.client.java.rpc.InvocationContext;
+import org.apache.rocketmq.client.java.rpc.RpcInvocation;
 import org.apache.rocketmq.client.java.tool.TestBase;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -75,9 +74,6 @@ public class ProducerImplTest extends TestBase {
     private ClientManagerImpl clientManager;
     @Mock
     private StreamObserver<TelemetryCommand> telemetryRequestObserver;
-    @SuppressWarnings("unused")
-    @InjectMocks
-    private ClientManagerRegistry clientManagerRegistry = ClientManagerRegistry.getInstance();
 
     private final ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder()
         .setEndpoints(FAKE_ACCESS_POINT).build();
@@ -95,7 +91,7 @@ public class ProducerImplTest extends TestBase {
         null);
 
     private void start(ProducerImpl producer) throws ClientException {
-        SettableFuture<InvocationContext<QueryRouteResponse>> future0 = SettableFuture.create();
+        SettableFuture<RpcInvocation<QueryRouteResponse>> future0 = SettableFuture.create();
         Status status = Status.newBuilder().setCode(Code.OK).build();
         List<MessageQueue> messageQueueList = new ArrayList<>();
         MessageQueue mq = MessageQueue.newBuilder().setTopic(Resource.newBuilder().setName(FAKE_TOPIC_0))
@@ -103,11 +99,11 @@ public class ProducerImplTest extends TestBase {
             .setBroker(Broker.newBuilder().setName(FAKE_BROKER_NAME_0).setEndpoints(fakePbEndpoints0()))
             .setId(0).build();
         messageQueueList.add(mq);
-        QueryRouteResponse resp = QueryRouteResponse.newBuilder().setStatus(status)
+        QueryRouteResponse response = QueryRouteResponse.newBuilder().setStatus(status)
             .addAllMessageQueues(messageQueueList).build();
-        final InvocationContext<QueryRouteResponse> invocationContext =
-            new InvocationContext<>(resp, fakeRpcContext());
-        future0.set(invocationContext);
+        final RpcInvocation<QueryRouteResponse> rpcInvocation =
+            new RpcInvocation<>(response, fakeRpcContext());
+        future0.set(rpcInvocation);
         when(clientManager.queryRoute(any(Endpoints.class), any(Metadata.class), any(QueryRouteRequest.class),
             any(Duration.class)))
             .thenReturn(future0);
@@ -147,11 +143,11 @@ public class ProducerImplTest extends TestBase {
         verify(clientManager, times(1)).telemetry(any(Endpoints.class), any(Metadata.class),
             any(Duration.class), any(ClientSessionImpl.class));
         final Message message = fakeMessage(FAKE_TOPIC_0);
-        final ListenableFuture<InvocationContext<SendMessageResponse>> future =
+        final ListenableFuture<RpcInvocation<SendMessageResponse>> future =
             okSendMessageResponseFutureWithSingleEntry();
         when(clientManager.sendMessage(any(Endpoints.class), any(Metadata.class), any(SendMessageRequest.class),
             any(Duration.class))).thenReturn(future);
-        final SendMessageResponse response = future.get().getResp();
+        final SendMessageResponse response = future.get().getResponse();
         assertEquals(1, response.getEntriesCount());
         final apache.rocketmq.v2.SendResultEntry receipt = response.getEntriesList().iterator().next();
         final SendReceipt sendReceipt = producer.send(message);
@@ -167,11 +163,11 @@ public class ProducerImplTest extends TestBase {
         verify(clientManager, never()).telemetry(any(Endpoints.class), any(Metadata.class), any(Duration.class),
             any(ClientSessionImpl.class));
         final Message message = fakeMessage(FAKE_TOPIC_0);
-        final ListenableFuture<InvocationContext<SendMessageResponse>> future =
+        final ListenableFuture<RpcInvocation<SendMessageResponse>> future =
             okSendMessageResponseFutureWithSingleEntry();
         when(clientManager.sendMessage(any(Endpoints.class), any(Metadata.class), any(SendMessageRequest.class),
             any(Duration.class))).thenReturn(future);
-        final SendMessageResponse response = future.get().getResp();
+        final SendMessageResponse response = future.get().getResponse();
         assertEquals(1, response.getEntriesCount());
         final SendReceipt sendReceipt = producerWithoutTopicBinding.send(message);
         verify(clientManager, times(1)).queryRoute(any(Endpoints.class), any(Metadata.class),
@@ -190,7 +186,7 @@ public class ProducerImplTest extends TestBase {
             any(QueryRouteRequest.class), any(Duration.class));
         verify(clientManager, times(1)).telemetry(any(Endpoints.class), any(Metadata.class), any(Duration.class),
             any(ClientSessionImpl.class));
-        final ListenableFuture<InvocationContext<SendMessageResponse>> future = failureSendMessageResponseFuture();
+        final ListenableFuture<RpcInvocation<SendMessageResponse>> future = failureSendMessageResponseFuture();
         when(clientManager.sendMessage(any(Endpoints.class), any(Metadata.class), any(SendMessageRequest.class),
             any(Duration.class))).thenReturn(future);
         Message message0 = fakeMessage(FAKE_TOPIC_0);
diff --git a/java/client/src/test/java/org/apache/rocketmq/client/java/tool/TestBase.java b/java/client/src/test/java/org/apache/rocketmq/client/java/tool/TestBase.java
index 3aa6d80..aba5ab7 100644
--- a/java/client/src/test/java/org/apache/rocketmq/client/java/tool/TestBase.java
+++ b/java/client/src/test/java/org/apache/rocketmq/client/java/tool/TestBase.java
@@ -32,7 +32,6 @@ import apache.rocketmq.v2.MessageQueue;
 import apache.rocketmq.v2.MessageType;
 import apache.rocketmq.v2.Permission;
 import apache.rocketmq.v2.QueryAssignmentResponse;
-import apache.rocketmq.v2.QueryRouteResponse;
 import apache.rocketmq.v2.ReceiveMessageResponse;
 import apache.rocketmq.v2.Resource;
 import apache.rocketmq.v2.SendMessageResponse;
@@ -68,14 +67,16 @@ import org.apache.rocketmq.client.java.impl.producer.SendReceiptImpl;
 import org.apache.rocketmq.client.java.message.MessageBuilderImpl;
 import org.apache.rocketmq.client.java.message.MessageIdCodec;
 import org.apache.rocketmq.client.java.message.MessageViewImpl;
+import org.apache.rocketmq.client.java.misc.RequestIdGenerator;
 import org.apache.rocketmq.client.java.misc.ThreadFactoryImpl;
 import org.apache.rocketmq.client.java.misc.Utilities;
 import org.apache.rocketmq.client.java.retry.CustomizedBackoffRetryPolicy;
 import org.apache.rocketmq.client.java.retry.ExponentialBackoffRetryPolicy;
 import org.apache.rocketmq.client.java.route.Endpoints;
 import org.apache.rocketmq.client.java.route.MessageQueueImpl;
-import org.apache.rocketmq.client.java.rpc.InvocationContext;
-import org.apache.rocketmq.client.java.rpc.RpcContext;
+import org.apache.rocketmq.client.java.rpc.Context;
+import org.apache.rocketmq.client.java.rpc.RpcInvocation;
+import org.apache.rocketmq.client.java.rpc.Signature;
 
 public class TestBase {
     protected static final String FAKE_CLIENT_ID = "mbp@29848@cno0nhxy";
@@ -141,8 +142,11 @@ public class TestBase {
         return new Endpoints(fakePbEndpoints0());
     }
 
-    protected RpcContext fakeRpcContext() {
-        return new RpcContext(fakeEndpoints(), new Metadata());
+    protected Context fakeRpcContext() {
+        final Metadata metadata = new Metadata();
+        metadata.put(Metadata.Key.of(Signature.REQUEST_ID_KEY, Metadata.ASCII_STRING_MARSHALLER),
+            RequestIdGenerator.getInstance().next());
+        return new Context(fakeEndpoints(), metadata);
     }
 
     protected Message fakeMessage(String topic) {
@@ -215,35 +219,36 @@ public class TestBase {
             .setPermission(Permission.READ_WRITE).build();
     }
 
-    protected ListenableFuture<InvocationContext<QueryRouteResponse>> okQueryRouteResponseFuture() {
+    protected ListenableFuture<RpcInvocation<ChangeInvisibleDurationResponse>>
+    okChangeInvisibleDurationCtxFuture() {
         Status status = Status.newBuilder().setCode(Code.OK).build();
-        final QueryRouteResponse resp =
-            QueryRouteResponse.newBuilder().setStatus(status).addMessageQueues(fakePbMessageQueue0()).build();
-        return Futures.immediateFuture(new InvocationContext<>(resp, fakeRpcContext()));
+        final ChangeInvisibleDurationResponse response =
+            ChangeInvisibleDurationResponse.newBuilder().setStatus(status).build();
+        return Futures.immediateFuture(new RpcInvocation<>(response, fakeRpcContext()));
     }
 
-    protected ListenableFuture<InvocationContext<ChangeInvisibleDurationResponse>>
-        okChangeInvisibleDurationCtxFuture() {
-        Status status = Status.newBuilder().setCode(Code.OK).build();
-        final ChangeInvisibleDurationResponse resp =
+    protected ListenableFuture<RpcInvocation<ChangeInvisibleDurationResponse>>
+    changInvisibleDurationCtxFuture(Code code) {
+        Status status = Status.newBuilder().setCode(code).build();
+        final ChangeInvisibleDurationResponse response =
             ChangeInvisibleDurationResponse.newBuilder().setStatus(status).build();
-        return Futures.immediateFuture(new InvocationContext<>(resp, fakeRpcContext()));
+        return Futures.immediateFuture(new RpcInvocation<>(response, fakeRpcContext()));
     }
 
-    protected ListenableFuture<InvocationContext<QueryAssignmentResponse>> okQueryAssignmentResponseFuture() {
+    protected ListenableFuture<RpcInvocation<QueryAssignmentResponse>> okQueryAssignmentResponseFuture() {
         final SettableFuture<QueryAssignmentResponse> future = SettableFuture.create();
         final Status status = Status.newBuilder().setCode(Code.OK).build();
         Assignment assignment = Assignment.newBuilder().setMessageQueue(fakePbMessageQueue0()).build();
-        QueryAssignmentResponse resp = QueryAssignmentResponse.newBuilder().setStatus(status)
+        QueryAssignmentResponse response = QueryAssignmentResponse.newBuilder().setStatus(status)
             .addAssignments(assignment).build();
-        return Futures.immediateFuture(new InvocationContext<>(resp, fakeRpcContext()));
+        return Futures.immediateFuture(new RpcInvocation<>(response, fakeRpcContext()));
     }
 
-    protected ListenableFuture<InvocationContext<QueryAssignmentResponse>> okEmptyQueryAssignmentResponseFuture() {
+    protected ListenableFuture<RpcInvocation<QueryAssignmentResponse>> okEmptyQueryAssignmentResponseFuture() {
         final SettableFuture<QueryAssignmentResponse> future = SettableFuture.create();
         final Status status = Status.newBuilder().setCode(Code.OK).build();
-        final QueryAssignmentResponse resp = QueryAssignmentResponse.newBuilder().setStatus(status).build();
-        return Futures.immediateFuture(new InvocationContext<>(resp, fakeRpcContext()));
+        final QueryAssignmentResponse response = QueryAssignmentResponse.newBuilder().setStatus(status).build();
+        return Futures.immediateFuture(new RpcInvocation<>(response, fakeRpcContext()));
     }
 
     protected Map<String, FilterExpression> createSubscriptionExpressions(String topic) {
@@ -253,44 +258,41 @@ public class TestBase {
         return map;
     }
 
-    protected ListenableFuture<InvocationContext<AckMessageResponse>> okAckMessageResponseFuture() {
-        final Status status = Status.newBuilder().setCode(Code.OK).build();
-        final AckMessageResponse resp = AckMessageResponse.newBuilder().setStatus(status).build();
-        return Futures.immediateFuture(new InvocationContext<>(resp, fakeRpcContext()));
+    protected ListenableFuture<RpcInvocation<AckMessageResponse>> ackMessageResponseFuture(Code code) {
+        final Status status = Status.newBuilder().setCode(code).build();
+        final AckMessageResponse response = AckMessageResponse.newBuilder().setStatus(status).build();
+        return Futures.immediateFuture(new RpcInvocation<>(response, fakeRpcContext()));
     }
 
-    protected ListenableFuture<InvocationContext<ChangeInvisibleDurationResponse>>
-        okChangeInvisibleDurationResponseFuture(String receiptHandle) {
+    protected ListenableFuture<RpcInvocation<AckMessageResponse>> okAckMessageResponseFuture() {
         final Status status = Status.newBuilder().setCode(Code.OK).build();
-        SettableFuture<ChangeInvisibleDurationResponse> future = SettableFuture.create();
-        ChangeInvisibleDurationResponse resp = ChangeInvisibleDurationResponse.newBuilder().setStatus(status)
-            .setReceiptHandle(receiptHandle).build();
-        return Futures.immediateFuture(new InvocationContext<>(resp, fakeRpcContext()));
+        final AckMessageResponse response = AckMessageResponse.newBuilder().setStatus(status).build();
+        return Futures.immediateFuture(new RpcInvocation<>(response, fakeRpcContext()));
     }
 
-    protected ListenableFuture<InvocationContext<ForwardMessageToDeadLetterQueueResponse>>
+    protected ListenableFuture<RpcInvocation<ForwardMessageToDeadLetterQueueResponse>>
     okForwardMessageToDeadLetterQueueResponseFuture() {
         final Status status = Status.newBuilder().setCode(Code.OK).build();
-        final ForwardMessageToDeadLetterQueueResponse resp =
+        final ForwardMessageToDeadLetterQueueResponse response =
             ForwardMessageToDeadLetterQueueResponse.newBuilder().setStatus(status).build();
-        return Futures.immediateFuture(new InvocationContext<>(resp, fakeRpcContext()));
+        return Futures.immediateFuture(new RpcInvocation<>(response, fakeRpcContext()));
     }
 
-    protected ListenableFuture<InvocationContext<SendMessageResponse>> okSendMessageResponseFutureWithSingleEntry() {
+    protected ListenableFuture<RpcInvocation<SendMessageResponse>> okSendMessageResponseFutureWithSingleEntry() {
         final Status status = Status.newBuilder().setCode(Code.OK).build();
         final String messageId = MessageIdCodec.getInstance().nextMessageId().toString();
         SendResultEntry entry = SendResultEntry.newBuilder().setMessageId(messageId)
             .setTransactionId(FAKE_TRANSACTION_ID).setStatus(status).setOffset(1).build();
-        SendMessageResponse resp = SendMessageResponse.newBuilder().setStatus(status).addEntries(entry).build();
-        return Futures.immediateFuture(new InvocationContext<>(resp, fakeRpcContext()));
+        SendMessageResponse response = SendMessageResponse.newBuilder().setStatus(status).addEntries(entry).build();
+        return Futures.immediateFuture(new RpcInvocation<>(response, fakeRpcContext()));
     }
 
-    protected ListenableFuture<InvocationContext<SendMessageResponse>> failureSendMessageResponseFuture() {
+    protected ListenableFuture<RpcInvocation<SendMessageResponse>> failureSendMessageResponseFuture() {
         final Status status = Status.newBuilder().setCode(Code.FORBIDDEN).build();
         SendResultEntry sendResultEntry = SendResultEntry.newBuilder().setStatus(status).setStatus(status).build();
-        SendMessageResponse resp = SendMessageResponse.newBuilder().setStatus(status)
+        SendMessageResponse response = SendMessageResponse.newBuilder().setStatus(status)
             .addEntries(sendResultEntry).build();
-        return Futures.immediateFuture(new InvocationContext<>(resp, fakeRpcContext()));
+        return Futures.immediateFuture(new RpcInvocation<>(response, fakeRpcContext()));
     }
 
     protected ListenableFuture<SendMessageResponse> okBatchSendMessageResponseFuture() {
@@ -320,7 +322,7 @@ public class TestBase {
             .setSystemProperties(systemProperties).build();
     }
 
-    protected ListenableFuture<InvocationContext<Iterator<ReceiveMessageResponse>>> okReceiveMessageResponsesFuture(
+    protected ListenableFuture<RpcInvocation<Iterator<ReceiveMessageResponse>>> okReceiveMessageResponsesFuture(
         String topic, int messageCount) {
         final Status status = Status.newBuilder().setCode(Code.OK).build();
         final apache.rocketmq.v2.Message message = fakePbMessage(topic);
@@ -331,7 +333,7 @@ public class TestBase {
             ReceiveMessageResponse messageResponse = ReceiveMessageResponse.newBuilder().setMessage(message).build();
             responses.add(messageResponse);
         }
-        return Futures.immediateFuture(new InvocationContext<>(responses.iterator(), fakeRpcContext()));
+        return Futures.immediateFuture(new RpcInvocation<>(responses.iterator(), fakeRpcContext()));
     }
 
     protected ListenableFuture<EndTransactionResponse> okEndTransactionResponseFuture() {
@@ -359,7 +361,7 @@ public class TestBase {
 
     protected SendReceiptImpl fakeSendReceiptImpl(
         MessageQueueImpl mq) throws ExecutionException, InterruptedException, ClientException {
-        final ListenableFuture<InvocationContext<SendMessageResponse>> future =
+        final ListenableFuture<RpcInvocation<SendMessageResponse>> future =
             okSendMessageResponseFutureWithSingleEntry();
         final List<SendReceiptImpl> receipts = SendReceiptImpl.processRespContext(mq, future.get());
         return receipts.iterator().next();
diff --git a/java/pom.xml b/java/pom.xml
index a412dba..6c39ad4 100644
--- a/java/pom.xml
+++ b/java/pom.xml
@@ -297,7 +297,7 @@
                                 <limit>
                                     <counter>LINE</counter>
                                     <value>COVEREDRATIO</value>
-                                    <minimum>0.50</minimum>
+                                    <minimum>0.40</minimum>
                                 </limit>
                             </limits>
                         </rule>