You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@rocketmq.apache.org by GitBox <gi...@apache.org> on 2020/03/13 08:34:11 UTC

[GitHub] [rocketmq-spring] liuxuzxx commented on issue #173: How to dynamically configure consumeThreadMax in @RocketMQMessageListener

liuxuzxx commented on issue #173: How to dynamically configure consumeThreadMax in @RocketMQMessageListener
URL: https://github.com/apache/rocketmq-spring/issues/173#issuecomment-598608935
 
 
   其实也能动态修改,就是路子有点野,我们就是这么干的,使用如下:
   ```
   @Component
   public class RocketMQBeanPostProcessor implements BeanPostProcessor, ApplicationContextAware {
       private static final Logger logger = LoggerFactory.getLogger(RocketMQBeanPostProcessor.class);
       private ApplicationContext applicationContext;
   
       @Override
       public Object postProcessBeforeInitialization(Object bean, String beanName) throws BeansException {
           if (bean instanceof DefaultRocketMQListenerContainer) {
               //只抓DefaultRocketMQListenerContainer对象,别的bean一律不问
               DefaultRocketMQListenerContainer container = (DefaultRocketMQListenerContainer) bean;
               //利用反射修改container里面的consumeThreadMax字段的值,还好作者使用这个值的时候是在InitializingBean.afterPropertiesSet
               //如果时机在bean的出生阶段,那就不好整了.
               reflectModify(container);
           }
           return bean;
       }
   
       private DefaultRocketMQListenerContainer reflectModify(DefaultRocketMQListenerContainer container){
           //实现ApplicationContextAware接口,开始接管spring的bean工厂
           //要不然使用@Autowire的方式,注入不了。获取你的配置对象(甚至可以从数据库读取,但是数据库的我没有实验)
           MaxThreadConfig config = this.applicationContext.getBean(MaxThreadConfig.class);
           //使用反射过滤出来你要修改的consumerThreadMax字段
           Field consumeThreadMaxField = Arrays.stream(container.getClass().getDeclaredFields())
                   .filter(field -> field.getName().equals("consumeThreadMax"))
                   .findFirst()
                   .orElse(null);
           Optional.ofNullable(consumeThreadMaxField)
                   .ifPresent(field -> {
                       //别忘设置访问权限为true
                       field.setAccessible(true);
                       try {
                           //设置配置的数值
                           field.setInt(container,config.getMaxThreadCount());
                       } catch (IllegalAccessException e) {
                           logger.warn("设置失败了!");
                       }
                   });
           return container;
       }
   
       @Override
       public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
           this.applicationContext = applicationContext;
       }
   }
   ```
   
   下面是我的配置类信息:
   ```
   @Component
   public class MaxThreadConfig {
   
       @Value("${bird.max-thread-count}")
       private int maxThreadCount;
   
       public int getMaxThreadCount() {
           return maxThreadCount;
       }
   
       public void setMaxThreadCount(int maxThreadCount) {
           this.maxThreadCount = maxThreadCount;
       }
   }
   ```
   application.yml的配置信息:
   ```
   bird:
     max-thread-count: 17
   ```
   如果,用户有很多个RocketMQListener<T>的实现,也就是有多个消费者,也有方案:
   ```
   首先获取RocketMQMessageListener这个注解,然后根据注解里面的topic或者consumerGroup等信息来switch一下,按照自己的操作设置就行了
           RocketMQMessageListener listener = container.getRocketMQMessageListener();
           switch (listener.topic()){
               case "short-message-log":
                   Optional.ofNullable(consumeThreadMaxField)
                           .ifPresent(field -> {
                               field.setAccessible(true);
                               try {
                                   field.setInt(container,config.getMaxThreadCount());
                               } catch (IllegalAccessException e) {
                                   logger.warn("设置失败了!");
                               }
                           });
                   break;
               case "short-message":
                   Optional.ofNullable(consumeThreadMaxField)
                           .ifPresent(field -> {
                               field.setAccessible(true);
                               try {
                                   //这个地方我是偷懒随便加了个3做测试,使用的时候正常config.getXXXX就行
                                   field.setInt(container,config.getMaxThreadCount()+3);
                               } catch (IllegalAccessException e) {
                                   logger.warn("设置失败了!");
                               }
                           });
                   break;
           }
   ```
   
   最后总结一句话:spring的bean的生命周期很重要,尤其是spring5又增加了一些生命阶段。还有,只要有反射+字节码技术,基本上没有修改不了的java问题,就是路子有点野.
   
   不过最后还是希望作者能修改下这个consumerThreadMax可以使用SpEL的形式

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services