You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by ji...@apache.org on 2021/05/14 05:54:25 UTC

[rocketmq-spring] branch master updated: [ISSUE #366] The retry number of failed consumptions can be set. (#367)

This is an automated email from the ASF dual-hosted git repository.

jinrongtong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-spring.git


The following commit(s) were added to refs/heads/master by this push:
     new 8ebc953  [ISSUE #366] The retry number of failed consumptions can be set. (#367)
8ebc953 is described below

commit 8ebc953f7bef0e20173c90a8117a95712eebec83
Author: zhangjidi2016 <zh...@cmss.chinamobile.com>
AuthorDate: Fri May 14 13:52:59 2021 +0800

    [ISSUE #366] The retry number of failed consumptions can be set. (#367)
    
    Co-authored-by: zhangjidi2016 <zh...@cmss.chinamobile.com>
---
 .../apache/rocketmq/spring/annotation/RocketMQMessageListener.java   | 5 +++++
 .../rocketmq/spring/support/DefaultRocketMQListenerContainer.java    | 4 +++-
 2 files changed, 8 insertions(+), 1 deletion(-)

diff --git a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/annotation/RocketMQMessageListener.java b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/annotation/RocketMQMessageListener.java
index 00bd527..bdf2ca0 100644
--- a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/annotation/RocketMQMessageListener.java
+++ b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/annotation/RocketMQMessageListener.java
@@ -76,6 +76,11 @@ public @interface RocketMQMessageListener {
     int consumeThreadMax() default 64;
 
     /**
+     * Max re-consume times, -1 means 16 times.
+     */
+    int maxReconsumeTimes() default -1;
+
+    /**
      * Maximum amount of time in minutes a message may block the consuming thread.
      */
     long consumeTimeout() default 15L;
diff --git a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/support/DefaultRocketMQListenerContainer.java b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/support/DefaultRocketMQListenerContainer.java
index 725e967..b6705db 100644
--- a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/support/DefaultRocketMQListenerContainer.java
+++ b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/support/DefaultRocketMQListenerContainer.java
@@ -120,6 +120,7 @@ public class DefaultRocketMQListenerContainer implements InitializingBean,
     private String selectorExpression;
     private MessageModel messageModel;
     private long consumeTimeout;
+    private int maxReconsumeTimes;
 
     public long getSuspendCurrentQueueTimeMillis() {
         return suspendCurrentQueueTimeMillis;
@@ -219,6 +220,7 @@ public class DefaultRocketMQListenerContainer implements InitializingBean,
         this.selectorType = anno.selectorType();
         this.selectorExpression = anno.selectorExpression();
         this.consumeTimeout = anno.consumeTimeout();
+        this.maxReconsumeTimes = anno.maxReconsumeTimes();
     }
 
     public ConsumeMode getConsumeMode() {
@@ -578,7 +580,7 @@ public class DefaultRocketMQListenerContainer implements InitializingBean,
             consumer.setConsumeThreadMin(consumeThreadMax);
         }
         consumer.setConsumeTimeout(consumeTimeout);
-
+        consumer.setMaxReconsumeTimes(maxReconsumeTimes);
         switch (messageModel) {
             case BROADCASTING:
                 consumer.setMessageModel(org.apache.rocketmq.common.protocol.heartbeat.MessageModel.BROADCASTING);