You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by aa...@apache.org on 2022/07/14 11:20:04 UTC
[rocketmq-clients] branch master updated: Add requestId into client exception (#53)
This is an automated email from the ASF dual-hosted git repository.
aaronai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git
The following commit(s) were added to refs/heads/master by this push:
new a49efa2 Add requestId into client exception (#53)
a49efa2 is described below
commit a49efa2ad4fb8ed73b257563e32d21698d68d13f
Author: Aaron Ai <ya...@gmail.com>
AuthorDate: Thu Jul 14 19:19:59 2022 +0800
Add requestId into client exception (#53)
---
.../rocketmq/client/apis/ClientException.java | 6 ++++
.../client/java/exception/BadRequestException.java | 4 +--
.../client/java/exception/ForbiddenException.java | 4 +--
.../java/exception/InternalErrorException.java | 8 ++---
.../client/java/exception/NotFoundException.java | 8 ++---
.../java/exception/PayloadTooLargeException.java | 4 +--
.../java/exception/ProxyTimeoutException.java | 4 +--
.../RequestHeaderFieldsTooLargeException.java | 4 +--
.../java/exception/TooManyRequestsException.java | 4 +--
.../java/exception/UnauthorizedException.java | 4 +--
.../java/exception/UnsupportedException.java | 4 +--
.../rocketmq/client/java/impl/ClientImpl.java | 17 +++++----
.../java/impl/consumer/FifoConsumeService.java | 15 +-------
.../java/impl/consumer/ProcessQueueImpl.java | 30 +++++++++++-----
.../java/impl/consumer/PushConsumerImpl.java | 4 +--
.../java/impl/consumer/ReceiveMessageResult.java | 22 +++++-------
.../java/impl/consumer/SimpleConsumerImpl.java | 42 ++++++++++++----------
.../client/java/impl/producer/ProducerImpl.java | 6 ++--
.../client/java/impl/producer/SendReceiptImpl.java | 33 +++++++++--------
.../client/java/route/TopicRouteDataResult.java | 29 +++++++++------
.../apache/rocketmq/client/java/tool/TestBase.java | 3 +-
21 files changed, 135 insertions(+), 120 deletions(-)
diff --git a/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/ClientException.java b/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/ClientException.java
index e622ce1..5e518de 100644
--- a/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/ClientException.java
+++ b/java/client-apis/src/main/java/org/apache/rocketmq/client/apis/ClientException.java
@@ -55,6 +55,12 @@ public class ClientException extends Exception {
putMetadata(RESPONSE_CODE_KEY, String.valueOf(responseCode));
}
+ public ClientException(int responseCode, String requestId, String message) {
+ this(responseCode, message);
+ putMetadata(RESPONSE_CODE_KEY, String.valueOf(responseCode));
+ putMetadata(REQUEST_ID_KEY, requestId);
+ }
+
@SuppressWarnings("SameParameterValue")
protected void putMetadata(String key, String value) {
context.put(key, value);
diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/exception/BadRequestException.java b/java/client/src/main/java/org/apache/rocketmq/client/java/exception/BadRequestException.java
index 386c916..3d32bad 100644
--- a/java/client/src/main/java/org/apache/rocketmq/client/java/exception/BadRequestException.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/exception/BadRequestException.java
@@ -23,7 +23,7 @@ import org.apache.rocketmq.client.apis.ClientException;
* Generic exception for bad request, indicating that the required fields of headers are missing.
*/
public class BadRequestException extends ClientException {
- public BadRequestException(int responseCode, String message) {
- super(responseCode, message);
+ public BadRequestException(int responseCode, String requestId, String message) {
+ super(responseCode, requestId, message);
}
}
diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/exception/ForbiddenException.java b/java/client/src/main/java/org/apache/rocketmq/client/java/exception/ForbiddenException.java
index 49eb280..6f115f3 100644
--- a/java/client/src/main/java/org/apache/rocketmq/client/java/exception/ForbiddenException.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/exception/ForbiddenException.java
@@ -23,7 +23,7 @@ import org.apache.rocketmq.client.apis.ClientException;
* Generic exception for the case that user does not have permission to access/operation the resource.
*/
public class ForbiddenException extends ClientException {
- public ForbiddenException(int responseCode, String message) {
- super(responseCode, message);
+ public ForbiddenException(int responseCode, String requestId, String message) {
+ super(responseCode, requestId, message);
}
}
diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/exception/InternalErrorException.java b/java/client/src/main/java/org/apache/rocketmq/client/java/exception/InternalErrorException.java
index f42d369..baba366 100644
--- a/java/client/src/main/java/org/apache/rocketmq/client/java/exception/InternalErrorException.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/exception/InternalErrorException.java
@@ -24,11 +24,11 @@ import org.apache.rocketmq.client.apis.ClientException;
* fulfilling the request.
*/
public class InternalErrorException extends ClientException {
- public InternalErrorException(int responseCode, String message) {
- super(responseCode, message);
- }
-
public InternalErrorException(Throwable cause) {
super(cause);
}
+
+ public InternalErrorException(int responseCode, String requestId, String message) {
+ super(responseCode, requestId, message);
+ }
}
\ No newline at end of file
diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/exception/NotFoundException.java b/java/client/src/main/java/org/apache/rocketmq/client/java/exception/NotFoundException.java
index 6ca06ca..fef114b 100644
--- a/java/client/src/main/java/org/apache/rocketmq/client/java/exception/NotFoundException.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/exception/NotFoundException.java
@@ -23,10 +23,6 @@ import org.apache.rocketmq.client.apis.ClientException;
* Generic exception for resource not found.
*/
public class NotFoundException extends ClientException {
- public NotFoundException(int responseCode, String message) {
- super(responseCode, message);
- }
-
public NotFoundException(String message) {
super(message);
}
@@ -34,4 +30,8 @@ public class NotFoundException extends ClientException {
public NotFoundException(Throwable t) {
super(t);
}
+
+ public NotFoundException(int responseCode, String requestId, String message) {
+ super(responseCode, requestId, message);
+ }
}
diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/exception/PayloadTooLargeException.java b/java/client/src/main/java/org/apache/rocketmq/client/java/exception/PayloadTooLargeException.java
index 6dd8358..f514697 100644
--- a/java/client/src/main/java/org/apache/rocketmq/client/java/exception/PayloadTooLargeException.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/exception/PayloadTooLargeException.java
@@ -23,7 +23,7 @@ import org.apache.rocketmq.client.apis.ClientException;
* Generic exception represents that the request entity is larger than the limits defined by the server.
*/
public class PayloadTooLargeException extends ClientException {
- public PayloadTooLargeException(int responseCode, String message) {
- super(responseCode, message);
+ public PayloadTooLargeException(int responseCode, String requestId, String message) {
+ super(responseCode, requestId, message);
}
}
diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/exception/ProxyTimeoutException.java b/java/client/src/main/java/org/apache/rocketmq/client/java/exception/ProxyTimeoutException.java
index 2a8abbf..6212f3a 100644
--- a/java/client/src/main/java/org/apache/rocketmq/client/java/exception/ProxyTimeoutException.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/exception/ProxyTimeoutException.java
@@ -24,7 +24,7 @@ import org.apache.rocketmq.client.apis.ClientException;
* in time from its upstream servers.
*/
public class ProxyTimeoutException extends ClientException {
- public ProxyTimeoutException(int responseCode, String message) {
- super(responseCode, message);
+ public ProxyTimeoutException(int responseCode, String requestId, String message) {
+ super(responseCode, requestId, message);
}
}
diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/exception/RequestHeaderFieldsTooLargeException.java b/java/client/src/main/java/org/apache/rocketmq/client/java/exception/RequestHeaderFieldsTooLargeException.java
index e2813f1..4b396e6 100644
--- a/java/client/src/main/java/org/apache/rocketmq/client/java/exception/RequestHeaderFieldsTooLargeException.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/exception/RequestHeaderFieldsTooLargeException.java
@@ -24,7 +24,7 @@ import org.apache.rocketmq.client.apis.ClientException;
* too large. The request may be resubmitted after reducing the size of the request header fields.
*/
public class RequestHeaderFieldsTooLargeException extends ClientException {
- public RequestHeaderFieldsTooLargeException(int responseCode, String message) {
- super(responseCode, message);
+ public RequestHeaderFieldsTooLargeException(int responseCode, String requestId, String message) {
+ super(responseCode, requestId, message);
}
}
\ No newline at end of file
diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/exception/TooManyRequestsException.java b/java/client/src/main/java/org/apache/rocketmq/client/java/exception/TooManyRequestsException.java
index b4fd82f..8e00433 100644
--- a/java/client/src/main/java/org/apache/rocketmq/client/java/exception/TooManyRequestsException.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/exception/TooManyRequestsException.java
@@ -25,7 +25,7 @@ import org.apache.rocketmq.client.apis.ClientException;
* <p>Requests are throttled.
*/
public class TooManyRequestsException extends ClientException {
- public TooManyRequestsException(int responseCode, String message) {
- super(responseCode, message);
+ public TooManyRequestsException(int responseCode, String requestId, String message) {
+ super(responseCode, requestId, message);
}
}
diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/exception/UnauthorizedException.java b/java/client/src/main/java/org/apache/rocketmq/client/java/exception/UnauthorizedException.java
index 361170e..35707c9 100644
--- a/java/client/src/main/java/org/apache/rocketmq/client/java/exception/UnauthorizedException.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/exception/UnauthorizedException.java
@@ -24,7 +24,7 @@ import org.apache.rocketmq.client.apis.ClientException;
* resource.
*/
public class UnauthorizedException extends ClientException {
- public UnauthorizedException(int responseCode, String message) {
- super(responseCode, message);
+ public UnauthorizedException(int responseCode, String requestId, String message) {
+ super(responseCode, requestId, message);
}
}
diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/exception/UnsupportedException.java b/java/client/src/main/java/org/apache/rocketmq/client/java/exception/UnsupportedException.java
index 8fe578c..212c5eb 100644
--- a/java/client/src/main/java/org/apache/rocketmq/client/java/exception/UnsupportedException.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/exception/UnsupportedException.java
@@ -23,7 +23,7 @@ import org.apache.rocketmq.client.apis.ClientException;
* Generic exception for unsupported exception.
*/
public class UnsupportedException extends ClientException {
- public UnsupportedException(int responseCode, String message) {
- super(responseCode, message);
+ public UnsupportedException(int responseCode, String requestId, String message) {
+ super(responseCode, requestId, message);
}
}
diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientImpl.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientImpl.java
index 093c02d..044b6aa 100644
--- a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientImpl.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/ClientImpl.java
@@ -82,7 +82,6 @@ import org.apache.rocketmq.client.java.misc.ExecutorServices;
import org.apache.rocketmq.client.java.misc.ThreadFactoryImpl;
import org.apache.rocketmq.client.java.misc.Utilities;
import org.apache.rocketmq.client.java.route.Endpoints;
-import org.apache.rocketmq.client.java.route.TopicRouteData;
import org.apache.rocketmq.client.java.route.TopicRouteDataResult;
import org.apache.rocketmq.client.java.rpc.InvocationContext;
import org.apache.rocketmq.client.java.rpc.Signature;
@@ -99,7 +98,7 @@ public abstract class ClientImpl extends AbstractIdleService implements Client,
protected volatile ClientManager clientManager;
protected final ClientConfiguration clientConfiguration;
- protected final Endpoints accessEndpoints;
+ protected final Endpoints endpoints;
protected final Set<String> topics;
// Thread-safe set.
protected final Set<Endpoints> isolated;
@@ -128,7 +127,7 @@ public abstract class ClientImpl extends AbstractIdleService implements Client,
public ClientImpl(ClientConfiguration clientConfiguration, Set<String> topics) {
this.clientConfiguration = checkNotNull(clientConfiguration, "clientConfiguration should not be null");
- this.accessEndpoints = new Endpoints(clientConfiguration.getEndpoints());
+ this.endpoints = new Endpoints(clientConfiguration.getEndpoints());
this.topics = topics;
// Generate client id firstly.
this.clientId = Utilities.genClientId();
@@ -652,20 +651,20 @@ public abstract class ClientImpl extends AbstractIdleService implements Client,
try {
Resource topicResource = Resource.newBuilder().setName(topic).build();
final QueryRouteRequest request = QueryRouteRequest.newBuilder().setTopic(topicResource)
- .setEndpoints(accessEndpoints.toProtobuf()).build();
+ .setEndpoints(endpoints.toProtobuf()).build();
final Metadata metadata = sign();
final ListenableFuture<InvocationContext<QueryRouteResponse>> contextFuture =
- clientManager.queryRoute(accessEndpoints, metadata, request, clientConfiguration.getRequestTimeout());
- return Futures.transform(contextFuture, context -> {
- final QueryRouteResponse response = context.getResp();
+ clientManager.queryRoute(endpoints, metadata, request, clientConfiguration.getRequestTimeout());
+ return Futures.transform(contextFuture, ctx -> {
+ final QueryRouteResponse response = ctx.getResp();
final Status status = response.getStatus();
final Code code = status.getCode();
if (Code.OK != code) {
LOGGER.error("Exception raised while fetch topic route from remote, topic={}, " +
"clientId={}, endpoints={}, code={}, status message=[{}]", topic, clientId,
- accessEndpoints, code, status.getMessage());
+ endpoints, code, status.getMessage());
}
- return new TopicRouteDataResult(new TopicRouteData(response.getMessageQueuesList()), status);
+ return new TopicRouteDataResult(ctx);
}, MoreExecutors.directExecutor());
} catch (Throwable t) {
return Futures.immediateFailedFuture(t);
diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/FifoConsumeService.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/FifoConsumeService.java
index c8c783e..6dd8c9a 100644
--- a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/FifoConsumeService.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/FifoConsumeService.java
@@ -17,7 +17,6 @@
package org.apache.rocketmq.client.java.impl.consumer;
-import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
@@ -36,7 +35,6 @@ import org.apache.rocketmq.client.java.route.MessageQueueImpl;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-@SuppressWarnings("NullableProblems")
class FifoConsumeService extends ConsumeService {
private static final Logger LOGGER = LoggerFactory.getLogger(FifoConsumeService.class);
@@ -69,18 +67,7 @@ class FifoConsumeService extends ConsumeService {
final ListenableFuture<ConsumeResult> future0 = consume(next);
final ListenableFuture<Void> future = Futures.transformAsync(future0, result -> pq.eraseFifoMessage(next,
result), MoreExecutors.directExecutor());
- Futures.addCallback(future, new FutureCallback<Void>() {
- @Override
- public void onSuccess(Void ignore) {
- consumeIteratively(pq, iterator);
- }
-
- @Override
- public void onFailure(Throwable t) {
- // Should never reach here.
- LOGGER.error("[Bug] Exception raised in fifo message erasing task, clientId={}", clientId, t);
- }
- }, MoreExecutors.directExecutor());
+ future.addListener(() -> consumeIteratively(pq, iterator), MoreExecutors.directExecutor());
}
@Override
diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ProcessQueueImpl.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ProcessQueueImpl.java
index bdc6a6d..dd4cd17 100644
--- a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ProcessQueueImpl.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ProcessQueueImpl.java
@@ -44,6 +44,7 @@ import java.util.stream.Collectors;
import org.apache.rocketmq.client.apis.consumer.ConsumeResult;
import org.apache.rocketmq.client.apis.consumer.FilterExpression;
import org.apache.rocketmq.client.apis.message.MessageId;
+import org.apache.rocketmq.client.java.exception.BadRequestException;
import org.apache.rocketmq.client.java.hook.MessageHookPoints;
import org.apache.rocketmq.client.java.hook.MessageHookPointsStatus;
import org.apache.rocketmq.client.java.message.MessageCommon;
@@ -472,6 +473,7 @@ class ProcessQueueImpl implements ProcessQueue {
return future;
}
+
private ListenableFuture<Void> forwardToDeadLetterQueue(final MessageViewImpl messageView) {
final SettableFuture<Void> future = SettableFuture.create();
forwardToDeadLetterQueue(messageView, 1, future);
@@ -490,25 +492,27 @@ class ProcessQueueImpl implements ProcessQueue {
@Override
public void onSuccess(InvocationContext<ForwardMessageToDeadLetterQueueResponse> context) {
final ForwardMessageToDeadLetterQueueResponse resp = context.getResp();
+ final String requestId = context.getRpcContext().getRequestId();
final Status status = resp.getStatus();
final Code code = status.getCode();
// Log failure and retry later.
if (!Code.OK.equals(code)) {
LOGGER.error("Failed to forward message to dead letter queue, would attempt to re-forward later," +
- " clientId={}, consumerGroup={} messageId={}, attempt={}, mq={}, endpoints={}, code={}, "
- + "status message={}", clientId, consumerGroup, messageId, attempt, mq, endpoints, code,
- status.getMessage());
+ " clientId={}, consumerGroup={} messageId={}, attempt={}, mq={}, endpoints={}, "
+ + "requestId={}, code={}, status message={}", clientId, consumerGroup, messageId, attempt,
+ mq, endpoints, requestId, code, status.getMessage());
forwardToDeadLetterQueue(messageView, 1 + attempt, future0);
return;
}
// Log retries.
if (1 < attempt) {
LOGGER.info("Re-forward message to dead letter queue successfully, clientId={}, consumerGroup={}, "
- + "attempt={}, messageId={}, mq={}, endpoints={}", clientId, consumerGroup, attempt,
- messageId, mq, endpoints);
+ + "attempt={}, messageId={}, mq={}, endpoints={}, requestId={}", clientId, consumerGroup,
+ attempt, messageId, mq, endpoints, requestId);
} else {
LOGGER.debug("Forward message to dead letter queue successfully, clientId={}, consumerGroup={}, "
- + "messageId={}, mq={}, endpoints={}", clientId, consumerGroup, messageId, mq, endpoints);
+ + "messageId={}, mq={}, endpoints={}, requestId={}", clientId, consumerGroup, messageId, mq,
+ endpoints, requestId);
}
// Set result if message is forwarded successfully.
future0.setFuture(Futures.immediateVoidFuture());
@@ -570,6 +574,14 @@ class ProcessQueueImpl implements ProcessQueue {
final String requestId = context.getRpcContext().getRequestId();
final Status status = resp.getStatus();
final Code code = status.getCode();
+ if (Code.INVALID_RECEIPT_HANDLE.equals(code)) {
+ LOGGER.error("Failed to ack fifo message due to the invalid receipt handle, forgive to retry, "
+ + "clientId={}, consumerGroup={} messageId={}, attempt={}, mq={}, endpoints={}, "
+ + "requestId={}, status message=[{}]", clientId, consumerGroup, messageId, attempt, mq,
+ endpoints, context.getRpcContext().getRequestId(), status.getMessage());
+ future0.setException(new BadRequestException(code.getNumber(), requestId, status.getMessage()));
+ return;
+ }
// Log failure and retry later.
if (!Code.OK.equals(code)) {
LOGGER.error("Failed to ack fifo message, would attempt to re-ack later, clientId={}, "
@@ -582,11 +594,11 @@ class ProcessQueueImpl implements ProcessQueue {
// Log retries.
if (1 < attempt) {
LOGGER.info("Re-ack fifo message successfully, clientId={}, consumerGroup={}, attempt={}, "
- + "messageId={}, mq={}, endpoints={}", clientId, consumerGroup, attempt,
- messageId, mq, endpoints);
+ + "messageId={}, mq={}, endpoints={}, requestId={}", clientId, consumerGroup, attempt,
+ messageId, mq, endpoints, requestId);
} else {
LOGGER.debug("Ack fifo message successfully, clientId={}, consumerGroup={}, messageId={}, mq={}, "
- + "endpoints={}", clientId, consumerGroup, messageId, mq, endpoints);
+ + "endpoints={}, requestId={}", clientId, consumerGroup, messageId, mq, endpoints, requestId);
}
// Set result if FIFO message is acknowledged successfully.
future0.setFuture(Futures.immediateVoidFuture());
diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/PushConsumerImpl.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/PushConsumerImpl.java
index 7688b4b..f0cd53a 100644
--- a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/PushConsumerImpl.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/PushConsumerImpl.java
@@ -124,7 +124,7 @@ class PushConsumerImpl extends ConsumerImpl implements PushConsumer, MessageCach
super(clientConfiguration, consumerGroup, subscriptionExpressions.keySet());
this.clientConfiguration = clientConfiguration;
Resource groupResource = new Resource(consumerGroup);
- this.pushConsumerSettings = new PushConsumerSettings(clientId, accessEndpoints, groupResource,
+ this.pushConsumerSettings = new PushConsumerSettings(clientId, endpoints, groupResource,
clientConfiguration.getRequestTimeout(), subscriptionExpressions);
this.consumerGroup = consumerGroup;
this.subscriptionExpressions = subscriptionExpressions;
@@ -260,7 +260,7 @@ class PushConsumerImpl extends ConsumerImpl implements PushConsumer, MessageCach
private QueryAssignmentRequest wrapQueryAssignmentRequest(String topic) {
apache.rocketmq.v2.Resource topicResource = apache.rocketmq.v2.Resource.newBuilder().setName(topic).build();
return QueryAssignmentRequest.newBuilder().setTopic(topicResource)
- .setEndpoints(accessEndpoints.toProtobuf()).setGroup(getProtobufGroup()).build();
+ .setEndpoints(endpoints.toProtobuf()).setGroup(getProtobufGroup()).build();
}
private ListenableFuture<Assignments> queryAssignment(final String topic) {
diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ReceiveMessageResult.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ReceiveMessageResult.java
index 58c2206..a4d5993 100644
--- a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ReceiveMessageResult.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ReceiveMessageResult.java
@@ -36,14 +36,12 @@ import org.apache.rocketmq.client.java.route.Endpoints;
public class ReceiveMessageResult {
private final Endpoints endpoints;
- private final String requestId;
private final ClientException exception;
private final List<MessageViewImpl> messages;
public ReceiveMessageResult(Endpoints endpoints, String requestId, Status status, List<MessageViewImpl> messages) {
this.endpoints = endpoints;
- this.requestId = requestId;
final Code code = status.getCode();
switch (code) {
case OK:
@@ -56,31 +54,31 @@ public class ReceiveMessageResult {
case ILLEGAL_FILTER_EXPRESSION:
case ILLEGAL_INVISIBLE_TIME:
case CLIENT_ID_REQUIRED:
- this.exception = new BadRequestException(code.getNumber(), status.getMessage());
+ this.exception = new BadRequestException(code.getNumber(), requestId, status.getMessage());
break;
case UNAUTHORIZED:
- this.exception = new UnauthorizedException(code.getNumber(), status.getMessage());
+ this.exception = new UnauthorizedException(code.getNumber(), requestId, status.getMessage());
break;
case FORBIDDEN:
- this.exception = new ForbiddenException(code.getNumber(), status.getMessage());
+ this.exception = new ForbiddenException(code.getNumber(), requestId, status.getMessage());
break;
case NOT_FOUND:
case TOPIC_NOT_FOUND:
case CONSUMER_GROUP_NOT_FOUND:
- this.exception = new NotFoundException(code.getNumber(), status.getMessage());
+ this.exception = new NotFoundException(code.getNumber(), requestId, status.getMessage());
break;
case TOO_MANY_REQUESTS:
- this.exception = new TooManyRequestsException(code.getNumber(), status.getMessage());
+ this.exception = new TooManyRequestsException(code.getNumber(), requestId, status.getMessage());
break;
case INTERNAL_ERROR:
case INTERNAL_SERVER_ERROR:
- this.exception = new InternalErrorException(code.getNumber(), status.getMessage());
+ this.exception = new InternalErrorException(code.getNumber(), requestId, status.getMessage());
break;
case PROXY_TIMEOUT:
- this.exception = new ProxyTimeoutException(code.getNumber(), status.getMessage());
+ this.exception = new ProxyTimeoutException(code.getNumber(), requestId, status.getMessage());
break;
default:
- this.exception = new UnsupportedException(code.getNumber(), status.getMessage());
+ this.exception = new UnsupportedException(code.getNumber(), requestId, status.getMessage());
}
this.messages = messages;
}
@@ -112,8 +110,4 @@ public class ReceiveMessageResult {
public List<MessageViewImpl> getMessages() {
return messages;
}
-
- public String getRequestId() {
- return requestId;
- }
}
diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/SimpleConsumerImpl.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/SimpleConsumerImpl.java
index f5029f1..f7a9998 100644
--- a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/SimpleConsumerImpl.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/SimpleConsumerImpl.java
@@ -80,7 +80,7 @@ class SimpleConsumerImpl extends ConsumerImpl implements SimpleConsumer {
Map<String, FilterExpression> subscriptionExpressions) {
super(clientConfiguration, consumerGroup, subscriptionExpressions.keySet());
Resource groupResource = new Resource(consumerGroup);
- this.simpleConsumerSettings = new SimpleConsumerSettings(clientId, accessEndpoints, groupResource,
+ this.simpleConsumerSettings = new SimpleConsumerSettings(clientId, endpoints, groupResource,
clientConfiguration.getRequestTimeout(), awaitDuration, subscriptionExpressions);
this.consumerGroup = consumerGroup;
this.awaitDuration = awaitDuration;
@@ -242,8 +242,9 @@ class SimpleConsumerImpl extends ConsumerImpl implements SimpleConsumer {
}
MessageViewImpl impl = (MessageViewImpl) messageView;
final ListenableFuture<InvocationContext<AckMessageResponse>> future = ackMessage(impl);
- return Futures.transformAsync(future, context -> {
- final AckMessageResponse resp = context.getResp();
+ return Futures.transformAsync(future, ctx -> {
+ final String requestId = ctx.getRpcContext().getRequestId();
+ final AckMessageResponse resp = ctx.getResp();
final Status status = resp.getStatus();
final Code code = status.getCode();
switch (code) {
@@ -254,23 +255,23 @@ class SimpleConsumerImpl extends ConsumerImpl implements SimpleConsumer {
case ILLEGAL_CONSUMER_GROUP:
case INVALID_RECEIPT_HANDLE:
case CLIENT_ID_REQUIRED:
- throw new BadRequestException(code.getNumber(), status.getMessage());
+ throw new BadRequestException(code.getNumber(), requestId, status.getMessage());
case UNAUTHORIZED:
- throw new UnauthorizedException(code.getNumber(), status.getMessage());
+ throw new UnauthorizedException(code.getNumber(), requestId, status.getMessage());
case FORBIDDEN:
- throw new ForbiddenException(code.getNumber(), status.getMessage());
+ throw new ForbiddenException(code.getNumber(), requestId, status.getMessage());
case NOT_FOUND:
case TOPIC_NOT_FOUND:
- throw new NotFoundException(code.getNumber(), status.getMessage());
+ throw new NotFoundException(code.getNumber(), requestId, status.getMessage());
case TOO_MANY_REQUESTS:
- throw new TooManyRequestsException(code.getNumber(), status.getMessage());
+ throw new TooManyRequestsException(code.getNumber(), requestId, status.getMessage());
case INTERNAL_ERROR:
case INTERNAL_SERVER_ERROR:
- throw new InternalErrorException(code.getNumber(), status.getMessage());
+ throw new InternalErrorException(code.getNumber(), requestId, status.getMessage());
case PROXY_TIMEOUT:
- throw new ProxyTimeoutException(code.getNumber(), status.getMessage());
+ throw new ProxyTimeoutException(code.getNumber(), requestId, status.getMessage());
default:
- throw new UnsupportedException(code.getNumber(), status.getMessage());
+ throw new UnsupportedException(code.getNumber(), requestId, status.getMessage());
}
}, clientCallbackExecutor);
}
@@ -304,12 +305,15 @@ class SimpleConsumerImpl extends ConsumerImpl implements SimpleConsumer {
MessageViewImpl impl = (MessageViewImpl) messageView;
final ListenableFuture<InvocationContext<ChangeInvisibleDurationResponse>> future =
changeInvisibleDuration(impl, invisibleDuration);
- return Futures.transformAsync(future, context -> {
- final ChangeInvisibleDurationResponse resp = context.getResp();
+ return Futures.transformAsync(future, ctx -> {
+ final ChangeInvisibleDurationResponse resp = ctx.getResp();
+ final String requestId = ctx.getRpcContext().getRequestId();
// Refresh receipt handle manually.
impl.setReceiptHandle(resp.getReceiptHandle());
final Status status = resp.getStatus();
final Code code = status.getCode();
+ final int codeNumber = code.getNumber();
+ final String statusMessage = status.getMessage();
switch (code) {
case OK:
return Futures.immediateVoidFuture();
@@ -319,20 +323,20 @@ class SimpleConsumerImpl extends ConsumerImpl implements SimpleConsumer {
case ILLEGAL_INVISIBLE_TIME:
case INVALID_RECEIPT_HANDLE:
case CLIENT_ID_REQUIRED:
- throw new BadRequestException(code.getNumber(), status.getMessage());
+ throw new BadRequestException(codeNumber, requestId, statusMessage);
case UNAUTHORIZED:
- throw new UnauthorizedException(code.getNumber(), status.getMessage());
+ throw new UnauthorizedException(codeNumber, requestId, statusMessage);
case NOT_FOUND:
case TOPIC_NOT_FOUND:
case TOO_MANY_REQUESTS:
- throw new TooManyRequestsException(code.getNumber(), status.getMessage());
+ throw new TooManyRequestsException(codeNumber, requestId, statusMessage);
case INTERNAL_ERROR:
case INTERNAL_SERVER_ERROR:
- throw new InternalErrorException(code.getNumber(), status.getMessage());
+ throw new InternalErrorException(codeNumber, requestId, statusMessage);
case PROXY_TIMEOUT:
- throw new ProxyTimeoutException(code.getNumber(), status.getMessage());
+ throw new ProxyTimeoutException(codeNumber, requestId, statusMessage);
default:
- throw new UnsupportedException(code.getNumber(), status.getMessage());
+ throw new UnsupportedException(codeNumber, requestId, statusMessage);
}
}, MoreExecutors.directExecutor());
}
diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/ProducerImpl.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/ProducerImpl.java
index af192af..0764ed4 100644
--- a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/ProducerImpl.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/ProducerImpl.java
@@ -100,7 +100,7 @@ class ProducerImpl extends ClientImpl implements Producer {
TransactionChecker checker) {
super(clientConfiguration, topics);
ExponentialBackoffRetryPolicy retryPolicy = ExponentialBackoffRetryPolicy.immediatelyRetryPolicy(maxAttempts);
- this.producerSettings = new ProducerSettings(clientId, accessEndpoints, retryPolicy,
+ this.producerSettings = new ProducerSettings(clientId, endpoints, retryPolicy,
clientConfiguration.getRequestTimeout(), topics.stream().map(Resource::new).collect(Collectors.toSet()));
this.checker = checker;
@@ -451,9 +451,9 @@ class ProducerImpl extends ClientImpl implements Producer {
clientManager.sendMessage(endpoints, metadata, request, clientConfiguration.getRequestTimeout());
final ListenableFuture<List<SendReceiptImpl>> attemptFuture = Futures.transformAsync(responseFuture,
- response -> {
+ ctx -> {
final SettableFuture<List<SendReceiptImpl>> future0 = SettableFuture.create();
- future0.set(SendReceiptImpl.processSendResponse(messageQueue, response.getResp()));
+ future0.set(SendReceiptImpl.processRespContext(messageQueue, ctx));
return future0;
}, MoreExecutors.directExecutor());
diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/SendReceiptImpl.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/SendReceiptImpl.java
index 400b2bc..f6f27f6 100644
--- a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/SendReceiptImpl.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/SendReceiptImpl.java
@@ -40,6 +40,7 @@ import org.apache.rocketmq.client.java.exception.UnsupportedException;
import org.apache.rocketmq.client.java.message.MessageIdCodec;
import org.apache.rocketmq.client.java.route.Endpoints;
import org.apache.rocketmq.client.java.route.MessageQueueImpl;
+import org.apache.rocketmq.client.java.rpc.InvocationContext;
public class SendReceiptImpl implements SendReceipt {
private final MessageId messageId;
@@ -76,14 +77,18 @@ public class SendReceiptImpl implements SendReceipt {
return offset;
}
- public static List<SendReceiptImpl> processSendResponse(MessageQueueImpl mq,
- SendMessageResponse response) throws ClientException {
- final Status status = response.getStatus();
+ public static List<SendReceiptImpl> processRespContext(MessageQueueImpl mq,
+ InvocationContext<SendMessageResponse> ctx) throws ClientException {
+ final String requestId = ctx.getRpcContext().getRequestId();
+ final SendMessageResponse resp = ctx.getResp();
+ final Status status = resp.getStatus();
List<SendReceiptImpl> sendReceipts = new ArrayList<>();
- final List<SendResultEntry> entries = response.getEntriesList();
+ final List<SendResultEntry> entries = resp.getEntriesList();
for (SendResultEntry entry : entries) {
final Status entryStatus = entry.getStatus();
final Code code = entryStatus.getCode();
+ final int codeNumber = code.getNumber();
+ final String statusMessage = status.getMessage();
switch (code) {
case OK:
final MessageId messageId = MessageIdCodec.getInstance().decode(entry.getMessageId());
@@ -102,33 +107,33 @@ public class SendReceiptImpl implements SendReceipt {
case MESSAGE_PROPERTY_CONFLICT_WITH_TYPE:
case MESSAGE_CORRUPTED:
case CLIENT_ID_REQUIRED:
- throw new BadRequestException(code.getNumber(), status.getMessage());
+ throw new BadRequestException(codeNumber, requestId, statusMessage);
case UNAUTHORIZED:
- throw new UnauthorizedException(code.getNumber(), status.getMessage());
+ throw new UnauthorizedException(codeNumber, requestId, statusMessage);
case FORBIDDEN:
- throw new ForbiddenException(code.getNumber(), status.getMessage());
+ throw new ForbiddenException(codeNumber, requestId, statusMessage);
case NOT_FOUND:
case TOPIC_NOT_FOUND:
- throw new NotFoundException(code.getNumber(), status.getMessage());
+ throw new NotFoundException(codeNumber, requestId, statusMessage);
case PAYLOAD_TOO_LARGE:
case MESSAGE_BODY_TOO_LARGE:
- throw new PayloadTooLargeException(code.getNumber(), status.getMessage());
+ throw new PayloadTooLargeException(codeNumber, requestId, statusMessage);
case TOO_MANY_REQUESTS:
- throw new TooManyRequestsException(code.getNumber(), status.getMessage());
+ throw new TooManyRequestsException(codeNumber, requestId, statusMessage);
case REQUEST_HEADER_FIELDS_TOO_LARGE:
case MESSAGE_PROPERTIES_TOO_LARGE:
- throw new RequestHeaderFieldsTooLargeException(code.getNumber(), status.getMessage());
+ throw new RequestHeaderFieldsTooLargeException(codeNumber, requestId, statusMessage);
case INTERNAL_ERROR:
case INTERNAL_SERVER_ERROR:
case HA_NOT_AVAILABLE:
- throw new InternalErrorException(code.getNumber(), status.getMessage());
+ throw new InternalErrorException(codeNumber, requestId, statusMessage);
case PROXY_TIMEOUT:
case MASTER_PERSISTENCE_TIMEOUT:
case SLAVE_PERSISTENCE_TIMEOUT:
- throw new ProxyTimeoutException(code.getNumber(), status.getMessage());
+ throw new ProxyTimeoutException(codeNumber, requestId, statusMessage);
case UNSUPPORTED:
default:
- throw new UnsupportedException(code.getNumber(), status.getMessage());
+ throw new UnsupportedException(codeNumber, requestId, statusMessage);
}
}
return sendReceipts;
diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/route/TopicRouteDataResult.java b/java/client/src/main/java/org/apache/rocketmq/client/java/route/TopicRouteDataResult.java
index 29ce773..8c9b75f 100644
--- a/java/client/src/main/java/org/apache/rocketmq/client/java/route/TopicRouteDataResult.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/route/TopicRouteDataResult.java
@@ -17,12 +17,13 @@
package org.apache.rocketmq.client.java.route;
-import static com.google.common.base.Preconditions.checkNotNull;
-
import apache.rocketmq.v2.Code;
+import apache.rocketmq.v2.MessageQueue;
+import apache.rocketmq.v2.QueryRouteResponse;
import apache.rocketmq.v2.Status;
import com.google.common.base.MoreObjects;
import com.google.common.base.Objects;
+import java.util.List;
import javax.annotation.concurrent.Immutable;
import org.apache.rocketmq.client.apis.ClientException;
import org.apache.rocketmq.client.java.exception.BadRequestException;
@@ -31,6 +32,7 @@ import org.apache.rocketmq.client.java.exception.NotFoundException;
import org.apache.rocketmq.client.java.exception.ProxyTimeoutException;
import org.apache.rocketmq.client.java.exception.TooManyRequestsException;
import org.apache.rocketmq.client.java.exception.UnsupportedException;
+import org.apache.rocketmq.client.java.rpc.InvocationContext;
/**
* Result topic route data fetched from remote.
@@ -40,9 +42,16 @@ public class TopicRouteDataResult {
private final TopicRouteData topicRouteData;
private final ClientException exception;
- public TopicRouteDataResult(TopicRouteData topicRouteData, Status status) {
- this.topicRouteData = checkNotNull(topicRouteData, "topicRouteData should not be null");
+ public TopicRouteDataResult(InvocationContext<QueryRouteResponse> ctx) {
+ final QueryRouteResponse resp = ctx.getResp();
+ final String requestId = ctx.getRpcContext().getRequestId();
+ final List<MessageQueue> messageQueuesList = resp.getMessageQueuesList();
+ final TopicRouteData topicRouteData = new TopicRouteData(messageQueuesList);
+ final Status status = resp.getStatus();
+ this.topicRouteData = topicRouteData;
final Code code = status.getCode();
+ final int codeNumber = code.getNumber();
+ final String statusMessage = status.getMessage();
switch (code) {
case OK:
this.exception = null;
@@ -51,24 +60,24 @@ public class TopicRouteDataResult {
case ILLEGAL_ACCESS_POINT:
case ILLEGAL_TOPIC:
case CLIENT_ID_REQUIRED:
- this.exception = new BadRequestException(code.getNumber(), status.getMessage());
+ this.exception = new BadRequestException(codeNumber, requestId, statusMessage);
break;
case NOT_FOUND:
case TOPIC_NOT_FOUND:
- this.exception = new NotFoundException(code.getNumber(), status.getMessage());
+ this.exception = new NotFoundException(codeNumber, requestId, statusMessage);
break;
case TOO_MANY_REQUESTS:
- this.exception = new TooManyRequestsException(code.getNumber(), status.getMessage());
+ this.exception = new TooManyRequestsException(codeNumber, requestId, statusMessage);
break;
case INTERNAL_ERROR:
case INTERNAL_SERVER_ERROR:
- this.exception = new InternalErrorException(code.getNumber(), status.getMessage());
+ this.exception = new InternalErrorException(codeNumber, requestId, statusMessage);
break;
case PROXY_TIMEOUT:
- this.exception = new ProxyTimeoutException(code.getNumber(), status.getMessage());
+ this.exception = new ProxyTimeoutException(codeNumber, requestId, statusMessage);
break;
default:
- this.exception = new UnsupportedException(code.getNumber(), status.getMessage());
+ this.exception = new UnsupportedException(codeNumber, requestId, statusMessage);
}
}
diff --git a/java/client/src/test/java/org/apache/rocketmq/client/java/tool/TestBase.java b/java/client/src/test/java/org/apache/rocketmq/client/java/tool/TestBase.java
index f3bcbb2..3aa6d80 100644
--- a/java/client/src/test/java/org/apache/rocketmq/client/java/tool/TestBase.java
+++ b/java/client/src/test/java/org/apache/rocketmq/client/java/tool/TestBase.java
@@ -361,8 +361,7 @@ public class TestBase {
MessageQueueImpl mq) throws ExecutionException, InterruptedException, ClientException {
final ListenableFuture<InvocationContext<SendMessageResponse>> future =
okSendMessageResponseFutureWithSingleEntry();
- final SendMessageResponse response = future.get().getResp();
- final List<SendReceiptImpl> receipts = SendReceiptImpl.processSendResponse(mq, response);
+ final List<SendReceiptImpl> receipts = SendReceiptImpl.processRespContext(mq, future.get());
return receipts.iterator().next();
}
}