You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by zh...@apache.org on 2022/03/02 03:39:32 UTC

[rocketmq-connect] 22/39: feat(replicator): support DivideTaskByConsistentHash (#443)

This is an automated email from the ASF dual-hosted git repository.

zhoubo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-connect.git

commit 2d0079795fb68050cf59d9946a597d0116fb0139
Author: xujianhai666 <52...@users.noreply.github.com>
AuthorDate: Tue Nov 19 23:25:02 2019 +0800

    feat(replicator): support DivideTaskByConsistentHash (#443)
    
    - Add assign by consistentHash pattern
    
    Closes #439
---
 ...yQueue.java => DivideTaskByConsistentHash.java} | 62 +++++++++++++++-------
 .../replicator/strategy/DivideTaskByQueue.java     |  1 -
 .../replicator/strategy/TaskDivideStrategy.java    |  1 -
 3 files changed, 43 insertions(+), 21 deletions(-)

diff --git a/src/main/java/org/apache/rocketmq/replicator/strategy/DivideTaskByQueue.java b/src/main/java/org/apache/rocketmq/replicator/strategy/DivideTaskByConsistentHash.java
similarity index 54%
copy from src/main/java/org/apache/rocketmq/replicator/strategy/DivideTaskByQueue.java
copy to src/main/java/org/apache/rocketmq/replicator/strategy/DivideTaskByConsistentHash.java
index d909873..b027246 100644
--- a/src/main/java/org/apache/rocketmq/replicator/strategy/DivideTaskByQueue.java
+++ b/src/main/java/org/apache/rocketmq/replicator/strategy/DivideTaskByConsistentHash.java
@@ -1,12 +1,12 @@
-/*
+package org.apache.rocketmq.replicator.strategy;/*
  * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
+ * contributor license agreements. See the NOTICE file distributed with
  * this work for additional information regarding copyright ownership.
  * The ASF licenses this file to You under the Apache License, Version 2.0
  * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
+ * the License. You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ *    http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -14,35 +14,44 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.rocketmq.replicator.strategy;
 
 import com.alibaba.fastjson.JSONObject;
 import io.openmessaging.KeyValue;
 import io.openmessaging.internal.DefaultKeyValue;
+import java.util.ArrayList;
+import java.util.Collection;
 import java.util.HashMap;
-import org.apache.rocketmq.common.message.MessageQueue;
+import java.util.List;
+import java.util.Map;
+import org.apache.rocketmq.common.consistenthash.ConsistentHashRouter;
+import org.apache.rocketmq.common.consistenthash.Node;
 import org.apache.rocketmq.replicator.config.DataType;
 import org.apache.rocketmq.replicator.config.TaskConfigEnum;
 import org.apache.rocketmq.replicator.config.TaskDivideConfig;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
 import org.apache.rocketmq.replicator.config.TaskTopicInfo;
 
-public class DivideTaskByQueue extends TaskDivideStrategy {
-    public List<KeyValue> divide(Map<String, List<TaskTopicInfo>> topicRouteMap, TaskDivideConfig tdc) {
+public class DivideTaskByConsistentHash extends TaskDivideStrategy {
+    @Override public List<KeyValue> divide(Map<String, List<TaskTopicInfo>> topicMap, TaskDivideConfig tdc) {
 
-        List<KeyValue> config = new ArrayList<KeyValue>();
+        List<KeyValue> config = new ArrayList<>();
         int parallelism = tdc.getTaskParallelism();
-        Map<Integer, List<TaskTopicInfo>> queueTopicList = new HashMap<Integer, List<TaskTopicInfo>>();
+        Map<Integer, List<TaskTopicInfo>> queueTopicList = new HashMap<>();
         int id = -1;
-        for (String t : topicRouteMap.keySet()) {
-            for (TaskTopicInfo taskTopicInfo : topicRouteMap.get(t)) {
-                int ind = ++id % parallelism;
-                if (!queueTopicList.containsKey(ind)) {
-                    queueTopicList.put(ind, new ArrayList<TaskTopicInfo>());
+
+        Collection<ClientNode> cidNodes = new ArrayList<>();
+        for (int i = 0; i < parallelism; i++) {
+            cidNodes.add(new ClientNode(i, Integer.toString(i)));
+            queueTopicList.put(i, new ArrayList<>());
+        }
+
+        ConsistentHashRouter<ClientNode> router = new ConsistentHashRouter<>(cidNodes, cidNodes.size());
+
+        for (String t : topicMap.keySet()) {
+            for (TaskTopicInfo queue : topicMap.get(t)) {
+                ClientNode clientNode = router.routeNode(queue.toString());
+                if (clientNode != null) {
+                    queueTopicList.get(clientNode.index).add(queue);
                 }
-                queueTopicList.get(ind).add(taskTopicInfo);
             }
         }
 
@@ -58,4 +67,19 @@ public class DivideTaskByQueue extends TaskDivideStrategy {
 
         return config;
     }
+
+    private static class ClientNode implements Node {
+        private final String clientID;
+        private final int index;
+
+        public ClientNode(int index, String clientID) {
+            this.index = index;
+            this.clientID = clientID;
+        }
+
+        @Override
+        public String getKey() {
+            return clientID;
+        }
+    }
 }
diff --git a/src/main/java/org/apache/rocketmq/replicator/strategy/DivideTaskByQueue.java b/src/main/java/org/apache/rocketmq/replicator/strategy/DivideTaskByQueue.java
index d909873..0a493de 100644
--- a/src/main/java/org/apache/rocketmq/replicator/strategy/DivideTaskByQueue.java
+++ b/src/main/java/org/apache/rocketmq/replicator/strategy/DivideTaskByQueue.java
@@ -20,7 +20,6 @@ import com.alibaba.fastjson.JSONObject;
 import io.openmessaging.KeyValue;
 import io.openmessaging.internal.DefaultKeyValue;
 import java.util.HashMap;
-import org.apache.rocketmq.common.message.MessageQueue;
 import org.apache.rocketmq.replicator.config.DataType;
 import org.apache.rocketmq.replicator.config.TaskConfigEnum;
 import org.apache.rocketmq.replicator.config.TaskDivideConfig;
diff --git a/src/main/java/org/apache/rocketmq/replicator/strategy/TaskDivideStrategy.java b/src/main/java/org/apache/rocketmq/replicator/strategy/TaskDivideStrategy.java
index 093d92e..6f58bb3 100644
--- a/src/main/java/org/apache/rocketmq/replicator/strategy/TaskDivideStrategy.java
+++ b/src/main/java/org/apache/rocketmq/replicator/strategy/TaskDivideStrategy.java
@@ -17,7 +17,6 @@
 package org.apache.rocketmq.replicator.strategy;
 
 import io.openmessaging.KeyValue;
-import org.apache.rocketmq.common.message.MessageQueue;
 import org.apache.rocketmq.replicator.config.TaskDivideConfig;
 import java.util.List;
 import java.util.Map;