You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by ji...@apache.org on 2020/02/10 06:49:41 UTC

[rocketmq-spring] branch master updated: [ISSUE #218] Fix spring scopeTarget will repeat consumer instance (#210)

This is an automated email from the ASF dual-hosted git repository.

jinrongtong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-spring.git


The following commit(s) were added to refs/heads/master by this push:
     new 2853384  [ISSUE #218] Fix spring scopeTarget will repeat consumer instance (#210)
2853384 is described below

commit 2853384030397fe9453316c05b68b6d570d67019
Author: 爱因斯唐 <fo...@users.noreply.github.com>
AuthorDate: Mon Feb 10 14:49:33 2020 +0800

    [ISSUE #218] Fix spring scopeTarget will repeat consumer instance (#210)
    
    * change clientId algorithm
    
    * code format
    
    * develop
    
    * optimize on 2.0.5.EINSITANG
    
    * revert pom version
    
    * change note
    
    * change note
    
    * revert demo.rocketmq.myNameServer
    
    * remove clientInstanceName
    
    * remove unused method
    
    * pass ci-check
    
    * remove pass annotation
    
    * correct variable word
    
    * optimize annotation
    
    * merge
    
    Co-authored-by: von gosling <vo...@apache.org>
---
 .../ExtProducerResetConfiguration.java             | 13 +++---
 .../ListenerContainerConfiguration.java            | 21 +++++----
 .../RocketMQTransactionConfiguration.java          | 10 +++--
 .../rocketmq/spring/support/SpringBeanUtil.java    | 52 ++++++++++++++++++++++
 4 files changed, 79 insertions(+), 17 deletions(-)

diff --git a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/ExtProducerResetConfiguration.java b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/ExtProducerResetConfiguration.java
index e5e7433..9ea7699 100644
--- a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/ExtProducerResetConfiguration.java
+++ b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/ExtProducerResetConfiguration.java
@@ -25,6 +25,7 @@ import org.apache.rocketmq.spring.annotation.ExtRocketMQTemplateConfiguration;
 import org.apache.rocketmq.spring.core.RocketMQTemplate;
 import org.apache.rocketmq.spring.support.RocketMQMessageConverter;
 import org.apache.rocketmq.spring.support.RocketMQUtil;
+import org.apache.rocketmq.spring.support.SpringBeanUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.aop.framework.AopProxyUtils;
@@ -52,7 +53,7 @@ public class ExtProducerResetConfiguration implements ApplicationContextAware, S
     private RocketMQMessageConverter rocketMQMessageConverter;
 
     public ExtProducerResetConfiguration(RocketMQMessageConverter rocketMQMessageConverter,
-        StandardEnvironment environment, RocketMQProperties rocketMQProperties) {
+                                         StandardEnvironment environment, RocketMQProperties rocketMQProperties) {
         this.rocketMQMessageConverter = rocketMQMessageConverter;
         this.environment = environment;
         this.rocketMQProperties = rocketMQProperties;
@@ -60,12 +61,12 @@ public class ExtProducerResetConfiguration implements ApplicationContextAware, S
 
     @Override
     public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
-        this.applicationContext = (ConfigurableApplicationContext)applicationContext;
+        this.applicationContext = (ConfigurableApplicationContext) applicationContext;
     }
 
     @Override
     public void afterSingletonsInstantiated() {
-        Map<String, Object> beans = this.applicationContext.getBeansWithAnnotation(ExtRocketMQTemplateConfiguration.class);
+        Map<String, Object> beans = SpringBeanUtil.getBeansWithAnnotation(this.applicationContext, ExtRocketMQTemplateConfiguration.class);
 
         if (Objects.nonNull(beans)) {
             beans.forEach(this::registerTemplate);
@@ -80,7 +81,7 @@ public class ExtProducerResetConfiguration implements ApplicationContextAware, S
         }
 
         ExtRocketMQTemplateConfiguration annotation = clazz.getAnnotation(ExtRocketMQTemplateConfiguration.class);
-        GenericApplicationContext genericApplicationContext = (GenericApplicationContext)applicationContext;
+        GenericApplicationContext genericApplicationContext = (GenericApplicationContext) applicationContext;
         validate(annotation, genericApplicationContext);
 
         DefaultMQProducer mqProducer = createProducer(annotation);
@@ -92,7 +93,7 @@ public class ExtProducerResetConfiguration implements ApplicationContextAware, S
             throw new BeanDefinitionValidationException(String.format("Failed to startup MQProducer for RocketMQTemplate {}",
                 beanName), e);
         }
-        RocketMQTemplate rocketMQTemplate = (RocketMQTemplate)bean;
+        RocketMQTemplate rocketMQTemplate = (RocketMQTemplate) bean;
         rocketMQTemplate.setProducer(mqProducer);
         rocketMQTemplate.setMessageConverter(rocketMQMessageConverter.getMessageConverter());
         log.info("Set real producer to :{} {}", beanName, annotation.value());
@@ -130,7 +131,7 @@ public class ExtProducerResetConfiguration implements ApplicationContextAware, S
     }
 
     private void validate(ExtRocketMQTemplateConfiguration annotation,
-        GenericApplicationContext genericApplicationContext) {
+                          GenericApplicationContext genericApplicationContext) {
         if (genericApplicationContext.isBeanNameInUse(annotation.value())) {
             throw new BeanDefinitionValidationException(String.format("Bean {} has been used in Spring Application Context, " +
                     "please check the @ExtRocketMQTemplateConfiguration",
diff --git a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/ListenerContainerConfiguration.java b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/ListenerContainerConfiguration.java
index 699474d..008a4db 100644
--- a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/ListenerContainerConfiguration.java
+++ b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/ListenerContainerConfiguration.java
@@ -17,10 +17,6 @@
 
 package org.apache.rocketmq.spring.autoconfigure;
 
-import java.util.Collections;
-import java.util.Map;
-import java.util.Objects;
-import java.util.concurrent.atomic.AtomicLong;
 import org.apache.rocketmq.client.AccessChannel;
 import org.apache.rocketmq.spring.annotation.ConsumeMode;
 import org.apache.rocketmq.spring.annotation.MessageModel;
@@ -29,6 +25,7 @@ import org.apache.rocketmq.spring.core.RocketMQListener;
 import org.apache.rocketmq.spring.core.RocketMQReplyListener;
 import org.apache.rocketmq.spring.support.DefaultRocketMQListenerContainer;
 import org.apache.rocketmq.spring.support.RocketMQMessageConverter;
+import org.apache.rocketmq.spring.support.SpringBeanUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.aop.framework.AopProxyUtils;
@@ -43,6 +40,11 @@ import org.springframework.context.support.GenericApplicationContext;
 import org.springframework.core.env.StandardEnvironment;
 import org.springframework.util.StringUtils;
 
+import java.util.Collections;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.atomic.AtomicLong;
+
 @Configuration
 public class ListenerContainerConfiguration implements ApplicationContextAware, SmartInitializingSingleton {
     private final static Logger log = LoggerFactory.getLogger(ListenerContainerConfiguration.class);
@@ -58,7 +60,7 @@ public class ListenerContainerConfiguration implements ApplicationContextAware,
     private RocketMQMessageConverter rocketMQMessageConverter;
 
     public ListenerContainerConfiguration(RocketMQMessageConverter rocketMQMessageConverter,
-        StandardEnvironment environment, RocketMQProperties rocketMQProperties) {
+                                          StandardEnvironment environment, RocketMQProperties rocketMQProperties) {
         this.rocketMQMessageConverter = rocketMQMessageConverter;
         this.environment = environment;
         this.rocketMQProperties = rocketMQProperties;
@@ -71,7 +73,8 @@ public class ListenerContainerConfiguration implements ApplicationContextAware,
 
     @Override
     public void afterSingletonsInstantiated() {
-        Map<String, Object> beans = this.applicationContext.getBeansWithAnnotation(RocketMQMessageListener.class);
+        Map<String, Object> beans = SpringBeanUtil.getBeansWithAnnotation(this.applicationContext, RocketMQMessageListener.
+            class);
 
         if (Objects.nonNull(beans)) {
             beans.forEach(this::registerContainer);
@@ -127,7 +130,7 @@ public class ListenerContainerConfiguration implements ApplicationContextAware,
     }
 
     private DefaultRocketMQListenerContainer createRocketMQListenerContainer(String name, Object bean,
-        RocketMQMessageListener annotation) {
+                                                                             RocketMQMessageListener annotation) {
         DefaultRocketMQListenerContainer container = new DefaultRocketMQListenerContainer();
 
         container.setRocketMQMessageListener(annotation);
@@ -145,13 +148,15 @@ public class ListenerContainerConfiguration implements ApplicationContextAware,
             container.setSelectorExpression(tags);
         }
         container.setConsumerGroup(environment.resolvePlaceholders(annotation.consumerGroup()));
+
         if (RocketMQListener.class.isAssignableFrom(bean.getClass())) {
             container.setRocketMQListener((RocketMQListener) bean);
         } else if (RocketMQReplyListener.class.isAssignableFrom(bean.getClass())) {
             container.setRocketMQReplyListener((RocketMQReplyListener) bean);
         }
+      
         container.setMessageConverter(rocketMQMessageConverter.getMessageConverter());
-        container.setName(name);  // REVIEW ME, use the same clientId or multiple?
+        container.setName(name);
 
         return container;
     }
diff --git a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/RocketMQTransactionConfiguration.java b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/RocketMQTransactionConfiguration.java
index 1a897e5..2daefcf 100644
--- a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/RocketMQTransactionConfiguration.java
+++ b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/RocketMQTransactionConfiguration.java
@@ -22,11 +22,13 @@ import java.util.Objects;
 import java.util.concurrent.LinkedBlockingDeque;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
+
 import org.apache.rocketmq.client.producer.TransactionMQProducer;
 import org.apache.rocketmq.spring.annotation.RocketMQTransactionListener;
 import org.apache.rocketmq.spring.core.RocketMQLocalTransactionListener;
 import org.apache.rocketmq.spring.core.RocketMQTemplate;
 import org.apache.rocketmq.spring.support.RocketMQUtil;
+import org.apache.rocketmq.spring.support.SpringBeanUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.aop.framework.AopProxyUtils;
@@ -44,12 +46,14 @@ public class RocketMQTransactionConfiguration implements ApplicationContextAware
 
     private ConfigurableApplicationContext applicationContext;
 
-    @Override public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
+    @Override
+    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
         this.applicationContext = (ConfigurableApplicationContext) applicationContext;
     }
 
-    @Override public void afterSingletonsInstantiated() {
-        Map<String, Object> beans = this.applicationContext.getBeansWithAnnotation(RocketMQTransactionListener.class);
+    @Override
+    public void afterSingletonsInstantiated() {
+        Map<String, Object> beans = SpringBeanUtil.getBeansWithAnnotation(this.applicationContext, RocketMQTransactionListener.class);
 
         if (Objects.nonNull(beans)) {
             beans.forEach(this::registerTransactionListener);
diff --git a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/support/SpringBeanUtil.java b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/support/SpringBeanUtil.java
new file mode 100644
index 0000000..b5d1161
--- /dev/null
+++ b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/support/SpringBeanUtil.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.spring.support;
+
+import org.springframework.aop.scope.ScopedProxyUtils;
+import org.springframework.context.ConfigurableApplicationContext;
+import org.springframework.lang.NonNull;
+
+import java.lang.annotation.Annotation;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+public class SpringBeanUtil {
+
+    /**
+     * Wraps {@code applicationContext.getBeansWithAnnotation(clazz)} and copies the result into a new map,
+     * omitting every entry whose bean name {@code ScopedProxyUtils.isScopedTarget} reports as a scoped target.
+     * @param applicationContext Spring application context to query
+     * @param clazz              annotation class to look up
+     * @return a new {@code HashMap} of bean name to bean, without scoped-target entries
+     */
+    public static Map<String, Object> getBeansWithAnnotation(@NonNull ConfigurableApplicationContext applicationContext, Class<? extends Annotation> clazz) {
+        Map<String, Object> beans = applicationContext.getBeansWithAnnotation(clazz);
+        Map<String, Object> filterBeans = new HashMap<>(beans.size());
+        // copy every entry except those whose bean name is a scoped-proxy target name
+        Set<Map.Entry<String, Object>> entrySet = beans.entrySet();
+        entrySet.forEach((entry) -> {
+            final String beanName = entry.getKey();
+            if (!ScopedProxyUtils.isScopedTarget(beanName)) {
+                filterBeans.put(beanName, entry.getValue());
+            }
+        });
+        return filterBeans;
+    }
+
+}