You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by zh...@apache.org on 2022/03/02 03:39:21 UTC

[rocketmq-connect] 11/39: Topic rename. resolve #399

This is an automated email from the ASF dual-hosted git repository.

zhoubo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-connect.git

commit 81376b9a25448472d1afd2b45088882a8851187f
Author: xujianhai666 <ze...@bytedance.com>
AuthorDate: Mon Sep 9 11:31:23 2019 +0800

    Topic rename. resolve #399
---
 .../rocketmq/replicator/RmqSourceReplicator.java   | 27 ++++++++---
 .../apache/rocketmq/replicator/RmqSourceTask.java  | 14 +++---
 .../apache/rocketmq/replicator/common/Utils.java   |  6 +--
 .../rocketmq/replicator/config/ConfigDefine.java   |  4 +-
 .../rocketmq/replicator/config/TaskTopicInfo.java  | 19 +++++++-
 .../replicator/strategy/DivideTaskByQueue.java     |  7 +--
 .../replicator/strategy/DivideTaskByTopic.java     | 10 ++--
 .../replicator/strategy/TaskDivideStrategy.java    |  3 +-
 .../replicator/RmqSourceReplicatorTest.java        | 53 +++++-----------------
 9 files changed, 74 insertions(+), 69 deletions(-)

diff --git a/src/main/java/org/apache/rocketmq/replicator/RmqSourceReplicator.java b/src/main/java/org/apache/rocketmq/replicator/RmqSourceReplicator.java
index 6ea3ae8..f24cf63 100644
--- a/src/main/java/org/apache/rocketmq/replicator/RmqSourceReplicator.java
+++ b/src/main/java/org/apache/rocketmq/replicator/RmqSourceReplicator.java
@@ -27,6 +27,8 @@ import java.util.Map;
 import java.util.Set;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.text.StrSubstitutor;
 import org.apache.rocketmq.client.exception.MQClientException;
 import org.apache.rocketmq.common.MixAll;
 import org.apache.rocketmq.common.TopicConfig;
@@ -42,6 +44,7 @@ import org.apache.rocketmq.replicator.common.Utils;
 import org.apache.rocketmq.replicator.config.ConfigDefine;
 import org.apache.rocketmq.replicator.config.DataType;
 import org.apache.rocketmq.replicator.config.TaskDivideConfig;
+import org.apache.rocketmq.replicator.config.TaskTopicInfo;
 import org.apache.rocketmq.replicator.strategy.DivideStrategyEnum;
 import org.apache.rocketmq.replicator.strategy.DivideTaskByQueue;
 import org.apache.rocketmq.replicator.strategy.DivideTaskByTopic;
@@ -60,7 +63,7 @@ public class RmqSourceReplicator extends SourceConnector {
 
     private KeyValue replicatorConfig;
 
-    private Map<String, List<MessageQueue>> topicRouteMap;
+    private Map<String, List<TaskTopicInfo>> topicRouteMap;
 
     private TaskDivideStrategy taskDivideStrategy;
 
@@ -76,7 +79,7 @@ public class RmqSourceReplicator extends SourceConnector {
     private volatile boolean adminStarted;
 
     public RmqSourceReplicator() {
-        topicRouteMap = new HashMap<String, List<MessageQueue>>();
+        topicRouteMap = new HashMap<String, List<TaskTopicInfo>>();
         whiteList = new HashSet<String>();
     }
 
@@ -204,8 +207,9 @@ public class RmqSourceReplicator extends SourceConnector {
                     for (Pattern pattern : patterns) {
                         Matcher matcher = pattern.matcher(topic);
                         if (matcher.matches()) {
+                            String targetTopic = generateTargetTopic(topic);
                             if (!targetTopicSet.contains(topic)) {
-                                ensureTargetTopic(topic, topic);
+                                ensureTargetTopic(topic, targetTopic);
                             }
 
                             // different from BrokerData with cluster field, which can ensure the brokerData is from expected cluster.
@@ -219,13 +223,13 @@ public class RmqSourceReplicator extends SourceConnector {
 
                             TopicRouteData topicRouteData = srcMQAdminExt.examineTopicRouteInfo(topic);
                             if (!topicRouteMap.containsKey(topic)) {
-                                topicRouteMap.put(topic, new ArrayList<MessageQueue>());
+                                topicRouteMap.put(topic, new ArrayList<TaskTopicInfo>());
                             }
                             for (QueueData qd : topicRouteData.getQueueDatas()) {
                                 if (brokerNameSet.contains(qd.getBrokerName())) {
                                     for (int i = 0; i < qd.getReadQueueNums(); i++) {
-                                        MessageQueue mq = new MessageQueue(topic, qd.getBrokerName(), i);
-                                        topicRouteMap.get(topic).add(mq);
+                                        TaskTopicInfo taskTopicInfo = new TaskTopicInfo(topic, qd.getBrokerName(), String.valueOf(i), targetTopic);
+                                        topicRouteMap.get(topic).add(taskTopicInfo);
                                     }
                                 }
                             }
@@ -244,7 +248,7 @@ public class RmqSourceReplicator extends SourceConnector {
         this.whiteList = whiteList;
     }
 
-    public Map<String, List<MessageQueue>> getTopicRouteMap() {
+    public Map<String, List<TaskTopicInfo>> getTopicRouteMap() {
         return this.topicRouteMap;
     }
 
@@ -283,5 +287,14 @@ public class RmqSourceReplicator extends SourceConnector {
         throw new IllegalStateException("");
     }
 
+    public String generateTargetTopic(String topic) {
+        String fmt = this.replicatorConfig.getString(ConfigDefine.CONN_TOPIC_RENAME_FMT);
+        if (StringUtils.isNotEmpty(fmt)) {
+            Map<String, String> params = new HashMap<String, String>();
+            params.put("topic", topic);
+            return StrSubstitutor.replace(fmt, params);
+        }
+        return topic;
+    }
 }
 
diff --git a/src/main/java/org/apache/rocketmq/replicator/RmqSourceTask.java b/src/main/java/org/apache/rocketmq/replicator/RmqSourceTask.java
index 9909d61..c4d53fa 100644
--- a/src/main/java/org/apache/rocketmq/replicator/RmqSourceTask.java
+++ b/src/main/java/org/apache/rocketmq/replicator/RmqSourceTask.java
@@ -46,12 +46,12 @@ public class RmqSourceTask extends SourceTask {
     private final DefaultMQPullConsumer consumer;
     private volatile boolean started = false;
 
-    private Map<MessageQueue, Long> mqOffsetMap;
+    private Map<TaskTopicInfo, Long> mqOffsetMap;
     public RmqSourceTask() {
         this.config = new TaskConfig();
         this.consumer = new DefaultMQPullConsumer();
         this.taskId = Utils.createTaskId(Thread.currentThread().getName());
-        mqOffsetMap = new HashMap<MessageQueue, Long>();
+        mqOffsetMap = new HashMap<TaskTopicInfo, Long>();
     }
 
     public Collection<SourceDataEntry> poll() {
@@ -95,7 +95,7 @@ public class RmqSourceTask extends SourceTask {
                             } else {
                                 this.config.setNextPosition(0L);
                             }
-                            mqOffsetMap.put(mq, this.config.getNextPosition());
+                            mqOffsetMap.put(tti, this.config.getNextPosition());
                         }
                     }
                 } else {
@@ -114,7 +114,7 @@ public class RmqSourceTask extends SourceTask {
                         } else {
                             this.config.setNextPosition(0L);
                         }
-                        mqOffsetMap.put(mq, this.config.getNextPosition());
+                        mqOffsetMap.put(tti, this.config.getNextPosition());
                     }
                 }
             }
@@ -148,12 +148,13 @@ public class RmqSourceTask extends SourceTask {
         List<SourceDataEntry> res = new ArrayList<SourceDataEntry>();
         if (started) {
             try {
-                for (MessageQueue mq : this.mqOffsetMap.keySet()) {
+                for (TaskTopicInfo taskTopicConfig : this.mqOffsetMap.keySet()) {
+                    MessageQueue mq = taskTopicConfig.convertMQ();
                     PullResult pullResult = consumer.pull(mq, "*",
                             this.mqOffsetMap.get(mq), 32);
                     switch (pullResult.getPullStatus()) {
                         case FOUND: {
-                            this.mqOffsetMap.put(mq, pullResult.getNextBeginOffset());
+                            this.mqOffsetMap.put(taskTopicConfig, pullResult.getNextBeginOffset());
                             JSONObject jsonObject = new JSONObject();
                             jsonObject.put(RmqConstants.NEXT_POSITION, pullResult.getNextBeginOffset());
                             List<MessageExt> msgs = pullResult.getMsgFoundList();
@@ -175,6 +176,7 @@ public class RmqSourceTask extends SourceTask {
                                             String.valueOf(mq.getQueueId())).getBytes("UTF-8")),
                                     ByteBuffer.wrap(jsonObject.toJSONString().getBytes("UTF-8"))
                             );
+                            sourceDataEntry.setQueueName(taskTopicConfig.getTargetTopic());
                             res.add(sourceDataEntry);
                             break;
                         }
diff --git a/src/main/java/org/apache/rocketmq/replicator/common/Utils.java b/src/main/java/org/apache/rocketmq/replicator/common/Utils.java
index e5c0866..f3e6be6 100644
--- a/src/main/java/org/apache/rocketmq/replicator/common/Utils.java
+++ b/src/main/java/org/apache/rocketmq/replicator/common/Utils.java
@@ -58,12 +58,12 @@ public class Utils {
         return String.valueOf(namesrvList.toString().hashCode());
     }
 
-
-    public static List<BrokerData> examineBrokerData(DefaultMQAdminExt defaultMQAdminExt, String topic, String cluster) throws RemotingException, MQClientException, InterruptedException {
+    public static List<BrokerData> examineBrokerData(DefaultMQAdminExt defaultMQAdminExt, String topic,
+        String cluster) throws RemotingException, MQClientException, InterruptedException {
         List<BrokerData> brokerList = new ArrayList<BrokerData>();
 
         TopicRouteData topicRouteData = defaultMQAdminExt.examineTopicRouteInfo(topic);
-        if (topicRouteData.getBrokerDatas() != null) { // check下
+        if (topicRouteData.getBrokerDatas() != null) {
             for (BrokerData broker : topicRouteData.getBrokerDatas()) {
                 if (StringUtils.equals(broker.getCluster(), cluster)) {
                     brokerList.add(broker);
diff --git a/src/main/java/org/apache/rocketmq/replicator/config/ConfigDefine.java b/src/main/java/org/apache/rocketmq/replicator/config/ConfigDefine.java
index ddf4972..5b15821 100644
--- a/src/main/java/org/apache/rocketmq/replicator/config/ConfigDefine.java
+++ b/src/main/java/org/apache/rocketmq/replicator/config/ConfigDefine.java
@@ -41,10 +41,12 @@ public class ConfigDefine {
 
     public static final String CONN_TASK_PARALLELISM = "task-parallelism";
 
+    public static final String CONN_TOPIC_RENAME_FMT = "topic.rename.format";
+
     /**
      * The required key for all configurations.
      */
-    public static final Set<String> REQUEST_CONFIG = new HashSet<String>(){
+    public static final Set<String> REQUEST_CONFIG = new HashSet<String>() {
         {
             add(CONN_SOURCE_RMQ);
             add(CONN_TARGET_RMQ);
diff --git a/src/main/java/org/apache/rocketmq/replicator/config/TaskTopicInfo.java b/src/main/java/org/apache/rocketmq/replicator/config/TaskTopicInfo.java
index f078028..c5a39e4 100644
--- a/src/main/java/org/apache/rocketmq/replicator/config/TaskTopicInfo.java
+++ b/src/main/java/org/apache/rocketmq/replicator/config/TaskTopicInfo.java
@@ -16,16 +16,20 @@
  */
 package org.apache.rocketmq.replicator.config;
 
+import org.apache.rocketmq.common.message.MessageQueue;
+
 public class TaskTopicInfo {
 
     private String sourceTopic;
     private String brokerName;
     private String queueId;
+    private String targetTopic;
 
-    public TaskTopicInfo(String sourceTopic, String brokerName, String queueId) {
+    public TaskTopicInfo(String sourceTopic, String brokerName, String queueId, String targetTopic) {
         this.sourceTopic = sourceTopic;
         this.brokerName = brokerName;
         this.queueId = queueId;
+        this.targetTopic = targetTopic;
     }
 
     public String getSourceTopic() {
@@ -51,4 +55,17 @@ public class TaskTopicInfo {
     public void setQueueId(String queueId) {
         this.queueId = queueId;
     }
+
+    public String getTargetTopic() {
+        return this.targetTopic;
+    }
+
+    public void setTargetTopic(String targetTopic) {
+        this.targetTopic = targetTopic;
+    }
+
+    public MessageQueue convertMQ() {
+        return new MessageQueue(sourceTopic,
+           brokerName, Integer.parseInt(queueId));
+    }
 }
diff --git a/src/main/java/org/apache/rocketmq/replicator/strategy/DivideTaskByQueue.java b/src/main/java/org/apache/rocketmq/replicator/strategy/DivideTaskByQueue.java
index 77ed871..d6a15ad 100644
--- a/src/main/java/org/apache/rocketmq/replicator/strategy/DivideTaskByQueue.java
+++ b/src/main/java/org/apache/rocketmq/replicator/strategy/DivideTaskByQueue.java
@@ -22,14 +22,15 @@ import org.apache.rocketmq.replicator.config.TaskDivideConfig;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
+import org.apache.rocketmq.replicator.config.TaskTopicInfo;
 
 public class DivideTaskByQueue extends TaskDivideStrategy {
-    public List<KeyValue> divide(Map<String, List<MessageQueue>> topicRouteMap, TaskDivideConfig tdc) {
+    public List<KeyValue> divide(Map<String, List<TaskTopicInfo>> topicRouteMap, TaskDivideConfig tdc) {
 
         List<KeyValue> config = new ArrayList<KeyValue>();
 
-        for (String t: topicRouteMap.keySet()) {
-            for (MessageQueue mq: topicRouteMap.get(t)) {
+        for (String t : topicRouteMap.keySet()) {
+            for (TaskTopicInfo taskTopicInfo : topicRouteMap.get(t)) {
             }
         }
 
diff --git a/src/main/java/org/apache/rocketmq/replicator/strategy/DivideTaskByTopic.java b/src/main/java/org/apache/rocketmq/replicator/strategy/DivideTaskByTopic.java
index e667d57..f8dc8cf 100644
--- a/src/main/java/org/apache/rocketmq/replicator/strategy/DivideTaskByTopic.java
+++ b/src/main/java/org/apache/rocketmq/replicator/strategy/DivideTaskByTopic.java
@@ -28,21 +28,21 @@ import java.util.Map;
 
 public class DivideTaskByTopic extends TaskDivideStrategy {
 
-    public List<KeyValue> divide(Map<String, List<MessageQueue>> topicRouteMap, TaskDivideConfig tdc) {
+    public List<KeyValue> divide(Map<String, List<TaskTopicInfo>> topicRouteMap, TaskDivideConfig tdc) {
 
         List<KeyValue> config = new ArrayList<KeyValue>();
         int parallelism = tdc.getTaskParallelism();
         int id = -1;
         Map<Integer, List<TaskTopicInfo>> taskTopicList = new HashMap<Integer, List<TaskTopicInfo>>();
-        for (String t: topicRouteMap.keySet()) {
-            int ind = ++id%parallelism;
+        for (Map.Entry<String, List<TaskTopicInfo>> entry : topicRouteMap.entrySet()) {
+            int ind = ++id % parallelism;
             if (!taskTopicList.containsKey(ind)) {
                 taskTopicList.put(ind, new ArrayList<TaskTopicInfo>());
             }
-            taskTopicList.get(ind).add(new TaskTopicInfo(t, "", ""));
+            taskTopicList.get(ind).addAll(entry.getValue());
         }
 
-        for (int i=0; i<parallelism; i++) {
+        for (int i = 0; i < parallelism; i++) {
             KeyValue keyValue = new DefaultKeyValue();
             keyValue.put(TaskConfigEnum.TASK_STORE_ROCKETMQ.getKey(), tdc.getStoreTopic());
             keyValue.put(TaskConfigEnum.TASK_SOURCE_ROCKETMQ.getKey(), tdc.getSourceNamesrvAddr());
diff --git a/src/main/java/org/apache/rocketmq/replicator/strategy/TaskDivideStrategy.java b/src/main/java/org/apache/rocketmq/replicator/strategy/TaskDivideStrategy.java
index 0e0ac99..093d92e 100644
--- a/src/main/java/org/apache/rocketmq/replicator/strategy/TaskDivideStrategy.java
+++ b/src/main/java/org/apache/rocketmq/replicator/strategy/TaskDivideStrategy.java
@@ -21,8 +21,9 @@ import org.apache.rocketmq.common.message.MessageQueue;
 import org.apache.rocketmq.replicator.config.TaskDivideConfig;
 import java.util.List;
 import java.util.Map;
+import org.apache.rocketmq.replicator.config.TaskTopicInfo;
 
 public abstract class TaskDivideStrategy {
 
-    public abstract List<KeyValue> divide(Map<String, List<MessageQueue>> topicMap, TaskDivideConfig tdc);
+    public abstract List<KeyValue> divide(Map<String, List<TaskTopicInfo>> topicMap, TaskDivideConfig tdc);
 }
diff --git a/src/test/java/org/apache/rocketmq/replicator/RmqSourceReplicatorTest.java b/src/test/java/org/apache/rocketmq/replicator/RmqSourceReplicatorTest.java
index 6ac0049..f271f14 100644
--- a/src/test/java/org/apache/rocketmq/replicator/RmqSourceReplicatorTest.java
+++ b/src/test/java/org/apache/rocketmq/replicator/RmqSourceReplicatorTest.java
@@ -29,7 +29,6 @@ import org.apache.rocketmq.common.protocol.route.QueueData;
 import org.apache.rocketmq.common.protocol.route.TopicRouteData;
 import org.apache.rocketmq.remoting.exception.RemotingException;
 import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
-import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.mockito.Mock;
@@ -42,51 +41,21 @@ import org.mockito.Mockito;
 import org.mockito.internal.util.reflection.FieldSetter;
 import org.mockito.junit.MockitoJUnitRunner;
 
+import io.openmessaging.KeyValue;
+import io.openmessaging.internal.DefaultKeyValue;
+import org.apache.rocketmq.replicator.config.ConfigDefine;
+
 @RunWith(MockitoJUnitRunner.class)
 public class RmqSourceReplicatorTest {
 
-    @Mock
-    private DefaultMQAdminExt defaultMQAdminExt;
-
-
     @Test
-    public void buildWildcardRoute() throws RemotingException, MQClientException, InterruptedException, NoSuchFieldException {
-
+    public void testGenerateTopic() throws NoSuchFieldException {
         RmqSourceReplicator rmqSourceReplicator = Mockito.spy(RmqSourceReplicator.class);
-
-        TopicList topicList = new TopicList();
-        Set<String> topics = new HashSet<String>();
-        topics.add("topic1");
-        topics.add("topic2");
-        topics.add("sub-topic1-test");
-        topics.add("sub-topic2-test");
-        topics.add("sub-topic2-xxx");
-        topics.add("sub-0");
-        topics.add("test-0");
-        topics.add("0-test");
-        topicList.setTopicList(topics);
-        when(defaultMQAdminExt.fetchAllTopicList()).thenReturn(topicList);
-
-        TopicRouteData topicRouteData = new TopicRouteData();
-        topicRouteData.setQueueDatas(Collections.<QueueData>emptyList());
-        when(defaultMQAdminExt.examineTopicRouteInfo(any(String.class))).thenReturn(topicRouteData);
-
-
-        Field field = RmqSourceReplicator.class.getDeclaredField("defaultMQAdminExt");
-        FieldSetter.setField(rmqSourceReplicator, field, defaultMQAdminExt);
-
-        Set<String> whiteList = new HashSet<String>();
-        whiteList.add("topic1");
-        whiteList.add("\\w+-test");
-        rmqSourceReplicator.setWhiteList(whiteList);
-        rmqSourceReplicator.buildRoute();
-        Map<String, List<MessageQueue>> queues = rmqSourceReplicator.getTopicRouteMap();
-        Set<String> expected = new HashSet<String>();
-        expected.add("topic1");
-        expected.add("0-test");
-        assertThat(queues.size()).isEqualTo(expected.size());
-        for (String topic : expected) {
-            assertThat(queues.containsKey(topic)).isTrue();
-        }
+        KeyValue kv = new DefaultKeyValue();
+        kv.put(ConfigDefine.CONN_TOPIC_RENAME_FMT, "${topic}.replica");
+        Field field = RmqSourceReplicator.class.getDeclaredField("replicatorConfig");
+        FieldSetter.setField(rmqSourceReplicator, field, kv);
+        String dstTopic = rmqSourceReplicator.generateTargetTopic("dest");
+        assertThat(dstTopic).isEqualTo("dest.replica");
     }
 }