You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by ji...@apache.org on 2022/03/05 02:32:48 UTC

[rocketmq-spring] branch master updated: [ISSUE #435] Support Consumer Shutdown "awaitTerminationMillisWhenShutdown" (#435)

This is an automated email from the ASF dual-hosted git repository.

jinrongtong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-spring.git


The following commit(s) were added to refs/heads/master by this push:
     new 0466bef  [ISSUE #435] Support Consumer Shutdown "awaitTerminationMillisWhenShutdown" (#435)
0466bef is described below

commit 0466befd24e42bcb9d639259e3404da64825f591
Author: llq <36...@users.noreply.github.com>
AuthorDate: Sat Mar 5 10:32:44 2022 +0800

    [ISSUE #435] Support Consumer Shutdown "awaitTerminationMillisWhenShutdown" (#435)
    
    * feature:Support Consumer Shutdown "awaitTerminationMillisWhenShutdown"
    
    Co-authored-by: liuluqi <li...@kanzhun.com>
---
 .../rocketmq/spring/annotation/RocketMQMessageListener.java  |  6 ++++++
 .../spring/support/DefaultRocketMQListenerContainer.java     | 12 ++++++++++++
 2 files changed, 18 insertions(+)

diff --git a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/annotation/RocketMQMessageListener.java b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/annotation/RocketMQMessageListener.java
index fb68ed5..f57bf3e 100644
--- a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/annotation/RocketMQMessageListener.java
+++ b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/annotation/RocketMQMessageListener.java
@@ -159,4 +159,10 @@ public @interface RocketMQMessageListener {
      * The minimum value is 10 and the maximum is 30000.
      */
     int suspendCurrentQueueTimeMillis() default 1000;
+
+    /**
+     * Maximum time, in milliseconds, to wait for message consumption to finish when the consumer is shut down.
+     * The minimum value is 0; DefaultRocketMQListenerContainer replaces a negative value with 0.
+     */
+    int awaitTerminationMillisWhenShutdown() default 1000;
 }
diff --git a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/support/DefaultRocketMQListenerContainer.java b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/support/DefaultRocketMQListenerContainer.java
index a73e949..182f08c 100644
--- a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/support/DefaultRocketMQListenerContainer.java
+++ b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/support/DefaultRocketMQListenerContainer.java
@@ -135,6 +135,7 @@ public class DefaultRocketMQListenerContainer implements InitializingBean,
     private int replyTimeout;
     private String tlsEnable;
     private String namespace;
+    private long awaitTerminationMillisWhenShutdown;
 
     public long getSuspendCurrentQueueTimeMillis() {
         return suspendCurrentQueueTimeMillis;
@@ -245,6 +246,7 @@ public class DefaultRocketMQListenerContainer implements InitializingBean,
         this.namespace = anno.namespace();
         this.delayLevelWhenNextConsume = anno.delayLevelWhenNextConsume();
         this.suspendCurrentQueueTimeMillis = anno.suspendCurrentQueueTimeMillis();
+        this.awaitTerminationMillisWhenShutdown = Math.max(0, anno.awaitTerminationMillisWhenShutdown());
     }
 
     public ConsumeMode getConsumeMode() {
@@ -291,6 +293,15 @@ public class DefaultRocketMQListenerContainer implements InitializingBean,
         this.consumer = consumer;
     }
 
+    public long getAwaitTerminationMillisWhenShutdown() {
+        return this.awaitTerminationMillisWhenShutdown; // configured shutdown wait, in milliseconds
+    }
+
+    public DefaultRocketMQListenerContainer setAwaitTerminationMillisWhenShutdown(long awaitTerminationMillisWhenShutdown) {
+        this.awaitTerminationMillisWhenShutdown = Math.max(0L, awaitTerminationMillisWhenShutdown); // negative -> 0, same clamp as the annotation-driven initialisation
+        return this;
+    }
+
     @Override
     public void destroy() {
         this.setRunning(false);
@@ -624,6 +635,7 @@ public class DefaultRocketMQListenerContainer implements InitializingBean,
         consumer.setConsumeThreadMin(consumeThreadNumber);
         consumer.setConsumeTimeout(consumeTimeout);
         consumer.setMaxReconsumeTimes(maxReconsumeTimes);
+        consumer.setAwaitTerminationMillisWhenShutdown(awaitTerminationMillisWhenShutdown);
         switch (messageModel) {
             case BROADCASTING:
                 consumer.setMessageModel(org.apache.rocketmq.common.protocol.heartbeat.MessageModel.BROADCASTING);