You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by aa...@apache.org on 2022/07/29 07:28:34 UTC

[rocketmq-clients] branch master updated: Enable retry for message acknowledge/changeInvisibleDuration in push consumer (#94)

This is an automated email from the ASF dual-hosted git repository.

aaronai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git


The following commit(s) were added to refs/heads/master by this push:
     new 078ca43  Enable retry for message acknowledge/changeInvisibleDuration in push consumer (#94)
078ca43 is described below

commit 078ca432583b3cbc7053329ba39372e564e24be9
Author: Aaron Ai <ya...@gmail.com>
AuthorDate: Fri Jul 29 15:28:29 2022 +0800

    Enable retry for message acknowledge/changeInvisibleDuration in push consumer (#94)
---
 .../java/impl/consumer/ProcessQueueImpl.java       | 125 ++++++++++++++-------
 .../client/java/impl/producer/ProducerImpl.java    |   2 +-
 2 files changed, 88 insertions(+), 39 deletions(-)

diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ProcessQueueImpl.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ProcessQueueImpl.java
index 965507c..f65b67a 100644
--- a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ProcessQueueImpl.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ProcessQueueImpl.java
@@ -18,6 +18,7 @@
 package org.apache.rocketmq.client.java.impl.consumer;
 
 import apache.rocketmq.v2.AckMessageResponse;
+import apache.rocketmq.v2.ChangeInvisibleDurationResponse;
 import apache.rocketmq.v2.Code;
 import apache.rocketmq.v2.ForwardMessageToDeadLetterQueueResponse;
 import apache.rocketmq.v2.ReceiveMessageRequest;
@@ -65,8 +66,9 @@ import org.slf4j.LoggerFactory;
  */
 @SuppressWarnings({"NullableProblems", "UnstableApiUsage"})
 class ProcessQueueImpl implements ProcessQueue {
-    public static final Duration FORWARD_FIFO_MESSAGE_TO_DLQ_DELAY = Duration.ofMillis(100);
-    public static final Duration ACK_FIFO_MESSAGE_DELAY = Duration.ofMillis(100);
+    public static final Duration FORWARD_FIFO_MESSAGE_TO_DLQ_FAILURE_BACKOFF_DELAY = Duration.ofSeconds(1);
+    public static final Duration ACK_MESSAGE_FAILURE_BACKOFF_DELAY = Duration.ofSeconds(1);
+    public static final Duration CHANGE_INVISIBLE_DURATION_FAILURE_BACKOFF_DELAY = Duration.ofSeconds(1);
     public static final Duration RECEIVE_LATER_DELAY = Duration.ofSeconds(3);
 
     private static final Logger LOGGER = LoggerFactory.getLogger(ProcessQueueImpl.class);
@@ -376,41 +378,89 @@ class ProcessQueueImpl implements ProcessQueue {
         nackMessage(messageView);
     }
 
-    private void ackMessage(MessageViewImpl messageView) {
+    private void nackMessage(MessageViewImpl messageView) {
+        final int deliveryAttempt = messageView.getDeliveryAttempt();
+        final Duration duration = consumer.getRetryPolicy().getNextAttemptDelay(deliveryAttempt);
+        changeInvisibleDuration(messageView, duration, 1);
+    }
+
+    private void changeInvisibleDuration(final MessageViewImpl messageView, final Duration duration,
+        final int attempt) {
         final String clientId = consumer.clientId();
         final String consumerGroup = consumer.getConsumerGroup();
         final MessageId messageId = messageView.getMessageId();
         final Endpoints endpoints = messageView.getEndpoints();
-        final ListenableFuture<RpcInvocation<AckMessageResponse>> future = consumer.ackMessage(messageView);
-        Futures.addCallback(future, new FutureCallback<RpcInvocation<AckMessageResponse>>() {
+        final ListenableFuture<RpcInvocation<ChangeInvisibleDurationResponse>> future =
+            consumer.changeInvisibleDuration(messageView, duration);
+        Futures.addCallback(future, new FutureCallback<RpcInvocation<ChangeInvisibleDurationResponse>>() {
             @Override
-            public void onSuccess(RpcInvocation<AckMessageResponse> invocation) {
+            public void onSuccess(RpcInvocation<ChangeInvisibleDurationResponse> invocation) {
+                final ChangeInvisibleDurationResponse response = invocation.getResponse();
                 final String requestId = invocation.getContext().getRequestId();
-                final AckMessageResponse response = invocation.getResponse();
                 final Status status = response.getStatus();
                 final Code code = status.getCode();
-                if (Code.OK.equals(code)) {
-                    LOGGER.debug("Ack message successfully, clientId={}, consumerGroup={}, messageId={}, mq={}, "
-                        + "endpoints={}, requestId={}", clientId, consumerGroup, messageId, mq, endpoints, requestId);
+                if (Code.INVALID_RECEIPT_HANDLE.equals(code)) {
+                    LOGGER.error("Failed to change invisible duration due to the invalid receipt handle, forgive to "
+                            + "retry, clientId={}, consumerGroup={}, messageId={}, attempt={}, mq={}, endpoints={}, "
+                            + "requestId={}, status message=[{}]", clientId, consumerGroup, messageId, attempt, mq,
+                        endpoints, requestId, status.getMessage());
                     return;
                 }
-                LOGGER.error("Failed to ack message, clientId={}, consumerGroup={}, messageId={}, mq={}, "
-                        + "endpoints={}, requestId={}, code={}, status message={}", clientId, consumerGroup, messageId,
-                    mq, endpoints, requestId, code, status.getMessage());
+                // Any other non-OK status: log the failure and schedule another attempt after the backoff delay.
+                if (!Code.OK.equals(code)) {
+                    LOGGER.error("Failed to change invisible duration, would retry later, clientId={}, "
+                            + "consumerGroup={}, messageId={}, attempt={}, mq={}, endpoints={}, requestId={}, "
+                            + "status message=[{}]", clientId, consumerGroup, messageId, attempt, mq, endpoints,
+                        requestId, status.getMessage());
+                    changeInvisibleDurationLater(messageView, duration, 1 + attempt);
+                    return;
+                }
+                // Succeeded after at least one retry: log it at INFO and stop, nothing is left to retry.
+                if (1 < attempt) {
+                    LOGGER.info("Finally, change invisible duration successfully, clientId={}, consumerGroup={}, "
+                            + "messageId={}, attempt={}, mq={}, endpoints={}, requestId={}", clientId, consumerGroup,
+                        messageId, attempt, mq, endpoints, requestId);
+                    return;
+                }
+                LOGGER.debug("Change invisible duration successfully, clientId={}, consumerGroup={}, messageId={}, "
+                        + "mq={}, endpoints={}, requestId={}", clientId, consumerGroup, messageId, mq, endpoints,
+                    requestId);
             }
 
             @Override
             public void onFailure(Throwable t) {
-                LOGGER.error("Exception raised while acknowledging message, clientId={}, consumerGroup={}, "
-                        + "messageId={}, mq={}, endpoints={}", clientId, consumerGroup, messageId, mq,
-                    endpoints, t);
+                // No response (hence no request id) is available here; log the attempt and retry later.
+                LOGGER.error("Exception raised while changing invisible duration, would retry later, clientId={}, "
+                        + "consumerGroup={}, messageId={}, attempt={}, mq={}, endpoints={}", clientId, consumerGroup,
+                    messageId, attempt, mq, endpoints, t);
+                changeInvisibleDurationLater(messageView, duration, 1 + attempt);
             }
         }, MoreExecutors.directExecutor());
     }
 
-    private void nackMessage(MessageViewImpl messageView) {
-        final Duration duration = consumer.getRetryPolicy().getNextAttemptDelay(messageView.getDeliveryAttempt());
-        consumer.changeInvisibleDuration(messageView, duration);
+    private void changeInvisibleDurationLater(final MessageViewImpl messageView, final Duration duration,
+        final int attempt) {
+        final MessageId messageId = messageView.getMessageId();
+        final String clientId = consumer.clientId();
+        // Process queue is dropped, no need to proceed.
+        if (dropped) {
+            LOGGER.info("Process queue was dropped, give up to change invisible duration, mq={}, messageId={}, "
+                + "clientId={}", mq, messageId, clientId);
+            return;
+        }
+        final ScheduledExecutorService scheduler = consumer.getScheduler();
+        try {
+            scheduler.schedule(() -> changeInvisibleDuration(messageView, duration, attempt),
+                CHANGE_INVISIBLE_DURATION_FAILURE_BACKOFF_DELAY.toNanos(), TimeUnit.NANOSECONDS);
+        } catch (Throwable t) {
+            if (scheduler.isShutdown()) {
+                return;
+            }
+            // Should never reach here: scheduling failed although the scheduler is not shut down; keep the cause.
+            LOGGER.error("[Bug] Failed to schedule message change invisible duration request, mq={}, messageId={}, "
+                + "clientId={}", mq, messageId, clientId, t);
+            changeInvisibleDurationLater(messageView, duration, 1 + attempt);
+        }
     }
 
     @Override
@@ -461,7 +511,7 @@ class ProcessQueueImpl implements ProcessQueue {
                 + "attempt={}, mq={}, messageId={}, clientId={}", maxAttempts, attempt, mq, messageId, clientId);
         }
         // Ack message or forward it to DLQ depends on consumption result.
-        ListenableFuture<Void> future = ok ? ackFifoMessage(messageView) : forwardToDeadLetterQueue(messageView);
+        ListenableFuture<Void> future = ok ? ackMessage(messageView) : forwardToDeadLetterQueue(messageView);
         future.addListener(() -> eraseMessage(messageView), consumer.getConsumptionExecutor());
         return future;
     }
@@ -491,7 +541,7 @@ class ProcessQueueImpl implements ProcessQueue {
                 // Log failure and retry later.
                 if (!Code.OK.equals(code)) {
                     LOGGER.error("Failed to forward message to dead letter queue, would attempt to re-forward later," +
-                            " clientId={}, consumerGroup={} messageId={}, attempt={}, mq={}, endpoints={}, "
+                            " clientId={}, consumerGroup={}, messageId={}, attempt={}, mq={}, endpoints={}, "
                             + "requestId={}, code={}, status message={}", clientId, consumerGroup, messageId, attempt,
                         mq, endpoints, requestId, code, status.getMessage());
                     forwardToDeadLetterQueue(messageView, 1 + attempt, future0);
@@ -535,7 +585,7 @@ class ProcessQueueImpl implements ProcessQueue {
         final ScheduledExecutorService scheduler = consumer.getScheduler();
         try {
             scheduler.schedule(() -> forwardToDeadLetterQueue(messageView, attempt, future0),
-                FORWARD_FIFO_MESSAGE_TO_DLQ_DELAY.toNanos(), TimeUnit.NANOSECONDS);
+                FORWARD_FIFO_MESSAGE_TO_DLQ_FAILURE_BACKOFF_DELAY.toNanos(), TimeUnit.NANOSECONDS);
         } catch (Throwable t) {
             if (scheduler.isShutdown()) {
                 return;
@@ -547,14 +597,13 @@ class ProcessQueueImpl implements ProcessQueue {
         }
     }
 
-    private ListenableFuture<Void> ackFifoMessage(final MessageViewImpl messageView) {
+    private ListenableFuture<Void> ackMessage(final MessageViewImpl messageView) {
         SettableFuture<Void> future = SettableFuture.create();
-        ackFifoMessage(messageView, 1, future);
+        ackMessage(messageView, 1, future);
         return future;
     }
 
-    private void ackFifoMessage(final MessageViewImpl messageView, final int attempt,
-        final SettableFuture<Void> future0) {
+    private void ackMessage(final MessageViewImpl messageView, final int attempt, final SettableFuture<Void> future0) {
         final String clientId = consumer.clientId();
         final String consumerGroup = consumer.getConsumerGroup();
         final MessageId messageId = messageView.getMessageId();
@@ -568,8 +617,8 @@ class ProcessQueueImpl implements ProcessQueue {
                 final Status status = response.getStatus();
                 final Code code = status.getCode();
                 if (Code.INVALID_RECEIPT_HANDLE.equals(code)) {
-                    LOGGER.error("Failed to ack fifo message due to the invalid receipt handle, forgive to retry, "
-                            + "clientId={}, consumerGroup={} messageId={}, attempt={}, mq={}, endpoints={}, "
+                    LOGGER.error("Failed to ack message due to the invalid receipt handle, forgive to retry, "
+                            + "clientId={}, consumerGroup={}, messageId={}, attempt={}, mq={}, endpoints={}, "
                             + "requestId={}, status message=[{}]", clientId, consumerGroup, messageId, attempt, mq,
                         endpoints, requestId, status.getMessage());
                     future0.setException(new BadRequestException(code.getNumber(), requestId, status.getMessage()));
@@ -577,24 +626,24 @@ class ProcessQueueImpl implements ProcessQueue {
                 }
                 // Log failure and retry later.
                 if (!Code.OK.equals(code)) {
-                    LOGGER.error("Failed to ack fifo message, would attempt to re-ack later, clientId={}, "
+                    LOGGER.error("Failed to ack message, would attempt to re-ack later, clientId={}, "
                             + "consumerGroup={}, attempt={}, messageId={}, mq={}, code={}, requestId={}, endpoints={}, "
                             + "status message=[{}]", clientId, consumerGroup, attempt, messageId, mq, code, requestId,
                         endpoints, status.getMessage());
                     ackFifoMessageLater(messageView, 1 + attempt, future0);
                     return;
                 }
+                // Set result if FIFO message is acknowledged successfully.
+                future0.setFuture(Futures.immediateVoidFuture());
                 // Log retries.
                 if (1 < attempt) {
-                    LOGGER.info("Re-ack fifo message successfully, clientId={}, consumerGroup={}, attempt={}, "
+                    LOGGER.info("Finally, ack message successfully, clientId={}, consumerGroup={}, attempt={}, "
                             + "messageId={}, mq={}, endpoints={}, requestId={}", clientId, consumerGroup, attempt,
                         messageId, mq, endpoints, requestId);
-                } else {
-                    LOGGER.debug("Ack fifo message successfully, clientId={}, consumerGroup={}, messageId={}, mq={}, "
-                        + "endpoints={}, requestId={}", clientId, consumerGroup, messageId, mq, endpoints, requestId);
+                    return;
                 }
-                // Set result if FIFO message is acknowledged successfully.
-                future0.setFuture(Futures.immediateVoidFuture());
+                LOGGER.debug("Ack message successfully, clientId={}, consumerGroup={}, messageId={}, mq={}, "
+                    + "endpoints={}, requestId={}", clientId, consumerGroup, messageId, mq, endpoints, requestId);
             }
 
             @Override
@@ -620,14 +669,14 @@ class ProcessQueueImpl implements ProcessQueue {
         }
         final ScheduledExecutorService scheduler = consumer.getScheduler();
         try {
-            scheduler.schedule(() -> ackFifoMessage(messageView, attempt, future0), ACK_FIFO_MESSAGE_DELAY.toNanos(),
-                TimeUnit.NANOSECONDS);
+            scheduler.schedule(() -> ackMessage(messageView, attempt, future0),
+                ACK_MESSAGE_FAILURE_BACKOFF_DELAY.toNanos(), TimeUnit.NANOSECONDS);
         } catch (Throwable t) {
             if (scheduler.isShutdown()) {
                 return;
             }
             // Should never reach here.
-            LOGGER.error("[Bug] Failed to schedule ack fifo message request, mq={}, messageId={}, clientId={}",
+            LOGGER.error("[Bug] Failed to schedule message ack request, mq={}, messageId={}, clientId={}",
                 mq, messageId, clientId);
             ackFifoMessageLater(messageView, 1 + attempt, future0);
         }
diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/ProducerImpl.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/ProducerImpl.java
index 08483f0..18662cc 100644
--- a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/ProducerImpl.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/producer/ProducerImpl.java
@@ -324,7 +324,7 @@ class ProducerImpl extends ClientImpl implements Producer {
     /**
      * Take message queue(s) from route for message publishing.
      */
-    private List<MessageQueueImpl> takeMessageQueues(PublishingLoadBalancer result) throws ClientException {
+    private List<MessageQueueImpl> takeMessageQueues(PublishingLoadBalancer result) {
         return result.takeMessageQueues(isolated, this.getRetryPolicy().getMaxAttempts());
     }