You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by zh...@apache.org on 2022/03/02 03:39:39 UTC

[rocketmq-connect] 29/39: [ISSUE 503] Metadata synchronization optimization (#504)

This is an automated email from the ASF dual-hosted git repository.

zhoubo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-connect.git

commit ae2751eb5d72a643846414f65b7512186f4077ce
Author: zhoubo <87...@qq.com>
AuthorDate: Thu Dec 26 20:54:00 2019 +0800

    [ISSUE 503] Metadata synchronization optimization (#504)
    
    * TopicList is null exception and frequent requestTaskReconfiguration
    
    * https://github.com/apache/rocketmq-externals/issues/478
    
    * * Runtime add some log
    * Fix replicator add new topic frequent requestTaskReconfiguration bug
    
    * Optimize metadata synchronization and fix RocketMQConverter bug
    https://github.com/apache/rocketmq-externals/issues/492
    
    * Fix replicator bug
    
    * Exclude groups starting with RebalanceService
    
    * 1.Metadata synchronization bug fix
    2.consumerGroup filters out the ones used in runtime
    3.Multiple task consumerGroup name duplication issues
    4.Incorrect offsetSyncTopic assignment in task configuration
---
 .../java/org/apache/rocketmq/replicator/RmqMetaReplicator.java    | 8 +++++---
 src/main/java/org/apache/rocketmq/replicator/common/Utils.java    | 6 +++---
 2 files changed, 8 insertions(+), 6 deletions(-)

diff --git a/src/main/java/org/apache/rocketmq/replicator/RmqMetaReplicator.java b/src/main/java/org/apache/rocketmq/replicator/RmqMetaReplicator.java
index 75bc919..856fce8 100644
--- a/src/main/java/org/apache/rocketmq/replicator/RmqMetaReplicator.java
+++ b/src/main/java/org/apache/rocketmq/replicator/RmqMetaReplicator.java
@@ -183,12 +183,14 @@ public class RmqMetaReplicator extends SourceConnector {
             Collections.shuffle(masters);
 
             Set<String> targetBrokers =
-                CommandUtil.fetchMasterAddrByClusterName(this.targetMQAdminExt, replicatorConfig.getSrcCluster());
+                CommandUtil.fetchMasterAddrByClusterName(this.targetMQAdminExt, replicatorConfig.getTargetCluster());
 
             String addr = masters.get(0);
             SubscriptionGroupWrapper sub = this.srcMQAdminExt.getAllSubscriptionGroup(addr, TimeUnit.SECONDS.toMillis(10));
             for (Map.Entry<String, SubscriptionGroupConfig> entry : sub.getSubscriptionGroupTable().entrySet()) {
-                ensureSubConfig(targetBrokers, entry.getValue());
+                if (skipInnerGroup(entry.getKey())) {
+                    ensureSubConfig(targetBrokers, entry.getValue());
+                }
             }
         } catch (Exception e) {
             log.error("syncSubConfig failed", e);
@@ -196,7 +198,7 @@ public class RmqMetaReplicator extends SourceConnector {
     }
 
     private void ensureSubConfig(Collection<String> targetBrokers,
-        SubscriptionGroupConfig subConfig) throws InterruptedException, RemotingException, MQClientException, MQBrokerException {
+            SubscriptionGroupConfig subConfig) throws InterruptedException, RemotingException, MQClientException, MQBrokerException {
         for (String addr : targetBrokers) {
             this.targetMQAdminExt.createAndUpdateSubscriptionGroupConfig(addr, subConfig);
         }
diff --git a/src/main/java/org/apache/rocketmq/replicator/common/Utils.java b/src/main/java/org/apache/rocketmq/replicator/common/Utils.java
index 60687d7..9ccee41 100644
--- a/src/main/java/org/apache/rocketmq/replicator/common/Utils.java
+++ b/src/main/java/org/apache/rocketmq/replicator/common/Utils.java
@@ -22,8 +22,8 @@ import io.openmessaging.internal.DefaultKeyValue;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
-import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.ThreadLocalRandom;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.commons.lang3.tuple.ImmutablePair;
 import org.apache.rocketmq.client.exception.MQClientException;
@@ -44,7 +44,7 @@ public class Utils {
     private static final Logger log = LoggerFactory.getLogger(Utils.class);
 
     public static String createGroupName(String prefix) {
-        return new StringBuilder().append(prefix).append("-").append(System.currentTimeMillis()).toString();
+        return new StringBuilder().append(prefix).append("-").append(System.currentTimeMillis()).append("-").append(ThreadLocalRandom.current().nextInt()).toString();
     }
 
     public static String createGroupName(String prefix, String postfix) {
@@ -132,7 +132,7 @@ public class Utils {
             keyValue.put(TaskConfigEnum.TASK_STORE_ROCKETMQ.getKey(), tdc.getStoreTopic());
             keyValue.put(TaskConfigEnum.TASK_SOURCE_ROCKETMQ.getKey(), tdc.getSrcNamesrvs());
             keyValue.put(TaskConfigEnum.TASK_SOURCE_CLUSTER.getKey(), tdc.getSrcCluster());
-            keyValue.put(TaskConfigEnum.TASK_OFFSET_SYNC_TOPIC.getKey(), tdc.getSrcCluster());
+            keyValue.put(TaskConfigEnum.TASK_OFFSET_SYNC_TOPIC.getKey(), tdc.getOffsetSyncTopic());
             keyValue.put(TaskConfigEnum.TASK_DATA_TYPE.getKey(), DataType.OFFSET.ordinal());
             keyValue.put(TaskConfigEnum.TASK_GROUP_INFO.getKey(), JSONObject.toJSONString(groupList));
             keyValue.put(TaskConfigEnum.TASK_SOURCE_RECORD_CONVERTER.getKey(), tdc.getConverter());