You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by ji...@apache.org on 2022/05/30 08:35:43 UTC
[rocketmq-spring] branch master updated: n [ISSUE #450] fix More than one client is created when consumer.group and consumer.topic is used in application.properties
This is an automated email from the ASF dual-hosted git repository.
jinrongtong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-spring.git
The following commit(s) were added to refs/heads/master by this push:
new 1aae0cd n [ISSUE #450] fix More than one client is created when consumer.group and consumer.topic is used in application.properties
1aae0cd is described below
commit 1aae0cd61f3d5d8cccddeed1ea7cf3d9b5f76dbe
Author: tiger lee <fr...@outlook.com>
AuthorDate: Mon May 30 16:35:38 2022 +0800
n [ISSUE #450] fix More than one client is created when consumer.group and consumer.topic is used in application.properties
Co-authored-by: tigerweili <ti...@tencent.com>
---
.../samples/springboot/ACLStringConsumer.java | 4 +-
.../springboot/ACLStringTransactionalConsumer.java | 4 +-
.../ExtRocketMQConsumerConfiguration.java | 10 +--
.../spring/annotation/RocketMQMessageListener.java | 6 +-
.../ExtConsumerResetConfiguration.java | 4 +-
.../ListenerContainerConfiguration.java | 2 +-
.../autoconfigure/RocketMQAutoConfiguration.java | 10 +--
.../spring/autoconfigure/RocketMQProperties.java | 76 +++++++++++++---------
.../RocketMQAutoConfigurationTest.java | 20 +++---
.../rocketmq/spring/core/RocketMQTemplateTest.java | 2 +-
10 files changed, 78 insertions(+), 60 deletions(-)
diff --git a/rocketmq-spring-boot-samples/rocketmq-consume-acl-demo/src/main/java/org/apache/rocketmq/samples/springboot/ACLStringConsumer.java b/rocketmq-spring-boot-samples/rocketmq-consume-acl-demo/src/main/java/org/apache/rocketmq/samples/springboot/ACLStringConsumer.java
index d2d654f..e1353f0 100644
--- a/rocketmq-spring-boot-samples/rocketmq-consume-acl-demo/src/main/java/org/apache/rocketmq/samples/springboot/ACLStringConsumer.java
+++ b/rocketmq-spring-boot-samples/rocketmq-consume-acl-demo/src/main/java/org/apache/rocketmq/samples/springboot/ACLStringConsumer.java
@@ -28,8 +28,8 @@ import org.springframework.stereotype.Service;
@RocketMQMessageListener(
topic = "normal_topic_define_in_Aliware_MQ",
consumerGroup = "group_define_in_Aliware_MQ"
- //accessKey = "AK" // It will read by `rocketmq.consumer.access-key` key
- //secretKey = "SK" // It will read by `rocketmq.consumer.secret-key` key
+ //accessKey = "AK" // It will read by `rocketmq.push-consumer.access-key` key
+ //secretKey = "SK" // It will read by `rocketmq.push-consumer.secret-key` key
)
public class ACLStringConsumer implements RocketMQListener<String> {
@Override
diff --git a/rocketmq-spring-boot-samples/rocketmq-consume-acl-demo/src/main/java/org/apache/rocketmq/samples/springboot/ACLStringTransactionalConsumer.java b/rocketmq-spring-boot-samples/rocketmq-consume-acl-demo/src/main/java/org/apache/rocketmq/samples/springboot/ACLStringTransactionalConsumer.java
index 342f44b..bd75895 100644
--- a/rocketmq-spring-boot-samples/rocketmq-consume-acl-demo/src/main/java/org/apache/rocketmq/samples/springboot/ACLStringTransactionalConsumer.java
+++ b/rocketmq-spring-boot-samples/rocketmq-consume-acl-demo/src/main/java/org/apache/rocketmq/samples/springboot/ACLStringTransactionalConsumer.java
@@ -28,8 +28,8 @@ import org.springframework.stereotype.Service;
@RocketMQMessageListener(
topic = "${demo.rocketmq.transTopic}",
consumerGroup = "group_define_in_Aliware_MQ",
- accessKey = "AK", // if accessKey is empty, it will read by `rocketmq.consumer.access-key` key
- secretKey = "SK" // if accessKey is empty, it will read by `rocketmq.consumer.secret-key` key
+ accessKey = "AK", // if accessKey is empty, it will read by `rocketmq.push-consumer.access-key` key
+ secretKey = "SK" // if accessKey is empty, it will read by `rocketmq.push-consumer.secret-key` key
)
public class ACLStringTransactionalConsumer implements RocketMQListener<String> {
@Override
diff --git a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/annotation/ExtRocketMQConsumerConfiguration.java b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/annotation/ExtRocketMQConsumerConfiguration.java
index 0390fec..42172a4 100644
--- a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/annotation/ExtRocketMQConsumerConfiguration.java
+++ b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/annotation/ExtRocketMQConsumerConfiguration.java
@@ -32,12 +32,12 @@ import java.lang.annotation.Target;
public @interface ExtRocketMQConsumerConfiguration {
String NAME_SERVER_PLACEHOLDER = "${rocketmq.name-server:}";
- String GROUP_PLACEHOLDER = "${rocketmq.consumer.group:}";
- String TOPIC_PLACEHOLDER = "${rocketmq.consumer.topic:}";
+ String GROUP_PLACEHOLDER = "${rocketmq.push-consumer.group:}";
+ String TOPIC_PLACEHOLDER = "${rocketmq.push-consumer.topic:}";
String ACCESS_CHANNEL_PLACEHOLDER = "${rocketmq.access-channel:}";
- String ACCESS_KEY_PLACEHOLDER = "${rocketmq.consumer.access-key:}";
- String SECRET_KEY_PLACEHOLDER = "${rocketmq.consumer.secret-key:}";
- String TRACE_TOPIC_PLACEHOLDER = "${rocketmq.consumer.customized-trace-topic:}";
+ String ACCESS_KEY_PLACEHOLDER = "${rocketmq.push-consumer.access-key:}";
+ String SECRET_KEY_PLACEHOLDER = "${rocketmq.push-consumer.secret-key:}";
+ String TRACE_TOPIC_PLACEHOLDER = "${rocketmq.push-consumer.customized-trace-topic:}";
/**
* The component name of the Producer configuration.
diff --git a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/annotation/RocketMQMessageListener.java b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/annotation/RocketMQMessageListener.java
index f57bf3e..71ff7e2 100644
--- a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/annotation/RocketMQMessageListener.java
+++ b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/annotation/RocketMQMessageListener.java
@@ -30,9 +30,9 @@ import java.util.concurrent.LinkedBlockingQueue;
public @interface RocketMQMessageListener {
String NAME_SERVER_PLACEHOLDER = "${rocketmq.name-server:}";
- String ACCESS_KEY_PLACEHOLDER = "${rocketmq.consumer.access-key:}";
- String SECRET_KEY_PLACEHOLDER = "${rocketmq.consumer.secret-key:}";
- String TRACE_TOPIC_PLACEHOLDER = "${rocketmq.consumer.customized-trace-topic:}";
+ String ACCESS_KEY_PLACEHOLDER = "${rocketmq.push-consumer.access-key:}";
+ String SECRET_KEY_PLACEHOLDER = "${rocketmq.push-consumer.secret-key:}";
+ String TRACE_TOPIC_PLACEHOLDER = "${rocketmq.push-consumer.customized-trace-topic:}";
String ACCESS_CHANNEL_PLACEHOLDER = "${rocketmq.access-channel:}";
/**
diff --git a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/ExtConsumerResetConfiguration.java b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/ExtConsumerResetConfiguration.java
index b1e9288..fcb253d 100644
--- a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/ExtConsumerResetConfiguration.java
+++ b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/ExtConsumerResetConfiguration.java
@@ -108,9 +108,9 @@ public class ExtConsumerResetConfiguration implements ApplicationContextAware, S
private DefaultLitePullConsumer createConsumer(ExtRocketMQConsumerConfiguration annotation)
throws MQClientException {
- RocketMQProperties.Consumer consumerConfig = rocketMQProperties.getConsumer();
+ RocketMQProperties.PullConsumer consumerConfig = rocketMQProperties.getPullConsumer();
if (consumerConfig == null) {
- consumerConfig = new RocketMQProperties.Consumer();
+ consumerConfig = new RocketMQProperties.PullConsumer();
}
String nameServer = resolvePlaceholders(annotation.nameServer(), rocketMQProperties.getNameServer());
String groupName = resolvePlaceholders(annotation.group(), consumerConfig.getGroup());
diff --git a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/ListenerContainerConfiguration.java b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/ListenerContainerConfiguration.java
index 5bd6413..e4afd14 100644
--- a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/ListenerContainerConfiguration.java
+++ b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/ListenerContainerConfiguration.java
@@ -96,7 +96,7 @@ public class ListenerContainerConfiguration implements ApplicationContextAware,
String topic = this.environment.resolvePlaceholders(annotation.topic());
boolean listenerEnabled =
- (boolean) rocketMQProperties.getConsumer().getListeners().getOrDefault(consumerGroup, Collections.EMPTY_MAP)
+ (boolean) rocketMQProperties.getPushConsumer().getListeners().getOrDefault(consumerGroup, Collections.EMPTY_MAP)
.getOrDefault(topic, true);
if (!listenerEnabled) {
diff --git a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/RocketMQAutoConfiguration.java b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/RocketMQAutoConfiguration.java
index b908950..a765de9 100644
--- a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/RocketMQAutoConfiguration.java
+++ b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/RocketMQAutoConfiguration.java
@@ -117,21 +117,22 @@ public class RocketMQAutoConfiguration implements ApplicationContextAware {
producer.setRetryAnotherBrokerWhenNotStoreOK(producerConfig.isRetryNextServer());
producer.setUseTLS(producerConfig.isTlsEnable());
producer.setNamespace(producerConfig.getNamespace());
+ log.info(String.format("a producer (%s) init on namesrv %s", groupName,nameServer));
return producer;
}
@Bean(CONSUMER_BEAN_NAME)
@ConditionalOnMissingBean(DefaultLitePullConsumer.class)
- @ConditionalOnProperty(prefix = "rocketmq", value = {"name-server", "consumer.group", "consumer.topic"})
+ @ConditionalOnProperty(prefix = "rocketmq", value = {"name-server", "pull-consumer.group", "pull-consumer.topic"})
public DefaultLitePullConsumer defaultLitePullConsumer(RocketMQProperties rocketMQProperties)
throws MQClientException {
- RocketMQProperties.Consumer consumerConfig = rocketMQProperties.getConsumer();
+ RocketMQProperties.PullConsumer consumerConfig = rocketMQProperties.getPullConsumer();
String nameServer = rocketMQProperties.getNameServer();
String groupName = consumerConfig.getGroup();
String topicName = consumerConfig.getTopic();
Assert.hasText(nameServer, "[rocketmq.name-server] must not be null");
- Assert.hasText(groupName, "[rocketmq.consumer.group] must not be null");
- Assert.hasText(topicName, "[rocketmq.consumer.topic] must not be null");
+ Assert.hasText(groupName, "[rocketmq.pull-consumer.group] must not be null");
+ Assert.hasText(topicName, "[rocketmq.pull-consumer.topic] must not be null");
String accessChannel = rocketMQProperties.getAccessChannel();
MessageModel messageModel = MessageModel.valueOf(consumerConfig.getMessageModel());
@@ -147,6 +148,7 @@ public class RocketMQAutoConfiguration implements ApplicationContextAware {
litePullConsumer.setEnableMsgTrace(consumerConfig.isEnableMsgTrace());
litePullConsumer.setCustomizedTraceTopic(consumerConfig.getCustomizedTraceTopic());
litePullConsumer.setNamespace(consumerConfig.getNamespace());
+ log.info(String.format("a pull consumer(%s sub %s) init on namesrv %s", groupName, topicName,nameServer));
return litePullConsumer;
}
diff --git a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/RocketMQProperties.java b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/RocketMQProperties.java
index 974bec9..b9b98f2 100644
--- a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/RocketMQProperties.java
+++ b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/RocketMQProperties.java
@@ -39,15 +39,22 @@ public class RocketMQProperties {
private Producer producer;
+ /**
+ * for pull consumer only
+ *
+ * @see org.apache.rocketmq.spring.annotation.RocketMQMessageListener for a push consumer
+ */
+ private PullConsumer pullConsumer = new PullConsumer();
/**
* Configure enable listener or not.
* In some particular cases, if you don't want the listener to be enabled when the container starts up,
* the configuration pattern is like this :
- * rocketmq.consumer.listeners.<group-name>.<topic-name>.enabled=<boolean value, true or false>
+ * rocketmq.push-consumer.listeners.<group-name>.<topic-name>.enabled=<boolean value, true or false>
* <p>
* the listener is enabled by default.
+ *
*/
- private Consumer consumer = new Consumer();
+ private PushConsumer pushConsumer = new PushConsumer();
public String getNameServer() {
return nameServer;
@@ -73,6 +80,22 @@ public class RocketMQProperties {
this.producer = producer;
}
+ public PullConsumer getPullConsumer() {
+ return pullConsumer;
+ }
+
+ public void setPullConsumer(PullConsumer pullConsumer) {
+ this.pullConsumer = pullConsumer;
+ }
+
+ public PushConsumer getPushConsumer() {
+ return pushConsumer;
+ }
+
+ public void setPushConsumer(PushConsumer pushConsumer) {
+ this.pushConsumer = pushConsumer;
+ }
+
public static class Producer {
/**
@@ -247,15 +270,7 @@ public class RocketMQProperties {
}
}
- public Consumer getConsumer() {
- return consumer;
- }
-
- public void setConsumer(Consumer consumer) {
- this.consumer = consumer;
- }
-
- public static final class Consumer {
+ public static class PullConsumer {
/**
* Group name of consumer.
*/
@@ -274,11 +289,10 @@ public class RocketMQProperties {
/**
* Control message mode; if you want every subscriber to receive every message, broadcasting is a good choice.
*/
- private String messageModel = "CLUSTERING";
+ private String messageModel = "CLUSTERING";
/**
* Control how messages are selected.
- *
*/
private String selectorType = "TAG";
@@ -312,15 +326,6 @@ public class RocketMQProperties {
*/
private String customizedTraceTopic = TopicValidator.RMQ_SYS_TRACE_TOPIC;
- /**
- * listener configuration container
- * the pattern is like this:
- * group1.topic1 = false
- * group2.topic2 = true
- * group3.topic3 = false
- */
- private Map<String, Map<String, Boolean>> listeners = new HashMap<>();
-
/**
* The property of "tlsEnable".
*/
@@ -390,14 +395,6 @@ public class RocketMQProperties {
this.pullBatchSize = pullBatchSize;
}
- public Map<String, Map<String, Boolean>> getListeners() {
- return listeners;
- }
-
- public void setListeners(Map<String, Map<String, Boolean>> listeners) {
- this.listeners = listeners;
- }
-
public boolean isEnableMsgTrace() {
return enableMsgTrace;
}
@@ -431,4 +428,23 @@ public class RocketMQProperties {
}
}
+ public static class PushConsumer extends PullConsumer {
+ /**
+ * listener configuration container
+ * the pattern is like this:
+ * group1.topic1 = false
+ * group2.topic2 = true
+ * group3.topic3 = false
+ */
+ private Map<String, Map<String, Boolean>> listeners = new HashMap<>();
+
+ public Map<String, Map<String, Boolean>> getListeners() {
+ return listeners;
+ }
+
+ public void setListeners(Map<String, Map<String, Boolean>> listeners) {
+ this.listeners = listeners;
+ }
+ }
+
}
\ No newline at end of file
diff --git a/rocketmq-spring-boot/src/test/java/org/apache/rocketmq/spring/autoconfigure/RocketMQAutoConfigurationTest.java b/rocketmq-spring-boot/src/test/java/org/apache/rocketmq/spring/autoconfigure/RocketMQAutoConfigurationTest.java
index 03614f7..d448625 100644
--- a/rocketmq-spring-boot/src/test/java/org/apache/rocketmq/spring/autoconfigure/RocketMQAutoConfigurationTest.java
+++ b/rocketmq-spring-boot/src/test/java/org/apache/rocketmq/spring/autoconfigure/RocketMQAutoConfigurationTest.java
@@ -79,8 +79,8 @@ public class RocketMQAutoConfigurationTest {
@Test
public void testDefaultLitePullConsumerWithRelaxPropertyName() {
runner.withPropertyValues("rocketmq.nameServer=127.0.0.1:9876",
- "rocketmq.consumer.group=spring_rocketmq",
- "rocketmq.consumer.topic=test",
+ "rocketmq.pull-consumer.group=spring_rocketmq",
+ "rocketmq.pull-consumer.topic=test",
"rocketmq.accessChannel=LOCAL").
run((context) -> {
assertThat(context).hasSingleBean(DefaultLitePullConsumer.class);
@@ -100,8 +100,8 @@ public class RocketMQAutoConfigurationTest {
});
runner.withPropertyValues("rocketmq.nameServer=127.0.0.1:9876",
- "rocketmq.consumer.group=spring_rocketmq",
- "rocketmq.consumer.topic=test",
+ "rocketmq.pull-consumer.group=spring_rocketmq",
+ "rocketmq.pull-consumer.topic=test",
"rocketmq.accessChannel=LOCAL123").
run((context) -> {
//Should throw exception for bad accessChannel property
@@ -121,8 +121,8 @@ public class RocketMQAutoConfigurationTest {
@Test
public void testDefaultLitePullConsumer() {
runner.withPropertyValues("rocketmq.name-server=127.0.0.1:9876",
- "rocketmq.consumer.group=spring_rocketmq",
- "rocketmq.consumer.topic=test").
+ "rocketmq.pull-consumer.group=spring_rocketmq",
+ "rocketmq.pull-consumer.topic=test").
run((context) -> {
assertThat(context).hasSingleBean(DefaultLitePullConsumer.class);
});
@@ -153,12 +153,12 @@ public class RocketMQAutoConfigurationTest {
public void testConsumerListener() {
runner.withPropertyValues("rocketmq.name-server=127.0.0.1:9876",
"rocketmq.producer.group=spring_rocketmq",
- "rocketmq.consumer.listeners.spring_rocketmq.FOO_TEST_TOPIC=false",
- "rocketmq.consumer.listeners.spring_rocketmq.FOO_TEST_TOPIC2=true").
+ "rocketmq.push-consumer.listeners.spring_rocketmq.FOO_TEST_TOPIC=false",
+ "rocketmq.push-consumer.listeners.spring_rocketmq.FOO_TEST_TOPIC2=true").
run((context) -> {
RocketMQProperties rocketMQProperties = context.getBean(RocketMQProperties.class);
- assertThat(rocketMQProperties.getConsumer().getListeners().get("spring_rocketmq").get("FOO_TEST_TOPIC").booleanValue()).isEqualTo(false);
- assertThat(rocketMQProperties.getConsumer().getListeners().get("spring_rocketmq").get("FOO_TEST_TOPIC2").booleanValue()).isEqualTo(true);
+ assertThat(rocketMQProperties.getPushConsumer().getListeners().get("spring_rocketmq").get("FOO_TEST_TOPIC").booleanValue()).isEqualTo(false);
+ assertThat(rocketMQProperties.getPushConsumer().getListeners().get("spring_rocketmq").get("FOO_TEST_TOPIC2").booleanValue()).isEqualTo(true);
});
}
diff --git a/rocketmq-spring-boot/src/test/java/org/apache/rocketmq/spring/core/RocketMQTemplateTest.java b/rocketmq-spring-boot/src/test/java/org/apache/rocketmq/spring/core/RocketMQTemplateTest.java
index 4948c81..4d9e3cc 100644
--- a/rocketmq-spring-boot/src/test/java/org/apache/rocketmq/spring/core/RocketMQTemplateTest.java
+++ b/rocketmq-spring-boot/src/test/java/org/apache/rocketmq/spring/core/RocketMQTemplateTest.java
@@ -53,7 +53,7 @@ import static org.mockito.ArgumentMatchers.any;
"rocketmq.producer.secret-key=test-sk", "rocketmq.accessChannel=LOCAL",
"rocketmq.producer.sendMessageTimeout= 3500", "rocketmq.producer.retryTimesWhenSendFailed=3",
"rocketmq.producer.retryTimesWhenSendAsyncFailed=3",
- "rocketmq.consumer.group=spring_rocketmq", "rocketmq.consumer.topic=test"}, classes = {RocketMQAutoConfiguration.class, TransactionListenerImpl.class})
+ "rocketmq.pull-consumer.group=spring_rocketmq", "rocketmq.pull-consumer.topic=test"}, classes = {RocketMQAutoConfiguration.class, TransactionListenerImpl.class})
public class RocketMQTemplateTest {
@Resource