You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by zh...@apache.org on 2022/03/02 03:39:30 UTC
[rocketmq-connect] 20/39: feat(replicator): Support subscriptionConfig sync
This is an automated email from the ASF dual-hosted git repository.
zhoubo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-connect.git
commit d7b0ce8a8739d5c60390e82cb06375c94e54e565
Author: xujianhai666 <ze...@bytedance.com>
AuthorDate: Wed Oct 23 16:09:56 2019 +0800
feat(replicator): Support subscriptionConfig sync
- Add syncSubConfig method on RmqMetaReplicator
Closes #438
---
.../rocketmq/replicator/RmqMetaReplicator.java | 91 ++++++++++++++--------
.../apache/rocketmq/replicator/common/Utils.java | 25 ++++++
2 files changed, 85 insertions(+), 31 deletions(-)
diff --git a/src/main/java/org/apache/rocketmq/replicator/RmqMetaReplicator.java b/src/main/java/org/apache/rocketmq/replicator/RmqMetaReplicator.java
index 38e5af2..3b00de5 100644
--- a/src/main/java/org/apache/rocketmq/replicator/RmqMetaReplicator.java
+++ b/src/main/java/org/apache/rocketmq/replicator/RmqMetaReplicator.java
@@ -20,26 +20,32 @@ import io.openmessaging.KeyValue;
import io.openmessaging.connector.api.Task;
import io.openmessaging.connector.api.source.SourceConnector;
import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
import java.util.HashSet;
import java.util.List;
+import java.util.Map;
import java.util.Set;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.commons.lang3.concurrent.BasicThreadFactory;
+import org.apache.commons.lang3.tuple.ImmutablePair;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.protocol.body.ClusterInfo;
import org.apache.rocketmq.common.protocol.body.ConsumeStatsList;
-import org.apache.rocketmq.remoting.RPCHook;
+import org.apache.rocketmq.common.protocol.body.SubscriptionGroupWrapper;
+import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig;
import org.apache.rocketmq.remoting.exception.RemotingConnectException;
+import org.apache.rocketmq.remoting.exception.RemotingException;
import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
-import org.apache.rocketmq.replicator.common.ConstDefine;
import org.apache.rocketmq.replicator.common.Utils;
import org.apache.rocketmq.replicator.config.RmqConnectorConfig;
import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
+import org.apache.rocketmq.tools.command.CommandUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -53,6 +59,7 @@ public class RmqMetaReplicator extends SourceConnector {
private volatile boolean configValid = false;
private Set<String> knownGroups;
private DefaultMQAdminExt srcMQAdminExt;
+ private DefaultMQAdminExt targetMQAdminExt;
private volatile boolean adminStarted;
private ScheduledExecutorService executor;
@@ -92,14 +99,8 @@ public class RmqMetaReplicator extends SourceConnector {
@Override public void start() {
log.info("starting...");
startMQAdminTools();
- executor.scheduleAtFixedRate(() ->
- {
- try {
- refreshConsuemrGroups();
- } catch (Exception e) {
- log.error("refresh consumer groups failed.", e);
- }
- }, replicatorConfig.getRefreshInterval(), replicatorConfig.getRefreshInterval(), TimeUnit.SECONDS);
+ executor.scheduleAtFixedRate(this::refreshConsumerGroups, replicatorConfig.getRefreshInterval(), replicatorConfig.getRefreshInterval(), TimeUnit.SECONDS);
+ executor.scheduleAtFixedRate(this::syncSubConfig, replicatorConfig.getRefreshInterval(), replicatorConfig.getRefreshInterval(), TimeUnit.SECONDS);
}
@Override public void stop() {
@@ -128,6 +129,7 @@ public class RmqMetaReplicator extends SourceConnector {
startMQAdminTools();
try {
+ this.syncSubConfig();
this.knownGroups = this.fetchConsumerGroups();
} catch (Exception e) {
e.printStackTrace();
@@ -140,34 +142,61 @@ public class RmqMetaReplicator extends SourceConnector {
if (!configValid || adminStarted) {
return;
}
- RPCHook rpcHook = null;
- this.srcMQAdminExt = new DefaultMQAdminExt(rpcHook);
- this.srcMQAdminExt.setNamesrvAddr(this.replicatorConfig.getSrcNamesrvs());
- this.srcMQAdminExt.setAdminExtGroup(Utils.createGroupName(ConstDefine.REPLICATOR_ADMIN_PREFIX));
- this.srcMQAdminExt.setInstanceName(Utils.createInstanceName(this.replicatorConfig.getSrcNamesrvs()));
try {
- this.srcMQAdminExt.start();
- log.info("RocketMQ srcMQAdminExt started");
+ ImmutablePair<DefaultMQAdminExt, DefaultMQAdminExt> pair = Utils.startMQAdminTools(this.replicatorConfig);
+ this.srcMQAdminExt = pair.getLeft();
+ this.targetMQAdminExt = pair.getRight();
+ log.info("RocketMQ targetMQAdminExt started");
} catch (MQClientException e) {
log.error("Replicator start failed for `srcMQAdminExt` exception.", e);
}
+
adminStarted = true;
}
- private void refreshConsuemrGroups() throws InterruptedException, RemotingConnectException, MQBrokerException, RemotingTimeoutException, MQClientException, RemotingSendRequestException {
- log.debug("refreshConsuemrGroups...");
- Set<String> groups = fetchConsumerGroups();
- Set<String> newGroups = new HashSet<>();
- Set<String> deadGroups = new HashSet<>();
- newGroups.addAll(groups);
- newGroups.removeAll(knownGroups);
- deadGroups.addAll(knownGroups);
- deadGroups.removeAll(groups);
- if (!newGroups.isEmpty() || !deadGroups.isEmpty()) {
- log.info("reconfig consumer groups, new Groups: {} , dead groups: {}, previous groups: {}", newGroups, deadGroups, knownGroups);
- knownGroups = groups;
- context.requestTaskReconfiguration();
+ private void refreshConsumerGroups() {
+ try {
+ log.debug("refreshConsuemrGroups...");
+ Set<String> groups = fetchConsumerGroups();
+ Set<String> newGroups = new HashSet<>(groups);
+ Set<String> deadGroups = new HashSet<>(knownGroups);
+ newGroups.removeAll(knownGroups);
+ deadGroups.removeAll(groups);
+ if (!newGroups.isEmpty() || !deadGroups.isEmpty()) {
+ log.info("reconfig consumer groups, new Groups: {} , dead groups: {}, previous groups: {}", newGroups, deadGroups, knownGroups);
+ knownGroups = groups;
+ context.requestTaskReconfiguration();
+ }
+ } catch (Exception e) {
+ log.error("refresh consumer groups failed.", e);
+ }
+ }
+
+ private void syncSubConfig() {
+ try {
+ Set<String> masterSet =
+ CommandUtil.fetchMasterAddrByClusterName(this.srcMQAdminExt, replicatorConfig.getSrcCluster());
+ List<String> masters = new ArrayList<>(masterSet);
+ Collections.shuffle(masters);
+
+ Set<String> targetBrokers =
+ CommandUtil.fetchMasterAddrByClusterName(this.targetMQAdminExt, replicatorConfig.getSrcCluster());
+
+ String addr = masters.get(0);
+ SubscriptionGroupWrapper sub = this.srcMQAdminExt.getAllSubscriptionGroup(addr, TimeUnit.SECONDS.toMillis(10));
+ for (Map.Entry<String, SubscriptionGroupConfig> entry : sub.getSubscriptionGroupTable().entrySet()) {
+ ensureSubConfig(targetBrokers, entry.getValue());
+ }
+ } catch (Exception e) {
+ log.error("syncSubConfig failed", e);
+ }
+ }
+
+ private void ensureSubConfig(Collection<String> targetBrokers,
+ SubscriptionGroupConfig subConfig) throws InterruptedException, RemotingException, MQClientException, MQBrokerException {
+ for (String addr : targetBrokers) {
+ this.targetMQAdminExt.createAndUpdateSubscriptionGroupConfig(addr, subConfig);
}
}
@@ -181,7 +210,7 @@ public class RmqMetaReplicator extends SourceConnector {
String[] addrs = clusterInfo.retrieveAllAddrByCluster(this.replicatorConfig.getSrcCluster());
for (String addr : addrs) {
ConsumeStatsList stats = this.srcMQAdminExt.fetchConsumeStatsInBroker(addr, true, 3 * 1000);
- stats.getConsumeStatsList().stream().map(kv -> kv.keySet()).forEach(groups::addAll);
+ stats.getConsumeStatsList().stream().map(Map::keySet).forEach(groups::addAll);
}
return groups;
}
diff --git a/src/main/java/org/apache/rocketmq/replicator/common/Utils.java b/src/main/java/org/apache/rocketmq/replicator/common/Utils.java
index 4134a2a..60687d7 100644
--- a/src/main/java/org/apache/rocketmq/replicator/common/Utils.java
+++ b/src/main/java/org/apache/rocketmq/replicator/common/Utils.java
@@ -22,12 +22,15 @@ import io.openmessaging.internal.DefaultKeyValue;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
+import java.util.Map;
import java.util.Set;
import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.tuple.ImmutablePair;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.common.protocol.route.BrokerData;
import org.apache.rocketmq.common.protocol.route.TopicRouteData;
+import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.remoting.exception.RemotingException;
import org.apache.rocketmq.replicator.config.DataType;
import org.apache.rocketmq.replicator.config.RmqConnectorConfig;
@@ -140,4 +143,26 @@ public class Utils {
return result;
}
+
+ public static ImmutablePair<DefaultMQAdminExt, DefaultMQAdminExt> startMQAdminTools(
+ RmqConnectorConfig replicatorConfig) throws MQClientException {
+ RPCHook rpcHook = null;
+ DefaultMQAdminExt srcMQAdminExt = new DefaultMQAdminExt(rpcHook);
+ srcMQAdminExt.setNamesrvAddr(replicatorConfig.getSrcNamesrvs());
+ srcMQAdminExt.setAdminExtGroup(Utils.createGroupName(ConstDefine.REPLICATOR_ADMIN_PREFIX));
+ srcMQAdminExt.setInstanceName(Utils.createInstanceName(replicatorConfig.getSrcNamesrvs()));
+
+ DefaultMQAdminExt targetMQAdminExt = new DefaultMQAdminExt(rpcHook);
+ targetMQAdminExt.setNamesrvAddr(replicatorConfig.getTargetNamesrvs());
+ targetMQAdminExt.setAdminExtGroup(Utils.createGroupName(ConstDefine.REPLICATOR_ADMIN_PREFIX));
+ targetMQAdminExt.setInstanceName(Utils.createInstanceName(replicatorConfig.getTargetNamesrvs()));
+
+ srcMQAdminExt.start();
+ log.info("RocketMQ srcMQAdminExt started");
+
+ targetMQAdminExt.start();
+ log.info("RocketMQ targetMQAdminExt started");
+
+ return ImmutablePair.of(srcMQAdminExt, targetMQAdminExt);
+ }
}