You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by ji...@apache.org on 2022/02/07 09:22:09 UTC
[rocketmq-spring] branch master updated: [ISSUE #419]Support consumption retry strategy configuration.
This is an automated email from the ASF dual-hosted git repository.
jinrongtong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-spring.git
The following commit(s) were added to refs/heads/master by this push:
new 45833f6 [ISSUE #419]Support consumption retry strategy configuration.
45833f6 is described below
commit 45833f6c5d7bd271ae44b5dda3b7a49f42458e2c
Author: CharliePu <he...@163.com>
AuthorDate: Mon Feb 7 17:22:02 2022 +0800
[ISSUE #419]Support consumption retry strategy configuration.
---
.../spring/annotation/RocketMQMessageListener.java | 16 ++++++++++
.../support/DefaultRocketMQListenerContainer.java | 16 ++++++++--
.../DefaultRocketMQListenerContainerTest.java | 36 ++++++++++++++++++++++
3 files changed, 65 insertions(+), 3 deletions(-)
diff --git a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/annotation/RocketMQMessageListener.java b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/annotation/RocketMQMessageListener.java
index ecc3e0e..ca8c857 100644
--- a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/annotation/RocketMQMessageListener.java
+++ b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/annotation/RocketMQMessageListener.java
@@ -132,4 +132,20 @@ public @interface RocketMQMessageListener {
* The namespace of consumer.
*/
String namespace() default "";
+
+ /**
+ * Message consume retry strategy in concurrently mode.
+ *
+ * -1,no retry,put into DLQ directly
+ * 0,broker control retry frequency
+ * >0,client control retry frequency
+ */
+ int delayLevelWhenNextConsume() default 0;
+
+ /**
+ * The interval of suspending the pull in orderly mode, in milliseconds.
+ *
+ * The minimum value is 10 and the maximum is 30000.
+ */
+ int suspendCurrentQueueTimeMillis() default 1000;
}
diff --git a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/support/DefaultRocketMQListenerContainer.java b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/support/DefaultRocketMQListenerContainer.java
index f48788e..6f5f5d0 100644
--- a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/support/DefaultRocketMQListenerContainer.java
+++ b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/support/DefaultRocketMQListenerContainer.java
@@ -79,11 +79,19 @@ public class DefaultRocketMQListenerContainer implements InitializingBean,
*/
private String name;
+ /**
+ * Suspending pulling time in orderly mode.
+ *
+ * The minimum value is 10 and the maximum is 30000.
+ */
private long suspendCurrentQueueTimeMillis = 1000;
/**
- * Message consume retry strategy<br> -1,no retry,put into DLQ directly<br> 0,broker control retry frequency<br>
- * >0,client control retry frequency.
+ * Message consume retry strategy in concurrently mode.
+ *
+ * -1,no retry,put into DLQ directly
+ * 0,broker control retry frequency
+ * >0,client control retry frequency
*/
private int delayLevelWhenNextConsume = 0;
@@ -228,6 +236,8 @@ public class DefaultRocketMQListenerContainer implements InitializingBean,
this.replyTimeout = anno.replyTimeout();
this.tlsEnable = anno.tlsEnable();
this.namespace = anno.namespace();
+ this.delayLevelWhenNextConsume = anno.delayLevelWhenNextConsume();
+ this.suspendCurrentQueueTimeMillis = anno.suspendCurrentQueueTimeMillis();
}
public ConsumeMode getConsumeMode() {
@@ -652,4 +662,4 @@ public class DefaultRocketMQListenerContainer implements InitializingBean,
}
-}
\ No newline at end of file
+}
diff --git a/rocketmq-spring-boot/src/test/java/org/apache/rocketmq/spring/support/DefaultRocketMQListenerContainerTest.java b/rocketmq-spring-boot/src/test/java/org/apache/rocketmq/spring/support/DefaultRocketMQListenerContainerTest.java
index 1304b9f..76560fc 100644
--- a/rocketmq-spring-boot/src/test/java/org/apache/rocketmq/spring/support/DefaultRocketMQListenerContainerTest.java
+++ b/rocketmq-spring-boot/src/test/java/org/apache/rocketmq/spring/support/DefaultRocketMQListenerContainerTest.java
@@ -29,6 +29,10 @@ import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageAccessor;
import org.apache.rocketmq.common.message.MessageConst;
import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.spring.annotation.ConsumeMode;
+import org.apache.rocketmq.spring.annotation.MessageModel;
+import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
+import org.apache.rocketmq.spring.annotation.SelectorType;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.apache.rocketmq.spring.core.RocketMQReplyListener;
import org.junit.Test;
@@ -42,6 +46,7 @@ import java.util.Arrays;
import java.util.Date;
import static org.assertj.core.api.Assertions.assertThat;
+import static org.junit.Assert.assertEquals;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.mock;
@@ -232,6 +237,37 @@ public class DefaultRocketMQListenerContainerTest {
handleMessage.invoke(listenerContainer, messageExt);
}
+ @Test
+ public void testSetRocketMQMessageListener() {
+ DefaultRocketMQListenerContainer container = new DefaultRocketMQListenerContainer();
+ RocketMQMessageListener anno = TestRocketMQMessageListener.class.getAnnotation(RocketMQMessageListener.class);
+ container.setRocketMQMessageListener(anno);
+
+ assertEquals(anno.consumeMode(), container.getConsumeMode());
+ assertEquals(anno.consumeThreadMax(), container.getConsumeThreadMax());
+ assertEquals(anno.messageModel(), container.getMessageModel());
+ assertEquals(anno.selectorType(), container.getSelectorType());
+ assertEquals(anno.selectorExpression(), container.getSelectorExpression());
+ assertEquals(anno.tlsEnable(), container.getTlsEnable());
+ assertEquals(anno.namespace(), container.getNamespace());
+ assertEquals(anno.delayLevelWhenNextConsume(), container.getDelayLevelWhenNextConsume());
+ assertEquals(anno.suspendCurrentQueueTimeMillis(), container.getSuspendCurrentQueueTimeMillis());
+ }
+
+ @RocketMQMessageListener(consumerGroup = "abc1", topic = "test",
+ consumeMode = ConsumeMode.ORDERLY,
+ consumeThreadMax = 3456,
+ messageModel = MessageModel.BROADCASTING,
+ selectorType = SelectorType.SQL92,
+ selectorExpression = "selectorExpression",
+ tlsEnable = "tlsEnable",
+ namespace = "namespace",
+ delayLevelWhenNextConsume = 1234,
+ suspendCurrentQueueTimeMillis = 2345
+ )
+ class TestRocketMQMessageListener {
+ }
+
class User {
private String userName;
private int userAge;