You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by ji...@apache.org on 2020/02/10 06:49:41 UTC
[rocketmq-spring] branch master updated: [ISSUE #218] Fix spring
scopeTarget will repeat consumer instance (#210)
This is an automated email from the ASF dual-hosted git repository.
jinrongtong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-spring.git
The following commit(s) were added to refs/heads/master by this push:
new 2853384 [ISSUE #218] Fix spring scopeTarget will repeat consumer instance (#210)
2853384 is described below
commit 2853384030397fe9453316c05b68b6d570d67019
Author: 爱因斯唐 <fo...@users.noreply.github.com>
AuthorDate: Mon Feb 10 14:49:33 2020 +0800
[ISSUE #218] Fix spring scopeTarget will repeat consumer instance (#210)
* change clientId algorithm
* code format
* develop
* optimize on 2.0.5.EINSITANG
* revert pom version
* change note
* change note
* revert demo.rocketmq.myNameServer
* remove clientInstaceName
* remove unuse method
* pass ci-check
* remove pass annotation
* correct variable word
* optimize annotation
* merge
Co-authored-by: von gosling <vo...@apache.org>
---
.../ExtProducerResetConfiguration.java | 13 +++---
.../ListenerContainerConfiguration.java | 21 +++++----
.../RocketMQTransactionConfiguration.java | 10 +++--
.../rocketmq/spring/support/SpringBeanUtil.java | 52 ++++++++++++++++++++++
4 files changed, 79 insertions(+), 17 deletions(-)
diff --git a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/ExtProducerResetConfiguration.java b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/ExtProducerResetConfiguration.java
index e5e7433..9ea7699 100644
--- a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/ExtProducerResetConfiguration.java
+++ b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/ExtProducerResetConfiguration.java
@@ -25,6 +25,7 @@ import org.apache.rocketmq.spring.annotation.ExtRocketMQTemplateConfiguration;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.apache.rocketmq.spring.support.RocketMQMessageConverter;
import org.apache.rocketmq.spring.support.RocketMQUtil;
+import org.apache.rocketmq.spring.support.SpringBeanUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.aop.framework.AopProxyUtils;
@@ -52,7 +53,7 @@ public class ExtProducerResetConfiguration implements ApplicationContextAware, S
private RocketMQMessageConverter rocketMQMessageConverter;
public ExtProducerResetConfiguration(RocketMQMessageConverter rocketMQMessageConverter,
- StandardEnvironment environment, RocketMQProperties rocketMQProperties) {
+ StandardEnvironment environment, RocketMQProperties rocketMQProperties) {
this.rocketMQMessageConverter = rocketMQMessageConverter;
this.environment = environment;
this.rocketMQProperties = rocketMQProperties;
@@ -60,12 +61,12 @@ public class ExtProducerResetConfiguration implements ApplicationContextAware, S
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
- this.applicationContext = (ConfigurableApplicationContext)applicationContext;
+ this.applicationContext = (ConfigurableApplicationContext) applicationContext;
}
@Override
public void afterSingletonsInstantiated() {
- Map<String, Object> beans = this.applicationContext.getBeansWithAnnotation(ExtRocketMQTemplateConfiguration.class);
+ Map<String, Object> beans = SpringBeanUtil.getBeansWithAnnotation(this.applicationContext, ExtRocketMQTemplateConfiguration.class);
if (Objects.nonNull(beans)) {
beans.forEach(this::registerTemplate);
@@ -80,7 +81,7 @@ public class ExtProducerResetConfiguration implements ApplicationContextAware, S
}
ExtRocketMQTemplateConfiguration annotation = clazz.getAnnotation(ExtRocketMQTemplateConfiguration.class);
- GenericApplicationContext genericApplicationContext = (GenericApplicationContext)applicationContext;
+ GenericApplicationContext genericApplicationContext = (GenericApplicationContext) applicationContext;
validate(annotation, genericApplicationContext);
DefaultMQProducer mqProducer = createProducer(annotation);
@@ -92,7 +93,7 @@ public class ExtProducerResetConfiguration implements ApplicationContextAware, S
throw new BeanDefinitionValidationException(String.format("Failed to startup MQProducer for RocketMQTemplate {}",
beanName), e);
}
- RocketMQTemplate rocketMQTemplate = (RocketMQTemplate)bean;
+ RocketMQTemplate rocketMQTemplate = (RocketMQTemplate) bean;
rocketMQTemplate.setProducer(mqProducer);
rocketMQTemplate.setMessageConverter(rocketMQMessageConverter.getMessageConverter());
log.info("Set real producer to :{} {}", beanName, annotation.value());
@@ -130,7 +131,7 @@ public class ExtProducerResetConfiguration implements ApplicationContextAware, S
}
private void validate(ExtRocketMQTemplateConfiguration annotation,
- GenericApplicationContext genericApplicationContext) {
+ GenericApplicationContext genericApplicationContext) {
if (genericApplicationContext.isBeanNameInUse(annotation.value())) {
throw new BeanDefinitionValidationException(String.format("Bean {} has been used in Spring Application Context, " +
"please check the @ExtRocketMQTemplateConfiguration",
diff --git a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/ListenerContainerConfiguration.java b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/ListenerContainerConfiguration.java
index 699474d..008a4db 100644
--- a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/ListenerContainerConfiguration.java
+++ b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/ListenerContainerConfiguration.java
@@ -17,10 +17,6 @@
package org.apache.rocketmq.spring.autoconfigure;
-import java.util.Collections;
-import java.util.Map;
-import java.util.Objects;
-import java.util.concurrent.atomic.AtomicLong;
import org.apache.rocketmq.client.AccessChannel;
import org.apache.rocketmq.spring.annotation.ConsumeMode;
import org.apache.rocketmq.spring.annotation.MessageModel;
@@ -29,6 +25,7 @@ import org.apache.rocketmq.spring.core.RocketMQListener;
import org.apache.rocketmq.spring.core.RocketMQReplyListener;
import org.apache.rocketmq.spring.support.DefaultRocketMQListenerContainer;
import org.apache.rocketmq.spring.support.RocketMQMessageConverter;
+import org.apache.rocketmq.spring.support.SpringBeanUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.aop.framework.AopProxyUtils;
@@ -43,6 +40,11 @@ import org.springframework.context.support.GenericApplicationContext;
import org.springframework.core.env.StandardEnvironment;
import org.springframework.util.StringUtils;
+import java.util.Collections;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.atomic.AtomicLong;
+
@Configuration
public class ListenerContainerConfiguration implements ApplicationContextAware, SmartInitializingSingleton {
private final static Logger log = LoggerFactory.getLogger(ListenerContainerConfiguration.class);
@@ -58,7 +60,7 @@ public class ListenerContainerConfiguration implements ApplicationContextAware,
private RocketMQMessageConverter rocketMQMessageConverter;
public ListenerContainerConfiguration(RocketMQMessageConverter rocketMQMessageConverter,
- StandardEnvironment environment, RocketMQProperties rocketMQProperties) {
+ StandardEnvironment environment, RocketMQProperties rocketMQProperties) {
this.rocketMQMessageConverter = rocketMQMessageConverter;
this.environment = environment;
this.rocketMQProperties = rocketMQProperties;
@@ -71,7 +73,8 @@ public class ListenerContainerConfiguration implements ApplicationContextAware,
@Override
public void afterSingletonsInstantiated() {
- Map<String, Object> beans = this.applicationContext.getBeansWithAnnotation(RocketMQMessageListener.class);
+ Map<String, Object> beans = SpringBeanUtil.getBeansWithAnnotation(this.applicationContext, RocketMQMessageListener.
+ class);
if (Objects.nonNull(beans)) {
beans.forEach(this::registerContainer);
@@ -127,7 +130,7 @@ public class ListenerContainerConfiguration implements ApplicationContextAware,
}
private DefaultRocketMQListenerContainer createRocketMQListenerContainer(String name, Object bean,
- RocketMQMessageListener annotation) {
+ RocketMQMessageListener annotation) {
DefaultRocketMQListenerContainer container = new DefaultRocketMQListenerContainer();
container.setRocketMQMessageListener(annotation);
@@ -145,13 +148,15 @@ public class ListenerContainerConfiguration implements ApplicationContextAware,
container.setSelectorExpression(tags);
}
container.setConsumerGroup(environment.resolvePlaceholders(annotation.consumerGroup()));
+
if (RocketMQListener.class.isAssignableFrom(bean.getClass())) {
container.setRocketMQListener((RocketMQListener) bean);
} else if (RocketMQReplyListener.class.isAssignableFrom(bean.getClass())) {
container.setRocketMQReplyListener((RocketMQReplyListener) bean);
}
+
container.setMessageConverter(rocketMQMessageConverter.getMessageConverter());
- container.setName(name); // REVIEW ME, use the same clientId or multiple?
+ container.setName(name);
return container;
}
diff --git a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/RocketMQTransactionConfiguration.java b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/RocketMQTransactionConfiguration.java
index 1a897e5..2daefcf 100644
--- a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/RocketMQTransactionConfiguration.java
+++ b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/autoconfigure/RocketMQTransactionConfiguration.java
@@ -22,11 +22,13 @@ import java.util.Objects;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
+
import org.apache.rocketmq.client.producer.TransactionMQProducer;
import org.apache.rocketmq.spring.annotation.RocketMQTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.apache.rocketmq.spring.support.RocketMQUtil;
+import org.apache.rocketmq.spring.support.SpringBeanUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.aop.framework.AopProxyUtils;
@@ -44,12 +46,14 @@ public class RocketMQTransactionConfiguration implements ApplicationContextAware
private ConfigurableApplicationContext applicationContext;
- @Override public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
+ @Override
+ public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
this.applicationContext = (ConfigurableApplicationContext) applicationContext;
}
- @Override public void afterSingletonsInstantiated() {
- Map<String, Object> beans = this.applicationContext.getBeansWithAnnotation(RocketMQTransactionListener.class);
+ @Override
+ public void afterSingletonsInstantiated() {
+ Map<String, Object> beans = SpringBeanUtil.getBeansWithAnnotation(this.applicationContext, RocketMQTransactionListener.class);
if (Objects.nonNull(beans)) {
beans.forEach(this::registerTransactionListener);
diff --git a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/support/SpringBeanUtil.java b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/support/SpringBeanUtil.java
new file mode 100644
index 0000000..b5d1161
--- /dev/null
+++ b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/support/SpringBeanUtil.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.spring.support;
+
+import org.springframework.aop.scope.ScopedProxyUtils;
+import org.springframework.context.ConfigurableApplicationContext;
+import org.springframework.lang.NonNull;
+
+import java.lang.annotation.Annotation;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+public class SpringBeanUtil {
+
+ /**
+ * Override applicationContext.getBeansWithAnnotation method to make sure without same ProxyTarget beans
+ *
+ * @param applicationContext spring Application Context
+ * @param clazz annotation class
+ * @return beans map without proxyTarget bean
+ */
+ public static Map<String, Object> getBeansWithAnnotation(@NonNull ConfigurableApplicationContext applicationContext, Class<? extends Annotation> clazz) {
+ Map<String, Object> beans = applicationContext.getBeansWithAnnotation(clazz);
+ Map<String, Object> filterBeans = new HashMap<>(beans.size());
+ // remove proxy target
+ Set<Map.Entry<String, Object>> entrySet = beans.entrySet();
+ entrySet.forEach((entry) -> {
+ final String beanName = entry.getKey();
+ if (!ScopedProxyUtils.isScopedTarget(beanName)) {
+ filterBeans.put(beanName, entry.getValue());
+ }
+ });
+ return filterBeans;
+ }
+
+}