You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by zh...@apache.org on 2022/03/02 03:39:21 UTC
[rocketmq-connect] 11/39: Topic rename. resolve #399
This is an automated email from the ASF dual-hosted git repository.
zhoubo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-connect.git
commit 81376b9a25448472d1afd2b45088882a8851187f
Author: xujianhai666 <ze...@bytedance.com>
AuthorDate: Mon Sep 9 11:31:23 2019 +0800
Topic rename. resolve #399
---
.../rocketmq/replicator/RmqSourceReplicator.java | 27 ++++++++---
.../apache/rocketmq/replicator/RmqSourceTask.java | 14 +++---
.../apache/rocketmq/replicator/common/Utils.java | 6 +--
.../rocketmq/replicator/config/ConfigDefine.java | 4 +-
.../rocketmq/replicator/config/TaskTopicInfo.java | 19 +++++++-
.../replicator/strategy/DivideTaskByQueue.java | 7 +--
.../replicator/strategy/DivideTaskByTopic.java | 10 ++--
.../replicator/strategy/TaskDivideStrategy.java | 3 +-
.../replicator/RmqSourceReplicatorTest.java | 53 +++++-----------------
9 files changed, 74 insertions(+), 69 deletions(-)
diff --git a/src/main/java/org/apache/rocketmq/replicator/RmqSourceReplicator.java b/src/main/java/org/apache/rocketmq/replicator/RmqSourceReplicator.java
index 6ea3ae8..f24cf63 100644
--- a/src/main/java/org/apache/rocketmq/replicator/RmqSourceReplicator.java
+++ b/src/main/java/org/apache/rocketmq/replicator/RmqSourceReplicator.java
@@ -27,6 +27,8 @@ import java.util.Map;
import java.util.Set;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.text.StrSubstitutor;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.TopicConfig;
@@ -42,6 +44,7 @@ import org.apache.rocketmq.replicator.common.Utils;
import org.apache.rocketmq.replicator.config.ConfigDefine;
import org.apache.rocketmq.replicator.config.DataType;
import org.apache.rocketmq.replicator.config.TaskDivideConfig;
+import org.apache.rocketmq.replicator.config.TaskTopicInfo;
import org.apache.rocketmq.replicator.strategy.DivideStrategyEnum;
import org.apache.rocketmq.replicator.strategy.DivideTaskByQueue;
import org.apache.rocketmq.replicator.strategy.DivideTaskByTopic;
@@ -60,7 +63,7 @@ public class RmqSourceReplicator extends SourceConnector {
private KeyValue replicatorConfig;
- private Map<String, List<MessageQueue>> topicRouteMap;
+ private Map<String, List<TaskTopicInfo>> topicRouteMap;
private TaskDivideStrategy taskDivideStrategy;
@@ -76,7 +79,7 @@ public class RmqSourceReplicator extends SourceConnector {
private volatile boolean adminStarted;
public RmqSourceReplicator() {
- topicRouteMap = new HashMap<String, List<MessageQueue>>();
+ topicRouteMap = new HashMap<String, List<TaskTopicInfo>>();
whiteList = new HashSet<String>();
}
@@ -204,8 +207,9 @@ public class RmqSourceReplicator extends SourceConnector {
for (Pattern pattern : patterns) {
Matcher matcher = pattern.matcher(topic);
if (matcher.matches()) {
+ String targetTopic = generateTargetTopic(topic);
if (!targetTopicSet.contains(topic)) {
- ensureTargetTopic(topic, topic);
+ ensureTargetTopic(topic, targetTopic);
}
// different from BrokerData with cluster field, which can ensure the brokerData is from expected cluster.
@@ -219,13 +223,13 @@ public class RmqSourceReplicator extends SourceConnector {
TopicRouteData topicRouteData = srcMQAdminExt.examineTopicRouteInfo(topic);
if (!topicRouteMap.containsKey(topic)) {
- topicRouteMap.put(topic, new ArrayList<MessageQueue>());
+ topicRouteMap.put(topic, new ArrayList<TaskTopicInfo>());
}
for (QueueData qd : topicRouteData.getQueueDatas()) {
if (brokerNameSet.contains(qd.getBrokerName())) {
for (int i = 0; i < qd.getReadQueueNums(); i++) {
- MessageQueue mq = new MessageQueue(topic, qd.getBrokerName(), i);
- topicRouteMap.get(topic).add(mq);
+ TaskTopicInfo taskTopicInfo = new TaskTopicInfo(topic, qd.getBrokerName(), String.valueOf(i), targetTopic);
+ topicRouteMap.get(topic).add(taskTopicInfo);
}
}
}
@@ -244,7 +248,7 @@ public class RmqSourceReplicator extends SourceConnector {
this.whiteList = whiteList;
}
- public Map<String, List<MessageQueue>> getTopicRouteMap() {
+ public Map<String, List<TaskTopicInfo>> getTopicRouteMap() {
return this.topicRouteMap;
}
@@ -283,5 +287,14 @@ public class RmqSourceReplicator extends SourceConnector {
throw new IllegalStateException("");
}
+ public String generateTargetTopic(String topic) {
+ String fmt = this.replicatorConfig.getString(ConfigDefine.CONN_TOPIC_RENAME_FMT);
+ if (StringUtils.isNotEmpty(fmt)) {
+ Map<String, String> params = new HashMap<String, String>();
+ params.put("topic", topic);
+ return StrSubstitutor.replace(fmt, params);
+ }
+ return topic;
+ }
}
diff --git a/src/main/java/org/apache/rocketmq/replicator/RmqSourceTask.java b/src/main/java/org/apache/rocketmq/replicator/RmqSourceTask.java
index 9909d61..c4d53fa 100644
--- a/src/main/java/org/apache/rocketmq/replicator/RmqSourceTask.java
+++ b/src/main/java/org/apache/rocketmq/replicator/RmqSourceTask.java
@@ -46,12 +46,12 @@ public class RmqSourceTask extends SourceTask {
private final DefaultMQPullConsumer consumer;
private volatile boolean started = false;
- private Map<MessageQueue, Long> mqOffsetMap;
+ private Map<TaskTopicInfo, Long> mqOffsetMap;
public RmqSourceTask() {
this.config = new TaskConfig();
this.consumer = new DefaultMQPullConsumer();
this.taskId = Utils.createTaskId(Thread.currentThread().getName());
- mqOffsetMap = new HashMap<MessageQueue, Long>();
+ mqOffsetMap = new HashMap<TaskTopicInfo, Long>();
}
public Collection<SourceDataEntry> poll() {
@@ -95,7 +95,7 @@ public class RmqSourceTask extends SourceTask {
} else {
this.config.setNextPosition(0L);
}
- mqOffsetMap.put(mq, this.config.getNextPosition());
+ mqOffsetMap.put(tti, this.config.getNextPosition());
}
}
} else {
@@ -114,7 +114,7 @@ public class RmqSourceTask extends SourceTask {
} else {
this.config.setNextPosition(0L);
}
- mqOffsetMap.put(mq, this.config.getNextPosition());
+ mqOffsetMap.put(tti, this.config.getNextPosition());
}
}
}
@@ -148,12 +148,13 @@ public class RmqSourceTask extends SourceTask {
List<SourceDataEntry> res = new ArrayList<SourceDataEntry>();
if (started) {
try {
- for (MessageQueue mq : this.mqOffsetMap.keySet()) {
+ for (TaskTopicInfo taskTopicConfig : this.mqOffsetMap.keySet()) {
+ MessageQueue mq = taskTopicConfig.convertMQ();
PullResult pullResult = consumer.pull(mq, "*",
this.mqOffsetMap.get(mq), 32);
switch (pullResult.getPullStatus()) {
case FOUND: {
- this.mqOffsetMap.put(mq, pullResult.getNextBeginOffset());
+ this.mqOffsetMap.put(taskTopicConfig, pullResult.getNextBeginOffset());
JSONObject jsonObject = new JSONObject();
jsonObject.put(RmqConstants.NEXT_POSITION, pullResult.getNextBeginOffset());
List<MessageExt> msgs = pullResult.getMsgFoundList();
@@ -175,6 +176,7 @@ public class RmqSourceTask extends SourceTask {
String.valueOf(mq.getQueueId())).getBytes("UTF-8")),
ByteBuffer.wrap(jsonObject.toJSONString().getBytes("UTF-8"))
);
+ sourceDataEntry.setQueueName(taskTopicConfig.getTargetTopic());
res.add(sourceDataEntry);
break;
}
diff --git a/src/main/java/org/apache/rocketmq/replicator/common/Utils.java b/src/main/java/org/apache/rocketmq/replicator/common/Utils.java
index e5c0866..f3e6be6 100644
--- a/src/main/java/org/apache/rocketmq/replicator/common/Utils.java
+++ b/src/main/java/org/apache/rocketmq/replicator/common/Utils.java
@@ -58,12 +58,12 @@ public class Utils {
return String.valueOf(namesrvList.toString().hashCode());
}
-
- public static List<BrokerData> examineBrokerData(DefaultMQAdminExt defaultMQAdminExt, String topic, String cluster) throws RemotingException, MQClientException, InterruptedException {
+ public static List<BrokerData> examineBrokerData(DefaultMQAdminExt defaultMQAdminExt, String topic,
+ String cluster) throws RemotingException, MQClientException, InterruptedException {
List<BrokerData> brokerList = new ArrayList<BrokerData>();
TopicRouteData topicRouteData = defaultMQAdminExt.examineTopicRouteInfo(topic);
- if (topicRouteData.getBrokerDatas() != null) { // check下
+ if (topicRouteData.getBrokerDatas() != null) {
for (BrokerData broker : topicRouteData.getBrokerDatas()) {
if (StringUtils.equals(broker.getCluster(), cluster)) {
brokerList.add(broker);
diff --git a/src/main/java/org/apache/rocketmq/replicator/config/ConfigDefine.java b/src/main/java/org/apache/rocketmq/replicator/config/ConfigDefine.java
index ddf4972..5b15821 100644
--- a/src/main/java/org/apache/rocketmq/replicator/config/ConfigDefine.java
+++ b/src/main/java/org/apache/rocketmq/replicator/config/ConfigDefine.java
@@ -41,10 +41,12 @@ public class ConfigDefine {
public static final String CONN_TASK_PARALLELISM = "task-parallelism";
+ public static final String CONN_TOPIC_RENAME_FMT = "topic.rename.format";
+
/**
* The required key for all configurations.
*/
- public static final Set<String> REQUEST_CONFIG = new HashSet<String>(){
+ public static final Set<String> REQUEST_CONFIG = new HashSet<String>() {
{
add(CONN_SOURCE_RMQ);
add(CONN_TARGET_RMQ);
diff --git a/src/main/java/org/apache/rocketmq/replicator/config/TaskTopicInfo.java b/src/main/java/org/apache/rocketmq/replicator/config/TaskTopicInfo.java
index f078028..c5a39e4 100644
--- a/src/main/java/org/apache/rocketmq/replicator/config/TaskTopicInfo.java
+++ b/src/main/java/org/apache/rocketmq/replicator/config/TaskTopicInfo.java
@@ -16,16 +16,20 @@
*/
package org.apache.rocketmq.replicator.config;
+import org.apache.rocketmq.common.message.MessageQueue;
+
public class TaskTopicInfo {
private String sourceTopic;
private String brokerName;
private String queueId;
+ private String targetTopic;
- public TaskTopicInfo(String sourceTopic, String brokerName, String queueId) {
+ public TaskTopicInfo(String sourceTopic, String brokerName, String queueId, String targetTopic) {
this.sourceTopic = sourceTopic;
this.brokerName = brokerName;
this.queueId = queueId;
+ this.targetTopic = targetTopic;
}
public String getSourceTopic() {
@@ -51,4 +55,17 @@ public class TaskTopicInfo {
public void setQueueId(String queueId) {
this.queueId = queueId;
}
+
+ public String getTargetTopic() {
+ return this.targetTopic;
+ }
+
+ public void setTargetTopic(String targetTopic) {
+ this.targetTopic = targetTopic;
+ }
+
+ public MessageQueue convertMQ() {
+ return new MessageQueue(sourceTopic,
+ brokerName, Integer.parseInt(queueId));
+ }
}
diff --git a/src/main/java/org/apache/rocketmq/replicator/strategy/DivideTaskByQueue.java b/src/main/java/org/apache/rocketmq/replicator/strategy/DivideTaskByQueue.java
index 77ed871..d6a15ad 100644
--- a/src/main/java/org/apache/rocketmq/replicator/strategy/DivideTaskByQueue.java
+++ b/src/main/java/org/apache/rocketmq/replicator/strategy/DivideTaskByQueue.java
@@ -22,14 +22,15 @@ import org.apache.rocketmq.replicator.config.TaskDivideConfig;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
+import org.apache.rocketmq.replicator.config.TaskTopicInfo;
public class DivideTaskByQueue extends TaskDivideStrategy {
- public List<KeyValue> divide(Map<String, List<MessageQueue>> topicRouteMap, TaskDivideConfig tdc) {
+ public List<KeyValue> divide(Map<String, List<TaskTopicInfo>> topicRouteMap, TaskDivideConfig tdc) {
List<KeyValue> config = new ArrayList<KeyValue>();
- for (String t: topicRouteMap.keySet()) {
- for (MessageQueue mq: topicRouteMap.get(t)) {
+ for (String t : topicRouteMap.keySet()) {
+ for (TaskTopicInfo taskTopicInfo : topicRouteMap.get(t)) {
}
}
diff --git a/src/main/java/org/apache/rocketmq/replicator/strategy/DivideTaskByTopic.java b/src/main/java/org/apache/rocketmq/replicator/strategy/DivideTaskByTopic.java
index e667d57..f8dc8cf 100644
--- a/src/main/java/org/apache/rocketmq/replicator/strategy/DivideTaskByTopic.java
+++ b/src/main/java/org/apache/rocketmq/replicator/strategy/DivideTaskByTopic.java
@@ -28,21 +28,21 @@ import java.util.Map;
public class DivideTaskByTopic extends TaskDivideStrategy {
- public List<KeyValue> divide(Map<String, List<MessageQueue>> topicRouteMap, TaskDivideConfig tdc) {
+ public List<KeyValue> divide(Map<String, List<TaskTopicInfo>> topicRouteMap, TaskDivideConfig tdc) {
List<KeyValue> config = new ArrayList<KeyValue>();
int parallelism = tdc.getTaskParallelism();
int id = -1;
Map<Integer, List<TaskTopicInfo>> taskTopicList = new HashMap<Integer, List<TaskTopicInfo>>();
- for (String t: topicRouteMap.keySet()) {
- int ind = ++id%parallelism;
+ for (Map.Entry<String, List<TaskTopicInfo>> entry : topicRouteMap.entrySet()) {
+ int ind = ++id % parallelism;
if (!taskTopicList.containsKey(ind)) {
taskTopicList.put(ind, new ArrayList<TaskTopicInfo>());
}
- taskTopicList.get(ind).add(new TaskTopicInfo(t, "", ""));
+ taskTopicList.get(ind).addAll(entry.getValue());
}
- for (int i=0; i<parallelism; i++) {
+ for (int i = 0; i < parallelism; i++) {
KeyValue keyValue = new DefaultKeyValue();
keyValue.put(TaskConfigEnum.TASK_STORE_ROCKETMQ.getKey(), tdc.getStoreTopic());
keyValue.put(TaskConfigEnum.TASK_SOURCE_ROCKETMQ.getKey(), tdc.getSourceNamesrvAddr());
diff --git a/src/main/java/org/apache/rocketmq/replicator/strategy/TaskDivideStrategy.java b/src/main/java/org/apache/rocketmq/replicator/strategy/TaskDivideStrategy.java
index 0e0ac99..093d92e 100644
--- a/src/main/java/org/apache/rocketmq/replicator/strategy/TaskDivideStrategy.java
+++ b/src/main/java/org/apache/rocketmq/replicator/strategy/TaskDivideStrategy.java
@@ -21,8 +21,9 @@ import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.replicator.config.TaskDivideConfig;
import java.util.List;
import java.util.Map;
+import org.apache.rocketmq.replicator.config.TaskTopicInfo;
public abstract class TaskDivideStrategy {
- public abstract List<KeyValue> divide(Map<String, List<MessageQueue>> topicMap, TaskDivideConfig tdc);
+ public abstract List<KeyValue> divide(Map<String, List<TaskTopicInfo>> topicMap, TaskDivideConfig tdc);
}
diff --git a/src/test/java/org/apache/rocketmq/replicator/RmqSourceReplicatorTest.java b/src/test/java/org/apache/rocketmq/replicator/RmqSourceReplicatorTest.java
index 6ac0049..f271f14 100644
--- a/src/test/java/org/apache/rocketmq/replicator/RmqSourceReplicatorTest.java
+++ b/src/test/java/org/apache/rocketmq/replicator/RmqSourceReplicatorTest.java
@@ -29,7 +29,6 @@ import org.apache.rocketmq.common.protocol.route.QueueData;
import org.apache.rocketmq.common.protocol.route.TopicRouteData;
import org.apache.rocketmq.remoting.exception.RemotingException;
import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
-import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
@@ -42,51 +41,21 @@ import org.mockito.Mockito;
import org.mockito.internal.util.reflection.FieldSetter;
import org.mockito.junit.MockitoJUnitRunner;
+import io.openmessaging.KeyValue;
+import io.openmessaging.internal.DefaultKeyValue;
+import org.apache.rocketmq.replicator.config.ConfigDefine;
+
@RunWith(MockitoJUnitRunner.class)
public class RmqSourceReplicatorTest {
- @Mock
- private DefaultMQAdminExt defaultMQAdminExt;
-
-
@Test
- public void buildWildcardRoute() throws RemotingException, MQClientException, InterruptedException, NoSuchFieldException {
-
+ public void testGenerateTopic() throws NoSuchFieldException {
RmqSourceReplicator rmqSourceReplicator = Mockito.spy(RmqSourceReplicator.class);
-
- TopicList topicList = new TopicList();
- Set<String> topics = new HashSet<String>();
- topics.add("topic1");
- topics.add("topic2");
- topics.add("sub-topic1-test");
- topics.add("sub-topic2-test");
- topics.add("sub-topic2-xxx");
- topics.add("sub-0");
- topics.add("test-0");
- topics.add("0-test");
- topicList.setTopicList(topics);
- when(defaultMQAdminExt.fetchAllTopicList()).thenReturn(topicList);
-
- TopicRouteData topicRouteData = new TopicRouteData();
- topicRouteData.setQueueDatas(Collections.<QueueData>emptyList());
- when(defaultMQAdminExt.examineTopicRouteInfo(any(String.class))).thenReturn(topicRouteData);
-
-
- Field field = RmqSourceReplicator.class.getDeclaredField("defaultMQAdminExt");
- FieldSetter.setField(rmqSourceReplicator, field, defaultMQAdminExt);
-
- Set<String> whiteList = new HashSet<String>();
- whiteList.add("topic1");
- whiteList.add("\\w+-test");
- rmqSourceReplicator.setWhiteList(whiteList);
- rmqSourceReplicator.buildRoute();
- Map<String, List<MessageQueue>> queues = rmqSourceReplicator.getTopicRouteMap();
- Set<String> expected = new HashSet<String>();
- expected.add("topic1");
- expected.add("0-test");
- assertThat(queues.size()).isEqualTo(expected.size());
- for (String topic : expected) {
- assertThat(queues.containsKey(topic)).isTrue();
- }
+ KeyValue kv = new DefaultKeyValue();
+ kv.put(ConfigDefine.CONN_TOPIC_RENAME_FMT, "${topic}.replica");
+ Field field = RmqSourceReplicator.class.getDeclaredField("replicatorConfig");
+ FieldSetter.setField(rmqSourceReplicator, field, kv);
+ String dstTopic = rmqSourceReplicator.generateTargetTopic("dest");
+ assertThat(dstTopic).isEqualTo("dest.replica");
}
}