You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by zh...@apache.org on 2022/05/06 02:09:07 UTC

[rocketmq-connect] branch master updated: [ISSUES #51] Automatically create clusterStoreTopic connector-cluster-topic. (#52)

This is an automated email from the ASF dual-hosted git repository.

zhoubo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-connect.git


The following commit(s) were added to refs/heads/master by this push:
     new 8fd3c6c  [ISSUES #51] Automatically create clusterStoreTopic connector-cluster-topic. (#52)
8fd3c6c is described below

commit 8fd3c6c4c9793b5fc158ee4505862a1e96a799a6
Author: sunxi92 <su...@163.com>
AuthorDate: Fri May 6 10:09:01 2022 +0800

    [ISSUES #51] Automatically create clusterStoreTopic connector-cluster-topic. (#52)
    
    * Automatically Create clusterStoreTopic connector-cluster-topic.
    
    * 1. Fix some inaccurate exceptions
    2. Add fetchAllTopicList and fetchAllConsumerGroupList methods in ConnectUtil
    3. Create the following topics during the creation of the ConnectController: connector-cluster-topic, connector-config-topic, connector-offset-topic, connector-position-topic
    4. Automatically create consumerGroup: connect-*
    
    * Update the logic that checks whether the topic exists.
---
 .../connect/runtime/connectorwrapper/Worker.java   |  3 +-
 .../service/ClusterManagementServiceImpl.java      | 15 +++-
 .../service/ConfigManagementServiceImpl.java       | 16 +++++
 .../service/OffsetManagementServiceImpl.java       | 20 ++++++
 .../service/PositionManagementServiceImpl.java     | 20 ++++++
 .../connect/runtime/service/RebalanceImpl.java     | 13 +++-
 .../connect/runtime/utils/ConnectUtil.java         | 83 ++++++++++++++++++++--
 7 files changed, 160 insertions(+), 10 deletions(-)

diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/Worker.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/Worker.java
index e22f969..9d48143 100644
--- a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/Worker.java
+++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/Worker.java
@@ -434,7 +434,8 @@ public class Worker {
                     } else if (task instanceof SinkTask) {
                         log.info("sink task config keyValue is {}", keyValue.getProperties());
                         DefaultMQPullConsumer consumer = ConnectUtil.initDefaultMQPullConsumer(connectConfig, connectorName, keyValue);
-                        if (connectConfig.isAutoCreateGroupEnable()) {
+                        Set<String> consumerGroupSet = ConnectUtil.fetchAllConsumerGroupList(connectConfig);
+                        if (!consumerGroupSet.contains(consumer.getConsumerGroup())) {
                             ConnectUtil.createSubGroup(connectConfig, consumer.getConsumerGroup());
                         }
                         TransformChain<ConnectRecord> transformChain = new TransformChain<>(keyValue, plugin);
diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/ClusterManagementServiceImpl.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/ClusterManagementServiceImpl.java
index 3bf5b47..bebc338 100644
--- a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/ClusterManagementServiceImpl.java
+++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/ClusterManagementServiceImpl.java
@@ -24,6 +24,7 @@ import java.util.List;
 import java.util.Set;
 import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer;
 import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.common.TopicConfig;
 import org.apache.rocketmq.common.protocol.RequestCode;
 import org.apache.rocketmq.common.protocol.header.NotifyConsumerIdsChangedRequestHeader;
 import org.apache.rocketmq.connect.runtime.common.LoggerName;
@@ -65,9 +66,19 @@ public class ClusterManagementServiceImpl implements ClusterManagementService {
      * @param connectConfig
      */
     private void prepare(ConnectConfig connectConfig) {
-        if (connectConfig.isAutoCreateGroupEnable()) {
-            ConnectUtil.createSubGroup(connectConfig, this.defaultMQPullConsumer.getConsumerGroup());
+        String consumerGroup = this.defaultMQPullConsumer.getConsumerGroup();
+        Set<String> consumerGroupSet = ConnectUtil.fetchAllConsumerGroupList(connectConfig);
+        if (!consumerGroupSet.contains(consumerGroup)) {
+            log.info("try to create consumerGroup: {}!", consumerGroup);
+            ConnectUtil.createSubGroup(connectConfig, consumerGroup);
         }
+        String clusterStoreTopic = connectConfig.getClusterStoreTopic();
+        if (!ConnectUtil.isTopicExist(connectConfig, clusterStoreTopic)) {
+            log.info("try to create cluster store topic: {}!", clusterStoreTopic);
+            TopicConfig topicConfig = new TopicConfig(clusterStoreTopic, 1, 1, 6);
+            ConnectUtil.createTopic(connectConfig, topicConfig);
+        }
+
     }
 
     @Override
diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/ConfigManagementServiceImpl.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/ConfigManagementServiceImpl.java
index 354fca8..adaa5ce 100644
--- a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/ConfigManagementServiceImpl.java
+++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/ConfigManagementServiceImpl.java
@@ -25,6 +25,7 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import org.apache.rocketmq.common.TopicConfig;
 import org.apache.rocketmq.connect.runtime.common.ConnAndTaskConfigs;
 import org.apache.rocketmq.connect.runtime.common.ConnectKeyValue;
 import org.apache.rocketmq.connect.runtime.common.LoggerName;
@@ -90,6 +91,21 @@ public class ConfigManagementServiceImpl implements ConfigManagementService {
             new JsonConverter(),
             new ListConverter(ConnectKeyValue.class));
         this.plugin = plugin;
+        this.prepare(connectConfig);
+    }
+
+    /**
+     * Preparation before startup
+     *
+     * @param connectConfig
+     */
+    private void prepare(ConnectConfig connectConfig) {
+        String configStoreTopic = connectConfig.getConfigStoreTopic();
+        if (!ConnectUtil.isTopicExist(connectConfig, configStoreTopic)) {
+            log.info("try to create config store topic: {}!", configStoreTopic);
+            TopicConfig topicConfig = new TopicConfig(configStoreTopic, 1, 1, 6);
+            ConnectUtil.createTopic(connectConfig, topicConfig);
+        }
     }
 
     @Override
diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/OffsetManagementServiceImpl.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/OffsetManagementServiceImpl.java
index 20d80e7..792b24b 100644
--- a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/OffsetManagementServiceImpl.java
+++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/OffsetManagementServiceImpl.java
@@ -25,6 +25,8 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.stream.Collectors;
+import org.apache.rocketmq.common.TopicConfig;
+import org.apache.rocketmq.connect.runtime.common.LoggerName;
 import org.apache.rocketmq.connect.runtime.config.ConnectConfig;
 import org.apache.rocketmq.connect.runtime.converter.JsonConverter;
 import org.apache.rocketmq.connect.runtime.converter.RecordOffsetConverter;
@@ -37,8 +39,11 @@ import org.apache.rocketmq.connect.runtime.utils.FilePathConfigUtil;
 import org.apache.rocketmq.connect.runtime.utils.datasync.BrokerBasedLog;
 import org.apache.rocketmq.connect.runtime.utils.datasync.DataSynchronizer;
 import org.apache.rocketmq.connect.runtime.utils.datasync.DataSynchronizerCallback;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class OffsetManagementServiceImpl implements PositionManagementService {
+    private static final Logger log = LoggerFactory.getLogger(LoggerName.ROCKETMQ_RUNTIME);
 
     /**
      * Current offset info in store.
@@ -76,6 +81,21 @@ public class OffsetManagementServiceImpl implements PositionManagementService {
             new RecordPositionMapConverter());
         this.offsetUpdateListener = new HashSet<>();
         this.needSyncPartition = new ConcurrentSet<>();
+        this.prepare(connectConfig);
+    }
+
+    /**
+     * Preparation before startup
+     *
+     * @param connectConfig
+     */
+    private void prepare(ConnectConfig connectConfig) {
+        String offsetStoreTopic = connectConfig.getOffsetStoreTopic();
+        if (!ConnectUtil.isTopicExist(connectConfig, offsetStoreTopic)) {
+            log.info("try to create offset store topic: {}!", offsetStoreTopic);
+            TopicConfig topicConfig = new TopicConfig(offsetStoreTopic, 1, 1, 6);
+            ConnectUtil.createTopic(connectConfig, topicConfig);
+        }
     }
 
     @Override
diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/PositionManagementServiceImpl.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/PositionManagementServiceImpl.java
index e8c0eb6..2c2c217 100644
--- a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/PositionManagementServiceImpl.java
+++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/PositionManagementServiceImpl.java
@@ -25,6 +25,8 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.stream.Collectors;
+import org.apache.rocketmq.common.TopicConfig;
+import org.apache.rocketmq.connect.runtime.common.LoggerName;
 import org.apache.rocketmq.connect.runtime.config.ConnectConfig;
 import org.apache.rocketmq.connect.runtime.converter.JsonConverter;
 import org.apache.rocketmq.connect.runtime.converter.RecordOffsetConverter;
@@ -37,8 +39,11 @@ import org.apache.rocketmq.connect.runtime.utils.FilePathConfigUtil;
 import org.apache.rocketmq.connect.runtime.utils.datasync.BrokerBasedLog;
 import org.apache.rocketmq.connect.runtime.utils.datasync.DataSynchronizer;
 import org.apache.rocketmq.connect.runtime.utils.datasync.DataSynchronizerCallback;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class PositionManagementServiceImpl implements PositionManagementService {
+    private static final Logger log = LoggerFactory.getLogger(LoggerName.ROCKETMQ_RUNTIME);
 
     /**
      * Current position info in store.
@@ -75,6 +80,21 @@ public class PositionManagementServiceImpl implements PositionManagementService
             new RecordPositionMapConverter());
         this.positionUpdateListener = new HashSet<>();
         this.needSyncPartition = new ConcurrentSet<>();
+        this.prepare(connectConfig);
+    }
+
+    /**
+     * Preparation before startup
+     *
+     * @param connectConfig
+     */
+    private void prepare(ConnectConfig connectConfig) {
+        String positionStoreTopic = connectConfig.getPositionStoreTopic();
+        if (!ConnectUtil.isTopicExist(connectConfig, positionStoreTopic)) {
+            log.info("try to create position store topic: {}!", positionStoreTopic);
+            TopicConfig topicConfig = new TopicConfig(positionStoreTopic, 1, 1, 6);
+            ConnectUtil.createTopic(connectConfig, topicConfig);
+        }
     }
 
     @Override
diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/RebalanceImpl.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/RebalanceImpl.java
index 5f867ac..8b1ed6d 100644
--- a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/RebalanceImpl.java
+++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/RebalanceImpl.java
@@ -19,12 +19,15 @@ package org.apache.rocketmq.connect.runtime.service;
 
 import java.util.List;
 import java.util.Map;
+import org.apache.rocketmq.common.TopicConfig;
 import org.apache.rocketmq.connect.runtime.ConnectController;
 import org.apache.rocketmq.connect.runtime.common.ConnAndTaskConfigs;
 import org.apache.rocketmq.connect.runtime.common.ConnectKeyValue;
 import org.apache.rocketmq.connect.runtime.common.LoggerName;
+import org.apache.rocketmq.connect.runtime.config.ConnectConfig;
 import org.apache.rocketmq.connect.runtime.connectorwrapper.Worker;
 import org.apache.rocketmq.connect.runtime.service.strategy.AllocateConnAndTaskStrategy;
+import org.apache.rocketmq.connect.runtime.utils.ConnectUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -69,7 +72,11 @@ public class RebalanceImpl {
 
     public void checkClusterStoreTopic() {
         if (!clusterManagementService.hasClusterStoreTopic()) {
-            log.error("cluster store topic not exist, apply first please!");
+            ConnectConfig connectConfig = this.connectController.getConnectConfig();
+            String clusterStoreTopic = this.connectController.getConnectConfig().getClusterStoreTopic();
+            log.info("cluster store topic not exist, try to create it!");
+            TopicConfig topicConfig = new TopicConfig(clusterStoreTopic, 1, 1, 6);
+            ConnectUtil.createTopic(connectConfig, topicConfig);
         }
     }
 
@@ -78,7 +85,9 @@ public class RebalanceImpl {
      */
     public void doRebalance() {
         List<String> curAliveWorkers = clusterManagementService.getAllAliveWorkers();
-        log.info("Current Alive workers : " + curAliveWorkers.size());
+        if (curAliveWorkers != null) {
+            log.info("Current Alive workers : " + curAliveWorkers.size());
+        }
         Map<String, ConnectKeyValue> curConnectorConfigs = configManagementService.getConnectorConfigs();
         log.info("Current ConnectorConfigs : " + curConnectorConfigs);
         Map<String, List<ConnectKeyValue>> curTaskConfigs = configManagementService.getTaskConfigs();
diff --git a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/utils/ConnectUtil.java b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/utils/ConnectUtil.java
index 7013110..b7f9c5f 100644
--- a/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/utils/ConnectUtil.java
+++ b/rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/utils/ConnectUtil.java
@@ -17,6 +17,7 @@
 
 package org.apache.rocketmq.connect.runtime.utils;
 
+import com.beust.jcommander.internal.Sets;
 import io.openmessaging.connector.api.data.RecordOffset;
 import io.openmessaging.connector.api.data.RecordPartition;
 import java.util.ArrayList;
@@ -34,9 +35,14 @@ import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer;
 import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
 import org.apache.rocketmq.client.exception.MQClientException;
 import org.apache.rocketmq.client.producer.DefaultMQProducer;
+import org.apache.rocketmq.common.TopicConfig;
 import org.apache.rocketmq.common.UtilAll;
 import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
 import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.common.protocol.body.ClusterInfo;
+import org.apache.rocketmq.common.protocol.body.SubscriptionGroupWrapper;
+import org.apache.rocketmq.common.protocol.route.BrokerData;
+import org.apache.rocketmq.common.protocol.route.TopicRouteData;
 import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig;
 import org.apache.rocketmq.connect.runtime.common.ConnectKeyValue;
 import org.apache.rocketmq.connect.runtime.config.ConnectConfig;
@@ -154,19 +160,86 @@ public class ConnectUtil {
         return defaultMQAdminExt;
     }
 
+    public static void createTopic(ConnectConfig connectConfig, TopicConfig topicConfig) {
+        DefaultMQAdminExt defaultMQAdminExt = null;
+        try {
+            defaultMQAdminExt = startMQAdminTool(connectConfig);
+            ClusterInfo clusterInfo = defaultMQAdminExt.examineBrokerClusterInfo();
+            HashMap<String, Set<String>> clusterAddrTable = clusterInfo.getClusterAddrTable();
+            Set<String> clusterNameSet = clusterAddrTable.keySet();
+            for (String clusterName : clusterNameSet) {
+                Set<String> masterSet = CommandUtil.fetchMasterAddrByClusterName(defaultMQAdminExt, clusterName);
+                for (String addr : masterSet) {
+                    defaultMQAdminExt.createAndUpdateTopicConfig(addr, topicConfig);
+                }
+            }
+        } catch (Exception e) {
+            throw new RuntimeException("create topic: " + topicConfig.getTopicName() + " failed", e);
+        } finally {
+            if (defaultMQAdminExt != null) {
+                defaultMQAdminExt.shutdown();
+            }
+        }
+    }
+
+    public static boolean isTopicExist(ConnectConfig connectConfig, String topic) {
+        DefaultMQAdminExt defaultMQAdminExt = null;
+        boolean foundTopicRouteInfo = false;
+        try {
+            defaultMQAdminExt = startMQAdminTool(connectConfig);
+            TopicRouteData topicRouteData = defaultMQAdminExt.examineTopicRouteInfo(topic);
+            if (topicRouteData != null) {
+                foundTopicRouteInfo = true;
+            }
+        } catch (MQClientException e) {
+            foundTopicRouteInfo = false;
+        } catch (Exception e) {
+            throw new RuntimeException("get topic route info  failed", e);
+        } finally {
+            if (defaultMQAdminExt != null) {
+                defaultMQAdminExt.shutdown();
+            }
+        }
+        return foundTopicRouteInfo;
+    }
+
+    public static Set<String> fetchAllConsumerGroupList(ConnectConfig connectConfig) {
+        Set<String> consumerGroupSet = Sets.newHashSet();
+        DefaultMQAdminExt defaultMQAdminExt = null;
+        try {
+            defaultMQAdminExt = startMQAdminTool(connectConfig);
+            ClusterInfo clusterInfo = defaultMQAdminExt.examineBrokerClusterInfo();
+            for (BrokerData brokerData : clusterInfo.getBrokerAddrTable().values()) {
+                SubscriptionGroupWrapper subscriptionGroupWrapper = defaultMQAdminExt.getAllSubscriptionGroup(brokerData.selectBrokerAddr(), 3000L);
+                consumerGroupSet.addAll(subscriptionGroupWrapper.getSubscriptionGroupTable().keySet());
+            }
+        } catch (Exception e) {
+            throw new RuntimeException("fetch all topic  failed", e);
+        } finally {
+            if (defaultMQAdminExt != null) {
+                defaultMQAdminExt.shutdown();
+            }
+        }
+        return consumerGroupSet;
+    }
+
     public static String createSubGroup(ConnectConfig connectConfig, String subGroup) {
         DefaultMQAdminExt defaultMQAdminExt = null;
         try {
             defaultMQAdminExt = startMQAdminTool(connectConfig);
             SubscriptionGroupConfig initConfig = new SubscriptionGroupConfig();
             initConfig.setGroupName(subGroup);
-
-            Set<String> masterSet = CommandUtil.fetchMasterAddrByClusterName(defaultMQAdminExt, connectConfig.getClusterName());
-            for (String addr : masterSet) {
-                defaultMQAdminExt.createAndUpdateSubscriptionGroupConfig(addr, initConfig);
+            ClusterInfo clusterInfo = defaultMQAdminExt.examineBrokerClusterInfo();
+            HashMap<String, Set<String>> clusterAddrTable = clusterInfo.getClusterAddrTable();
+            Set<String> clusterNameSet = clusterAddrTable.keySet();
+            for (String clusterName : clusterNameSet) {
+                Set<String> masterSet = CommandUtil.fetchMasterAddrByClusterName(defaultMQAdminExt, clusterName);
+                for (String addr : masterSet) {
+                    defaultMQAdminExt.createAndUpdateSubscriptionGroupConfig(addr, initConfig);
+                }
             }
         } catch (Exception e) {
-            throw new IllegalArgumentException("create subGroup: " + subGroup + " failed", e);
+            throw new RuntimeException("create subGroup: " + subGroup + " failed", e);
         } finally {
             if (defaultMQAdminExt != null) {
                 defaultMQAdminExt.shutdown();