You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by zh...@apache.org on 2022/03/02 03:39:13 UTC
[rocketmq-connect] 03/39: Define and Implement the RmqConnector and RmqSourceTask. (#343)
This is an automated email from the ASF dual-hosted git repository.
zhoubo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-connect.git
commit 6ddd7e447b529185970c388541399d4b5a576bb2
Author: chuenfaiy <ch...@163.com>
AuthorDate: Fri Jul 26 21:29:10 2019 +0800
Define and Implement the RmqConnector and RmqSourceTask. (#343)
* [Fix]fix some bugs in connectors and fastjson.
* [Update]update the version of replicator
* [Update]remove the repetitive properties
* [Update]Add the license at the head of TaskTopicInfo class and TaskConfigEnum class
---
.gitignore | 1 -
README.md | 13 ++
pom.xml | 12 +-
.../rocketmq/connector/RmqSourceConnector.java | 131 +++++++++++++++------
.../apache/rocketmq/connector/RmqSourceTask.java | 120 +++++++++++--------
.../common/{Utils.java => ConstDefine.java} | 10 +-
.../apache/rocketmq/connector/common/Utils.java | 8 +-
.../rocketmq/connector/config/ConfigDefine.java | 34 ++++--
.../rocketmq/connector/config/TaskConfig.java | 45 +++----
.../Utils.java => config/TaskConfigEnum.java} | 25 +++-
.../{TaskConfig.java => TaskDivideConfig.java} | 75 ++++++------
.../config/{TaskConfig.java => TaskTopicInfo.java} | 44 +------
.../connector/strategy/DivideTaskByQueue.java | 12 +-
.../connector/strategy/DivideTaskByTopic.java | 30 +++--
.../connector/strategy/TaskDivideStrategy.java | 3 +-
15 files changed, 307 insertions(+), 256 deletions(-)
diff --git a/.gitignore b/.gitignore
index 36716aa..525eaaa 100644
--- a/.gitignore
+++ b/.gitignore
@@ -1,4 +1,3 @@
-*.iml
*.class
*.jar
*dependency-reduced-pom.xml
diff --git a/README.md b/README.md
new file mode 100644
index 0000000..0b8bef7
--- /dev/null
+++ b/README.md
@@ -0,0 +1,13 @@
+# rocketmq-replicator
+
+# 启动参数选择
+
+参数 | 类型 |是否必须 |说明|示例值
+---|---|---|---|---|
+source-rocketmq | 字符串 | 是 | 源rocketmq集群namesrv地址 | 192.168.1.2:9876 |
+target-rocketmq | 字符串 | 是 | 目标rocketmq集群namesrv地址 | 192.168.1.2:9876 |
+replicator-store-topic | 字符串 | 是 | replicator存储topic,需要在runtime的mq集群提前创建 | replicator-store-topic |
+task-divide-strategy | 整型 | 否 | 任务切割策略,可以按照主题和队列来切割,目前只支持主题切割且主题对应值为0 | 0 |
+white-list | 字符串 | 是 | 复制主题白名单,多个topic之间使用逗号分隔 | topic-1,topic-2 |
+task-parallelism | 整型 | 否 | 任务并行度,默认值为1,当topic数大于task数时,一个task将负责多个topic | 2 |
+source-record-converter | 字符串 | 是 | 源数据解析器,目前使用的是Json解析器 | io.openmessaging.connect.runtime.converter.JsonConverter |
\ No newline at end of file
diff --git a/pom.xml b/pom.xml
index 13a5af4..37bf78f 100644
--- a/pom.xml
+++ b/pom.xml
@@ -23,7 +23,7 @@
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-connector</artifactId>
- <version>1.0-SNAPSHOT</version>
+ <version>0.0.1-SNAPSHOT</version>
<properties>
<rocketmq.version>4.4.0</rocketmq.version>
@@ -72,16 +72,6 @@
<version>0.9.6</version>
</dependency>
<dependency>
- <groupId>io.openmessaging</groupId>
- <artifactId>openmessaging-connect</artifactId>
- <version>0.1.0-beta</version>
- </dependency>
- <dependency>
- <groupId>io.openmessaging</groupId>
- <artifactId>openmessaging-connector</artifactId>
- <version>0.1.0-beta</version>
- </dependency>
- <dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.51</version>
diff --git a/src/main/java/org/apache/rocketmq/connector/RmqSourceConnector.java b/src/main/java/org/apache/rocketmq/connector/RmqSourceConnector.java
index c919fa3..2b5bda4 100644
--- a/src/main/java/org/apache/rocketmq/connector/RmqSourceConnector.java
+++ b/src/main/java/org/apache/rocketmq/connector/RmqSourceConnector.java
@@ -19,13 +19,16 @@ package org.apache.rocketmq.connector;
import io.openmessaging.KeyValue;
import io.openmessaging.connector.api.Task;
import io.openmessaging.connector.api.source.SourceConnector;
+import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.MixAll;
-import org.apache.rocketmq.common.constant.PermName;
import org.apache.rocketmq.common.message.MessageQueue;
-import org.apache.rocketmq.common.protocol.body.TopicList;
import org.apache.rocketmq.common.protocol.route.QueueData;
import org.apache.rocketmq.common.protocol.route.TopicRouteData;
+import org.apache.rocketmq.connector.common.ConstDefine;
+import org.apache.rocketmq.connector.common.Utils;
import org.apache.rocketmq.connector.config.ConfigDefine;
+import org.apache.rocketmq.connector.config.DataType;
+import org.apache.rocketmq.connector.config.TaskDivideConfig;
import org.apache.rocketmq.connector.strategy.DivideStrategyEnum;
import org.apache.rocketmq.connector.strategy.DivideTaskByQueue;
import org.apache.rocketmq.connector.strategy.DivideTaskByTopic;
@@ -35,48 +38,96 @@ import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
+import java.util.*;
public class RmqSourceConnector extends SourceConnector {
private static final Logger log = LoggerFactory.getLogger(RmqSourceConnector.class);
+ private boolean syncDLQ = false;
+
+ private boolean syncRETRY = false;
+
private KeyValue config;
private Map<String, List<MessageQueue>> topicRouteMap;
- private final TaskDivideStrategy taskDivideStrategy;
+ private TaskDivideStrategy taskDivideStrategy;
+
+ private Set<String> whiteList;
+
+ private volatile boolean started = false;
+
+ private volatile boolean configValid = false;
+
+ private DefaultMQAdminExt defaultMQAdminExt;
+
+ private int taskParallelism = 1;
public RmqSourceConnector() {
topicRouteMap = new HashMap<String, List<MessageQueue>>();
-
- if (this.config.getInt(ConfigDefine.TASK_DIVIDE_STRATEGY) == DivideStrategyEnum.BY_TOPIC.ordinal()) {
- taskDivideStrategy = new DivideTaskByTopic();
- } else {
- taskDivideStrategy = new DivideTaskByQueue();
- }
+ whiteList = new HashSet<String>();
}
public String verifyAndSetConfig(KeyValue config) {
+ // Check that every required key is present; report the first missing one.
for(String requestKey : ConfigDefine.REQUEST_CONFIG){
if(!config.containsKey(requestKey)){
return "Request config key: " + requestKey;
}
}
+ // Read from the `config` argument below: this.config is only assigned at the end of this method.
+ // Check the whitelist, which is required.
+ String whiteListStr = config.getString(ConfigDefine.CONN_WHITE_LIST);
+ String[] wl = whiteListStr.trim().split(",");
+ if (wl.length <= 0) return "White list must be not empty.";
+ else {
+ for (String t: wl) {
+ this.whiteList.add(t.trim());
+ }
+ }
+ // Divide by queue only when explicitly requested; otherwise divide by topic.
+ if (config.containsKey(ConfigDefine.CONN_TASK_DIVIDE_STRATEGY) &&
+ config.getInt(ConfigDefine.CONN_TASK_DIVIDE_STRATEGY) == DivideStrategyEnum.BY_QUEUE.ordinal()) {
+ this.taskDivideStrategy = new DivideTaskByQueue();
+ } else {
+ this.taskDivideStrategy = new DivideTaskByTopic();
+ }
+ // Task parallelism is optional; the field keeps its current value (initially 1) when the key is absent.
+ if (config.containsKey(ConfigDefine.CONN_TASK_PARALLELISM)) {
+ this.taskParallelism = config.getInt(ConfigDefine.CONN_TASK_PARALLELISM);
+ }
+ // Publish the configuration only after every value above has been read.
+ this.config = config;
+ this.configValid = true;
return "";
}
public void start() {
+ // Start the admin client only for a verified config; mark the connector started only if the client started.
+ if (configValid) {
+ RPCHook rpcHook = null;
+ this.defaultMQAdminExt = new DefaultMQAdminExt(rpcHook);
+ this.defaultMQAdminExt.setNamesrvAddr(this.config.getString(ConfigDefine.CONN_SOURCE_RMQ));
+ this.defaultMQAdminExt.setInstanceName(Utils.createGroupName(ConstDefine.REPLICATOR_ADMIN_PREFIX));
+ try {
+ defaultMQAdminExt.start();
+ started = true;
+ } catch (MQClientException e) {
+ log.error("Replicator start failed for `defaultMQAdminExt` exception.", e);
+ }
+ }
}
public void stop() {
-
+ if (started) {
+ if (defaultMQAdminExt != null) {
+ defaultMQAdminExt.shutdown();
+ }
+ started = false;
+ }
}
public void pause() {
@@ -88,26 +139,24 @@ public class RmqSourceConnector extends SourceConnector {
}
public Class<? extends Task> taskClass() {
- return null;
+
+ return RmqSourceTask.class;
}
public List<KeyValue> taskConfigs() {
-
- System.setProperty(MixAll.NAMESRV_ADDR_PROPERTY, this.config.getString(ConfigDefine.SOURCE_RMQ));
- RPCHook rpcHook = null;
- DefaultMQAdminExt defaultMQAdminExt = new DefaultMQAdminExt(rpcHook);
- defaultMQAdminExt.setInstanceName(Long.toString(System.currentTimeMillis()));
- try {
- defaultMQAdminExt.start();
- TopicList topicList = defaultMQAdminExt.fetchAllTopicList();
- for (String topic: topicList.getTopicList()) {
- if (!topic.equals(ConfigDefine.STORE_TOPIC)) {
- TopicRouteData topicRouteData = defaultMQAdminExt.examineTopicRouteInfo(topic);
- if (!topicRouteMap.containsKey(topic)) {
- topicRouteMap.put(topic, new ArrayList<MessageQueue>());
- }
- for (QueueData qd: topicRouteData.getQueueDatas()) {
- if (PermName.isReadable(qd.getPerm())) {
+
+ if (started && configValid) {
+ try {
+ for (String topic : this.whiteList) {
+ if ((syncRETRY && topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) ||
+ (syncDLQ && topic.startsWith(MixAll.DLQ_GROUP_TOPIC_PREFIX)) ||
+ !topic.equals(ConfigDefine.CONN_STORE_TOPIC)) {
+
+ TopicRouteData topicRouteData = defaultMQAdminExt.examineTopicRouteInfo(topic);
+ if (!topicRouteMap.containsKey(topic)) {
+ topicRouteMap.put(topic, new ArrayList<MessageQueue>());
+ }
+ for (QueueData qd : topicRouteData.getQueueDatas()) {
for (int i = 0; i < qd.getReadQueueNums(); i++) {
MessageQueue mq = new MessageQueue(topic, qd.getBrokerName(), i);
topicRouteMap.get(topic).add(mq);
@@ -115,15 +164,21 @@ public class RmqSourceConnector extends SourceConnector {
}
}
}
+ } catch (Exception e) {
+ log.error("Fetch topic list error.", e);
}
- } catch (Exception e) {
- log.error("fetch topic list error: ", e);
- } finally {
- defaultMQAdminExt.shutdown();
- }
- return this.taskDivideStrategy.divide(this.topicRouteMap,
- this.config.getString(ConfigDefine.SOURCE_RMQ), this.config.getString(ConfigDefine.STORE_TOPIC));
+ TaskDivideConfig tdc = new TaskDivideConfig(
+ this.config.getString(ConfigDefine.CONN_SOURCE_RMQ),
+ this.config.getString(ConfigDefine.CONN_STORE_TOPIC),
+ this.config.getString(ConfigDefine.CONN_SOURCE_RECORD_CONVERTER),
+ DataType.COMMON_MESSAGE.ordinal(),
+ this.taskParallelism
+ );
+ return this.taskDivideStrategy.divide(this.topicRouteMap, tdc);
+ } else {
+ return new ArrayList<KeyValue>();
+ }
}
}
diff --git a/src/main/java/org/apache/rocketmq/connector/RmqSourceTask.java b/src/main/java/org/apache/rocketmq/connector/RmqSourceTask.java
index d71dc87..4fca698 100644
--- a/src/main/java/org/apache/rocketmq/connector/RmqSourceTask.java
+++ b/src/main/java/org/apache/rocketmq/connector/RmqSourceTask.java
@@ -29,6 +29,7 @@ import org.apache.rocketmq.connector.common.Utils;
import org.apache.rocketmq.connector.config.ConfigUtil;
import org.apache.rocketmq.connector.config.DataType;
import org.apache.rocketmq.connector.config.TaskConfig;
+import org.apache.rocketmq.connector.config.TaskTopicInfo;
import org.apache.rocketmq.connector.schema.FieldName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -43,6 +44,7 @@ public class RmqSourceTask extends SourceTask {
private final String taskId;
private final TaskConfig config;
private final DefaultMQPullConsumer consumer;
+ private volatile boolean started = false;
private Map<MessageQueue, Long> mqOffsetMap;
public RmqSourceTask() {
@@ -67,50 +69,68 @@ public class RmqSourceTask extends SourceTask {
public void start(KeyValue config) {
ConfigUtil.load(config, this.config);
- this.consumer.setConsumerGroup(Utils.createGroupName(this.config.getSourceTopic()));
+ this.consumer.setConsumerGroup(this.taskId);
this.consumer.setNamesrvAddr(this.config.getSourceRocketmq());
+ List<TaskTopicInfo> topicList = JSONObject.parseArray(this.config.getTaskTopicList(), TaskTopicInfo.class);
+
try {
this.consumer.start();
- Set<MessageQueue> mqs = consumer.fetchSubscribeMessageQueues(this.config.getSourceTopic());
- if (!this.config.getQueueId().equals("")) {
- for (MessageQueue mq: mqs) {
- if (Integer.valueOf(this.config.getQueueId()) == mq.getQueueId()) {
+ for (TaskTopicInfo tti: topicList) {
+ Set<MessageQueue> mqs = consumer.fetchSubscribeMessageQueues(tti.getSourceTopic());
+ if (!tti.getQueueId().equals("")) {
+ // divide task by queue
+ for (MessageQueue mq: mqs) {
+ if (Integer.valueOf(tti.getQueueId()) == mq.getQueueId()) {
+ ByteBuffer positionInfo = this.context.positionStorageReader().getPosition(
+ ByteBuffer.wrap(RmqConstants.getPartition(
+ mq.getTopic(),
+ mq.getBrokerName(),
+ String.valueOf(mq.getQueueId())).getBytes("UTF-8")));
+
+ if (null != positionInfo && positionInfo.array().length > 0) {
+ String positionJson = new String(positionInfo.array(), "UTF-8");
+ JSONObject jsonObject = JSONObject.parseObject(positionJson);
+ this.config.setNextPosition(jsonObject.getLong(RmqConstants.NEXT_POSITION));
+ } else {
+ this.config.setNextPosition(0L);
+ }
+ mqOffsetMap.put(mq, this.config.getNextPosition());
+ }
+ }
+ } else {
+ // divide task by topic
+ for (MessageQueue mq: mqs) {
ByteBuffer positionInfo = this.context.positionStorageReader().getPosition(
- ByteBuffer.wrap(RmqConstants.getPartition(mq.getTopic(),
- mq.getBrokerName(), String.valueOf(mq.getQueueId()))
- .getBytes("UTF-8")));
+ ByteBuffer.wrap(RmqConstants.getPartition(
+ mq.getTopic(),
+ mq.getBrokerName(),
+ String.valueOf(mq.getQueueId())).getBytes("UTF-8")));
if (null != positionInfo && positionInfo.array().length > 0) {
String positionJson = new String(positionInfo.array(), "UTF-8");
JSONObject jsonObject = JSONObject.parseObject(positionJson);
this.config.setNextPosition(jsonObject.getLong(RmqConstants.NEXT_POSITION));
+ } else {
+ this.config.setNextPosition(0L);
}
mqOffsetMap.put(mq, this.config.getNextPosition());
}
}
- } else {
- for (MessageQueue mq: mqs) {
- ByteBuffer positionInfo = this.context.positionStorageReader().getPosition(
- ByteBuffer.wrap(RmqConstants.getPartition(mq.getTopic(),
- mq.getBrokerName(), String.valueOf(mq.getQueueId()))
- .getBytes("UTF-8")));
-
- if (null != positionInfo && positionInfo.array().length > 0) {
- String positionJson = new String(positionInfo.array(), "UTF-8");
- JSONObject jsonObject = JSONObject.parseObject(positionJson);
- this.config.setNextPosition(jsonObject.getLong(RmqConstants.NEXT_POSITION));
- }
- mqOffsetMap.put(mq, this.config.getNextPosition());
- }
}
-
+ started = true;
} catch (Exception e) {
- log.error("consumer of task {} start failed. ", this.taskId, e);
+ log.error("Consumer of task {} start failed.", this.taskId, e);
}
}
public void stop() {
- this.consumer.shutdown();
+
+ if (started) {
+ if (this.consumer != null) {
+ this.consumer.shutdown();
+ }
+ started = false;
+ }
}
public void pause() {
@@ -124,46 +144,48 @@ public class RmqSourceTask extends SourceTask {
private Collection<SourceDataEntry> pollCommonMessage() {
List<SourceDataEntry> res = new ArrayList<SourceDataEntry>();
-
- try {
- for (MessageQueue mq : this.mqOffsetMap.keySet()) {
- PullResult pullResult = consumer.pull(mq, "*", this.mqOffsetMap.get(mq), 32);
- switch (pullResult.getPullStatus()) {
- case FOUND: {
- this.mqOffsetMap.put(mq, pullResult.getNextBeginOffset());
- JSONObject jsonObject = new JSONObject();
- jsonObject.put(RmqConstants.NEXT_POSITION, pullResult.getNextBeginOffset());
-
- List<MessageExt> msgs = pullResult.getMsgFoundList();
- for (MessageExt m : msgs) {
+ if (started) {
+ try {
+ for (MessageQueue mq : this.mqOffsetMap.keySet()) {
+ PullResult pullResult = consumer.pull(mq, "*",
+ this.mqOffsetMap.get(mq), 32);
+ switch (pullResult.getPullStatus()) {
+ case FOUND: {
+ this.mqOffsetMap.put(mq, pullResult.getNextBeginOffset());
+ JSONObject jsonObject = new JSONObject();
+ jsonObject.put(RmqConstants.NEXT_POSITION, pullResult.getNextBeginOffset());
+ List<MessageExt> msgs = pullResult.getMsgFoundList();
Schema schema = new Schema();
schema.setDataSource(this.config.getSourceRocketmq());
- schema.setName(this.config.getSourceTopic());
+ schema.setName(mq.getTopic());
schema.setFields(new ArrayList<Field>());
- schema.getFields().add(new Field(0, FieldName.COMMON_MESSAGE.getKey(), FieldType.STRING));
+ schema.getFields().add(new Field(0,
+ FieldName.COMMON_MESSAGE.getKey(), FieldType.STRING));
DataEntryBuilder dataEntryBuilder = new DataEntryBuilder(schema);
dataEntryBuilder.timestamp(System.currentTimeMillis())
.queue(this.config.getStoreTopic()).entryType(EntryType.CREATE);
- dataEntryBuilder.putFiled(FieldName.COMMON_MESSAGE.getKey(), JSONObject.toJSONString(m));
+ dataEntryBuilder.putFiled(FieldName.COMMON_MESSAGE.getKey(), JSONObject.toJSONString(msgs));
SourceDataEntry sourceDataEntry = dataEntryBuilder.buildSourceDataEntry(
- ByteBuffer.wrap(RmqConstants.getPartition(this.config.getSourceTopic(),
- this.config.getBrokerName(),
- this.config.getQueueId()).getBytes("UTF-8")),
+ ByteBuffer.wrap(RmqConstants.getPartition(
+ mq.getTopic(),
+ mq.getBrokerName(),
+ String.valueOf(mq.getQueueId())).getBytes("UTF-8")),
ByteBuffer.wrap(jsonObject.toJSONString().getBytes("UTF-8"))
);
res.add(sourceDataEntry);
+ break;
}
- break;
+ default:
+ break;
}
- default:
- break;
}
+ } catch (Exception e) {
+ log.error("Rocketmq replicator task poll error, current config: {}", JSON.toJSONString(config), e);
}
- } catch (Exception e) {
- log.error("Rocketmq connector task poll error, current config: {}", JSON.toJSONString(config), e);
+ } else {
+ log.warn("Rocketmq replicator task is not started.");
}
-
return res;
}
diff --git a/src/main/java/org/apache/rocketmq/connector/common/Utils.java b/src/main/java/org/apache/rocketmq/connector/common/ConstDefine.java
similarity index 70%
copy from src/main/java/org/apache/rocketmq/connector/common/Utils.java
copy to src/main/java/org/apache/rocketmq/connector/common/ConstDefine.java
index 71f538c..6e1eeac 100644
--- a/src/main/java/org/apache/rocketmq/connector/common/Utils.java
+++ b/src/main/java/org/apache/rocketmq/connector/common/ConstDefine.java
@@ -16,13 +16,9 @@
*/
package org.apache.rocketmq.connector.common;
-public class Utils {
+public class ConstDefine {
- public static String createGroupName(String prefix) {
- return new StringBuilder().append(prefix).append("_").append(System.currentTimeMillis()).toString();
- }
+ public static String TASK_GROUP_NAME_PREFIX = "REPLICATOR-TASK";
- public static String createTaskId(String prefix) {
- return new StringBuilder().append(prefix).append("_").append(System.currentTimeMillis()).toString();
- }
+ public static String REPLICATOR_ADMIN_PREFIX = "REPLICATOR-ADMIN";
}
diff --git a/src/main/java/org/apache/rocketmq/connector/common/Utils.java b/src/main/java/org/apache/rocketmq/connector/common/Utils.java
index 71f538c..f5822c6 100644
--- a/src/main/java/org/apache/rocketmq/connector/common/Utils.java
+++ b/src/main/java/org/apache/rocketmq/connector/common/Utils.java
@@ -19,10 +19,14 @@ package org.apache.rocketmq.connector.common;
public class Utils {
public static String createGroupName(String prefix) {
- return new StringBuilder().append(prefix).append("_").append(System.currentTimeMillis()).toString();
+ return new StringBuilder().append(prefix).append("@").append(System.currentTimeMillis()).toString();
+ }
+
+ public static String createGroupName(String prefix, String postfix) {
+ return new StringBuilder().append(prefix).append("@").append(postfix).toString();
}
public static String createTaskId(String prefix) {
- return new StringBuilder().append(prefix).append("_").append(System.currentTimeMillis()).toString();
+ return new StringBuilder().append(prefix).append("@").append(System.currentTimeMillis()).toString();
}
}
diff --git a/src/main/java/org/apache/rocketmq/connector/config/ConfigDefine.java b/src/main/java/org/apache/rocketmq/connector/config/ConfigDefine.java
index 64e9c94..13f8960 100644
--- a/src/main/java/org/apache/rocketmq/connector/config/ConfigDefine.java
+++ b/src/main/java/org/apache/rocketmq/connector/config/ConfigDefine.java
@@ -21,28 +21,38 @@ import java.util.Set;
public class ConfigDefine {
- public static final String SOURCE_RMQ = "sourceRocketmq";
+ public static final String CONN_SOURCE_RMQ = "source-rocketmq";
- public static final String STORE_TOPIC = "storeTopic";
+ public static final String CONN_STORE_TOPIC = "replicator-store-topic";
- public static final String TARGET_RMQ = "targetRocketmq";
+ public static final String CONN_TARGET_RMQ = "target-rocketmq";
- public static final String DATA_TYPE = "dataType";
+ public static final String CONN_SOURCE_GROUP = "source-group";
- public static final String QUEUE_ID = "queueId";
+ public static final String CONN_DATA_TYPE = "data-type";
- public static final String TASK_DIVIDE_STRATEGY = "taskDivideStrategy";
+ public static final String CONN_TASK_DIVIDE_STRATEGY = "task-divide-strategy";
- public static final String BROKER_NAME = "brokerName";
+ public static final String CONN_BROKER_NAME = "broker-name";
- public static final String SOURCE_TOPIC = "sourceTopic";
+ public static final String CONN_SOURCE_TOPIC = "source-topic";
+ public static final String CONN_WHITE_LIST = "white-list";
+
+ public static final String CONN_SOURCE_RECORD_CONVERTER = "source-record-converter";
+
+ public static final String CONN_TASK_PARALLELISM = "task-parallelism";
+
+ /**
+ * The required key for all configurations.
+ */
public static final Set<String> REQUEST_CONFIG = new HashSet<String>(){
{
- add("sourceRocketmq");
- add("targetRocketmq");
- add("storeTopic");
- add("taskDivideStrategy");
+ add(CONN_SOURCE_RMQ);
+ add(CONN_TARGET_RMQ);
+ add(CONN_STORE_TOPIC);
+ add(CONN_WHITE_LIST);
+ add(CONN_SOURCE_RECORD_CONVERTER);
}
};
}
diff --git a/src/main/java/org/apache/rocketmq/connector/config/TaskConfig.java b/src/main/java/org/apache/rocketmq/connector/config/TaskConfig.java
index 7bba3fb..d480b90 100644
--- a/src/main/java/org/apache/rocketmq/connector/config/TaskConfig.java
+++ b/src/main/java/org/apache/rocketmq/connector/config/TaskConfig.java
@@ -19,27 +19,26 @@ package org.apache.rocketmq.connector.config;
public class TaskConfig {
private String storeTopic;
- private String sourceTopic;
+ private String sourceGroup;
private String sourceRocketmq;
private Integer dataType;
- private String brokerName;
- private String queueId;
private Long nextPosition;
+ private String taskTopicList;
- public String getStoreTopic() {
- return storeTopic;
+ public String getSourceGroup() {
+ return sourceGroup;
}
- public void setStoreTopic(String storeTopic) {
- this.storeTopic = storeTopic;
+ public void setSourceGroup(String sourceGroup) {
+ this.sourceGroup = sourceGroup;
}
- public String getSourceTopic() {
- return sourceTopic;
+ public String getStoreTopic() {
+ return storeTopic;
}
- public void setSourceTopic(String sourceTopic) {
- this.sourceTopic = sourceTopic;
+ public void setStoreTopic(String storeTopic) {
+ this.storeTopic = storeTopic;
}
public String getSourceRocketmq() {
@@ -62,22 +61,6 @@ public class TaskConfig {
this.dataType = dataType;
}
- public String getBrokerName() {
- return brokerName;
- }
-
- public void setBrokerName(String brokerName) {
- this.brokerName = brokerName;
- }
-
- public String getQueueId() {
- return queueId;
- }
-
- public void setQueueId(String queueId) {
- this.queueId = queueId;
- }
-
public Long getNextPosition() {
return nextPosition;
}
@@ -85,4 +68,12 @@ public class TaskConfig {
public void setNextPosition(Long nextPosition) {
this.nextPosition = nextPosition;
}
+
+ public String getTaskTopicList() {
+ return taskTopicList;
+ }
+
+ public void setTaskTopicList(String taskTopicList) {
+ this.taskTopicList = taskTopicList;
+ }
}
diff --git a/src/main/java/org/apache/rocketmq/connector/common/Utils.java b/src/main/java/org/apache/rocketmq/connector/config/TaskConfigEnum.java
similarity index 59%
copy from src/main/java/org/apache/rocketmq/connector/common/Utils.java
copy to src/main/java/org/apache/rocketmq/connector/config/TaskConfigEnum.java
index 71f538c..d8aa08a 100644
--- a/src/main/java/org/apache/rocketmq/connector/common/Utils.java
+++ b/src/main/java/org/apache/rocketmq/connector/config/TaskConfigEnum.java
@@ -14,15 +14,28 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.rocketmq.connector.common;
+package org.apache.rocketmq.connector.config;
-public class Utils {
+public enum TaskConfigEnum {
- public static String createGroupName(String prefix) {
- return new StringBuilder().append(prefix).append("_").append(System.currentTimeMillis()).toString();
+ TASK_ID("taskId"),
+ TASK_SOURCE_GROUP("sourceGroup"),
+ TASK_SOURCE_ROCKETMQ("sourceRocketmq"),
+ TASK_SOURCE_TOPIC("sourceTopic"),
+ TASK_STORE_ROCKETMQ("storeTopic"),
+ TASK_DATA_TYPE("dataType"),
+ TASK_BROKER_NAME("brokerName"),
+ TASK_QUEUE_ID("queueId"),
+ TASK_NEXT_POSITION("nextPosition"),
+ TASK_TOPIC_INFO("taskTopicList");
+
+ private String key;
+
+ TaskConfigEnum(String key) {
+ this.key = key;
}
- public static String createTaskId(String prefix) {
- return new StringBuilder().append(prefix).append("_").append(System.currentTimeMillis()).toString();
+ public String getKey() {
+ return key;
}
}
diff --git a/src/main/java/org/apache/rocketmq/connector/config/TaskConfig.java b/src/main/java/org/apache/rocketmq/connector/config/TaskDivideConfig.java
similarity index 53%
copy from src/main/java/org/apache/rocketmq/connector/config/TaskConfig.java
copy to src/main/java/org/apache/rocketmq/connector/config/TaskDivideConfig.java
index 7bba3fb..7f904b3 100644
--- a/src/main/java/org/apache/rocketmq/connector/config/TaskConfig.java
+++ b/src/main/java/org/apache/rocketmq/connector/config/TaskDivideConfig.java
@@ -16,73 +16,64 @@
*/
package org.apache.rocketmq.connector.config;
-public class TaskConfig {
+public class TaskDivideConfig {
+
+ private String sourceNamesrvAddr;
private String storeTopic;
- private String sourceTopic;
- private String sourceRocketmq;
- private Integer dataType;
- private String brokerName;
- private String queueId;
- private Long nextPosition;
- public String getStoreTopic() {
- return storeTopic;
- }
+ private String srcRecordConverter;
- public void setStoreTopic(String storeTopic) {
- this.storeTopic = storeTopic;
- }
+ private int dataType;
- public String getSourceTopic() {
- return sourceTopic;
- }
+ private int taskParallelism;
- public void setSourceTopic(String sourceTopic) {
- this.sourceTopic = sourceTopic;
+ public TaskDivideConfig(String sourceNamesrvAddr, String storeTopic, String srcRecordConverter,
+ int dataType, int taskParallelism) {
+ this.sourceNamesrvAddr = sourceNamesrvAddr;
+ this.storeTopic = storeTopic;
+ this.srcRecordConverter = srcRecordConverter;
+ this.dataType = dataType;
+ this.taskParallelism = taskParallelism;
}
- public String getSourceRocketmq() {
- return sourceRocketmq;
+ public String getSourceNamesrvAddr() {
+ return sourceNamesrvAddr;
}
- public void setSourceRocketmq(String sourceRocketmq) {
- this.sourceRocketmq = sourceRocketmq;
+ public void setSourceNamesrvAddr(String sourceNamesrvAddr) {
+ this.sourceNamesrvAddr = sourceNamesrvAddr;
}
- public int getDataType() {
- return dataType;
- }
-
- public void setDataType(int dataType) {
- this.dataType = dataType;
+ public String getStoreTopic() {
+ return storeTopic;
}
- public void setDataType(Integer dataType) {
- this.dataType = dataType;
+ public void setStoreTopic(String storeTopic) {
+ this.storeTopic = storeTopic;
}
- public String getBrokerName() {
- return brokerName;
+ public String getSrcRecordConverter() {
+ return srcRecordConverter;
}
- public void setBrokerName(String brokerName) {
- this.brokerName = brokerName;
+ public void setSrcRecordConverter(String srcRecordConverter) {
+ this.srcRecordConverter = srcRecordConverter;
}
- public String getQueueId() {
- return queueId;
+ public int getDataType() {
+ return dataType;
}
- public void setQueueId(String queueId) {
- this.queueId = queueId;
+ public void setDataType(int dataType) {
+ this.dataType = dataType;
}
- public Long getNextPosition() {
- return nextPosition;
+ public int getTaskParallelism() {
+ return taskParallelism;
}
- public void setNextPosition(Long nextPosition) {
- this.nextPosition = nextPosition;
+ public void setTaskParallelism(int taskParallelism) {
+ this.taskParallelism = taskParallelism;
}
}
diff --git a/src/main/java/org/apache/rocketmq/connector/config/TaskConfig.java b/src/main/java/org/apache/rocketmq/connector/config/TaskTopicInfo.java
similarity index 61%
copy from src/main/java/org/apache/rocketmq/connector/config/TaskConfig.java
copy to src/main/java/org/apache/rocketmq/connector/config/TaskTopicInfo.java
index 7bba3fb..e1f47d5 100644
--- a/src/main/java/org/apache/rocketmq/connector/config/TaskConfig.java
+++ b/src/main/java/org/apache/rocketmq/connector/config/TaskTopicInfo.java
@@ -16,22 +16,16 @@
*/
package org.apache.rocketmq.connector.config;
-public class TaskConfig {
+public class TaskTopicInfo {
- private String storeTopic;
private String sourceTopic;
- private String sourceRocketmq;
- private Integer dataType;
private String brokerName;
private String queueId;
- private Long nextPosition;
- public String getStoreTopic() {
- return storeTopic;
- }
-
- public void setStoreTopic(String storeTopic) {
- this.storeTopic = storeTopic;
+ public TaskTopicInfo(String sourceTopic, String brokerName, String queueId) {
+ this.sourceTopic = sourceTopic;
+ this.brokerName = brokerName;
+ this.queueId = queueId;
}
public String getSourceTopic() {
@@ -42,26 +36,6 @@ public class TaskConfig {
this.sourceTopic = sourceTopic;
}
- public String getSourceRocketmq() {
- return sourceRocketmq;
- }
-
- public void setSourceRocketmq(String sourceRocketmq) {
- this.sourceRocketmq = sourceRocketmq;
- }
-
- public int getDataType() {
- return dataType;
- }
-
- public void setDataType(int dataType) {
- this.dataType = dataType;
- }
-
- public void setDataType(Integer dataType) {
- this.dataType = dataType;
- }
-
public String getBrokerName() {
return brokerName;
}
@@ -77,12 +51,4 @@ public class TaskConfig {
public void setQueueId(String queueId) {
this.queueId = queueId;
}
-
- public Long getNextPosition() {
- return nextPosition;
- }
-
- public void setNextPosition(Long nextPosition) {
- this.nextPosition = nextPosition;
- }
}
diff --git a/src/main/java/org/apache/rocketmq/connector/strategy/DivideTaskByQueue.java b/src/main/java/org/apache/rocketmq/connector/strategy/DivideTaskByQueue.java
index b6eae8f..caa69f3 100644
--- a/src/main/java/org/apache/rocketmq/connector/strategy/DivideTaskByQueue.java
+++ b/src/main/java/org/apache/rocketmq/connector/strategy/DivideTaskByQueue.java
@@ -21,27 +21,19 @@ import io.openmessaging.internal.DefaultKeyValue;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.connector.config.ConfigDefine;
import org.apache.rocketmq.connector.config.DataType;
+import org.apache.rocketmq.connector.config.TaskDivideConfig;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
public class DivideTaskByQueue extends TaskDivideStrategy {
- public List<KeyValue> divide(Map<String, List<MessageQueue>> topicRouteMap, String source, String storeTopic) {
+ public List<KeyValue> divide(Map<String, List<MessageQueue>> topicRouteMap, TaskDivideConfig tdc) {
List<KeyValue> config = new ArrayList<KeyValue>();
for (String t: topicRouteMap.keySet()) {
for (MessageQueue mq: topicRouteMap.get(t)) {
- KeyValue keyValue = new DefaultKeyValue();
- keyValue.put(ConfigDefine.STORE_TOPIC, storeTopic);
- keyValue.put(ConfigDefine.SOURCE_RMQ, source);
- keyValue.put(ConfigDefine.STORE_TOPIC, t);
- keyValue.put(ConfigDefine.BROKER_NAME, mq.getBrokerName());
- keyValue.put(ConfigDefine.QUEUE_ID, String.valueOf(mq.getQueueId()));
- keyValue.put(ConfigDefine.SOURCE_TOPIC, t);
- keyValue.put(ConfigDefine.DATA_TYPE, DataType.COMMON_MESSAGE.ordinal());
- config.add(keyValue);
}
}
diff --git a/src/main/java/org/apache/rocketmq/connector/strategy/DivideTaskByTopic.java b/src/main/java/org/apache/rocketmq/connector/strategy/DivideTaskByTopic.java
index 7c96b28..53699a9 100644
--- a/src/main/java/org/apache/rocketmq/connector/strategy/DivideTaskByTopic.java
+++ b/src/main/java/org/apache/rocketmq/connector/strategy/DivideTaskByTopic.java
@@ -16,31 +16,39 @@
*/
package org.apache.rocketmq.connector.strategy;
+import com.alibaba.fastjson.JSONObject;
import io.openmessaging.KeyValue;
import io.openmessaging.internal.DefaultKeyValue;
import org.apache.rocketmq.common.message.MessageQueue;
-import org.apache.rocketmq.connector.config.ConfigDefine;
-import org.apache.rocketmq.connector.config.DataType;
+import org.apache.rocketmq.connector.config.*;
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
public class DivideTaskByTopic extends TaskDivideStrategy {
- public List<KeyValue> divide(Map<String, List<MessageQueue>> topicRouteMap, String source, String storeTopic) {
+ public List<KeyValue> divide(Map<String, List<MessageQueue>> topicRouteMap, TaskDivideConfig tdc) {
List<KeyValue> config = new ArrayList<KeyValue>();
-
+ int parallelism = tdc.getTaskParallelism();
+ int id = -1;
+ Map<Integer, List<TaskTopicInfo>> taskTopicList = new HashMap<Integer, List<TaskTopicInfo>>();
for (String t: topicRouteMap.keySet()) {
+ int ind = ++id%parallelism;
+ if (!taskTopicList.containsKey(ind)) {
+ taskTopicList.put(ind, new ArrayList<TaskTopicInfo>());
+ }
+ taskTopicList.get(ind).add(new TaskTopicInfo(t, "", ""));
+ }
+
+ for (int i=0; i<parallelism; i++) {
KeyValue keyValue = new DefaultKeyValue();
- keyValue.put(ConfigDefine.STORE_TOPIC, storeTopic);
- keyValue.put(ConfigDefine.SOURCE_RMQ, source);
- keyValue.put(ConfigDefine.STORE_TOPIC, t);
- keyValue.put(ConfigDefine.BROKER_NAME, "");
- keyValue.put(ConfigDefine.QUEUE_ID, "");
- keyValue.put(ConfigDefine.SOURCE_TOPIC, t);
- keyValue.put(ConfigDefine.DATA_TYPE, DataType.COMMON_MESSAGE.ordinal());
+ keyValue.put(TaskConfigEnum.TASK_STORE_ROCKETMQ.getKey(), tdc.getStoreTopic());
+ keyValue.put(TaskConfigEnum.TASK_SOURCE_ROCKETMQ.getKey(), tdc.getSourceNamesrvAddr());
+ keyValue.put(TaskConfigEnum.TASK_DATA_TYPE.getKey(), DataType.COMMON_MESSAGE.ordinal());
+ keyValue.put(TaskConfigEnum.TASK_TOPIC_INFO.getKey(), JSONObject.toJSONString(taskTopicList.get(i)));
config.add(keyValue);
}
diff --git a/src/main/java/org/apache/rocketmq/connector/strategy/TaskDivideStrategy.java b/src/main/java/org/apache/rocketmq/connector/strategy/TaskDivideStrategy.java
index f847cb1..e80d092 100644
--- a/src/main/java/org/apache/rocketmq/connector/strategy/TaskDivideStrategy.java
+++ b/src/main/java/org/apache/rocketmq/connector/strategy/TaskDivideStrategy.java
@@ -18,11 +18,12 @@ package org.apache.rocketmq.connector.strategy;
import io.openmessaging.KeyValue;
import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.connector.config.TaskDivideConfig;
import java.util.List;
import java.util.Map;
public abstract class TaskDivideStrategy {
- public abstract List<KeyValue> divide(Map<String, List<MessageQueue>> topicMap, String source, String storeTopic);
+ public abstract List<KeyValue> divide(Map<String, List<MessageQueue>> topicMap, TaskDivideConfig tdc);
}