You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@rocketmq.apache.org by GitBox <gi...@apache.org> on 2020/02/06 09:39:53 UTC
[GitHub] [rocketmq] 2259289435 edited a comment on issue #566: Auto create
topic question
2259289435 edited a comment on issue #566: Auto create topic question
URL: https://github.com/apache/rocketmq/issues/566#issuecomment-582813212
clinet,broker version 4.6.0,if client,broker version not equals,get broker createtopickey first
producer:
SendMessageHook.sendMessageBefore(final SendMessageContext context)
1. getTopic
String newTopic = NamespaceUtil.withoutNamespace(context.getMessage().getTopic());
2.createTopicForCache
createTopicCache.get(newTopic, new Callable<Boolean>() {
@Override
public Boolean call() throws Exception {
Set<String> topicList = context.getProducer().getmQClientFactory().getMQClientAPIImpl().getTopicListFromNameServer(20000L).getTopicList();
if(!topicList.contains(newTopic)) {
log.info("producerGroup : {}, newTopic : {} begin autoCreate", context.getProducer().getDefaultMQProducer().getProducerGroup(), newTopic);
context.getProducer().getmQClientFactory().getMQAdminImpl().createTopic(MixAllPlus.AUTO_CREATE_TOPIC_KEY_TOPIC, newTopic, DEFAULT_AUTO_CREATE_TOPIC_QUEUE_SIZE);
log.info("producerGroup : {}, newTopic : {} end autoCreate", context.getProducer().getDefaultMQProducer().getProducerGroup(), newTopic);
}
return true;
}
});
pushconsumer.
1. beforeStart(pushconsumer consumer) //custom method
2. create instance
DefaultMQPushConsumerImpl impl = consumer.getDefaultMQPushConsumerImpl();
if (consumer.getMessageModel() == MessageModel.CLUSTERING) {
consumer.changeInstanceNameToPID();
}
RPCHook rpcHook = ReflectUtil.on(impl).field("rpcHook").get();
MQClientInstance instance = MQClientManager.getInstance().getOrCreateMQClientInstance(consumer, rpcHook);
instance.start();
3.getNewTopic
Set<String> newTopics = Sets.newHashSet();
Set<String> topicList = instance.getMQClientAPIImpl().getTopicListFromNameServer(20000L).getTopicList();
for(String topic : impl.getSubscriptionInner().keySet()) {
String newTopic = NamespaceUtil.withoutNamespace(topic);
if(!topicList.contains(newTopic)) {
newTopics.add(newTopic);
}
}
4.createTopic
for(final String newTopic : newTopics) {
createTopicCache.get(newTopic, new Callable<Boolean>() {
@Override
public Boolean call() throws Exception {
log.info("consumerGroup : {}, newTopic : {} begin autoCreate", consumer.getConsumerGroup(), newTopic);
instance.getMQAdminImpl().createTopic(instance.getDefaultMQProducer().getCreateTopicKey(), newTopic, DEFAULT_AUTO_CREATE_TOPIC_QUEUE_SIZE);
log.info("consumerGroup : {}, newTopic : {} end autoCreate", consumer.getConsumerGroup(), newTopic);
return true;
}
});
}
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
With regards,
Apache Git Services