You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by zh...@apache.org on 2022/03/02 03:39:17 UTC

[rocketmq-connect] 07/39: [ISSUE 368]Polish rocketmq replicator implementation (#366)

This is an automated email from the ASF dual-hosted git repository.

zhoubo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-connect.git

commit 40b9fc8d9c4e6b04f06626948c1197be33c60e5b
Author: Heng Du <du...@apache.org>
AuthorDate: Tue Aug 13 14:26:40 2019 +0800

    [ISSUE 368]Polish rocketmq replicator implementation (#366)
    
    * Polish rocketmq replicator implementation
    
    * Add documation of rocketmq-replicator
---
 README.md                                          |  18 ++-
 package.xml                                        |  41 +++++++
 pom.xml                                            |  30 ++++-
 .../rocketmq/replicator/RmqSourceReplicator.java   | 133 +++++++++++----------
 .../apache/rocketmq/replicator/RmqSourceTask.java  |   5 +-
 .../apache/rocketmq/replicator/common/Utils.java   |   6 +-
 6 files changed, 167 insertions(+), 66 deletions(-)

diff --git a/README.md b/README.md
index 77bcb94..b2c98d1 100644
--- a/README.md
+++ b/README.md
@@ -1,6 +1,22 @@
 # rocketmq-replicator
 
-# boot-parameter
+## rocketmq-replicator打包
+````
+mvn clean install -Prelease-all -DskipTest -U 
+````
+## rocketmq-replicator启动
+````
+http://${runtime-ip}:${runtime-port}/connectors/${rocketmq-replicator-name}
+?config={"connector-class":"org.apache.rocketmq.connector.RmqSourceConnector","source-rocketmq":"xxxx:9876","target-rocketmq":"xxxxxxx:9876","replicator-store-topic":"replicatorTopic","taskDivideStrategy":"0","white-list”:"TopicTest,TopicTest2","task-parallelism":"2","source-record-converter":"org.apache.rocketmq.connect.runtime.converter.JsonConverter"}
+````
+
+
+## rocketmq-replicator停止
+````
+http://${runtime-ip}:${runtime-port}/connectors/${rocketmq-replicator-name}/stop
+````
+
+## rocketmq-replicator参数说明
 
 parameter | type | must | description | sample value
 ---|---|---|---|---|
diff --git a/package.xml b/package.xml
new file mode 100644
index 0000000..5dc18b9
--- /dev/null
+++ b/package.xml
@@ -0,0 +1,41 @@
+<assembly xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.2"
+          xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+          xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.2
+          http://maven.apache.org/xsd/assembly-1.1.2.xsd">
+    <!-- Assembles a packaged version targeting OS installation. -->
+    <id>package</id>
+    <formats>
+        <format>dir</format>
+    </formats>
+    <includeBaseDirectory>false</includeBaseDirectory>
+    <fileSets>
+        <fileSet>
+            <directory>${project.basedir}</directory>
+            <includes>
+                <include>README*</include>
+                <include>LICENSE*</include>
+                <include>NOTICE*</include>
+                <include>licenses/</include>
+                <include>notices/</include>
+            </includes>
+        </fileSet>
+        <fileSet>
+            <directory>${project.basedir}/conf</directory>
+            <outputDirectory>conf/</outputDirectory>
+            <includes>
+                <include>*</include>
+            </includes>
+        </fileSet>
+    </fileSets>
+    <dependencySets>
+        <dependencySet>
+            <outputDirectory>lib/</outputDirectory>
+            <useProjectArtifact>true</useProjectArtifact>
+            <useTransitiveFiltering>true</useTransitiveFiltering>
+            <excludes>
+                <exclude>io.openmessaging:openmessaging-connector</exclude>
+                <exclude>com.alibaba:fastjson</exclude>
+            </excludes>
+        </dependencySet>
+    </dependencySets>
+</assembly>
diff --git a/pom.xml b/pom.xml
index d283ca9..963222e 100644
--- a/pom.xml
+++ b/pom.xml
@@ -74,8 +74,36 @@
         <dependency>
             <groupId>com.alibaba</groupId>
             <artifactId>fastjson</artifactId>
-            <version>1.2.51</version>
+            <version>1.2.58</version>
         </dependency>
     </dependencies>
 
+    <profiles>
+        <profile>
+            <id>release-all</id>
+            <build>
+                <plugins>
+                    <plugin>
+                        <artifactId>maven-assembly-plugin</artifactId>
+                        <configuration>
+                            <descriptors>
+                                <descriptor>package.xml</descriptor>
+                            </descriptors>
+                        </configuration>
+                        <executions>
+                            <execution>
+                                <id>make-assembly</id>
+                                <phase>package</phase>
+                                <goals>
+                                    <goal>single</goal>
+                                </goals>
+                            </execution>
+                        </executions>
+                    </plugin>
+                </plugins>
+                <finalName>rocketmq-replicator-${project.version}</finalName>
+            </build>
+        </profile>
+    </profiles>
+
 </project>
\ No newline at end of file
diff --git a/src/main/java/org/apache/rocketmq/replicator/RmqSourceReplicator.java b/src/main/java/org/apache/rocketmq/replicator/RmqSourceReplicator.java
index c1f350f..12473ab 100644
--- a/src/main/java/org/apache/rocketmq/replicator/RmqSourceReplicator.java
+++ b/src/main/java/org/apache/rocketmq/replicator/RmqSourceReplicator.java
@@ -19,11 +19,18 @@ package org.apache.rocketmq.replicator;
 import io.openmessaging.KeyValue;
 import io.openmessaging.connector.api.Task;
 import io.openmessaging.connector.api.source.SourceConnector;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
 import org.apache.rocketmq.client.exception.MQClientException;
 import org.apache.rocketmq.common.MixAll;
 import org.apache.rocketmq.common.message.MessageQueue;
 import org.apache.rocketmq.common.protocol.route.QueueData;
 import org.apache.rocketmq.common.protocol.route.TopicRouteData;
+import org.apache.rocketmq.remoting.RPCHook;
 import org.apache.rocketmq.replicator.common.ConstDefine;
 import org.apache.rocketmq.replicator.common.Utils;
 import org.apache.rocketmq.replicator.config.ConfigDefine;
@@ -33,13 +40,10 @@ import org.apache.rocketmq.replicator.strategy.DivideStrategyEnum;
 import org.apache.rocketmq.replicator.strategy.DivideTaskByQueue;
 import org.apache.rocketmq.replicator.strategy.DivideTaskByTopic;
 import org.apache.rocketmq.replicator.strategy.TaskDivideStrategy;
-import org.apache.rocketmq.remoting.RPCHook;
 import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.*;
-
 public class RmqSourceReplicator extends SourceConnector {
 
     private static final Logger log = LoggerFactory.getLogger(RmqSourceReplicator.class);
@@ -60,33 +64,56 @@ public class RmqSourceReplicator extends SourceConnector {
 
     private int taskParallelism = 1;
 
-    public RmqSourceReplicator() {
+    private DefaultMQAdminExt defaultMQAdminExt;
+
+    private volatile boolean adminStarted;
 
+    public RmqSourceReplicator() {
         topicRouteMap = new HashMap<String, List<MessageQueue>>();
         whiteList = new HashSet<String>();
     }
 
+    private synchronized void startMQAdminTools() {
+        if (!configValid || adminStarted) {
+            return;
+        }
+        RPCHook rpcHook = null;
+        this.defaultMQAdminExt = new DefaultMQAdminExt(rpcHook);
+        this.defaultMQAdminExt.setNamesrvAddr(this.replicatorConfig.getString(ConfigDefine.CONN_SOURCE_RMQ));
+        this.defaultMQAdminExt.setAdminExtGroup(Utils.createGroupName(ConstDefine.REPLICATOR_ADMIN_PREFIX));
+        this.defaultMQAdminExt.setInstanceName(Utils.createInstanceName(this.replicatorConfig.getString(ConfigDefine.CONN_SOURCE_RMQ)));
+        try {
+            defaultMQAdminExt.start();
+            log.info("RocketMQ defaultMQAdminExt started");
+        } catch (MQClientException e) {
+            log.error("Replicator start failed for `defaultMQAdminExt` exception.", e);
+        }
+        adminStarted = true;
+    }
+
+    @Override
     public String verifyAndSetConfig(KeyValue config) {
 
-        // check the need key.
-        for(String requestKey : ConfigDefine.REQUEST_CONFIG){
-            if(!config.containsKey(requestKey)){
+        // Check the need key.
+        for (String requestKey : ConfigDefine.REQUEST_CONFIG) {
+            if (!config.containsKey(requestKey)) {
                 return "Request config key: " + requestKey;
             }
         }
 
-        // check the whitelist, whitelist is required.
+        // Check the whitelist, whitelist is required.
         String whileListStr = config.getString(ConfigDefine.CONN_WHITE_LIST);
         String[] wl = whileListStr.trim().split(",");
-        if (wl.length <= 0) return "White list must be not empty.";
+        if (wl.length <= 0)
+            return "White list must be not empty.";
         else {
-            for (String t: wl) {
+            for (String t : wl) {
                 this.whiteList.add(t.trim());
             }
         }
 
         if (config.containsKey(ConfigDefine.CONN_TASK_DIVIDE_STRATEGY) &&
-                config.getInt(ConfigDefine.CONN_TASK_DIVIDE_STRATEGY) == DivideStrategyEnum.BY_QUEUE.ordinal()) {
+            config.getInt(ConfigDefine.CONN_TASK_DIVIDE_STRATEGY) == DivideStrategyEnum.BY_QUEUE.ordinal()) {
             this.taskDivideStrategy = new DivideTaskByQueue();
         } else {
             this.taskDivideStrategy = new DivideTaskByTopic();
@@ -101,7 +128,9 @@ public class RmqSourceReplicator extends SourceConnector {
         return "";
     }
 
+    @Override
     public void start() {
+        startMQAdminTools();
     }
 
     public void stop() {
@@ -116,65 +145,49 @@ public class RmqSourceReplicator extends SourceConnector {
     }
 
     public Class<? extends Task> taskClass() {
-      
+
         return RmqSourceTask.class;
     }
 
     public List<KeyValue> taskConfigs() {
-      
-        if (configValid) {
-
-            boolean adminStarted = false;
-            RPCHook rpcHook = null;
-            DefaultMQAdminExt defaultMQAdminExt = new DefaultMQAdminExt(rpcHook);
-            defaultMQAdminExt.setNamesrvAddr(this.replicatorConfig.getString(ConfigDefine.CONN_SOURCE_RMQ));
-            defaultMQAdminExt.setAdminExtGroup(Utils.createGroupName(ConstDefine.REPLICATOR_ADMIN_PREFIX));
-            defaultMQAdminExt.setInstanceName(Utils.createInstanceName(this.replicatorConfig.getString(ConfigDefine.CONN_SOURCE_RMQ)));
-
-            try {
-                defaultMQAdminExt.start();
-                adminStarted = true;
-            } catch (MQClientException e) {
-                log.error("Replicator start failed for `defaultMQAdminExt` exception.", e);
-            }
+        if (!configValid) {
+            return new ArrayList<KeyValue>();
+        }
+
+        startMQAdminTools();
+
+        try {
+            for (String topic : this.whiteList) {
+                if ((syncRETRY && topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) ||
+                    (syncDLQ && topic.startsWith(MixAll.DLQ_GROUP_TOPIC_PREFIX)) ||
+                    !topic.equals(ConfigDefine.CONN_STORE_TOPIC)) {
 
-            if (adminStarted) {
-                try {
-                    for (String topic : this.whiteList) {
-                        if ((syncRETRY && topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) ||
-                                (syncDLQ && topic.startsWith(MixAll.DLQ_GROUP_TOPIC_PREFIX)) ||
-                                !topic.equals(ConfigDefine.CONN_STORE_TOPIC)) {
-
-                            TopicRouteData topicRouteData = defaultMQAdminExt.examineTopicRouteInfo(topic);
-                            if (!topicRouteMap.containsKey(topic)) {
-                                topicRouteMap.put(topic, new ArrayList<MessageQueue>());
-                            }
-                            for (QueueData qd : topicRouteData.getQueueDatas()) {
-                                for (int i = 0; i < qd.getReadQueueNums(); i++) {
-                                    MessageQueue mq = new MessageQueue(topic, qd.getBrokerName(), i);
-                                    topicRouteMap.get(topic).add(mq);
-                                }
-                            }
+                    TopicRouteData topicRouteData = defaultMQAdminExt.examineTopicRouteInfo(topic);
+                    if (!topicRouteMap.containsKey(topic)) {
+                        topicRouteMap.put(topic, new ArrayList<MessageQueue>());
+                    }
+                    for (QueueData qd : topicRouteData.getQueueDatas()) {
+                        for (int i = 0; i < qd.getReadQueueNums(); i++) {
+                            MessageQueue mq = new MessageQueue(topic, qd.getBrokerName(), i);
+                            topicRouteMap.get(topic).add(mq);
                         }
                     }
-                } catch (Exception e) {
-                    log.error("Fetch topic list error.", e);
-                } finally {
-                    defaultMQAdminExt.shutdown();
                 }
             }
-
-            TaskDivideConfig tdc = new TaskDivideConfig(
-                    this.replicatorConfig.getString(ConfigDefine.CONN_SOURCE_RMQ),
-                    this.replicatorConfig.getString(ConfigDefine.CONN_STORE_TOPIC),
-                    this.replicatorConfig.getString(ConfigDefine.CONN_SOURCE_RECORD_CONVERTER),
-                    DataType.COMMON_MESSAGE.ordinal(),
-                    this.taskParallelism
-            );
-            return this.taskDivideStrategy.divide(this.topicRouteMap, tdc);
-        } else {
-            return new ArrayList<KeyValue>();
+        } catch (Exception e) {
+            log.error("Fetch topic list error.", e);
+        } finally {
+            defaultMQAdminExt.shutdown();
         }
+
+        TaskDivideConfig tdc = new TaskDivideConfig(
+            this.replicatorConfig.getString(ConfigDefine.CONN_SOURCE_RMQ),
+            this.replicatorConfig.getString(ConfigDefine.CONN_STORE_TOPIC),
+            this.replicatorConfig.getString(ConfigDefine.CONN_SOURCE_RECORD_CONVERTER),
+            DataType.COMMON_MESSAGE.ordinal(),
+            this.taskParallelism
+        );
+        return this.taskDivideStrategy.divide(this.topicRouteMap, tdc);
     }
 }
 
diff --git a/src/main/java/org/apache/rocketmq/replicator/RmqSourceTask.java b/src/main/java/org/apache/rocketmq/replicator/RmqSourceTask.java
index 50a88bb..8c8e434 100644
--- a/src/main/java/org/apache/rocketmq/replicator/RmqSourceTask.java
+++ b/src/main/java/org/apache/rocketmq/replicator/RmqSourceTask.java
@@ -122,6 +122,7 @@ public class RmqSourceTask extends SourceTask {
         } catch (Exception e) {
             log.error("Consumer of task {} start failed.", this.taskId, e);
         }
+        log.info("RocketMQ source task started");
     }
 
     public void stop() {
@@ -185,7 +186,9 @@ public class RmqSourceTask extends SourceTask {
                 log.error("Rocketmq replicator task poll error, current config: {}", JSON.toJSONString(config), e);
             }
         } else {
-            log.warn("Rocketmq replicator task is not started.");
+            if (System.currentTimeMillis() % 1000 == 0) {
+                log.warn("Rocketmq replicator task is not started.");
+            }
         }
         return res;
     }
diff --git a/src/main/java/org/apache/rocketmq/replicator/common/Utils.java b/src/main/java/org/apache/rocketmq/replicator/common/Utils.java
index 0e4766d..6888038 100644
--- a/src/main/java/org/apache/rocketmq/replicator/common/Utils.java
+++ b/src/main/java/org/apache/rocketmq/replicator/common/Utils.java
@@ -23,15 +23,15 @@ import java.util.List;
 public class Utils {
 
     public static String createGroupName(String prefix) {
-        return new StringBuilder().append(prefix).append("@").append(System.currentTimeMillis()).toString();
+        return new StringBuilder().append(prefix).append("-").append(System.currentTimeMillis()).toString();
     }
 
     public static String createGroupName(String prefix, String postfix) {
-        return new StringBuilder().append(prefix).append("@").append(postfix).toString();
+        return new StringBuilder().append(prefix).append("-").append(postfix).toString();
     }
 
     public static String createTaskId(String prefix) {
-        return new StringBuilder().append(prefix).append("@").append(System.currentTimeMillis()).toString();
+        return new StringBuilder().append(prefix).append("-").append(System.currentTimeMillis()).toString();
     }
 
     public static String createInstanceName(String namesrvAddr) {