You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by zh...@apache.org on 2022/03/02 03:39:30 UTC

[rocketmq-connect] 20/39: feat(replicator): Support subcriptionConfig sync

This is an automated email from the ASF dual-hosted git repository.

zhoubo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-connect.git

commit d7b0ce8a8739d5c60390e82cb06375c94e54e565
Author: xujianhai666 <ze...@bytedance.com>
AuthorDate: Wed Oct 23 16:09:56 2019 +0800

    feat(replicator): Support subcriptionConfig sync
    
    - Add syncSubConfig method on RmqMetaReplicator
    
    Closes #438
---
 .../rocketmq/replicator/RmqMetaReplicator.java     | 91 ++++++++++++++--------
 .../apache/rocketmq/replicator/common/Utils.java   | 25 ++++++
 2 files changed, 85 insertions(+), 31 deletions(-)

diff --git a/src/main/java/org/apache/rocketmq/replicator/RmqMetaReplicator.java b/src/main/java/org/apache/rocketmq/replicator/RmqMetaReplicator.java
index 38e5af2..3b00de5 100644
--- a/src/main/java/org/apache/rocketmq/replicator/RmqMetaReplicator.java
+++ b/src/main/java/org/apache/rocketmq/replicator/RmqMetaReplicator.java
@@ -20,26 +20,32 @@ import io.openmessaging.KeyValue;
 import io.openmessaging.connector.api.Task;
 import io.openmessaging.connector.api.source.SourceConnector;
 import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 import org.apache.commons.lang3.concurrent.BasicThreadFactory;
+import org.apache.commons.lang3.tuple.ImmutablePair;
 import org.apache.rocketmq.client.exception.MQBrokerException;
 import org.apache.rocketmq.client.exception.MQClientException;
 import org.apache.rocketmq.common.protocol.body.ClusterInfo;
 import org.apache.rocketmq.common.protocol.body.ConsumeStatsList;
-import org.apache.rocketmq.remoting.RPCHook;
+import org.apache.rocketmq.common.protocol.body.SubscriptionGroupWrapper;
+import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig;
 import org.apache.rocketmq.remoting.exception.RemotingConnectException;
+import org.apache.rocketmq.remoting.exception.RemotingException;
 import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
 import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
-import org.apache.rocketmq.replicator.common.ConstDefine;
 import org.apache.rocketmq.replicator.common.Utils;
 import org.apache.rocketmq.replicator.config.RmqConnectorConfig;
 import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
+import org.apache.rocketmq.tools.command.CommandUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -53,6 +59,7 @@ public class RmqMetaReplicator extends SourceConnector {
     private volatile boolean configValid = false;
     private Set<String> knownGroups;
     private DefaultMQAdminExt srcMQAdminExt;
+    private DefaultMQAdminExt targetMQAdminExt;
     private volatile boolean adminStarted;
     private ScheduledExecutorService executor;
 
@@ -92,14 +99,8 @@ public class RmqMetaReplicator extends SourceConnector {
     @Override public void start() {
         log.info("starting...");
         startMQAdminTools();
-        executor.scheduleAtFixedRate(() ->
-        {
-            try {
-                refreshConsuemrGroups();
-            } catch (Exception e) {
-                log.error("refresh consumer groups failed.", e);
-            }
-        }, replicatorConfig.getRefreshInterval(), replicatorConfig.getRefreshInterval(), TimeUnit.SECONDS);
+        executor.scheduleAtFixedRate(this::refreshConsumerGroups, replicatorConfig.getRefreshInterval(), replicatorConfig.getRefreshInterval(), TimeUnit.SECONDS);
+        executor.scheduleAtFixedRate(this::syncSubConfig, replicatorConfig.getRefreshInterval(), replicatorConfig.getRefreshInterval(), TimeUnit.SECONDS);
     }
 
     @Override public void stop() {
@@ -128,6 +129,7 @@ public class RmqMetaReplicator extends SourceConnector {
         startMQAdminTools();
 
         try {
+            this.syncSubConfig();
             this.knownGroups = this.fetchConsumerGroups();
         } catch (Exception e) {
             e.printStackTrace();
@@ -140,34 +142,61 @@ public class RmqMetaReplicator extends SourceConnector {
         if (!configValid || adminStarted) {
             return;
         }
-        RPCHook rpcHook = null;
-        this.srcMQAdminExt = new DefaultMQAdminExt(rpcHook);
-        this.srcMQAdminExt.setNamesrvAddr(this.replicatorConfig.getSrcNamesrvs());
-        this.srcMQAdminExt.setAdminExtGroup(Utils.createGroupName(ConstDefine.REPLICATOR_ADMIN_PREFIX));
-        this.srcMQAdminExt.setInstanceName(Utils.createInstanceName(this.replicatorConfig.getSrcNamesrvs()));
 
         try {
-            this.srcMQAdminExt.start();
-            log.info("RocketMQ srcMQAdminExt started");
+            ImmutablePair<DefaultMQAdminExt, DefaultMQAdminExt> pair = Utils.startMQAdminTools(this.replicatorConfig);
+            this.srcMQAdminExt = pair.getLeft();
+            this.targetMQAdminExt = pair.getRight();
+            log.info("RocketMQ targetMQAdminExt started");
         } catch (MQClientException e) {
             log.error("Replicator start failed for `srcMQAdminExt` exception.", e);
         }
+
         adminStarted = true;
     }
 
-    private void refreshConsuemrGroups() throws InterruptedException, RemotingConnectException, MQBrokerException, RemotingTimeoutException, MQClientException, RemotingSendRequestException {
-        log.debug("refreshConsuemrGroups...");
-        Set<String> groups = fetchConsumerGroups();
-        Set<String> newGroups = new HashSet<>();
-        Set<String> deadGroups = new HashSet<>();
-        newGroups.addAll(groups);
-        newGroups.removeAll(knownGroups);
-        deadGroups.addAll(knownGroups);
-        deadGroups.removeAll(groups);
-        if (!newGroups.isEmpty() || !deadGroups.isEmpty()) {
-            log.info("reconfig consumer groups, new Groups: {} , dead groups: {}, previous groups: {}", newGroups, deadGroups, knownGroups);
-            knownGroups = groups;
-            context.requestTaskReconfiguration();
+    private void refreshConsumerGroups() {
+        try {
+            log.debug("refreshConsuemrGroups...");
+            Set<String> groups = fetchConsumerGroups();
+            Set<String> newGroups = new HashSet<>(groups);
+            Set<String> deadGroups = new HashSet<>(knownGroups);
+            newGroups.removeAll(knownGroups);
+            deadGroups.removeAll(groups);
+            if (!newGroups.isEmpty() || !deadGroups.isEmpty()) {
+                log.info("reconfig consumer groups, new Groups: {} , dead groups: {}, previous groups: {}", newGroups, deadGroups, knownGroups);
+                knownGroups = groups;
+                context.requestTaskReconfiguration();
+            }
+        } catch (Exception e) {
+            log.error("refresh consumer groups failed.", e);
+        }
+    }
+
+    private void syncSubConfig() {
+        try {
+            Set<String> masterSet =
+                CommandUtil.fetchMasterAddrByClusterName(this.srcMQAdminExt, replicatorConfig.getSrcCluster());
+            List<String> masters = new ArrayList<>(masterSet);
+            Collections.shuffle(masters);
+
+            Set<String> targetBrokers =
+                CommandUtil.fetchMasterAddrByClusterName(this.targetMQAdminExt, replicatorConfig.getSrcCluster());
+
+            String addr = masters.get(0);
+            SubscriptionGroupWrapper sub = this.srcMQAdminExt.getAllSubscriptionGroup(addr, TimeUnit.SECONDS.toMillis(10));
+            for (Map.Entry<String, SubscriptionGroupConfig> entry : sub.getSubscriptionGroupTable().entrySet()) {
+                ensureSubConfig(targetBrokers, entry.getValue());
+            }
+        } catch (Exception e) {
+            log.error("syncSubConfig failed", e);
+        }
+    }
+
+    private void ensureSubConfig(Collection<String> targetBrokers,
+        SubscriptionGroupConfig subConfig) throws InterruptedException, RemotingException, MQClientException, MQBrokerException {
+        for (String addr : targetBrokers) {
+            this.targetMQAdminExt.createAndUpdateSubscriptionGroupConfig(addr, subConfig);
         }
     }
 
@@ -181,7 +210,7 @@ public class RmqMetaReplicator extends SourceConnector {
         String[] addrs = clusterInfo.retrieveAllAddrByCluster(this.replicatorConfig.getSrcCluster());
         for (String addr : addrs) {
             ConsumeStatsList stats = this.srcMQAdminExt.fetchConsumeStatsInBroker(addr, true, 3 * 1000);
-            stats.getConsumeStatsList().stream().map(kv -> kv.keySet()).forEach(groups::addAll);
+            stats.getConsumeStatsList().stream().map(Map::keySet).forEach(groups::addAll);
         }
         return groups;
     }
diff --git a/src/main/java/org/apache/rocketmq/replicator/common/Utils.java b/src/main/java/org/apache/rocketmq/replicator/common/Utils.java
index 4134a2a..60687d7 100644
--- a/src/main/java/org/apache/rocketmq/replicator/common/Utils.java
+++ b/src/main/java/org/apache/rocketmq/replicator/common/Utils.java
@@ -22,12 +22,15 @@ import io.openmessaging.internal.DefaultKeyValue;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.tuple.ImmutablePair;
 import org.apache.rocketmq.client.exception.MQClientException;
 import org.apache.rocketmq.common.TopicConfig;
 import org.apache.rocketmq.common.protocol.route.BrokerData;
 import org.apache.rocketmq.common.protocol.route.TopicRouteData;
+import org.apache.rocketmq.remoting.RPCHook;
 import org.apache.rocketmq.remoting.exception.RemotingException;
 import org.apache.rocketmq.replicator.config.DataType;
 import org.apache.rocketmq.replicator.config.RmqConnectorConfig;
@@ -140,4 +143,26 @@ public class Utils {
 
         return result;
     }
+
+    public static ImmutablePair<DefaultMQAdminExt, DefaultMQAdminExt> startMQAdminTools(
+        RmqConnectorConfig replicatorConfig) throws MQClientException {
+        RPCHook rpcHook = null;
+        DefaultMQAdminExt srcMQAdminExt = new DefaultMQAdminExt(rpcHook);
+        srcMQAdminExt.setNamesrvAddr(replicatorConfig.getSrcNamesrvs());
+        srcMQAdminExt.setAdminExtGroup(Utils.createGroupName(ConstDefine.REPLICATOR_ADMIN_PREFIX));
+        srcMQAdminExt.setInstanceName(Utils.createInstanceName(replicatorConfig.getSrcNamesrvs()));
+
+        DefaultMQAdminExt targetMQAdminExt = new DefaultMQAdminExt(rpcHook);
+        targetMQAdminExt.setNamesrvAddr(replicatorConfig.getTargetNamesrvs());
+        targetMQAdminExt.setAdminExtGroup(Utils.createGroupName(ConstDefine.REPLICATOR_ADMIN_PREFIX));
+        targetMQAdminExt.setInstanceName(Utils.createInstanceName(replicatorConfig.getTargetNamesrvs()));
+
+        srcMQAdminExt.start();
+        log.info("RocketMQ srcMQAdminExt started");
+
+        targetMQAdminExt.start();
+        log.info("RocketMQ targetMQAdminExt started");
+
+        return ImmutablePair.of(srcMQAdminExt, targetMQAdminExt);
+    }
 }