You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by zh...@apache.org on 2022/05/06 02:09:07 UTC
[rocketmq-connect] branch master updated: [ISSUES #51] Automatically create clusterStoreTopic connector-cluster-topic. (#52)
This is an automated email from the ASF dual-hosted git repository.
zhoubo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-connect.git
The following commit(s) were added to refs/heads/master by this push:
new 8fd3c6c [ISSUES #51] Automatically create clusterStoreTopic connector-cluster-topic. (#52)
8fd3c6c is described below
commit 8fd3c6c4c9793b5fc158ee4505862a1e96a799a6
Author: sunxi92 <su...@163.com>
AuthorDate: Fri May 6 10:09:01 2022 +0800
[ISSUES #51] Automatically create clusterStoreTopic connector-cluster-topic. (#52)
* Automatically Create clusterStoreTopic connector-cluster-topic.
* 1. Fix some inaccurate exception types
2. Add fetchAllTopicList and fetchAllConsumerGroupList methods to ConnectUtil
3. Create the following topics during the creation of the ConnectController: connector-cluster-topic, connector-config-topic, connector-offset-topic, connector-position-topic
4. Automatically create consumerGroup: connect-*
* Update the logic for checking whether the topic exists.
---
.../connect/runtime/connectorwrapper/Worker.java | 3 +-
.../service/ClusterManagementServiceImpl.java | 15 +++-
.../service/ConfigManagementServiceImpl.java | 16 +++++
.../service/OffsetManagementServiceImpl.java | 20 ++++++
.../service/PositionManagementServiceImpl.java | 20 ++++++
.../connect/runtime/service/RebalanceImpl.java | 13 +++-
.../connect/runtime/utils/ConnectUtil.java | 83 ++++++++++++++++++++--
7 files changed, 160 insertions(+), 10 deletions(-)
diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/Worker.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/Worker.java
index e22f969..9d48143 100644
--- a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/Worker.java
+++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/Worker.java
@@ -434,7 +434,8 @@ public class Worker {
} else if (task instanceof SinkTask) {
log.info("sink task config keyValue is {}", keyValue.getProperties());
DefaultMQPullConsumer consumer = ConnectUtil.initDefaultMQPullConsumer(connectConfig, connectorName, keyValue);
- if (connectConfig.isAutoCreateGroupEnable()) {
+ Set<String> consumerGroupSet = ConnectUtil.fetchAllConsumerGroupList(connectConfig);
+ if (!consumerGroupSet.contains(consumer.getConsumerGroup())) {
ConnectUtil.createSubGroup(connectConfig, consumer.getConsumerGroup());
}
TransformChain<ConnectRecord> transformChain = new TransformChain<>(keyValue, plugin);
diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/ClusterManagementServiceImpl.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/ClusterManagementServiceImpl.java
index 3bf5b47..bebc338 100644
--- a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/ClusterManagementServiceImpl.java
+++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/ClusterManagementServiceImpl.java
@@ -24,6 +24,7 @@ import java.util.List;
import java.util.Set;
import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer;
import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.common.protocol.RequestCode;
import org.apache.rocketmq.common.protocol.header.NotifyConsumerIdsChangedRequestHeader;
import org.apache.rocketmq.connect.runtime.common.LoggerName;
@@ -65,9 +66,19 @@ public class ClusterManagementServiceImpl implements ClusterManagementService {
* @param connectConfig
*/
private void prepare(ConnectConfig connectConfig) {
- if (connectConfig.isAutoCreateGroupEnable()) {
- ConnectUtil.createSubGroup(connectConfig, this.defaultMQPullConsumer.getConsumerGroup());
+ String consumerGroup = this.defaultMQPullConsumer.getConsumerGroup();
+ Set<String> consumerGroupSet = ConnectUtil.fetchAllConsumerGroupList(connectConfig);
+ if (!consumerGroupSet.contains(consumerGroup)) {
+ log.info("try to create consumerGroup: {}!", consumerGroup);
+ ConnectUtil.createSubGroup(connectConfig, consumerGroup);
}
+ String clusterStoreTopic = connectConfig.getClusterStoreTopic();
+ if (!ConnectUtil.isTopicExist(connectConfig, clusterStoreTopic)) {
+ log.info("try to create cluster store topic: {}!", clusterStoreTopic);
+ TopicConfig topicConfig = new TopicConfig(clusterStoreTopic, 1, 1, 6);
+ ConnectUtil.createTopic(connectConfig, topicConfig);
+ }
+
}
@Override
diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/ConfigManagementServiceImpl.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/ConfigManagementServiceImpl.java
index 354fca8..adaa5ce 100644
--- a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/ConfigManagementServiceImpl.java
+++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/ConfigManagementServiceImpl.java
@@ -25,6 +25,7 @@ import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.connect.runtime.common.ConnAndTaskConfigs;
import org.apache.rocketmq.connect.runtime.common.ConnectKeyValue;
import org.apache.rocketmq.connect.runtime.common.LoggerName;
@@ -90,6 +91,21 @@ public class ConfigManagementServiceImpl implements ConfigManagementService {
new JsonConverter(),
new ListConverter(ConnectKeyValue.class));
this.plugin = plugin;
+ this.prepare(connectConfig);
+ }
+
+ /**
+ * Preparation before startup
+ *
+ * @param connectConfig
+ */
+ private void prepare(ConnectConfig connectConfig) {
+ String configStoreTopic = connectConfig.getConfigStoreTopic();
+ if (!ConnectUtil.isTopicExist(connectConfig, configStoreTopic)) {
+ log.info("try to create config store topic: {}!", configStoreTopic);
+ TopicConfig topicConfig = new TopicConfig(configStoreTopic, 1, 1, 6);
+ ConnectUtil.createTopic(connectConfig, topicConfig);
+ }
}
@Override
diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/OffsetManagementServiceImpl.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/OffsetManagementServiceImpl.java
index 20d80e7..792b24b 100644
--- a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/OffsetManagementServiceImpl.java
+++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/OffsetManagementServiceImpl.java
@@ -25,6 +25,8 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
+import org.apache.rocketmq.common.TopicConfig;
+import org.apache.rocketmq.connect.runtime.common.LoggerName;
import org.apache.rocketmq.connect.runtime.config.ConnectConfig;
import org.apache.rocketmq.connect.runtime.converter.JsonConverter;
import org.apache.rocketmq.connect.runtime.converter.RecordOffsetConverter;
@@ -37,8 +39,11 @@ import org.apache.rocketmq.connect.runtime.utils.FilePathConfigUtil;
import org.apache.rocketmq.connect.runtime.utils.datasync.BrokerBasedLog;
import org.apache.rocketmq.connect.runtime.utils.datasync.DataSynchronizer;
import org.apache.rocketmq.connect.runtime.utils.datasync.DataSynchronizerCallback;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public class OffsetManagementServiceImpl implements PositionManagementService {
+ private static final Logger log = LoggerFactory.getLogger(LoggerName.ROCKETMQ_RUNTIME);
/**
* Current offset info in store.
@@ -76,6 +81,21 @@ public class OffsetManagementServiceImpl implements PositionManagementService {
new RecordPositionMapConverter());
this.offsetUpdateListener = new HashSet<>();
this.needSyncPartition = new ConcurrentSet<>();
+ this.prepare(connectConfig);
+ }
+
+ /**
+ * Preparation before startup
+ *
+ * @param connectConfig
+ */
+ private void prepare(ConnectConfig connectConfig) {
+ String offsetStoreTopic = connectConfig.getOffsetStoreTopic();
+ if (!ConnectUtil.isTopicExist(connectConfig, offsetStoreTopic)) {
+ log.info("try to create offset store topic: {}!", offsetStoreTopic);
+ TopicConfig topicConfig = new TopicConfig(offsetStoreTopic, 1, 1, 6);
+ ConnectUtil.createTopic(connectConfig, topicConfig);
+ }
}
@Override
diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/PositionManagementServiceImpl.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/PositionManagementServiceImpl.java
index e8c0eb6..2c2c217 100644
--- a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/PositionManagementServiceImpl.java
+++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/PositionManagementServiceImpl.java
@@ -25,6 +25,8 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
+import org.apache.rocketmq.common.TopicConfig;
+import org.apache.rocketmq.connect.runtime.common.LoggerName;
import org.apache.rocketmq.connect.runtime.config.ConnectConfig;
import org.apache.rocketmq.connect.runtime.converter.JsonConverter;
import org.apache.rocketmq.connect.runtime.converter.RecordOffsetConverter;
@@ -37,8 +39,11 @@ import org.apache.rocketmq.connect.runtime.utils.FilePathConfigUtil;
import org.apache.rocketmq.connect.runtime.utils.datasync.BrokerBasedLog;
import org.apache.rocketmq.connect.runtime.utils.datasync.DataSynchronizer;
import org.apache.rocketmq.connect.runtime.utils.datasync.DataSynchronizerCallback;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public class PositionManagementServiceImpl implements PositionManagementService {
+ private static final Logger log = LoggerFactory.getLogger(LoggerName.ROCKETMQ_RUNTIME);
/**
* Current position info in store.
@@ -75,6 +80,21 @@ public class PositionManagementServiceImpl implements PositionManagementService
new RecordPositionMapConverter());
this.positionUpdateListener = new HashSet<>();
this.needSyncPartition = new ConcurrentSet<>();
+ this.prepare(connectConfig);
+ }
+
+ /**
+ * Preparation before startup
+ *
+ * @param connectConfig
+ */
+ private void prepare(ConnectConfig connectConfig) {
+ String positionStoreTopic = connectConfig.getPositionStoreTopic();
+ if (!ConnectUtil.isTopicExist(connectConfig, positionStoreTopic)) {
+ log.info("try to create position store topic: {}!", positionStoreTopic);
+ TopicConfig topicConfig = new TopicConfig(positionStoreTopic, 1, 1, 6);
+ ConnectUtil.createTopic(connectConfig, topicConfig);
+ }
}
@Override
diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/RebalanceImpl.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/RebalanceImpl.java
index 5f867ac..8b1ed6d 100644
--- a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/RebalanceImpl.java
+++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/RebalanceImpl.java
@@ -19,12 +19,15 @@ package org.apache.rocketmq.connect.runtime.service;
import java.util.List;
import java.util.Map;
+import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.connect.runtime.ConnectController;
import org.apache.rocketmq.connect.runtime.common.ConnAndTaskConfigs;
import org.apache.rocketmq.connect.runtime.common.ConnectKeyValue;
import org.apache.rocketmq.connect.runtime.common.LoggerName;
+import org.apache.rocketmq.connect.runtime.config.ConnectConfig;
import org.apache.rocketmq.connect.runtime.connectorwrapper.Worker;
import org.apache.rocketmq.connect.runtime.service.strategy.AllocateConnAndTaskStrategy;
+import org.apache.rocketmq.connect.runtime.utils.ConnectUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -69,7 +72,11 @@ public class RebalanceImpl {
public void checkClusterStoreTopic() {
if (!clusterManagementService.hasClusterStoreTopic()) {
- log.error("cluster store topic not exist, apply first please!");
+ ConnectConfig connectConfig = this.connectController.getConnectConfig();
+ String clusterStoreTopic = this.connectController.getConnectConfig().getClusterStoreTopic();
+ log.info("cluster store topic not exist, try to create it!");
+ TopicConfig topicConfig = new TopicConfig(clusterStoreTopic, 1, 1, 6);
+ ConnectUtil.createTopic(connectConfig, topicConfig);
}
}
@@ -78,7 +85,9 @@ public class RebalanceImpl {
*/
public void doRebalance() {
List<String> curAliveWorkers = clusterManagementService.getAllAliveWorkers();
- log.info("Current Alive workers : " + curAliveWorkers.size());
+ if (curAliveWorkers != null) {
+ log.info("Current Alive workers : " + curAliveWorkers.size());
+ }
Map<String, ConnectKeyValue> curConnectorConfigs = configManagementService.getConnectorConfigs();
log.info("Current ConnectorConfigs : " + curConnectorConfigs);
Map<String, List<ConnectKeyValue>> curTaskConfigs = configManagementService.getTaskConfigs();
diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/utils/ConnectUtil.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/utils/ConnectUtil.java
index 7013110..b7f9c5f 100644
--- a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/utils/ConnectUtil.java
+++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/utils/ConnectUtil.java
@@ -17,6 +17,7 @@
package org.apache.rocketmq.connect.runtime.utils;
+import com.beust.jcommander.internal.Sets;
import io.openmessaging.connector.api.data.RecordOffset;
import io.openmessaging.connector.api.data.RecordPartition;
import java.util.ArrayList;
@@ -34,9 +35,14 @@ import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
+import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.common.protocol.body.ClusterInfo;
+import org.apache.rocketmq.common.protocol.body.SubscriptionGroupWrapper;
+import org.apache.rocketmq.common.protocol.route.BrokerData;
+import org.apache.rocketmq.common.protocol.route.TopicRouteData;
import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig;
import org.apache.rocketmq.connect.runtime.common.ConnectKeyValue;
import org.apache.rocketmq.connect.runtime.config.ConnectConfig;
@@ -154,19 +160,86 @@ public class ConnectUtil {
return defaultMQAdminExt;
}
+ public static void createTopic(ConnectConfig connectConfig, TopicConfig topicConfig) {
+ DefaultMQAdminExt defaultMQAdminExt = null;
+ try {
+ defaultMQAdminExt = startMQAdminTool(connectConfig);
+ ClusterInfo clusterInfo = defaultMQAdminExt.examineBrokerClusterInfo();
+ HashMap<String, Set<String>> clusterAddrTable = clusterInfo.getClusterAddrTable();
+ Set<String> clusterNameSet = clusterAddrTable.keySet();
+ for (String clusterName : clusterNameSet) {
+ Set<String> masterSet = CommandUtil.fetchMasterAddrByClusterName(defaultMQAdminExt, clusterName);
+ for (String addr : masterSet) {
+ defaultMQAdminExt.createAndUpdateTopicConfig(addr, topicConfig);
+ }
+ }
+ } catch (Exception e) {
+ throw new RuntimeException("create topic: " + topicConfig.getTopicName() + " failed", e);
+ } finally {
+ if (defaultMQAdminExt != null) {
+ defaultMQAdminExt.shutdown();
+ }
+ }
+ }
+
+ public static boolean isTopicExist(ConnectConfig connectConfig, String topic) {
+ DefaultMQAdminExt defaultMQAdminExt = null;
+ boolean foundTopicRouteInfo = false;
+ try {
+ defaultMQAdminExt = startMQAdminTool(connectConfig);
+ TopicRouteData topicRouteData = defaultMQAdminExt.examineTopicRouteInfo(topic);
+ if (topicRouteData != null) {
+ foundTopicRouteInfo = true;
+ }
+ } catch (MQClientException e) {
+ foundTopicRouteInfo = false;
+ } catch (Exception e) {
+ throw new RuntimeException("get topic route info failed", e);
+ } finally {
+ if (defaultMQAdminExt != null) {
+ defaultMQAdminExt.shutdown();
+ }
+ }
+ return foundTopicRouteInfo;
+ }
+
+ public static Set<String> fetchAllConsumerGroupList(ConnectConfig connectConfig) {
+ Set<String> consumerGroupSet = Sets.newHashSet();
+ DefaultMQAdminExt defaultMQAdminExt = null;
+ try {
+ defaultMQAdminExt = startMQAdminTool(connectConfig);
+ ClusterInfo clusterInfo = defaultMQAdminExt.examineBrokerClusterInfo();
+ for (BrokerData brokerData : clusterInfo.getBrokerAddrTable().values()) {
+ SubscriptionGroupWrapper subscriptionGroupWrapper = defaultMQAdminExt.getAllSubscriptionGroup(brokerData.selectBrokerAddr(), 3000L);
+ consumerGroupSet.addAll(subscriptionGroupWrapper.getSubscriptionGroupTable().keySet());
+ }
+ } catch (Exception e) {
+ throw new RuntimeException("fetch all topic failed", e);
+ } finally {
+ if (defaultMQAdminExt != null) {
+ defaultMQAdminExt.shutdown();
+ }
+ }
+ return consumerGroupSet;
+ }
+
public static String createSubGroup(ConnectConfig connectConfig, String subGroup) {
DefaultMQAdminExt defaultMQAdminExt = null;
try {
defaultMQAdminExt = startMQAdminTool(connectConfig);
SubscriptionGroupConfig initConfig = new SubscriptionGroupConfig();
initConfig.setGroupName(subGroup);
-
- Set<String> masterSet = CommandUtil.fetchMasterAddrByClusterName(defaultMQAdminExt, connectConfig.getClusterName());
- for (String addr : masterSet) {
- defaultMQAdminExt.createAndUpdateSubscriptionGroupConfig(addr, initConfig);
+ ClusterInfo clusterInfo = defaultMQAdminExt.examineBrokerClusterInfo();
+ HashMap<String, Set<String>> clusterAddrTable = clusterInfo.getClusterAddrTable();
+ Set<String> clusterNameSet = clusterAddrTable.keySet();
+ for (String clusterName : clusterNameSet) {
+ Set<String> masterSet = CommandUtil.fetchMasterAddrByClusterName(defaultMQAdminExt, clusterName);
+ for (String addr : masterSet) {
+ defaultMQAdminExt.createAndUpdateSubscriptionGroupConfig(addr, initConfig);
+ }
}
} catch (Exception e) {
- throw new IllegalArgumentException("create subGroup: " + subGroup + " failed", e);
+ throw new RuntimeException("create subGroup: " + subGroup + " failed", e);
} finally {
if (defaultMQAdminExt != null) {
defaultMQAdminExt.shutdown();