You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by di...@apache.org on 2019/06/19 08:46:33 UTC
[rocketmq-spring] branch master updated: [ISSUE #86] add feature:
consumer listener enabled configuration (#87)
This is an automated email from the ASF dual-hosted git repository.
dinglei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-spring.git
The following commit(s) were added to refs/heads/master by this push:
new 3e34dd9 [ISSUE #86] add feature: consumer listener enabled configuration (#87)
3e34dd9 is described below
commit 3e34dd99dbf0a461186ccfa0d38f370744600995
Author: Luan Louis <lo...@qq.com>
AuthorDate: Wed Jun 19 16:46:28 2019 +0800
[ISSUE #86] add feature: consumer listener enabled configuration (#87)
#86
---
.../ListenerContainerConfiguration.java | 15 ++++++++
.../spring/autoconfigure/RocketMQProperties.java | 43 +++++++++++++++++++++-
.../RocketMQAutoConfigurationTest.java | 15 ++++++++
3 files changed, 72 insertions(+), 1 deletion(-)
diff --git a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/ListenerContainerConfiguration.java b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/ListenerContainerConfiguration.java
index 2d6cb3f..5d461bc 100644
--- a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/ListenerContainerConfiguration.java
+++ b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/ListenerContainerConfiguration.java
@@ -38,6 +38,7 @@ import org.springframework.context.support.GenericApplicationContext;
import org.springframework.core.env.StandardEnvironment;
import org.springframework.util.StringUtils;
+import java.util.Collections;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicLong;
@@ -87,6 +88,20 @@ public class ListenerContainerConfiguration implements ApplicationContextAware,
}
RocketMQMessageListener annotation = clazz.getAnnotation(RocketMQMessageListener.class);
+
+ String consumerGroup = this.environment.resolvePlaceholders(annotation.consumerGroup());
+ String topic = this.environment.resolvePlaceholders(annotation.topic());
+
+ boolean listenerEnabled =
+ (boolean)rocketMQProperties.getConsumer().getListeners().getOrDefault(consumerGroup, Collections.EMPTY_MAP)
+ .getOrDefault(topic, true);
+
+ if (!listenerEnabled) {
+ log.debug(
+ "Consumer Listener (group:{},topic:{}) is not enabled by configuration, will ignore initialization.",
+ consumerGroup, topic);
+ return;
+ }
validate(annotation);
String containerBeanName = String.format("%s_%s", DefaultRocketMQListenerContainer.class.getName(),
diff --git a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/RocketMQProperties.java b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/RocketMQProperties.java
index 62ca2be..832d021 100644
--- a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/RocketMQProperties.java
+++ b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/RocketMQProperties.java
@@ -20,6 +20,9 @@ package org.apache.rocketmq.spring.autoconfigure;
import org.apache.rocketmq.common.MixAll;
import org.springframework.boot.context.properties.ConfigurationProperties;
+import java.util.HashMap;
+import java.util.Map;
+
@SuppressWarnings("WeakerAccess")
@ConfigurationProperties(prefix = "rocketmq")
public class RocketMQProperties {
@@ -36,6 +39,16 @@ public class RocketMQProperties {
private Producer producer;
+ /**
+ * Configure enable listener or not.
+ * In some particular cases, if you don't want the the listener is enabled when container startup,
+ * the configuration pattern is like this :
+ * rocketmq.consumer.listeners.<group-name>.<topic-name>.enabled=<boolean value, true or false>
+ * <p>
+ * the listener is enabled by default.
+ */
+ private Consumer consumer = new Consumer();
+
public String getNameServer() {
return nameServer;
}
@@ -63,7 +76,7 @@ public class RocketMQProperties {
public static class Producer {
/**
- * Name of producer.
+ * Group name of producer.
*/
private String group;
@@ -207,4 +220,32 @@ public class RocketMQProperties {
this.customizedTraceTopic = customizedTraceTopic;
}
}
+
+ public Consumer getConsumer() {
+ return consumer;
+ }
+
+ public void setConsumer(Consumer consumer) {
+ this.consumer = consumer;
+ }
+
+ public static final class Consumer {
+ /**
+ * listener configuration container
+ * the pattern is like this:
+ * group1.topic1 = false
+ * group2.topic2 = true
+ * group3.topic3 = false
+ */
+ private Map<String, Map<String, Boolean>> listeners = new HashMap<>();
+
+ public Map<String, Map<String, Boolean>> getListeners() {
+ return listeners;
+ }
+
+ public void setListeners(Map<String, Map<String, Boolean>> listeners) {
+ this.listeners = listeners;
+ }
+ }
+
}
diff --git a/rocketmq-spring-boot/src/test/java/org/apache/rocketmq/spring/autoconfigure/RocketMQAutoConfigurationTest.java b/rocketmq-spring-boot/src/test/java/org/apache/rocketmq/spring/autoconfigure/RocketMQAutoConfigurationTest.java
index 528e916..d613808 100644
--- a/rocketmq-spring-boot/src/test/java/org/apache/rocketmq/spring/autoconfigure/RocketMQAutoConfigurationTest.java
+++ b/rocketmq-spring-boot/src/test/java/org/apache/rocketmq/spring/autoconfigure/RocketMQAutoConfigurationTest.java
@@ -145,6 +145,21 @@ public class RocketMQAutoConfigurationTest {
});
}
+ @Test
+ public void testConsumerListener() {
+ runner.withPropertyValues("rocketmq.name-server=127.0.0.1:9876",
+ "rocketmq.producer.group=spring_rocketmq",
+ "rocketmq.consumer.listeners.spring_rocketmq.FOO_TEST_TOPIC=false",
+ "rocketmq.consumer.listeners.spring_rocketmq.FOO_TEST_TOPIC2=true").
+ run((context) -> {
+ RocketMQProperties rocketMQProperties = context.getBean(RocketMQProperties.class);
+ assertThat(rocketMQProperties.getConsumer().getListeners().get("spring_rocketmq").get("FOO_TEST_TOPIC").booleanValue()).isEqualTo(false);
+ assertThat(rocketMQProperties.getConsumer().getListeners().get("spring_rocketmq").get("FOO_TEST_TOPIC2").booleanValue()).isEqualTo(true);
+ });
+
+ }
+
+
@Configuration
static class TestConfig {