You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by aa...@apache.org on 2022/07/31 06:36:28 UTC

[rocketmq-clients] branch master updated: Bugfix: forget to retry if failed to change invisible duration (#105)

This is an automated email from the ASF dual-hosted git repository.

aaronai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git


The following commit(s) were added to refs/heads/master by this push:
     new 504fe8f  Bugfix: forget to retry if failed to change invisible duration (#105)
504fe8f is described below

commit 504fe8f516af5bd3c8951031660c3c2be51b743b
Author: Aaron Ai <ya...@gmail.com>
AuthorDate: Sun Jul 31 14:36:23 2022 +0800

    Bugfix: forget to retry if failed to change invisible duration (#105)
---
 .../rocketmq/client/java/impl/consumer/ProcessQueueImpl.java      | 8 ++++----
 1 file changed, 4 insertions(+), 4 deletions(-)

diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ProcessQueueImpl.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ProcessQueueImpl.java
index 5cc55e6..a4e8b96 100644
--- a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ProcessQueueImpl.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ProcessQueueImpl.java
@@ -424,6 +424,7 @@ class ProcessQueueImpl implements ProcessQueue {
                             + "consumerGroup={}, messageId={}, attempt={}, mq={}, endpoints={}, requestId={}, "
                             + "status message=[{}]", clientId, consumerGroup, messageId, attempt, mq, endpoints,
                         requestId, status.getMessage());
+                    changeInvisibleDurationLater(messageView, duration, 1 + attempt);
                     return;
                 }
                 // Log retries.
@@ -431,7 +432,6 @@ class ProcessQueueImpl implements ProcessQueue {
                     LOGGER.info("Finally, change invisible duration successfully, clientId={}, consumerGroup={} "
                             + "messageId={}, attempt={}, mq={}, endpoints={}, requestId={}", clientId, consumerGroup,
                         messageId, attempt, mq, endpoints, requestId);
-                    changeInvisibleDurationLater(messageView, duration, 1 + attempt);
                     return;
                 }
                 LOGGER.debug("Change invisible duration successfully, clientId={}, consumerGroup={}, messageId={}, "
@@ -443,7 +443,7 @@ class ProcessQueueImpl implements ProcessQueue {
             public void onFailure(Throwable t) {
                 // Log failure and retry later.
                 LOGGER.error("Exception raised while changing invisible duration, would retry later, clientId={}, "
-                        + "consumerGroup={}, messageId={}, mq={}, endpoints={}, requestId={}", clientId, consumerGroup,
+                        + "consumerGroup={}, messageId={}, mq={}, endpoints={}", clientId, consumerGroup,
                     messageId, mq, endpoints, t);
                 changeInvisibleDurationLater(messageView, duration, 1 + attempt);
             }
@@ -511,8 +511,8 @@ class ProcessQueueImpl implements ProcessQueue {
             final Duration nextAttemptDelay = retryPolicy.getNextAttemptDelay(attempt);
             attempt = messageView.incrementAndGetDeliveryAttempt();
             LOGGER.debug("Prepare to redeliver the fifo message because of the consumption failure, maxAttempt={}," +
-                    " attempt={}, mq={}, messageId={}, nextAttemptDelay={}, clientId={}",
-                maxAttempts, attempt, mq, messageId, nextAttemptDelay, clientId);
+                " attempt={}, mq={}, messageId={}, nextAttemptDelay={}, clientId={}", maxAttempts, attempt, mq,
+                messageId, nextAttemptDelay, clientId);
             final ListenableFuture<ConsumeResult> future = service.consume(messageView, nextAttemptDelay);
             return Futures.transformAsync(future, result -> eraseFifoMessage(messageView, result),
                 MoreExecutors.directExecutor());