You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by aa...@apache.org on 2022/07/01 09:12:04 UTC

[rocketmq-clients] 01/02: Java: implement error handling

This is an automated email from the ASF dual-hosted git repository.

aaronai pushed a commit to branch java
in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git

commit ff6dfffc86e2f6fb5ce9d9ecbb274beebd7b0ba6
Author: Aaron Ai <ya...@gmail.com>
AuthorDate: Fri Jul 1 11:30:18 2022 +0800

    Java: implement error handling
---
 .../rocketmq/client/apis/ClientException.java      |   4 +-
 ...oundException.java => BadRequestException.java} |  13 ++-
 ...FoundException.java => ForbiddenException.java} |  13 ++-
 ...dException.java => InternalErrorException.java} |  16 +--
 ...tFoundException.java => NotFoundException.java} |  13 ++-
 ...xception.java => PayloadTooLargeException.java} |  13 ++-
 ...ndException.java => ProxyTimeoutException.java} |  14 +--
 ...a => RequestHeaderFieldsTooLargeException.java} |  16 +--
 ...xception.java => TooManyRequestsException.java} |  15 +--
 ...ndException.java => UnauthorizedException.java} |  14 +--
 ...undException.java => UnsupportedException.java} |  13 ++-
 .../rocketmq/client/java/impl/ClientImpl.java      |  19 +---
 .../java/impl/consumer/ProcessQueueImpl.java       |  46 ++-------
 .../java/impl/consumer/PushConsumerImpl.java       |  22 +---
 .../java/impl/consumer/ReceiveMessageResult.java   |  76 ++++++++++++--
 .../java/impl/consumer/SimpleConsumerImpl.java     | 114 +++++++++++++--------
 ...taResult.java => SubscriptionLoadBalancer.java} |  36 +++----
 .../client/java/impl/producer/ProducerImpl.java    |  34 +++---
 ...DataResult.java => PublishingLoadBalancer.java} |  28 +++--
 .../client/java/impl/producer/SendReceiptImpl.java |  80 +++++++++++----
 .../rocketmq/client/java/route/TopicRouteData.java |   6 +-
 .../client/java/route/TopicRouteDataResult.java    |  65 ++++++++++--
 .../apache/rocketmq/client/java/tool/TestBase.java |   4 +-
 23 files changed, 401 insertions(+), 273 deletions(-)

diff --git a/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/ClientException.java b/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/ClientException.java
index 9a0ced1..442f20f 100644
--- a/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/ClientException.java
+++ b/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/ClientException.java
@@ -80,9 +80,9 @@ public class ClientException extends Exception {
         return null == requestId ? Optional.empty() : Optional.of(requestId);
     }
 
-    public Optional<String> getResponseCode() {
+    public Optional<Integer> getResponseCode() {
         final String responseCode = context.get(RESPONSE_CODE_KEY);
-        return null == responseCode ? Optional.empty() : Optional.of(responseCode);
+        return null == responseCode ? Optional.empty() : Optional.of(Integer.parseInt(responseCode));
     }
 
     @Override
diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/exception/ResourceNotFoundException.java b/java/client/src/main/java/org/apache/rocketmq/client/java/exception/BadRequestException.java
similarity index 77%
copy from java/client/src/main/java/org/apache/rocketmq/client/java/exception/ResourceNotFoundException.java
copy to java/client/src/main/java/org/apache/rocketmq/client/java/exception/BadRequestException.java
index 217c3ef..386c916 100644
--- a/java/client/src/main/java/org/apache/rocketmq/client/java/exception/ResourceNotFoundException.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/exception/BadRequestException.java
@@ -19,12 +19,11 @@ package org.apache.rocketmq.client.java.exception;
 
 import org.apache.rocketmq.client.apis.ClientException;
 
-public class ResourceNotFoundException extends ClientException {
-    public ResourceNotFoundException(String message) {
-        super(message);
-    }
-
-    public ResourceNotFoundException(Throwable t) {
-        super(t);
+/**
+ * Generic exception for bad request, indicating that the required fields of headers are missing.
+ */
+public class BadRequestException extends ClientException {
+    public BadRequestException(int responseCode, String message) {
+        super(responseCode, message);
     }
 }
diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/exception/ResourceNotFoundException.java b/java/client/src/main/java/org/apache/rocketmq/client/java/exception/ForbiddenException.java
similarity index 77%
copy from java/client/src/main/java/org/apache/rocketmq/client/java/exception/ResourceNotFoundException.java
copy to java/client/src/main/java/org/apache/rocketmq/client/java/exception/ForbiddenException.java
index 217c3ef..49eb280 100644
--- a/java/client/src/main/java/org/apache/rocketmq/client/java/exception/ResourceNotFoundException.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/exception/ForbiddenException.java
@@ -19,12 +19,11 @@ package org.apache.rocketmq.client.java.exception;
 
 import org.apache.rocketmq.client.apis.ClientException;
 
-public class ResourceNotFoundException extends ClientException {
-    public ResourceNotFoundException(String message) {
-        super(message);
-    }
-
-    public ResourceNotFoundException(Throwable t) {
-        super(t);
+/**
+ * Generic exception for the case that the user does not have permission to access/operate on the resource.
+ */
+public class ForbiddenException extends ClientException {
+    public ForbiddenException(int responseCode, String message) {
+        super(responseCode, message);
     }
 }
diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/exception/ResourceNotFoundException.java b/java/client/src/main/java/org/apache/rocketmq/client/java/exception/InternalErrorException.java
similarity index 74%
copy from java/client/src/main/java/org/apache/rocketmq/client/java/exception/ResourceNotFoundException.java
copy to java/client/src/main/java/org/apache/rocketmq/client/java/exception/InternalErrorException.java
index 217c3ef..e1a44a4 100644
--- a/java/client/src/main/java/org/apache/rocketmq/client/java/exception/ResourceNotFoundException.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/exception/InternalErrorException.java
@@ -19,12 +19,12 @@ package org.apache.rocketmq.client.java.exception;
 
 import org.apache.rocketmq.client.apis.ClientException;
 
-public class ResourceNotFoundException extends ClientException {
-    public ResourceNotFoundException(String message) {
-        super(message);
-    }
-
-    public ResourceNotFoundException(Throwable t) {
-        super(t);
+/**
+ * Generic exception indicates that the server/client encountered an unexpected condition that prevented it from
+ * fulfilling the request.
+ */
+public class InternalErrorException extends ClientException {
+    public InternalErrorException(int responseCode, String message) {
+        super(responseCode, message);
     }
-}
+}
\ No newline at end of file
diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/exception/ResourceNotFoundException.java b/java/client/src/main/java/org/apache/rocketmq/client/java/exception/NotFoundException.java
similarity index 75%
copy from java/client/src/main/java/org/apache/rocketmq/client/java/exception/ResourceNotFoundException.java
copy to java/client/src/main/java/org/apache/rocketmq/client/java/exception/NotFoundException.java
index 217c3ef..6ca06ca 100644
--- a/java/client/src/main/java/org/apache/rocketmq/client/java/exception/ResourceNotFoundException.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/exception/NotFoundException.java
@@ -19,12 +19,19 @@ package org.apache.rocketmq.client.java.exception;
 
 import org.apache.rocketmq.client.apis.ClientException;
 
-public class ResourceNotFoundException extends ClientException {
-    public ResourceNotFoundException(String message) {
+/**
+ * Generic exception for resource not found.
+ */
+public class NotFoundException extends ClientException {
+    public NotFoundException(int responseCode, String message) {
+        super(responseCode, message);
+    }
+
+    public NotFoundException(String message) {
         super(message);
     }
 
-    public ResourceNotFoundException(Throwable t) {
+    public NotFoundException(Throwable t) {
         super(t);
     }
 }
diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/exception/ResourceNotFoundException.java b/java/client/src/main/java/org/apache/rocketmq/client/java/exception/PayloadTooLargeException.java
similarity index 76%
copy from java/client/src/main/java/org/apache/rocketmq/client/java/exception/ResourceNotFoundException.java
copy to java/client/src/main/java/org/apache/rocketmq/client/java/exception/PayloadTooLargeException.java
index 217c3ef..6dd8358 100644
--- a/java/client/src/main/java/org/apache/rocketmq/client/java/exception/ResourceNotFoundException.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/exception/PayloadTooLargeException.java
@@ -19,12 +19,11 @@ package org.apache.rocketmq.client.java.exception;
 
 import org.apache.rocketmq.client.apis.ClientException;
 
-public class ResourceNotFoundException extends ClientException {
-    public ResourceNotFoundException(String message) {
-        super(message);
-    }
-
-    public ResourceNotFoundException(Throwable t) {
-        super(t);
+/**
+ * Generic exception represents that the request entity is larger than the limits defined by the server.
+ */
+public class PayloadTooLargeException extends ClientException {
+    public PayloadTooLargeException(int responseCode, String message) {
+        super(responseCode, message);
     }
 }
diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/exception/ResourceNotFoundException.java b/java/client/src/main/java/org/apache/rocketmq/client/java/exception/ProxyTimeoutException.java
similarity index 73%
copy from java/client/src/main/java/org/apache/rocketmq/client/java/exception/ResourceNotFoundException.java
copy to java/client/src/main/java/org/apache/rocketmq/client/java/exception/ProxyTimeoutException.java
index 217c3ef..2a8abbf 100644
--- a/java/client/src/main/java/org/apache/rocketmq/client/java/exception/ResourceNotFoundException.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/exception/ProxyTimeoutException.java
@@ -19,12 +19,12 @@ package org.apache.rocketmq.client.java.exception;
 
 import org.apache.rocketmq.client.apis.ClientException;
 
-public class ResourceNotFoundException extends ClientException {
-    public ResourceNotFoundException(String message) {
-        super(message);
-    }
-
-    public ResourceNotFoundException(Throwable t) {
-        super(t);
+/**
+ * Generic exception represents that the server, which acts as a gateway or proxy, does not get a satisfactory response
+ * in time from its upstream servers.
+ */
+public class ProxyTimeoutException extends ClientException {
+    public ProxyTimeoutException(int responseCode, String message) {
+        super(responseCode, message);
     }
 }
diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/exception/ResourceNotFoundException.java b/java/client/src/main/java/org/apache/rocketmq/client/java/exception/RequestHeaderFieldsTooLargeException.java
similarity index 68%
copy from java/client/src/main/java/org/apache/rocketmq/client/java/exception/ResourceNotFoundException.java
copy to java/client/src/main/java/org/apache/rocketmq/client/java/exception/RequestHeaderFieldsTooLargeException.java
index 217c3ef..e2813f1 100644
--- a/java/client/src/main/java/org/apache/rocketmq/client/java/exception/ResourceNotFoundException.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/exception/RequestHeaderFieldsTooLargeException.java
@@ -19,12 +19,12 @@ package org.apache.rocketmq.client.java.exception;
 
 import org.apache.rocketmq.client.apis.ClientException;
 
-public class ResourceNotFoundException extends ClientException {
-    public ResourceNotFoundException(String message) {
-        super(message);
-    }
-
-    public ResourceNotFoundException(Throwable t) {
-        super(t);
+/**
+ * Generic exception for the case that the server is unwilling to process the request because its header fields are
+ * too large. The request may be resubmitted after reducing the size of the request header fields.
+ */
+public class RequestHeaderFieldsTooLargeException extends ClientException {
+    public RequestHeaderFieldsTooLargeException(int responseCode, String message) {
+        super(responseCode, message);
     }
-}
+}
\ No newline at end of file
diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/exception/ResourceNotFoundException.java b/java/client/src/main/java/org/apache/rocketmq/client/java/exception/TooManyRequestsException.java
similarity index 74%
copy from java/client/src/main/java/org/apache/rocketmq/client/java/exception/ResourceNotFoundException.java
copy to java/client/src/main/java/org/apache/rocketmq/client/java/exception/TooManyRequestsException.java
index 217c3ef..b4fd82f 100644
--- a/java/client/src/main/java/org/apache/rocketmq/client/java/exception/ResourceNotFoundException.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/exception/TooManyRequestsException.java
@@ -19,12 +19,13 @@ package org.apache.rocketmq.client.java.exception;
 
 import org.apache.rocketmq.client.apis.ClientException;
 
-public class ResourceNotFoundException extends ClientException {
-    public ResourceNotFoundException(String message) {
-        super(message);
-    }
-
-    public ResourceNotFoundException(Throwable t) {
-        super(t);
+/**
+ * Generic exception indicating that too many requests are made in a short period of time.
+ *
+ * <p>Requests are throttled.
+ */
+public class TooManyRequestsException extends ClientException {
+    public TooManyRequestsException(int responseCode, String message) {
+        super(responseCode, message);
     }
 }
diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/exception/ResourceNotFoundException.java b/java/client/src/main/java/org/apache/rocketmq/client/java/exception/UnauthorizedException.java
similarity index 75%
copy from java/client/src/main/java/org/apache/rocketmq/client/java/exception/ResourceNotFoundException.java
copy to java/client/src/main/java/org/apache/rocketmq/client/java/exception/UnauthorizedException.java
index 217c3ef..361170e 100644
--- a/java/client/src/main/java/org/apache/rocketmq/client/java/exception/ResourceNotFoundException.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/exception/UnauthorizedException.java
@@ -19,12 +19,12 @@ package org.apache.rocketmq.client.java.exception;
 
 import org.apache.rocketmq.client.apis.ClientException;
 
-public class ResourceNotFoundException extends ClientException {
-    public ResourceNotFoundException(String message) {
-        super(message);
-    }
-
-    public ResourceNotFoundException(Throwable t) {
-        super(t);
+/**
+ * Generic exception indicates that the client request lacks valid authentication credentials for the requested
+ * resource.
+ */
+public class UnauthorizedException extends ClientException {
+    public UnauthorizedException(int responseCode, String message) {
+        super(responseCode, message);
     }
 }
diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/exception/ResourceNotFoundException.java b/java/client/src/main/java/org/apache/rocketmq/client/java/exception/UnsupportedException.java
similarity index 80%
rename from java/client/src/main/java/org/apache/rocketmq/client/java/exception/ResourceNotFoundException.java
rename to java/client/src/main/java/org/apache/rocketmq/client/java/exception/UnsupportedException.java
index 217c3ef..8fe578c 100644
--- a/java/client/src/main/java/org/apache/rocketmq/client/java/exception/ResourceNotFoundException.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/exception/UnsupportedException.java
@@ -19,12 +19,11 @@ package org.apache.rocketmq.client.java.exception;
 
 import org.apache.rocketmq.client.apis.ClientException;
 
-public class ResourceNotFoundException extends ClientException {
-    public ResourceNotFoundException(String message) {
-        super(message);
-    }
-
-    public ResourceNotFoundException(Throwable t) {
-        super(t);
+/**
+ * Generic exception for unsupported operations.
+ */
+public class UnsupportedException extends ClientException {
+    public UnsupportedException(int responseCode, String message) {
+        super(responseCode, message);
     }
 }
diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientImpl.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientImpl.java
index cfa6db2..b16221f 100644
--- a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientImpl.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientImpl.java
@@ -51,7 +51,6 @@ import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
-import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
@@ -67,10 +66,9 @@ import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.stream.Collectors;
-import java.util.stream.Stream;
 import org.apache.rocketmq.client.apis.ClientConfiguration;
 import org.apache.rocketmq.client.apis.ClientException;
-import org.apache.rocketmq.client.java.exception.ResourceNotFoundException;
+import org.apache.rocketmq.client.java.exception.NotFoundException;
 import org.apache.rocketmq.client.java.hook.MessageHookPoints;
 import org.apache.rocketmq.client.java.hook.MessageHookPointsStatus;
 import org.apache.rocketmq.client.java.hook.MessageInterceptor;
@@ -187,17 +185,10 @@ public abstract class ClientImpl extends AbstractIdleService implements Client,
         } catch (Throwable t) {
             LOGGER.error("Failed to get topic route data result from remote during client startup, clientId={}, "
                 + "topics={}", clientId, topics, t);
-            throw new ResourceNotFoundException(t);
+            throw new NotFoundException(t);
         }
-        // Find any topic whose topic route data is failed to fetch from remote.
-        final Stream<TopicRouteDataResult> stream = results.stream()
-            .filter(topicRouteDataResult -> Code.OK != topicRouteDataResult.getStatus().getCode());
-        final Optional<TopicRouteDataResult> any = stream.findAny();
-        // There is a topic whose topic route data is failed to fetch from remote.
-        if (any.isPresent()) {
-            final TopicRouteDataResult result = any.get();
-            final Status status = result.getStatus();
-            throw new ClientException(status.getCode().getNumber(), status.getMessage());
+        for (TopicRouteDataResult result : results) {
+            result.checkAndGetTopicRouteData();
         }
         LOGGER.info("Fetch topic route data from remote successfully during startup, clientId={}, topics={}",
             clientId, topics);
@@ -444,8 +435,8 @@ public abstract class ClientImpl extends AbstractIdleService implements Client,
                     LOGGER.info("Topic route result is updated, topic={}, clientId={}, {} => {}", topic, clientId,
                         old, topicRouteDataResult);
                 }
-                onTopicRouteDataResultUpdate0(topic, topicRouteDataResult);
                 future0.set(null);
+                onTopicRouteDataResultUpdate0(topic, topicRouteDataResult);
             }
 
             @Override
diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ProcessQueueImpl.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ProcessQueueImpl.java
index ef7cf91..dc648f6 100644
--- a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ProcessQueueImpl.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ProcessQueueImpl.java
@@ -238,7 +238,7 @@ class ProcessQueueImpl implements ProcessQueue {
                     final Duration duration = stopwatch.elapsed();
                     final List<MessageCommon> commons = result.getMessages().stream()
                         .map(MessageViewImpl::getMessageCommon).collect(Collectors.toList());
-                    if (result.getStatus().isPresent() && Code.OK.equals(result.getStatus().get().getCode())) {
+                    if (result.ok()) {
                         consumer.doAfter(MessageHookPoints.RECEIVE, commons, duration, MessageHookPointsStatus.OK);
                     } else {
                         consumer.doAfter(MessageHookPoints.RECEIVE, commons, duration, MessageHookPointsStatus.ERROR);
@@ -320,45 +320,17 @@ class ProcessQueueImpl implements ProcessQueue {
     }
 
     private void onReceiveMessageResult(ReceiveMessageResult result) {
-        Optional<Status> optionalStatus = result.getStatus();
         final List<MessageViewImpl> messages = result.getMessages();
-        final Endpoints endpoints = result.getEndpoints();
-        if (!optionalStatus.isPresent()) {
-            // Should never reach here.
-            LOGGER.error("[Bug] Status in receive message result is not set, mq={}, endpoints={}", mq, endpoints);
-            if (messages.isEmpty()) {
-                receiveMessage();
-            }
-            optionalStatus = Optional.of(Status.newBuilder().setCode(Code.OK).build());
-            LOGGER.error("[Bug] Status not set but message(s) found in the receive result, fix the status to OK," +
-                " mq={}, endpoints={}", mq, endpoints);
+        if (!result.ok()) {
+            receiveMessageLater();
+            return;
         }
-        final Status status = optionalStatus.get();
-        final Code code = status.getCode();
-        switch (code) {
-            case OK:
-                if (!messages.isEmpty()) {
-                    cacheMessages(messages);
-                    consumer.getReceivedMessagesQuantity().getAndAdd(messages.size());
-                    consumer.getConsumeService().signal();
-                }
-                LOGGER.debug("Receive message with OK, mq={}, endpoints={}, messages found count={}, clientId={}",
-                    mq, endpoints, messages.size(), consumer.getClientId());
-                receiveMessage();
-                break;
-            case MESSAGE_NOT_FOUND:
-                LOGGER.info("Message not found while receiving message, mq={}, endpoints={}, clientId={}, code={}," +
-                        " status message=[{}]",
-                    mq, endpoints, consumer.getClientId(), code.getNumber(), status.getMessage());
-                receiveMessage();
-                break;
-            // Fall through on purpose.
-            default:
-                LOGGER.error("Failed to receive message from remote, try it later, mq={}, endpoints={}, clientId={}," +
-                        " code={}, status message=[{}]",
-                    mq, endpoints, consumer.getClientId(), code.getNumber(), status.getMessage());
-                receiveMessageLater();
+        if (!messages.isEmpty()) {
+            cacheMessages(messages);
+            consumer.getReceivedMessagesQuantity().getAndAdd(messages.size());
+            consumer.getConsumeService().signal();
         }
+        receiveMessage();
     }
 
     @Override
diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/PushConsumerImpl.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/PushConsumerImpl.java
index b8cd546..94d974f 100644
--- a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/PushConsumerImpl.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/PushConsumerImpl.java
@@ -225,16 +225,11 @@ class PushConsumerImpl extends ConsumerImpl implements PushConsumer, MessageCach
                 this.state(), clientId);
             throw new IllegalStateException("Push consumer is not running now");
         }
-
         final ListenableFuture<TopicRouteDataResult> future = getRouteDataResult(topic);
         TopicRouteDataResult topicRouteDataResult = handleClientFuture(future);
-        final Status status = topicRouteDataResult.getStatus();
-        final Code code = status.getCode();
-        if (Code.OK.equals(code)) {
-            subscriptionExpressions.put(topic, filterExpression);
-            return this;
-        }
-        throw new ClientException(code.getNumber(), status.getMessage());
+        topicRouteDataResult.checkAndGetTopicRouteData();
+        subscriptionExpressions.put(topic, filterExpression);
+        return this;
     }
 
     /**
@@ -255,15 +250,8 @@ class PushConsumerImpl extends ConsumerImpl implements PushConsumer, MessageCach
     private ListenableFuture<Endpoints> pickEndpointsToQueryAssignments(String topic) {
         final ListenableFuture<TopicRouteDataResult> future = getRouteDataResult(topic);
         return Futures.transformAsync(future, topicRouteDataResult -> {
-            final SettableFuture<Endpoints> future0 = SettableFuture.create();
-            final Status status = topicRouteDataResult.getStatus();
-            final Code code = status.getCode();
-            if (!Code.OK.equals(code)) {
-                throw new ClientException(code.getNumber(), status.getMessage());
-            }
-            Endpoints endpoints = topicRouteDataResult.getTopicRouteData().pickEndpointsToQueryAssignments();
-            future0.set(endpoints);
-            return future0;
+            Endpoints endpoints = topicRouteDataResult.checkAndGetTopicRouteData().pickEndpointsToQueryAssignments();
+            return Futures.immediateFuture(endpoints);
         }, MoreExecutors.directExecutor());
     }
 
diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ReceiveMessageResult.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ReceiveMessageResult.java
index 52f501e..09b8d12 100644
--- a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ReceiveMessageResult.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ReceiveMessageResult.java
@@ -17,30 +17,92 @@
 
 package org.apache.rocketmq.client.java.impl.consumer;
 
+import apache.rocketmq.v2.Code;
 import apache.rocketmq.v2.Status;
+import java.util.ArrayList;
 import java.util.List;
-import java.util.Optional;
+import org.apache.rocketmq.client.apis.ClientException;
+import org.apache.rocketmq.client.apis.message.MessageView;
+import org.apache.rocketmq.client.java.exception.BadRequestException;
+import org.apache.rocketmq.client.java.exception.ForbiddenException;
+import org.apache.rocketmq.client.java.exception.InternalErrorException;
+import org.apache.rocketmq.client.java.exception.NotFoundException;
+import org.apache.rocketmq.client.java.exception.ProxyTimeoutException;
+import org.apache.rocketmq.client.java.exception.TooManyRequestsException;
+import org.apache.rocketmq.client.java.exception.UnauthorizedException;
+import org.apache.rocketmq.client.java.exception.UnsupportedException;
 import org.apache.rocketmq.client.java.message.MessageViewImpl;
 import org.apache.rocketmq.client.java.route.Endpoints;
 
 public class ReceiveMessageResult {
     private final Endpoints endpoints;
-    private final Status status;
+    private final ClientException exception;
 
     private final List<MessageViewImpl> messages;
 
     public ReceiveMessageResult(Endpoints endpoints, Status status, List<MessageViewImpl> messages) {
         this.endpoints = endpoints;
-        this.status = status;
+        final Code code = status.getCode();
+        switch (code) {
+            case OK:
+                this.exception = null;
+                break;
+            case BAD_REQUEST:
+            case ILLEGAL_TOPIC:
+            case ILLEGAL_CONSUMER_GROUP:
+            case ILLEGAL_FILTER_EXPRESSION:
+            case CLIENT_ID_REQUIRED:
+                this.exception = new BadRequestException(code.getNumber(), status.getMessage());
+                break;
+            case UNAUTHORIZED:
+                this.exception = new UnauthorizedException(code.getNumber(), status.getMessage());
+                break;
+            case FORBIDDEN:
+                this.exception = new ForbiddenException(code.getNumber(), status.getMessage());
+                break;
+            case MESSAGE_NOT_FOUND:
+            case NOT_FOUND:
+            case TOPIC_NOT_FOUND:
+            case CONSUMER_GROUP_NOT_FOUND:
+                this.exception = new NotFoundException(code.getNumber(), status.getMessage());
+                break;
+            case TOO_MANY_REQUESTS:
+                this.exception = new TooManyRequestsException(code.getNumber(), status.getMessage());
+                break;
+            case INTERNAL_ERROR:
+            case INTERNAL_SERVER_ERROR:
+                this.exception = new InternalErrorException(code.getNumber(), status.getMessage());
+                break;
+            case PROXY_TIMEOUT:
+                this.exception = new ProxyTimeoutException(code.getNumber(), status.getMessage());
+                break;
+            default:
+                this.exception = new UnsupportedException(code.getNumber(), status.getMessage());
+        }
         this.messages = messages;
     }
 
-    public Endpoints getEndpoints() {
-        return endpoints;
+    /**
+     * Indicates whether the result is ok.
+     *
+     * <p>The result is ok if the status code is {@link Code#OK} or {@link Code#MESSAGE_NOT_FOUND}.
+     *
+     * @return true if the result is ok, false otherwise.
+     */
+    public boolean ok() {
+        return null == exception || (exception.getResponseCode().isPresent() &&
+            Code.MESSAGE_NOT_FOUND.getNumber() == exception.getResponseCode().get());
+    }
+
+    public List<MessageView> checkAndGetMessages() throws ClientException {
+        if (null != exception) {
+            throw exception;
+        }
+        return new ArrayList<>(messages);
     }
 
-    public Optional<Status> getStatus() {
-        return null == status ? Optional.empty() : Optional.of(status);
+    public Endpoints getEndpoints() {
+        return endpoints;
     }
 
     public List<MessageViewImpl> getMessages() {
diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/SimpleConsumerImpl.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/SimpleConsumerImpl.java
index d7b0b1c..b0ccfd9 100644
--- a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/SimpleConsumerImpl.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/SimpleConsumerImpl.java
@@ -32,7 +32,6 @@ import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
@@ -44,6 +43,14 @@ import org.apache.rocketmq.client.apis.ClientException;
 import org.apache.rocketmq.client.apis.consumer.FilterExpression;
 import org.apache.rocketmq.client.apis.consumer.SimpleConsumer;
 import org.apache.rocketmq.client.apis.message.MessageView;
+import org.apache.rocketmq.client.java.exception.BadRequestException;
+import org.apache.rocketmq.client.java.exception.ForbiddenException;
+import org.apache.rocketmq.client.java.exception.InternalErrorException;
+import org.apache.rocketmq.client.java.exception.NotFoundException;
+import org.apache.rocketmq.client.java.exception.ProxyTimeoutException;
+import org.apache.rocketmq.client.java.exception.TooManyRequestsException;
+import org.apache.rocketmq.client.java.exception.UnauthorizedException;
+import org.apache.rocketmq.client.java.exception.UnsupportedException;
 import org.apache.rocketmq.client.java.impl.ClientSettings;
 import org.apache.rocketmq.client.java.message.MessageViewImpl;
 import org.apache.rocketmq.client.java.message.protocol.Resource;
@@ -66,7 +73,7 @@ class SimpleConsumerImpl extends ConsumerImpl implements SimpleConsumer {
     private final AtomicInteger topicIndex;
 
     private final Map<String /* topic */, FilterExpression> subscriptionExpressions;
-    private final ConcurrentMap<String /* topic */, SubscriptionTopicRouteDataResult> subTopicRouteDataResultCache;
+    private final ConcurrentMap<String /* topic */, SubscriptionLoadBalancer> subTopicRouteDataResultCache;
 
     public SimpleConsumerImpl(ClientConfiguration clientConfiguration, String consumerGroup, Duration awaitDuration,
         Map<String, FilterExpression> subscriptionExpressions) {
@@ -124,13 +131,9 @@ class SimpleConsumerImpl extends ConsumerImpl implements SimpleConsumer {
         }
         final ListenableFuture<TopicRouteDataResult> future = getRouteDataResult(topic);
         TopicRouteDataResult topicRouteDataResult = handleClientFuture(future);
-        final Status status = topicRouteDataResult.getStatus();
-        final Code code = status.getCode();
-        if (Code.OK.equals(code)) {
-            subscriptionExpressions.put(topic, filterExpression);
-            return this;
-        }
-        throw new ClientException(code.getNumber(), status.getMessage());
+        topicRouteDataResult.checkAndGetTopicRouteData();
+        subscriptionExpressions.put(topic, filterExpression);
+        return this;
     }
 
     /**
@@ -192,27 +195,15 @@ class SimpleConsumerImpl extends ConsumerImpl implements SimpleConsumer {
         }
         final String topic = topics.get(IntMath.mod(topicIndex.getAndIncrement(), topics.size()));
         final FilterExpression filterExpression = copy.get(topic);
-        final ListenableFuture<SubscriptionTopicRouteDataResult> routeFuture = getSubscriptionTopicRouteResult(topic);
+        final ListenableFuture<SubscriptionLoadBalancer> routeFuture = getSubscriptionTopicRouteResult(topic);
         final ListenableFuture<ReceiveMessageResult> future0 = Futures.transformAsync(routeFuture, result -> {
             final MessageQueueImpl mq = result.takeMessageQueue();
             final ReceiveMessageRequest request = wrapReceiveMessageRequest(maxMessageNum, mq, filterExpression,
                 invisibleDuration);
             return receiveMessage(request, mq, awaitDuration);
         }, MoreExecutors.directExecutor());
-        return Futures.transformAsync(future0, result -> {
-            final Optional<Status> optionalStatus = result.getStatus();
-            if (!optionalStatus.isPresent()) {
-                future.set(new ArrayList<>(result.getMessages()));
-                return future;
-            }
-            final Status status = optionalStatus.get();
-            final Code code = status.getCode();
-            if (Code.OK.equals(code)) {
-                future.set(new ArrayList<>(result.getMessages()));
-                return future;
-            }
-            throw new ClientException(code.getNumber(), status.getMessage());
-        }, clientCallbackExecutor);
+        return Futures.transformAsync(future0, result -> Futures.immediateFuture(result.checkAndGetMessages()),
+            clientCallbackExecutor);
     }
 
     /**
@@ -253,11 +244,32 @@ class SimpleConsumerImpl extends ConsumerImpl implements SimpleConsumer {
         return Futures.transformAsync(future, response -> {
             final Status status = response.getStatus();
             final Code code = status.getCode();
-            if (Code.OK.equals(code)) {
-                future0.set(null);
-                return future0;
+            switch (code) {
+                case OK:
+                    return Futures.immediateVoidFuture();
+                case BAD_REQUEST:
+                case ILLEGAL_TOPIC:
+                case ILLEGAL_CONSUMER_GROUP:
+                case INVALID_RECEIPT_HANDLE:
+                case CLIENT_ID_REQUIRED:
+                    throw new BadRequestException(code.getNumber(), status.getMessage());
+                case UNAUTHORIZED:
+                    throw new UnauthorizedException(code.getNumber(), status.getMessage());
+                case FORBIDDEN:
+                    throw new ForbiddenException(code.getNumber(), status.getMessage());
+                case NOT_FOUND:
+                case TOPIC_NOT_FOUND:
+                    throw new NotFoundException(code.getNumber(), status.getMessage());
+                case TOO_MANY_REQUESTS:
+                    throw new TooManyRequestsException(code.getNumber(), status.getMessage());
+                case INTERNAL_ERROR:
+                case INTERNAL_SERVER_ERROR:
+                    throw new InternalErrorException(code.getNumber(), status.getMessage());
+                case PROXY_TIMEOUT:
+                    throw new ProxyTimeoutException(code.getNumber(), status.getMessage());
+                default:
+                    throw new UnsupportedException(code.getNumber(), status.getMessage());
             }
-            throw new ClientException(code.getNumber(), status.getMessage());
         }, clientCallbackExecutor);
     }
 
@@ -295,11 +307,32 @@ class SimpleConsumerImpl extends ConsumerImpl implements SimpleConsumer {
             impl.setReceiptHandle(response.getReceiptHandle());
             final Status status = response.getStatus();
             final Code code = status.getCode();
-            if (Code.OK.equals(code)) {
-                future0.set(null);
-                return future0;
+            switch (code) {
+                case OK:
+                    return Futures.immediateVoidFuture();
+                case BAD_REQUEST:
+                case ILLEGAL_TOPIC:
+                case ILLEGAL_CONSUMER_GROUP:
+                case INVALID_RECEIPT_HANDLE:
+                case CLIENT_ID_REQUIRED:
+                    throw new BadRequestException(code.getNumber(), status.getMessage());
+                case UNAUTHORIZED:
+                    throw new UnauthorizedException(code.getNumber(), status.getMessage());
+                case FORBIDDEN:
+                    throw new ForbiddenException(code.getNumber(), status.getMessage());
+                case NOT_FOUND:
+                case TOPIC_NOT_FOUND:
+                    throw new NotFoundException(code.getNumber(), status.getMessage());
+                case TOO_MANY_REQUESTS:
+                    throw new TooManyRequestsException(code.getNumber(), status.getMessage());
+                case INTERNAL_ERROR:
+                case INTERNAL_SERVER_ERROR:
+                    throw new InternalErrorException(code.getNumber(), status.getMessage());
+                case PROXY_TIMEOUT:
+                    throw new ProxyTimeoutException(code.getNumber(), status.getMessage());
+                default:
+                    throw new UnsupportedException(code.getNumber(), status.getMessage());
             }
-            throw new ClientException(code.getNumber(), status.getMessage());
         }, MoreExecutors.directExecutor());
     }
 
@@ -318,24 +348,24 @@ class SimpleConsumerImpl extends ConsumerImpl implements SimpleConsumer {
     }
 
     public void onTopicRouteDataResultUpdate0(String topic, TopicRouteDataResult topicRouteDataResult) {
-        final SubscriptionTopicRouteDataResult subscriptionTopicRouteDataResult =
-            new SubscriptionTopicRouteDataResult(topicRouteDataResult);
-        subTopicRouteDataResultCache.put(topic, subscriptionTopicRouteDataResult);
+        final SubscriptionLoadBalancer subscriptionLoadBalancer =
+            new SubscriptionLoadBalancer(topicRouteDataResult);
+        subTopicRouteDataResultCache.put(topic, subscriptionLoadBalancer);
     }
 
-    private ListenableFuture<SubscriptionTopicRouteDataResult> getSubscriptionTopicRouteResult(final String topic) {
-        SettableFuture<SubscriptionTopicRouteDataResult> future0 = SettableFuture.create();
-        final SubscriptionTopicRouteDataResult result = subTopicRouteDataResultCache.get(topic);
+    private ListenableFuture<SubscriptionLoadBalancer> getSubscriptionTopicRouteResult(final String topic) {
+        SettableFuture<SubscriptionLoadBalancer> future0 = SettableFuture.create();
+        final SubscriptionLoadBalancer result = subTopicRouteDataResultCache.get(topic);
         if (null != result) {
             future0.set(result);
             return future0;
         }
         final ListenableFuture<TopicRouteDataResult> future = getRouteDataResult(topic);
         return Futures.transform(future, topicRouteDataResult -> {
-            final SubscriptionTopicRouteDataResult subscriptionTopicRouteDataResult =
-                new SubscriptionTopicRouteDataResult(topicRouteDataResult);
-            subTopicRouteDataResultCache.put(topic, subscriptionTopicRouteDataResult);
-            return subscriptionTopicRouteDataResult;
+            final SubscriptionLoadBalancer subscriptionLoadBalancer =
+                new SubscriptionLoadBalancer(topicRouteDataResult);
+            subTopicRouteDataResultCache.put(topic, subscriptionLoadBalancer);
+            return subscriptionLoadBalancer;
         }, MoreExecutors.directExecutor());
     }
 }
diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/SubscriptionTopicRouteDataResult.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/SubscriptionLoadBalancer.java
similarity index 70%
rename from java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/SubscriptionTopicRouteDataResult.java
rename to java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/SubscriptionLoadBalancer.java
index 804cf3f..21610b2 100644
--- a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/SubscriptionTopicRouteDataResult.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/SubscriptionLoadBalancer.java
@@ -17,32 +17,34 @@
 
 package org.apache.rocketmq.client.java.impl.consumer;
 
-import apache.rocketmq.v2.Code;
-import apache.rocketmq.v2.Status;
 import com.google.common.collect.ImmutableList;
 import com.google.common.math.IntMath;
 import java.util.concurrent.atomic.AtomicInteger;
 import javax.annotation.concurrent.Immutable;
 import org.apache.commons.lang3.RandomUtils;
 import org.apache.rocketmq.client.apis.ClientException;
-import org.apache.rocketmq.client.java.exception.ResourceNotFoundException;
+import org.apache.rocketmq.client.java.exception.NotFoundException;
 import org.apache.rocketmq.client.java.misc.Utilities;
 import org.apache.rocketmq.client.java.route.MessageQueueImpl;
 import org.apache.rocketmq.client.java.route.TopicRouteDataResult;
 
 @Immutable
-public class SubscriptionTopicRouteDataResult {
-    private final AtomicInteger messageQueueIndex;
-
-    private final Status status;
-
+public class SubscriptionLoadBalancer {
+    private final TopicRouteDataResult topicRouteDataResult;
+    /**
+     * Index for round-robin.
+     */
+    private final AtomicInteger index;
+    /**
+     * Message queues to receive message.
+     */
     private final ImmutableList<MessageQueueImpl> messageQueues;
 
-    public SubscriptionTopicRouteDataResult(TopicRouteDataResult topicRouteDataResult) {
-        this.messageQueueIndex = new AtomicInteger(RandomUtils.nextInt(0, Integer.MAX_VALUE));
-        this.status = topicRouteDataResult.getStatus();
+    public SubscriptionLoadBalancer(TopicRouteDataResult topicRouteDataResult) {
+        this.topicRouteDataResult = topicRouteDataResult;
+        this.index = new AtomicInteger(RandomUtils.nextInt(0, Integer.MAX_VALUE));
         final ImmutableList.Builder<MessageQueueImpl> builder = ImmutableList.builder();
-        if (Code.OK != status.getCode()) {
+        if (!topicRouteDataResult.ok()) {
             this.messageQueues = builder.build();
             return;
         }
@@ -57,16 +59,12 @@ public class SubscriptionTopicRouteDataResult {
     }
 
     public MessageQueueImpl takeMessageQueue() throws ClientException {
-        final Code code = status.getCode();
-        if (!Code.OK.equals(code)) {
-            throw new ClientException(code.getNumber(), status.getMessage());
-        }
+        topicRouteDataResult.checkAndGetTopicRouteData();
         if (messageQueues.isEmpty()) {
             // Should never reach here.
-            throw new ResourceNotFoundException("Failed to take message queue due to readable message queue doesn't "
-                + "exist");
+            throw new NotFoundException("Failed to take message queue due to readable message queue doesn't exist");
         }
-        final int next = messageQueueIndex.getAndIncrement();
+        final int next = index.getAndIncrement();
         return messageQueues.get(IntMath.mod(next, messageQueues.size()));
     }
 }
diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/ProducerImpl.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/ProducerImpl.java
index ab711c4..7ba3393 100644
--- a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/ProducerImpl.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/ProducerImpl.java
@@ -57,6 +57,7 @@ import org.apache.rocketmq.client.apis.producer.SendReceipt;
 import org.apache.rocketmq.client.apis.producer.Transaction;
 import org.apache.rocketmq.client.apis.producer.TransactionChecker;
 import org.apache.rocketmq.client.apis.producer.TransactionResolution;
+import org.apache.rocketmq.client.java.exception.TooManyRequestsException;
 import org.apache.rocketmq.client.java.hook.MessageHookPoints;
 import org.apache.rocketmq.client.java.hook.MessageHookPointsStatus;
 import org.apache.rocketmq.client.java.impl.ClientImpl;
@@ -86,7 +87,7 @@ class ProducerImpl extends ClientImpl implements Producer {
     protected final ProducerSettings producerSettings;
 
     private final TransactionChecker checker;
-    private final ConcurrentMap<String/* topic */, PublishingTopicRouteDataResult> publishingRouteDataResultCache;
+    private final ConcurrentMap<String/* topic */, PublishingLoadBalancer> publishingRouteDataResultCache;
 
     /**
      * The caller is supposed to have validated the arguments and handled throwing exception or
@@ -330,7 +331,7 @@ class ProducerImpl extends ClientImpl implements Producer {
     /**
      * Take message queue(s) from route for message publishing.
      */
-    private List<MessageQueueImpl> takeMessageQueues(PublishingTopicRouteDataResult result) throws ClientException {
+    private List<MessageQueueImpl> takeMessageQueues(PublishingLoadBalancer result) throws ClientException {
         return result.takeMessageQueues(isolated, this.getRetryPolicy().getMaxAttempts());
     }
 
@@ -410,7 +411,7 @@ class ProducerImpl extends ClientImpl implements Producer {
 
         this.topics.add(topic);
         // Get publishing topic route.
-        final ListenableFuture<PublishingTopicRouteDataResult> routeFuture = getPublishingTopicRouteResult(topic);
+        final ListenableFuture<PublishingLoadBalancer> routeFuture = getPublishingTopicRouteResult(topic);
         return Futures.transformAsync(routeFuture, result -> {
             // Prepare the candidate message queue(s) for retry-sending in advance.
             final List<MessageQueueImpl> candidates = null == messageGroup ? takeMessageQueues(result) :
@@ -527,6 +528,11 @@ class ProducerImpl extends ClientImpl implements Producer {
                 }
                 // Try to do more attempts.
                 int nextAttempt = 1 + attempt;
+                // Retry immediately if the request is not throttled.
+                if (!(t instanceof TooManyRequestsException)) {
+                    send0(future, topic, messageType, candidates, messages, nextAttempt);
+                    return;
+                }
                 final Duration delay = ProducerImpl.this.getRetryPolicy().getNextAttemptDelay(nextAttempt);
                 LOGGER.warn("Failed to send message, would attempt to resend after {}, maxAttempts={}," +
                         " attempt={}, topic={}, messageId(s)={}, endpoints={}, clientId={}", delay, maxAttempts,
@@ -541,24 +547,24 @@ class ProducerImpl extends ClientImpl implements Producer {
 
     @Override
     public void onTopicRouteDataResultUpdate0(String topic, TopicRouteDataResult topicRouteDataResult) {
-        final PublishingTopicRouteDataResult publishingTopicRouteDataResult =
-            new PublishingTopicRouteDataResult(topicRouteDataResult);
-        publishingRouteDataResultCache.put(topic, publishingTopicRouteDataResult);
+        final PublishingLoadBalancer publishingLoadBalancer =
+            new PublishingLoadBalancer(topicRouteDataResult);
+        publishingRouteDataResultCache.put(topic, publishingLoadBalancer);
     }
 
-    private ListenableFuture<PublishingTopicRouteDataResult> getPublishingTopicRouteResult(final String topic) {
-        SettableFuture<PublishingTopicRouteDataResult> future0 = SettableFuture.create();
-        final PublishingTopicRouteDataResult result = publishingRouteDataResultCache.get(topic);
+    private ListenableFuture<PublishingLoadBalancer> getPublishingTopicRouteResult(final String topic) {
+        SettableFuture<PublishingLoadBalancer> future0 = SettableFuture.create();
+        final PublishingLoadBalancer result = publishingRouteDataResultCache.get(topic);
         if (null != result) {
             future0.set(result);
             return future0;
         }
         return Futures.transformAsync(getRouteDataResult(topic), topicRouteDataResult -> {
-            SettableFuture<PublishingTopicRouteDataResult> future = SettableFuture.create();
-            final PublishingTopicRouteDataResult publishingTopicRouteDataResult =
-                new PublishingTopicRouteDataResult(topicRouteDataResult);
-            publishingRouteDataResultCache.put(topic, publishingTopicRouteDataResult);
-            future.set(publishingTopicRouteDataResult);
+            SettableFuture<PublishingLoadBalancer> future = SettableFuture.create();
+            final PublishingLoadBalancer publishingLoadBalancer =
+                new PublishingLoadBalancer(topicRouteDataResult);
+            publishingRouteDataResultCache.put(topic, publishingLoadBalancer);
+            future.set(publishingLoadBalancer);
             return future;
         }, MoreExecutors.directExecutor());
     }
diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/PublishingTopicRouteDataResult.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/PublishingLoadBalancer.java
similarity index 86%
rename from java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/PublishingTopicRouteDataResult.java
rename to java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/PublishingLoadBalancer.java
index e60f185..9b326de 100644
--- a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/PublishingTopicRouteDataResult.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/PublishingLoadBalancer.java
@@ -17,8 +17,6 @@
 
 package org.apache.rocketmq.client.java.impl.producer;
 
-import apache.rocketmq.v2.Code;
-import apache.rocketmq.v2.Status;
 import com.google.common.base.Objects;
 import com.google.common.collect.ImmutableList;
 import com.google.common.hash.Hashing;
@@ -33,7 +31,7 @@ import java.util.concurrent.atomic.AtomicInteger;
 import javax.annotation.concurrent.Immutable;
 import org.apache.commons.lang3.RandomUtils;
 import org.apache.rocketmq.client.apis.ClientException;
-import org.apache.rocketmq.client.java.exception.ResourceNotFoundException;
+import org.apache.rocketmq.client.java.exception.NotFoundException;
 import org.apache.rocketmq.client.java.misc.Utilities;
 import org.apache.rocketmq.client.java.route.Broker;
 import org.apache.rocketmq.client.java.route.Endpoints;
@@ -41,20 +39,22 @@ import org.apache.rocketmq.client.java.route.MessageQueueImpl;
 import org.apache.rocketmq.client.java.route.TopicRouteDataResult;
 
 @Immutable
-public class PublishingTopicRouteDataResult {
+public class PublishingLoadBalancer {
+    private final TopicRouteDataResult topicRouteDataResult;
+    /**
+     * Index for round-robin.
+     */
     private final AtomicInteger index;
-
-    private final Status status;
     /**
      * Message queues to send message.
      */
     private final ImmutableList<MessageQueueImpl> messageQueues;
 
-    public PublishingTopicRouteDataResult(TopicRouteDataResult topicRouteDataResult) {
+    public PublishingLoadBalancer(TopicRouteDataResult topicRouteDataResult) {
+        this.topicRouteDataResult = topicRouteDataResult;
         this.index = new AtomicInteger(RandomUtils.nextInt(0, Integer.MAX_VALUE));
-        this.status = topicRouteDataResult.getStatus();
         final ImmutableList.Builder<MessageQueueImpl> builder = ImmutableList.builder();
-        if (Code.OK != status.getCode()) {
+        if (!topicRouteDataResult.ok()) {
             this.messageQueues = builder.build();
             return;
         }
@@ -69,16 +69,12 @@ public class PublishingTopicRouteDataResult {
     }
 
     private void preconditionCheckBeforeTakingMessageQueue() throws ClientException {
-        final Code code = status.getCode();
-        if (!Code.OK.equals(code)) {
-            throw new ClientException(code.getNumber(), status.getMessage());
-        }
+        topicRouteDataResult.checkAndGetTopicRouteData();
         if (messageQueues.isEmpty()) {
-            throw new ResourceNotFoundException("Failed to take message due to writable message queue doesn't exist");
+            throw new NotFoundException("Failed to take message due to writable message queue doesn't exist");
         }
     }
 
-    @SuppressWarnings("UnstableApiUsage")
     public MessageQueueImpl takeMessageQueueByMessageGroup(String messageGroup) throws ClientException {
         preconditionCheckBeforeTakingMessageQueue();
         final long hashCode = Hashing.sipHash24().hashBytes(messageGroup.getBytes(StandardCharsets.UTF_8)).asLong();
@@ -130,7 +126,7 @@ public class PublishingTopicRouteDataResult {
         if (o == null || getClass() != o.getClass()) {
             return false;
         }
-        PublishingTopicRouteDataResult that = (PublishingTopicRouteDataResult) o;
+        PublishingLoadBalancer that = (PublishingLoadBalancer) o;
         return Objects.equal(messageQueues, that.messageQueues);
     }
 
diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/SendReceiptImpl.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/SendReceiptImpl.java
index e718228..74a6f10 100644
--- a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/SendReceiptImpl.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/SendReceiptImpl.java
@@ -27,6 +27,16 @@ import java.util.List;
 import org.apache.rocketmq.client.apis.ClientException;
 import org.apache.rocketmq.client.apis.message.MessageId;
 import org.apache.rocketmq.client.apis.producer.SendReceipt;
+import org.apache.rocketmq.client.java.exception.BadRequestException;
+import org.apache.rocketmq.client.java.exception.ForbiddenException;
+import org.apache.rocketmq.client.java.exception.InternalErrorException;
+import org.apache.rocketmq.client.java.exception.NotFoundException;
+import org.apache.rocketmq.client.java.exception.PayloadTooLargeException;
+import org.apache.rocketmq.client.java.exception.ProxyTimeoutException;
+import org.apache.rocketmq.client.java.exception.RequestHeaderFieldsTooLargeException;
+import org.apache.rocketmq.client.java.exception.TooManyRequestsException;
+import org.apache.rocketmq.client.java.exception.UnauthorizedException;
+import org.apache.rocketmq.client.java.exception.UnsupportedException;
 import org.apache.rocketmq.client.java.message.MessageIdCodec;
 import org.apache.rocketmq.client.java.route.Endpoints;
 import org.apache.rocketmq.client.java.route.MessageQueueImpl;
@@ -61,6 +71,7 @@ public class SendReceiptImpl implements SendReceipt {
         return messageQueue.getBroker().getEndpoints();
     }
 
+    @SuppressWarnings("unused")
     public long getOffset() {
         return offset;
     }
@@ -68,29 +79,56 @@ public class SendReceiptImpl implements SendReceipt {
     public static List<SendReceiptImpl> processSendResponse(MessageQueueImpl mq,
         SendMessageResponse response) throws ClientException {
         final Status status = response.getStatus();
-        final Code code = status.getCode();
-        if (!Code.OK.equals(code)) {
-            throw new ClientException(code.getNumber(), status.getMessage());
-        }
-
         List<SendReceiptImpl> sendReceipts = new ArrayList<>();
-        List<Throwable> throwableList = new ArrayList<>();
-        final List<SendResultEntry> list = response.getEntriesList();
-        for (SendResultEntry entry : list) {
+        final List<SendResultEntry> entries = response.getEntriesList();
+        for (SendResultEntry entry : entries) {
             final Status entryStatus = entry.getStatus();
-            final Code statusCode = entryStatus.getCode();
-            if (!Code.OK.equals(statusCode)) {
-                ClientException e = new ClientException(statusCode.getNumber(), status.getMessage());
-                throwableList.add(e);
-                continue;
+            final Code code = entryStatus.getCode();
+            switch (code) {
+                case OK:
+                    final MessageId messageId = MessageIdCodec.getInstance().decode(entry.getMessageId());
+                    final String transactionId = entry.getTransactionId();
+                    final long offset = entry.getOffset();
+                    final SendReceiptImpl impl = new SendReceiptImpl(messageId, transactionId, mq, offset);
+                    sendReceipts.add(impl);
+                    break;
+                case ILLEGAL_TOPIC:
+                case ILLEGAL_MESSAGE_TAG:
+                case ILLEGAL_MESSAGE_KEY:
+                case ILLEGAL_MESSAGE_GROUP:
+                case ILLEGAL_MESSAGE_PROPERTY_KEY:
+                case ILLEGAL_MESSAGE_ID:
+                case MESSAGE_PROPERTY_CONFLICT_WITH_TYPE:
+                case MESSAGE_CORRUPTED:
+                case CLIENT_ID_REQUIRED:
+                    throw new BadRequestException(code.getNumber(), entryStatus.getMessage());
+                case UNAUTHORIZED:
+                    throw new UnauthorizedException(code.getNumber(), entryStatus.getMessage());
+                case FORBIDDEN:
+                    throw new ForbiddenException(code.getNumber(), entryStatus.getMessage());
+                case NOT_FOUND:
+                case TOPIC_NOT_FOUND:
+                    throw new NotFoundException(code.getNumber(), entryStatus.getMessage());
+                case PAYLOAD_TOO_LARGE:
+                case MESSAGE_BODY_TOO_LARGE:
+                    throw new PayloadTooLargeException(code.getNumber(), entryStatus.getMessage());
+                case TOO_MANY_REQUESTS:
+                    throw new TooManyRequestsException(code.getNumber(), entryStatus.getMessage());
+                case REQUEST_HEADER_FIELDS_TOO_LARGE:
+                case MESSAGE_PROPERTIES_TOO_LARGE:
+                    throw new RequestHeaderFieldsTooLargeException(code.getNumber(), entryStatus.getMessage());
+                case INTERNAL_ERROR:
+                case INTERNAL_SERVER_ERROR:
+                case HA_NOT_AVAILABLE:
+                    throw new InternalErrorException(code.getNumber(), entryStatus.getMessage());
+                case PROXY_TIMEOUT:
+                case MASTER_PERSISTENCE_TIMEOUT:
+                case SLAVE_PERSISTENCE_TIMEOUT:
+                    throw new ProxyTimeoutException(code.getNumber(), entryStatus.getMessage());
+                case UNSUPPORTED:
+                default:
+                    throw new UnsupportedException(code.getNumber(), entryStatus.getMessage());
             }
-            final SendReceiptImpl impl =
-                new SendReceiptImpl(MessageIdCodec.getInstance().decode(entry.getMessageId()),
-                    entry.getTransactionId(), mq, entry.getOffset());
-            sendReceipts.add(impl);
-        }
-        if (!throwableList.isEmpty()) {
-            throw new ClientException(throwableList.toArray(new Throwable[0]));
         }
         return sendReceipts;
     }
@@ -99,8 +137,6 @@ public class SendReceiptImpl implements SendReceipt {
     public String toString() {
         return MoreObjects.toStringHelper(this)
             .add("messageId", messageId)
-            .add("messageQueue", messageQueue)
-            .add("offset", offset)
             .toString();
     }
 }
diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/route/TopicRouteData.java b/java/client/src/main/java/org/apache/rocketmq/client/java/route/TopicRouteData.java
index a0725a4..9d566f7 100644
--- a/java/client/src/main/java/org/apache/rocketmq/client/java/route/TopicRouteData.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/route/TopicRouteData.java
@@ -26,7 +26,7 @@ import java.util.List;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.commons.lang3.RandomUtils;
-import org.apache.rocketmq.client.java.exception.ResourceNotFoundException;
+import org.apache.rocketmq.client.java.exception.NotFoundException;
 import org.apache.rocketmq.client.java.misc.Utilities;
 
 public class TopicRouteData {
@@ -62,7 +62,7 @@ public class TopicRouteData {
         return this.messageQueueImpls;
     }
 
-    public Endpoints pickEndpointsToQueryAssignments() throws ResourceNotFoundException {
+    public Endpoints pickEndpointsToQueryAssignments() throws NotFoundException {
         int nextIndex = index.getAndIncrement();
         for (int i = 0; i < messageQueueImpls.size(); i++) {
             final MessageQueueImpl messageQueueImpl = messageQueueImpls.get(IntMath.mod(nextIndex++,
@@ -76,7 +76,7 @@ public class TopicRouteData {
             }
             return broker.getEndpoints();
         }
-        throw new ResourceNotFoundException("Failed to pick endpoints to query assignment");
+        throw new NotFoundException("Failed to pick endpoints to query assignment");
     }
 
     @Override
diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/route/TopicRouteDataResult.java b/java/client/src/main/java/org/apache/rocketmq/client/java/route/TopicRouteDataResult.java
index 1efb034..29ce773 100644
--- a/java/client/src/main/java/org/apache/rocketmq/client/java/route/TopicRouteDataResult.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/route/TopicRouteDataResult.java
@@ -19,10 +19,18 @@ package org.apache.rocketmq.client.java.route;
 
 import static com.google.common.base.Preconditions.checkNotNull;
 
+import apache.rocketmq.v2.Code;
 import apache.rocketmq.v2.Status;
 import com.google.common.base.MoreObjects;
 import com.google.common.base.Objects;
 import javax.annotation.concurrent.Immutable;
+import org.apache.rocketmq.client.apis.ClientException;
+import org.apache.rocketmq.client.java.exception.BadRequestException;
+import org.apache.rocketmq.client.java.exception.InternalErrorException;
+import org.apache.rocketmq.client.java.exception.NotFoundException;
+import org.apache.rocketmq.client.java.exception.ProxyTimeoutException;
+import org.apache.rocketmq.client.java.exception.TooManyRequestsException;
+import org.apache.rocketmq.client.java.exception.UnsupportedException;
 
 /**
  * Result topic route data fetched from remote.
@@ -30,19 +38,53 @@ import javax.annotation.concurrent.Immutable;
 @Immutable
 public class TopicRouteDataResult {
     private final TopicRouteData topicRouteData;
-    private final Status status;
+    private final ClientException exception;
 
     public TopicRouteDataResult(TopicRouteData topicRouteData, Status status) {
         this.topicRouteData = checkNotNull(topicRouteData, "topicRouteData should not be null");
-        this.status = checkNotNull(status, "status should not be null");
+        final Code code = checkNotNull(status, "status should not be null").getCode();
+        switch (code) { // Map the response code to a typed exception; OK means no exception.
+            case OK:
+                this.exception = null;
+                break;
+            case BAD_REQUEST:
+            case ILLEGAL_ACCESS_POINT:
+            case ILLEGAL_TOPIC:
+            case CLIENT_ID_REQUIRED:
+                this.exception = new BadRequestException(code.getNumber(), status.getMessage());
+                break;
+            case NOT_FOUND:
+            case TOPIC_NOT_FOUND:
+                this.exception = new NotFoundException(code.getNumber(), status.getMessage());
+                break;
+            case TOO_MANY_REQUESTS:
+                this.exception = new TooManyRequestsException(code.getNumber(), status.getMessage());
+                break;
+            case INTERNAL_ERROR:
+            case INTERNAL_SERVER_ERROR:
+                this.exception = new InternalErrorException(code.getNumber(), status.getMessage());
+                break;
+            case PROXY_TIMEOUT:
+                this.exception = new ProxyTimeoutException(code.getNumber(), status.getMessage());
+                break;
+            default: // Includes UNRECOGNIZED, whose getNumber() throws; report the raw wire value instead.
+                this.exception = new UnsupportedException(status.getCodeValue(), status.getMessage());
+        }
     }
 
     public TopicRouteData getTopicRouteData() {
         return topicRouteData;
     }
 
-    public Status getStatus() {
-        return status;
+    public TopicRouteData checkAndGetTopicRouteData() throws ClientException {
+        if (null != exception) { // a non-OK status was converted to an exception in the constructor
+            throw exception;
+        }
+        return topicRouteData;
+    }
+
+    public boolean ok() { // true when the constructor recorded no exception
+        return null == exception;
     }
 
     @Override
@@ -54,20 +96,21 @@ public class TopicRouteDataResult {
             return false;
         }
         TopicRouteDataResult result = (TopicRouteDataResult) o;
-        return Objects.equal(topicRouteData, result.topicRouteData) && Objects.equal(status, result.status);
+        return Objects.equal(topicRouteData, result.topicRouteData) && Objects.equal(exception, result.exception);
     }
 
     @Override
     public int hashCode() {
-        return Objects.hashCode(topicRouteData, status);
+        return Objects.hashCode(topicRouteData, exception); // NOTE(review): confirm ClientException hashCode/equals
     }
 
     @Override
     public String toString() {
-        return MoreObjects.toStringHelper(this)
-            .add("topicRouteData", topicRouteData)
-            .add("code", status.getCode().getNumber())
-            .add("message", status.getMessage())
-            .toString();
+        final MoreObjects.ToStringHelper helper = MoreObjects.toStringHelper(this)
+            .add("topicRouteData", this.topicRouteData);
+        if (null == exception) { // leave the exception field out when there is none
+            return helper.toString();
+        }
+        return helper.add("exception", this.exception).toString();
     }
 }
diff --git a/java/client/src/test/java/org/apache/rocketmq/client/java/tool/TestBase.java b/java/client/src/test/java/org/apache/rocketmq/client/java/tool/TestBase.java
index 52f5611..212cb06 100644
--- a/java/client/src/test/java/org/apache/rocketmq/client/java/tool/TestBase.java
+++ b/java/client/src/test/java/org/apache/rocketmq/client/java/tool/TestBase.java
@@ -292,7 +292,9 @@ public class TestBase {
     protected ListenableFuture<SendMessageResponse> failureSendMessageResponseFuture() {
         final Status status = Status.newBuilder().setCode(Code.FORBIDDEN).build();
         SettableFuture<SendMessageResponse> future0 = SettableFuture.create();
-        SendMessageResponse response = SendMessageResponse.newBuilder().setStatus(status).build();
+        SendResultEntry sendResultEntry = SendResultEntry.newBuilder().setStatus(status).build();
+        SendMessageResponse response = SendMessageResponse.newBuilder().setStatus(status)
+            .addEntries(sendResultEntry).build(); // the entry carries the same FORBIDDEN status as the response
         future0.set(response);
         return future0;
     }