Posted to dev@rocketmq.apache.org by GitBox <gi...@apache.org> on 2020/02/06 09:39:53 UTC

[GitHub] [rocketmq] 2259289435 edited a comment on issue #566: Auto create topic question

2259289435 edited a comment on issue #566: Auto create topic question
URL: https://github.com/apache/rocketmq/issues/566#issuecomment-582813212
 
 
   The client and broker are both version 4.6.0. If the client and broker versions differ, get the broker's createTopicKey first.
   producer:
   SendMessageHook.sendMessageBefore(final SendMessageContext context)
   1. getTopic
   String newTopic = NamespaceUtil.withoutNamespace(context.getMessage().getTopic());
   2. createTopicForCache
   // The callable runs only if newTopic is not already in createTopicCache,
   // so the name server is not queried on every send.
   createTopicCache.get(newTopic, new Callable<Boolean>() {
       @Override
       public Boolean call() throws Exception {
           // Fetch the known topics from the name server (timeout 20000 ms).
           Set<String> topicList = context.getProducer().getmQClientFactory().getMQClientAPIImpl().getTopicListFromNameServer(20000L).getTopicList();
           if (!topicList.contains(newTopic)) {
               log.info("producerGroup : {}, newTopic : {} begin autoCreate", context.getProducer().getDefaultMQProducer().getProducerGroup(), newTopic);
               context.getProducer().getmQClientFactory().getMQAdminImpl().createTopic(MixAllPlus.AUTO_CREATE_TOPIC_KEY_TOPIC, newTopic, DEFAULT_AUTO_CREATE_TOPIC_QUEUE_SIZE);
               log.info("producerGroup : {}, newTopic : {} end autoCreate", context.getProducer().getDefaultMQProducer().getProducerGroup(), newTopic);
           }
           return true;
       }
   });
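
The producer-side steps above follow a "check once per topic, create if missing" pattern. A minimal, self-contained sketch of that pattern is shown here. It is not the code from the comment: `TopicAutoCreator`, `topicLister`, and `topicCreator` are hypothetical names, the two functional parameters merely stand in for `getTopicListFromNameServer` and `MQAdminImpl.createTopic`, and a `ConcurrentHashMap` is swapped in for `createTopicCache`, whose actual type the comment does not show.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Consumer;
import java.util.function.Supplier;

/** Hypothetical helper: checks each topic once and creates it if it is missing. */
class TopicAutoCreator {
    // Topics that have already been checked; stands in for createTopicCache.
    private final ConcurrentMap<String, Boolean> checked = new ConcurrentHashMap<>();
    // Stand-in for the name server lookup (assumption, not a RocketMQ API).
    private final Supplier<Set<String>> topicLister;
    // Stand-in for the admin call that creates a topic (assumption, not a RocketMQ API).
    private final Consumer<String> topicCreator;

    TopicAutoCreator(Supplier<Set<String>> topicLister, Consumer<String> topicCreator) {
        this.topicLister = topicLister;
        this.topicCreator = topicCreator;
    }

    /** Runs the existence check only the first time a given topic is seen. */
    void ensureTopic(String topic) {
        checked.computeIfAbsent(topic, t -> {
            if (!topicLister.get().contains(t)) {
                topicCreator.accept(t);
            }
            // If the lookup or the creation throws, no entry is stored,
            // so the next call for this topic retries the check.
            return Boolean.TRUE;
        });
    }
}
```

In a hook like `sendMessageBefore`, such a helper would be called with the namespace-stripped topic before each send. Only the first call per topic would reach the name server.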
   
   pushconsumer:
   1. beforeStart(pushconsumer consumer)  // custom method
   2. create instance
   DefaultMQPushConsumerImpl impl = consumer.getDefaultMQPushConsumerImpl();
   if (consumer.getMessageModel() == MessageModel.CLUSTERING) {
       consumer.changeInstanceNameToPID();
   }
   RPCHook rpcHook = ReflectUtil.on(impl).field("rpcHook").get();
   MQClientInstance instance = MQClientManager.getInstance().getOrCreateMQClientInstance(consumer, rpcHook);
   instance.start();
   3. getNewTopic
   Set<String> newTopics = Sets.newHashSet();
   Set<String> topicList = instance.getMQClientAPIImpl().getTopicListFromNameServer(20000L).getTopicList();
   for (String topic : impl.getSubscriptionInner().keySet()) {
       String newTopic = NamespaceUtil.withoutNamespace(topic);
       if (!topicList.contains(newTopic)) {
           newTopics.add(newTopic);
       }
   }
   4. createTopic
   for (final String newTopic : newTopics) {
       createTopicCache.get(newTopic, new Callable<Boolean>() {
           @Override
           public Boolean call() throws Exception {
               log.info("consumerGroup : {}, newTopic : {} begin autoCreate", consumer.getConsumerGroup(), newTopic);
               instance.getMQAdminImpl().createTopic(instance.getDefaultMQProducer().getCreateTopicKey(), newTopic, DEFAULT_AUTO_CREATE_TOPIC_QUEUE_SIZE);
               log.info("consumerGroup : {}, newTopic : {} end autoCreate", consumer.getConsumerGroup(), newTopic);
               return true;
           }
       });
   }
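
Step 3 on the consumer side is essentially a set difference between the subscribed topics and the topics the name server already knows. The sketch below isolates that step so it can be tried without a broker. `MissingTopics`, `find`, and the `stripNamespace` parameter are hypothetical names. The parameter only stands in for `NamespaceUtil.withoutNamespace` and does not reproduce its behavior.

```java
import java.util.Collection;
import java.util.LinkedHashSet;
import java.util.Set;
import java.util.function.UnaryOperator;

/** Hypothetical helper mirroring step 3: collect subscribed topics that do not exist yet. */
class MissingTopics {
    static Set<String> find(Collection<String> subscribed,
                            Set<String> existing,
                            UnaryOperator<String> stripNamespace) {
        // LinkedHashSet removes duplicates and keeps subscription order.
        Set<String> missing = new LinkedHashSet<>();
        for (String topic : subscribed) {
            // Compare the plain topic name, as the name server list is assumed
            // to contain names without the client-side namespace.
            String plain = stripNamespace.apply(topic);
            if (!existing.contains(plain)) {
                missing.add(plain);
            }
        }
        return missing;
    }
}
```

The returned set would then be passed to the creation loop in step 4, one cache-guarded `createTopic` call per entry.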
   
   

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services