You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by aa...@apache.org on 2022/07/01 09:12:04 UTC
[rocketmq-clients] 01/02: Java: implement error handling
This is an automated email from the ASF dual-hosted git repository.
aaronai pushed a commit to branch java
in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git
commit ff6dfffc86e2f6fb5ce9d9ecbb274beebd7b0ba6
Author: Aaron Ai <ya...@gmail.com>
AuthorDate: Fri Jul 1 11:30:18 2022 +0800
Java: implement error handling
---
.../rocketmq/client/apis/ClientException.java | 4 +-
...oundException.java => BadRequestException.java} | 13 ++-
...FoundException.java => ForbiddenException.java} | 13 ++-
...dException.java => InternalErrorException.java} | 16 +--
...tFoundException.java => NotFoundException.java} | 13 ++-
...xception.java => PayloadTooLargeException.java} | 13 ++-
...ndException.java => ProxyTimeoutException.java} | 14 +--
...a => RequestHeaderFieldsTooLargeException.java} | 16 +--
...xception.java => TooManyRequestsException.java} | 15 +--
...ndException.java => UnauthorizedException.java} | 14 +--
...undException.java => UnsupportedException.java} | 13 ++-
.../rocketmq/client/java/impl/ClientImpl.java | 19 +---
.../java/impl/consumer/ProcessQueueImpl.java | 46 ++-------
.../java/impl/consumer/PushConsumerImpl.java | 22 +---
.../java/impl/consumer/ReceiveMessageResult.java | 76 ++++++++++++--
.../java/impl/consumer/SimpleConsumerImpl.java | 114 +++++++++++++--------
...taResult.java => SubscriptionLoadBalancer.java} | 36 +++----
.../client/java/impl/producer/ProducerImpl.java | 34 +++---
...DataResult.java => PublishingLoadBalancer.java} | 28 +++--
.../client/java/impl/producer/SendReceiptImpl.java | 80 +++++++++++----
.../rocketmq/client/java/route/TopicRouteData.java | 6 +-
.../client/java/route/TopicRouteDataResult.java | 65 ++++++++++--
.../apache/rocketmq/client/java/tool/TestBase.java | 4 +-
23 files changed, 401 insertions(+), 273 deletions(-)
diff --git a/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/ClientException.java b/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/ClientException.java
index 9a0ced1..442f20f 100644
--- a/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/ClientException.java
+++ b/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/ClientException.java
@@ -80,9 +80,9 @@ public class ClientException extends Exception {
return null == requestId ? Optional.empty() : Optional.of(requestId);
}
- public Optional<String> getResponseCode() {
+ public Optional<Integer> getResponseCode() {
final String responseCode = context.get(RESPONSE_CODE_KEY);
- return null == responseCode ? Optional.empty() : Optional.of(responseCode);
+ return null == responseCode ? Optional.empty() : Optional.of(Integer.parseInt(responseCode));
}
@Override
diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/exception/ResourceNotFoundException.java b/java/client/src/main/java/org/apache/rocketmq/client/java/exception/BadRequestException.java
similarity index 77%
copy from java/client/src/main/java/org/apache/rocketmq/client/java/exception/ResourceNotFoundException.java
copy to java/client/src/main/java/org/apache/rocketmq/client/java/exception/BadRequestException.java
index 217c3ef..386c916 100644
--- a/java/client/src/main/java/org/apache/rocketmq/client/java/exception/ResourceNotFoundException.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/exception/BadRequestException.java
@@ -19,12 +19,11 @@ package org.apache.rocketmq.client.java.exception;
import org.apache.rocketmq.client.apis.ClientException;
-public class ResourceNotFoundException extends ClientException {
- public ResourceNotFoundException(String message) {
- super(message);
- }
-
- public ResourceNotFoundException(Throwable t) {
- super(t);
+/**
+ * Generic exception for a bad request, e.g. required header fields are missing or a request field is illegal.
+ */
+public class BadRequestException extends ClientException {
+ public BadRequestException(int responseCode, String message) {
+ super(responseCode, message);
}
}
diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/exception/ResourceNotFoundException.java b/java/client/src/main/java/org/apache/rocketmq/client/java/exception/ForbiddenException.java
similarity index 77%
copy from java/client/src/main/java/org/apache/rocketmq/client/java/exception/ResourceNotFoundException.java
copy to java/client/src/main/java/org/apache/rocketmq/client/java/exception/ForbiddenException.java
index 217c3ef..49eb280 100644
--- a/java/client/src/main/java/org/apache/rocketmq/client/java/exception/ResourceNotFoundException.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/exception/ForbiddenException.java
@@ -19,12 +19,11 @@ package org.apache.rocketmq.client.java.exception;
import org.apache.rocketmq.client.apis.ClientException;
-public class ResourceNotFoundException extends ClientException {
- public ResourceNotFoundException(String message) {
- super(message);
- }
-
- public ResourceNotFoundException(Throwable t) {
- super(t);
+/**
+ * Generic exception for the case that the user does not have permission to access or operate on the resource.
+ */
+public class ForbiddenException extends ClientException {
+ public ForbiddenException(int responseCode, String message) {
+ super(responseCode, message);
}
}
diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/exception/ResourceNotFoundException.java b/java/client/src/main/java/org/apache/rocketmq/client/java/exception/InternalErrorException.java
similarity index 74%
copy from java/client/src/main/java/org/apache/rocketmq/client/java/exception/ResourceNotFoundException.java
copy to java/client/src/main/java/org/apache/rocketmq/client/java/exception/InternalErrorException.java
index 217c3ef..e1a44a4 100644
--- a/java/client/src/main/java/org/apache/rocketmq/client/java/exception/ResourceNotFoundException.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/exception/InternalErrorException.java
@@ -19,12 +19,12 @@ package org.apache.rocketmq.client.java.exception;
import org.apache.rocketmq.client.apis.ClientException;
-public class ResourceNotFoundException extends ClientException {
- public ResourceNotFoundException(String message) {
- super(message);
- }
-
- public ResourceNotFoundException(Throwable t) {
- super(t);
+/**
+ * Generic exception indicates that the server/client encountered an unexpected condition that prevented it from
+ * fulfilling the request.
+ */
+public class InternalErrorException extends ClientException {
+ public InternalErrorException(int responseCode, String message) {
+ super(responseCode, message);
}
-}
+}
\ No newline at end of file
diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/exception/ResourceNotFoundException.java b/java/client/src/main/java/org/apache/rocketmq/client/java/exception/NotFoundException.java
similarity index 75%
copy from java/client/src/main/java/org/apache/rocketmq/client/java/exception/ResourceNotFoundException.java
copy to java/client/src/main/java/org/apache/rocketmq/client/java/exception/NotFoundException.java
index 217c3ef..6ca06ca 100644
--- a/java/client/src/main/java/org/apache/rocketmq/client/java/exception/ResourceNotFoundException.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/exception/NotFoundException.java
@@ -19,12 +19,19 @@ package org.apache.rocketmq.client.java.exception;
import org.apache.rocketmq.client.apis.ClientException;
-public class ResourceNotFoundException extends ClientException {
- public ResourceNotFoundException(String message) {
+/**
+ * Generic exception for resource not found.
+ */
+public class NotFoundException extends ClientException {
+ public NotFoundException(int responseCode, String message) {
+ super(responseCode, message);
+ }
+
+ public NotFoundException(String message) {
super(message);
}
- public ResourceNotFoundException(Throwable t) {
+ public NotFoundException(Throwable t) {
super(t);
}
}
diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/exception/ResourceNotFoundException.java b/java/client/src/main/java/org/apache/rocketmq/client/java/exception/PayloadTooLargeException.java
similarity index 76%
copy from java/client/src/main/java/org/apache/rocketmq/client/java/exception/ResourceNotFoundException.java
copy to java/client/src/main/java/org/apache/rocketmq/client/java/exception/PayloadTooLargeException.java
index 217c3ef..6dd8358 100644
--- a/java/client/src/main/java/org/apache/rocketmq/client/java/exception/ResourceNotFoundException.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/exception/PayloadTooLargeException.java
@@ -19,12 +19,11 @@ package org.apache.rocketmq.client.java.exception;
import org.apache.rocketmq.client.apis.ClientException;
-public class ResourceNotFoundException extends ClientException {
- public ResourceNotFoundException(String message) {
- super(message);
- }
-
- public ResourceNotFoundException(Throwable t) {
- super(t);
+/**
+ * Generic exception represents that the request entity is larger than the limits defined by the server.
+ */
+public class PayloadTooLargeException extends ClientException {
+ public PayloadTooLargeException(int responseCode, String message) {
+ super(responseCode, message);
}
}
diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/exception/ResourceNotFoundException.java b/java/client/src/main/java/org/apache/rocketmq/client/java/exception/ProxyTimeoutException.java
similarity index 73%
copy from java/client/src/main/java/org/apache/rocketmq/client/java/exception/ResourceNotFoundException.java
copy to java/client/src/main/java/org/apache/rocketmq/client/java/exception/ProxyTimeoutException.java
index 217c3ef..2a8abbf 100644
--- a/java/client/src/main/java/org/apache/rocketmq/client/java/exception/ResourceNotFoundException.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/exception/ProxyTimeoutException.java
@@ -19,12 +19,12 @@ package org.apache.rocketmq.client.java.exception;
import org.apache.rocketmq.client.apis.ClientException;
-public class ResourceNotFoundException extends ClientException {
- public ResourceNotFoundException(String message) {
- super(message);
- }
-
- public ResourceNotFoundException(Throwable t) {
- super(t);
+/**
+ * Generic exception indicating that the server, acting as a gateway or proxy, did not get a satisfactory response
+ * in time from its upstream servers.
+ */
+public class ProxyTimeoutException extends ClientException {
+ public ProxyTimeoutException(int responseCode, String message) {
+ super(responseCode, message);
}
}
diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/exception/ResourceNotFoundException.java b/java/client/src/main/java/org/apache/rocketmq/client/java/exception/RequestHeaderFieldsTooLargeException.java
similarity index 68%
copy from java/client/src/main/java/org/apache/rocketmq/client/java/exception/ResourceNotFoundException.java
copy to java/client/src/main/java/org/apache/rocketmq/client/java/exception/RequestHeaderFieldsTooLargeException.java
index 217c3ef..e2813f1 100644
--- a/java/client/src/main/java/org/apache/rocketmq/client/java/exception/ResourceNotFoundException.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/exception/RequestHeaderFieldsTooLargeException.java
@@ -19,12 +19,12 @@ package org.apache.rocketmq.client.java.exception;
import org.apache.rocketmq.client.apis.ClientException;
-public class ResourceNotFoundException extends ClientException {
- public ResourceNotFoundException(String message) {
- super(message);
- }
-
- public ResourceNotFoundException(Throwable t) {
- super(t);
+/**
+ * Generic exception for the case that the server is unwilling to process the request because its header fields are
+ * too large. The request may be resubmitted after reducing the size of the request header fields.
+ */
+public class RequestHeaderFieldsTooLargeException extends ClientException {
+ public RequestHeaderFieldsTooLargeException(int responseCode, String message) {
+ super(responseCode, message);
}
-}
+}
\ No newline at end of file
diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/exception/ResourceNotFoundException.java b/java/client/src/main/java/org/apache/rocketmq/client/java/exception/TooManyRequestsException.java
similarity index 74%
copy from java/client/src/main/java/org/apache/rocketmq/client/java/exception/ResourceNotFoundException.java
copy to java/client/src/main/java/org/apache/rocketmq/client/java/exception/TooManyRequestsException.java
index 217c3ef..b4fd82f 100644
--- a/java/client/src/main/java/org/apache/rocketmq/client/java/exception/ResourceNotFoundException.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/exception/TooManyRequestsException.java
@@ -19,12 +19,13 @@ package org.apache.rocketmq.client.java.exception;
import org.apache.rocketmq.client.apis.ClientException;
-public class ResourceNotFoundException extends ClientException {
- public ResourceNotFoundException(String message) {
- super(message);
- }
-
- public ResourceNotFoundException(Throwable t) {
- super(t);
+/**
+ * Generic exception indicating that too many requests were made in a short period of time.
+ *
+ * <p>Requests are throttled.
+ */
+public class TooManyRequestsException extends ClientException {
+ public TooManyRequestsException(int responseCode, String message) {
+ super(responseCode, message);
}
}
diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/exception/ResourceNotFoundException.java b/java/client/src/main/java/org/apache/rocketmq/client/java/exception/UnauthorizedException.java
similarity index 75%
copy from java/client/src/main/java/org/apache/rocketmq/client/java/exception/ResourceNotFoundException.java
copy to java/client/src/main/java/org/apache/rocketmq/client/java/exception/UnauthorizedException.java
index 217c3ef..361170e 100644
--- a/java/client/src/main/java/org/apache/rocketmq/client/java/exception/ResourceNotFoundException.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/exception/UnauthorizedException.java
@@ -19,12 +19,12 @@ package org.apache.rocketmq.client.java.exception;
import org.apache.rocketmq.client.apis.ClientException;
-public class ResourceNotFoundException extends ClientException {
- public ResourceNotFoundException(String message) {
- super(message);
- }
-
- public ResourceNotFoundException(Throwable t) {
- super(t);
+/**
+ * Generic exception indicates that the client request lacks valid authentication credentials for the requested
+ * resource.
+ */
+public class UnauthorizedException extends ClientException {
+ public UnauthorizedException(int responseCode, String message) {
+ super(responseCode, message);
}
}
diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/exception/ResourceNotFoundException.java b/java/client/src/main/java/org/apache/rocketmq/client/java/exception/UnsupportedException.java
similarity index 80%
rename from java/client/src/main/java/org/apache/rocketmq/client/java/exception/ResourceNotFoundException.java
rename to java/client/src/main/java/org/apache/rocketmq/client/java/exception/UnsupportedException.java
index 217c3ef..8fe578c 100644
--- a/java/client/src/main/java/org/apache/rocketmq/client/java/exception/ResourceNotFoundException.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/exception/UnsupportedException.java
@@ -19,12 +19,11 @@ package org.apache.rocketmq.client.java.exception;
import org.apache.rocketmq.client.apis.ClientException;
-public class ResourceNotFoundException extends ClientException {
- public ResourceNotFoundException(String message) {
- super(message);
- }
-
- public ResourceNotFoundException(Throwable t) {
- super(t);
+/**
+ * Generic exception for an unsupported request or an unrecognized response code.
+ */
+public class UnsupportedException extends ClientException {
+ public UnsupportedException(int responseCode, String message) {
+ super(responseCode, message);
}
}
diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientImpl.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientImpl.java
index cfa6db2..b16221f 100644
--- a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientImpl.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientImpl.java
@@ -51,7 +51,6 @@ import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
-import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
@@ -67,10 +66,9 @@ import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.stream.Collectors;
-import java.util.stream.Stream;
import org.apache.rocketmq.client.apis.ClientConfiguration;
import org.apache.rocketmq.client.apis.ClientException;
-import org.apache.rocketmq.client.java.exception.ResourceNotFoundException;
+import org.apache.rocketmq.client.java.exception.NotFoundException;
import org.apache.rocketmq.client.java.hook.MessageHookPoints;
import org.apache.rocketmq.client.java.hook.MessageHookPointsStatus;
import org.apache.rocketmq.client.java.hook.MessageInterceptor;
@@ -187,17 +185,10 @@ public abstract class ClientImpl extends AbstractIdleService implements Client,
} catch (Throwable t) {
LOGGER.error("Failed to get topic route data result from remote during client startup, clientId={}, "
+ "topics={}", clientId, topics, t);
- throw new ResourceNotFoundException(t);
+ throw new NotFoundException(t);
}
- // Find any topic whose topic route data is failed to fetch from remote.
- final Stream<TopicRouteDataResult> stream = results.stream()
- .filter(topicRouteDataResult -> Code.OK != topicRouteDataResult.getStatus().getCode());
- final Optional<TopicRouteDataResult> any = stream.findAny();
- // There is a topic whose topic route data is failed to fetch from remote.
- if (any.isPresent()) {
- final TopicRouteDataResult result = any.get();
- final Status status = result.getStatus();
- throw new ClientException(status.getCode().getNumber(), status.getMessage());
+ for (TopicRouteDataResult result : results) {
+ result.checkAndGetTopicRouteData();
}
LOGGER.info("Fetch topic route data from remote successfully during startup, clientId={}, topics={}",
clientId, topics);
@@ -444,8 +435,8 @@ public abstract class ClientImpl extends AbstractIdleService implements Client,
LOGGER.info("Topic route result is updated, topic={}, clientId={}, {} => {}", topic, clientId,
old, topicRouteDataResult);
}
- onTopicRouteDataResultUpdate0(topic, topicRouteDataResult);
future0.set(null);
+ onTopicRouteDataResultUpdate0(topic, topicRouteDataResult);
}
@Override
diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ProcessQueueImpl.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ProcessQueueImpl.java
index ef7cf91..dc648f6 100644
--- a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ProcessQueueImpl.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ProcessQueueImpl.java
@@ -238,7 +238,7 @@ class ProcessQueueImpl implements ProcessQueue {
final Duration duration = stopwatch.elapsed();
final List<MessageCommon> commons = result.getMessages().stream()
.map(MessageViewImpl::getMessageCommon).collect(Collectors.toList());
- if (result.getStatus().isPresent() && Code.OK.equals(result.getStatus().get().getCode())) {
+ if (result.ok()) {
consumer.doAfter(MessageHookPoints.RECEIVE, commons, duration, MessageHookPointsStatus.OK);
} else {
consumer.doAfter(MessageHookPoints.RECEIVE, commons, duration, MessageHookPointsStatus.ERROR);
@@ -320,45 +320,17 @@ class ProcessQueueImpl implements ProcessQueue {
}
private void onReceiveMessageResult(ReceiveMessageResult result) {
- Optional<Status> optionalStatus = result.getStatus();
final List<MessageViewImpl> messages = result.getMessages();
- final Endpoints endpoints = result.getEndpoints();
- if (!optionalStatus.isPresent()) {
- // Should never reach here.
- LOGGER.error("[Bug] Status in receive message result is not set, mq={}, endpoints={}", mq, endpoints);
- if (messages.isEmpty()) {
- receiveMessage();
- }
- optionalStatus = Optional.of(Status.newBuilder().setCode(Code.OK).build());
- LOGGER.error("[Bug] Status not set but message(s) found in the receive result, fix the status to OK," +
- " mq={}, endpoints={}", mq, endpoints);
+ if (!result.ok()) {
+ receiveMessageLater();
+ return;
}
- final Status status = optionalStatus.get();
- final Code code = status.getCode();
- switch (code) {
- case OK:
- if (!messages.isEmpty()) {
- cacheMessages(messages);
- consumer.getReceivedMessagesQuantity().getAndAdd(messages.size());
- consumer.getConsumeService().signal();
- }
- LOGGER.debug("Receive message with OK, mq={}, endpoints={}, messages found count={}, clientId={}",
- mq, endpoints, messages.size(), consumer.getClientId());
- receiveMessage();
- break;
- case MESSAGE_NOT_FOUND:
- LOGGER.info("Message not found while receiving message, mq={}, endpoints={}, clientId={}, code={}," +
- " status message=[{}]",
- mq, endpoints, consumer.getClientId(), code.getNumber(), status.getMessage());
- receiveMessage();
- break;
- // Fall through on purpose.
- default:
- LOGGER.error("Failed to receive message from remote, try it later, mq={}, endpoints={}, clientId={}," +
- " code={}, status message=[{}]",
- mq, endpoints, consumer.getClientId(), code.getNumber(), status.getMessage());
- receiveMessageLater();
+ if (!messages.isEmpty()) {
+ cacheMessages(messages);
+ consumer.getReceivedMessagesQuantity().getAndAdd(messages.size());
+ consumer.getConsumeService().signal();
}
+ receiveMessage();
}
@Override
diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/PushConsumerImpl.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/PushConsumerImpl.java
index b8cd546..94d974f 100644
--- a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/PushConsumerImpl.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/PushConsumerImpl.java
@@ -225,16 +225,11 @@ class PushConsumerImpl extends ConsumerImpl implements PushConsumer, MessageCach
this.state(), clientId);
throw new IllegalStateException("Push consumer is not running now");
}
-
final ListenableFuture<TopicRouteDataResult> future = getRouteDataResult(topic);
TopicRouteDataResult topicRouteDataResult = handleClientFuture(future);
- final Status status = topicRouteDataResult.getStatus();
- final Code code = status.getCode();
- if (Code.OK.equals(code)) {
- subscriptionExpressions.put(topic, filterExpression);
- return this;
- }
- throw new ClientException(code.getNumber(), status.getMessage());
+ topicRouteDataResult.checkAndGetTopicRouteData();
+ subscriptionExpressions.put(topic, filterExpression);
+ return this;
}
/**
@@ -255,15 +250,8 @@ class PushConsumerImpl extends ConsumerImpl implements PushConsumer, MessageCach
private ListenableFuture<Endpoints> pickEndpointsToQueryAssignments(String topic) {
final ListenableFuture<TopicRouteDataResult> future = getRouteDataResult(topic);
return Futures.transformAsync(future, topicRouteDataResult -> {
- final SettableFuture<Endpoints> future0 = SettableFuture.create();
- final Status status = topicRouteDataResult.getStatus();
- final Code code = status.getCode();
- if (!Code.OK.equals(code)) {
- throw new ClientException(code.getNumber(), status.getMessage());
- }
- Endpoints endpoints = topicRouteDataResult.getTopicRouteData().pickEndpointsToQueryAssignments();
- future0.set(endpoints);
- return future0;
+ Endpoints endpoints = topicRouteDataResult.checkAndGetTopicRouteData().pickEndpointsToQueryAssignments();
+ return Futures.immediateFuture(endpoints);
}, MoreExecutors.directExecutor());
}
diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ReceiveMessageResult.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ReceiveMessageResult.java
index 52f501e..09b8d12 100644
--- a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ReceiveMessageResult.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ReceiveMessageResult.java
@@ -17,30 +17,92 @@
package org.apache.rocketmq.client.java.impl.consumer;
+import apache.rocketmq.v2.Code;
import apache.rocketmq.v2.Status;
+import java.util.ArrayList;
import java.util.List;
-import java.util.Optional;
+import org.apache.rocketmq.client.apis.ClientException;
+import org.apache.rocketmq.client.apis.message.MessageView;
+import org.apache.rocketmq.client.java.exception.BadRequestException;
+import org.apache.rocketmq.client.java.exception.ForbiddenException;
+import org.apache.rocketmq.client.java.exception.InternalErrorException;
+import org.apache.rocketmq.client.java.exception.NotFoundException;
+import org.apache.rocketmq.client.java.exception.ProxyTimeoutException;
+import org.apache.rocketmq.client.java.exception.TooManyRequestsException;
+import org.apache.rocketmq.client.java.exception.UnauthorizedException;
+import org.apache.rocketmq.client.java.exception.UnsupportedException;
import org.apache.rocketmq.client.java.message.MessageViewImpl;
import org.apache.rocketmq.client.java.route.Endpoints;
public class ReceiveMessageResult {
private final Endpoints endpoints;
- private final Status status;
+ private final ClientException exception;
private final List<MessageViewImpl> messages;
public ReceiveMessageResult(Endpoints endpoints, Status status, List<MessageViewImpl> messages) {
this.endpoints = endpoints;
- this.status = status;
+ final Code code = status.getCode();
+ switch (code) {
+ case OK:
+ this.exception = null;
+ break;
+ case BAD_REQUEST:
+ case ILLEGAL_TOPIC:
+ case ILLEGAL_CONSUMER_GROUP:
+ case ILLEGAL_FILTER_EXPRESSION:
+ case CLIENT_ID_REQUIRED:
+ this.exception = new BadRequestException(code.getNumber(), status.getMessage());
+ break;
+ case UNAUTHORIZED:
+ this.exception = new UnauthorizedException(code.getNumber(), status.getMessage());
+ break;
+ case FORBIDDEN:
+ this.exception = new ForbiddenException(code.getNumber(), status.getMessage());
+ break;
+ case MESSAGE_NOT_FOUND:
+ case NOT_FOUND:
+ case TOPIC_NOT_FOUND:
+ case CONSUMER_GROUP_NOT_FOUND:
+ this.exception = new NotFoundException(code.getNumber(), status.getMessage());
+ break;
+ case TOO_MANY_REQUESTS:
+ this.exception = new TooManyRequestsException(code.getNumber(), status.getMessage());
+ break;
+ case INTERNAL_ERROR:
+ case INTERNAL_SERVER_ERROR:
+ this.exception = new InternalErrorException(code.getNumber(), status.getMessage());
+ break;
+ case PROXY_TIMEOUT:
+ this.exception = new ProxyTimeoutException(code.getNumber(), status.getMessage());
+ break;
+ default:
+ this.exception = new UnsupportedException(code.getNumber(), status.getMessage());
+ }
this.messages = messages;
}
- public Endpoints getEndpoints() {
- return endpoints;
+ /**
+ * Indicates whether the result is ok or not.
+ *
+ * <p>The result is ok if the status code is {@link Code#OK} (no exception was recorded) or
+ * {@link Code#MESSAGE_NOT_FOUND} (recorded as an exception, but it only means that no message was found).
+ * @return true if the result is ok, false otherwise.
+ */
+ public boolean ok() {
+ return null == exception || (exception.getResponseCode().isPresent()
+ && Code.MESSAGE_NOT_FOUND.getNumber() == exception.getResponseCode().get());
+ }
+
+ public List<MessageView> checkAndGetMessages() throws ClientException {
+ if (null != exception) {
+ throw exception;
+ }
+ return new ArrayList<>(messages);
}
- public Optional<Status> getStatus() {
- return null == status ? Optional.empty() : Optional.of(status);
+ public Endpoints getEndpoints() {
+ return endpoints;
}
public List<MessageViewImpl> getMessages() {
diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/SimpleConsumerImpl.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/SimpleConsumerImpl.java
index d7b0b1c..b0ccfd9 100644
--- a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/SimpleConsumerImpl.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/SimpleConsumerImpl.java
@@ -32,7 +32,6 @@ import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
@@ -44,6 +43,14 @@ import org.apache.rocketmq.client.apis.ClientException;
import org.apache.rocketmq.client.apis.consumer.FilterExpression;
import org.apache.rocketmq.client.apis.consumer.SimpleConsumer;
import org.apache.rocketmq.client.apis.message.MessageView;
+import org.apache.rocketmq.client.java.exception.BadRequestException;
+import org.apache.rocketmq.client.java.exception.ForbiddenException;
+import org.apache.rocketmq.client.java.exception.InternalErrorException;
+import org.apache.rocketmq.client.java.exception.NotFoundException;
+import org.apache.rocketmq.client.java.exception.ProxyTimeoutException;
+import org.apache.rocketmq.client.java.exception.TooManyRequestsException;
+import org.apache.rocketmq.client.java.exception.UnauthorizedException;
+import org.apache.rocketmq.client.java.exception.UnsupportedException;
import org.apache.rocketmq.client.java.impl.ClientSettings;
import org.apache.rocketmq.client.java.message.MessageViewImpl;
import org.apache.rocketmq.client.java.message.protocol.Resource;
@@ -66,7 +73,7 @@ class SimpleConsumerImpl extends ConsumerImpl implements SimpleConsumer {
private final AtomicInteger topicIndex;
private final Map<String /* topic */, FilterExpression> subscriptionExpressions;
- private final ConcurrentMap<String /* topic */, SubscriptionTopicRouteDataResult> subTopicRouteDataResultCache;
+ private final ConcurrentMap<String /* topic */, SubscriptionLoadBalancer> subTopicRouteDataResultCache;
public SimpleConsumerImpl(ClientConfiguration clientConfiguration, String consumerGroup, Duration awaitDuration,
Map<String, FilterExpression> subscriptionExpressions) {
@@ -124,13 +131,9 @@ class SimpleConsumerImpl extends ConsumerImpl implements SimpleConsumer {
}
final ListenableFuture<TopicRouteDataResult> future = getRouteDataResult(topic);
TopicRouteDataResult topicRouteDataResult = handleClientFuture(future);
- final Status status = topicRouteDataResult.getStatus();
- final Code code = status.getCode();
- if (Code.OK.equals(code)) {
- subscriptionExpressions.put(topic, filterExpression);
- return this;
- }
- throw new ClientException(code.getNumber(), status.getMessage());
+ topicRouteDataResult.checkAndGetTopicRouteData();
+ subscriptionExpressions.put(topic, filterExpression);
+ return this;
}
/**
@@ -192,27 +195,15 @@ class SimpleConsumerImpl extends ConsumerImpl implements SimpleConsumer {
}
final String topic = topics.get(IntMath.mod(topicIndex.getAndIncrement(), topics.size()));
final FilterExpression filterExpression = copy.get(topic);
- final ListenableFuture<SubscriptionTopicRouteDataResult> routeFuture = getSubscriptionTopicRouteResult(topic);
+ final ListenableFuture<SubscriptionLoadBalancer> routeFuture = getSubscriptionTopicRouteResult(topic);
final ListenableFuture<ReceiveMessageResult> future0 = Futures.transformAsync(routeFuture, result -> {
final MessageQueueImpl mq = result.takeMessageQueue();
final ReceiveMessageRequest request = wrapReceiveMessageRequest(maxMessageNum, mq, filterExpression,
invisibleDuration);
return receiveMessage(request, mq, awaitDuration);
}, MoreExecutors.directExecutor());
- return Futures.transformAsync(future0, result -> {
- final Optional<Status> optionalStatus = result.getStatus();
- if (!optionalStatus.isPresent()) {
- future.set(new ArrayList<>(result.getMessages()));
- return future;
- }
- final Status status = optionalStatus.get();
- final Code code = status.getCode();
- if (Code.OK.equals(code)) {
- future.set(new ArrayList<>(result.getMessages()));
- return future;
- }
- throw new ClientException(code.getNumber(), status.getMessage());
- }, clientCallbackExecutor);
+ return Futures.transformAsync(future0, result -> Futures.immediateFuture(result.checkAndGetMessages()),
+ clientCallbackExecutor);
}
/**
@@ -253,11 +244,32 @@ class SimpleConsumerImpl extends ConsumerImpl implements SimpleConsumer {
return Futures.transformAsync(future, response -> {
final Status status = response.getStatus();
final Code code = status.getCode();
- if (Code.OK.equals(code)) {
- future0.set(null);
- return future0;
+ switch (code) {
+ case OK:
+ return Futures.immediateVoidFuture();
+ case BAD_REQUEST:
+ case ILLEGAL_TOPIC:
+ case ILLEGAL_CONSUMER_GROUP:
+ case INVALID_RECEIPT_HANDLE:
+ case CLIENT_ID_REQUIRED:
+ throw new BadRequestException(code.getNumber(), status.getMessage());
+ case UNAUTHORIZED:
+ throw new UnauthorizedException(code.getNumber(), status.getMessage());
+ case FORBIDDEN:
+ throw new ForbiddenException(code.getNumber(), status.getMessage());
+ case NOT_FOUND:
+ case TOPIC_NOT_FOUND:
+ throw new NotFoundException(code.getNumber(), status.getMessage());
+ case TOO_MANY_REQUESTS:
+ throw new TooManyRequestsException(code.getNumber(), status.getMessage());
+ case INTERNAL_ERROR:
+ case INTERNAL_SERVER_ERROR:
+ throw new InternalErrorException(code.getNumber(), status.getMessage());
+ case PROXY_TIMEOUT:
+ throw new ProxyTimeoutException(code.getNumber(), status.getMessage());
+ default:
+ throw new UnsupportedException(code.getNumber(), status.getMessage());
}
- throw new ClientException(code.getNumber(), status.getMessage());
}, clientCallbackExecutor);
}
@@ -295,11 +307,33 @@ class SimpleConsumerImpl extends ConsumerImpl implements SimpleConsumer {
impl.setReceiptHandle(response.getReceiptHandle());
final Status status = response.getStatus();
final Code code = status.getCode();
- if (Code.OK.equals(code)) {
- future0.set(null);
- return future0;
+ // Map each status code to its typed exception; every non-OK group must end in its own throw.
+ switch (code) {
+ case OK:
+ return Futures.immediateVoidFuture();
+ case BAD_REQUEST:
+ case ILLEGAL_TOPIC:
+ case ILLEGAL_CONSUMER_GROUP:
+ case INVALID_RECEIPT_HANDLE:
+ case CLIENT_ID_REQUIRED:
+ throw new BadRequestException(code.getNumber(), status.getMessage());
+ case UNAUTHORIZED:
+ throw new UnauthorizedException(code.getNumber(), status.getMessage());
+ case FORBIDDEN:
+ throw new ForbiddenException(code.getNumber(), status.getMessage());
+ case NOT_FOUND:
+ case TOPIC_NOT_FOUND:
+ throw new NotFoundException(code.getNumber(), status.getMessage());
+ case TOO_MANY_REQUESTS:
+ throw new TooManyRequestsException(code.getNumber(), status.getMessage());
+ case INTERNAL_ERROR:
+ case INTERNAL_SERVER_ERROR:
+ throw new InternalErrorException(code.getNumber(), status.getMessage());
+ case PROXY_TIMEOUT:
+ throw new ProxyTimeoutException(code.getNumber(), status.getMessage());
+ default:
+ throw new UnsupportedException(code.getNumber(), status.getMessage());
}
- throw new ClientException(code.getNumber(), status.getMessage());
}, MoreExecutors.directExecutor());
}
@@ -318,24 +348,24 @@ class SimpleConsumerImpl extends ConsumerImpl implements SimpleConsumer {
}
public void onTopicRouteDataResultUpdate0(String topic, TopicRouteDataResult topicRouteDataResult) {
- final SubscriptionTopicRouteDataResult subscriptionTopicRouteDataResult =
- new SubscriptionTopicRouteDataResult(topicRouteDataResult);
- subTopicRouteDataResultCache.put(topic, subscriptionTopicRouteDataResult);
+ final SubscriptionLoadBalancer subscriptionLoadBalancer =
+ new SubscriptionLoadBalancer(topicRouteDataResult);
+ subTopicRouteDataResultCache.put(topic, subscriptionLoadBalancer);
}
- private ListenableFuture<SubscriptionTopicRouteDataResult> getSubscriptionTopicRouteResult(final String topic) {
- SettableFuture<SubscriptionTopicRouteDataResult> future0 = SettableFuture.create();
- final SubscriptionTopicRouteDataResult result = subTopicRouteDataResultCache.get(topic);
+ private ListenableFuture<SubscriptionLoadBalancer> getSubscriptionTopicRouteResult(final String topic) {
+ SettableFuture<SubscriptionLoadBalancer> future0 = SettableFuture.create();
+ final SubscriptionLoadBalancer result = subTopicRouteDataResultCache.get(topic);
if (null != result) {
future0.set(result);
return future0;
}
final ListenableFuture<TopicRouteDataResult> future = getRouteDataResult(topic);
return Futures.transform(future, topicRouteDataResult -> {
- final SubscriptionTopicRouteDataResult subscriptionTopicRouteDataResult =
- new SubscriptionTopicRouteDataResult(topicRouteDataResult);
- subTopicRouteDataResultCache.put(topic, subscriptionTopicRouteDataResult);
- return subscriptionTopicRouteDataResult;
+ final SubscriptionLoadBalancer subscriptionLoadBalancer =
+ new SubscriptionLoadBalancer(topicRouteDataResult);
+ subTopicRouteDataResultCache.put(topic, subscriptionLoadBalancer);
+ return subscriptionLoadBalancer;
}, MoreExecutors.directExecutor());
}
}
diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/SubscriptionTopicRouteDataResult.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/SubscriptionLoadBalancer.java
similarity index 70%
rename from java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/SubscriptionTopicRouteDataResult.java
rename to java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/SubscriptionLoadBalancer.java
index 804cf3f..21610b2 100644
--- a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/SubscriptionTopicRouteDataResult.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/SubscriptionLoadBalancer.java
@@ -17,32 +17,34 @@
package org.apache.rocketmq.client.java.impl.consumer;
-import apache.rocketmq.v2.Code;
-import apache.rocketmq.v2.Status;
import com.google.common.collect.ImmutableList;
import com.google.common.math.IntMath;
import java.util.concurrent.atomic.AtomicInteger;
import javax.annotation.concurrent.Immutable;
import org.apache.commons.lang3.RandomUtils;
import org.apache.rocketmq.client.apis.ClientException;
-import org.apache.rocketmq.client.java.exception.ResourceNotFoundException;
+import org.apache.rocketmq.client.java.exception.NotFoundException;
import org.apache.rocketmq.client.java.misc.Utilities;
import org.apache.rocketmq.client.java.route.MessageQueueImpl;
import org.apache.rocketmq.client.java.route.TopicRouteDataResult;
@Immutable
-public class SubscriptionTopicRouteDataResult {
- private final AtomicInteger messageQueueIndex;
-
- private final Status status;
-
+public class SubscriptionLoadBalancer {
+ private final TopicRouteDataResult topicRouteDataResult;
+ /**
+ * Index for round-robin.
+ */
+ private final AtomicInteger index;
+ /**
+ * Message queues to receive message.
+ */
private final ImmutableList<MessageQueueImpl> messageQueues;
- public SubscriptionTopicRouteDataResult(TopicRouteDataResult topicRouteDataResult) {
- this.messageQueueIndex = new AtomicInteger(RandomUtils.nextInt(0, Integer.MAX_VALUE));
- this.status = topicRouteDataResult.getStatus();
+ public SubscriptionLoadBalancer(TopicRouteDataResult topicRouteDataResult) {
+ this.topicRouteDataResult = topicRouteDataResult;
+ this.index = new AtomicInteger(RandomUtils.nextInt(0, Integer.MAX_VALUE));
final ImmutableList.Builder<MessageQueueImpl> builder = ImmutableList.builder();
- if (Code.OK != status.getCode()) {
+ if (!topicRouteDataResult.ok()) {
this.messageQueues = builder.build();
return;
}
@@ -57,16 +59,12 @@ public class SubscriptionTopicRouteDataResult {
}
public MessageQueueImpl takeMessageQueue() throws ClientException {
- final Code code = status.getCode();
- if (!Code.OK.equals(code)) {
- throw new ClientException(code.getNumber(), status.getMessage());
- }
+ topicRouteDataResult.checkAndGetTopicRouteData();
if (messageQueues.isEmpty()) {
// Should never reach here.
- throw new ResourceNotFoundException("Failed to take message queue due to readable message queue doesn't "
- + "exist");
+ throw new NotFoundException("Failed to take message queue due to readable message queue doesn't exist");
}
- final int next = messageQueueIndex.getAndIncrement();
+ final int next = index.getAndIncrement();
return messageQueues.get(IntMath.mod(next, messageQueues.size()));
}
}
diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/ProducerImpl.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/ProducerImpl.java
index ab711c4..7ba3393 100644
--- a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/ProducerImpl.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/ProducerImpl.java
@@ -57,6 +57,7 @@ import org.apache.rocketmq.client.apis.producer.SendReceipt;
import org.apache.rocketmq.client.apis.producer.Transaction;
import org.apache.rocketmq.client.apis.producer.TransactionChecker;
import org.apache.rocketmq.client.apis.producer.TransactionResolution;
+import org.apache.rocketmq.client.java.exception.TooManyRequestsException;
import org.apache.rocketmq.client.java.hook.MessageHookPoints;
import org.apache.rocketmq.client.java.hook.MessageHookPointsStatus;
import org.apache.rocketmq.client.java.impl.ClientImpl;
@@ -86,7 +87,7 @@ class ProducerImpl extends ClientImpl implements Producer {
protected final ProducerSettings producerSettings;
private final TransactionChecker checker;
- private final ConcurrentMap<String/* topic */, PublishingTopicRouteDataResult> publishingRouteDataResultCache;
+ private final ConcurrentMap<String/* topic */, PublishingLoadBalancer> publishingRouteDataResultCache;
/**
* The caller is supposed to have validated the arguments and handled throwing exception or
@@ -330,7 +331,7 @@ class ProducerImpl extends ClientImpl implements Producer {
/**
* Take message queue(s) from route for message publishing.
*/
- private List<MessageQueueImpl> takeMessageQueues(PublishingTopicRouteDataResult result) throws ClientException {
+ private List<MessageQueueImpl> takeMessageQueues(PublishingLoadBalancer result) throws ClientException {
return result.takeMessageQueues(isolated, this.getRetryPolicy().getMaxAttempts());
}
@@ -410,7 +411,7 @@ class ProducerImpl extends ClientImpl implements Producer {
this.topics.add(topic);
// Get publishing topic route.
- final ListenableFuture<PublishingTopicRouteDataResult> routeFuture = getPublishingTopicRouteResult(topic);
+ final ListenableFuture<PublishingLoadBalancer> routeFuture = getPublishingTopicRouteResult(topic);
return Futures.transformAsync(routeFuture, result -> {
// Prepare the candidate message queue(s) for retry-sending in advance.
final List<MessageQueueImpl> candidates = null == messageGroup ? takeMessageQueues(result) :
@@ -527,6 +528,11 @@ class ProducerImpl extends ClientImpl implements Producer {
}
// Try to do more attempts.
int nextAttempt = 1 + attempt;
+ // Retry immediately if the request is not throttled.
+ if (!(t instanceof TooManyRequestsException)) {
+ send0(future, topic, messageType, candidates, messages, nextAttempt);
+ return;
+ }
final Duration delay = ProducerImpl.this.getRetryPolicy().getNextAttemptDelay(nextAttempt);
LOGGER.warn("Failed to send message, would attempt to resend after {}, maxAttempts={}," +
" attempt={}, topic={}, messageId(s)={}, endpoints={}, clientId={}", delay, maxAttempts,
@@ -541,24 +547,24 @@ class ProducerImpl extends ClientImpl implements Producer {
@Override
public void onTopicRouteDataResultUpdate0(String topic, TopicRouteDataResult topicRouteDataResult) {
- final PublishingTopicRouteDataResult publishingTopicRouteDataResult =
- new PublishingTopicRouteDataResult(topicRouteDataResult);
- publishingRouteDataResultCache.put(topic, publishingTopicRouteDataResult);
+ final PublishingLoadBalancer publishingLoadBalancer =
+ new PublishingLoadBalancer(topicRouteDataResult);
+ publishingRouteDataResultCache.put(topic, publishingLoadBalancer);
}
- private ListenableFuture<PublishingTopicRouteDataResult> getPublishingTopicRouteResult(final String topic) {
- SettableFuture<PublishingTopicRouteDataResult> future0 = SettableFuture.create();
- final PublishingTopicRouteDataResult result = publishingRouteDataResultCache.get(topic);
+ private ListenableFuture<PublishingLoadBalancer> getPublishingTopicRouteResult(final String topic) {
+ SettableFuture<PublishingLoadBalancer> future0 = SettableFuture.create();
+ final PublishingLoadBalancer result = publishingRouteDataResultCache.get(topic);
if (null != result) {
future0.set(result);
return future0;
}
return Futures.transformAsync(getRouteDataResult(topic), topicRouteDataResult -> {
- SettableFuture<PublishingTopicRouteDataResult> future = SettableFuture.create();
- final PublishingTopicRouteDataResult publishingTopicRouteDataResult =
- new PublishingTopicRouteDataResult(topicRouteDataResult);
- publishingRouteDataResultCache.put(topic, publishingTopicRouteDataResult);
- future.set(publishingTopicRouteDataResult);
+ SettableFuture<PublishingLoadBalancer> future = SettableFuture.create();
+ final PublishingLoadBalancer publishingLoadBalancer =
+ new PublishingLoadBalancer(topicRouteDataResult);
+ publishingRouteDataResultCache.put(topic, publishingLoadBalancer);
+ future.set(publishingLoadBalancer);
return future;
}, MoreExecutors.directExecutor());
}
diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/PublishingTopicRouteDataResult.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/PublishingLoadBalancer.java
similarity index 86%
rename from java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/PublishingTopicRouteDataResult.java
rename to java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/PublishingLoadBalancer.java
index e60f185..9b326de 100644
--- a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/PublishingTopicRouteDataResult.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/PublishingLoadBalancer.java
@@ -17,8 +17,6 @@
package org.apache.rocketmq.client.java.impl.producer;
-import apache.rocketmq.v2.Code;
-import apache.rocketmq.v2.Status;
import com.google.common.base.Objects;
import com.google.common.collect.ImmutableList;
import com.google.common.hash.Hashing;
@@ -33,7 +31,7 @@ import java.util.concurrent.atomic.AtomicInteger;
import javax.annotation.concurrent.Immutable;
import org.apache.commons.lang3.RandomUtils;
import org.apache.rocketmq.client.apis.ClientException;
-import org.apache.rocketmq.client.java.exception.ResourceNotFoundException;
+import org.apache.rocketmq.client.java.exception.NotFoundException;
import org.apache.rocketmq.client.java.misc.Utilities;
import org.apache.rocketmq.client.java.route.Broker;
import org.apache.rocketmq.client.java.route.Endpoints;
@@ -41,20 +39,22 @@ import org.apache.rocketmq.client.java.route.MessageQueueImpl;
import org.apache.rocketmq.client.java.route.TopicRouteDataResult;
@Immutable
-public class PublishingTopicRouteDataResult {
+public class PublishingLoadBalancer {
+ private final TopicRouteDataResult topicRouteDataResult;
+ /**
+ * Index for round-robin.
+ */
private final AtomicInteger index;
-
- private final Status status;
/**
* Message queues to send message.
*/
private final ImmutableList<MessageQueueImpl> messageQueues;
- public PublishingTopicRouteDataResult(TopicRouteDataResult topicRouteDataResult) {
+ public PublishingLoadBalancer(TopicRouteDataResult topicRouteDataResult) {
+ this.topicRouteDataResult = topicRouteDataResult;
this.index = new AtomicInteger(RandomUtils.nextInt(0, Integer.MAX_VALUE));
- this.status = topicRouteDataResult.getStatus();
final ImmutableList.Builder<MessageQueueImpl> builder = ImmutableList.builder();
- if (Code.OK != status.getCode()) {
+ if (!topicRouteDataResult.ok()) {
this.messageQueues = builder.build();
return;
}
@@ -69,16 +69,12 @@ public class PublishingTopicRouteDataResult {
}
private void preconditionCheckBeforeTakingMessageQueue() throws ClientException {
- final Code code = status.getCode();
- if (!Code.OK.equals(code)) {
- throw new ClientException(code.getNumber(), status.getMessage());
- }
+ topicRouteDataResult.checkAndGetTopicRouteData();
if (messageQueues.isEmpty()) {
- throw new ResourceNotFoundException("Failed to take message due to writable message queue doesn't exist");
+ throw new NotFoundException("Failed to take message due to writable message queue doesn't exist");
}
}
- @SuppressWarnings("UnstableApiUsage")
public MessageQueueImpl takeMessageQueueByMessageGroup(String messageGroup) throws ClientException {
preconditionCheckBeforeTakingMessageQueue();
final long hashCode = Hashing.sipHash24().hashBytes(messageGroup.getBytes(StandardCharsets.UTF_8)).asLong();
@@ -130,7 +126,7 @@ public class PublishingTopicRouteDataResult {
if (o == null || getClass() != o.getClass()) {
return false;
}
- PublishingTopicRouteDataResult that = (PublishingTopicRouteDataResult) o;
+ PublishingLoadBalancer that = (PublishingLoadBalancer) o;
return Objects.equal(messageQueues, that.messageQueues);
}
diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/SendReceiptImpl.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/SendReceiptImpl.java
index e718228..74a6f10 100644
--- a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/SendReceiptImpl.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/SendReceiptImpl.java
@@ -27,6 +27,16 @@ import java.util.List;
import org.apache.rocketmq.client.apis.ClientException;
import org.apache.rocketmq.client.apis.message.MessageId;
import org.apache.rocketmq.client.apis.producer.SendReceipt;
+import org.apache.rocketmq.client.java.exception.BadRequestException;
+import org.apache.rocketmq.client.java.exception.ForbiddenException;
+import org.apache.rocketmq.client.java.exception.InternalErrorException;
+import org.apache.rocketmq.client.java.exception.NotFoundException;
+import org.apache.rocketmq.client.java.exception.PayloadTooLargeException;
+import org.apache.rocketmq.client.java.exception.ProxyTimeoutException;
+import org.apache.rocketmq.client.java.exception.RequestHeaderFieldsTooLargeException;
+import org.apache.rocketmq.client.java.exception.TooManyRequestsException;
+import org.apache.rocketmq.client.java.exception.UnauthorizedException;
+import org.apache.rocketmq.client.java.exception.UnsupportedException;
import org.apache.rocketmq.client.java.message.MessageIdCodec;
import org.apache.rocketmq.client.java.route.Endpoints;
import org.apache.rocketmq.client.java.route.MessageQueueImpl;
@@ -61,6 +71,7 @@ public class SendReceiptImpl implements SendReceipt {
return messageQueue.getBroker().getEndpoints();
}
+ @SuppressWarnings("unused")
public long getOffset() {
return offset;
}
@@ -68,29 +79,57 @@ public class SendReceiptImpl implements SendReceipt {
public static List<SendReceiptImpl> processSendResponse(MessageQueueImpl mq,
SendMessageResponse response) throws ClientException {
final Status status = response.getStatus();
- final Code code = status.getCode();
- if (!Code.OK.equals(code)) {
- throw new ClientException(code.getNumber(), status.getMessage());
- }
-
List<SendReceiptImpl> sendReceipts = new ArrayList<>();
- List<Throwable> throwableList = new ArrayList<>();
- final List<SendResultEntry> list = response.getEntriesList();
- for (SendResultEntry entry : list) {
+ final List<SendResultEntry> entries = response.getEntriesList();
+ for (SendResultEntry entry : entries) {
final Status entryStatus = entry.getStatus();
- final Code statusCode = entryStatus.getCode();
- if (!Code.OK.equals(statusCode)) {
- ClientException e = new ClientException(statusCode.getNumber(), status.getMessage());
- throwableList.add(e);
- continue;
+ // The code is read from the entry, so the message must come from the same entry status.
+ final Code code = entryStatus.getCode();
+ switch (code) {
+ case OK:
+ final MessageId messageId = MessageIdCodec.getInstance().decode(entry.getMessageId());
+ final String transactionId = entry.getTransactionId();
+ final long offset = entry.getOffset();
+ final SendReceiptImpl impl = new SendReceiptImpl(messageId, transactionId, mq, offset);
+ sendReceipts.add(impl);
+ break;
+ case ILLEGAL_TOPIC:
+ case ILLEGAL_MESSAGE_TAG:
+ case ILLEGAL_MESSAGE_KEY:
+ case ILLEGAL_MESSAGE_GROUP:
+ case ILLEGAL_MESSAGE_PROPERTY_KEY:
+ case ILLEGAL_MESSAGE_ID:
+ case MESSAGE_PROPERTY_CONFLICT_WITH_TYPE:
+ case MESSAGE_CORRUPTED:
+ case CLIENT_ID_REQUIRED:
+ throw new BadRequestException(code.getNumber(), entryStatus.getMessage());
+ case UNAUTHORIZED:
+ throw new UnauthorizedException(code.getNumber(), entryStatus.getMessage());
+ case FORBIDDEN:
+ throw new ForbiddenException(code.getNumber(), entryStatus.getMessage());
+ case NOT_FOUND:
+ case TOPIC_NOT_FOUND:
+ throw new NotFoundException(code.getNumber(), entryStatus.getMessage());
+ case PAYLOAD_TOO_LARGE:
+ case MESSAGE_BODY_TOO_LARGE:
+ throw new PayloadTooLargeException(code.getNumber(), entryStatus.getMessage());
+ case TOO_MANY_REQUESTS:
+ throw new TooManyRequestsException(code.getNumber(), entryStatus.getMessage());
+ case REQUEST_HEADER_FIELDS_TOO_LARGE:
+ case MESSAGE_PROPERTIES_TOO_LARGE:
+ throw new RequestHeaderFieldsTooLargeException(code.getNumber(), entryStatus.getMessage());
+ case INTERNAL_ERROR:
+ case INTERNAL_SERVER_ERROR:
+ case HA_NOT_AVAILABLE:
+ throw new InternalErrorException(code.getNumber(), entryStatus.getMessage());
+ case PROXY_TIMEOUT:
+ case MASTER_PERSISTENCE_TIMEOUT:
+ case SLAVE_PERSISTENCE_TIMEOUT:
+ throw new ProxyTimeoutException(code.getNumber(), entryStatus.getMessage());
+ case UNSUPPORTED:
+ default:
+ throw new UnsupportedException(code.getNumber(), entryStatus.getMessage());
}
- final SendReceiptImpl impl =
- new SendReceiptImpl(MessageIdCodec.getInstance().decode(entry.getMessageId()),
- entry.getTransactionId(), mq, entry.getOffset());
- sendReceipts.add(impl);
- }
- if (!throwableList.isEmpty()) {
- throw new ClientException(throwableList.toArray(new Throwable[0]));
}
return sendReceipts;
}
@@ -99,8 +137,6 @@ public class SendReceiptImpl implements SendReceipt {
public String toString() {
return MoreObjects.toStringHelper(this)
.add("messageId", messageId)
- .add("messageQueue", messageQueue)
- .add("offset", offset)
.toString();
}
}
diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/route/TopicRouteData.java b/java/client/src/main/java/org/apache/rocketmq/client/java/route/TopicRouteData.java
index a0725a4..9d566f7 100644
--- a/java/client/src/main/java/org/apache/rocketmq/client/java/route/TopicRouteData.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/route/TopicRouteData.java
@@ -26,7 +26,7 @@ import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.lang3.RandomUtils;
-import org.apache.rocketmq.client.java.exception.ResourceNotFoundException;
+import org.apache.rocketmq.client.java.exception.NotFoundException;
import org.apache.rocketmq.client.java.misc.Utilities;
public class TopicRouteData {
@@ -62,7 +62,7 @@ public class TopicRouteData {
return this.messageQueueImpls;
}
- public Endpoints pickEndpointsToQueryAssignments() throws ResourceNotFoundException {
+ public Endpoints pickEndpointsToQueryAssignments() throws NotFoundException {
int nextIndex = index.getAndIncrement();
for (int i = 0; i < messageQueueImpls.size(); i++) {
final MessageQueueImpl messageQueueImpl = messageQueueImpls.get(IntMath.mod(nextIndex++,
@@ -76,7 +76,7 @@ public class TopicRouteData {
}
return broker.getEndpoints();
}
- throw new ResourceNotFoundException("Failed to pick endpoints to query assignment");
+ throw new NotFoundException("Failed to pick endpoints to query assignment");
}
@Override
diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/route/TopicRouteDataResult.java b/java/client/src/main/java/org/apache/rocketmq/client/java/route/TopicRouteDataResult.java
index 1efb034..29ce773 100644
--- a/java/client/src/main/java/org/apache/rocketmq/client/java/route/TopicRouteDataResult.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/route/TopicRouteDataResult.java
@@ -19,10 +19,18 @@ package org.apache.rocketmq.client.java.route;
import static com.google.common.base.Preconditions.checkNotNull;
+import apache.rocketmq.v2.Code;
import apache.rocketmq.v2.Status;
import com.google.common.base.MoreObjects;
import com.google.common.base.Objects;
import javax.annotation.concurrent.Immutable;
+import org.apache.rocketmq.client.apis.ClientException;
+import org.apache.rocketmq.client.java.exception.BadRequestException;
+import org.apache.rocketmq.client.java.exception.InternalErrorException;
+import org.apache.rocketmq.client.java.exception.NotFoundException;
+import org.apache.rocketmq.client.java.exception.ProxyTimeoutException;
+import org.apache.rocketmq.client.java.exception.TooManyRequestsException;
+import org.apache.rocketmq.client.java.exception.UnsupportedException;
/**
* Result topic route data fetched from remote.
@@ -30,19 +38,53 @@ import javax.annotation.concurrent.Immutable;
@Immutable
public class TopicRouteDataResult {
private final TopicRouteData topicRouteData;
- private final Status status;
+ private final ClientException exception;
public TopicRouteDataResult(TopicRouteData topicRouteData, Status status) {
this.topicRouteData = checkNotNull(topicRouteData, "topicRouteData should not be null");
- this.status = checkNotNull(status, "status should not be null");
+ final Code code = status.getCode();
+ switch (code) {
+ case OK:
+ this.exception = null;
+ break;
+ case BAD_REQUEST:
+ case ILLEGAL_ACCESS_POINT:
+ case ILLEGAL_TOPIC:
+ case CLIENT_ID_REQUIRED:
+ this.exception = new BadRequestException(code.getNumber(), status.getMessage());
+ break;
+ case NOT_FOUND:
+ case TOPIC_NOT_FOUND:
+ this.exception = new NotFoundException(code.getNumber(), status.getMessage());
+ break;
+ case TOO_MANY_REQUESTS:
+ this.exception = new TooManyRequestsException(code.getNumber(), status.getMessage());
+ break;
+ case INTERNAL_ERROR:
+ case INTERNAL_SERVER_ERROR:
+ this.exception = new InternalErrorException(code.getNumber(), status.getMessage());
+ break;
+ case PROXY_TIMEOUT:
+ this.exception = new ProxyTimeoutException(code.getNumber(), status.getMessage());
+ break;
+ default:
+ this.exception = new UnsupportedException(code.getNumber(), status.getMessage());
+ }
}
public TopicRouteData getTopicRouteData() {
return topicRouteData;
}
- public Status getStatus() {
- return status;
+ public TopicRouteData checkAndGetTopicRouteData() throws ClientException {
+ if (null != exception) {
+ throw exception;
+ }
+ return topicRouteData;
+ }
+
+ public boolean ok() {
+ return null == exception;
}
@Override
@@ -54,20 +96,21 @@ public class TopicRouteDataResult {
return false;
}
TopicRouteDataResult result = (TopicRouteDataResult) o;
- return Objects.equal(topicRouteData, result.topicRouteData) && Objects.equal(status, result.status);
+ return Objects.equal(topicRouteData, result.topicRouteData) && Objects.equal(exception, result.exception);
}
@Override
public int hashCode() {
- return Objects.hashCode(topicRouteData, status);
+ return Objects.hashCode(topicRouteData, exception);
}
@Override
public String toString() {
- return MoreObjects.toStringHelper(this)
- .add("topicRouteData", topicRouteData)
- .add("code", status.getCode().getNumber())
- .add("message", status.getMessage())
- .toString();
+ final MoreObjects.ToStringHelper helper = MoreObjects.toStringHelper(this)
+ .add("topicRouteData", this.topicRouteData);
+ if (null == exception) {
+ return helper.toString();
+ }
+ return helper.add("exception", this.exception).toString();
}
}
diff --git a/java/client/src/test/java/org/apache/rocketmq/client/java/tool/TestBase.java b/java/client/src/test/java/org/apache/rocketmq/client/java/tool/TestBase.java
index 52f5611..212cb06 100644
--- a/java/client/src/test/java/org/apache/rocketmq/client/java/tool/TestBase.java
+++ b/java/client/src/test/java/org/apache/rocketmq/client/java/tool/TestBase.java
@@ -292,7 +292,9 @@ public class TestBase {
protected ListenableFuture<SendMessageResponse> failureSendMessageResponseFuture() {
final Status status = Status.newBuilder().setCode(Code.FORBIDDEN).build();
SettableFuture<SendMessageResponse> future0 = SettableFuture.create();
- SendMessageResponse response = SendMessageResponse.newBuilder().setStatus(status).build();
+ SendResultEntry sendResultEntry = SendResultEntry.newBuilder().setStatus(status).build();
+ SendMessageResponse response = SendMessageResponse.newBuilder().setStatus(status)
+ .addEntries(sendResultEntry).build();
future0.set(response);
return future0;
}