You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by ji...@apache.org on 2022/03/05 02:32:48 UTC
[rocketmq-spring] branch master updated: [ISSUE #435] Support Consumer Shutdown "awaitTerminationMillisWhenShutdown" (#435)
This is an automated email from the ASF dual-hosted git repository.
jinrongtong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-spring.git
The following commit(s) were added to refs/heads/master by this push:
new 0466bef [ISSUE #435] Support Consumer Shutdown "awaitTerminationMillisWhenShutdown" (#435)
0466bef is described below
commit 0466befd24e42bcb9d639259e3404da64825f591
Author: llq <36...@users.noreply.github.com>
AuthorDate: Sat Mar 5 10:32:44 2022 +0800
[ISSUE #435] Support Consumer Shutdown "awaitTerminationMillisWhenShutdown" (#435)
* feature:Support Consumer Shutdown "awaitTerminationMillisWhenShutdown"
Co-authored-by: liuluqi <li...@kanzhun.com>
---
.../rocketmq/spring/annotation/RocketMQMessageListener.java | 6 ++++++
.../spring/support/DefaultRocketMQListenerContainer.java | 12 ++++++++++++
2 files changed, 18 insertions(+)
diff --git a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/annotation/RocketMQMessageListener.java b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/annotation/RocketMQMessageListener.java
index fb68ed5..f57bf3e 100644
--- a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/annotation/RocketMQMessageListener.java
+++ b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/annotation/RocketMQMessageListener.java
@@ -159,4 +159,10 @@ public @interface RocketMQMessageListener {
* The minimum value is 10 and the maximum is 30000.
*/
int suspendCurrentQueueTimeMillis() default 1000;
+
+ /**
+ * Maximum time to await message consuming when shutdown consumer, in milliseconds.
+ * The minimum value is 0
+ */
+ int awaitTerminationMillisWhenShutdown() default 1000;
}
diff --git a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/support/DefaultRocketMQListenerContainer.java b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/support/DefaultRocketMQListenerContainer.java
index a73e949..182f08c 100644
--- a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/support/DefaultRocketMQListenerContainer.java
+++ b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/support/DefaultRocketMQListenerContainer.java
@@ -135,6 +135,7 @@ public class DefaultRocketMQListenerContainer implements InitializingBean,
private int replyTimeout;
private String tlsEnable;
private String namespace;
+ private long awaitTerminationMillisWhenShutdown;
public long getSuspendCurrentQueueTimeMillis() {
return suspendCurrentQueueTimeMillis;
@@ -245,6 +246,7 @@ public class DefaultRocketMQListenerContainer implements InitializingBean,
this.namespace = anno.namespace();
this.delayLevelWhenNextConsume = anno.delayLevelWhenNextConsume();
this.suspendCurrentQueueTimeMillis = anno.suspendCurrentQueueTimeMillis();
+ this.awaitTerminationMillisWhenShutdown = Math.max(0, anno.awaitTerminationMillisWhenShutdown());
}
public ConsumeMode getConsumeMode() {
@@ -291,6 +293,15 @@ public class DefaultRocketMQListenerContainer implements InitializingBean,
this.consumer = consumer;
}
+ public long getAwaitTerminationMillisWhenShutdown() {
+ return awaitTerminationMillisWhenShutdown;
+ }
+
+ public DefaultRocketMQListenerContainer setAwaitTerminationMillisWhenShutdown(long awaitTerminationMillisWhenShutdown) {
+ this.awaitTerminationMillisWhenShutdown = awaitTerminationMillisWhenShutdown;
+ return this;
+ }
+
@Override
public void destroy() {
this.setRunning(false);
@@ -624,6 +635,7 @@ public class DefaultRocketMQListenerContainer implements InitializingBean,
consumer.setConsumeThreadMin(consumeThreadNumber);
consumer.setConsumeTimeout(consumeTimeout);
consumer.setMaxReconsumeTimes(maxReconsumeTimes);
+ consumer.setAwaitTerminationMillisWhenShutdown(awaitTerminationMillisWhenShutdown);
switch (messageModel) {
case BROADCASTING:
consumer.setMessageModel(org.apache.rocketmq.common.protocol.heartbeat.MessageModel.BROADCASTING);