You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by zh...@apache.org on 2022/03/02 03:39:39 UTC
[rocketmq-connect] 29/39: [ISSUE 503] Metadata synchronization optimization (#504)
This is an automated email from the ASF dual-hosted git repository.
zhoubo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-connect.git
commit ae2751eb5d72a643846414f65b7512186f4077ce
Author: zhoubo <87...@qq.com>
AuthorDate: Thu Dec 26 20:54:00 2019 +0800
[ISSUE 503] Metadata synchronization optimization (#504)
* TopicList is null exception and frequent requestTaskReconfiguration
* https://github.com/apache/rocketmq-externals/issues/478
* * Runtime add some log
* Fix replicator add new topic frequent requestTaskReconfiguration bug
* Optimize metadata synchronization and fix RocketMQConverter bug
https://github.com/apache/rocketmq-externals/issues/492
* Fix replicator bug
* Exclude groups starting with RebalanceService
* 1.Metadata synchronization bug fix
2.consumerGroup filters out the ones used in runtime
3.Multiple task consumerGroup name duplication issues
4.Incorrect offsetSyncTopic assignment in task configuration
---
.../java/org/apache/rocketmq/replicator/RmqMetaReplicator.java | 8 +++++---
src/main/java/org/apache/rocketmq/replicator/common/Utils.java | 6 +++---
2 files changed, 8 insertions(+), 6 deletions(-)
diff --git a/src/main/java/org/apache/rocketmq/replicator/RmqMetaReplicator.java b/src/main/java/org/apache/rocketmq/replicator/RmqMetaReplicator.java
index 75bc919..856fce8 100644
--- a/src/main/java/org/apache/rocketmq/replicator/RmqMetaReplicator.java
+++ b/src/main/java/org/apache/rocketmq/replicator/RmqMetaReplicator.java
@@ -183,12 +183,14 @@ public class RmqMetaReplicator extends SourceConnector {
Collections.shuffle(masters);
Set<String> targetBrokers =
- CommandUtil.fetchMasterAddrByClusterName(this.targetMQAdminExt, replicatorConfig.getSrcCluster());
+ CommandUtil.fetchMasterAddrByClusterName(this.targetMQAdminExt, replicatorConfig.getTargetCluster());
String addr = masters.get(0);
SubscriptionGroupWrapper sub = this.srcMQAdminExt.getAllSubscriptionGroup(addr, TimeUnit.SECONDS.toMillis(10));
for (Map.Entry<String, SubscriptionGroupConfig> entry : sub.getSubscriptionGroupTable().entrySet()) {
- ensureSubConfig(targetBrokers, entry.getValue());
+ if (skipInnerGroup(entry.getKey())) {
+ ensureSubConfig(targetBrokers, entry.getValue());
+ }
}
} catch (Exception e) {
log.error("syncSubConfig failed", e);
@@ -196,7 +198,7 @@ public class RmqMetaReplicator extends SourceConnector {
}
private void ensureSubConfig(Collection<String> targetBrokers,
- SubscriptionGroupConfig subConfig) throws InterruptedException, RemotingException, MQClientException, MQBrokerException {
+ SubscriptionGroupConfig subConfig) throws InterruptedException, RemotingException, MQClientException, MQBrokerException {
for (String addr : targetBrokers) {
this.targetMQAdminExt.createAndUpdateSubscriptionGroupConfig(addr, subConfig);
}
diff --git a/src/main/java/org/apache/rocketmq/replicator/common/Utils.java b/src/main/java/org/apache/rocketmq/replicator/common/Utils.java
index 60687d7..9ccee41 100644
--- a/src/main/java/org/apache/rocketmq/replicator/common/Utils.java
+++ b/src/main/java/org/apache/rocketmq/replicator/common/Utils.java
@@ -22,8 +22,8 @@ import io.openmessaging.internal.DefaultKeyValue;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
-import java.util.Map;
import java.util.Set;
+import java.util.concurrent.ThreadLocalRandom;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.tuple.ImmutablePair;
import org.apache.rocketmq.client.exception.MQClientException;
@@ -44,7 +44,7 @@ public class Utils {
private static final Logger log = LoggerFactory.getLogger(Utils.class);
public static String createGroupName(String prefix) {
- return new StringBuilder().append(prefix).append("-").append(System.currentTimeMillis()).toString();
+ return new StringBuilder().append(prefix).append("-").append(System.currentTimeMillis()).append("-").append(ThreadLocalRandom.current().nextInt()).toString();
}
public static String createGroupName(String prefix, String postfix) {
@@ -132,7 +132,7 @@ public class Utils {
keyValue.put(TaskConfigEnum.TASK_STORE_ROCKETMQ.getKey(), tdc.getStoreTopic());
keyValue.put(TaskConfigEnum.TASK_SOURCE_ROCKETMQ.getKey(), tdc.getSrcNamesrvs());
keyValue.put(TaskConfigEnum.TASK_SOURCE_CLUSTER.getKey(), tdc.getSrcCluster());
- keyValue.put(TaskConfigEnum.TASK_OFFSET_SYNC_TOPIC.getKey(), tdc.getSrcCluster());
+ keyValue.put(TaskConfigEnum.TASK_OFFSET_SYNC_TOPIC.getKey(), tdc.getOffsetSyncTopic());
keyValue.put(TaskConfigEnum.TASK_DATA_TYPE.getKey(), DataType.OFFSET.ordinal());
keyValue.put(TaskConfigEnum.TASK_GROUP_INFO.getKey(), JSONObject.toJSONString(groupList));
keyValue.put(TaskConfigEnum.TASK_SOURCE_RECORD_CONVERTER.getKey(), tdc.getConverter());