You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by ji...@apache.org on 2022/02/07 09:22:09 UTC

[rocketmq-spring] branch master updated: [ISSUE #419]Support consumption retry strategy configuration.

This is an automated email from the ASF dual-hosted git repository.

jinrongtong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-spring.git


The following commit(s) were added to refs/heads/master by this push:
     new 45833f6  [ISSUE #419]Support consumption retry strategy configuration.
45833f6 is described below

commit 45833f6c5d7bd271ae44b5dda3b7a49f42458e2c
Author: CharliePu <he...@163.com>
AuthorDate: Mon Feb 7 17:22:02 2022 +0800

    [ISSUE #419]Support consumption retry strategy configuration.
---
 .../spring/annotation/RocketMQMessageListener.java | 16 ++++++++++
 .../support/DefaultRocketMQListenerContainer.java  | 16 ++++++++--
 .../DefaultRocketMQListenerContainerTest.java      | 36 ++++++++++++++++++++++
 3 files changed, 65 insertions(+), 3 deletions(-)

diff --git a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/annotation/RocketMQMessageListener.java b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/annotation/RocketMQMessageListener.java
index ecc3e0e..ca8c857 100644
--- a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/annotation/RocketMQMessageListener.java
+++ b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/annotation/RocketMQMessageListener.java
@@ -132,4 +132,20 @@ public @interface RocketMQMessageListener {
      * The namespace of consumer.
      */
     String namespace() default "";
+
+    /**
+     * Message consume retry strategy in concurrently mode.
+     *
+     * -1,no retry,put into DLQ directly
+     * 0,broker control retry frequency
+     * >0,client control retry frequency
+     */
+    int delayLevelWhenNextConsume() default 0;
+
+    /**
+     * The interval of suspending the pull in orderly mode, in milliseconds.
+     *
+     * The minimum value is 10 and the maximum is 30000.
+     */
+    int suspendCurrentQueueTimeMillis() default 1000;
 }
diff --git a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/support/DefaultRocketMQListenerContainer.java b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/support/DefaultRocketMQListenerContainer.java
index f48788e..6f5f5d0 100644
--- a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/support/DefaultRocketMQListenerContainer.java
+++ b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/support/DefaultRocketMQListenerContainer.java
@@ -79,11 +79,19 @@ public class DefaultRocketMQListenerContainer implements InitializingBean,
      */
     private String name;
 
+    /**
+     * Suspending pulling time in orderly mode.
+     *
+     * The minimum value is 10 and the maximum is 30000.
+     */
     private long suspendCurrentQueueTimeMillis = 1000;
 
     /**
-     * Message consume retry strategy<br> -1,no retry,put into DLQ directly<br> 0,broker control retry frequency<br>
-     * >0,client control retry frequency.
+     * Message consume retry strategy in concurrently mode.
+     *
+     * -1,no retry,put into DLQ directly
+     * 0,broker control retry frequency
+     * >0,client control retry frequency
      */
     private int delayLevelWhenNextConsume = 0;
 
@@ -228,6 +236,8 @@ public class DefaultRocketMQListenerContainer implements InitializingBean,
         this.replyTimeout = anno.replyTimeout();
         this.tlsEnable = anno.tlsEnable();
         this.namespace = anno.namespace();
+        this.delayLevelWhenNextConsume = anno.delayLevelWhenNextConsume();
+        this.suspendCurrentQueueTimeMillis = anno.suspendCurrentQueueTimeMillis();
     }
 
     public ConsumeMode getConsumeMode() {
@@ -652,4 +662,4 @@ public class DefaultRocketMQListenerContainer implements InitializingBean,
 
     }
 
-}
\ No newline at end of file
+}
diff --git a/rocketmq-spring-boot/src/test/java/org/apache/rocketmq/spring/support/DefaultRocketMQListenerContainerTest.java b/rocketmq-spring-boot/src/test/java/org/apache/rocketmq/spring/support/DefaultRocketMQListenerContainerTest.java
index 1304b9f..76560fc 100644
--- a/rocketmq-spring-boot/src/test/java/org/apache/rocketmq/spring/support/DefaultRocketMQListenerContainerTest.java
+++ b/rocketmq-spring-boot/src/test/java/org/apache/rocketmq/spring/support/DefaultRocketMQListenerContainerTest.java
@@ -29,6 +29,10 @@ import org.apache.rocketmq.common.message.Message;
 import org.apache.rocketmq.common.message.MessageAccessor;
 import org.apache.rocketmq.common.message.MessageConst;
 import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.spring.annotation.ConsumeMode;
+import org.apache.rocketmq.spring.annotation.MessageModel;
+import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
+import org.apache.rocketmq.spring.annotation.SelectorType;
 import org.apache.rocketmq.spring.core.RocketMQListener;
 import org.apache.rocketmq.spring.core.RocketMQReplyListener;
 import org.junit.Test;
@@ -42,6 +46,7 @@ import java.util.Arrays;
 import java.util.Date;
 
 import static org.assertj.core.api.Assertions.assertThat;
+import static org.junit.Assert.assertEquals;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.Mockito.doNothing;
 import static org.mockito.Mockito.mock;
@@ -232,6 +237,37 @@ public class DefaultRocketMQListenerContainerTest {
         handleMessage.invoke(listenerContainer, messageExt);
     }
 
+    @Test
+    public void testSetRocketMQMessageListener() {
+        DefaultRocketMQListenerContainer container = new DefaultRocketMQListenerContainer();
+        RocketMQMessageListener anno = TestRocketMQMessageListener.class.getAnnotation(RocketMQMessageListener.class);
+        container.setRocketMQMessageListener(anno);
+
+        assertEquals(anno.consumeMode(), container.getConsumeMode());
+        assertEquals(anno.consumeThreadMax(), container.getConsumeThreadMax());
+        assertEquals(anno.messageModel(), container.getMessageModel());
+        assertEquals(anno.selectorType(), container.getSelectorType());
+        assertEquals(anno.selectorExpression(), container.getSelectorExpression());
+        assertEquals(anno.tlsEnable(), container.getTlsEnable());
+        assertEquals(anno.namespace(), container.getNamespace());
+        assertEquals(anno.delayLevelWhenNextConsume(), container.getDelayLevelWhenNextConsume());
+        assertEquals(anno.suspendCurrentQueueTimeMillis(), container.getSuspendCurrentQueueTimeMillis());
+    }
+
+    @RocketMQMessageListener(consumerGroup = "abc1", topic = "test",
+            consumeMode = ConsumeMode.ORDERLY,
+            consumeThreadMax = 3456,
+            messageModel = MessageModel.BROADCASTING,
+            selectorType = SelectorType.SQL92,
+            selectorExpression = "selectorExpression",
+            tlsEnable = "tlsEnable",
+            namespace = "namespace",
+            delayLevelWhenNextConsume = 1234,
+            suspendCurrentQueueTimeMillis = 2345
+    )
+    class TestRocketMQMessageListener {
+    }
+
     class User {
         private String userName;
         private int userAge;