You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by aa...@apache.org on 2022/07/13 11:36:06 UTC

[rocketmq-clients] 01/01: Refactor client telemetry

This is an automated email from the ASF dual-hosted git repository.

aaronai pushed a commit to branch java_dev
in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git

commit 9c0e2fb438eba3fbad4197127ae4d25bbe1bd973
Author: Aaron Ai <ya...@gmail.com>
AuthorDate: Wed Jul 13 17:08:55 2022 +0800

    Refactor client telemetry
---
 .../java/exception/InternalErrorException.java     |   4 +
 .../apache/rocketmq/client/java/impl/Client.java   |   6 +-
 .../rocketmq/client/java/impl/ClientImpl.java      | 101 ++++++++++-----
 .../client/java/impl/ClientManagerImpl.java        |   8 +-
 .../client/java/impl/ClientManagerRegistry.java    |   4 +-
 ...elemetrySession.java => ClientSessionImpl.java} | 138 +++++++++------------
 .../java/impl/consumer/ProcessQueueImpl.java       |  36 +++---
 .../java/impl/producer/ClientSessionProcessor.java |  47 +++++++
 .../client/java/metrics/ClientMeterProvider.java   |   2 +-
 .../java/metrics/MessageMeterInterceptor.java      |  14 +--
 .../java/impl/consumer/PushConsumerImplTest.java   |   4 +-
 .../java/impl/consumer/SimpleConsumerImplTest.java |   4 +-
 .../java/impl/producer/ProducerImplTest.java       |  12 +-
 13 files changed, 225 insertions(+), 155 deletions(-)

diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/exception/InternalErrorException.java b/java/client/src/main/java/org/apache/rocketmq/client/java/exception/InternalErrorException.java
index e1a44a4..f42d369 100644
--- a/java/client/src/main/java/org/apache/rocketmq/client/java/exception/InternalErrorException.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/exception/InternalErrorException.java
@@ -27,4 +27,8 @@ public class InternalErrorException extends ClientException {
     public InternalErrorException(int responseCode, String message) {
         super(responseCode, message);
     }
+
+    public InternalErrorException(Throwable cause) {
+        super(cause);
+    }
 }
\ No newline at end of file
diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/Client.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/Client.java
index b106db3..b0ac5f6 100644
--- a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/Client.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/Client.java
@@ -25,7 +25,7 @@ public interface Client {
      *
      * @return unique client identifier.
      */
-    String getClientId();
+    String clientId();
 
     /**
      * Send heart beat to remote {@link Endpoints}.
@@ -33,9 +33,9 @@ public interface Client {
     void doHeartbeat();
 
     /**
-     * Voluntary announce settings to remote.
+     * Sync settings to remote.
      */
-    void telemeterSettings();
+    void syncSettings();
 
     /**
      * Do some stats for client.
diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientImpl.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientImpl.java
index 3cf8a36..99b292d 100644
--- a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientImpl.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientImpl.java
@@ -42,11 +42,13 @@ import com.google.common.util.concurrent.MoreExecutors;
 import com.google.common.util.concurrent.SettableFuture;
 import com.google.errorprone.annotations.concurrent.GuardedBy;
 import io.grpc.Metadata;
+import io.grpc.stub.StreamObserver;
 import java.security.InvalidKeyException;
 import java.security.NoSuchAlgorithmException;
 import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
@@ -67,10 +69,12 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.stream.Collectors;
 import org.apache.rocketmq.client.apis.ClientConfiguration;
 import org.apache.rocketmq.client.apis.ClientException;
+import org.apache.rocketmq.client.java.exception.InternalErrorException;
 import org.apache.rocketmq.client.java.exception.NotFoundException;
 import org.apache.rocketmq.client.java.hook.MessageHookPoints;
 import org.apache.rocketmq.client.java.hook.MessageHookPointsStatus;
 import org.apache.rocketmq.client.java.hook.MessageInterceptor;
+import org.apache.rocketmq.client.java.impl.producer.ClientSessionProcessor;
 import org.apache.rocketmq.client.java.message.MessageCommon;
 import org.apache.rocketmq.client.java.metrics.ClientMeterProvider;
 import org.apache.rocketmq.client.java.metrics.Metric;
@@ -86,7 +90,8 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 @SuppressWarnings({"UnstableApiUsage", "NullableProblems"})
-public abstract class ClientImpl extends AbstractIdleService implements Client, MessageInterceptor {
+public abstract class ClientImpl extends AbstractIdleService implements Client, ClientSessionProcessor,
+    MessageInterceptor {
     private static final Logger LOGGER = LoggerFactory.getLogger(ClientImpl.class);
     private static final Duration TOPIC_ROUTE_AWAIT_DURATION_DURING_STARTUP = Duration.ofSeconds(3);
 
@@ -111,9 +116,9 @@ public abstract class ClientImpl extends AbstractIdleService implements Client,
     private final Map<String /* topic */, Set<SettableFuture<TopicRouteDataResult>>> inflightRouteFutureTable;
     private final Lock inflightRouteFutureLock;
 
-    @GuardedBy("telemetrySessionsLock")
-    private final ConcurrentMap<Endpoints, TelemetrySession> telemetrySessionTable;
-    private final ReadWriteLock telemetrySessionsLock;
+    @GuardedBy("endpointsSessionsLock")
+    private final Map<Endpoints, ClientSessionImpl> endpointsSessionTable;
+    private final ReadWriteLock endpointsSessionsLock;
 
     @GuardedBy("messageInterceptorsLock")
     private final List<MessageInterceptor> messageInterceptors;
@@ -131,8 +136,8 @@ public abstract class ClientImpl extends AbstractIdleService implements Client,
         this.inflightRouteFutureTable = new ConcurrentHashMap<>();
         this.inflightRouteFutureLock = new ReentrantLock();
 
-        this.telemetrySessionTable = new ConcurrentHashMap<>();
-        this.telemetrySessionsLock = new ReentrantReadWriteLock();
+        this.endpointsSessionTable = new HashMap<>();
+        this.endpointsSessionsLock = new ReentrantReadWriteLock();
 
         this.isolated = Collections.newSetFromMap(new ConcurrentHashMap<>());
 
@@ -274,13 +279,39 @@ public abstract class ClientImpl extends AbstractIdleService implements Client,
         }
     }
 
+    @Override
+    public TelemetryCommand getSettingsCommand() {
+        final Settings settings = this.getClientSettings().toProtobuf();
+        return TelemetryCommand.newBuilder().setSettings(settings).build();
+    }
+
+    @Override
+    public StreamObserver<TelemetryCommand> telemetry(Endpoints endpoints,
+        StreamObserver<TelemetryCommand> observer) throws ClientException {
+        try {
+            final Metadata metadata = this.sign();
+            return clientManager.telemetry(endpoints, metadata, Duration.ofNanos(Long.MAX_VALUE), observer);
+        } catch (ClientException e) {
+            throw e;
+        } catch (Throwable t) {
+            throw new InternalErrorException(t);
+        }
+    }
+
+    @Override
+    public ListenableFuture<Void> register() {
+        return Futures.transformAsync(this.getClientSettings().arrivedFuture,
+            (clientSettings) -> Futures.immediateVoidFuture(), clientCallbackExecutor);
+    }
+
     /**
      * This method is invoked while request of printing thread stack trace is received from remote.
      *
      * @param endpoints remote endpoints.
      * @param command   request of printing thread stack trace from remote.
      */
-    void onPrintThreadStackCommand(Endpoints endpoints, PrintThreadStackTraceCommand command) {
+    @Override
+    public void onPrintThreadStackTraceCommand(Endpoints endpoints, PrintThreadStackTraceCommand command) {
         final String nonce = command.getNonce();
         Runnable task = () -> {
             try {
@@ -314,6 +345,7 @@ public abstract class ClientImpl extends AbstractIdleService implements Client,
      * @param endpoints remote endpoints.
      * @param settings  settings received from remote.
      */
+    @Override
     public final void onSettingsCommand(Endpoints endpoints, Settings settings) {
         final Metric metric = new Metric(settings.getMetric());
         clientMeterProvider.reset(metric);
@@ -322,10 +354,10 @@ public abstract class ClientImpl extends AbstractIdleService implements Client,
     }
 
     /**
-     * @see Client#telemeterSettings()
+     * @see Client#syncSettings()
      */
     @Override
-    public void telemeterSettings() {
+    public void syncSettings() {
         final Settings settings = getClientSettings().toProtobuf();
         final TelemetryCommand command = TelemetryCommand.newBuilder().setSettings(settings).build();
         final Set<Endpoints> totalRouteEndpoints = getTotalRouteEndpoints();
@@ -345,12 +377,12 @@ public abstract class ClientImpl extends AbstractIdleService implements Client,
      * @param command   command to telemeter.
      */
     public void telemeter(Endpoints endpoints, TelemetryCommand command) {
-        final ListenableFuture<TelemetrySession> future = registerTelemetrySession(endpoints);
-        Futures.addCallback(future, new FutureCallback<TelemetrySession>() {
+        final ListenableFuture<ClientSessionImpl> future = registerTelemetrySession(endpoints);
+        Futures.addCallback(future, new FutureCallback<ClientSessionImpl>() {
             @Override
-            public void onSuccess(TelemetrySession session) {
+            public void onSuccess(ClientSessionImpl session) {
                 try {
-                    session.telemeter(command);
+                    session.publish(command);
                 } catch (Throwable t) {
                     LOGGER.error("Failed to telemeter command, endpoints={}, command={}", endpoints, command);
                 }
@@ -364,43 +396,44 @@ public abstract class ClientImpl extends AbstractIdleService implements Client,
     }
 
     private void releaseTelemetrySessions() {
-        telemetrySessionsLock.readLock().lock();
+        endpointsSessionsLock.readLock().lock();
         try {
-            telemetrySessionTable.values().forEach(TelemetrySession::release);
+            endpointsSessionTable.values().forEach(ClientSessionImpl::release);
         } finally {
-            telemetrySessionsLock.readLock().unlock();
+            endpointsSessionsLock.readLock().unlock();
         }
     }
 
     /**
      * Try to register telemetry session, return it directly if session is existed already.
      */
-    public ListenableFuture<TelemetrySession> registerTelemetrySession(Endpoints endpoints) {
-        final SettableFuture<TelemetrySession> future0 = SettableFuture.create();
-        telemetrySessionsLock.readLock().lock();
+    public ListenableFuture<ClientSessionImpl> registerTelemetrySession(Endpoints endpoints) {
+        final SettableFuture<ClientSessionImpl> future0 = SettableFuture.create();
+        endpointsSessionsLock.readLock().lock();
         try {
-            TelemetrySession telemetrySession = telemetrySessionTable.get(endpoints);
+            ClientSessionImpl clientSessionImpl = endpointsSessionTable.get(endpoints);
             // Return is directly if session is existed already.
-            if (null != telemetrySession) {
-                future0.set(telemetrySession);
+            if (null != clientSessionImpl) {
+                future0.set(clientSessionImpl);
                 return future0;
             }
         } finally {
-            telemetrySessionsLock.readLock().unlock();
+            endpointsSessionsLock.readLock().unlock();
         }
         // Future's exception has been logged during the registration.
-        final ListenableFuture<TelemetrySession> future = TelemetrySession.register(this, clientManager, endpoints);
+        final ListenableFuture<ClientSessionImpl> future = new ClientSessionImpl(this, endpoints).register();
         return Futures.transform(future, session -> {
-            telemetrySessionsLock.writeLock().lock();
+            endpointsSessionsLock.writeLock().lock();
             try {
-                TelemetrySession existed = telemetrySessionTable.get(endpoints);
+                ClientSessionImpl existed = endpointsSessionTable.get(endpoints);
                 if (null != existed) {
+                    session.release();
                     return existed;
                 }
-                telemetrySessionTable.put(endpoints, session);
+                endpointsSessionTable.put(endpoints, session);
                 return session;
             } finally {
-                telemetrySessionsLock.writeLock().unlock();
+                endpointsSessionsLock.writeLock().unlock();
             }
         }, MoreExecutors.directExecutor());
     }
@@ -412,7 +445,7 @@ public abstract class ClientImpl extends AbstractIdleService implements Client,
      */
     public ListenableFuture<Void> onTopicRouteDataResultFetched(String topic,
         TopicRouteDataResult topicRouteDataResult) {
-        final ListenableFuture<List<TelemetrySession>> future =
+        final ListenableFuture<List<ClientSessionImpl>> future =
             Futures.allAsList(topicRouteDataResult.getTopicRouteData()
                 .getMessageQueues().stream()
                 .map(mq -> mq.getBroker().getEndpoints())
@@ -420,9 +453,9 @@ public abstract class ClientImpl extends AbstractIdleService implements Client,
                 .stream().map(this::registerTelemetrySession)
                 .collect(Collectors.toList()));
         SettableFuture<Void> future0 = SettableFuture.create();
-        Futures.addCallback(future, new FutureCallback<List<TelemetrySession>>() {
+        Futures.addCallback(future, new FutureCallback<List<ClientSessionImpl>>() {
             @Override
-            public void onSuccess(List<TelemetrySession> sessions) {
+            public void onSuccess(List<ClientSessionImpl> sessions) {
                 LOGGER.info("Register session successfully, current route will be cached, topic={}, "
                     + "topicRouteDataResult={}", topic, topicRouteDataResult);
                 final TopicRouteDataResult old = topicRouteResultCache.put(topic, topicRouteDataResult);
@@ -459,6 +492,7 @@ public abstract class ClientImpl extends AbstractIdleService implements Client,
      * @param endpoints remote endpoints.
      * @param command   request of message consume verification from remote.
      */
+    @Override
     public void onVerifyMessageCommand(Endpoints endpoints, VerifyMessageCommand command) {
         LOGGER.warn("Ignore verify message command from remote, which is not expected, clientId={}, command={}",
             clientId, command);
@@ -482,6 +516,7 @@ public abstract class ClientImpl extends AbstractIdleService implements Client,
      * @param endpoints remote endpoints.
      * @param command   request of orphaned transaction recovery from remote.
      */
+    @Override
     public void onRecoverOrphanedTransactionCommand(Endpoints endpoints, RecoverOrphanedTransactionCommand command) {
         LOGGER.warn("Ignore orphaned transaction recovery command from remote, which is not expected, client id={}, "
             + "command={}", clientId, command);
@@ -532,10 +567,10 @@ public abstract class ClientImpl extends AbstractIdleService implements Client,
     }
 
     /**
-     * @see Client#getClientId()
+     * @see Client#clientId()
      */
     @Override
-    public String getClientId() {
+    public String clientId() {
         return clientId;
     }
 
diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientManagerImpl.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientManagerImpl.java
index 337edcc..b564634 100644
--- a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientManagerImpl.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientManagerImpl.java
@@ -131,12 +131,12 @@ public class ClientManagerImpl extends AbstractIdleService implements ClientMana
 
     @Override
     public void registerClient(Client client) {
-        clientTable.put(client.getClientId(), client);
+        clientTable.put(client.clientId(), client);
     }
 
     @Override
     public void unregisterClient(Client client) {
-        clientTable.remove(client.getClientId());
+        clientTable.remove(client.clientId());
     }
 
     @Override
@@ -189,9 +189,9 @@ public class ClientManagerImpl extends AbstractIdleService implements ClientMana
     private void syncSettings() {
         clientTable.values().forEach(client -> {
             try {
-                client.telemeterSettings();
+                client.syncSettings();
             } catch (Throwable t) {
-                LOGGER.error("Failed to announce settings, clientId={}", client.getClientId(), t);
+                LOGGER.error("Failed to announce settings, clientId={}", client.clientId(), t);
             }
         });
     }
diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientManagerRegistry.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientManagerRegistry.java
index 42dc30d..80299da 100644
--- a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientManagerRegistry.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientManagerRegistry.java
@@ -57,7 +57,7 @@ public class ClientManagerRegistry {
                 clientManager.startAsync().awaitRunning();
                 singletonClientManager = clientManager;
             }
-            clientIds.add(client.getClientId());
+            clientIds.add(client.clientId());
             singletonClientManager.registerClient(client);
             return singletonClientManager;
         } finally {
@@ -77,7 +77,7 @@ public class ClientManagerRegistry {
         ClientManagerImpl clientManager = null;
         clientIdsLock.lock();
         try {
-            clientIds.remove(client.getClientId());
+            clientIds.remove(client.clientId());
             singletonClientManager.unregisterClient(client);
             if (clientIds.isEmpty()) {
                 clientManager = singletonClientManager;
diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/TelemetrySession.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientSessionImpl.java
similarity index 54%
rename from java/client/src/main/java/org/apache/rocketmq/client/java/impl/TelemetrySession.java
rename to java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientSessionImpl.java
index 36d74a5..d6a19c2 100644
--- a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/TelemetrySession.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientSessionImpl.java
@@ -26,14 +26,11 @@ import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.MoreExecutors;
-import com.google.common.util.concurrent.SettableFuture;
-import io.grpc.Metadata;
 import io.grpc.stub.StreamObserver;
-import java.io.UnsupportedEncodingException;
-import java.security.InvalidKeyException;
-import java.security.NoSuchAlgorithmException;
-import java.time.Duration;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
 import org.apache.rocketmq.client.apis.ClientException;
+import org.apache.rocketmq.client.java.impl.producer.ClientSessionProcessor;
 import org.apache.rocketmq.client.java.route.Endpoints;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -42,51 +39,39 @@ import org.slf4j.LoggerFactory;
  * Telemetry session is constructed before first communication between client and remote route endpoints.
  */
 @SuppressWarnings({"UnstableApiUsage", "NullableProblems"})
-public class TelemetrySession implements StreamObserver<TelemetryCommand> {
-    private static final Logger LOGGER = LoggerFactory.getLogger(TelemetrySession.class);
+public class ClientSessionImpl implements StreamObserver<TelemetryCommand> {
+    private static final Logger LOGGER = LoggerFactory.getLogger(ClientSessionImpl.class);
 
-    private final ClientImpl client;
-    private final ClientManager clientManager;
+    private final ClientSessionProcessor processor;
     private final Endpoints endpoints;
-    private volatile StreamObserver<TelemetryCommand> requestObserver;
+    private final ReadWriteLock observerLock;
+    private StreamObserver<TelemetryCommand> requestObserver = null;
 
-    private TelemetrySession(ClientImpl client, ClientManager clientManager, Endpoints endpoints) {
-        this.client = client;
-        this.clientManager = clientManager;
+    protected ClientSessionImpl(ClientSessionProcessor processor, Endpoints endpoints) {
+        this.processor = processor;
         this.endpoints = endpoints;
+        this.observerLock = new ReentrantReadWriteLock();
     }
 
-    public static ListenableFuture<TelemetrySession> register(ClientImpl client, ClientManager clientManager,
-        Endpoints endpoints) {
-        return new TelemetrySession(client, clientManager, endpoints).register();
-    }
-
-    private ListenableFuture<TelemetrySession> register() {
-        ListenableFuture<TelemetrySession> future;
+    protected ListenableFuture<ClientSessionImpl> register() {
+        ListenableFuture<ClientSessionImpl> future;
         try {
-            this.init();
-            final ClientSettings clientSettings = client.getClientSettings();
-            final Settings settings = clientSettings.toProtobuf();
-            final TelemetryCommand settingsCommand = TelemetryCommand.newBuilder().setSettings(settings).build();
-            this.telemeter(settingsCommand);
-            future = Futures.transform(clientSettings.getArrivedFuture(), input -> this,
-                MoreExecutors.directExecutor());
+            final TelemetryCommand command = processor.getSettingsCommand();
+            this.publish(command);
+            future = Futures.transform(processor.register(), input -> this, MoreExecutors.directExecutor());
         } catch (Throwable t) {
-            SettableFuture<TelemetrySession> future0 = SettableFuture.create();
-            future0.setException(t);
-            future = future0;
+            future = Futures.immediateFailedFuture(t);
         }
-        Futures.addCallback(future, new FutureCallback<TelemetrySession>() {
+        final String clientId = processor.clientId();
+        Futures.addCallback(future, new FutureCallback<ClientSessionImpl>() {
             @Override
-            public void onSuccess(TelemetrySession session) {
-                LOGGER.info("Register telemetry session successfully, endpoints={}, clientId={}", endpoints,
-                    client.getClientId());
+            public void onSuccess(ClientSessionImpl session) {
+                LOGGER.info("Register client session successfully, endpoints={}, clientId={}", endpoints, clientId);
             }
 
             @Override
             public void onFailure(Throwable t) {
-                LOGGER.error("Failed to register telemetry session, endpoints={}, clientId={}", endpoints,
-                    client.getClientId(), t);
+                LOGGER.error("Failed to register client session, endpoints={}, clientId={}", endpoints, clientId, t);
                 release();
             }
         }, MoreExecutors.directExecutor());
@@ -96,31 +81,19 @@ public class TelemetrySession implements StreamObserver<TelemetryCommand> {
     /**
      * Release telemetry session.
      */
-    public synchronized void release() {
+    public void release() {
+        this.observerLock.writeLock().lock();
         try {
             if (null != requestObserver) {
-                requestObserver.onCompleted();
+                try {
+                    requestObserver.onCompleted();
+                } catch (Throwable ignore) {
+                    // Ignore exception on purpose.
+                }
+                requestObserver = null;
             }
-        } catch (Throwable ignore) {
-            // Ignore exception on purpose.
-        }
-    }
-
-    /**
-     * Initialize telemetry session.
-     */
-    private synchronized void init() throws UnsupportedEncodingException, NoSuchAlgorithmException,
-        InvalidKeyException, ClientException {
-        this.release();
-        final Metadata metadata = client.sign();
-        this.requestObserver = clientManager.telemetry(endpoints, metadata, Duration.ofNanos(Long.MAX_VALUE), this);
-    }
-
-    private void reinit() {
-        try {
-            init();
-        } catch (Throwable ignore) {
-            // Ignore exception on purpose.
+        } finally {
+            this.observerLock.writeLock().unlock();
         }
     }
 
@@ -129,13 +102,24 @@ public class TelemetrySession implements StreamObserver<TelemetryCommand> {
      *
      * @param command appointed command to telemeter
      */
-    public void telemeter(TelemetryCommand command) {
+    public void publish(TelemetryCommand command) throws ClientException {
+        this.observerLock.readLock().lock();
         try {
+            if (null != requestObserver) {
+                requestObserver.onNext(command);
+                return;
+            }
+        } finally {
+            this.observerLock.readLock().unlock();
+        }
+        this.observerLock.writeLock().lock();
+        try {
+            if (null == requestObserver) {
+                this.requestObserver = processor.telemetry(endpoints, this);
+            }
             requestObserver.onNext(command);
-        } catch (RuntimeException e) {
-            // Cancel RPC.
-            requestObserver.onError(e);
-            throw e;
+        } finally {
+            this.observerLock.writeLock().unlock();
         }
     }
 
@@ -146,52 +130,52 @@ public class TelemetrySession implements StreamObserver<TelemetryCommand> {
                 case SETTINGS: {
                     final Settings settings = command.getSettings();
                     LOGGER.info("Receive settings from remote, endpoints={}, clientId={}", endpoints,
-                        client.getClientId());
-                    client.onSettingsCommand(endpoints, settings);
+                        processor.clientId());
+                    processor.onSettingsCommand(endpoints, settings);
                     break;
                 }
                 case RECOVER_ORPHANED_TRANSACTION_COMMAND: {
                     final RecoverOrphanedTransactionCommand recoverOrphanedTransactionCommand =
                         command.getRecoverOrphanedTransactionCommand();
                     LOGGER.info("Receive orphaned transaction recovery command from remote, endpoints={}, "
-                        + "clientId={}", endpoints, client.getClientId());
-                    client.onRecoverOrphanedTransactionCommand(endpoints, recoverOrphanedTransactionCommand);
+                        + "clientId={}", endpoints, processor.clientId());
+                    processor.onRecoverOrphanedTransactionCommand(endpoints, recoverOrphanedTransactionCommand);
                     break;
                 }
                 case VERIFY_MESSAGE_COMMAND: {
                     final VerifyMessageCommand verifyMessageCommand = command.getVerifyMessageCommand();
                     LOGGER.info("Receive message verification command from remote, endpoints={}, clientId={}",
-                        endpoints, client.getClientId());
-                    client.onVerifyMessageCommand(endpoints, verifyMessageCommand);
+                        endpoints, processor.clientId());
+                    processor.onVerifyMessageCommand(endpoints, verifyMessageCommand);
                     break;
                 }
                 case PRINT_THREAD_STACK_TRACE_COMMAND: {
                     final PrintThreadStackTraceCommand printThreadStackTraceCommand =
                         command.getPrintThreadStackTraceCommand();
                     LOGGER.info("Receive thread stack print command from remote, endpoints={}, clientId={}",
-                        endpoints, client.getClientId());
-                    client.onPrintThreadStackCommand(endpoints, printThreadStackTraceCommand);
+                        endpoints, processor.clientId());
+                    processor.onPrintThreadStackTraceCommand(endpoints, printThreadStackTraceCommand);
                     break;
                 }
                 default:
                     LOGGER.warn("Receive unrecognized command from remote, endpoints={}, command={}, clientId={}",
-                        endpoints, command, client.getClientId());
+                        endpoints, command, processor.clientId());
             }
         } catch (Throwable t) {
             LOGGER.error("[Bug] unexpected exception raised while receiving command from remote, command={}, "
-                + "clientId={}", command, client.getClientId(), t);
+                + "clientId={}", command, processor.clientId(), t);
         }
     }
 
     @Override
     public void onError(Throwable throwable) {
         LOGGER.error("Exception raised from stream response observer, clientId={}, endpoints={}",
-            client.getClientId(), endpoints, throwable);
-        reinit();
+            processor.clientId(), endpoints, throwable);
+        this.release();
     }
 
     @Override
     public void onCompleted() {
-        reinit();
+        this.release();
     }
 }
diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ProcessQueueImpl.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ProcessQueueImpl.java
index c914dce..bdc6a6d 100644
--- a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ProcessQueueImpl.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ProcessQueueImpl.java
@@ -128,7 +128,7 @@ class ProcessQueueImpl implements ProcessQueue {
             return false;
         }
         LOGGER.warn("Process queue is idle, idleDuration={}, maxIdleDuration={}, mq={}, clientId={}", idleDuration,
-            maxIdleDuration, mq, consumer.getClientId());
+            maxIdleDuration, mq, consumer.clientId());
         return true;
     }
 
@@ -157,12 +157,12 @@ class ProcessQueueImpl implements ProcessQueue {
                 final MessageId messageId = messageView.getMessageId();
                 if (consumer.getPushConsumerSettings().isFifo()) {
                     LOGGER.error("Message is corrupted, forward it to dead letter queue in fifo mode, mq={}, " +
-                        "messageId={}, clientId={}", mq, messageId, consumer.getClientId());
+                        "messageId={}, clientId={}", mq, messageId, consumer.clientId());
                     forwardToDeadLetterQueue(messageView);
                     return;
                 }
                 LOGGER.error("Message is corrupted, nack it in standard mode, mq={}, messageId={}, clientId={}", mq,
-                    messageId, consumer.getClientId());
+                    messageId, consumer.clientId());
                 nackMessage(messageView);
             });
         }
@@ -194,7 +194,7 @@ class ProcessQueueImpl implements ProcessQueue {
             }
             // Should never reach here.
             LOGGER.error("[Bug] Failed to schedule receive message request, mq={}, clientId={}", mq,
-                consumer.getClientId(), t);
+                consumer.clientId(), t);
             receiveMessageLater();
         }
     }
@@ -202,12 +202,12 @@ class ProcessQueueImpl implements ProcessQueue {
     public void receiveMessage() {
         if (dropped) {
             LOGGER.info("Process queue has been dropped, no longer receive message, mq={}, clientId={}", mq,
-                consumer.getClientId());
+                consumer.clientId());
             return;
         }
         if (this.isCacheFull()) {
             LOGGER.warn("Process queue cache is full, would receive message later, mq={}, clientId={}", mq,
-                consumer.getClientId());
+                consumer.clientId());
             receiveMessageLater();
             return;
         }
@@ -217,7 +217,7 @@ class ProcessQueueImpl implements ProcessQueue {
     private void receiveMessageImmediately() {
         if (!consumer.isRunning()) {
             LOGGER.info("Stop to receive message because consumer is not running, mq={}, clientId={}", mq,
-                consumer.getClientId());
+                consumer.clientId());
             return;
         }
         try {
@@ -251,7 +251,7 @@ class ProcessQueueImpl implements ProcessQueue {
                         // Should never reach here.
                         LOGGER.error("[Bug] Exception raised while handling receive result, would receive later," +
                                 " mq={}, endpoints={}, clientId={}",
-                            mq, endpoints, consumer.getClientId(), t);
+                            mq, endpoints, consumer.clientId(), t);
                         receiveMessageLater();
                     }
                 }
@@ -264,14 +264,14 @@ class ProcessQueueImpl implements ProcessQueue {
                         MessageHookPointsStatus.ERROR);
 
                     LOGGER.error("Exception raised while message reception, would receive later, mq={}, endpoints={}," +
-                        " clientId={}", mq, endpoints, consumer.getClientId(), t);
+                        " clientId={}", mq, endpoints, consumer.clientId(), t);
                     receiveMessageLater();
                 }
             }, MoreExecutors.directExecutor());
             consumer.getReceptionTimes().getAndIncrement();
         } catch (Throwable t) {
             LOGGER.error("Exception raised while message reception, would receive later, mq={}, clientId={}", mq,
-                consumer.getClientId(), t);
+                consumer.clientId(), t);
             receiveMessageLater();
         }
     }
@@ -282,7 +282,7 @@ class ProcessQueueImpl implements ProcessQueue {
         if (cacheMessageCountThresholdPerQueue <= actualMessagesQuantity) {
             LOGGER.warn("Process queue total cached messages quantity exceeds the threshold, threshold={}, actual={}," +
                     " mq={}, clientId={}",
-                cacheMessageCountThresholdPerQueue, actualMessagesQuantity, mq, consumer.getClientId());
+                cacheMessageCountThresholdPerQueue, actualMessagesQuantity, mq, consumer.clientId());
             return true;
         }
         final int cacheMessageBytesThresholdPerQueue = consumer.cacheMessageBytesThresholdPerQueue();
@@ -290,7 +290,7 @@ class ProcessQueueImpl implements ProcessQueue {
         if (cacheMessageBytesThresholdPerQueue <= actualCachedMessagesBytes) {
             LOGGER.warn("Process queue total cached messages memory exceeds the threshold, threshold={} bytes," +
                     " actual={} bytes, mq={}, clientId={}",
-                cacheMessageBytesThresholdPerQueue, actualCachedMessagesBytes, mq, consumer.getClientId());
+                cacheMessageBytesThresholdPerQueue, actualCachedMessagesBytes, mq, consumer.clientId());
             return true;
         }
         return false;
@@ -384,7 +384,7 @@ class ProcessQueueImpl implements ProcessQueue {
     }
 
     private void ackMessage(MessageViewImpl messageView) {
-        final String clientId = consumer.getClientId();
+        final String clientId = consumer.clientId();
         final String consumerGroup = consumer.getConsumerGroup();
         final MessageId messageId = messageView.getMessageId();
         final Endpoints endpoints = messageView.getEndpoints();
@@ -450,7 +450,7 @@ class ProcessQueueImpl implements ProcessQueue {
         int attempt = messageView.getDeliveryAttempt();
         final MessageId messageId = messageView.getMessageId();
         final ConsumeService service = consumer.getConsumeService();
-        final String clientId = consumer.getClientId();
+        final String clientId = consumer.clientId();
         if (ConsumeResult.FAILURE.equals(consumeResult) && attempt < maxAttempts) {
             final Duration nextAttemptDelay = retryPolicy.getNextAttemptDelay(attempt);
             attempt = messageView.incrementAndGetDeliveryAttempt();
@@ -482,7 +482,7 @@ class ProcessQueueImpl implements ProcessQueue {
         final SettableFuture<Void> future0) {
         final ListenableFuture<InvocationContext<ForwardMessageToDeadLetterQueueResponse>> future =
             consumer.forwardMessageToDeadLetterQueue(messageView);
-        final String clientId = consumer.getClientId();
+        final String clientId = consumer.clientId();
         final String consumerGroup = consumer.getConsumerGroup();
         final MessageId messageId = messageView.getMessageId();
         final Endpoints endpoints = messageView.getEndpoints();
@@ -528,7 +528,7 @@ class ProcessQueueImpl implements ProcessQueue {
     private void forwardToDeadLetterQueueLater(final MessageViewImpl messageView, final int attempt,
         final SettableFuture<Void> future0) {
         final MessageId messageId = messageView.getMessageId();
-        final String clientId = consumer.getClientId();
+        final String clientId = consumer.clientId();
         // Process queue is dropped, no need to proceed.
         if (dropped) {
             LOGGER.info("Process queue was dropped, give up to forward message to dead letter queue, mq={}," +
@@ -558,7 +558,7 @@ class ProcessQueueImpl implements ProcessQueue {
 
     private void ackFifoMessage(final MessageViewImpl messageView, final int attempt,
         final SettableFuture<Void> future0) {
-        final String clientId = consumer.getClientId();
+        final String clientId = consumer.clientId();
         final String consumerGroup = consumer.getConsumerGroup();
         final MessageId messageId = messageView.getMessageId();
         final Endpoints endpoints = messageView.getEndpoints();
@@ -606,7 +606,7 @@ class ProcessQueueImpl implements ProcessQueue {
     private void ackFifoMessageLater(final MessageViewImpl messageView, final int attempt,
         final SettableFuture<Void> future0) {
         final MessageId messageId = messageView.getMessageId();
-        final String clientId = consumer.getClientId();
+        final String clientId = consumer.clientId();
         // Process queue is dropped, no need to proceed.
         if (dropped) {
             LOGGER.info("Process queue was dropped, give up to ack message, mq={}, messageId={}, clientId={}",
diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/ClientSessionProcessor.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/ClientSessionProcessor.java
new file mode 100644
index 0000000..8bdc019
--- /dev/null
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/ClientSessionProcessor.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.client.java.impl.producer;
+
+import apache.rocketmq.v2.PrintThreadStackTraceCommand;
+import apache.rocketmq.v2.RecoverOrphanedTransactionCommand;
+import apache.rocketmq.v2.Settings;
+import apache.rocketmq.v2.TelemetryCommand;
+import apache.rocketmq.v2.VerifyMessageCommand;
+import com.google.common.util.concurrent.ListenableFuture;
+import io.grpc.stub.StreamObserver;
+import org.apache.rocketmq.client.apis.ClientException;
+import org.apache.rocketmq.client.java.route.Endpoints;
+
+public interface ClientSessionProcessor {
+    ListenableFuture<Void> register();
+
+    String clientId();
+
+    TelemetryCommand getSettingsCommand();
+
+    StreamObserver<TelemetryCommand> telemetry(Endpoints endpoints, StreamObserver<TelemetryCommand> observer)
+        throws ClientException;
+
+    void onSettingsCommand(Endpoints endpoints, Settings settings);
+
+    void onRecoverOrphanedTransactionCommand(Endpoints endpoints, RecoverOrphanedTransactionCommand command);
+
+    void onVerifyMessageCommand(Endpoints endpoints, VerifyMessageCommand command);
+
+    void onPrintThreadStackTraceCommand(Endpoints endpoints, PrintThreadStackTraceCommand command);
+}
diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/metrics/ClientMeterProvider.java b/java/client/src/main/java/org/apache/rocketmq/client/java/metrics/ClientMeterProvider.java
index 9451028..3547a96 100644
--- a/java/client/src/main/java/org/apache/rocketmq/client/java/metrics/ClientMeterProvider.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/metrics/ClientMeterProvider.java
@@ -73,7 +73,7 @@ public class ClientMeterProvider {
     }
 
     public synchronized void reset(Metric metric) {
-        final String clientId = client.getClientId();
+        final String clientId = client.clientId();
         try {
             if (clientMeter.satisfy(metric)) {
                 LOGGER.debug("Metric settings is satisfied by the current message meter, clientId={}", clientId);
diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/metrics/MessageMeterInterceptor.java b/java/client/src/main/java/org/apache/rocketmq/client/java/metrics/MessageMeterInterceptor.java
index d9bd316..6cfb776 100644
--- a/java/client/src/main/java/org/apache/rocketmq/client/java/metrics/MessageMeterInterceptor.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/metrics/MessageMeterInterceptor.java
@@ -55,7 +55,7 @@ public class MessageMeterInterceptor implements MessageInterceptor {
             InvocationStatus invocationStatus = MessageHookPointsStatus.OK.equals(status) ? InvocationStatus.SUCCESS :
                 InvocationStatus.FAILURE;
             Attributes attributes = Attributes.builder().put(MetricLabels.TOPIC, messageCommon.getTopic())
-                .put(MetricLabels.CLIENT_ID, clientMeterProvider.getClient().getClientId())
+                .put(MetricLabels.CLIENT_ID, clientMeterProvider.getClient().clientId())
                 .put(MetricLabels.INVOCATION_STATUS, invocationStatus.getName()).build();
             histogram.record(duration.toMillis(), attributes);
         }
@@ -74,7 +74,7 @@ public class MessageMeterInterceptor implements MessageInterceptor {
             consumerGroup = ((SimpleConsumer) client).getConsumerGroup();
         }
         if (null == consumerGroup) {
-            LOGGER.error("[Bug] consumerGroup is not recognized, clientId={}", client.getClientId());
+            LOGGER.error("[Bug] consumerGroup is not recognized, clientId={}", client.clientId());
             return;
         }
         final MessageCommon messageCommon = messageCommons.iterator().next();
@@ -92,7 +92,7 @@ public class MessageMeterInterceptor implements MessageInterceptor {
         final DoubleHistogram histogram = optionalHistogram.get();
         final Attributes attributes = Attributes.builder().put(MetricLabels.TOPIC, messageCommon.getTopic())
             .put(MetricLabels.CONSUMER_GROUP, consumerGroup)
-            .put(MetricLabels.CLIENT_ID, client.getClientId()).build();
+            .put(MetricLabels.CLIENT_ID, client.clientId()).build();
         histogram.record(latency, attributes);
     }
 
@@ -103,7 +103,7 @@ public class MessageMeterInterceptor implements MessageInterceptor {
             consumerGroup = ((PushConsumer) client).getConsumerGroup();
         }
         if (null == consumerGroup) {
-            LOGGER.error("[Bug] consumerGroup is not recognized, clientId={}", client.getClientId());
+            LOGGER.error("[Bug] consumerGroup is not recognized, clientId={}", client.clientId());
             return;
         }
         final MessageCommon messageCommon = messageCommons.iterator().next();
@@ -114,7 +114,7 @@ public class MessageMeterInterceptor implements MessageInterceptor {
         final Duration durationAfterDecoding = optionalDurationAfterDecoding.get();
         Attributes attributes = Attributes.builder().put(MetricLabels.TOPIC, messageCommon.getTopic())
             .put(MetricLabels.CONSUMER_GROUP, consumerGroup)
-            .put(MetricLabels.CLIENT_ID, client.getClientId()).build();
+            .put(MetricLabels.CLIENT_ID, client.clientId()).build();
         final Optional<DoubleHistogram> optionalHistogram =
             clientMeterProvider.getHistogramByName(MetricName.AWAIT_TIME);
         if (!optionalHistogram.isPresent()) {
@@ -129,7 +129,7 @@ public class MessageMeterInterceptor implements MessageInterceptor {
         final ClientImpl client = clientMeterProvider.getClient();
         if (!(client instanceof PushConsumer)) {
             // Should never reach here.
-            LOGGER.error("[Bug] current client is not push consumer, clientId={}", client.getClientId());
+            LOGGER.error("[Bug] current client is not push consumer, clientId={}", client.clientId());
             return;
         }
         PushConsumer pushConsumer = (PushConsumer) client;
@@ -138,7 +138,7 @@ public class MessageMeterInterceptor implements MessageInterceptor {
                 InvocationStatus.FAILURE;
             Attributes attributes = Attributes.builder().put(MetricLabels.TOPIC, messageCommon.getTopic())
                 .put(MetricLabels.CONSUMER_GROUP, pushConsumer.getConsumerGroup())
-                .put(MetricLabels.CLIENT_ID, clientMeterProvider.getClient().getClientId())
+                .put(MetricLabels.CLIENT_ID, clientMeterProvider.getClient().clientId())
                 .put(MetricLabels.INVOCATION_STATUS, invocationStatus.getName())
                 .build();
             final Optional<DoubleHistogram> optionalHistogram =
diff --git a/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/PushConsumerImplTest.java b/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/PushConsumerImplTest.java
index a7b5357..8b88702 100644
--- a/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/PushConsumerImplTest.java
+++ b/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/PushConsumerImplTest.java
@@ -46,7 +46,7 @@ import org.apache.rocketmq.client.apis.consumer.FilterExpression;
 import org.apache.rocketmq.client.apis.consumer.MessageListener;
 import org.apache.rocketmq.client.java.impl.ClientManagerImpl;
 import org.apache.rocketmq.client.java.impl.ClientManagerRegistry;
-import org.apache.rocketmq.client.java.impl.TelemetrySession;
+import org.apache.rocketmq.client.java.impl.ClientSessionImpl;
 import org.apache.rocketmq.client.java.misc.ThreadFactoryImpl;
 import org.apache.rocketmq.client.java.route.Endpoints;
 import org.apache.rocketmq.client.java.tool.TestBase;
@@ -87,7 +87,7 @@ public class PushConsumerImplTest extends TestBase {
             any(Duration.class)))
             .thenReturn(okQueryRouteResponseFuture());
         when(clientManager.telemetry(any(Endpoints.class), any(Metadata.class), any(Duration.class),
-            any(TelemetrySession.class)))
+            any(ClientSessionImpl.class)))
             .thenReturn(telemetryRequestObserver);
         final ScheduledThreadPoolExecutor scheduler = new ScheduledThreadPoolExecutor(1, new ThreadFactoryImpl(
             "TestScheduler"));
diff --git a/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/SimpleConsumerImplTest.java b/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/SimpleConsumerImplTest.java
index 88445d1..e6ede4e 100644
--- a/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/SimpleConsumerImplTest.java
+++ b/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/SimpleConsumerImplTest.java
@@ -59,7 +59,7 @@ import org.apache.rocketmq.client.apis.consumer.FilterExpression;
 import org.apache.rocketmq.client.apis.message.MessageView;
 import org.apache.rocketmq.client.java.impl.ClientManagerImpl;
 import org.apache.rocketmq.client.java.impl.ClientManagerRegistry;
-import org.apache.rocketmq.client.java.impl.TelemetrySession;
+import org.apache.rocketmq.client.java.impl.ClientSessionImpl;
 import org.apache.rocketmq.client.java.message.MessageViewImpl;
 import org.apache.rocketmq.client.java.misc.ThreadFactoryImpl;
 import org.apache.rocketmq.client.java.route.Endpoints;
@@ -105,7 +105,7 @@ public class SimpleConsumerImplTest extends TestBase {
             any(Duration.class)))
             .thenReturn(future0);
         when(clientManager.telemetry(any(Endpoints.class), any(Metadata.class), any(Duration.class),
-            any(TelemetrySession.class)))
+            any(ClientSessionImpl.class)))
             .thenReturn(telemetryRequestObserver);
         final ScheduledThreadPoolExecutor scheduler = new ScheduledThreadPoolExecutor(1,
             new ThreadFactoryImpl("TestScheduler"));
diff --git a/java/client/src/test/java/org/apache/rocketmq/client/java/impl/producer/ProducerImplTest.java b/java/client/src/test/java/org/apache/rocketmq/client/java/impl/producer/ProducerImplTest.java
index 619538e..f069b52 100644
--- a/java/client/src/test/java/org/apache/rocketmq/client/java/impl/producer/ProducerImplTest.java
+++ b/java/client/src/test/java/org/apache/rocketmq/client/java/impl/producer/ProducerImplTest.java
@@ -58,7 +58,7 @@ import org.apache.rocketmq.client.apis.message.Message;
 import org.apache.rocketmq.client.apis.producer.SendReceipt;
 import org.apache.rocketmq.client.java.impl.ClientManagerImpl;
 import org.apache.rocketmq.client.java.impl.ClientManagerRegistry;
-import org.apache.rocketmq.client.java.impl.TelemetrySession;
+import org.apache.rocketmq.client.java.impl.ClientSessionImpl;
 import org.apache.rocketmq.client.java.misc.ThreadFactoryImpl;
 import org.apache.rocketmq.client.java.route.Endpoints;
 import org.apache.rocketmq.client.java.rpc.InvocationContext;
@@ -112,7 +112,7 @@ public class ProducerImplTest extends TestBase {
             any(Duration.class)))
             .thenReturn(future0);
         when(clientManager.telemetry(any(Endpoints.class), any(Metadata.class), any(Duration.class),
-            any(TelemetrySession.class)))
+            any(ClientSessionImpl.class)))
             .thenReturn(telemetryRequestObserver);
         final ScheduledThreadPoolExecutor scheduler = new ScheduledThreadPoolExecutor(1, new ThreadFactoryImpl(
             "TestScheduler"));
@@ -145,7 +145,7 @@ public class ProducerImplTest extends TestBase {
         verify(clientManager, times(1)).queryRoute(any(Endpoints.class), any(Metadata.class),
             any(QueryRouteRequest.class), any(Duration.class));
         verify(clientManager, times(1)).telemetry(any(Endpoints.class), any(Metadata.class),
-            any(Duration.class), any(TelemetrySession.class));
+            any(Duration.class), any(ClientSessionImpl.class));
         final Message message = fakeMessage(FAKE_TOPIC_0);
         final ListenableFuture<InvocationContext<SendMessageResponse>> future =
             okSendMessageResponseFutureWithSingleEntry();
@@ -165,7 +165,7 @@ public class ProducerImplTest extends TestBase {
         verify(clientManager, never()).queryRoute(any(Endpoints.class), any(Metadata.class),
             any(QueryRouteRequest.class), any(Duration.class));
         verify(clientManager, never()).telemetry(any(Endpoints.class), any(Metadata.class), any(Duration.class),
-            any(TelemetrySession.class));
+            any(ClientSessionImpl.class));
         final Message message = fakeMessage(FAKE_TOPIC_0);
         final ListenableFuture<InvocationContext<SendMessageResponse>> future =
             okSendMessageResponseFutureWithSingleEntry();
@@ -177,7 +177,7 @@ public class ProducerImplTest extends TestBase {
         verify(clientManager, times(1)).queryRoute(any(Endpoints.class), any(Metadata.class),
             any(QueryRouteRequest.class), any(Duration.class));
         verify(clientManager, times(1)).telemetry(any(Endpoints.class), any(Metadata.class),
-            any(Duration.class), any(TelemetrySession.class));
+            any(Duration.class), any(ClientSessionImpl.class));
         final apache.rocketmq.v2.SendResultEntry receipt = response.getEntriesList().iterator().next();
         assertEquals(receipt.getMessageId(), sendReceipt.getMessageId().toString());
         shutdown(producerWithoutTopicBinding);
@@ -189,7 +189,7 @@ public class ProducerImplTest extends TestBase {
         verify(clientManager, times(1)).queryRoute(any(Endpoints.class), any(Metadata.class),
             any(QueryRouteRequest.class), any(Duration.class));
         verify(clientManager, times(1)).telemetry(any(Endpoints.class), any(Metadata.class), any(Duration.class),
-            any(TelemetrySession.class));
+            any(ClientSessionImpl.class));
         final ListenableFuture<InvocationContext<SendMessageResponse>> future = failureSendMessageResponseFuture();
         when(clientManager.sendMessage(any(Endpoints.class), any(Metadata.class), any(SendMessageRequest.class),
             any(Duration.class))).thenReturn(future);