You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by aa...@apache.org on 2022/07/29 08:35:58 UTC

[rocketmq-clients] branch master updated: Use different message receiving backoff delay for different error (#96)

This is an automated email from the ASF dual-hosted git repository.

aaronai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git


The following commit(s) were added to refs/heads/master by this push:
     new 33747c8  Use different message receiving backoff delay for different error (#96)
33747c8 is described below

commit 33747c8442c0becf937b2766e16395636f8e4b81
Author: Aaron Ai <ya...@gmail.com>
AuthorDate: Fri Jul 29 16:35:53 2022 +0800

    Use different message receiving backoff delay for different error (#96)
---
 .../client/java/impl/consumer/ConsumerImpl.java    |  3 +-
 .../java/impl/consumer/ProcessQueueImpl.java       | 50 ++++++++++++----------
 .../java/impl/consumer/ProcessQueueImplTest.java   |  2 +-
 3 files changed, 30 insertions(+), 25 deletions(-)

diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ConsumerImpl.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ConsumerImpl.java
index d527eef..6e09fea 100644
--- a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ConsumerImpl.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ConsumerImpl.java
@@ -81,8 +81,7 @@ abstract class ConsumerImpl extends ClientImpl {
             Metadata metadata = sign();
             final Endpoints endpoints = mq.getBroker().getEndpoints();
             final ListenableFuture<RpcInvocation<Iterator<ReceiveMessageResponse>>> future =
-                clientManager.receiveMessage(endpoints,
-                    metadata, request, timeout);
+                clientManager.receiveMessage(endpoints, metadata, request, timeout);
             return Futures.transformAsync(future, context -> {
                 final Iterator<ReceiveMessageResponse> it = context.getResponse();
                 Status status = Status.newBuilder().setCode(Code.INTERNAL_SERVER_ERROR)
diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ProcessQueueImpl.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ProcessQueueImpl.java
index f65b67a..49d15e0 100644
--- a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ProcessQueueImpl.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ProcessQueueImpl.java
@@ -46,6 +46,7 @@ import org.apache.rocketmq.client.apis.consumer.ConsumeResult;
 import org.apache.rocketmq.client.apis.consumer.FilterExpression;
 import org.apache.rocketmq.client.apis.message.MessageId;
 import org.apache.rocketmq.client.java.exception.BadRequestException;
+import org.apache.rocketmq.client.java.exception.TooManyRequestsException;
 import org.apache.rocketmq.client.java.hook.MessageHookPoints;
 import org.apache.rocketmq.client.java.hook.MessageHookPointsStatus;
 import org.apache.rocketmq.client.java.message.MessageCommon;
@@ -66,13 +67,16 @@ import org.slf4j.LoggerFactory;
  */
 @SuppressWarnings({"NullableProblems", "UnstableApiUsage"})
 class ProcessQueueImpl implements ProcessQueue {
-    public static final Duration FORWARD_FIFO_MESSAGE_TO_DLQ_FAILURE_BACKOFF_DELAY = Duration.ofSeconds(1);
-    public static final Duration ACK_MESSAGE_FAILURE_BACKOFF_DELAY = Duration.ofSeconds(1);
-    public static final Duration CHANGE_INVISIBLE_DURATION_FAILURE_BACKOFF_DELAY = Duration.ofSeconds(1);
-    public static final Duration RECEIVE_LATER_DELAY = Duration.ofSeconds(3);
-
     private static final Logger LOGGER = LoggerFactory.getLogger(ProcessQueueImpl.class);
 
+    private static final Duration FORWARD_FIFO_MESSAGE_TO_DLQ_FAILURE_BACKOFF_DELAY = Duration.ofSeconds(1);
+    private static final Duration ACK_MESSAGE_FAILURE_BACKOFF_DELAY = Duration.ofSeconds(1);
+    private static final Duration CHANGE_INVISIBLE_DURATION_FAILURE_BACKOFF_DELAY = Duration.ofSeconds(1);
+
+    private static final Duration RECEIVING_FLOW_CONTROL_BACKOFF_DELAY = Duration.ofMillis(20);
+    private static final Duration RECEIVING_FAILURE_BACKOFF_DELAY = Duration.ofSeconds(1);
+    private static final Duration RECEIVING_BACKOFF_DELAY_WHEN_CACHE_IS_FULL = Duration.ofSeconds(1);
+
     private final PushConsumerImpl consumer;
 
     /**
@@ -187,18 +191,25 @@ class ProcessQueueImpl implements ProcessQueue {
      *
      * <p> Make sure that no exception will be thrown.
      */
-    public void receiveMessageLater() {
+    public void onReceiveMessageException(Throwable t) {
+        Duration delay = t instanceof TooManyRequestsException ? RECEIVING_FLOW_CONTROL_BACKOFF_DELAY :
+            RECEIVING_FAILURE_BACKOFF_DELAY;
+        receiveMessageLater(delay);
+    }
+
+    private void receiveMessageLater(Duration delay) {
+        final String clientId = consumer.clientId();
         final ScheduledExecutorService scheduler = consumer.getScheduler();
         try {
-            scheduler.schedule(this::receiveMessage, RECEIVE_LATER_DELAY.toNanos(), TimeUnit.NANOSECONDS);
+            LOGGER.info("Try to receive message later, mq={}, delay={}, clientId={}", mq, delay, clientId);
+            scheduler.schedule(this::receiveMessage, delay.toNanos(), TimeUnit.NANOSECONDS);
         } catch (Throwable t) {
             if (scheduler.isShutdown()) {
                 return;
             }
             // Should never reach here.
-            LOGGER.error("[Bug] Failed to schedule receive message request, mq={}, clientId={}", mq,
-                consumer.clientId(), t);
-            receiveMessageLater();
+            LOGGER.error("[Bug] Failed to schedule message receiving request, mq={}, clientId={}", mq, clientId, t);
+            onReceiveMessageException(t);
         }
     }
 
@@ -211,7 +222,7 @@ class ProcessQueueImpl implements ProcessQueue {
         if (this.isCacheFull()) {
             LOGGER.warn("Process queue cache is full, would receive message later, mq={}, clientId={}", mq,
                 consumer.clientId());
-            receiveMessageLater();
+            receiveMessageLater(RECEIVING_BACKOFF_DELAY_WHEN_CACHE_IS_FULL);
             return;
         }
         receiveMessageImmediately();
@@ -228,11 +239,9 @@ class ProcessQueueImpl implements ProcessQueue {
             final int batchSize = this.getReceptionBatchSize();
             final ReceiveMessageRequest request = consumer.wrapReceiveMessageRequest(batchSize, mq, filterExpression);
             activityNanoTime = System.nanoTime();
-
             // Intercept before message reception.
             consumer.doBefore(MessageHookPoints.RECEIVE, Collections.emptyList());
             final Stopwatch stopwatch = Stopwatch.createStarted();
-
             final ListenableFuture<ReceiveMessageResult> future = consumer.receiveMessage(request, mq,
                 consumer.getPushConsumerSettings().getLongPollingTimeout());
             Futures.addCallback(future, new FutureCallback<ReceiveMessageResult>() {
@@ -243,7 +252,6 @@ class ProcessQueueImpl implements ProcessQueue {
                     final List<MessageCommon> commons = result.getMessageViewImpls().stream()
                         .map(MessageViewImpl::getMessageCommon).collect(Collectors.toList());
                     consumer.doAfter(MessageHookPoints.RECEIVE, commons, duration, MessageHookPointsStatus.OK);
-
                     try {
                         onReceiveMessageResult(result);
                     } catch (Throwable t) {
@@ -251,7 +259,7 @@ class ProcessQueueImpl implements ProcessQueue {
                         LOGGER.error("[Bug] Exception raised while handling receive result, would receive later," +
                                 " mq={}, endpoints={}, clientId={}",
                             mq, endpoints, consumer.clientId(), t);
-                        receiveMessageLater();
+                        onReceiveMessageException(t);
                     }
                 }
 
@@ -261,17 +269,15 @@ class ProcessQueueImpl implements ProcessQueue {
                     final Duration duration = stopwatch.elapsed();
                     consumer.doAfter(MessageHookPoints.RECEIVE, Collections.emptyList(), duration,
                         MessageHookPointsStatus.ERROR);
-
-                    LOGGER.error("Exception raised while message reception, would receive later, mq={}, endpoints={}," +
-                        " clientId={}", mq, endpoints, consumer.clientId(), t);
-                    receiveMessageLater();
+                    LOGGER.error("Exception raised during message reception, mq={}, endpoints={}, clientId={}", mq,
+                        endpoints, consumer.clientId(), t);
+                    onReceiveMessageException(t);
                 }
             }, MoreExecutors.directExecutor());
             consumer.getReceptionTimes().getAndIncrement();
         } catch (Throwable t) {
-            LOGGER.error("Exception raised while message reception, would receive later, mq={}, clientId={}", mq,
-                consumer.clientId(), t);
-            receiveMessageLater();
+            LOGGER.error("Exception raised during message reception, mq={}, clientId={}", mq, consumer.clientId(), t);
+            onReceiveMessageException(t);
         }
     }
 
diff --git a/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/ProcessQueueImplTest.java b/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/ProcessQueueImplTest.java
index 24fc847..ddc3be7 100644
--- a/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/ProcessQueueImplTest.java
+++ b/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/ProcessQueueImplTest.java
@@ -162,7 +162,7 @@ public class ProcessQueueImplTest extends TestBase {
         when(pushConsumer.wrapReceiveMessageRequest(anyInt(), any(MessageQueueImpl.class),
             any(FilterExpression.class))).thenReturn(request);
         processQueue.fetchMessageImmediately();
-        Thread.sleep(ProcessQueueImpl.RECEIVE_LATER_DELAY.toMillis() / 2);
+        Thread.sleep(3000);
         verify(pushConsumer, times(cachedMessagesCountThresholdPerQueue))
             .receiveMessage(any(ReceiveMessageRequest.class), any(MessageQueueImpl.class), any(Duration.class));
     }