You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by ji...@apache.org on 2022/05/30 08:35:43 UTC

[rocketmq-spring] branch master updated: n [ISSUE #450] fix More than one client is created when consumer.group and consumer.topic is used in application.properties

This is an automated email from the ASF dual-hosted git repository.

jinrongtong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-spring.git


The following commit(s) were added to refs/heads/master by this push:
     new 1aae0cd  n [ISSUE #450] fix More than one client is created when consumer.group and consumer.topic is used in application.properties
1aae0cd is described below

commit 1aae0cd61f3d5d8cccddeed1ea7cf3d9b5f76dbe
Author: tiger lee <fr...@outlook.com>
AuthorDate: Mon May 30 16:35:38 2022 +0800

    n [ISSUE #450] fix More than one client is created when consumer.group and consumer.topic is used in application.properties
    
    Co-authored-by: tigerweili <ti...@tencent.com>
---
 .../samples/springboot/ACLStringConsumer.java      |  4 +-
 .../springboot/ACLStringTransactionalConsumer.java |  4 +-
 .../ExtRocketMQConsumerConfiguration.java          | 10 +--
 .../spring/annotation/RocketMQMessageListener.java |  6 +-
 .../ExtConsumerResetConfiguration.java             |  4 +-
 .../ListenerContainerConfiguration.java            |  2 +-
 .../autoconfigure/RocketMQAutoConfiguration.java   | 10 +--
 .../spring/autoconfigure/RocketMQProperties.java   | 76 +++++++++++++---------
 .../RocketMQAutoConfigurationTest.java             | 20 +++---
 .../rocketmq/spring/core/RocketMQTemplateTest.java |  2 +-
 10 files changed, 78 insertions(+), 60 deletions(-)

diff --git a/rocketmq-spring-boot-samples/rocketmq-consume-acl-demo/src/main/java/org/apache/rocketmq/samples/springboot/ACLStringConsumer.java b/rocketmq-spring-boot-samples/rocketmq-consume-acl-demo/src/main/java/org/apache/rocketmq/samples/springboot/ACLStringConsumer.java
index d2d654f..e1353f0 100644
--- a/rocketmq-spring-boot-samples/rocketmq-consume-acl-demo/src/main/java/org/apache/rocketmq/samples/springboot/ACLStringConsumer.java
+++ b/rocketmq-spring-boot-samples/rocketmq-consume-acl-demo/src/main/java/org/apache/rocketmq/samples/springboot/ACLStringConsumer.java
@@ -28,8 +28,8 @@ import org.springframework.stereotype.Service;
 @RocketMQMessageListener(
     topic = "normal_topic_define_in_Aliware_MQ",
     consumerGroup = "group_define_in_Aliware_MQ"
-    //accessKey = "AK" // It will read by `rocketmq.consumer.access-key` key
-    //secretKey = "SK" // It will read by `rocketmq.consumer.secret-key` key
+    //accessKey = "AK" // It will read by `rocketmq.push-consumer.access-key` key
+    //secretKey = "SK" // It will read by `rocketmq.push-consumer.secret-key` key
     )
 public class ACLStringConsumer implements RocketMQListener<String> {
     @Override
diff --git a/rocketmq-spring-boot-samples/rocketmq-consume-acl-demo/src/main/java/org/apache/rocketmq/samples/springboot/ACLStringTransactionalConsumer.java b/rocketmq-spring-boot-samples/rocketmq-consume-acl-demo/src/main/java/org/apache/rocketmq/samples/springboot/ACLStringTransactionalConsumer.java
index 342f44b..bd75895 100644
--- a/rocketmq-spring-boot-samples/rocketmq-consume-acl-demo/src/main/java/org/apache/rocketmq/samples/springboot/ACLStringTransactionalConsumer.java
+++ b/rocketmq-spring-boot-samples/rocketmq-consume-acl-demo/src/main/java/org/apache/rocketmq/samples/springboot/ACLStringTransactionalConsumer.java
@@ -28,8 +28,8 @@ import org.springframework.stereotype.Service;
 @RocketMQMessageListener(
     topic = "${demo.rocketmq.transTopic}",
     consumerGroup = "group_define_in_Aliware_MQ",
-    accessKey = "AK", // if accessKey is empty, it will read by `rocketmq.consumer.access-key` key
-    secretKey = "SK"  // if accessKey is empty, it will read by `rocketmq.consumer.secret-key` key
+    accessKey = "AK", // if accessKey is empty, it will read by `rocketmq.push-consumer.access-key` key
+    secretKey = "SK"  // if secretKey is empty, it will read by `rocketmq.push-consumer.secret-key` key
     )
 public class ACLStringTransactionalConsumer implements RocketMQListener<String> {
     @Override
diff --git a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/annotation/ExtRocketMQConsumerConfiguration.java b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/annotation/ExtRocketMQConsumerConfiguration.java
index 0390fec..42172a4 100644
--- a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/annotation/ExtRocketMQConsumerConfiguration.java
+++ b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/annotation/ExtRocketMQConsumerConfiguration.java
@@ -32,12 +32,12 @@ import java.lang.annotation.Target;
 public @interface ExtRocketMQConsumerConfiguration {
 
     String NAME_SERVER_PLACEHOLDER = "${rocketmq.name-server:}";
-    String GROUP_PLACEHOLDER = "${rocketmq.consumer.group:}";
-    String TOPIC_PLACEHOLDER = "${rocketmq.consumer.topic:}";
+    String GROUP_PLACEHOLDER = "${rocketmq.push-consumer.group:}";
+    String TOPIC_PLACEHOLDER = "${rocketmq.push-consumer.topic:}";
     String ACCESS_CHANNEL_PLACEHOLDER = "${rocketmq.access-channel:}";
-    String ACCESS_KEY_PLACEHOLDER = "${rocketmq.consumer.access-key:}";
-    String SECRET_KEY_PLACEHOLDER = "${rocketmq.consumer.secret-key:}";
-    String TRACE_TOPIC_PLACEHOLDER = "${rocketmq.consumer.customized-trace-topic:}";
+    String ACCESS_KEY_PLACEHOLDER = "${rocketmq.push-consumer.access-key:}";
+    String SECRET_KEY_PLACEHOLDER = "${rocketmq.push-consumer.secret-key:}";
+    String TRACE_TOPIC_PLACEHOLDER = "${rocketmq.push-consumer.customized-trace-topic:}";
 
     /**
      * The component name of the Producer configuration.
diff --git a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/annotation/RocketMQMessageListener.java b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/annotation/RocketMQMessageListener.java
index f57bf3e..71ff7e2 100644
--- a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/annotation/RocketMQMessageListener.java
+++ b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/annotation/RocketMQMessageListener.java
@@ -30,9 +30,9 @@ import java.util.concurrent.LinkedBlockingQueue;
 public @interface RocketMQMessageListener {
 
     String NAME_SERVER_PLACEHOLDER = "${rocketmq.name-server:}";
-    String ACCESS_KEY_PLACEHOLDER = "${rocketmq.consumer.access-key:}";
-    String SECRET_KEY_PLACEHOLDER = "${rocketmq.consumer.secret-key:}";
-    String TRACE_TOPIC_PLACEHOLDER = "${rocketmq.consumer.customized-trace-topic:}";
+    String ACCESS_KEY_PLACEHOLDER = "${rocketmq.push-consumer.access-key:}";
+    String SECRET_KEY_PLACEHOLDER = "${rocketmq.push-consumer.secret-key:}";
+    String TRACE_TOPIC_PLACEHOLDER = "${rocketmq.push-consumer.customized-trace-topic:}";
     String ACCESS_CHANNEL_PLACEHOLDER = "${rocketmq.access-channel:}";
 
     /**
diff --git a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/ExtConsumerResetConfiguration.java b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/ExtConsumerResetConfiguration.java
index b1e9288..fcb253d 100644
--- a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/ExtConsumerResetConfiguration.java
+++ b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/ExtConsumerResetConfiguration.java
@@ -108,9 +108,9 @@ public class ExtConsumerResetConfiguration implements ApplicationContextAware, S
     private DefaultLitePullConsumer createConsumer(ExtRocketMQConsumerConfiguration annotation)
             throws MQClientException {
 
-        RocketMQProperties.Consumer consumerConfig = rocketMQProperties.getConsumer();
+        RocketMQProperties.PullConsumer consumerConfig = rocketMQProperties.getPullConsumer();
         if (consumerConfig == null) {
-            consumerConfig = new RocketMQProperties.Consumer();
+            consumerConfig = new RocketMQProperties.PullConsumer();
         }
         String nameServer = resolvePlaceholders(annotation.nameServer(), rocketMQProperties.getNameServer());
         String groupName = resolvePlaceholders(annotation.group(), consumerConfig.getGroup());
diff --git a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/ListenerContainerConfiguration.java b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/ListenerContainerConfiguration.java
index 5bd6413..e4afd14 100644
--- a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/ListenerContainerConfiguration.java
+++ b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/ListenerContainerConfiguration.java
@@ -96,7 +96,7 @@ public class ListenerContainerConfiguration implements ApplicationContextAware,
         String topic = this.environment.resolvePlaceholders(annotation.topic());
 
         boolean listenerEnabled =
-            (boolean) rocketMQProperties.getConsumer().getListeners().getOrDefault(consumerGroup, Collections.EMPTY_MAP)
+            (boolean) rocketMQProperties.getPushConsumer().getListeners().getOrDefault(consumerGroup, Collections.EMPTY_MAP)
                 .getOrDefault(topic, true);
 
         if (!listenerEnabled) {
diff --git a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/RocketMQAutoConfiguration.java b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/RocketMQAutoConfiguration.java
index b908950..a765de9 100644
--- a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/RocketMQAutoConfiguration.java
+++ b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/RocketMQAutoConfiguration.java
@@ -117,21 +117,22 @@ public class RocketMQAutoConfiguration implements ApplicationContextAware {
         producer.setRetryAnotherBrokerWhenNotStoreOK(producerConfig.isRetryNextServer());
         producer.setUseTLS(producerConfig.isTlsEnable());
         producer.setNamespace(producerConfig.getNamespace());
+        log.info(String.format("a producer (%s) init on namesrv %s",  groupName,nameServer));
         return producer;
     }
 
     @Bean(CONSUMER_BEAN_NAME)
     @ConditionalOnMissingBean(DefaultLitePullConsumer.class)
-    @ConditionalOnProperty(prefix = "rocketmq", value = {"name-server", "consumer.group", "consumer.topic"})
+    @ConditionalOnProperty(prefix = "rocketmq", value = {"name-server", "pull-consumer.group", "pull-consumer.topic"})
     public DefaultLitePullConsumer defaultLitePullConsumer(RocketMQProperties rocketMQProperties)
             throws MQClientException {
-        RocketMQProperties.Consumer consumerConfig = rocketMQProperties.getConsumer();
+        RocketMQProperties.PullConsumer consumerConfig = rocketMQProperties.getPullConsumer();
         String nameServer = rocketMQProperties.getNameServer();
         String groupName = consumerConfig.getGroup();
         String topicName = consumerConfig.getTopic();
         Assert.hasText(nameServer, "[rocketmq.name-server] must not be null");
-        Assert.hasText(groupName, "[rocketmq.consumer.group] must not be null");
-        Assert.hasText(topicName, "[rocketmq.consumer.topic] must not be null");
+        Assert.hasText(groupName, "[rocketmq.pull-consumer.group] must not be null");
+        Assert.hasText(topicName, "[rocketmq.pull-consumer.topic] must not be null");
 
         String accessChannel = rocketMQProperties.getAccessChannel();
         MessageModel messageModel = MessageModel.valueOf(consumerConfig.getMessageModel());
@@ -147,6 +148,7 @@ public class RocketMQAutoConfiguration implements ApplicationContextAware {
         litePullConsumer.setEnableMsgTrace(consumerConfig.isEnableMsgTrace());
         litePullConsumer.setCustomizedTraceTopic(consumerConfig.getCustomizedTraceTopic());
         litePullConsumer.setNamespace(consumerConfig.getNamespace());
+        log.info(String.format("a pull consumer(%s sub %s) init on namesrv %s",  groupName, topicName,nameServer));
         return litePullConsumer;
     }
 
diff --git a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/RocketMQProperties.java b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/RocketMQProperties.java
index 974bec9..b9b98f2 100644
--- a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/RocketMQProperties.java
+++ b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/RocketMQProperties.java
@@ -39,15 +39,22 @@ public class RocketMQProperties {
 
     private Producer producer;
 
+    /**
+     * for pull consumer only
+     *
+     * @see org.apache.rocketmq.spring.annotation.RocketMQMessageListener for a push consumer
+     */
+    private PullConsumer pullConsumer = new PullConsumer();
     /**
      * Configure enable listener or not.
      * In some particular cases, if you don't want the the listener is enabled when container startup,
      * the configuration pattern is like this :
-     * rocketmq.consumer.listeners.<group-name>.<topic-name>.enabled=<boolean value, true or false>
+     * rocketmq.push-consumer.listeners.<group-name>.<topic-name>=<boolean value, true or false>
      * <p>
      * the listener is enabled by default.
+     *
      */
-    private Consumer consumer = new Consumer();
+    private PushConsumer pushConsumer = new PushConsumer();
 
     public String getNameServer() {
         return nameServer;
@@ -73,6 +80,22 @@ public class RocketMQProperties {
         this.producer = producer;
     }
 
+    public PullConsumer getPullConsumer() {
+        return pullConsumer;
+    }
+
+    public void setPullConsumer(PullConsumer pullConsumer) {
+        this.pullConsumer = pullConsumer;
+    }
+
+    public PushConsumer getPushConsumer() {
+        return pushConsumer;
+    }
+
+    public void setPushConsumer(PushConsumer pushConsumer) {
+        this.pushConsumer = pushConsumer;
+    }
+
     public static class Producer {
 
         /**
@@ -247,15 +270,7 @@ public class RocketMQProperties {
         }
     }
 
-    public Consumer getConsumer() {
-        return consumer;
-    }
-
-    public void setConsumer(Consumer consumer) {
-        this.consumer = consumer;
-    }
-
-    public static final class Consumer {
+    public static class PullConsumer {
         /**
          * Group name of consumer.
          */
@@ -274,11 +289,10 @@ public class RocketMQProperties {
         /**
          * Control message mode, if you want all subscribers receive message all message, broadcasting is a good choice.
          */
-        private String  messageModel = "CLUSTERING";
+        private String messageModel = "CLUSTERING";
 
         /**
          * Control how to selector message.
-         *
          */
         private String selectorType = "TAG";
 
@@ -312,15 +326,6 @@ public class RocketMQProperties {
          */
         private String customizedTraceTopic = TopicValidator.RMQ_SYS_TRACE_TOPIC;
 
-        /**
-         * listener configuration container
-         * the pattern is like this:
-         * group1.topic1 = false
-         * group2.topic2 = true
-         * group3.topic3 = false
-         */
-        private Map<String, Map<String, Boolean>> listeners = new HashMap<>();
-
         /**
          * The property of "tlsEnable".
          */
@@ -390,14 +395,6 @@ public class RocketMQProperties {
             this.pullBatchSize = pullBatchSize;
         }
 
-        public Map<String, Map<String, Boolean>> getListeners() {
-            return listeners;
-        }
-
-        public void setListeners(Map<String, Map<String, Boolean>> listeners) {
-            this.listeners = listeners;
-        }
-
         public boolean isEnableMsgTrace() {
             return enableMsgTrace;
         }
@@ -431,4 +428,23 @@ public class RocketMQProperties {
         }
     }
 
+    public static class PushConsumer extends PullConsumer {
+        /**
+         * listener configuration container
+         * the pattern is like this:
+         * group1.topic1 = false
+         * group2.topic2 = true
+         * group3.topic3 = false
+         */
+        private Map<String, Map<String, Boolean>> listeners = new HashMap<>();
+
+        public Map<String, Map<String, Boolean>> getListeners() {
+            return listeners;
+        }
+
+        public void setListeners(Map<String, Map<String, Boolean>> listeners) {
+            this.listeners = listeners;
+        }
+    }
+
 }
\ No newline at end of file
diff --git a/rocketmq-spring-boot/src/test/java/org/apache/rocketmq/spring/autoconfigure/RocketMQAutoConfigurationTest.java b/rocketmq-spring-boot/src/test/java/org/apache/rocketmq/spring/autoconfigure/RocketMQAutoConfigurationTest.java
index 03614f7..d448625 100644
--- a/rocketmq-spring-boot/src/test/java/org/apache/rocketmq/spring/autoconfigure/RocketMQAutoConfigurationTest.java
+++ b/rocketmq-spring-boot/src/test/java/org/apache/rocketmq/spring/autoconfigure/RocketMQAutoConfigurationTest.java
@@ -79,8 +79,8 @@ public class RocketMQAutoConfigurationTest {
     @Test
     public void testDefaultLitePullConsumerWithRelaxPropertyName() {
         runner.withPropertyValues("rocketmq.nameServer=127.0.0.1:9876",
-                "rocketmq.consumer.group=spring_rocketmq",
-                "rocketmq.consumer.topic=test",
+                "rocketmq.pull-consumer.group=spring_rocketmq",
+                "rocketmq.pull-consumer.topic=test",
                 "rocketmq.accessChannel=LOCAL").
                 run((context) -> {
                     assertThat(context).hasSingleBean(DefaultLitePullConsumer.class);
@@ -100,8 +100,8 @@ public class RocketMQAutoConfigurationTest {
             });
 
         runner.withPropertyValues("rocketmq.nameServer=127.0.0.1:9876",
-                "rocketmq.consumer.group=spring_rocketmq",
-                "rocketmq.consumer.topic=test",
+                "rocketmq.pull-consumer.group=spring_rocketmq",
+                "rocketmq.pull-consumer.topic=test",
                 "rocketmq.accessChannel=LOCAL123").
                 run((context) -> {
                     //Should throw exception for bad accessChannel property
@@ -121,8 +121,8 @@ public class RocketMQAutoConfigurationTest {
     @Test
     public void testDefaultLitePullConsumer() {
         runner.withPropertyValues("rocketmq.name-server=127.0.0.1:9876",
-                "rocketmq.consumer.group=spring_rocketmq",
-                "rocketmq.consumer.topic=test").
+                "rocketmq.pull-consumer.group=spring_rocketmq",
+                "rocketmq.pull-consumer.topic=test").
                 run((context) -> {
                     assertThat(context).hasSingleBean(DefaultLitePullConsumer.class);
                 });
@@ -153,12 +153,12 @@ public class RocketMQAutoConfigurationTest {
     public void testConsumerListener() {
         runner.withPropertyValues("rocketmq.name-server=127.0.0.1:9876",
             "rocketmq.producer.group=spring_rocketmq",
-            "rocketmq.consumer.listeners.spring_rocketmq.FOO_TEST_TOPIC=false",
-            "rocketmq.consumer.listeners.spring_rocketmq.FOO_TEST_TOPIC2=true").
+            "rocketmq.push-consumer.listeners.spring_rocketmq.FOO_TEST_TOPIC=false",
+            "rocketmq.push-consumer.listeners.spring_rocketmq.FOO_TEST_TOPIC2=true").
             run((context) -> {
                 RocketMQProperties rocketMQProperties = context.getBean(RocketMQProperties.class);
-                assertThat(rocketMQProperties.getConsumer().getListeners().get("spring_rocketmq").get("FOO_TEST_TOPIC").booleanValue()).isEqualTo(false);
-                assertThat(rocketMQProperties.getConsumer().getListeners().get("spring_rocketmq").get("FOO_TEST_TOPIC2").booleanValue()).isEqualTo(true);
+                assertThat(rocketMQProperties.getPushConsumer().getListeners().get("spring_rocketmq").get("FOO_TEST_TOPIC").booleanValue()).isEqualTo(false);
+                assertThat(rocketMQProperties.getPushConsumer().getListeners().get("spring_rocketmq").get("FOO_TEST_TOPIC2").booleanValue()).isEqualTo(true);
             });
 
     }
diff --git a/rocketmq-spring-boot/src/test/java/org/apache/rocketmq/spring/core/RocketMQTemplateTest.java b/rocketmq-spring-boot/src/test/java/org/apache/rocketmq/spring/core/RocketMQTemplateTest.java
index 4948c81..4d9e3cc 100644
--- a/rocketmq-spring-boot/src/test/java/org/apache/rocketmq/spring/core/RocketMQTemplateTest.java
+++ b/rocketmq-spring-boot/src/test/java/org/apache/rocketmq/spring/core/RocketMQTemplateTest.java
@@ -53,7 +53,7 @@ import static org.mockito.ArgumentMatchers.any;
     "rocketmq.producer.secret-key=test-sk", "rocketmq.accessChannel=LOCAL",
     "rocketmq.producer.sendMessageTimeout= 3500", "rocketmq.producer.retryTimesWhenSendFailed=3",
     "rocketmq.producer.retryTimesWhenSendAsyncFailed=3",
-    "rocketmq.consumer.group=spring_rocketmq", "rocketmq.consumer.topic=test"}, classes = {RocketMQAutoConfiguration.class, TransactionListenerImpl.class})
+    "rocketmq.pull-consumer.group=spring_rocketmq", "rocketmq.pull-consumer.topic=test"}, classes = {RocketMQAutoConfiguration.class, TransactionListenerImpl.class})
 
 public class RocketMQTemplateTest {
     @Resource