You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by zh...@apache.org on 2022/03/02 03:39:20 UTC
[rocketmq-connect] 10/39: Automatically create target topic. resolve #396
This is an automated email from the ASF dual-hosted git repository.
zhoubo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-connect.git
commit b4f8cd93fc6ee52a9034fa5b6f325f795d0b243f
Author: xujianhai666 <ze...@bytedance.com>
AuthorDate: Tue Sep 10 23:07:49 2019 +0800
Automatically create target topic. resolve #396
---
.../rocketmq/replicator/RmqSourceReplicator.java | 97 ++++++++++++++++++----
.../apache/rocketmq/replicator/common/Utils.java | 59 ++++++++++++-
.../rocketmq/replicator/config/ConfigDefine.java | 6 +-
3 files changed, 142 insertions(+), 20 deletions(-)
diff --git a/src/main/java/org/apache/rocketmq/replicator/RmqSourceReplicator.java b/src/main/java/org/apache/rocketmq/replicator/RmqSourceReplicator.java
index b49e06d..6ea3ae8 100644
--- a/src/main/java/org/apache/rocketmq/replicator/RmqSourceReplicator.java
+++ b/src/main/java/org/apache/rocketmq/replicator/RmqSourceReplicator.java
@@ -29,11 +29,14 @@ import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.MixAll;
+import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.protocol.body.TopicList;
+import org.apache.rocketmq.common.protocol.route.BrokerData;
import org.apache.rocketmq.common.protocol.route.QueueData;
import org.apache.rocketmq.common.protocol.route.TopicRouteData;
import org.apache.rocketmq.remoting.RPCHook;
+import org.apache.rocketmq.remoting.exception.RemotingException;
import org.apache.rocketmq.replicator.common.ConstDefine;
import org.apache.rocketmq.replicator.common.Utils;
import org.apache.rocketmq.replicator.config.ConfigDefine;
@@ -67,7 +70,8 @@ public class RmqSourceReplicator extends SourceConnector {
private int taskParallelism = 1;
- private DefaultMQAdminExt defaultMQAdminExt;
+ private DefaultMQAdminExt srcMQAdminExt;
+ private DefaultMQAdminExt targetMQAdminExt;
private volatile boolean adminStarted;
@@ -81,16 +85,26 @@ public class RmqSourceReplicator extends SourceConnector {
return;
}
RPCHook rpcHook = null;
- this.defaultMQAdminExt = new DefaultMQAdminExt(rpcHook);
- this.defaultMQAdminExt.setNamesrvAddr(this.replicatorConfig.getString(ConfigDefine.CONN_SOURCE_RMQ));
- this.defaultMQAdminExt.setAdminExtGroup(Utils.createGroupName(ConstDefine.REPLICATOR_ADMIN_PREFIX));
- this.defaultMQAdminExt.setInstanceName(Utils.createInstanceName(this.replicatorConfig.getString(ConfigDefine.CONN_SOURCE_RMQ)));
+ this.srcMQAdminExt = new DefaultMQAdminExt(rpcHook);
+ this.srcMQAdminExt.setNamesrvAddr(this.replicatorConfig.getString(ConfigDefine.CONN_SOURCE_RMQ));
+ this.srcMQAdminExt.setAdminExtGroup(Utils.createGroupName(ConstDefine.REPLICATOR_ADMIN_PREFIX));
+ this.srcMQAdminExt.setInstanceName(Utils.createInstanceName(this.replicatorConfig.getString(ConfigDefine.CONN_SOURCE_RMQ)));
+
+ this.targetMQAdminExt = new DefaultMQAdminExt(rpcHook);
+ this.targetMQAdminExt.setNamesrvAddr(this.replicatorConfig.getString(ConfigDefine.CONN_TARGET_RMQ));
+ this.targetMQAdminExt.setAdminExtGroup(Utils.createGroupName(ConstDefine.REPLICATOR_ADMIN_PREFIX));
+ this.targetMQAdminExt.setInstanceName(Utils.createInstanceName(this.replicatorConfig.getString(ConfigDefine.CONN_TARGET_RMQ)));
+
try {
- defaultMQAdminExt.start();
- log.info("RocketMQ defaultMQAdminExt started");
+ this.srcMQAdminExt.start();
+ log.info("RocketMQ srcMQAdminExt started");
+
+ this.targetMQAdminExt.start();
+ log.info("RocketMQ targetMQAdminExt started");
} catch (MQClientException e) {
- log.error("Replicator start failed for `defaultMQAdminExt` exception.", e);
+ log.error("Replicator start failed for `srcMQAdminExt` exception.", e);
}
+
adminStarted = true;
}
@@ -172,14 +186,16 @@ public class RmqSourceReplicator extends SourceConnector {
}
public void buildRoute() {
+ List<Pattern> patterns = new ArrayList<Pattern>();
+ String srcCluster = this.replicatorConfig.getString(ConfigDefine.CONN_SOURCE_CLUSTER);
try {
- List<Pattern> patterns = new ArrayList<Pattern>();
+ Set<String> targetTopicSet = fetchTargetTopics();
for (String topic : this.whiteList) {
Pattern pattern = Pattern.compile(topic);
patterns.add(pattern);
}
- TopicList topics = defaultMQAdminExt.fetchAllTopicList();
+ TopicList topics = srcMQAdminExt.fetchAllTopicList();
for (String topic : topics.getTopicList()) {
if ((syncRETRY && topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) ||
(syncDLQ && topic.startsWith(MixAll.DLQ_GROUP_TOPIC_PREFIX)) ||
@@ -188,14 +204,29 @@ public class RmqSourceReplicator extends SourceConnector {
for (Pattern pattern : patterns) {
Matcher matcher = pattern.matcher(topic);
if (matcher.matches()) {
- TopicRouteData topicRouteData = defaultMQAdminExt.examineTopicRouteInfo(topic);
+ if (!targetTopicSet.contains(topic)) {
+ ensureTargetTopic(topic, topic);
+ }
+
+ // different from BrokerData with cluster field, which can ensure the brokerData is from expected cluster.
+ // QueueData use brokerName as unique info on cluster of rocketmq. so when we want to get QueueData of
+ // expected cluster, we should get brokerNames of expected cluster, and then filter queueDatas.
+ List<BrokerData> brokerList = Utils.examineBrokerData(this.srcMQAdminExt, topic, srcCluster);
+ Set<String> brokerNameSet = new HashSet<String>();
+ for (BrokerData b : brokerList) {
+ brokerNameSet.add(b.getBrokerName());
+ }
+
+ TopicRouteData topicRouteData = srcMQAdminExt.examineTopicRouteInfo(topic);
if (!topicRouteMap.containsKey(topic)) {
topicRouteMap.put(topic, new ArrayList<MessageQueue>());
}
for (QueueData qd : topicRouteData.getQueueDatas()) {
- for (int i = 0; i < qd.getReadQueueNums(); i++) {
- MessageQueue mq = new MessageQueue(topic, qd.getBrokerName(), i);
- topicRouteMap.get(topic).add(mq);
+ if (brokerNameSet.contains(qd.getBrokerName())) {
+ for (int i = 0; i < qd.getReadQueueNums(); i++) {
+ MessageQueue mq = new MessageQueue(topic, qd.getBrokerName(), i);
+ topicRouteMap.get(topic).add(mq);
+ }
}
}
}
@@ -205,7 +236,7 @@ public class RmqSourceReplicator extends SourceConnector {
} catch (Exception e) {
log.error("Fetch topic list error.", e);
} finally {
- defaultMQAdminExt.shutdown();
+ srcMQAdminExt.shutdown();
}
}
@@ -216,5 +247,41 @@ public class RmqSourceReplicator extends SourceConnector {
public Map<String, List<MessageQueue>> getTopicRouteMap() {
return this.topicRouteMap;
}
+
+ public Set<String> fetchTargetTopics() throws RemotingException, MQClientException, InterruptedException {
+ String targetCluster = this.replicatorConfig.getString(ConfigDefine.CONN_TARGET_CLUSTER);
+ TopicList targetTopics = this.targetMQAdminExt.fetchTopicsByCLuster(targetCluster);
+ return targetTopics.getTopicList();
+ }
+
+ /**
+ * ensure target topic eixst. if target topic does not exist, ensureTopic will create target topic on target
+ * cluster, with same TopicConfig but using target topic name. any exception will be caught and then throw
+ * IllegalStateException.
+ *
+ * @param srcTopic
+ * @param targetTopic
+ * @throws RemotingException
+ * @throws MQClientException
+ * @throws InterruptedException
+ */
+ public void ensureTargetTopic(String srcTopic,
+ String targetTopic) throws RemotingException, MQClientException, InterruptedException {
+ String srcCluster = this.replicatorConfig.getString(ConfigDefine.CONN_SOURCE_CLUSTER);
+ String targetCluster = this.replicatorConfig.getString(ConfigDefine.CONN_TARGET_CLUSTER);
+
+ List<BrokerData> brokerList = Utils.examineBrokerData(this.srcMQAdminExt, srcTopic, srcCluster);
+ if (brokerList.size() == 0) {
+ throw new IllegalStateException(String.format("no broker found for srcTopic: %s srcCluster: %s", srcTopic, srcCluster));
+ }
+
+ String brokerAddr = brokerList.get(0).selectBrokerAddr();
+ TopicConfig topicConfig = this.srcMQAdminExt.examineTopicConfig(brokerAddr, srcTopic);
+ topicConfig.setTopicName(targetTopic);
+ Utils.createTopic(this.targetMQAdminExt, topicConfig, targetCluster);
+
+ throw new IllegalStateException("");
+ }
+
}
diff --git a/src/main/java/org/apache/rocketmq/replicator/common/Utils.java b/src/main/java/org/apache/rocketmq/replicator/common/Utils.java
index 6888038..e5c0866 100644
--- a/src/main/java/org/apache/rocketmq/replicator/common/Utils.java
+++ b/src/main/java/org/apache/rocketmq/replicator/common/Utils.java
@@ -19,8 +19,20 @@ package org.apache.rocketmq.replicator.common;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
+import java.util.Set;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.common.TopicConfig;
+import org.apache.rocketmq.common.protocol.route.BrokerData;
+import org.apache.rocketmq.common.protocol.route.TopicRouteData;
+import org.apache.rocketmq.remoting.exception.RemotingException;
+import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
+import org.apache.rocketmq.tools.command.CommandUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public class Utils {
+ private static final Logger log = LoggerFactory.getLogger(Utils.class);
public static String createGroupName(String prefix) {
return new StringBuilder().append(prefix).append("-").append(System.currentTimeMillis()).toString();
@@ -37,7 +49,7 @@ public class Utils {
public static String createInstanceName(String namesrvAddr) {
String[] namesrvArray = namesrvAddr.split(";");
List<String> namesrvList = new ArrayList<String>();
- for (String ns: namesrvArray) {
+ for (String ns : namesrvArray) {
if (!namesrvList.contains(ns)) {
namesrvList.add(ns);
}
@@ -45,4 +57,49 @@ public class Utils {
Collections.sort(namesrvList);
return String.valueOf(namesrvList.toString().hashCode());
}
+
+
+ public static List<BrokerData> examineBrokerData(DefaultMQAdminExt defaultMQAdminExt, String topic, String cluster) throws RemotingException, MQClientException, InterruptedException {
+ List<BrokerData> brokerList = new ArrayList<BrokerData>();
+
+ TopicRouteData topicRouteData = defaultMQAdminExt.examineTopicRouteInfo(topic);
+ if (topicRouteData.getBrokerDatas() != null) { // check下
+ for (BrokerData broker : topicRouteData.getBrokerDatas()) {
+ if (StringUtils.equals(broker.getCluster(), cluster)) {
+ brokerList.add(broker);
+ }
+ }
+ }
+ return brokerList;
+ }
+
+ public static void createTopic(DefaultMQAdminExt defaultMQAdminExt, TopicConfig topicConfig, String clusterName) {
+ try {
+ Set<String> masterSet =
+ CommandUtil.fetchMasterAddrByClusterName(defaultMQAdminExt, clusterName);
+ for (String addr : masterSet) {
+ defaultMQAdminExt.createAndUpdateTopicConfig(addr, topicConfig);
+ log.info("create topic to %s success.%n", addr);
+ }
+
+ if (topicConfig.isOrder()) {
+ Set<String> brokerNameSet =
+ CommandUtil.fetchBrokerNameByClusterName(defaultMQAdminExt, clusterName);
+ StringBuilder orderConf = new StringBuilder();
+ String splitor = "";
+ for (String s : brokerNameSet) {
+ orderConf.append(splitor).append(s).append(":")
+ .append(topicConfig.getWriteQueueNums());
+ splitor = ";";
+ }
+ defaultMQAdminExt.createOrUpdateOrderConf(topicConfig.getTopicName(),
+ orderConf.toString(), true);
+ log.info("set cluster orderConf=[%s]", orderConf);
+ }
+
+ return;
+ } catch (Exception e) {
+ throw new IllegalArgumentException("create topic: " + topicConfig + "failed", e);
+ }
+ }
}
diff --git a/src/main/java/org/apache/rocketmq/replicator/config/ConfigDefine.java b/src/main/java/org/apache/rocketmq/replicator/config/ConfigDefine.java
index 3934c2f..ddf4972 100644
--- a/src/main/java/org/apache/rocketmq/replicator/config/ConfigDefine.java
+++ b/src/main/java/org/apache/rocketmq/replicator/config/ConfigDefine.java
@@ -22,10 +22,12 @@ import java.util.Set;
public class ConfigDefine {
public static final String CONN_SOURCE_RMQ = "source-rocketmq";
+ public static final String CONN_SOURCE_CLUSTER = "source-cluster";
public static final String CONN_STORE_TOPIC = "replicator-store-topic";
public static final String CONN_TARGET_RMQ = "target-rocketmq";
+ public static final String CONN_TARGET_CLUSTER = "target-cluster";
public static final String CONN_SOURCE_GROUP = "source-group";
@@ -33,10 +35,6 @@ public class ConfigDefine {
public static final String CONN_TASK_DIVIDE_STRATEGY = "task-divide-strategy";
- public static final String CONN_BROKER_NAME = "broker-name";
-
- public static final String CONN_SOURCE_TOPIC = "source-topic";
-
public static final String CONN_WHITE_LIST = "white-list";
public static final String CONN_SOURCE_RECORD_CONVERTER = "source-record-converter";