You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by zh...@apache.org on 2022/03/02 03:39:38 UTC
[rocketmq-connect] 28/39: [ISSUE #492] Optimize metadata synchronization and fix RocketMQConverter bug (#493)
This is an automated email from the ASF dual-hosted git repository.
zhoubo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-connect.git
commit 070dd98cf2492fd760dc2d34b1f18057e1b2a56f
Author: zhoubo <87...@qq.com>
AuthorDate: Thu Dec 26 11:07:45 2019 +0800
[ISSUE #492] Optimize metadata synchronization and fix RocketMQConverter bug (#493)
* TopicList is null exception and frequent requestTaskReconfiguration
* https://github.com/apache/rocketmq-externals/issues/478
* * Runtime add some log
* Fix replicator add new topic frequent requestTaskReconfiguration bug
* Optimize metadata synchronization and fix RocketMQConverter bug
https://github.com/apache/rocketmq-externals/issues/492
* Fix replicator bug
* Exclude groups starting with RebalanceService
---
.../org/apache/rocketmq/replicator/MetaSourceTask.java | 1 +
.../apache/rocketmq/replicator/RmqMetaReplicator.java | 4 +++-
.../apache/rocketmq/replicator/RmqSourceReplicator.java | 16 +++++++++-------
3 files changed, 13 insertions(+), 8 deletions(-)
diff --git a/src/main/java/org/apache/rocketmq/replicator/MetaSourceTask.java b/src/main/java/org/apache/rocketmq/replicator/MetaSourceTask.java
index 16bf464..f28078f 100644
--- a/src/main/java/org/apache/rocketmq/replicator/MetaSourceTask.java
+++ b/src/main/java/org/apache/rocketmq/replicator/MetaSourceTask.java
@@ -77,6 +77,7 @@ public class MetaSourceTask extends SourceTask {
if (started) {
started = false;
}
+ srcMQAdminExt.shutdown();
}
@Override public void pause() {
diff --git a/src/main/java/org/apache/rocketmq/replicator/RmqMetaReplicator.java b/src/main/java/org/apache/rocketmq/replicator/RmqMetaReplicator.java
index 3b00de5..75bc919 100644
--- a/src/main/java/org/apache/rocketmq/replicator/RmqMetaReplicator.java
+++ b/src/main/java/org/apache/rocketmq/replicator/RmqMetaReplicator.java
@@ -106,6 +106,8 @@ public class RmqMetaReplicator extends SourceConnector {
@Override public void stop() {
log.info("stopping...");
this.executor.shutdown();
+ this.srcMQAdminExt.shutdown();
+ this.targetMQAdminExt.shutdown();
}
@Override public void pause() {
@@ -217,7 +219,7 @@ public class RmqMetaReplicator extends SourceConnector {
private boolean skipInnerGroup(String group) {
if (INNER_CONSUMER_GROUPS.contains(group) || group.startsWith("CID_RMQ_SYS_") || group.startsWith("PositionManage") ||
- group.startsWith("ConfigManage") || group.startsWith("OffsetManage")) {
+ group.startsWith("ConfigManage") || group.startsWith("OffsetManage") || group.startsWith("DefaultConnectCluster") || group.startsWith("RebalanceService")) {
return false;
}
return true;
diff --git a/src/main/java/org/apache/rocketmq/replicator/RmqSourceReplicator.java b/src/main/java/org/apache/rocketmq/replicator/RmqSourceReplicator.java
index 82744ed..10efae6 100644
--- a/src/main/java/org/apache/rocketmq/replicator/RmqSourceReplicator.java
+++ b/src/main/java/org/apache/rocketmq/replicator/RmqSourceReplicator.java
@@ -242,7 +242,7 @@ public class RmqSourceReplicator extends SourceConnector {
Matcher matcher = pattern.matcher(topic);
if (matcher.matches()) {
String targetTopic = generateTargetTopic(topic);
- if (!targetTopicSet.contains(topic)) {
+ if (!targetTopicSet.contains(targetTopic)) {
ensureTargetTopic(topic, targetTopic);
}
@@ -273,8 +273,6 @@ public class RmqSourceReplicator extends SourceConnector {
}
} catch (Exception e) {
log.error("Fetch topic list error.", e);
- } finally {
- srcMQAdminExt.shutdown();
}
}
@@ -309,12 +307,16 @@ public class RmqSourceReplicator extends SourceConnector {
throw new IllegalStateException(String.format("no broker found for srcTopic: %s srcCluster: %s", srcTopic, srcCluster));
}
- String brokerAddr = brokerList.get(0).selectBrokerAddr();
- TopicConfig topicConfig = this.srcMQAdminExt.examineTopicConfig(brokerAddr, srcTopic);
+ final TopicRouteData topicRouteData = this.srcMQAdminExt.examineTopicRouteInfo(srcTopic);
+ final TopicConfig topicConfig = new TopicConfig();
+ final List<QueueData> queueDatas = topicRouteData.getQueueDatas();
+ QueueData queueData = queueDatas.get(0);
+ topicConfig.setPerm(queueData.getPerm());
+ topicConfig.setReadQueueNums(queueData.getReadQueueNums());
+ topicConfig.setWriteQueueNums(queueData.getWriteQueueNums());
+ topicConfig.setTopicSysFlag(queueData.getTopicSynFlag());
topicConfig.setTopicName(targetTopic);
Utils.createTopic(this.targetMQAdminExt, topicConfig, targetCluster);
-
- throw new IllegalStateException("");
}
public String generateTargetTopic(String topic) {