You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by aa...@apache.org on 2022/07/28 09:09:07 UTC

[rocketmq-clients] 01/01: Polish code

This is an automated email from the ASF dual-hosted git repository.

aaronai pushed a commit to branch java_dev
in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git

commit af6500d98f0568c4f6976d724dc83be57d367b6d
Author: Aaron Ai <ya...@gmail.com>
AuthorDate: Thu Jul 28 14:01:22 2022 +0800

    Polish code
---
 .../rocketmq/client/java/impl/ClientImpl.java      | 346 ++++++++++-----------
 .../client/java/impl/ClientSessionImpl.java        | 137 ++++----
 .../java/impl/consumer/ProcessQueueImpl.java       |  12 +-
 .../java/impl/consumer/PushConsumerImpl.java       |  15 +-
 .../java/impl/consumer/SimpleConsumerImpl.java     |  13 +-
 .../impl/consumer/SubscriptionLoadBalancer.java    |  37 +--
 ...ionProcessor.java => ClientSessionHandler.java} |  14 +-
 .../client/java/impl/producer/ProducerImpl.java    |  29 +-
 .../java/impl/producer/PublishingLoadBalancer.java |  42 +--
 .../client/java/route/TopicRouteDataResult.java    | 125 --------
 10 files changed, 289 insertions(+), 481 deletions(-)

diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientImpl.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientImpl.java
index cc93fe9..bc2f68f 100644
--- a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientImpl.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientImpl.java
@@ -22,6 +22,7 @@ import static com.google.common.base.Preconditions.checkNotNull;
 import apache.rocketmq.v2.Code;
 import apache.rocketmq.v2.HeartbeatRequest;
 import apache.rocketmq.v2.HeartbeatResponse;
+import apache.rocketmq.v2.MessageQueue;
 import apache.rocketmq.v2.NotifyClientTerminationRequest;
 import apache.rocketmq.v2.PrintThreadStackTraceCommand;
 import apache.rocketmq.v2.QueryRouteRequest;
@@ -69,12 +70,16 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.stream.Collectors;
 import org.apache.rocketmq.client.apis.ClientConfiguration;
 import org.apache.rocketmq.client.apis.ClientException;
+import org.apache.rocketmq.client.java.exception.BadRequestException;
 import org.apache.rocketmq.client.java.exception.InternalErrorException;
 import org.apache.rocketmq.client.java.exception.NotFoundException;
+import org.apache.rocketmq.client.java.exception.ProxyTimeoutException;
+import org.apache.rocketmq.client.java.exception.TooManyRequestsException;
+import org.apache.rocketmq.client.java.exception.UnsupportedException;
 import org.apache.rocketmq.client.java.hook.MessageHookPoints;
 import org.apache.rocketmq.client.java.hook.MessageHookPointsStatus;
 import org.apache.rocketmq.client.java.hook.MessageInterceptor;
-import org.apache.rocketmq.client.java.impl.producer.ClientSessionProcessor;
+import org.apache.rocketmq.client.java.impl.producer.ClientSessionHandler;
 import org.apache.rocketmq.client.java.message.MessageCommon;
 import org.apache.rocketmq.client.java.metrics.ClientMeterProvider;
 import org.apache.rocketmq.client.java.metrics.Metric;
@@ -82,18 +87,16 @@ import org.apache.rocketmq.client.java.misc.ExecutorServices;
 import org.apache.rocketmq.client.java.misc.ThreadFactoryImpl;
 import org.apache.rocketmq.client.java.misc.Utilities;
 import org.apache.rocketmq.client.java.route.Endpoints;
-import org.apache.rocketmq.client.java.route.TopicRouteDataResult;
+import org.apache.rocketmq.client.java.route.TopicRouteData;
 import org.apache.rocketmq.client.java.rpc.RpcInvocation;
 import org.apache.rocketmq.client.java.rpc.Signature;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 @SuppressWarnings({"UnstableApiUsage", "NullableProblems"})
-public abstract class ClientImpl extends AbstractIdleService implements Client, ClientSessionProcessor,
+public abstract class ClientImpl extends AbstractIdleService implements Client, ClientSessionHandler,
     MessageInterceptor {
     private static final Logger LOGGER = LoggerFactory.getLogger(ClientImpl.class);
-    private static final Duration TOPIC_ROUTE_AWAIT_DURATION_DURING_STARTUP = Duration.ofSeconds(3);
-
     private static final Duration TELEMETRY_TIMEOUT = Duration.ofDays(102 * 365);
 
     protected final ClientManager clientManager;
@@ -111,15 +114,15 @@ public abstract class ClientImpl extends AbstractIdleService implements Client,
     protected final String clientId;
 
     private volatile ScheduledFuture<?> updateRouteCacheFuture;
-    private final ConcurrentMap<String, TopicRouteDataResult> topicRouteResultCache;
+    private final ConcurrentMap<String, TopicRouteData> topicRouteCache;
 
     @GuardedBy("inflightRouteFutureLock")
-    private final Map<String /* topic */, Set<SettableFuture<TopicRouteDataResult>>> inflightRouteFutureTable;
+    private final Map<String /* topic */, Set<SettableFuture<TopicRouteData>>> inflightRouteFutureTable;
     private final Lock inflightRouteFutureLock;
 
-    @GuardedBy("endpointsSessionsLock")
-    private final Map<Endpoints, ClientSessionImpl> endpointsSessionTable;
-    private final ReadWriteLock endpointsSessionsLock;
+    @GuardedBy("sessionsLock")
+    private final Map<Endpoints, ClientSessionImpl> sessionsTable;
+    private final ReadWriteLock sessionsLock;
 
     @GuardedBy("messageInterceptorsLock")
     private final List<MessageInterceptor> messageInterceptors;
@@ -132,13 +135,13 @@ public abstract class ClientImpl extends AbstractIdleService implements Client,
         // Generate client id firstly.
         this.clientId = Utilities.genClientId();
 
-        this.topicRouteResultCache = new ConcurrentHashMap<>();
+        this.topicRouteCache = new ConcurrentHashMap<>();
 
         this.inflightRouteFutureTable = new ConcurrentHashMap<>();
         this.inflightRouteFutureLock = new ReentrantLock();
 
-        this.endpointsSessionTable = new HashMap<>();
-        this.endpointsSessionsLock = new ReentrantReadWriteLock();
+        this.sessionsTable = new HashMap<>();
+        this.sessionsLock = new ReentrantReadWriteLock();
 
         this.isolated = Collections.newSetFromMap(new ConcurrentHashMap<>());
 
@@ -180,21 +183,10 @@ public abstract class ClientImpl extends AbstractIdleService implements Client,
         // Fetch topic route from remote.
         LOGGER.info("Begin to fetch topic(s) route data from remote during client startup, clientId={}, topics={}",
             clientId, topics);
-        // Aggregate all topic route data futures into a composited future.
-        final List<ListenableFuture<TopicRouteDataResult>> futures = topics.stream()
-            .map(this::getRouteDataResult)
-            .collect(Collectors.toList());
-        List<TopicRouteDataResult> results;
-        try {
-            results = Futures.allAsList(futures).get(TOPIC_ROUTE_AWAIT_DURATION_DURING_STARTUP.toNanos(),
-                TimeUnit.NANOSECONDS);
-        } catch (Throwable t) {
-            LOGGER.error("Failed to get topic route data result from remote during client startup, clientId={}, "
-                + "topics={}", clientId, topics, t);
-            throw new NotFoundException(t);
-        }
-        for (TopicRouteDataResult result : results) {
-            result.checkAndGetTopicRouteData();
+        final List<ListenableFuture<TopicRouteData>> futures =
+            topics.stream().map(this::fetchTopicRoute).collect(Collectors.toList());
+        for (ListenableFuture<TopicRouteData> future : futures) {
+            future.get();
         }
         LOGGER.info("Fetch topic route data from remote successfully during startup, clientId={}, topics={}",
             clientId, topics);
@@ -282,7 +274,7 @@ public abstract class ClientImpl extends AbstractIdleService implements Client,
     }
 
     @Override
-    public TelemetryCommand getSettingsCommand() {
+    public TelemetryCommand settingsCommand() {
         final Settings settings = this.getClientSettings().toProtobuf();
         return TelemetryCommand.newBuilder().setSettings(settings).build();
     }
@@ -301,7 +293,13 @@ public abstract class ClientImpl extends AbstractIdleService implements Client,
     }
 
     @Override
-    public ListenableFuture<Void> register() {
+    public boolean isEndpointsDeprecated(Endpoints endpoints) {
+        final Set<Endpoints> totalRouteEndpoints = getTotalRouteEndpoints();
+        return totalRouteEndpoints.contains(endpoints);
+    }
+
+    @Override
+    public ListenableFuture<Void> awaitSettingSynchronized() {
         return Futures.transformAsync(this.getClientSettings().arrivedFuture,
             (clientSettings) -> Futures.immediateVoidFuture(), clientCallbackExecutor);
     }
@@ -325,7 +323,7 @@ public abstract class ClientImpl extends AbstractIdleService implements Client,
                     .setThreadStackTrace(threadStackTrace)
                     .setStatus(status)
                     .build();
-                telemeter(endpoints, telemetryCommand);
+                telemetry(endpoints, telemetryCommand);
             } catch (Throwable t) {
                 LOGGER.error("Failed to send thread stack trace to remote, endpoints={}, nonce={}, clientId={}",
                     endpoints, nonce, clientId, t);
@@ -351,7 +349,6 @@ public abstract class ClientImpl extends AbstractIdleService implements Client,
     public final void onSettingsCommand(Endpoints endpoints, Settings settings) {
         final Metric metric = new Metric(settings.getMetric());
         clientMeterProvider.reset(metric);
-        LOGGER.info("Receive settings from remote, endpoints={}", endpoints);
         this.getClientSettings().applySettingsCommand(settings);
     }
 
@@ -365,127 +362,81 @@ public abstract class ClientImpl extends AbstractIdleService implements Client,
         final Set<Endpoints> totalRouteEndpoints = getTotalRouteEndpoints();
         for (Endpoints endpoints : totalRouteEndpoints) {
             try {
-                telemeter(endpoints, command);
+                telemetry(endpoints, command);
             } catch (Throwable t) {
                 LOGGER.error("Failed to telemeter settings, clientId={}, endpoints={}", clientId, endpoints, t);
             }
         }
     }
 
-    /**
-     * Telemeter command to remote endpoints.
-     *
-     * @param endpoints remote endpoints to telemeter.
-     * @param command   command to telemeter.
-     */
-    public void telemeter(Endpoints endpoints, TelemetryCommand command) {
-        final ListenableFuture<ClientSessionImpl> future = registerTelemetrySession(endpoints);
-        Futures.addCallback(future, new FutureCallback<ClientSessionImpl>() {
-            @Override
-            public void onSuccess(ClientSessionImpl session) {
-                try {
-                    session.publish(command);
-                } catch (Throwable t) {
-                    LOGGER.error("Failed to telemeter command, endpoints={}, command={}", endpoints, command);
-                }
-            }
-
-            @Override
-            public void onFailure(Throwable t) {
-                LOGGER.error("Failed to telemeter command to remote, endpoints={}, command={}", endpoints, command, t);
-            }
-        }, MoreExecutors.directExecutor());
+    public void telemetry(Endpoints endpoints, TelemetryCommand command) {
+        final ClientSessionImpl clientSession = getClientSession(endpoints);
+        try {
+            clientSession.fireWrite(command);
+        } catch (Throwable t) {
+            LOGGER.error("Failed to fire write telemetry command, clientId={}, endpoints={}", clientId, endpoints, t);
+        }
     }
 
     private void releaseClientSessions() {
-        endpointsSessionsLock.readLock().lock();
+        sessionsLock.readLock().lock();
         try {
-            endpointsSessionTable.values().forEach(ClientSessionImpl::release);
+            sessionsTable.values().forEach(ClientSessionImpl::release);
         } finally {
-            endpointsSessionsLock.readLock().unlock();
+            sessionsLock.readLock().unlock();
         }
     }
 
-    /**
-     * Try to register telemetry session, return it directly if session is existed already.
-     */
-    public ListenableFuture<ClientSessionImpl> registerTelemetrySession(Endpoints endpoints) {
-        final SettableFuture<ClientSessionImpl> future0 = SettableFuture.create();
-        endpointsSessionsLock.readLock().lock();
+    public ClientSessionImpl getClientSession(Endpoints endpoints) {
+        sessionsLock.readLock().lock();
         try {
-            ClientSessionImpl clientSessionImpl = endpointsSessionTable.get(endpoints);
-            // Return is directly if session is existed already.
-            if (null != clientSessionImpl) {
-                future0.set(clientSessionImpl);
-                return future0;
+            final ClientSessionImpl session = sessionsTable.get(endpoints);
+            if (null != session) {
+                return session;
             }
         } finally {
-            endpointsSessionsLock.readLock().unlock();
+            sessionsLock.readLock().unlock();
         }
-        // Future's exception has been logged during the registration.
-        final ListenableFuture<ClientSessionImpl> future = new ClientSessionImpl(this, endpoints).register();
-        return Futures.transform(future, session -> {
-            endpointsSessionsLock.writeLock().lock();
-            try {
-                ClientSessionImpl existed = endpointsSessionTable.get(endpoints);
-                if (null != existed) {
-                    session.release();
-                    return existed;
-                }
-                endpointsSessionTable.put(endpoints, session);
+        sessionsLock.writeLock().lock();
+        try {
+            ClientSessionImpl session = sessionsTable.get(endpoints);
+            if (null != session) {
                 return session;
-            } finally {
-                endpointsSessionsLock.writeLock().unlock();
             }
-        }, MoreExecutors.directExecutor());
+            session = new ClientSessionImpl(this, endpoints);
+            sessionsTable.put(endpoints, session);
+            return session;
+        } finally {
+            sessionsLock.writeLock().unlock();
+        }
+    }
+
+    public ListenableFuture<Void> syncSettingsSafely(Endpoints endpoints) {
+        final ClientSessionImpl clientSession = getClientSession(endpoints);
+        return clientSession.syncSettingsSafely();
     }
 
     /**
-     * Triggered when {@link TopicRouteDataResult} is fetched from remote.
+     * Triggered when {@link TopicRouteData} is fetched from remote.
      *
      * <p>Never thrown any exception.
      */
-    public ListenableFuture<Void> onTopicRouteDataResultFetched(String topic,
-        TopicRouteDataResult topicRouteDataResult) {
-        final ListenableFuture<List<ClientSessionImpl>> future =
-            Futures.allAsList(topicRouteDataResult.getTopicRouteData()
-                .getMessageQueues().stream()
-                .map(mq -> mq.getBroker().getEndpoints())
-                .collect(Collectors.toSet())
-                .stream().map(this::registerTelemetrySession)
-                .collect(Collectors.toList()));
-        SettableFuture<Void> future0 = SettableFuture.create();
-        Futures.addCallback(future, new FutureCallback<List<ClientSessionImpl>>() {
-            @Override
-            public void onSuccess(List<ClientSessionImpl> sessions) {
-                LOGGER.info("Register session successfully, current route will be cached, topic={}, "
-                    + "topicRouteDataResult={}", topic, topicRouteDataResult);
-                final TopicRouteDataResult old = topicRouteResultCache.put(topic, topicRouteDataResult);
-                if (topicRouteDataResult.equals(old)) {
-                    // Log if topic route result remains the same.
-                    LOGGER.info("Topic route result remains the same, topic={}, route={}, clientId={}", topic, old,
-                        clientId);
-                } else {
-                    // Log if topic route result is updated.
-                    LOGGER.info("Topic route result is updated, topic={}, clientId={}, {} => {}", topic, clientId,
-                        old, topicRouteDataResult);
-                }
-                future0.setFuture(Futures.immediateVoidFuture());
-                onTopicRouteDataResultUpdate0(topic, topicRouteDataResult);
-            }
-
-            @Override
-            public void onFailure(Throwable t) {
-                // Note: Topic route would not be updated if failed to register session.
-                LOGGER.error("Failed to register session, current route will NOT be cached, topic={}, "
-                    + "topicRouteDataResult={}", topic, topicRouteDataResult);
-                future0.setException(t);
-            }
-        }, MoreExecutors.directExecutor());
-        return future0;
+    public ListenableFuture<TopicRouteData> onTopicRouteDataFetched(String topic, TopicRouteData topicRouteData) {
+        final List<ListenableFuture<Void>> futures = topicRouteData
+            .getMessageQueues().stream()
+            .map(mq -> mq.getBroker().getEndpoints())
+            .collect(Collectors.toSet())
+            .stream().map(this::syncSettingsSafely)
+            .collect(Collectors.toList());
+        // TODO: Record exception.
+        return Futures.whenAllSucceed(futures).callAsync(() -> {
+            topicRouteCache.put(topic, topicRouteData);
+            onTopicRouteDataUpdate0(topic, topicRouteData);
+            return Futures.immediateFuture(topicRouteData);
+        }, clientCallbackExecutor);
     }
 
-    public void onTopicRouteDataResultUpdate0(String topic, TopicRouteDataResult topicRouteDataResult) {
+    public void onTopicRouteDataUpdate0(String topic, TopicRouteData topicRouteData) {
     }
 
     /**
@@ -506,7 +457,7 @@ public abstract class ClientImpl extends AbstractIdleService implements Client,
             .setStatus(status)
             .build();
         try {
-            telemeter(endpoints, telemetryCommand);
+            telemetry(endpoints, telemetryCommand);
         } catch (Throwable t) {
             LOGGER.warn("Failed to send message verification result, clientId={}", clientId, t);
         }
@@ -520,20 +471,17 @@ public abstract class ClientImpl extends AbstractIdleService implements Client,
      */
     @Override
     public void onRecoverOrphanedTransactionCommand(Endpoints endpoints, RecoverOrphanedTransactionCommand command) {
-        LOGGER.warn("Ignore orphaned transaction recovery command from remote, which is not expected, client id={}, "
+        LOGGER.warn("Ignore orphaned transaction recovery command from remote, which is not expected, clientId={}, "
             + "command={}", clientId, command);
     }
 
     private void updateRouteCache() {
         LOGGER.info("Start to update route cache for a new round, clientId={}", clientId);
-        topicRouteResultCache.keySet().forEach(topic -> {
-            // Set timeout for future on purpose.
-            final ListenableFuture<TopicRouteDataResult> future = Futures.withTimeout(fetchTopicRoute(topic),
-                TOPIC_ROUTE_AWAIT_DURATION_DURING_STARTUP, getScheduler());
-            Futures.addCallback(future, new FutureCallback<TopicRouteDataResult>() {
+        topicRouteCache.keySet().forEach(topic -> {
+            final ListenableFuture<TopicRouteData> future = fetchTopicRoute(topic);
+            Futures.addCallback(future, new FutureCallback<TopicRouteData>() {
                 @Override
-                public void onSuccess(TopicRouteDataResult topicRouteDataResult) {
-                    onTopicRouteDataResultFetched(topic, topicRouteDataResult);
+                public void onSuccess(TopicRouteData topicRouteData) {
                 }
 
                 @Override
@@ -648,7 +596,25 @@ public abstract class ClientImpl extends AbstractIdleService implements Client,
     public void doStats() {
     }
 
-    private ListenableFuture<TopicRouteDataResult> fetchTopicRoute(final String topic) {
+    private ListenableFuture<TopicRouteData> fetchTopicRoute(final String topic) {
+        final ListenableFuture<TopicRouteData> future = Futures.transformAsync(fetchTopicRoute0(topic),
+            topicRouteData -> onTopicRouteDataFetched(topic, topicRouteData), MoreExecutors.directExecutor());
+        Futures.addCallback(future, new FutureCallback<TopicRouteData>() {
+            @Override
+            public void onSuccess(TopicRouteData topicRouteData) {
+                LOGGER.info("Fetch topic route successfully, clientId={}, topic={}, topicRouteData={}", clientId,
+                    topic, topicRouteData);
+            }
+
+            @Override
+            public void onFailure(Throwable t) {
+                LOGGER.error("Failed to fetch topic route, clientId={}, topic={}", clientId, topic, t);
+            }
+        }, MoreExecutors.directExecutor());
+        return future;
+    }
+
+    private ListenableFuture<TopicRouteData> fetchTopicRoute0(final String topic) {
         try {
             Resource topicResource = Resource.newBuilder().setName(topic).build();
             final QueryRouteRequest request = QueryRouteRequest.newBuilder().setTopic(topicResource)
@@ -656,17 +622,37 @@ public abstract class ClientImpl extends AbstractIdleService implements Client,
             final Metadata metadata = sign();
             final ListenableFuture<RpcInvocation<QueryRouteResponse>> future =
                 clientManager.queryRoute(endpoints, metadata, request, clientConfiguration.getRequestTimeout());
-            return Futures.transform(future, invocation -> {
+            return Futures.transformAsync(future, invocation -> {
                 final QueryRouteResponse response = invocation.getResponse();
                 final String requestId = invocation.getContext().getRequestId();
                 final Status status = response.getStatus();
+                final String statusMessage = status.getMessage();
                 final Code code = status.getCode();
-                if (Code.OK != code) {
-                    LOGGER.error("Exception raised while fetch topic route from remote, topic={}, " +
-                            "clientId={}, requestId={}, endpoints={}, code={}, status message=[{}]", topic, clientId,
-                        requestId, endpoints, code, status.getMessage());
+                final int codeNumber = code.getNumber();
+                switch (code) {
+                    case OK:
+                        break;
+                    case BAD_REQUEST:
+                    case ILLEGAL_ACCESS_POINT:
+                    case ILLEGAL_TOPIC:
+                    case CLIENT_ID_REQUIRED:
+                        throw new BadRequestException(codeNumber, requestId, statusMessage);
+                    case NOT_FOUND:
+                    case TOPIC_NOT_FOUND:
+                        throw new NotFoundException(codeNumber, requestId, statusMessage);
+                    case TOO_MANY_REQUESTS:
+                        throw new TooManyRequestsException(codeNumber, requestId, statusMessage);
+                    case INTERNAL_ERROR:
+                    case INTERNAL_SERVER_ERROR:
+                        throw new InternalErrorException(codeNumber, requestId, statusMessage);
+                    case PROXY_TIMEOUT:
+                        throw new ProxyTimeoutException(codeNumber, requestId, statusMessage);
+                    default:
+                        throw new UnsupportedException(codeNumber, requestId, statusMessage);
                 }
-                return new TopicRouteDataResult(invocation);
+                final List<MessageQueue> messageQueuesList = response.getMessageQueuesList();
+                final TopicRouteData topicRouteData = new TopicRouteData(messageQueuesList);
+                return Futures.immediateFuture(topicRouteData);
             }, MoreExecutors.directExecutor());
         } catch (Throwable t) {
             return Futures.immediateFailedFuture(t);
@@ -675,29 +661,29 @@ public abstract class ClientImpl extends AbstractIdleService implements Client,
 
     protected Set<Endpoints> getTotalRouteEndpoints() {
         Set<Endpoints> totalRouteEndpoints = new HashSet<>();
-        for (TopicRouteDataResult result : topicRouteResultCache.values()) {
-            totalRouteEndpoints.addAll(result.getTopicRouteData().getTotalEndpoints());
+        for (TopicRouteData topicRouteData : topicRouteCache.values()) {
+            totalRouteEndpoints.addAll(topicRouteData.getTotalEndpoints());
         }
         return totalRouteEndpoints;
     }
 
-    protected ListenableFuture<TopicRouteDataResult> getRouteDataResult(final String topic) {
-        SettableFuture<TopicRouteDataResult> future0 = SettableFuture.create();
-        TopicRouteDataResult topicRouteDataResult = topicRouteResultCache.get(topic);
+    protected ListenableFuture<TopicRouteData> getRouteData(final String topic) {
+        SettableFuture<TopicRouteData> future0 = SettableFuture.create();
+        TopicRouteData topicRouteData = topicRouteCache.get(topic);
         // If route result was cached before, get it directly.
-        if (null != topicRouteDataResult) {
-            future0.set(topicRouteDataResult);
+        if (null != topicRouteData) {
+            future0.set(topicRouteData);
             return future0;
         }
         inflightRouteFutureLock.lock();
         try {
             // If route was fetched by last in-flight request, get it directly.
-            topicRouteDataResult = topicRouteResultCache.get(topic);
-            if (null != topicRouteDataResult) {
-                future0.set(topicRouteDataResult);
+            topicRouteData = topicRouteCache.get(topic);
+            if (null != topicRouteData) {
+                future0.set(topicRouteData);
                 return future0;
             }
-            Set<SettableFuture<TopicRouteDataResult>> inflightFutures = inflightRouteFutureTable.get(topic);
+            Set<SettableFuture<TopicRouteData>> inflightFutures = inflightRouteFutureTable.get(topic);
             // Request is in-flight, return future directly.
             if (null != inflightFutures) {
                 inflightFutures.add(future0);
@@ -709,52 +695,48 @@ public abstract class ClientImpl extends AbstractIdleService implements Client,
         } finally {
             inflightRouteFutureLock.unlock();
         }
-        final ListenableFuture<TopicRouteDataResult> future = fetchTopicRoute(topic);
-        Futures.addCallback(future, new FutureCallback<TopicRouteDataResult>() {
+        final ListenableFuture<TopicRouteData> future = fetchTopicRoute(topic);
+        Futures.addCallback(future, new FutureCallback<TopicRouteData>() {
             @Override
-            public void onSuccess(TopicRouteDataResult result) {
-                final ListenableFuture<Void> updateFuture = onTopicRouteDataResultFetched(topic, result);
-                // TODO: all succeed?
-                Futures.whenAllSucceed(updateFuture).run(() -> {
-                    inflightRouteFutureLock.lock();
-                    try {
-                        final Set<SettableFuture<TopicRouteDataResult>> newFutureSet =
-                            inflightRouteFutureTable.remove(topic);
-                        if (null == newFutureSet) {
-                            // Should never reach here.
-                            LOGGER.error("[Bug] in-flight route futures was empty, topic={}, clientId={}", topic,
-                                clientId);
-                            return;
-                        }
-                        LOGGER.debug("Fetch topic route successfully, topic={}, in-flight route future "
-                            + "size={}, clientId={}", topic, newFutureSet.size(), clientId);
-                        for (SettableFuture<TopicRouteDataResult> newFuture : newFutureSet) {
-                            newFuture.set(result);
-                        }
-                    } catch (Throwable t) {
+            public void onSuccess(TopicRouteData topicRouteData) {
+                inflightRouteFutureLock.lock();
+                try {
+                    final Set<SettableFuture<TopicRouteData>> newFutureSet =
+                        inflightRouteFutureTable.remove(topic);
+                    if (null == newFutureSet) {
                         // Should never reach here.
-                        LOGGER.error("[Bug] Exception raised while update route data, topic={}, clientId={}", topic,
-                            clientId, t);
-                    } finally {
-                        inflightRouteFutureLock.unlock();
+                        LOGGER.error("[Bug] in-flight route futures was empty, topic={}, clientId={}", topic,
+                            clientId);
+                        return;
                     }
-                }, MoreExecutors.directExecutor());
+                    LOGGER.debug("Fetch topic route successfully, topic={}, in-flight route future "
+                        + "size={}, clientId={}", topic, newFutureSet.size(), clientId);
+                    for (SettableFuture<TopicRouteData> newFuture : newFutureSet) {
+                        newFuture.set(topicRouteData);
+                    }
+                } catch (Throwable t) {
+                    // Should never reach here.
+                    LOGGER.error("[Bug] Exception raised while update route data, topic={}, clientId={}", topic,
+                        clientId, t);
+                } finally {
+                    inflightRouteFutureLock.unlock();
+                }
             }
 
             @Override
             public void onFailure(Throwable t) {
                 inflightRouteFutureLock.lock();
                 try {
-                    final Set<SettableFuture<TopicRouteDataResult>> newFutureSet =
+                    final Set<SettableFuture<TopicRouteData>> newFutureSet =
                         inflightRouteFutureTable.remove(topic);
                     if (null == newFutureSet) {
                         // Should never reach here.
                         LOGGER.error("[Bug] in-flight route futures was empty, topic={}, clientId={}", topic, clientId);
                         return;
                     }
-                    LOGGER.error("Failed to fetch topic route, topic={}, in-flight route future " +
+                    LOGGER.debug("Failed to fetch topic route, topic={}, in-flight route future " +
                         "size={}, clientId={}", topic, newFutureSet.size(), clientId, t);
-                    for (SettableFuture<TopicRouteDataResult> future : newFutureSet) {
+                    for (SettableFuture<TopicRouteData> future : newFutureSet) {
                         future.setException(t);
                     }
                 } finally {
diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientSessionImpl.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientSessionImpl.java
index d6a19c2..90402eb 100644
--- a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientSessionImpl.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientSessionImpl.java
@@ -22,15 +22,11 @@ import apache.rocketmq.v2.RecoverOrphanedTransactionCommand;
 import apache.rocketmq.v2.Settings;
 import apache.rocketmq.v2.TelemetryCommand;
 import apache.rocketmq.v2.VerifyMessageCommand;
-import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.MoreExecutors;
 import io.grpc.stub.StreamObserver;
-import java.util.concurrent.locks.ReadWriteLock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-import org.apache.rocketmq.client.apis.ClientException;
-import org.apache.rocketmq.client.java.impl.producer.ClientSessionProcessor;
+import java.util.concurrent.TimeUnit;
+import org.apache.rocketmq.client.java.impl.producer.ClientSessionHandler;
 import org.apache.rocketmq.client.java.route.Endpoints;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -38,144 +34,125 @@ import org.slf4j.LoggerFactory;
 /**
  * Telemetry session is constructed before first communication between client and remote route endpoints.
  */
-@SuppressWarnings({"UnstableApiUsage", "NullableProblems"})
 public class ClientSessionImpl implements StreamObserver<TelemetryCommand> {
     private static final Logger LOGGER = LoggerFactory.getLogger(ClientSessionImpl.class);
 
-    private final ClientSessionProcessor processor;
+    private final ClientSessionHandler handler;
     private final Endpoints endpoints;
-    private final ReadWriteLock observerLock;
-    private StreamObserver<TelemetryCommand> requestObserver = null;
+    private volatile StreamObserver<TelemetryCommand> requestObserver;
 
-    protected ClientSessionImpl(ClientSessionProcessor processor, Endpoints endpoints) {
-        this.processor = processor;
+    protected ClientSessionImpl(ClientSessionHandler handler, Endpoints endpoints) {
+        this.handler = handler;
         this.endpoints = endpoints;
-        this.observerLock = new ReentrantReadWriteLock();
+        renewRequestObserver();
     }
 
-    protected ListenableFuture<ClientSessionImpl> register() {
-        ListenableFuture<ClientSessionImpl> future;
+    private void renewRequestObserver() {
         try {
-            final TelemetryCommand command = processor.getSettingsCommand();
-            this.publish(command);
-            future = Futures.transform(processor.register(), input -> this, MoreExecutors.directExecutor());
+            if (handler.isEndpointsDeprecated(endpoints)) {
+                LOGGER.info("Endpoints is deprecated, no longer to renew requestObserver, endpoints={}", endpoints);
+                return;
+            }
+            this.requestObserver = handler.telemetry(endpoints, this);
         } catch (Throwable t) {
-            future = Futures.immediateFailedFuture(t);
+            handler.getScheduler().schedule(this::renewRequestObserver, 3, TimeUnit.SECONDS);
         }
-        final String clientId = processor.clientId();
-        Futures.addCallback(future, new FutureCallback<ClientSessionImpl>() {
-            @Override
-            public void onSuccess(ClientSessionImpl session) {
-                LOGGER.info("Register client session successfully, endpoints={}, clientId={}", endpoints, clientId);
-            }
+    }
 
-            @Override
-            public void onFailure(Throwable t) {
-                LOGGER.error("Failed to register client session, endpoints={}, clientId={}", endpoints, clientId, t);
-                release();
-            }
-        }, MoreExecutors.directExecutor());
-        return future;
+    protected ListenableFuture<Void> syncSettingsSafely() {
+        try {
+            final TelemetryCommand settings = handler.settingsCommand();
+            fireWrite(settings);
+            return handler.awaitSettingSynchronized();
+        } catch (Throwable t) {
+            return Futures.immediateFailedFuture(t);
+        }
     }
 
     /**
      * Release telemetry session.
      */
     public void release() {
-        this.observerLock.writeLock().lock();
+        if (null == requestObserver) {
+            return;
+        }
         try {
-            if (null != requestObserver) {
-                try {
-                    requestObserver.onCompleted();
-                } catch (Throwable ignore) {
-                    // Ignore exception on purpose.
-                }
-                requestObserver = null;
-            }
-        } finally {
-            this.observerLock.writeLock().unlock();
+            requestObserver.onCompleted();
+        } catch (Throwable ignore) {
+            // Ignore exception on purpose.
         }
     }
 
-    /**
-     * Telemeter command to remote.
-     *
-     * @param command appointed command to telemeter
-     */
-    public void publish(TelemetryCommand command) throws ClientException {
-        this.observerLock.readLock().lock();
-        try {
-            if (null != requestObserver) {
-                requestObserver.onNext(command);
-                return;
-            }
-        } finally {
-            this.observerLock.readLock().unlock();
-        }
-        this.observerLock.writeLock().lock();
-        try {
-            if (null == requestObserver) {
-                this.requestObserver = processor.telemetry(endpoints, this);
-            }
-            requestObserver.onNext(command);
-        } finally {
-            this.observerLock.writeLock().unlock();
+    public void fireWrite(TelemetryCommand command) {
+        if (null == requestObserver) {
+            LOGGER.error("Request observer does not exist, ignore current command, endpoints={}, command={}",
+                endpoints, command);
+            return;
         }
+        requestObserver.onNext(command);
     }
 
     @Override
     public void onNext(TelemetryCommand command) {
+        final String clientId = handler.clientId();
         try {
             switch (command.getCommandCase()) {
                 case SETTINGS: {
                     final Settings settings = command.getSettings();
-                    LOGGER.info("Receive settings from remote, endpoints={}, clientId={}", endpoints,
-                        processor.clientId());
-                    processor.onSettingsCommand(endpoints, settings);
+                    LOGGER.info("Receive settings from remote, endpoints={}, clientId={}", endpoints, clientId);
+                    handler.onSettingsCommand(endpoints, settings);
                     break;
                 }
                 case RECOVER_ORPHANED_TRANSACTION_COMMAND: {
                     final RecoverOrphanedTransactionCommand recoverOrphanedTransactionCommand =
                         command.getRecoverOrphanedTransactionCommand();
                     LOGGER.info("Receive orphaned transaction recovery command from remote, endpoints={}, "
-                        + "clientId={}", endpoints, processor.clientId());
-                    processor.onRecoverOrphanedTransactionCommand(endpoints, recoverOrphanedTransactionCommand);
+                        + "clientId={}", endpoints, clientId);
+                    handler.onRecoverOrphanedTransactionCommand(endpoints, recoverOrphanedTransactionCommand);
                     break;
                 }
                 case VERIFY_MESSAGE_COMMAND: {
                     final VerifyMessageCommand verifyMessageCommand = command.getVerifyMessageCommand();
                     LOGGER.info("Receive message verification command from remote, endpoints={}, clientId={}",
-                        endpoints, processor.clientId());
-                    processor.onVerifyMessageCommand(endpoints, verifyMessageCommand);
+                        endpoints, clientId);
+                    handler.onVerifyMessageCommand(endpoints, verifyMessageCommand);
                     break;
                 }
                 case PRINT_THREAD_STACK_TRACE_COMMAND: {
                     final PrintThreadStackTraceCommand printThreadStackTraceCommand =
                         command.getPrintThreadStackTraceCommand();
                     LOGGER.info("Receive thread stack print command from remote, endpoints={}, clientId={}",
-                        endpoints, processor.clientId());
-                    processor.onPrintThreadStackTraceCommand(endpoints, printThreadStackTraceCommand);
+                        endpoints, clientId);
+                    handler.onPrintThreadStackTraceCommand(endpoints, printThreadStackTraceCommand);
                     break;
                 }
                 default:
                     LOGGER.warn("Receive unrecognized command from remote, endpoints={}, command={}, clientId={}",
-                        endpoints, command, processor.clientId());
+                        endpoints, command, clientId);
             }
         } catch (Throwable t) {
             LOGGER.error("[Bug] unexpected exception raised while receiving command from remote, command={}, "
-                + "clientId={}", command, processor.clientId(), t);
+                + "clientId={}", command, clientId, t);
         }
     }
 
     @Override
     public void onError(Throwable throwable) {
         LOGGER.error("Exception raised from stream response observer, clientId={}, endpoints={}",
-            processor.clientId(), endpoints, throwable);
-        this.release();
+            handler.clientId(), endpoints, throwable);
+        release();
+        if (!handler.isRunning()) {
+            return;
+        }
+        handler.getScheduler().schedule(this::renewRequestObserver, 3, TimeUnit.SECONDS);
     }
 
     @Override
     public void onCompleted() {
-        this.release();
+        release();
+        if (!handler.isRunning()) {
+            return;
+        }
+        handler.getScheduler().schedule(this::renewRequestObserver, 3, TimeUnit.SECONDS);
     }
 }
diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ProcessQueueImpl.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ProcessQueueImpl.java
index d4271bf..965507c 100644
--- a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ProcessQueueImpl.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ProcessQueueImpl.java
@@ -497,18 +497,18 @@ class ProcessQueueImpl implements ProcessQueue {
                     forwardToDeadLetterQueue(messageView, 1 + attempt, future0);
                     return;
                 }
+                // Set result if message is forwarded successfully.
+                future0.setFuture(Futures.immediateVoidFuture());
                 // Log retries.
                 if (1 < attempt) {
                     LOGGER.info("Re-forward message to dead letter queue successfully, clientId={}, consumerGroup={}, "
                             + "attempt={}, messageId={}, mq={}, endpoints={}, requestId={}", clientId, consumerGroup,
                         attempt, messageId, mq, endpoints, requestId);
-                } else {
-                    LOGGER.debug("Forward message to dead letter queue successfully, clientId={}, consumerGroup={}, "
-                            + "messageId={}, mq={}, endpoints={}, requestId={}", clientId, consumerGroup, messageId, mq,
-                        endpoints, requestId);
+                    return;
                 }
-                // Set result if message is forwarded successfully.
-                future0.setFuture(Futures.immediateVoidFuture());
+                LOGGER.info("Forward message to dead letter queue successfully, clientId={}, consumerGroup={}, "
+                        + "messageId={}, mq={}, endpoints={}, requestId={}", clientId, consumerGroup, messageId, mq,
+                    endpoints, requestId);
             }
 
             @Override
diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/PushConsumerImpl.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/PushConsumerImpl.java
index 2c18a92..86bef80 100644
--- a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/PushConsumerImpl.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/PushConsumerImpl.java
@@ -77,7 +77,7 @@ import org.apache.rocketmq.client.java.misc.ThreadFactoryImpl;
 import org.apache.rocketmq.client.java.retry.RetryPolicy;
 import org.apache.rocketmq.client.java.route.Endpoints;
 import org.apache.rocketmq.client.java.route.MessageQueueImpl;
-import org.apache.rocketmq.client.java.route.TopicRouteDataResult;
+import org.apache.rocketmq.client.java.route.TopicRouteData;
 import org.apache.rocketmq.client.java.rpc.RpcInvocation;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -234,9 +234,8 @@ class PushConsumerImpl extends ConsumerImpl implements PushConsumer, MessageCach
                 this.state(), clientId);
             throw new IllegalStateException("Push consumer is not running now");
         }
-        final ListenableFuture<TopicRouteDataResult> future = getRouteDataResult(topic);
-        TopicRouteDataResult topicRouteDataResult = handleClientFuture(future);
-        topicRouteDataResult.checkAndGetTopicRouteData();
+        final ListenableFuture<TopicRouteData> future = getRouteData(topic);
+        handleClientFuture(future);
         subscriptionExpressions.put(topic, filterExpression);
         return this;
     }
@@ -257,9 +256,9 @@ class PushConsumerImpl extends ConsumerImpl implements PushConsumer, MessageCach
     }
 
     private ListenableFuture<Endpoints> pickEndpointsToQueryAssignments(String topic) {
-        final ListenableFuture<TopicRouteDataResult> future = getRouteDataResult(topic);
-        return Futures.transformAsync(future, topicRouteDataResult -> {
-            Endpoints endpoints = topicRouteDataResult.checkAndGetTopicRouteData().pickEndpointsToQueryAssignments();
+        final ListenableFuture<TopicRouteData> future = getRouteData(topic);
+        return Futures.transformAsync(future, topicRouteData -> {
+            Endpoints endpoints = topicRouteData.pickEndpointsToQueryAssignments();
             return Futures.immediateFuture(endpoints);
         }, MoreExecutors.directExecutor());
     }
@@ -513,7 +512,7 @@ class PushConsumerImpl extends ConsumerImpl implements PushConsumer, MessageCach
                     .setStatus(status)
                     .build();
                 try {
-                    telemeter(endpoints, command);
+                    telemetry(endpoints, command);
                 } catch (Throwable t) {
                     LOGGER.error("Failed to send message verification result command, endpoints={}, command={}, "
                         + "messageId={}, clientId={}", endpoints, command, messageId, clientId, t);
diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/SimpleConsumerImpl.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/SimpleConsumerImpl.java
index d90edf8..d8ce5d8 100644
--- a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/SimpleConsumerImpl.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/SimpleConsumerImpl.java
@@ -55,7 +55,7 @@ import org.apache.rocketmq.client.java.impl.ClientSettings;
 import org.apache.rocketmq.client.java.message.MessageViewImpl;
 import org.apache.rocketmq.client.java.message.protocol.Resource;
 import org.apache.rocketmq.client.java.route.MessageQueueImpl;
-import org.apache.rocketmq.client.java.route.TopicRouteDataResult;
+import org.apache.rocketmq.client.java.route.TopicRouteData;
 import org.apache.rocketmq.client.java.rpc.RpcInvocation;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -130,9 +130,8 @@ class SimpleConsumerImpl extends ConsumerImpl implements SimpleConsumer {
                 this.state(), clientId);
             throw new IllegalStateException("Simple consumer is not running now");
         }
-        final ListenableFuture<TopicRouteDataResult> future = getRouteDataResult(topic);
-        TopicRouteDataResult topicRouteDataResult = handleClientFuture(future);
-        topicRouteDataResult.checkAndGetTopicRouteData();
+        final ListenableFuture<TopicRouteData> future = getRouteData(topic);
+        handleClientFuture(future);
         subscriptionExpressions.put(topic, filterExpression);
         return this;
     }
@@ -359,9 +358,9 @@ class SimpleConsumerImpl extends ConsumerImpl implements SimpleConsumer {
         return simpleConsumerSettings;
     }
 
-    public void onTopicRouteDataResultUpdate0(String topic, TopicRouteDataResult topicRouteDataResult) {
+    public void onTopicRouteDataUpdate0(String topic, TopicRouteData topicRouteData) {
         final SubscriptionLoadBalancer subscriptionLoadBalancer =
-            new SubscriptionLoadBalancer(topicRouteDataResult);
+            new SubscriptionLoadBalancer(topicRouteData);
         subTopicRouteDataResultCache.put(topic, subscriptionLoadBalancer);
     }
 
@@ -372,7 +371,7 @@ class SimpleConsumerImpl extends ConsumerImpl implements SimpleConsumer {
             future0.set(result);
             return future0;
         }
-        final ListenableFuture<TopicRouteDataResult> future = getRouteDataResult(topic);
+        final ListenableFuture<TopicRouteData> future = getRouteData(topic);
         return Futures.transform(future, topicRouteDataResult -> {
             final SubscriptionLoadBalancer subscriptionLoadBalancer =
                 new SubscriptionLoadBalancer(topicRouteDataResult);
diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/SubscriptionLoadBalancer.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/SubscriptionLoadBalancer.java
index 21610b2..012d441 100644
--- a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/SubscriptionLoadBalancer.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/SubscriptionLoadBalancer.java
@@ -17,20 +17,20 @@
 
 package org.apache.rocketmq.client.java.impl.consumer;
 
+import com.google.common.base.Predicate;
 import com.google.common.collect.ImmutableList;
 import com.google.common.math.IntMath;
+import java.util.List;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
 import javax.annotation.concurrent.Immutable;
 import org.apache.commons.lang3.RandomUtils;
-import org.apache.rocketmq.client.apis.ClientException;
-import org.apache.rocketmq.client.java.exception.NotFoundException;
 import org.apache.rocketmq.client.java.misc.Utilities;
 import org.apache.rocketmq.client.java.route.MessageQueueImpl;
-import org.apache.rocketmq.client.java.route.TopicRouteDataResult;
+import org.apache.rocketmq.client.java.route.TopicRouteData;
 
 @Immutable
 public class SubscriptionLoadBalancer {
-    private final TopicRouteDataResult topicRouteDataResult;
     /**
      * Index for round-robin.
      */
@@ -40,30 +40,19 @@ public class SubscriptionLoadBalancer {
      */
     private final ImmutableList<MessageQueueImpl> messageQueues;
 
-    public SubscriptionLoadBalancer(TopicRouteDataResult topicRouteDataResult) {
-        this.topicRouteDataResult = topicRouteDataResult;
+    public SubscriptionLoadBalancer(TopicRouteData topicRouteData) {
         this.index = new AtomicInteger(RandomUtils.nextInt(0, Integer.MAX_VALUE));
-        final ImmutableList.Builder<MessageQueueImpl> builder = ImmutableList.builder();
-        if (!topicRouteDataResult.ok()) {
-            this.messageQueues = builder.build();
-            return;
+        final List<MessageQueueImpl> mqs = topicRouteData.getMessageQueues().stream()
+            .filter((Predicate<MessageQueueImpl>) mq -> mq.getPermission().isReadable() &&
+                Utilities.MASTER_BROKER_ID == mq.getBroker().getId())
+            .collect(Collectors.toList());
+        if (mqs.isEmpty()) {
+            throw new IllegalArgumentException("No readable message queue found, topicRouteData=" + topicRouteData);
         }
-        for (MessageQueueImpl messageQueue : topicRouteDataResult.getTopicRouteData().getMessageQueues()) {
-            if (!messageQueue.getPermission().isReadable() ||
-                Utilities.MASTER_BROKER_ID != messageQueue.getBroker().getId()) {
-                continue;
-            }
-            builder.add(messageQueue);
-        }
-        this.messageQueues = builder.build();
+        this.messageQueues = ImmutableList.<MessageQueueImpl>builder().addAll(mqs).build();
     }
 
-    public MessageQueueImpl takeMessageQueue() throws ClientException {
-        topicRouteDataResult.checkAndGetTopicRouteData();
-        if (messageQueues.isEmpty()) {
-            // Should never reach here.
-            throw new NotFoundException("Failed to take message queue due to readable message queue doesn't exist");
-        }
+    public MessageQueueImpl takeMessageQueue() {
         final int next = index.getAndIncrement();
         return messageQueues.get(IntMath.mod(next, messageQueues.size()));
     }
diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/ClientSessionProcessor.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/ClientSessionHandler.java
similarity index 83%
rename from java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/ClientSessionProcessor.java
rename to java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/ClientSessionHandler.java
index 8bdc019..817e24f 100644
--- a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/ClientSessionProcessor.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/ClientSessionHandler.java
@@ -24,15 +24,23 @@ import apache.rocketmq.v2.TelemetryCommand;
 import apache.rocketmq.v2.VerifyMessageCommand;
 import com.google.common.util.concurrent.ListenableFuture;
 import io.grpc.stub.StreamObserver;
+import java.util.concurrent.ScheduledExecutorService;
 import org.apache.rocketmq.client.apis.ClientException;
 import org.apache.rocketmq.client.java.route.Endpoints;
 
-public interface ClientSessionProcessor {
-    ListenableFuture<Void> register();
+public interface ClientSessionHandler {
+    @SuppressWarnings("BooleanMethodIsAlwaysInverted")
+    boolean isRunning();
+
+    ScheduledExecutorService getScheduler();
+
+    boolean isEndpointsDeprecated(Endpoints endpoints);
+
+    ListenableFuture<Void> awaitSettingSynchronized();
 
     String clientId();
 
-    TelemetryCommand getSettingsCommand();
+    TelemetryCommand settingsCommand();
 
     StreamObserver<TelemetryCommand> telemetry(Endpoints endpoints, StreamObserver<TelemetryCommand> observer)
         throws ClientException;
diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/ProducerImpl.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/ProducerImpl.java
index bb4ae49..08483f0 100644
--- a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/ProducerImpl.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/ProducerImpl.java
@@ -72,7 +72,7 @@ import org.apache.rocketmq.client.java.retry.ExponentialBackoffRetryPolicy;
 import org.apache.rocketmq.client.java.retry.RetryPolicy;
 import org.apache.rocketmq.client.java.route.Endpoints;
 import org.apache.rocketmq.client.java.route.MessageQueueImpl;
-import org.apache.rocketmq.client.java.route.TopicRouteDataResult;
+import org.apache.rocketmq.client.java.route.TopicRouteData;
 import org.apache.rocketmq.client.java.rpc.RpcInvocation;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -89,7 +89,7 @@ class ProducerImpl extends ClientImpl implements Producer {
     protected final ProducerSettings producerSettings;
 
     private final TransactionChecker checker;
-    private final ConcurrentMap<String/* topic */, PublishingLoadBalancer> publishingRouteDataResultCache;
+    private final ConcurrentMap<String/* topic */, PublishingLoadBalancer> publishingRouteDataCache;
 
     /**
      * The caller is supposed to have validated the arguments and handled throwing exception or
@@ -102,7 +102,7 @@ class ProducerImpl extends ClientImpl implements Producer {
         this.producerSettings = new ProducerSettings(clientId, endpoints, retryPolicy,
             clientConfiguration.getRequestTimeout(), topics);
         this.checker = checker;
-        this.publishingRouteDataResultCache = new ConcurrentHashMap<>();
+        this.publishingRouteDataCache = new ConcurrentHashMap<>();
     }
 
     @Override
@@ -540,26 +540,21 @@ class ProducerImpl extends ClientImpl implements Producer {
     }
 
     @Override
-    public void onTopicRouteDataResultUpdate0(String topic, TopicRouteDataResult topicRouteDataResult) {
+    public void onTopicRouteDataUpdate0(String topic, TopicRouteData topicRouteData) {
         final PublishingLoadBalancer publishingLoadBalancer =
-            new PublishingLoadBalancer(topicRouteDataResult);
-        publishingRouteDataResultCache.put(topic, publishingLoadBalancer);
+            new PublishingLoadBalancer(topicRouteData);
+        publishingRouteDataCache.put(topic, publishingLoadBalancer);
     }
 
     private ListenableFuture<PublishingLoadBalancer> getPublishingTopicRouteResult(final String topic) {
-        SettableFuture<PublishingLoadBalancer> future0 = SettableFuture.create();
-        final PublishingLoadBalancer result = publishingRouteDataResultCache.get(topic);
+        final PublishingLoadBalancer result = publishingRouteDataCache.get(topic);
         if (null != result) {
-            future0.set(result);
-            return future0;
+            return Futures.immediateFuture(result);
         }
-        return Futures.transformAsync(getRouteDataResult(topic), topicRouteDataResult -> {
-            SettableFuture<PublishingLoadBalancer> future = SettableFuture.create();
-            final PublishingLoadBalancer publishingLoadBalancer =
-                new PublishingLoadBalancer(topicRouteDataResult);
-            publishingRouteDataResultCache.put(topic, publishingLoadBalancer);
-            future.set(publishingLoadBalancer);
-            return future;
+        return Futures.transformAsync(getRouteData(topic), topicRouteDataResult -> {
+            final PublishingLoadBalancer loadBalancer = new PublishingLoadBalancer(topicRouteDataResult);
+            publishingRouteDataCache.put(topic, loadBalancer);
+            return Futures.immediateFuture(loadBalancer);
         }, MoreExecutors.directExecutor());
     }
 }
diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/PublishingLoadBalancer.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/PublishingLoadBalancer.java
index 9b326de..feb9616 100644
--- a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/PublishingLoadBalancer.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/PublishingLoadBalancer.java
@@ -18,6 +18,7 @@
 package org.apache.rocketmq.client.java.impl.producer;
 
 import com.google.common.base.Objects;
+import com.google.common.base.Predicate;
 import com.google.common.collect.ImmutableList;
 import com.google.common.hash.Hashing;
 import com.google.common.math.IntMath;
@@ -28,19 +29,17 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
 import javax.annotation.concurrent.Immutable;
 import org.apache.commons.lang3.RandomUtils;
-import org.apache.rocketmq.client.apis.ClientException;
-import org.apache.rocketmq.client.java.exception.NotFoundException;
 import org.apache.rocketmq.client.java.misc.Utilities;
 import org.apache.rocketmq.client.java.route.Broker;
 import org.apache.rocketmq.client.java.route.Endpoints;
 import org.apache.rocketmq.client.java.route.MessageQueueImpl;
-import org.apache.rocketmq.client.java.route.TopicRouteDataResult;
+import org.apache.rocketmq.client.java.route.TopicRouteData;
 
 @Immutable
 public class PublishingLoadBalancer {
-    private final TopicRouteDataResult topicRouteDataResult;
     /**
      * Index for round-robin.
      */
@@ -50,40 +49,25 @@ public class PublishingLoadBalancer {
      */
     private final ImmutableList<MessageQueueImpl> messageQueues;
 
-    public PublishingLoadBalancer(TopicRouteDataResult topicRouteDataResult) {
-        this.topicRouteDataResult = topicRouteDataResult;
+    public PublishingLoadBalancer(TopicRouteData topicRouteData) {
         this.index = new AtomicInteger(RandomUtils.nextInt(0, Integer.MAX_VALUE));
-        final ImmutableList.Builder<MessageQueueImpl> builder = ImmutableList.builder();
-        if (!topicRouteDataResult.ok()) {
-            this.messageQueues = builder.build();
-            return;
-        }
-        for (MessageQueueImpl messageQueue : topicRouteDataResult.getTopicRouteData().getMessageQueues()) {
-            if (!messageQueue.getPermission().isWritable() ||
-                Utilities.MASTER_BROKER_ID != messageQueue.getBroker().getId()) {
-                continue;
-            }
-            builder.add(messageQueue);
-        }
-        this.messageQueues = builder.build();
-    }
-
-    private void preconditionCheckBeforeTakingMessageQueue() throws ClientException {
-        topicRouteDataResult.checkAndGetTopicRouteData();
-        if (messageQueues.isEmpty()) {
-            throw new NotFoundException("Failed to take message due to writable message queue doesn't exist");
+        final List<MessageQueueImpl> mqs = topicRouteData.getMessageQueues().stream()
+            .filter((Predicate<MessageQueueImpl>) mq -> mq.getPermission().isWritable() &&
+                Utilities.MASTER_BROKER_ID == mq.getBroker().getId())
+            .collect(Collectors.toList());
+        if (mqs.isEmpty()) {
+            throw new IllegalArgumentException("No writable message queue found, topicRouteData=" + topicRouteData);
         }
+        this.messageQueues = ImmutableList.<MessageQueueImpl>builder().addAll(mqs).build();
     }
 
-    public MessageQueueImpl takeMessageQueueByMessageGroup(String messageGroup) throws ClientException {
-        preconditionCheckBeforeTakingMessageQueue();
+    public MessageQueueImpl takeMessageQueueByMessageGroup(String messageGroup) {
         final long hashCode = Hashing.sipHash24().hashBytes(messageGroup.getBytes(StandardCharsets.UTF_8)).asLong();
         final int index = LongMath.mod(hashCode, messageQueues.size());
         return messageQueues.get(index);
     }
 
-    public List<MessageQueueImpl> takeMessageQueues(Set<Endpoints> excluded, int count) throws ClientException {
-        preconditionCheckBeforeTakingMessageQueue();
+    public List<MessageQueueImpl> takeMessageQueues(Set<Endpoints> excluded, int count) {
         int next = index.getAndIncrement();
         List<MessageQueueImpl> candidates = new ArrayList<>();
         Set<String> candidateBrokerNames = new HashSet<>();
diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/route/TopicRouteDataResult.java b/java/client/src/main/java/org/apache/rocketmq/client/java/route/TopicRouteDataResult.java
deleted file mode 100644
index 94adee4..0000000
--- a/java/client/src/main/java/org/apache/rocketmq/client/java/route/TopicRouteDataResult.java
+++ /dev/null
@@ -1,125 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.rocketmq.client.java.route;
-
-import apache.rocketmq.v2.Code;
-import apache.rocketmq.v2.MessageQueue;
-import apache.rocketmq.v2.QueryRouteResponse;
-import apache.rocketmq.v2.Status;
-import com.google.common.base.MoreObjects;
-import com.google.common.base.Objects;
-import java.util.List;
-import javax.annotation.concurrent.Immutable;
-import org.apache.rocketmq.client.apis.ClientException;
-import org.apache.rocketmq.client.java.exception.BadRequestException;
-import org.apache.rocketmq.client.java.exception.InternalErrorException;
-import org.apache.rocketmq.client.java.exception.NotFoundException;
-import org.apache.rocketmq.client.java.exception.ProxyTimeoutException;
-import org.apache.rocketmq.client.java.exception.TooManyRequestsException;
-import org.apache.rocketmq.client.java.exception.UnsupportedException;
-import org.apache.rocketmq.client.java.rpc.RpcInvocation;
-
-/**
- * Result topic route data fetched from remote.
- */
-@Immutable
-public class TopicRouteDataResult {
-    private final TopicRouteData topicRouteData;
-    private final ClientException exception;
-
-    public TopicRouteDataResult(RpcInvocation<QueryRouteResponse> invocation) {
-        final QueryRouteResponse response = invocation.getResponse();
-        final String requestId = invocation.getContext().getRequestId();
-        final List<MessageQueue> messageQueuesList = response.getMessageQueuesList();
-        final TopicRouteData topicRouteData = new TopicRouteData(messageQueuesList);
-        final Status status = response.getStatus();
-        this.topicRouteData = topicRouteData;
-        final Code code = status.getCode();
-        final int codeNumber = code.getNumber();
-        final String statusMessage = status.getMessage();
-        switch (code) {
-            case OK:
-                this.exception = null;
-                break;
-            case BAD_REQUEST:
-            case ILLEGAL_ACCESS_POINT:
-            case ILLEGAL_TOPIC:
-            case CLIENT_ID_REQUIRED:
-                this.exception = new BadRequestException(codeNumber, requestId, statusMessage);
-                break;
-            case NOT_FOUND:
-            case TOPIC_NOT_FOUND:
-                this.exception = new NotFoundException(codeNumber, requestId, statusMessage);
-                break;
-            case TOO_MANY_REQUESTS:
-                this.exception = new TooManyRequestsException(codeNumber, requestId, statusMessage);
-                break;
-            case INTERNAL_ERROR:
-            case INTERNAL_SERVER_ERROR:
-                this.exception = new InternalErrorException(codeNumber, requestId, statusMessage);
-                break;
-            case PROXY_TIMEOUT:
-                this.exception = new ProxyTimeoutException(codeNumber, requestId, statusMessage);
-                break;
-            default:
-                this.exception = new UnsupportedException(codeNumber, requestId, statusMessage);
-        }
-    }
-
-    public TopicRouteData getTopicRouteData() {
-        return topicRouteData;
-    }
-
-    public TopicRouteData checkAndGetTopicRouteData() throws ClientException {
-        if (null != exception) {
-            throw exception;
-        }
-        return topicRouteData;
-    }
-
-    public boolean ok() {
-        return null == exception;
-    }
-
-    @Override
-    public boolean equals(Object o) {
-        if (this == o) {
-            return true;
-        }
-        if (o == null || getClass() != o.getClass()) {
-            return false;
-        }
-        TopicRouteDataResult result = (TopicRouteDataResult) o;
-        return Objects.equal(topicRouteData, result.topicRouteData) && Objects.equal(exception, result.exception);
-    }
-
-    @Override
-    public int hashCode() {
-        return Objects.hashCode(topicRouteData, exception);
-    }
-
-    @Override
-    public String toString() {
-        final MoreObjects.ToStringHelper helper = MoreObjects.toStringHelper(this)
-            .add("topicRouteData", this.topicRouteData);
-        if (null == exception) {
-            return helper.toString();
-        }
-        return helper.add("exception", this.exception).toString();
-    }
-}