You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by aa...@apache.org on 2022/07/29 07:28:34 UTC
[rocketmq-clients] branch master updated: Enable retry for message acknowledge/changeInvisibleDuration in push consumer (#94)
This is an automated email from the ASF dual-hosted git repository.
aaronai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git
The following commit(s) were added to refs/heads/master by this push:
new 078ca43 Enable retry for message acknowledge/changeInvisibleDuration in push consumer (#94)
078ca43 is described below
commit 078ca432583b3cbc7053329ba39372e564e24be9
Author: Aaron Ai <ya...@gmail.com>
AuthorDate: Fri Jul 29 15:28:29 2022 +0800
Enable retry for message acknowledge/changeInvisibleDuration in push consumer (#94)
---
.../java/impl/consumer/ProcessQueueImpl.java | 125 ++++++++++++++-------
.../client/java/impl/producer/ProducerImpl.java | 2 +-
2 files changed, 88 insertions(+), 39 deletions(-)
diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ProcessQueueImpl.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ProcessQueueImpl.java
index 965507c..f65b67a 100644
--- a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ProcessQueueImpl.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ProcessQueueImpl.java
@@ -18,6 +18,7 @@
package org.apache.rocketmq.client.java.impl.consumer;
import apache.rocketmq.v2.AckMessageResponse;
+import apache.rocketmq.v2.ChangeInvisibleDurationResponse;
import apache.rocketmq.v2.Code;
import apache.rocketmq.v2.ForwardMessageToDeadLetterQueueResponse;
import apache.rocketmq.v2.ReceiveMessageRequest;
@@ -65,8 +66,9 @@ import org.slf4j.LoggerFactory;
*/
@SuppressWarnings({"NullableProblems", "UnstableApiUsage"})
class ProcessQueueImpl implements ProcessQueue {
- public static final Duration FORWARD_FIFO_MESSAGE_TO_DLQ_DELAY = Duration.ofMillis(100);
- public static final Duration ACK_FIFO_MESSAGE_DELAY = Duration.ofMillis(100);
+ public static final Duration FORWARD_FIFO_MESSAGE_TO_DLQ_FAILURE_BACKOFF_DELAY = Duration.ofSeconds(1);
+ public static final Duration ACK_MESSAGE_FAILURE_BACKOFF_DELAY = Duration.ofSeconds(1);
+ public static final Duration CHANGE_INVISIBLE_DURATION_FAILURE_BACKOFF_DELAY = Duration.ofSeconds(1);
public static final Duration RECEIVE_LATER_DELAY = Duration.ofSeconds(3);
private static final Logger LOGGER = LoggerFactory.getLogger(ProcessQueueImpl.class);
@@ -376,41 +378,89 @@ class ProcessQueueImpl implements ProcessQueue {
nackMessage(messageView);
}
- private void ackMessage(MessageViewImpl messageView) {
+ private void nackMessage(MessageViewImpl messageView) {
+ final int deliveryAttempt = messageView.getDeliveryAttempt();
+ final Duration duration = consumer.getRetryPolicy().getNextAttemptDelay(deliveryAttempt);
+ changeInvisibleDuration(messageView, duration, 1);
+ }
+
+ private void changeInvisibleDuration(final MessageViewImpl messageView, final Duration duration,
+ final int attempt) {
final String clientId = consumer.clientId();
final String consumerGroup = consumer.getConsumerGroup();
final MessageId messageId = messageView.getMessageId();
final Endpoints endpoints = messageView.getEndpoints();
- final ListenableFuture<RpcInvocation<AckMessageResponse>> future = consumer.ackMessage(messageView);
- Futures.addCallback(future, new FutureCallback<RpcInvocation<AckMessageResponse>>() {
+ final ListenableFuture<RpcInvocation<ChangeInvisibleDurationResponse>> future =
+ consumer.changeInvisibleDuration(messageView, duration);
+ Futures.addCallback(future, new FutureCallback<RpcInvocation<ChangeInvisibleDurationResponse>>() {
@Override
- public void onSuccess(RpcInvocation<AckMessageResponse> invocation) {
+ public void onSuccess(RpcInvocation<ChangeInvisibleDurationResponse> invocation) {
+ final ChangeInvisibleDurationResponse response = invocation.getResponse();
final String requestId = invocation.getContext().getRequestId();
- final AckMessageResponse response = invocation.getResponse();
final Status status = response.getStatus();
final Code code = status.getCode();
- if (Code.OK.equals(code)) {
- LOGGER.debug("Ack message successfully, clientId={}, consumerGroup={}, messageId={}, mq={}, "
- + "endpoints={}, requestId={}", clientId, consumerGroup, messageId, mq, endpoints, requestId);
+ if (Code.INVALID_RECEIPT_HANDLE.equals(code)) {
+ LOGGER.error("Failed to change invisible duration due to the invalid receipt handle, forgive to "
+ + "retry, clientId={}, consumerGroup={}, messageId={}, attempt={}, mq={}, endpoints={}, "
+ + "requestId={}, status message=[{}]", clientId, consumerGroup, messageId, attempt, mq,
+ endpoints, requestId, status.getMessage());
return;
}
- LOGGER.error("Failed to ack message, clientId={}, consumerGroup={}, messageId={}, mq={}, "
- + "endpoints={}, requestId={}, code={}, status message={}", clientId, consumerGroup, messageId,
- mq, endpoints, requestId, code, status.getMessage());
+ // Log failure and retry later.
+ if (!Code.OK.equals(code)) {
+ LOGGER.error("Failed to change invisible duration, would retry later, clientId={}, "
+ + "consumerGroup={}, messageId={}, attempt={}, mq={}, endpoints={}, requestId={}, "
+ + "status message=[{}]", clientId, consumerGroup, messageId, attempt, mq, endpoints,
+ requestId, status.getMessage());
+ return;
+ }
+ // Log retries.
+ if (1 < attempt) {
+ LOGGER.info("Finally, change invisible duration successfully, clientId={}, consumerGroup={} "
+ + "messageId={}, attempt={}, mq={}, endpoints={}, requestId={}", clientId, consumerGroup,
+ messageId, attempt, mq, endpoints, requestId);
+ changeInvisibleDurationLater(messageView, duration, 1 + attempt);
+ return;
+ }
+ LOGGER.debug("Change invisible duration successfully, clientId={}, consumerGroup={}, messageId={}, "
+ + "mq={}, endpoints={}, requestId={}", clientId, consumerGroup, messageId, mq, endpoints,
+ requestId);
}
@Override
public void onFailure(Throwable t) {
- LOGGER.error("Exception raised while acknowledging message, clientId={}, consumerGroup={}, "
- + "messageId={}, mq={}, endpoints={}", clientId, consumerGroup, messageId, mq,
- endpoints, t);
+ // Log failure and retry later.
+ LOGGER.error("Exception raised while changing invisible duration, would retry later, clientId={}, "
+ + "consumerGroup={}, messageId={}, mq={}, endpoints={}, requestId={}", clientId, consumerGroup,
+ messageId, mq, endpoints, t);
+ changeInvisibleDurationLater(messageView, duration, 1 + attempt);
}
}, MoreExecutors.directExecutor());
}
- private void nackMessage(MessageViewImpl messageView) {
- final Duration duration = consumer.getRetryPolicy().getNextAttemptDelay(messageView.getDeliveryAttempt());
- consumer.changeInvisibleDuration(messageView, duration);
+ private void changeInvisibleDurationLater(final MessageViewImpl messageView, final Duration duration,
+ final int attempt) {
+ final MessageId messageId = messageView.getMessageId();
+ final String clientId = consumer.clientId();
+ // Process queue is dropped, no need to proceed.
+ if (dropped) {
+ LOGGER.info("Process queue was dropped, give up to change invisible duration, mq={}, messageId={}, "
+ + "clientId={}", mq, messageId, clientId);
+ return;
+ }
+ final ScheduledExecutorService scheduler = consumer.getScheduler();
+ try {
+ scheduler.schedule(() -> changeInvisibleDuration(messageView, duration, attempt),
+ CHANGE_INVISIBLE_DURATION_FAILURE_BACKOFF_DELAY.toNanos(), TimeUnit.NANOSECONDS);
+ } catch (Throwable t) {
+ if (scheduler.isShutdown()) {
+ return;
+ }
+ // Should never reach here.
+ LOGGER.error("[Bug] Failed to schedule message change invisible duration request, mq={}, messageId={}, "
+ + "clientId={}", mq, messageId, clientId);
+ changeInvisibleDurationLater(messageView, duration, 1 + attempt);
+ }
}
@Override
@@ -461,7 +511,7 @@ class ProcessQueueImpl implements ProcessQueue {
+ "attempt={}, mq={}, messageId={}, clientId={}", maxAttempts, attempt, mq, messageId, clientId);
}
// Ack message or forward it to DLQ depends on consumption result.
- ListenableFuture<Void> future = ok ? ackFifoMessage(messageView) : forwardToDeadLetterQueue(messageView);
+ ListenableFuture<Void> future = ok ? ackMessage(messageView) : forwardToDeadLetterQueue(messageView);
future.addListener(() -> eraseMessage(messageView), consumer.getConsumptionExecutor());
return future;
}
@@ -491,7 +541,7 @@ class ProcessQueueImpl implements ProcessQueue {
// Log failure and retry later.
if (!Code.OK.equals(code)) {
LOGGER.error("Failed to forward message to dead letter queue, would attempt to re-forward later," +
- " clientId={}, consumerGroup={} messageId={}, attempt={}, mq={}, endpoints={}, "
+ " clientId={}, consumerGroup={}, messageId={}, attempt={}, mq={}, endpoints={}, "
+ "requestId={}, code={}, status message={}", clientId, consumerGroup, messageId, attempt,
mq, endpoints, requestId, code, status.getMessage());
forwardToDeadLetterQueue(messageView, 1 + attempt, future0);
@@ -535,7 +585,7 @@ class ProcessQueueImpl implements ProcessQueue {
final ScheduledExecutorService scheduler = consumer.getScheduler();
try {
scheduler.schedule(() -> forwardToDeadLetterQueue(messageView, attempt, future0),
- FORWARD_FIFO_MESSAGE_TO_DLQ_DELAY.toNanos(), TimeUnit.NANOSECONDS);
+ FORWARD_FIFO_MESSAGE_TO_DLQ_FAILURE_BACKOFF_DELAY.toNanos(), TimeUnit.NANOSECONDS);
} catch (Throwable t) {
if (scheduler.isShutdown()) {
return;
@@ -547,14 +597,13 @@ class ProcessQueueImpl implements ProcessQueue {
}
}
- private ListenableFuture<Void> ackFifoMessage(final MessageViewImpl messageView) {
+ private ListenableFuture<Void> ackMessage(final MessageViewImpl messageView) {
SettableFuture<Void> future = SettableFuture.create();
- ackFifoMessage(messageView, 1, future);
+ ackMessage(messageView, 1, future);
return future;
}
- private void ackFifoMessage(final MessageViewImpl messageView, final int attempt,
- final SettableFuture<Void> future0) {
+ private void ackMessage(final MessageViewImpl messageView, final int attempt, final SettableFuture<Void> future0) {
final String clientId = consumer.clientId();
final String consumerGroup = consumer.getConsumerGroup();
final MessageId messageId = messageView.getMessageId();
@@ -568,8 +617,8 @@ class ProcessQueueImpl implements ProcessQueue {
final Status status = response.getStatus();
final Code code = status.getCode();
if (Code.INVALID_RECEIPT_HANDLE.equals(code)) {
- LOGGER.error("Failed to ack fifo message due to the invalid receipt handle, forgive to retry, "
- + "clientId={}, consumerGroup={} messageId={}, attempt={}, mq={}, endpoints={}, "
+ LOGGER.error("Failed to ack message due to the invalid receipt handle, forgive to retry, "
+ + "clientId={}, consumerGroup={}, messageId={}, attempt={}, mq={}, endpoints={}, "
+ "requestId={}, status message=[{}]", clientId, consumerGroup, messageId, attempt, mq,
endpoints, requestId, status.getMessage());
future0.setException(new BadRequestException(code.getNumber(), requestId, status.getMessage()));
@@ -577,24 +626,24 @@ class ProcessQueueImpl implements ProcessQueue {
}
// Log failure and retry later.
if (!Code.OK.equals(code)) {
- LOGGER.error("Failed to ack fifo message, would attempt to re-ack later, clientId={}, "
+ LOGGER.error("Failed to ack message, would attempt to re-ack later, clientId={}, "
+ "consumerGroup={}, attempt={}, messageId={}, mq={}, code={}, requestId={}, endpoints={}, "
+ "status message=[{}]", clientId, consumerGroup, attempt, messageId, mq, code, requestId,
endpoints, status.getMessage());
ackFifoMessageLater(messageView, 1 + attempt, future0);
return;
}
+ // Set result if FIFO message is acknowledged successfully.
+ future0.setFuture(Futures.immediateVoidFuture());
// Log retries.
if (1 < attempt) {
- LOGGER.info("Re-ack fifo message successfully, clientId={}, consumerGroup={}, attempt={}, "
+ LOGGER.info("Finally, ack message successfully, clientId={}, consumerGroup={}, attempt={}, "
+ "messageId={}, mq={}, endpoints={}, requestId={}", clientId, consumerGroup, attempt,
messageId, mq, endpoints, requestId);
- } else {
- LOGGER.debug("Ack fifo message successfully, clientId={}, consumerGroup={}, messageId={}, mq={}, "
- + "endpoints={}, requestId={}", clientId, consumerGroup, messageId, mq, endpoints, requestId);
+ return;
}
- // Set result if FIFO message is acknowledged successfully.
- future0.setFuture(Futures.immediateVoidFuture());
+ LOGGER.debug("Ack message successfully, clientId={}, consumerGroup={}, messageId={}, mq={}, "
+ + "endpoints={}, requestId={}", clientId, consumerGroup, messageId, mq, endpoints, requestId);
}
@Override
@@ -620,14 +669,14 @@ class ProcessQueueImpl implements ProcessQueue {
}
final ScheduledExecutorService scheduler = consumer.getScheduler();
try {
- scheduler.schedule(() -> ackFifoMessage(messageView, attempt, future0), ACK_FIFO_MESSAGE_DELAY.toNanos(),
- TimeUnit.NANOSECONDS);
+ scheduler.schedule(() -> ackMessage(messageView, attempt, future0),
+ ACK_MESSAGE_FAILURE_BACKOFF_DELAY.toNanos(), TimeUnit.NANOSECONDS);
} catch (Throwable t) {
if (scheduler.isShutdown()) {
return;
}
// Should never reach here.
- LOGGER.error("[Bug] Failed to schedule ack fifo message request, mq={}, messageId={}, clientId={}",
+ LOGGER.error("[Bug] Failed to schedule message ack request, mq={}, messageId={}, clientId={}",
mq, messageId, clientId);
ackFifoMessageLater(messageView, 1 + attempt, future0);
}
diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/ProducerImpl.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/ProducerImpl.java
index 08483f0..18662cc 100644
--- a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/ProducerImpl.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/ProducerImpl.java
@@ -324,7 +324,7 @@ class ProducerImpl extends ClientImpl implements Producer {
/**
* Take message queue(s) from route for message publishing.
*/
- private List<MessageQueueImpl> takeMessageQueues(PublishingLoadBalancer result) throws ClientException {
+ private List<MessageQueueImpl> takeMessageQueues(PublishingLoadBalancer result) {
return result.takeMessageQueues(isolated, this.getRetryPolicy().getMaxAttempts());
}