You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by zh...@apache.org on 2022/03/02 03:39:17 UTC
[rocketmq-connect] 07/39: [ISSUE 368]Polish rocketmq replicator implementation (#366)
This is an automated email from the ASF dual-hosted git repository.
zhoubo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-connect.git
commit 40b9fc8d9c4e6b04f06626948c1197be33c60e5b
Author: Heng Du <du...@apache.org>
AuthorDate: Tue Aug 13 14:26:40 2019 +0800
[ISSUE 368]Polish rocketmq replicator implementation (#366)
* Polish rocketmq replicator implementation
* Add documation of rocketmq-replicator
---
README.md | 18 ++-
package.xml | 41 +++++++
pom.xml | 30 ++++-
.../rocketmq/replicator/RmqSourceReplicator.java | 133 +++++++++++----------
.../apache/rocketmq/replicator/RmqSourceTask.java | 5 +-
.../apache/rocketmq/replicator/common/Utils.java | 6 +-
6 files changed, 167 insertions(+), 66 deletions(-)
diff --git a/README.md b/README.md
index 77bcb94..b2c98d1 100644
--- a/README.md
+++ b/README.md
@@ -1,6 +1,22 @@
# rocketmq-replicator
-# boot-parameter
+## rocketmq-replicator打包
+````
+mvn clean install -Prelease-all -DskipTest -U
+````
+## rocketmq-replicator启动
+````
+http://${runtime-ip}:${runtime-port}/connectors/${rocketmq-replicator-name}
+?config={"connector-class":"org.apache.rocketmq.connector.RmqSourceConnector","source-rocketmq":"xxxx:9876","target-rocketmq":"xxxxxxx:9876","replicator-store-topic":"replicatorTopic","taskDivideStrategy":"0","white-list”:"TopicTest,TopicTest2","task-parallelism":"2","source-record-converter":"org.apache.rocketmq.connect.runtime.converter.JsonConverter"}
+````
+
+
+## rocketmq-replicator停止
+````
+http://${runtime-ip}:${runtime-port}/connectors/${rocketmq-replicator-name}/stop
+````
+
+## rocketmq-replicator参数说明
parameter | type | must | description | sample value
---|---|---|---|---|
diff --git a/package.xml b/package.xml
new file mode 100644
index 0000000..5dc18b9
--- /dev/null
+++ b/package.xml
@@ -0,0 +1,41 @@
+<assembly xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.2"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.2
+ http://maven.apache.org/xsd/assembly-1.1.2.xsd">
+ <!-- Assembles a packaged version targeting OS installation. -->
+ <id>package</id>
+ <formats>
+ <format>dir</format>
+ </formats>
+ <includeBaseDirectory>false</includeBaseDirectory>
+ <fileSets>
+ <fileSet>
+ <directory>${project.basedir}</directory>
+ <includes>
+ <include>README*</include>
+ <include>LICENSE*</include>
+ <include>NOTICE*</include>
+ <include>licenses/</include>
+ <include>notices/</include>
+ </includes>
+ </fileSet>
+ <fileSet>
+ <directory>${project.basedir}/conf</directory>
+ <outputDirectory>conf/</outputDirectory>
+ <includes>
+ <include>*</include>
+ </includes>
+ </fileSet>
+ </fileSets>
+ <dependencySets>
+ <dependencySet>
+ <outputDirectory>lib/</outputDirectory>
+ <useProjectArtifact>true</useProjectArtifact>
+ <useTransitiveFiltering>true</useTransitiveFiltering>
+ <excludes>
+ <exclude>io.openmessaging:openmessaging-connector</exclude>
+ <exclude>com.alibaba:fastjson</exclude>
+ </excludes>
+ </dependencySet>
+ </dependencySets>
+</assembly>
diff --git a/pom.xml b/pom.xml
index d283ca9..963222e 100644
--- a/pom.xml
+++ b/pom.xml
@@ -74,8 +74,36 @@
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
- <version>1.2.51</version>
+ <version>1.2.58</version>
</dependency>
</dependencies>
+ <profiles>
+ <profile>
+ <id>release-all</id>
+ <build>
+ <plugins>
+ <plugin>
+ <artifactId>maven-assembly-plugin</artifactId>
+ <configuration>
+ <descriptors>
+ <descriptor>package.xml</descriptor>
+ </descriptors>
+ </configuration>
+ <executions>
+ <execution>
+ <id>make-assembly</id>
+ <phase>package</phase>
+ <goals>
+ <goal>single</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ <finalName>rocketmq-replicator-${project.version}</finalName>
+ </build>
+ </profile>
+ </profiles>
+
</project>
\ No newline at end of file
diff --git a/src/main/java/org/apache/rocketmq/replicator/RmqSourceReplicator.java b/src/main/java/org/apache/rocketmq/replicator/RmqSourceReplicator.java
index c1f350f..12473ab 100644
--- a/src/main/java/org/apache/rocketmq/replicator/RmqSourceReplicator.java
+++ b/src/main/java/org/apache/rocketmq/replicator/RmqSourceReplicator.java
@@ -19,11 +19,18 @@ package org.apache.rocketmq.replicator;
import io.openmessaging.KeyValue;
import io.openmessaging.connector.api.Task;
import io.openmessaging.connector.api.source.SourceConnector;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.protocol.route.QueueData;
import org.apache.rocketmq.common.protocol.route.TopicRouteData;
+import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.replicator.common.ConstDefine;
import org.apache.rocketmq.replicator.common.Utils;
import org.apache.rocketmq.replicator.config.ConfigDefine;
@@ -33,13 +40,10 @@ import org.apache.rocketmq.replicator.strategy.DivideStrategyEnum;
import org.apache.rocketmq.replicator.strategy.DivideTaskByQueue;
import org.apache.rocketmq.replicator.strategy.DivideTaskByTopic;
import org.apache.rocketmq.replicator.strategy.TaskDivideStrategy;
-import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.*;
-
public class RmqSourceReplicator extends SourceConnector {
private static final Logger log = LoggerFactory.getLogger(RmqSourceReplicator.class);
@@ -60,33 +64,56 @@ public class RmqSourceReplicator extends SourceConnector {
private int taskParallelism = 1;
- public RmqSourceReplicator() {
+ private DefaultMQAdminExt defaultMQAdminExt;
+
+ private volatile boolean adminStarted;
+ public RmqSourceReplicator() {
topicRouteMap = new HashMap<String, List<MessageQueue>>();
whiteList = new HashSet<String>();
}
+ private synchronized void startMQAdminTools() {
+ if (!configValid || adminStarted) {
+ return;
+ }
+ RPCHook rpcHook = null;
+ this.defaultMQAdminExt = new DefaultMQAdminExt(rpcHook);
+ this.defaultMQAdminExt.setNamesrvAddr(this.replicatorConfig.getString(ConfigDefine.CONN_SOURCE_RMQ));
+ this.defaultMQAdminExt.setAdminExtGroup(Utils.createGroupName(ConstDefine.REPLICATOR_ADMIN_PREFIX));
+ this.defaultMQAdminExt.setInstanceName(Utils.createInstanceName(this.replicatorConfig.getString(ConfigDefine.CONN_SOURCE_RMQ)));
+ try {
+ defaultMQAdminExt.start();
+ log.info("RocketMQ defaultMQAdminExt started");
+ } catch (MQClientException e) {
+ log.error("Replicator start failed for `defaultMQAdminExt` exception.", e);
+ }
+ adminStarted = true;
+ }
+
+ @Override
public String verifyAndSetConfig(KeyValue config) {
- // check the need key.
- for(String requestKey : ConfigDefine.REQUEST_CONFIG){
- if(!config.containsKey(requestKey)){
+ // Check the need key.
+ for (String requestKey : ConfigDefine.REQUEST_CONFIG) {
+ if (!config.containsKey(requestKey)) {
return "Request config key: " + requestKey;
}
}
- // check the whitelist, whitelist is required.
+ // Check the whitelist, whitelist is required.
String whileListStr = config.getString(ConfigDefine.CONN_WHITE_LIST);
String[] wl = whileListStr.trim().split(",");
- if (wl.length <= 0) return "White list must be not empty.";
+ if (wl.length <= 0)
+ return "White list must be not empty.";
else {
- for (String t: wl) {
+ for (String t : wl) {
this.whiteList.add(t.trim());
}
}
if (config.containsKey(ConfigDefine.CONN_TASK_DIVIDE_STRATEGY) &&
- config.getInt(ConfigDefine.CONN_TASK_DIVIDE_STRATEGY) == DivideStrategyEnum.BY_QUEUE.ordinal()) {
+ config.getInt(ConfigDefine.CONN_TASK_DIVIDE_STRATEGY) == DivideStrategyEnum.BY_QUEUE.ordinal()) {
this.taskDivideStrategy = new DivideTaskByQueue();
} else {
this.taskDivideStrategy = new DivideTaskByTopic();
@@ -101,7 +128,9 @@ public class RmqSourceReplicator extends SourceConnector {
return "";
}
+ @Override
public void start() {
+ startMQAdminTools();
}
public void stop() {
@@ -116,65 +145,49 @@ public class RmqSourceReplicator extends SourceConnector {
}
public Class<? extends Task> taskClass() {
-
+
return RmqSourceTask.class;
}
public List<KeyValue> taskConfigs() {
-
- if (configValid) {
-
- boolean adminStarted = false;
- RPCHook rpcHook = null;
- DefaultMQAdminExt defaultMQAdminExt = new DefaultMQAdminExt(rpcHook);
- defaultMQAdminExt.setNamesrvAddr(this.replicatorConfig.getString(ConfigDefine.CONN_SOURCE_RMQ));
- defaultMQAdminExt.setAdminExtGroup(Utils.createGroupName(ConstDefine.REPLICATOR_ADMIN_PREFIX));
- defaultMQAdminExt.setInstanceName(Utils.createInstanceName(this.replicatorConfig.getString(ConfigDefine.CONN_SOURCE_RMQ)));
-
- try {
- defaultMQAdminExt.start();
- adminStarted = true;
- } catch (MQClientException e) {
- log.error("Replicator start failed for `defaultMQAdminExt` exception.", e);
- }
+ if (!configValid) {
+ return new ArrayList<KeyValue>();
+ }
+
+ startMQAdminTools();
+
+ try {
+ for (String topic : this.whiteList) {
+ if ((syncRETRY && topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) ||
+ (syncDLQ && topic.startsWith(MixAll.DLQ_GROUP_TOPIC_PREFIX)) ||
+ !topic.equals(ConfigDefine.CONN_STORE_TOPIC)) {
- if (adminStarted) {
- try {
- for (String topic : this.whiteList) {
- if ((syncRETRY && topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) ||
- (syncDLQ && topic.startsWith(MixAll.DLQ_GROUP_TOPIC_PREFIX)) ||
- !topic.equals(ConfigDefine.CONN_STORE_TOPIC)) {
-
- TopicRouteData topicRouteData = defaultMQAdminExt.examineTopicRouteInfo(topic);
- if (!topicRouteMap.containsKey(topic)) {
- topicRouteMap.put(topic, new ArrayList<MessageQueue>());
- }
- for (QueueData qd : topicRouteData.getQueueDatas()) {
- for (int i = 0; i < qd.getReadQueueNums(); i++) {
- MessageQueue mq = new MessageQueue(topic, qd.getBrokerName(), i);
- topicRouteMap.get(topic).add(mq);
- }
- }
+ TopicRouteData topicRouteData = defaultMQAdminExt.examineTopicRouteInfo(topic);
+ if (!topicRouteMap.containsKey(topic)) {
+ topicRouteMap.put(topic, new ArrayList<MessageQueue>());
+ }
+ for (QueueData qd : topicRouteData.getQueueDatas()) {
+ for (int i = 0; i < qd.getReadQueueNums(); i++) {
+ MessageQueue mq = new MessageQueue(topic, qd.getBrokerName(), i);
+ topicRouteMap.get(topic).add(mq);
}
}
- } catch (Exception e) {
- log.error("Fetch topic list error.", e);
- } finally {
- defaultMQAdminExt.shutdown();
}
}
-
- TaskDivideConfig tdc = new TaskDivideConfig(
- this.replicatorConfig.getString(ConfigDefine.CONN_SOURCE_RMQ),
- this.replicatorConfig.getString(ConfigDefine.CONN_STORE_TOPIC),
- this.replicatorConfig.getString(ConfigDefine.CONN_SOURCE_RECORD_CONVERTER),
- DataType.COMMON_MESSAGE.ordinal(),
- this.taskParallelism
- );
- return this.taskDivideStrategy.divide(this.topicRouteMap, tdc);
- } else {
- return new ArrayList<KeyValue>();
+ } catch (Exception e) {
+ log.error("Fetch topic list error.", e);
+ } finally {
+ defaultMQAdminExt.shutdown();
}
+
+ TaskDivideConfig tdc = new TaskDivideConfig(
+ this.replicatorConfig.getString(ConfigDefine.CONN_SOURCE_RMQ),
+ this.replicatorConfig.getString(ConfigDefine.CONN_STORE_TOPIC),
+ this.replicatorConfig.getString(ConfigDefine.CONN_SOURCE_RECORD_CONVERTER),
+ DataType.COMMON_MESSAGE.ordinal(),
+ this.taskParallelism
+ );
+ return this.taskDivideStrategy.divide(this.topicRouteMap, tdc);
}
}
diff --git a/src/main/java/org/apache/rocketmq/replicator/RmqSourceTask.java b/src/main/java/org/apache/rocketmq/replicator/RmqSourceTask.java
index 50a88bb..8c8e434 100644
--- a/src/main/java/org/apache/rocketmq/replicator/RmqSourceTask.java
+++ b/src/main/java/org/apache/rocketmq/replicator/RmqSourceTask.java
@@ -122,6 +122,7 @@ public class RmqSourceTask extends SourceTask {
} catch (Exception e) {
log.error("Consumer of task {} start failed.", this.taskId, e);
}
+ log.info("RocketMQ source task started");
}
public void stop() {
@@ -185,7 +186,9 @@ public class RmqSourceTask extends SourceTask {
log.error("Rocketmq replicator task poll error, current config: {}", JSON.toJSONString(config), e);
}
} else {
- log.warn("Rocketmq replicator task is not started.");
+ if (System.currentTimeMillis() % 1000 == 0) {
+ log.warn("Rocketmq replicator task is not started.");
+ }
}
return res;
}
diff --git a/src/main/java/org/apache/rocketmq/replicator/common/Utils.java b/src/main/java/org/apache/rocketmq/replicator/common/Utils.java
index 0e4766d..6888038 100644
--- a/src/main/java/org/apache/rocketmq/replicator/common/Utils.java
+++ b/src/main/java/org/apache/rocketmq/replicator/common/Utils.java
@@ -23,15 +23,15 @@ import java.util.List;
public class Utils {
public static String createGroupName(String prefix) {
- return new StringBuilder().append(prefix).append("@").append(System.currentTimeMillis()).toString();
+ return new StringBuilder().append(prefix).append("-").append(System.currentTimeMillis()).toString();
}
public static String createGroupName(String prefix, String postfix) {
- return new StringBuilder().append(prefix).append("@").append(postfix).toString();
+ return new StringBuilder().append(prefix).append("-").append(postfix).toString();
}
public static String createTaskId(String prefix) {
- return new StringBuilder().append(prefix).append("@").append(System.currentTimeMillis()).toString();
+ return new StringBuilder().append(prefix).append("-").append(System.currentTimeMillis()).toString();
}
public static String createInstanceName(String namesrvAddr) {