You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by ji...@apache.org on 2022/02/09 03:54:38 UTC

[rocketmq-spring] branch master updated: [ISSUE #429]Use 'consumeThreadNumber' instead of 'consumeThreadMax' (#431)

This is an automated email from the ASF dual-hosted git repository.

jinrongtong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-spring.git


The following commit(s) were added to refs/heads/master by this push:
     new e1ab73f  [ISSUE #429]Use 'consumeThreadNumber' instead of 'consumeThreadMax' (#431)
e1ab73f is described below

commit e1ab73f767b5ea9643c870286d284e2e7315696f
Author: joewee <jo...@users.noreply.github.com>
AuthorDate: Wed Feb 9 11:54:32 2022 +0800

    [ISSUE #429]Use 'consumeThreadNumber' instead of 'consumeThreadMax' (#431)
    
    Co-authored-by: joewee <37...@qq.com>
---
 .../spring/annotation/RocketMQMessageListener.java       | 11 +++++++++++
 .../spring/support/DefaultRocketMQListenerContainer.java | 16 +++++++++++-----
 .../support/DefaultRocketMQListenerContainerTest.java    |  5 +++--
 3 files changed, 25 insertions(+), 7 deletions(-)

diff --git a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/annotation/RocketMQMessageListener.java b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/annotation/RocketMQMessageListener.java
index ca8c857..fb68ed5 100644
--- a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/annotation/RocketMQMessageListener.java
+++ b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/annotation/RocketMQMessageListener.java
@@ -22,6 +22,7 @@ import java.lang.annotation.ElementType;
 import java.lang.annotation.Retention;
 import java.lang.annotation.RetentionPolicy;
 import java.lang.annotation.Target;
+import java.util.concurrent.LinkedBlockingQueue;
 
 @Target(ElementType.TYPE)
 @Retention(RetentionPolicy.RUNTIME)
@@ -72,10 +73,20 @@ public @interface RocketMQMessageListener {
 
     /**
      * Max consumer thread number.
+     * @deprecated This property does not work well, because the consumer thread pool executor uses a
+     * {@link LinkedBlockingQueue} with the default capacity bound (Integer.MAX_VALUE); use
+     * {@link RocketMQMessageListener#consumeThreadNumber} instead.
+     * @see <a href="https://github.com/apache/rocketmq-spring/issues/429">issues#429</a>
      */
+    @Deprecated
     int consumeThreadMax() default 64;
 
     /**
+     * Consumer thread number.
+     */
+    int consumeThreadNumber() default 20;
+
+    /**
      * Max re-consume times.
      *
      * In concurrently mode, -1 means 16;
diff --git a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/support/DefaultRocketMQListenerContainer.java b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/support/DefaultRocketMQListenerContainer.java
index 6f5f5d0..a73e949 100644
--- a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/support/DefaultRocketMQListenerContainer.java
+++ b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/support/DefaultRocketMQListenerContainer.java
@@ -105,6 +105,8 @@ public class DefaultRocketMQListenerContainer implements InitializingBean,
 
     private int consumeThreadMax = 64;
 
+    private int consumeThreadNumber = 20;
+
     private String charset = "UTF-8";
 
     private MessageConverter messageConverter;
@@ -186,6 +188,10 @@ public class DefaultRocketMQListenerContainer implements InitializingBean,
         return consumeThreadMax;
     }
 
+    public int getConsumeThreadNumber() {
+        return consumeThreadNumber;
+    }
+
     public String getCharset() {
         return charset;
     }
@@ -227,7 +233,8 @@ public class DefaultRocketMQListenerContainer implements InitializingBean,
         this.rocketMQMessageListener = anno;
 
         this.consumeMode = anno.consumeMode();
-        this.consumeThreadMax = anno.consumeThreadMax();
+        this.consumeThreadMax = anno.consumeThreadNumber();
+        this.consumeThreadNumber = anno.consumeThreadNumber();
         this.messageModel = anno.messageModel();
         this.selectorType = anno.selectorType();
         this.selectorExpression = anno.selectorExpression();
@@ -612,10 +619,9 @@ public class DefaultRocketMQListenerContainer implements InitializingBean,
         if (accessChannel != null) {
             consumer.setAccessChannel(accessChannel);
         }
-        consumer.setConsumeThreadMax(consumeThreadMax);
-        if (consumeThreadMax < consumer.getConsumeThreadMin()) {
-            consumer.setConsumeThreadMin(consumeThreadMax);
-        }
+        // Set the consumer's core thread number and maximum thread number to the same value.
+        consumer.setConsumeThreadMax(consumeThreadNumber);
+        consumer.setConsumeThreadMin(consumeThreadNumber);
         consumer.setConsumeTimeout(consumeTimeout);
         consumer.setMaxReconsumeTimes(maxReconsumeTimes);
         switch (messageModel) {
diff --git a/rocketmq-spring-boot/src/test/java/org/apache/rocketmq/spring/support/DefaultRocketMQListenerContainerTest.java b/rocketmq-spring-boot/src/test/java/org/apache/rocketmq/spring/support/DefaultRocketMQListenerContainerTest.java
index 76560fc..3754db8 100644
--- a/rocketmq-spring-boot/src/test/java/org/apache/rocketmq/spring/support/DefaultRocketMQListenerContainerTest.java
+++ b/rocketmq-spring-boot/src/test/java/org/apache/rocketmq/spring/support/DefaultRocketMQListenerContainerTest.java
@@ -244,7 +244,8 @@ public class DefaultRocketMQListenerContainerTest {
         container.setRocketMQMessageListener(anno);
 
         assertEquals(anno.consumeMode(), container.getConsumeMode());
-        assertEquals(anno.consumeThreadMax(), container.getConsumeThreadMax());
+        assertEquals(anno.consumeThreadNumber(), container.getConsumeThreadMax());
+        assertEquals(anno.consumeThreadNumber(), container.getConsumeThreadNumber());
         assertEquals(anno.messageModel(), container.getMessageModel());
         assertEquals(anno.selectorType(), container.getSelectorType());
         assertEquals(anno.selectorExpression(), container.getSelectorExpression());
@@ -256,7 +257,7 @@ public class DefaultRocketMQListenerContainerTest {
 
     @RocketMQMessageListener(consumerGroup = "abc1", topic = "test",
             consumeMode = ConsumeMode.ORDERLY,
-            consumeThreadMax = 3456,
+            consumeThreadNumber = 3456,
             messageModel = MessageModel.BROADCASTING,
             selectorType = SelectorType.SQL92,
             selectorExpression = "selectorExpression",