You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by di...@apache.org on 2019/06/19 08:46:33 UTC

[rocketmq-spring] branch master updated: [ISSUE #86] add feature: consumer listener enabled configuration (#87)

This is an automated email from the ASF dual-hosted git repository.

dinglei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-spring.git


The following commit(s) were added to refs/heads/master by this push:
     new 3e34dd9  [ISSUE #86] add feature: consumer listener enabled configuration (#87)
3e34dd9 is described below

commit 3e34dd99dbf0a461186ccfa0d38f370744600995
Author: Luan Louis <lo...@qq.com>
AuthorDate: Wed Jun 19 16:46:28 2019 +0800

    [ISSUE #86] add feature: consumer listener enabled configuration (#87)
    
    #86
---
 .../ListenerContainerConfiguration.java            | 15 ++++++++
 .../spring/autoconfigure/RocketMQProperties.java   | 43 +++++++++++++++++++++-
 .../RocketMQAutoConfigurationTest.java             | 15 ++++++++
 3 files changed, 72 insertions(+), 1 deletion(-)

diff --git a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/ListenerContainerConfiguration.java b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/ListenerContainerConfiguration.java
index 2d6cb3f..5d461bc 100644
--- a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/ListenerContainerConfiguration.java
+++ b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/ListenerContainerConfiguration.java
@@ -38,6 +38,7 @@ import org.springframework.context.support.GenericApplicationContext;
 import org.springframework.core.env.StandardEnvironment;
 import org.springframework.util.StringUtils;
 
+import java.util.Collections;
 import java.util.Map;
 import java.util.Objects;
 import java.util.concurrent.atomic.AtomicLong;
@@ -87,6 +88,20 @@ public class ListenerContainerConfiguration implements ApplicationContextAware,
         }
 
         RocketMQMessageListener annotation = clazz.getAnnotation(RocketMQMessageListener.class);
+
+        String consumerGroup = this.environment.resolvePlaceholders(annotation.consumerGroup());
+        String topic = this.environment.resolvePlaceholders(annotation.topic());
+
+        boolean listenerEnabled =
+            (boolean)rocketMQProperties.getConsumer().getListeners().getOrDefault(consumerGroup, Collections.EMPTY_MAP)
+                .getOrDefault(topic, true);
+
+        if (!listenerEnabled) {
+            log.debug(
+                "Consumer Listener (group:{},topic:{}) is not enabled by configuration, will ignore initialization.",
+                consumerGroup, topic);
+            return;
+        }
         validate(annotation);
 
         String containerBeanName = String.format("%s_%s", DefaultRocketMQListenerContainer.class.getName(),
diff --git a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/RocketMQProperties.java b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/RocketMQProperties.java
index 62ca2be..832d021 100644
--- a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/RocketMQProperties.java
+++ b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/RocketMQProperties.java
@@ -20,6 +20,9 @@ package org.apache.rocketmq.spring.autoconfigure;
 import org.apache.rocketmq.common.MixAll;
 import org.springframework.boot.context.properties.ConfigurationProperties;
 
+import java.util.HashMap;
+import java.util.Map;
+
 @SuppressWarnings("WeakerAccess")
 @ConfigurationProperties(prefix = "rocketmq")
 public class RocketMQProperties {
@@ -36,6 +39,16 @@ public class RocketMQProperties {
 
     private Producer producer;
 
+    /**
+     * Configure whether a listener is enabled or not.
+     * In some particular cases, if you don't want the listener to be enabled when the container starts up,
+     * the configuration pattern is like this :
+     * rocketmq.consumer.listeners.<group-name>.<topic-name>=<boolean value, true or false>
+     * <p>
+     * the listener is enabled by default.
+     */
+    private Consumer consumer = new Consumer();
+
     public String getNameServer() {
         return nameServer;
     }
@@ -63,7 +76,7 @@ public class RocketMQProperties {
     public static class Producer {
 
         /**
-         * Name of producer.
+         * Group name of producer.
          */
         private String group;
 
@@ -207,4 +220,32 @@ public class RocketMQProperties {
             this.customizedTraceTopic = customizedTraceTopic;
         }
     }
+
+    public Consumer getConsumer() {
+        return consumer;
+    }
+
+    public void setConsumer(Consumer consumer) {
+        this.consumer = consumer;
+    }
+
+    public static final class Consumer {
+        /**
+         * Listener configuration container, keyed by consumer group and then by topic.
+         * The pattern is like this:
+         * group1.topic1 = false
+         * group2.topic2 = true
+         * group3.topic3 = false
+         */
+        private Map<String, Map<String, Boolean>> listeners = new HashMap<>();
+
+        public Map<String, Map<String, Boolean>> getListeners() {
+            return listeners;
+        }
+
+        public void setListeners(Map<String, Map<String, Boolean>> listeners) {
+            this.listeners = listeners;
+        }
+    }
+
 }
diff --git a/rocketmq-spring-boot/src/test/java/org/apache/rocketmq/spring/autoconfigure/RocketMQAutoConfigurationTest.java b/rocketmq-spring-boot/src/test/java/org/apache/rocketmq/spring/autoconfigure/RocketMQAutoConfigurationTest.java
index 528e916..d613808 100644
--- a/rocketmq-spring-boot/src/test/java/org/apache/rocketmq/spring/autoconfigure/RocketMQAutoConfigurationTest.java
+++ b/rocketmq-spring-boot/src/test/java/org/apache/rocketmq/spring/autoconfigure/RocketMQAutoConfigurationTest.java
@@ -145,6 +145,21 @@ public class RocketMQAutoConfigurationTest {
                 });
     }
 
+    @Test
+    public void testConsumerListener() {
+        runner.withPropertyValues("rocketmq.name-server=127.0.0.1:9876",
+            "rocketmq.producer.group=spring_rocketmq",
+            "rocketmq.consumer.listeners.spring_rocketmq.FOO_TEST_TOPIC=false",
+            "rocketmq.consumer.listeners.spring_rocketmq.FOO_TEST_TOPIC2=true").
+            run((context) -> {
+                RocketMQProperties rocketMQProperties = context.getBean(RocketMQProperties.class);
+                assertThat(rocketMQProperties.getConsumer().getListeners().get("spring_rocketmq").get("FOO_TEST_TOPIC").booleanValue()).isEqualTo(false);
+                assertThat(rocketMQProperties.getConsumer().getListeners().get("spring_rocketmq").get("FOO_TEST_TOPIC2").booleanValue()).isEqualTo(true);
+            });
+
+    }
+
+
     @Configuration
     static class TestConfig {