You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@rocketmq.apache.org by GitBox <gi...@apache.org> on 2022/04/07 13:03:09 UTC

[GitHub] [rocketmq-connect] sunxi92 opened a new pull request, #52: Automatically create clusterStoreTopic connector-cluster-topic.

sunxi92 opened a new pull request, #52:
URL: https://github.com/apache/rocketmq-connect/pull/52

   #51
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@rocketmq.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [rocketmq-connect] odbozhou merged pull request #52: [ISSUES #51] Automatically create clusterStoreTopic connector-cluster-topic.

Posted by GitBox <gi...@apache.org>.
odbozhou merged PR #52:
URL: https://github.com/apache/rocketmq-connect/pull/52


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@rocketmq.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [rocketmq-connect] odbozhou commented on a diff in pull request #52: [ISSUES #51] Automatically create clusterStoreTopic connector-cluster-topic.

Posted by GitBox <gi...@apache.org>.
odbozhou commented on code in PR #52:
URL: https://github.com/apache/rocketmq-connect/pull/52#discussion_r860518698


##########
rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/ClusterManagementServiceImpl.java:
##########
@@ -65,9 +66,20 @@ public ClusterManagementServiceImpl(ConnectConfig connectConfig) {
      * @param connectConfig
      */
     private void prepare(ConnectConfig connectConfig) {
-        if (connectConfig.isAutoCreateGroupEnable()) {
-            ConnectUtil.createSubGroup(connectConfig, this.defaultMQPullConsumer.getConsumerGroup());
+        String consumerGroup = this.defaultMQPullConsumer.getConsumerGroup();
+        Set<String> topicSet = ConnectUtil.fetchAllTopicList(connectConfig);

Review Comment:
   What I mean is just to check if a single topic exists, no need to get the whole list of topics.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@rocketmq.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [rocketmq-connect] odbozhou commented on a diff in pull request #52: Automatically create clusterStoreTopic connector-cluster-topic.

Posted by GitBox <gi...@apache.org>.
odbozhou commented on code in PR #52:
URL: https://github.com/apache/rocketmq-connect/pull/52#discussion_r846989942


##########
rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/utils/ConnectUtil.java:
##########
@@ -154,6 +156,45 @@ public static DefaultMQAdminExt startMQAdminTool(ConnectConfig connectConfig) th
         return defaultMQAdminExt;
     }
 
+    public static void createTopic(ConnectConfig connectConfig, TopicConfig topicConfig) {
+        DefaultMQAdminExt defaultMQAdminExt = null;
+        try {
+            defaultMQAdminExt = startMQAdminTool(connectConfig);
+
+            Set<String> masterSet = CommandUtil.fetchMasterAddrByClusterName(defaultMQAdminExt, connectConfig.getClusterName());
+            for (String addr : masterSet) {
+                defaultMQAdminExt.createAndUpdateTopicConfig(addr, topicConfig);
+            }
+        } catch (Exception e) {
+            throw new IllegalArgumentException("create topic: " + topicConfig.getTopicName() + " failed", e);
+        } finally {
+            if (defaultMQAdminExt != null) {
+                defaultMQAdminExt.shutdown();
+            }
+        }
+    }
+
+    public static boolean isAutoCreateTopic(ConnectConfig connectConfig) {
+        DefaultMQAdminExt defaultMQAdminExt = null;
+        boolean autoCreateTopic = true;
+
+        try {
+            defaultMQAdminExt = startMQAdminTool(connectConfig);
+            Set<String> masterSet = CommandUtil.fetchMasterAddrByClusterName(defaultMQAdminExt, connectConfig.getClusterName());
+            for (String addr : masterSet) {
+                Properties brokerConfig = defaultMQAdminExt.getBrokerConfig(addr);
+                autoCreateTopic = autoCreateTopic && Boolean.parseBoolean(brokerConfig.getProperty("autoCreateTopicEnable"));
+            }
+        } catch (Exception e) {
+            throw new IllegalArgumentException("get broker config autoCreateTopicEnable failed", e);

Review Comment:
   IllegalArgumentException may be inaccurate



##########
rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/RebalanceImpl.java:
##########
@@ -69,7 +71,15 @@ public RebalanceImpl(Worker worker, ConfigManagementService configManagementServ
 
     public void checkClusterStoreTopic() {
         if (!clusterManagementService.hasClusterStoreTopic()) {
-            log.error("cluster store topic not exist, apply first please!");
+            if (ConnectUtil.isAutoCreateTopic(this.connectController.getConnectConfig())) {
+                TopicConfig topicConfig = new TopicConfig(this.connectController.getConnectConfig().getClusterStoreTopic(), 1, 1, 6);

Review Comment:
   Better to define constants instead of using magic values



##########
rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/utils/ConnectUtil.java:
##########
@@ -154,6 +156,45 @@ public static DefaultMQAdminExt startMQAdminTool(ConnectConfig connectConfig) th
         return defaultMQAdminExt;
     }
 
+    public static void createTopic(ConnectConfig connectConfig, TopicConfig topicConfig) {
+        DefaultMQAdminExt defaultMQAdminExt = null;
+        try {
+            defaultMQAdminExt = startMQAdminTool(connectConfig);
+
+            Set<String> masterSet = CommandUtil.fetchMasterAddrByClusterName(defaultMQAdminExt, connectConfig.getClusterName());
+            for (String addr : masterSet) {
+                defaultMQAdminExt.createAndUpdateTopicConfig(addr, topicConfig);
+            }
+        } catch (Exception e) {
+            throw new IllegalArgumentException("create topic: " + topicConfig.getTopicName() + " failed", e);

Review Comment:
   IllegalArgumentException may be inaccurate



##########
rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/utils/ConnectUtil.java:
##########
@@ -154,6 +156,45 @@ public static DefaultMQAdminExt startMQAdminTool(ConnectConfig connectConfig) th
         return defaultMQAdminExt;
     }
 
+    public static void createTopic(ConnectConfig connectConfig, TopicConfig topicConfig) {
+        DefaultMQAdminExt defaultMQAdminExt = null;
+        try {
+            defaultMQAdminExt = startMQAdminTool(connectConfig);
+
+            Set<String> masterSet = CommandUtil.fetchMasterAddrByClusterName(defaultMQAdminExt, connectConfig.getClusterName());

Review Comment:
   When a topic is automatically created, is the clusterName required in the connect config, and can it be obtained through the name server?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@rocketmq.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [rocketmq-connect] odbozhou commented on a diff in pull request #52: [ISSUES #51] Automatically create clusterStoreTopic connector-cluster-topic.

Posted by GitBox <gi...@apache.org>.
odbozhou commented on code in PR #52:
URL: https://github.com/apache/rocketmq-connect/pull/52#discussion_r857075511


##########
rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/ClusterManagementServiceImpl.java:
##########
@@ -65,9 +66,20 @@ public ClusterManagementServiceImpl(ConnectConfig connectConfig) {
      * @param connectConfig
      */
     private void prepare(ConnectConfig connectConfig) {
-        if (connectConfig.isAutoCreateGroupEnable()) {
-            ConnectUtil.createSubGroup(connectConfig, this.defaultMQPullConsumer.getConsumerGroup());
+        String consumerGroup = this.defaultMQPullConsumer.getConsumerGroup();
+        Set<String> topicSet = ConnectUtil.fetchAllTopicList(connectConfig);

Review Comment:
   Is it possible to just check whether a single topic exists?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@rocketmq.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [rocketmq-connect] sunxi92 commented on a diff in pull request #52: [ISSUES #51] Automatically create clusterStoreTopic connector-cluster-topic.

Posted by GitBox <gi...@apache.org>.
sunxi92 commented on code in PR #52:
URL: https://github.com/apache/rocketmq-connect/pull/52#discussion_r859801890


##########
rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/service/ClusterManagementServiceImpl.java:
##########
@@ -65,9 +66,20 @@ public ClusterManagementServiceImpl(ConnectConfig connectConfig) {
      * @param connectConfig
      */
     private void prepare(ConnectConfig connectConfig) {
-        if (connectConfig.isAutoCreateGroupEnable()) {
-            ConnectUtil.createSubGroup(connectConfig, this.defaultMQPullConsumer.getConsumerGroup());
+        String consumerGroup = this.defaultMQPullConsumer.getConsumerGroup();
+        Set<String> topicSet = ConnectUtil.fetchAllTopicList(connectConfig);

Review Comment:
   > Is it possible to just check whether a single topic exists?
   
   A ConnectController is created when the Runtime is started.When ConnectController created, the ClusterManagementServiceImpl, ConfigManagementServiceImpl, PositionManagementServiceImpl and OffsetMan AgementServiceImpl are created, in the process of creating the prepare method will check whether the corresponding topic exist if not then create it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@rocketmq.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org