You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by ji...@apache.org on 2021/05/14 05:54:25 UTC
[rocketmq-spring] branch master updated: [ISSUE #366] The retry
number of failed consumptions can be set. (#367)
This is an automated email from the ASF dual-hosted git repository.
jinrongtong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-spring.git
The following commit(s) were added to refs/heads/master by this push:
new 8ebc953 [ISSUE #366] The retry number of failed consumptions can be set. (#367)
8ebc953 is described below
commit 8ebc953f7bef0e20173c90a8117a95712eebec83
Author: zhangjidi2016 <zh...@cmss.chinamobile.com>
AuthorDate: Fri May 14 13:52:59 2021 +0800
[ISSUE #366] The retry number of failed consumptions can be set. (#367)
Co-authored-by: zhangjidi2016 <zh...@cmss.chinamobile.com>
---
.../apache/rocketmq/spring/annotation/RocketMQMessageListener.java | 5 +++++
.../rocketmq/spring/support/DefaultRocketMQListenerContainer.java | 4 +++-
2 files changed, 8 insertions(+), 1 deletion(-)
diff --git a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/annotation/RocketMQMessageListener.java b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/annotation/RocketMQMessageListener.java
index 00bd527..bdf2ca0 100644
--- a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/annotation/RocketMQMessageListener.java
+++ b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/annotation/RocketMQMessageListener.java
@@ -76,6 +76,11 @@ public @interface RocketMQMessageListener {
int consumeThreadMax() default 64;
/**
+ * Max re-consume times, -1 means 16 times.
+ */
+ int maxReconsumeTimes() default -1;
+
+ /**
* Maximum amount of time in minutes a message may block the consuming thread.
*/
long consumeTimeout() default 15L;
diff --git a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/support/DefaultRocketMQListenerContainer.java b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/support/DefaultRocketMQListenerContainer.java
index 725e967..b6705db 100644
--- a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/support/DefaultRocketMQListenerContainer.java
+++ b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/support/DefaultRocketMQListenerContainer.java
@@ -120,6 +120,7 @@ public class DefaultRocketMQListenerContainer implements InitializingBean,
private String selectorExpression;
private MessageModel messageModel;
private long consumeTimeout;
+ private int maxReconsumeTimes;
public long getSuspendCurrentQueueTimeMillis() {
return suspendCurrentQueueTimeMillis;
@@ -219,6 +220,7 @@ public class DefaultRocketMQListenerContainer implements InitializingBean,
this.selectorType = anno.selectorType();
this.selectorExpression = anno.selectorExpression();
this.consumeTimeout = anno.consumeTimeout();
+ this.maxReconsumeTimes = anno.maxReconsumeTimes();
}
public ConsumeMode getConsumeMode() {
@@ -578,7 +580,7 @@ public class DefaultRocketMQListenerContainer implements InitializingBean,
consumer.setConsumeThreadMin(consumeThreadMax);
}
consumer.setConsumeTimeout(consumeTimeout);
-
+ consumer.setMaxReconsumeTimes(maxReconsumeTimes);
switch (messageModel) {
case BROADCASTING:
consumer.setMessageModel(org.apache.rocketmq.common.protocol.heartbeat.MessageModel.BROADCASTING);