You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by zh...@apache.org on 2022/03/02 03:39:20 UTC

[rocketmq-connect] 10/39: Automatically create target topic. resolve #396

This is an automated email from the ASF dual-hosted git repository.

zhoubo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-connect.git

commit b4f8cd93fc6ee52a9034fa5b6f325f795d0b243f
Author: xujianhai666 <ze...@bytedance.com>
AuthorDate: Tue Sep 10 23:07:49 2019 +0800

    Automatically create target topic. resolve #396
---
 .../rocketmq/replicator/RmqSourceReplicator.java   | 97 ++++++++++++++++++----
 .../apache/rocketmq/replicator/common/Utils.java   | 59 ++++++++++++-
 .../rocketmq/replicator/config/ConfigDefine.java   |  6 +-
 3 files changed, 142 insertions(+), 20 deletions(-)

diff --git a/src/main/java/org/apache/rocketmq/replicator/RmqSourceReplicator.java b/src/main/java/org/apache/rocketmq/replicator/RmqSourceReplicator.java
index b49e06d..6ea3ae8 100644
--- a/src/main/java/org/apache/rocketmq/replicator/RmqSourceReplicator.java
+++ b/src/main/java/org/apache/rocketmq/replicator/RmqSourceReplicator.java
@@ -29,11 +29,14 @@ import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 import org.apache.rocketmq.client.exception.MQClientException;
 import org.apache.rocketmq.common.MixAll;
+import org.apache.rocketmq.common.TopicConfig;
 import org.apache.rocketmq.common.message.MessageQueue;
 import org.apache.rocketmq.common.protocol.body.TopicList;
+import org.apache.rocketmq.common.protocol.route.BrokerData;
 import org.apache.rocketmq.common.protocol.route.QueueData;
 import org.apache.rocketmq.common.protocol.route.TopicRouteData;
 import org.apache.rocketmq.remoting.RPCHook;
+import org.apache.rocketmq.remoting.exception.RemotingException;
 import org.apache.rocketmq.replicator.common.ConstDefine;
 import org.apache.rocketmq.replicator.common.Utils;
 import org.apache.rocketmq.replicator.config.ConfigDefine;
@@ -67,7 +70,8 @@ public class RmqSourceReplicator extends SourceConnector {
 
     private int taskParallelism = 1;
 
-    private DefaultMQAdminExt defaultMQAdminExt;
+    private DefaultMQAdminExt srcMQAdminExt;
+    private DefaultMQAdminExt targetMQAdminExt;
 
     private volatile boolean adminStarted;
 
@@ -81,16 +85,26 @@ public class RmqSourceReplicator extends SourceConnector {
             return;
         }
         RPCHook rpcHook = null;
-        this.defaultMQAdminExt = new DefaultMQAdminExt(rpcHook);
-        this.defaultMQAdminExt.setNamesrvAddr(this.replicatorConfig.getString(ConfigDefine.CONN_SOURCE_RMQ));
-        this.defaultMQAdminExt.setAdminExtGroup(Utils.createGroupName(ConstDefine.REPLICATOR_ADMIN_PREFIX));
-        this.defaultMQAdminExt.setInstanceName(Utils.createInstanceName(this.replicatorConfig.getString(ConfigDefine.CONN_SOURCE_RMQ)));
+        this.srcMQAdminExt = new DefaultMQAdminExt(rpcHook);
+        this.srcMQAdminExt.setNamesrvAddr(this.replicatorConfig.getString(ConfigDefine.CONN_SOURCE_RMQ));
+        this.srcMQAdminExt.setAdminExtGroup(Utils.createGroupName(ConstDefine.REPLICATOR_ADMIN_PREFIX));
+        this.srcMQAdminExt.setInstanceName(Utils.createInstanceName(this.replicatorConfig.getString(ConfigDefine.CONN_SOURCE_RMQ)));
+
+        this.targetMQAdminExt = new DefaultMQAdminExt(rpcHook);
+        this.targetMQAdminExt.setNamesrvAddr(this.replicatorConfig.getString(ConfigDefine.CONN_TARGET_RMQ));
+        this.targetMQAdminExt.setAdminExtGroup(Utils.createGroupName(ConstDefine.REPLICATOR_ADMIN_PREFIX));
+        this.targetMQAdminExt.setInstanceName(Utils.createInstanceName(this.replicatorConfig.getString(ConfigDefine.CONN_TARGET_RMQ)));
+
         try {
-            defaultMQAdminExt.start();
-            log.info("RocketMQ defaultMQAdminExt started");
+            this.srcMQAdminExt.start();
+            log.info("RocketMQ srcMQAdminExt started");
+
+            this.targetMQAdminExt.start();
+            log.info("RocketMQ targetMQAdminExt started");
         } catch (MQClientException e) {
-            log.error("Replicator start failed for `defaultMQAdminExt` exception.", e);
+            log.error("Replicator start failed for `srcMQAdminExt` exception.", e);
         }
+
         adminStarted = true;
     }
 
@@ -172,14 +186,16 @@ public class RmqSourceReplicator extends SourceConnector {
     }
 
     public void buildRoute() {
+        List<Pattern> patterns = new ArrayList<Pattern>();
+        String srcCluster = this.replicatorConfig.getString(ConfigDefine.CONN_SOURCE_CLUSTER);
         try {
-            List<Pattern> patterns = new ArrayList<Pattern>();
+            Set<String> targetTopicSet = fetchTargetTopics();
             for (String topic : this.whiteList) {
                 Pattern pattern = Pattern.compile(topic);
                 patterns.add(pattern);
             }
 
-            TopicList topics = defaultMQAdminExt.fetchAllTopicList();
+            TopicList topics = srcMQAdminExt.fetchAllTopicList();
             for (String topic : topics.getTopicList()) {
                 if ((syncRETRY && topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) ||
                     (syncDLQ && topic.startsWith(MixAll.DLQ_GROUP_TOPIC_PREFIX)) ||
@@ -188,14 +204,29 @@ public class RmqSourceReplicator extends SourceConnector {
                     for (Pattern pattern : patterns) {
                         Matcher matcher = pattern.matcher(topic);
                         if (matcher.matches()) {
-                            TopicRouteData topicRouteData = defaultMQAdminExt.examineTopicRouteInfo(topic);
+                            if (!targetTopicSet.contains(topic)) {
+                                ensureTargetTopic(topic, topic);
+                            }
+
+                            // different from BrokerData with cluster field, which can ensure the brokerData is from expected cluster.
+                            // QueueData use brokerName as unique info on cluster of rocketmq. so when we want to get QueueData of
+                            // expected cluster, we should get brokerNames of expected cluster, and then filter queueDatas.
+                            List<BrokerData> brokerList = Utils.examineBrokerData(this.srcMQAdminExt, topic, srcCluster);
+                            Set<String> brokerNameSet = new HashSet<String>();
+                            for (BrokerData b : brokerList) {
+                                brokerNameSet.add(b.getBrokerName());
+                            }
+
+                            TopicRouteData topicRouteData = srcMQAdminExt.examineTopicRouteInfo(topic);
                             if (!topicRouteMap.containsKey(topic)) {
                                 topicRouteMap.put(topic, new ArrayList<MessageQueue>());
                             }
                             for (QueueData qd : topicRouteData.getQueueDatas()) {
-                                for (int i = 0; i < qd.getReadQueueNums(); i++) {
-                                    MessageQueue mq = new MessageQueue(topic, qd.getBrokerName(), i);
-                                    topicRouteMap.get(topic).add(mq);
+                                if (brokerNameSet.contains(qd.getBrokerName())) {
+                                    for (int i = 0; i < qd.getReadQueueNums(); i++) {
+                                        MessageQueue mq = new MessageQueue(topic, qd.getBrokerName(), i);
+                                        topicRouteMap.get(topic).add(mq);
+                                    }
                                 }
                             }
                         }
@@ -205,7 +236,7 @@ public class RmqSourceReplicator extends SourceConnector {
         } catch (Exception e) {
             log.error("Fetch topic list error.", e);
         } finally {
-            defaultMQAdminExt.shutdown();
+            srcMQAdminExt.shutdown();
         }
     }
 
@@ -216,5 +247,41 @@ public class RmqSourceReplicator extends SourceConnector {
     public Map<String, List<MessageQueue>> getTopicRouteMap() {
         return this.topicRouteMap;
     }
+
+    public Set<String> fetchTargetTopics() throws RemotingException, MQClientException, InterruptedException {
+        String targetCluster = this.replicatorConfig.getString(ConfigDefine.CONN_TARGET_CLUSTER);
+        TopicList targetTopics = this.targetMQAdminExt.fetchTopicsByCLuster(targetCluster);
+        return targetTopics.getTopicList();
+    }
+
+    /**
+     * ensure target topic eixst. if target topic does not exist, ensureTopic will create target topic on target
+     * cluster, with same TopicConfig but using target topic name. any exception will be caught and then throw
+     * IllegalStateException.
+     *
+     * @param srcTopic
+     * @param targetTopic
+     * @throws RemotingException
+     * @throws MQClientException
+     * @throws InterruptedException
+     */
+    public void ensureTargetTopic(String srcTopic,
+        String targetTopic) throws RemotingException, MQClientException, InterruptedException {
+        String srcCluster = this.replicatorConfig.getString(ConfigDefine.CONN_SOURCE_CLUSTER);
+        String targetCluster = this.replicatorConfig.getString(ConfigDefine.CONN_TARGET_CLUSTER);
+
+        List<BrokerData> brokerList = Utils.examineBrokerData(this.srcMQAdminExt, srcTopic, srcCluster);
+        if (brokerList.size() == 0) {
+            throw new IllegalStateException(String.format("no broker found for srcTopic: %s srcCluster: %s", srcTopic, srcCluster));
+        }
+
+        String brokerAddr = brokerList.get(0).selectBrokerAddr();
+        TopicConfig topicConfig = this.srcMQAdminExt.examineTopicConfig(brokerAddr, srcTopic);
+        topicConfig.setTopicName(targetTopic);
+        Utils.createTopic(this.targetMQAdminExt, topicConfig, targetCluster);
+
+        throw new IllegalStateException("");
+    }
+
 }
 
diff --git a/src/main/java/org/apache/rocketmq/replicator/common/Utils.java b/src/main/java/org/apache/rocketmq/replicator/common/Utils.java
index 6888038..e5c0866 100644
--- a/src/main/java/org/apache/rocketmq/replicator/common/Utils.java
+++ b/src/main/java/org/apache/rocketmq/replicator/common/Utils.java
@@ -19,8 +19,20 @@ package org.apache.rocketmq.replicator.common;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
+import java.util.Set;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.common.TopicConfig;
+import org.apache.rocketmq.common.protocol.route.BrokerData;
+import org.apache.rocketmq.common.protocol.route.TopicRouteData;
+import org.apache.rocketmq.remoting.exception.RemotingException;
+import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
+import org.apache.rocketmq.tools.command.CommandUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class Utils {
+    private static final Logger log = LoggerFactory.getLogger(Utils.class);
 
     public static String createGroupName(String prefix) {
         return new StringBuilder().append(prefix).append("-").append(System.currentTimeMillis()).toString();
@@ -37,7 +49,7 @@ public class Utils {
     public static String createInstanceName(String namesrvAddr) {
         String[] namesrvArray = namesrvAddr.split(";");
         List<String> namesrvList = new ArrayList<String>();
-        for (String ns: namesrvArray) {
+        for (String ns : namesrvArray) {
             if (!namesrvList.contains(ns)) {
                 namesrvList.add(ns);
             }
@@ -45,4 +57,49 @@ public class Utils {
         Collections.sort(namesrvList);
         return String.valueOf(namesrvList.toString().hashCode());
     }
+
+
+    public static List<BrokerData> examineBrokerData(DefaultMQAdminExt defaultMQAdminExt, String topic, String cluster) throws RemotingException, MQClientException, InterruptedException {
+        List<BrokerData> brokerList = new ArrayList<BrokerData>();
+
+        TopicRouteData topicRouteData = defaultMQAdminExt.examineTopicRouteInfo(topic);
+        if (topicRouteData.getBrokerDatas() != null) { // check下
+            for (BrokerData broker : topicRouteData.getBrokerDatas()) {
+                if (StringUtils.equals(broker.getCluster(), cluster)) {
+                    brokerList.add(broker);
+                }
+            }
+        }
+        return brokerList;
+    }
+
+    public static void createTopic(DefaultMQAdminExt defaultMQAdminExt, TopicConfig topicConfig, String clusterName) {
+        try {
+            Set<String> masterSet =
+                CommandUtil.fetchMasterAddrByClusterName(defaultMQAdminExt, clusterName);
+            for (String addr : masterSet) {
+                defaultMQAdminExt.createAndUpdateTopicConfig(addr, topicConfig);
+                log.info("create topic to %s success.%n", addr);
+            }
+
+            if (topicConfig.isOrder()) {
+                Set<String> brokerNameSet =
+                    CommandUtil.fetchBrokerNameByClusterName(defaultMQAdminExt, clusterName);
+                StringBuilder orderConf = new StringBuilder();
+                String splitor = "";
+                for (String s : brokerNameSet) {
+                    orderConf.append(splitor).append(s).append(":")
+                        .append(topicConfig.getWriteQueueNums());
+                    splitor = ";";
+                }
+                defaultMQAdminExt.createOrUpdateOrderConf(topicConfig.getTopicName(),
+                    orderConf.toString(), true);
+                log.info("set cluster orderConf=[%s]", orderConf);
+            }
+
+            return;
+        } catch (Exception e) {
+            throw new IllegalArgumentException("create topic: " + topicConfig + "failed", e);
+        }
+    }
 }
diff --git a/src/main/java/org/apache/rocketmq/replicator/config/ConfigDefine.java b/src/main/java/org/apache/rocketmq/replicator/config/ConfigDefine.java
index 3934c2f..ddf4972 100644
--- a/src/main/java/org/apache/rocketmq/replicator/config/ConfigDefine.java
+++ b/src/main/java/org/apache/rocketmq/replicator/config/ConfigDefine.java
@@ -22,10 +22,12 @@ import java.util.Set;
 public class ConfigDefine {
 
     public static final String CONN_SOURCE_RMQ = "source-rocketmq";
+    public static final String CONN_SOURCE_CLUSTER = "source-cluster";
 
     public static final String CONN_STORE_TOPIC = "replicator-store-topic";
 
     public static final String CONN_TARGET_RMQ = "target-rocketmq";
+    public static final String CONN_TARGET_CLUSTER = "target-cluster";
 
     public static final String CONN_SOURCE_GROUP = "source-group";
 
@@ -33,10 +35,6 @@ public class ConfigDefine {
 
     public static final String CONN_TASK_DIVIDE_STRATEGY = "task-divide-strategy";
 
-    public static final String CONN_BROKER_NAME = "broker-name";
-
-    public static final String CONN_SOURCE_TOPIC = "source-topic";
-
     public static final String CONN_WHITE_LIST = "white-list";
 
     public static final String CONN_SOURCE_RECORD_CONVERTER = "source-record-converter";