You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by zh...@apache.org on 2022/03/02 03:39:23 UTC

[rocketmq-connect] 13/39: add DivideTaskByQueue. resolve #397

This is an automated email from the ASF dual-hosted git repository.

zhoubo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-connect.git

commit 90541a271a54f6e392ce146cb39666c2466a5375
Author: xujianhai666 <ze...@bytedance.com>
AuthorDate: Fri Sep 20 22:25:26 2019 +0800

    add DivideTaskByQueue. resolve #397
---
 .../rocketmq/replicator/config/TaskTopicInfo.java  |  2 +-
 .../replicator/strategy/DivideTaskByQueue.java     | 25 ++++++++++++++++++++--
 2 files changed, 24 insertions(+), 3 deletions(-)

diff --git a/src/main/java/org/apache/rocketmq/replicator/config/TaskTopicInfo.java b/src/main/java/org/apache/rocketmq/replicator/config/TaskTopicInfo.java
index 1086295..3e2962b 100644
--- a/src/main/java/org/apache/rocketmq/replicator/config/TaskTopicInfo.java
+++ b/src/main/java/org/apache/rocketmq/replicator/config/TaskTopicInfo.java
@@ -18,7 +18,7 @@ package org.apache.rocketmq.replicator.config;
 
 import org.apache.rocketmq.common.message.MessageQueue;
 
-public class TaskTopicInfo extends MessageQueue{
+public class TaskTopicInfo extends MessageQueue {
 
     private String targetTopic;
 
diff --git a/src/main/java/org/apache/rocketmq/replicator/strategy/DivideTaskByQueue.java b/src/main/java/org/apache/rocketmq/replicator/strategy/DivideTaskByQueue.java
index d6a15ad..d909873 100644
--- a/src/main/java/org/apache/rocketmq/replicator/strategy/DivideTaskByQueue.java
+++ b/src/main/java/org/apache/rocketmq/replicator/strategy/DivideTaskByQueue.java
@@ -16,8 +16,13 @@
  */
 package org.apache.rocketmq.replicator.strategy;
 
+import com.alibaba.fastjson.JSONObject;
 import io.openmessaging.KeyValue;
+import io.openmessaging.internal.DefaultKeyValue;
+import java.util.HashMap;
 import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.replicator.config.DataType;
+import org.apache.rocketmq.replicator.config.TaskConfigEnum;
 import org.apache.rocketmq.replicator.config.TaskDivideConfig;
 import java.util.ArrayList;
 import java.util.List;
@@ -28,13 +33,29 @@ public class DivideTaskByQueue extends TaskDivideStrategy {
     public List<KeyValue> divide(Map<String, List<TaskTopicInfo>> topicRouteMap, TaskDivideConfig tdc) {
 
         List<KeyValue> config = new ArrayList<KeyValue>();
-
+        int parallelism = tdc.getTaskParallelism();
+        Map<Integer, List<TaskTopicInfo>> queueTopicList = new HashMap<Integer, List<TaskTopicInfo>>();
+        int id = -1;
         for (String t : topicRouteMap.keySet()) {
             for (TaskTopicInfo taskTopicInfo : topicRouteMap.get(t)) {
+                int ind = ++id % parallelism;
+                if (!queueTopicList.containsKey(ind)) {
+                    queueTopicList.put(ind, new ArrayList<TaskTopicInfo>());
+                }
+                queueTopicList.get(ind).add(taskTopicInfo);
             }
         }
 
-        return config;
+        for (int i = 0; i < parallelism; i++) {
+            KeyValue keyValue = new DefaultKeyValue();
+            keyValue.put(TaskConfigEnum.TASK_STORE_ROCKETMQ.getKey(), tdc.getStoreTopic());
+            keyValue.put(TaskConfigEnum.TASK_SOURCE_ROCKETMQ.getKey(), tdc.getSourceNamesrvAddr());
+            keyValue.put(TaskConfigEnum.TASK_DATA_TYPE.getKey(), DataType.COMMON_MESSAGE.ordinal());
+            keyValue.put(TaskConfigEnum.TASK_TOPIC_INFO.getKey(), JSONObject.toJSONString(queueTopicList.get(i)));
+            keyValue.put(TaskConfigEnum.TASK_SOURCE_RECORD_CONVERTER.getKey(), tdc.getSrcRecordConverter());
+            config.add(keyValue);
+        }
 
+        return config;
     }
 }