You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by aa...@apache.org on 2022/07/29 08:35:58 UTC
[rocketmq-clients] branch master updated: Use different message receiving backoff delay for different error (#96)
This is an automated email from the ASF dual-hosted git repository.
aaronai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git
The following commit(s) were added to refs/heads/master by this push:
new 33747c8 Use different message receiving backoff delay for different error (#96)
33747c8 is described below
commit 33747c8442c0becf937b2766e16395636f8e4b81
Author: Aaron Ai <ya...@gmail.com>
AuthorDate: Fri Jul 29 16:35:53 2022 +0800
Use different message receiving backoff delay for different error (#96)
---
.../client/java/impl/consumer/ConsumerImpl.java | 3 +-
.../java/impl/consumer/ProcessQueueImpl.java | 50 ++++++++++++----------
.../java/impl/consumer/ProcessQueueImplTest.java | 2 +-
3 files changed, 30 insertions(+), 25 deletions(-)
diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ConsumerImpl.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ConsumerImpl.java
index d527eef..6e09fea 100644
--- a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ConsumerImpl.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ConsumerImpl.java
@@ -81,8 +81,7 @@ abstract class ConsumerImpl extends ClientImpl {
Metadata metadata = sign();
final Endpoints endpoints = mq.getBroker().getEndpoints();
final ListenableFuture<RpcInvocation<Iterator<ReceiveMessageResponse>>> future =
- clientManager.receiveMessage(endpoints,
- metadata, request, timeout);
+ clientManager.receiveMessage(endpoints, metadata, request, timeout);
return Futures.transformAsync(future, context -> {
final Iterator<ReceiveMessageResponse> it = context.getResponse();
Status status = Status.newBuilder().setCode(Code.INTERNAL_SERVER_ERROR)
diff --git a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ProcessQueueImpl.java b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ProcessQueueImpl.java
index f65b67a..49d15e0 100644
--- a/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ProcessQueueImpl.java
+++ b/java/client/src/main/java/org/apache/rocketmq/client/java/impl/consumer/ProcessQueueImpl.java
@@ -46,6 +46,7 @@ import org.apache.rocketmq.client.apis.consumer.ConsumeResult;
import org.apache.rocketmq.client.apis.consumer.FilterExpression;
import org.apache.rocketmq.client.apis.message.MessageId;
import org.apache.rocketmq.client.java.exception.BadRequestException;
+import org.apache.rocketmq.client.java.exception.TooManyRequestsException;
import org.apache.rocketmq.client.java.hook.MessageHookPoints;
import org.apache.rocketmq.client.java.hook.MessageHookPointsStatus;
import org.apache.rocketmq.client.java.message.MessageCommon;
@@ -66,13 +67,16 @@ import org.slf4j.LoggerFactory;
*/
@SuppressWarnings({"NullableProblems", "UnstableApiUsage"})
class ProcessQueueImpl implements ProcessQueue {
- public static final Duration FORWARD_FIFO_MESSAGE_TO_DLQ_FAILURE_BACKOFF_DELAY = Duration.ofSeconds(1);
- public static final Duration ACK_MESSAGE_FAILURE_BACKOFF_DELAY = Duration.ofSeconds(1);
- public static final Duration CHANGE_INVISIBLE_DURATION_FAILURE_BACKOFF_DELAY = Duration.ofSeconds(1);
- public static final Duration RECEIVE_LATER_DELAY = Duration.ofSeconds(3);
-
private static final Logger LOGGER = LoggerFactory.getLogger(ProcessQueueImpl.class);
+ private static final Duration FORWARD_FIFO_MESSAGE_TO_DLQ_FAILURE_BACKOFF_DELAY = Duration.ofSeconds(1);
+ private static final Duration ACK_MESSAGE_FAILURE_BACKOFF_DELAY = Duration.ofSeconds(1);
+ private static final Duration CHANGE_INVISIBLE_DURATION_FAILURE_BACKOFF_DELAY = Duration.ofSeconds(1);
+
+ private static final Duration RECEIVING_FLOW_CONTROL_BACKOFF_DELAY = Duration.ofMillis(20);
+ private static final Duration RECEIVING_FAILURE_BACKOFF_DELAY = Duration.ofSeconds(1);
+ private static final Duration RECEIVING_BACKOFF_DELAY_WHEN_CACHE_IS_FULL = Duration.ofSeconds(1);
+
private final PushConsumerImpl consumer;
/**
@@ -187,18 +191,25 @@ class ProcessQueueImpl implements ProcessQueue {
*
* <p> Make sure that no exception will be thrown.
*/
- public void receiveMessageLater() {
+ public void onReceiveMessageException(Throwable t) {
+ Duration delay = t instanceof TooManyRequestsException ? RECEIVING_FLOW_CONTROL_BACKOFF_DELAY :
+ RECEIVING_FAILURE_BACKOFF_DELAY;
+ receiveMessageLater(delay);
+ }
+
+ private void receiveMessageLater(Duration delay) {
+ final String clientId = consumer.clientId();
final ScheduledExecutorService scheduler = consumer.getScheduler();
try {
- scheduler.schedule(this::receiveMessage, RECEIVE_LATER_DELAY.toNanos(), TimeUnit.NANOSECONDS);
+ LOGGER.info("Try to receive message later, mq={}, delay={}, clientId={}", mq, delay, clientId);
+ scheduler.schedule(this::receiveMessage, delay.toNanos(), TimeUnit.NANOSECONDS);
} catch (Throwable t) {
if (scheduler.isShutdown()) {
return;
}
// Should never reach here.
- LOGGER.error("[Bug] Failed to schedule receive message request, mq={}, clientId={}", mq,
- consumer.clientId(), t);
- receiveMessageLater();
+ LOGGER.error("[Bug] Failed to schedule message receiving request, mq={}, clientId={}", mq, clientId, t);
+ onReceiveMessageException(t);
}
}
@@ -211,7 +222,7 @@ class ProcessQueueImpl implements ProcessQueue {
if (this.isCacheFull()) {
LOGGER.warn("Process queue cache is full, would receive message later, mq={}, clientId={}", mq,
consumer.clientId());
- receiveMessageLater();
+ receiveMessageLater(RECEIVING_BACKOFF_DELAY_WHEN_CACHE_IS_FULL);
return;
}
receiveMessageImmediately();
@@ -228,11 +239,9 @@ class ProcessQueueImpl implements ProcessQueue {
final int batchSize = this.getReceptionBatchSize();
final ReceiveMessageRequest request = consumer.wrapReceiveMessageRequest(batchSize, mq, filterExpression);
activityNanoTime = System.nanoTime();
-
// Intercept before message reception.
consumer.doBefore(MessageHookPoints.RECEIVE, Collections.emptyList());
final Stopwatch stopwatch = Stopwatch.createStarted();
-
final ListenableFuture<ReceiveMessageResult> future = consumer.receiveMessage(request, mq,
consumer.getPushConsumerSettings().getLongPollingTimeout());
Futures.addCallback(future, new FutureCallback<ReceiveMessageResult>() {
@@ -243,7 +252,6 @@ class ProcessQueueImpl implements ProcessQueue {
final List<MessageCommon> commons = result.getMessageViewImpls().stream()
.map(MessageViewImpl::getMessageCommon).collect(Collectors.toList());
consumer.doAfter(MessageHookPoints.RECEIVE, commons, duration, MessageHookPointsStatus.OK);
-
try {
onReceiveMessageResult(result);
} catch (Throwable t) {
@@ -251,7 +259,7 @@ class ProcessQueueImpl implements ProcessQueue {
LOGGER.error("[Bug] Exception raised while handling receive result, would receive later," +
" mq={}, endpoints={}, clientId={}",
mq, endpoints, consumer.clientId(), t);
- receiveMessageLater();
+ onReceiveMessageException(t);
}
}
@@ -261,17 +269,15 @@ class ProcessQueueImpl implements ProcessQueue {
final Duration duration = stopwatch.elapsed();
consumer.doAfter(MessageHookPoints.RECEIVE, Collections.emptyList(), duration,
MessageHookPointsStatus.ERROR);
-
- LOGGER.error("Exception raised while message reception, would receive later, mq={}, endpoints={}," +
- " clientId={}", mq, endpoints, consumer.clientId(), t);
- receiveMessageLater();
+ LOGGER.error("Exception raised during message reception, mq={}, endpoints={}, clientId={}", mq,
+ endpoints, consumer.clientId(), t);
+ onReceiveMessageException(t);
}
}, MoreExecutors.directExecutor());
consumer.getReceptionTimes().getAndIncrement();
} catch (Throwable t) {
- LOGGER.error("Exception raised while message reception, would receive later, mq={}, clientId={}", mq,
- consumer.clientId(), t);
- receiveMessageLater();
+ LOGGER.error("Exception raised during message reception, mq={}, clientId={}", mq, consumer.clientId(), t);
+ onReceiveMessageException(t);
}
}
diff --git a/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/ProcessQueueImplTest.java b/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/ProcessQueueImplTest.java
index 24fc847..ddc3be7 100644
--- a/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/ProcessQueueImplTest.java
+++ b/java/client/src/test/java/org/apache/rocketmq/client/java/impl/consumer/ProcessQueueImplTest.java
@@ -162,7 +162,7 @@ public class ProcessQueueImplTest extends TestBase {
when(pushConsumer.wrapReceiveMessageRequest(anyInt(), any(MessageQueueImpl.class),
any(FilterExpression.class))).thenReturn(request);
processQueue.fetchMessageImmediately();
- Thread.sleep(ProcessQueueImpl.RECEIVE_LATER_DELAY.toMillis() / 2);
+ Thread.sleep(3000);
verify(pushConsumer, times(cachedMessagesCountThresholdPerQueue))
.receiveMessage(any(ReceiveMessageRequest.class), any(MessageQueueImpl.class), any(Duration.class));
}