You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by ji...@apache.org on 2022/02/09 03:54:38 UTC
[rocketmq-spring] branch master updated: [ISSUE #429]Use 'consumeThreadNumber' instead of 'consumeThreadMax' (#431)
This is an automated email from the ASF dual-hosted git repository.
jinrongtong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-spring.git
The following commit(s) were added to refs/heads/master by this push:
new e1ab73f [ISSUE #429]Use 'consumeThreadNumber' instead of 'consumeThreadMax' (#431)
e1ab73f is described below
commit e1ab73f767b5ea9643c870286d284e2e7315696f
Author: joewee <jo...@users.noreply.github.com>
AuthorDate: Wed Feb 9 11:54:32 2022 +0800
[ISSUE #429]Use 'consumeThreadNumber' instead of 'consumeThreadMax' (#431)
Co-authored-by: joewee <37...@qq.com>
---
.../spring/annotation/RocketMQMessageListener.java | 11 +++++++++++
.../spring/support/DefaultRocketMQListenerContainer.java | 16 +++++++++++-----
.../support/DefaultRocketMQListenerContainerTest.java | 5 +++--
3 files changed, 25 insertions(+), 7 deletions(-)
diff --git a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/annotation/RocketMQMessageListener.java b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/annotation/RocketMQMessageListener.java
index ca8c857..fb68ed5 100644
--- a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/annotation/RocketMQMessageListener.java
+++ b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/annotation/RocketMQMessageListener.java
@@ -22,6 +22,7 @@ import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
+import java.util.concurrent.LinkedBlockingQueue;
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@@ -72,10 +73,20 @@ public @interface RocketMQMessageListener {
/**
* Max consumer thread number.
+ * @deprecated This property is not work well, because the consumer thread pool executor use
+ * {@link LinkedBlockingQueue} with default capacity bound (Integer.MAX_VALUE), use
+ * {@link RocketMQMessageListener#consumeThreadNumber} .
+ * @see <a href="https://github.com/apache/rocketmq-spring/issues/429">issues#429</a>
*/
+ @Deprecated
int consumeThreadMax() default 64;
/**
+ * consumer thread number.
+ */
+ int consumeThreadNumber() default 20;
+
+ /**
* Max re-consume times.
*
* In concurrently mode, -1 means 16;
diff --git a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/support/DefaultRocketMQListenerContainer.java b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/support/DefaultRocketMQListenerContainer.java
index 6f5f5d0..a73e949 100644
--- a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/support/DefaultRocketMQListenerContainer.java
+++ b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/support/DefaultRocketMQListenerContainer.java
@@ -105,6 +105,8 @@ public class DefaultRocketMQListenerContainer implements InitializingBean,
private int consumeThreadMax = 64;
+ private int consumeThreadNumber = 20;
+
private String charset = "UTF-8";
private MessageConverter messageConverter;
@@ -186,6 +188,10 @@ public class DefaultRocketMQListenerContainer implements InitializingBean,
return consumeThreadMax;
}
+ public int getConsumeThreadNumber() {
+ return consumeThreadNumber;
+ }
+
public String getCharset() {
return charset;
}
@@ -227,7 +233,8 @@ public class DefaultRocketMQListenerContainer implements InitializingBean,
this.rocketMQMessageListener = anno;
this.consumeMode = anno.consumeMode();
- this.consumeThreadMax = anno.consumeThreadMax();
+ this.consumeThreadMax = anno.consumeThreadNumber();
+ this.consumeThreadNumber = anno.consumeThreadNumber();
this.messageModel = anno.messageModel();
this.selectorType = anno.selectorType();
this.selectorExpression = anno.selectorExpression();
@@ -612,10 +619,9 @@ public class DefaultRocketMQListenerContainer implements InitializingBean,
if (accessChannel != null) {
consumer.setAccessChannel(accessChannel);
}
- consumer.setConsumeThreadMax(consumeThreadMax);
- if (consumeThreadMax < consumer.getConsumeThreadMin()) {
- consumer.setConsumeThreadMin(consumeThreadMax);
- }
+ //set the consumer core thread number and maximum thread number has the same value
+ consumer.setConsumeThreadMax(consumeThreadNumber);
+ consumer.setConsumeThreadMin(consumeThreadNumber);
consumer.setConsumeTimeout(consumeTimeout);
consumer.setMaxReconsumeTimes(maxReconsumeTimes);
switch (messageModel) {
diff --git a/rocketmq-spring-boot/src/test/java/org/apache/rocketmq/spring/support/DefaultRocketMQListenerContainerTest.java b/rocketmq-spring-boot/src/test/java/org/apache/rocketmq/spring/support/DefaultRocketMQListenerContainerTest.java
index 76560fc..3754db8 100644
--- a/rocketmq-spring-boot/src/test/java/org/apache/rocketmq/spring/support/DefaultRocketMQListenerContainerTest.java
+++ b/rocketmq-spring-boot/src/test/java/org/apache/rocketmq/spring/support/DefaultRocketMQListenerContainerTest.java
@@ -244,7 +244,8 @@ public class DefaultRocketMQListenerContainerTest {
container.setRocketMQMessageListener(anno);
assertEquals(anno.consumeMode(), container.getConsumeMode());
- assertEquals(anno.consumeThreadMax(), container.getConsumeThreadMax());
+ assertEquals(anno.consumeThreadNumber(), container.getConsumeThreadMax());
+ assertEquals(anno.consumeThreadNumber(), container.getConsumeThreadNumber());
assertEquals(anno.messageModel(), container.getMessageModel());
assertEquals(anno.selectorType(), container.getSelectorType());
assertEquals(anno.selectorExpression(), container.getSelectorExpression());
@@ -256,7 +257,7 @@ public class DefaultRocketMQListenerContainerTest {
@RocketMQMessageListener(consumerGroup = "abc1", topic = "test",
consumeMode = ConsumeMode.ORDERLY,
- consumeThreadMax = 3456,
+ consumeThreadNumber = 3456,
messageModel = MessageModel.BROADCASTING,
selectorType = SelectorType.SQL92,
selectorExpression = "selectorExpression",