You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by zh...@apache.org on 2022/03/02 03:39:19 UTC
[rocketmq-connect] 09/39: Support wildcard topic subscription. Resolves #395
This is an automated email from the ASF dual-hosted git repository.
zhoubo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-connect.git
commit d80142a96c9b1f43f9bbbdbef85df6e7ad3501e2
Author: xujianhai666 <ze...@bytedance.com>
AuthorDate: Sun Sep 8 01:13:11 2019 +0800
Support wildcard topic subscription. Resolves #395
---
pom.xml | 14 +++-
.../rocketmq/replicator/RmqSourceReplicator.java | 59 ++++++++++----
.../apache/rocketmq/replicator/RmqSourceTask.java | 2 +-
.../replicator/RmqSourceReplicatorTest.java | 92 ++++++++++++++++++++++
4 files changed, 149 insertions(+), 18 deletions(-)
diff --git a/pom.xml b/pom.xml
index 963222e..b528c80 100644
--- a/pom.xml
+++ b/pom.xml
@@ -42,6 +42,18 @@
<scope>test</scope>
</dependency>
<dependency>
+ <groupId>org.mockito</groupId>
+ <artifactId>mockito-core</artifactId>
+ <version>2.6.3</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.assertj</groupId>
+ <artifactId>assertj-core</artifactId>
+ <version>2.6.0</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.5</version>
@@ -106,4 +118,4 @@
</profile>
</profiles>
-</project>
\ No newline at end of file
+</project>
diff --git a/src/main/java/org/apache/rocketmq/replicator/RmqSourceReplicator.java b/src/main/java/org/apache/rocketmq/replicator/RmqSourceReplicator.java
index 12473ab..b49e06d 100644
--- a/src/main/java/org/apache/rocketmq/replicator/RmqSourceReplicator.java
+++ b/src/main/java/org/apache/rocketmq/replicator/RmqSourceReplicator.java
@@ -25,9 +25,12 @@ import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.common.protocol.body.TopicList;
import org.apache.rocketmq.common.protocol.route.QueueData;
import org.apache.rocketmq.common.protocol.route.TopicRouteData;
import org.apache.rocketmq.remoting.RPCHook;
@@ -156,20 +159,45 @@ public class RmqSourceReplicator extends SourceConnector {
startMQAdminTools();
+ buildRoute();
+
+ TaskDivideConfig tdc = new TaskDivideConfig(
+ this.replicatorConfig.getString(ConfigDefine.CONN_SOURCE_RMQ),
+ this.replicatorConfig.getString(ConfigDefine.CONN_STORE_TOPIC),
+ this.replicatorConfig.getString(ConfigDefine.CONN_SOURCE_RECORD_CONVERTER),
+ DataType.COMMON_MESSAGE.ordinal(),
+ this.taskParallelism
+ );
+ return this.taskDivideStrategy.divide(this.topicRouteMap, tdc);
+ }
+
+ public void buildRoute() {
try {
+ List<Pattern> patterns = new ArrayList<Pattern>();
for (String topic : this.whiteList) {
+ Pattern pattern = Pattern.compile(topic);
+ patterns.add(pattern);
+ }
+
+ TopicList topics = defaultMQAdminExt.fetchAllTopicList();
+ for (String topic : topics.getTopicList()) {
if ((syncRETRY && topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) ||
(syncDLQ && topic.startsWith(MixAll.DLQ_GROUP_TOPIC_PREFIX)) ||
!topic.equals(ConfigDefine.CONN_STORE_TOPIC)) {
- TopicRouteData topicRouteData = defaultMQAdminExt.examineTopicRouteInfo(topic);
- if (!topicRouteMap.containsKey(topic)) {
- topicRouteMap.put(topic, new ArrayList<MessageQueue>());
- }
- for (QueueData qd : topicRouteData.getQueueDatas()) {
- for (int i = 0; i < qd.getReadQueueNums(); i++) {
- MessageQueue mq = new MessageQueue(topic, qd.getBrokerName(), i);
- topicRouteMap.get(topic).add(mq);
+ for (Pattern pattern : patterns) {
+ Matcher matcher = pattern.matcher(topic);
+ if (matcher.matches()) {
+ TopicRouteData topicRouteData = defaultMQAdminExt.examineTopicRouteInfo(topic);
+ if (!topicRouteMap.containsKey(topic)) {
+ topicRouteMap.put(topic, new ArrayList<MessageQueue>());
+ }
+ for (QueueData qd : topicRouteData.getQueueDatas()) {
+ for (int i = 0; i < qd.getReadQueueNums(); i++) {
+ MessageQueue mq = new MessageQueue(topic, qd.getBrokerName(), i);
+ topicRouteMap.get(topic).add(mq);
+ }
+ }
}
}
}
@@ -179,15 +207,14 @@ public class RmqSourceReplicator extends SourceConnector {
} finally {
defaultMQAdminExt.shutdown();
}
+ }
- TaskDivideConfig tdc = new TaskDivideConfig(
- this.replicatorConfig.getString(ConfigDefine.CONN_SOURCE_RMQ),
- this.replicatorConfig.getString(ConfigDefine.CONN_STORE_TOPIC),
- this.replicatorConfig.getString(ConfigDefine.CONN_SOURCE_RECORD_CONVERTER),
- DataType.COMMON_MESSAGE.ordinal(),
- this.taskParallelism
- );
- return this.taskDivideStrategy.divide(this.topicRouteMap, tdc);
+ public void setWhiteList(Set<String> whiteList) {
+ this.whiteList = whiteList;
+ }
+
+ public Map<String, List<MessageQueue>> getTopicRouteMap() {
+ return this.topicRouteMap;
}
}
diff --git a/src/main/java/org/apache/rocketmq/replicator/RmqSourceTask.java b/src/main/java/org/apache/rocketmq/replicator/RmqSourceTask.java
index 8c8e434..9909d61 100644
--- a/src/main/java/org/apache/rocketmq/replicator/RmqSourceTask.java
+++ b/src/main/java/org/apache/rocketmq/replicator/RmqSourceTask.java
@@ -126,7 +126,7 @@ public class RmqSourceTask extends SourceTask {
}
public void stop() {
-
+
if (started) {
if (this.consumer != null) {
this.consumer.shutdown();
diff --git a/src/test/java/org/apache/rocketmq/replicator/RmqSourceReplicatorTest.java b/src/test/java/org/apache/rocketmq/replicator/RmqSourceReplicatorTest.java
new file mode 100644
index 0000000..6ac0049
--- /dev/null
+++ b/src/test/java/org/apache/rocketmq/replicator/RmqSourceReplicatorTest.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.replicator;
+
+import java.lang.reflect.Field;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.common.protocol.body.TopicList;
+import org.apache.rocketmq.common.protocol.route.QueueData;
+import org.apache.rocketmq.common.protocol.route.TopicRouteData;
+import org.apache.rocketmq.remoting.exception.RemotingException;
+import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.when;
+import static org.assertj.core.api.Assertions.assertThat;
+
+import org.mockito.Mockito;
+import org.mockito.internal.util.reflection.FieldSetter;
+import org.mockito.junit.MockitoJUnitRunner;
+
+@RunWith(MockitoJUnitRunner.class)
+public class RmqSourceReplicatorTest {
+
+ @Mock
+ private DefaultMQAdminExt defaultMQAdminExt;
+
+
+ @Test
+ public void buildWildcardRoute() throws RemotingException, MQClientException, InterruptedException, NoSuchFieldException {
+
+ RmqSourceReplicator rmqSourceReplicator = Mockito.spy(RmqSourceReplicator.class);
+
+ TopicList topicList = new TopicList();
+ Set<String> topics = new HashSet<String>();
+ topics.add("topic1");
+ topics.add("topic2");
+ topics.add("sub-topic1-test");
+ topics.add("sub-topic2-test");
+ topics.add("sub-topic2-xxx");
+ topics.add("sub-0");
+ topics.add("test-0");
+ topics.add("0-test");
+ topicList.setTopicList(topics);
+ when(defaultMQAdminExt.fetchAllTopicList()).thenReturn(topicList);
+
+ TopicRouteData topicRouteData = new TopicRouteData();
+ topicRouteData.setQueueDatas(Collections.<QueueData>emptyList());
+ when(defaultMQAdminExt.examineTopicRouteInfo(any(String.class))).thenReturn(topicRouteData);
+
+
+ Field field = RmqSourceReplicator.class.getDeclaredField("defaultMQAdminExt");
+ FieldSetter.setField(rmqSourceReplicator, field, defaultMQAdminExt);
+
+ Set<String> whiteList = new HashSet<String>();
+ whiteList.add("topic1");
+ whiteList.add("\\w+-test");
+ rmqSourceReplicator.setWhiteList(whiteList);
+ rmqSourceReplicator.buildRoute();
+ Map<String, List<MessageQueue>> queues = rmqSourceReplicator.getTopicRouteMap();
+ Set<String> expected = new HashSet<String>();
+ expected.add("topic1");
+ expected.add("0-test");
+ assertThat(queues.size()).isEqualTo(expected.size());
+ for (String topic : expected) {
+ assertThat(queues.containsKey(topic)).isTrue();
+ }
+ }
+}