You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by zh...@apache.org on 2022/03/02 03:39:23 UTC
[rocketmq-connect] 13/39: add DivideTaskByQueue. resolve #397
This is an automated email from the ASF dual-hosted git repository.
zhoubo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-connect.git
commit 90541a271a54f6e392ce146cb39666c2466a5375
Author: xujianhai666 <ze...@bytedance.com>
AuthorDate: Fri Sep 20 22:25:26 2019 +0800
add DivideTaskByQueue. resolve #397
---
.../rocketmq/replicator/config/TaskTopicInfo.java | 2 +-
.../replicator/strategy/DivideTaskByQueue.java | 25 ++++++++++++++++++++--
2 files changed, 24 insertions(+), 3 deletions(-)
diff --git a/src/main/java/org/apache/rocketmq/replicator/config/TaskTopicInfo.java b/src/main/java/org/apache/rocketmq/replicator/config/TaskTopicInfo.java
index 1086295..3e2962b 100644
--- a/src/main/java/org/apache/rocketmq/replicator/config/TaskTopicInfo.java
+++ b/src/main/java/org/apache/rocketmq/replicator/config/TaskTopicInfo.java
@@ -18,7 +18,7 @@ package org.apache.rocketmq.replicator.config;
import org.apache.rocketmq.common.message.MessageQueue;
-public class TaskTopicInfo extends MessageQueue{
+public class TaskTopicInfo extends MessageQueue {
private String targetTopic;
diff --git a/src/main/java/org/apache/rocketmq/replicator/strategy/DivideTaskByQueue.java b/src/main/java/org/apache/rocketmq/replicator/strategy/DivideTaskByQueue.java
index d6a15ad..d909873 100644
--- a/src/main/java/org/apache/rocketmq/replicator/strategy/DivideTaskByQueue.java
+++ b/src/main/java/org/apache/rocketmq/replicator/strategy/DivideTaskByQueue.java
@@ -16,8 +16,13 @@
*/
package org.apache.rocketmq.replicator.strategy;
+import com.alibaba.fastjson.JSONObject;
import io.openmessaging.KeyValue;
+import io.openmessaging.internal.DefaultKeyValue;
+import java.util.HashMap;
import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.replicator.config.DataType;
+import org.apache.rocketmq.replicator.config.TaskConfigEnum;
import org.apache.rocketmq.replicator.config.TaskDivideConfig;
import java.util.ArrayList;
import java.util.List;
@@ -28,13 +33,29 @@ public class DivideTaskByQueue extends TaskDivideStrategy {
public List<KeyValue> divide(Map<String, List<TaskTopicInfo>> topicRouteMap, TaskDivideConfig tdc) {
List<KeyValue> config = new ArrayList<KeyValue>();
-
+ int parallelism = tdc.getTaskParallelism();
+ Map<Integer, List<TaskTopicInfo>> queueTopicList = new HashMap<Integer, List<TaskTopicInfo>>();
+ int id = -1;
for (String t : topicRouteMap.keySet()) {
for (TaskTopicInfo taskTopicInfo : topicRouteMap.get(t)) {
+ int ind = ++id % parallelism;
+ if (!queueTopicList.containsKey(ind)) {
+ queueTopicList.put(ind, new ArrayList<TaskTopicInfo>());
+ }
+ queueTopicList.get(ind).add(taskTopicInfo);
}
}
- return config;
+ for (int i = 0; i < parallelism; i++) {
+ KeyValue keyValue = new DefaultKeyValue();
+ keyValue.put(TaskConfigEnum.TASK_STORE_ROCKETMQ.getKey(), tdc.getStoreTopic());
+ keyValue.put(TaskConfigEnum.TASK_SOURCE_ROCKETMQ.getKey(), tdc.getSourceNamesrvAddr());
+ keyValue.put(TaskConfigEnum.TASK_DATA_TYPE.getKey(), DataType.COMMON_MESSAGE.ordinal());
+ keyValue.put(TaskConfigEnum.TASK_TOPIC_INFO.getKey(), JSONObject.toJSONString(queueTopicList.get(i)));
+ keyValue.put(TaskConfigEnum.TASK_SOURCE_RECORD_CONVERTER.getKey(), tdc.getSrcRecordConverter());
+ config.add(keyValue);
+ }
+ return config;
}
}