You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by zh...@apache.org on 2022/03/02 03:39:38 UTC

[rocketmq-connect] 28/39: [ISSUE #492] Optimize metadata synchronization and fix RocketMQConverter bug (#493)

This is an automated email from the ASF dual-hosted git repository.

zhoubo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-connect.git

commit 070dd98cf2492fd760dc2d34b1f18057e1b2a56f
Author: zhoubo <87...@qq.com>
AuthorDate: Thu Dec 26 11:07:45 2019 +0800

    [ISSUE #492] Optimize metadata synchronization and fix RocketMQConverter bug (#493)
    
    * TopicList is null exception and frequent requestTaskReconfiguration
    
    * https://github.com/apache/rocketmq-externals/issues/478
    
    * * Runtime add some log
    * Fix replicator add new topic frequent requestTaskReconfiguration bug
    
    * Optimize metadata synchronization and fix RocketMQConverter bug
    https://github.com/apache/rocketmq-externals/issues/492
    
    * Fix replicator bug
    
    * Exclude groups starting with RebalanceService
---
 .../org/apache/rocketmq/replicator/MetaSourceTask.java   |  1 +
 .../apache/rocketmq/replicator/RmqMetaReplicator.java    |  4 +++-
 .../apache/rocketmq/replicator/RmqSourceReplicator.java  | 16 +++++++++-------
 3 files changed, 13 insertions(+), 8 deletions(-)

diff --git a/src/main/java/org/apache/rocketmq/replicator/MetaSourceTask.java b/src/main/java/org/apache/rocketmq/replicator/MetaSourceTask.java
index 16bf464..f28078f 100644
--- a/src/main/java/org/apache/rocketmq/replicator/MetaSourceTask.java
+++ b/src/main/java/org/apache/rocketmq/replicator/MetaSourceTask.java
@@ -77,6 +77,7 @@ public class MetaSourceTask extends SourceTask {
         if (started) {
             started = false;
         }
+        srcMQAdminExt.shutdown();
     }
 
     @Override public void pause() {
diff --git a/src/main/java/org/apache/rocketmq/replicator/RmqMetaReplicator.java b/src/main/java/org/apache/rocketmq/replicator/RmqMetaReplicator.java
index 3b00de5..75bc919 100644
--- a/src/main/java/org/apache/rocketmq/replicator/RmqMetaReplicator.java
+++ b/src/main/java/org/apache/rocketmq/replicator/RmqMetaReplicator.java
@@ -106,6 +106,8 @@ public class RmqMetaReplicator extends SourceConnector {
     @Override public void stop() {
         log.info("stopping...");
         this.executor.shutdown();
+        this.srcMQAdminExt.shutdown();
+        this.targetMQAdminExt.shutdown();
     }
 
     @Override public void pause() {
@@ -217,7 +219,7 @@ public class RmqMetaReplicator extends SourceConnector {
 
     private boolean skipInnerGroup(String group) {
         if (INNER_CONSUMER_GROUPS.contains(group) || group.startsWith("CID_RMQ_SYS_") || group.startsWith("PositionManage") ||
-            group.startsWith("ConfigManage") || group.startsWith("OffsetManage")) {
+            group.startsWith("ConfigManage") || group.startsWith("OffsetManage") || group.startsWith("DefaultConnectCluster") || group.startsWith("RebalanceService")) {
             return false;
         }
         return true;
diff --git a/src/main/java/org/apache/rocketmq/replicator/RmqSourceReplicator.java b/src/main/java/org/apache/rocketmq/replicator/RmqSourceReplicator.java
index 82744ed..10efae6 100644
--- a/src/main/java/org/apache/rocketmq/replicator/RmqSourceReplicator.java
+++ b/src/main/java/org/apache/rocketmq/replicator/RmqSourceReplicator.java
@@ -242,7 +242,7 @@ public class RmqSourceReplicator extends SourceConnector {
                         Matcher matcher = pattern.matcher(topic);
                         if (matcher.matches()) {
                             String targetTopic = generateTargetTopic(topic);
-                            if (!targetTopicSet.contains(topic)) {
+                            if (!targetTopicSet.contains(targetTopic)) {
                                 ensureTargetTopic(topic, targetTopic);
                             }
 
@@ -273,8 +273,6 @@ public class RmqSourceReplicator extends SourceConnector {
             }
         } catch (Exception e) {
             log.error("Fetch topic list error.", e);
-        } finally {
-            srcMQAdminExt.shutdown();
         }
     }
 
@@ -309,12 +307,16 @@ public class RmqSourceReplicator extends SourceConnector {
             throw new IllegalStateException(String.format("no broker found for srcTopic: %s srcCluster: %s", srcTopic, srcCluster));
         }
 
-        String brokerAddr = brokerList.get(0).selectBrokerAddr();
-        TopicConfig topicConfig = this.srcMQAdminExt.examineTopicConfig(brokerAddr, srcTopic);
+        final TopicRouteData topicRouteData = this.srcMQAdminExt.examineTopicRouteInfo(srcTopic);
+        final TopicConfig topicConfig = new TopicConfig();
+        final List<QueueData> queueDatas = topicRouteData.getQueueDatas();
+        QueueData queueData = queueDatas.get(0);
+        topicConfig.setPerm(queueData.getPerm());
+        topicConfig.setReadQueueNums(queueData.getReadQueueNums());
+        topicConfig.setWriteQueueNums(queueData.getWriteQueueNums());
+        topicConfig.setTopicSysFlag(queueData.getTopicSynFlag());
         topicConfig.setTopicName(targetTopic);
         Utils.createTopic(this.targetMQAdminExt, topicConfig, targetCluster);
-
-        throw new IllegalStateException("");
     }
 
     public String generateTargetTopic(String topic) {