You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by zh...@apache.org on 2022/03/02 03:39:19 UTC

[rocketmq-connect] 09/39: Support wildcard subscription topic. resolve #395

This is an automated email from the ASF dual-hosted git repository.

zhoubo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-connect.git

commit d80142a96c9b1f43f9bbbdbef85df6e7ad3501e2
Author: xujianhai666 <ze...@bytedance.com>
AuthorDate: Sun Sep 8 01:13:11 2019 +0800

    Support wildcard subscription topic. resolve #395
---
 pom.xml                                            | 14 +++-
 .../rocketmq/replicator/RmqSourceReplicator.java   | 59 ++++++++++----
 .../apache/rocketmq/replicator/RmqSourceTask.java  |  2 +-
 .../replicator/RmqSourceReplicatorTest.java        | 92 ++++++++++++++++++++++
 4 files changed, 149 insertions(+), 18 deletions(-)

diff --git a/pom.xml b/pom.xml
index 963222e..b528c80 100644
--- a/pom.xml
+++ b/pom.xml
@@ -42,6 +42,18 @@
             <scope>test</scope>
         </dependency>
         <dependency>
+            <groupId>org.mockito</groupId>
+            <artifactId>mockito-core</artifactId>
+            <version>2.6.3</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.assertj</groupId>
+            <artifactId>assertj-core</artifactId>
+            <version>2.6.0</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
             <groupId>org.slf4j</groupId>
             <artifactId>slf4j-api</artifactId>
             <version>1.7.5</version>
@@ -106,4 +118,4 @@
         </profile>
     </profiles>
 
-</project>
\ No newline at end of file
+</project>
diff --git a/src/main/java/org/apache/rocketmq/replicator/RmqSourceReplicator.java b/src/main/java/org/apache/rocketmq/replicator/RmqSourceReplicator.java
index 12473ab..b49e06d 100644
--- a/src/main/java/org/apache/rocketmq/replicator/RmqSourceReplicator.java
+++ b/src/main/java/org/apache/rocketmq/replicator/RmqSourceReplicator.java
@@ -25,9 +25,12 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
 import org.apache.rocketmq.client.exception.MQClientException;
 import org.apache.rocketmq.common.MixAll;
 import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.common.protocol.body.TopicList;
 import org.apache.rocketmq.common.protocol.route.QueueData;
 import org.apache.rocketmq.common.protocol.route.TopicRouteData;
 import org.apache.rocketmq.remoting.RPCHook;
@@ -156,20 +159,45 @@ public class RmqSourceReplicator extends SourceConnector {
 
         startMQAdminTools();
 
+        buildRoute();
+
+        TaskDivideConfig tdc = new TaskDivideConfig(
+            this.replicatorConfig.getString(ConfigDefine.CONN_SOURCE_RMQ),
+            this.replicatorConfig.getString(ConfigDefine.CONN_STORE_TOPIC),
+            this.replicatorConfig.getString(ConfigDefine.CONN_SOURCE_RECORD_CONVERTER),
+            DataType.COMMON_MESSAGE.ordinal(),
+            this.taskParallelism
+        );
+        return this.taskDivideStrategy.divide(this.topicRouteMap, tdc);
+    }
+
+    public void buildRoute() {
         try {
+            List<Pattern> patterns = new ArrayList<Pattern>();
             for (String topic : this.whiteList) {
+                Pattern pattern = Pattern.compile(topic);
+                patterns.add(pattern);
+            }
+
+            TopicList topics = defaultMQAdminExt.fetchAllTopicList();
+            for (String topic : topics.getTopicList()) {
                 if ((syncRETRY && topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) ||
                     (syncDLQ && topic.startsWith(MixAll.DLQ_GROUP_TOPIC_PREFIX)) ||
                     !topic.equals(ConfigDefine.CONN_STORE_TOPIC)) {
 
-                    TopicRouteData topicRouteData = defaultMQAdminExt.examineTopicRouteInfo(topic);
-                    if (!topicRouteMap.containsKey(topic)) {
-                        topicRouteMap.put(topic, new ArrayList<MessageQueue>());
-                    }
-                    for (QueueData qd : topicRouteData.getQueueDatas()) {
-                        for (int i = 0; i < qd.getReadQueueNums(); i++) {
-                            MessageQueue mq = new MessageQueue(topic, qd.getBrokerName(), i);
-                            topicRouteMap.get(topic).add(mq);
+                    for (Pattern pattern : patterns) {
+                        Matcher matcher = pattern.matcher(topic);
+                        if (matcher.matches()) {
+                            TopicRouteData topicRouteData = defaultMQAdminExt.examineTopicRouteInfo(topic);
+                            if (!topicRouteMap.containsKey(topic)) {
+                                topicRouteMap.put(topic, new ArrayList<MessageQueue>());
+                            }
+                            for (QueueData qd : topicRouteData.getQueueDatas()) {
+                                for (int i = 0; i < qd.getReadQueueNums(); i++) {
+                                    MessageQueue mq = new MessageQueue(topic, qd.getBrokerName(), i);
+                                    topicRouteMap.get(topic).add(mq);
+                                }
+                            }
                         }
                     }
                 }
@@ -179,15 +207,14 @@ public class RmqSourceReplicator extends SourceConnector {
         } finally {
             defaultMQAdminExt.shutdown();
         }
+    }
 
-        TaskDivideConfig tdc = new TaskDivideConfig(
-            this.replicatorConfig.getString(ConfigDefine.CONN_SOURCE_RMQ),
-            this.replicatorConfig.getString(ConfigDefine.CONN_STORE_TOPIC),
-            this.replicatorConfig.getString(ConfigDefine.CONN_SOURCE_RECORD_CONVERTER),
-            DataType.COMMON_MESSAGE.ordinal(),
-            this.taskParallelism
-        );
-        return this.taskDivideStrategy.divide(this.topicRouteMap, tdc);
+    public void setWhiteList(Set<String> whiteList) {
+        this.whiteList = whiteList;
+    }
+
+    public Map<String, List<MessageQueue>> getTopicRouteMap() {
+        return this.topicRouteMap;
     }
 }
 
diff --git a/src/main/java/org/apache/rocketmq/replicator/RmqSourceTask.java b/src/main/java/org/apache/rocketmq/replicator/RmqSourceTask.java
index 8c8e434..9909d61 100644
--- a/src/main/java/org/apache/rocketmq/replicator/RmqSourceTask.java
+++ b/src/main/java/org/apache/rocketmq/replicator/RmqSourceTask.java
@@ -126,7 +126,7 @@ public class RmqSourceTask extends SourceTask {
     }
 
     public void stop() {
-      
+
         if (started) {
             if (this.consumer != null) {
                 this.consumer.shutdown();
diff --git a/src/test/java/org/apache/rocketmq/replicator/RmqSourceReplicatorTest.java b/src/test/java/org/apache/rocketmq/replicator/RmqSourceReplicatorTest.java
new file mode 100644
index 0000000..6ac0049
--- /dev/null
+++ b/src/test/java/org/apache/rocketmq/replicator/RmqSourceReplicatorTest.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.replicator;
+
+import java.lang.reflect.Field;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.common.protocol.body.TopicList;
+import org.apache.rocketmq.common.protocol.route.QueueData;
+import org.apache.rocketmq.common.protocol.route.TopicRouteData;
+import org.apache.rocketmq.remoting.exception.RemotingException;
+import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.when;
+import static org.assertj.core.api.Assertions.assertThat;
+
+import org.mockito.Mockito;
+import org.mockito.internal.util.reflection.FieldSetter;
+import org.mockito.junit.MockitoJUnitRunner;
+
+@RunWith(MockitoJUnitRunner.class)
+public class RmqSourceReplicatorTest {
+
+    @Mock
+    private DefaultMQAdminExt defaultMQAdminExt;
+
+
+    @Test
+    public void buildWildcardRoute() throws RemotingException, MQClientException, InterruptedException, NoSuchFieldException {
+
+        RmqSourceReplicator rmqSourceReplicator = Mockito.spy(RmqSourceReplicator.class);
+
+        TopicList topicList = new TopicList();
+        Set<String> topics = new HashSet<String>();
+        topics.add("topic1");
+        topics.add("topic2");
+        topics.add("sub-topic1-test");
+        topics.add("sub-topic2-test");
+        topics.add("sub-topic2-xxx");
+        topics.add("sub-0");
+        topics.add("test-0");
+        topics.add("0-test");
+        topicList.setTopicList(topics);
+        when(defaultMQAdminExt.fetchAllTopicList()).thenReturn(topicList);
+
+        TopicRouteData topicRouteData = new TopicRouteData();
+        topicRouteData.setQueueDatas(Collections.<QueueData>emptyList());
+        when(defaultMQAdminExt.examineTopicRouteInfo(any(String.class))).thenReturn(topicRouteData);
+
+
+        Field field = RmqSourceReplicator.class.getDeclaredField("defaultMQAdminExt");
+        FieldSetter.setField(rmqSourceReplicator, field, defaultMQAdminExt);
+
+        Set<String> whiteList = new HashSet<String>();
+        whiteList.add("topic1");
+        whiteList.add("\\w+-test");
+        rmqSourceReplicator.setWhiteList(whiteList);
+        rmqSourceReplicator.buildRoute();
+        Map<String, List<MessageQueue>> queues = rmqSourceReplicator.getTopicRouteMap();
+        Set<String> expected = new HashSet<String>();
+        expected.add("topic1");
+        expected.add("0-test");
+        assertThat(queues.size()).isEqualTo(expected.size());
+        for (String topic : expected) {
+            assertThat(queues.containsKey(topic)).isTrue();
+        }
+    }
+}