You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by GitBox <gi...@apache.org> on 2022/10/19 10:47:07 UTC

[GitHub] [inlong] fuweng11 opened a new pull request, #6227: [INLONG-6226][Manager] Add a consumption group for each sink

fuweng11 opened a new pull request, #6227:
URL: https://github.com/apache/inlong/pull/6227

   
   - Fixes #6226 
   
   ### Motivation
   
   Add a consumption group for each sink
   ### Modifications
   
   Add a consumption group for each sink
   ### Verifying this change
   
   *(Please pick either of the following options)*
   
   - [X] This change is a trivial rework/code cleanup without any test coverage.
   
   - [ ] This change is already covered by existing tests, such as:
     *(please describe tests)*
   
   - [ ] This change added tests and can be verified as follows:
   
   ### Documentation
   
     - Does this pull request introduce a new feature? (no)
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@inlong.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [inlong] healchow commented on pull request #6227: [INLONG-6226][Manager] Add a consumer group of TubeMQ for each sink

Posted by GitBox <gi...@apache.org>.
healchow commented on PR #6227:
URL: https://github.com/apache/inlong/pull/6227#issuecomment-1286826235

   Currently, for TubeMQ, we only create a consumer group at the InlongGroup level, consume a piece of data and write it to various Sinks (such as Hive and ES) under different Streams.
   
   Since the topic of TubeMQ is at the level of InlongGroup, Stream is just a message of different types in the topic (distinguished by the streamId in the message header). Creating only one consumer group can reduce the pressure on the service side of TubeMQ to save a large amount of consumer group information, and also avoid repeated consumption of a large amount of data.
   
   
   当前,对于TubeMQ,我们只在 InlongGroup 层面创建一个消费组,消费1份数据并写到不同的Stream下的各类 Sink (如Hive、ES)中。
   
   由于 TubeMQ 的 Topic 就在 InlongGroup 层面,Stream 只是Topic中的不同类型的消息(由消息头中的streamId来区分),只创建1个消费组就能减少TubeMQ的服务端保存大量消费组信息的压力,也能避免重复消费大量的数据。


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@inlong.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [inlong] healchow commented on a diff in pull request #6227: [INLONG-6226][Manager] Add a consumer group of TubeMQ for each sink

Posted by GitBox <gi...@apache.org>.
healchow commented on code in PR #6227:
URL: https://github.com/apache/inlong/pull/6227#discussion_r1001287844


##########
inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/tubemq/TubeMQResourceOperator.java:
##########
@@ -66,24 +76,32 @@ public void createQueueForGroup(InlongGroupInfo groupInfo, String operator) {
         }
 
         try {
+            List<InlongStreamBriefInfo> streamInfoList = streamService.getTopicList(groupId);
+            if (streamInfoList == null || streamInfoList.isEmpty()) {
+                log.warn("skip to create tube topic as no streams for groupId={}", groupId);

Review Comment:
   create tube topic? not `tubemq consumer group`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@inlong.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [inlong] vernedeng commented on a diff in pull request #6227: [INLONG-6226][Manager] Add a consumer group of TubeMQ for each sink

Posted by GitBox <gi...@apache.org>.
vernedeng commented on code in PR #6227:
URL: https://github.com/apache/inlong/pull/6227#discussion_r1001310721


##########
inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/tubemq/TubeMQResourceOperator.java:
##########
@@ -67,23 +83,31 @@ public void createQueueForGroup(InlongGroupInfo groupInfo, String operator) {
 
         try {
             // 1. create tubemq topic
+            List<InlongStreamBriefInfo> streamInfoList = streamService.getTopicList(groupId);
+            if (CollectionUtils.isEmpty(streamInfoList)) {
+                log.warn("skip to create tubemq topic as no streams for groupId={}", groupId);
+                return;
+            }
             String clusterTag = groupInfo.getInlongClusterTag();
             TubeClusterInfo tubeCluster = (TubeClusterInfo) clusterService.getOne(clusterTag, null, ClusterType.TUBEMQ);
             String topicName = groupInfo.getMqResource();
             tubeMQOperator.createTopic(tubeCluster, topicName, operator);
             log.info("success to create tubemq topic for groupId={}", groupId);
 
             // 2. create tubemq consumer group
-            // consumer naming rules: clusterTag_topicName_consumer_group
-            String consumeGroup = clusterTag + "_" + topicName + "_consumer_group";
-            tubeMQOperator.createConsumerGroup(tubeCluster, topicName, consumeGroup, operator);
-            log.info("success to create tubemq consumer group for groupId={}", groupId);
-
-            // insert the consumer group info
-            Integer id = consumeService.saveBySystem(groupInfo, topicName, consumeGroup);
-            log.info("success to save inlong consume [{}] for consumerGroup={}, groupId={}, topic={}",
-                    id, consumeGroup, groupId, topicName);
-
+            for (InlongStreamBriefInfo stream : streamInfoList) {
+                List<StreamSink> streamSinks = sinkService.listSink(groupId, stream.getInlongStreamId());
+                for (StreamSink sink : streamSinks) {
+                    String consumeGroup = String.format(TUBEMQ_CONSUMER_GROUP, clusterTag, topicName, sink.getId());
+                    tubeMQOperator.createConsumerGroup(tubeCluster, topicName, consumeGroup, operator);
+                    log.info("success to create tubemq consumer group for groupId={}", groupId);

Review Comment:
   it's seem like consumer group for inlong stream, instead of group



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@inlong.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [inlong] healchow commented on a diff in pull request #6227: [INLONG-6226][Manager] Add a consumer group of TubeMQ for each sink

Posted by GitBox <gi...@apache.org>.
healchow commented on code in PR #6227:
URL: https://github.com/apache/inlong/pull/6227#discussion_r1001287844


##########
inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/tubemq/TubeMQResourceOperator.java:
##########
@@ -66,24 +76,32 @@ public void createQueueForGroup(InlongGroupInfo groupInfo, String operator) {
         }
 
         try {
+            List<InlongStreamBriefInfo> streamInfoList = streamService.getTopicList(groupId);
+            if (streamInfoList == null || streamInfoList.isEmpty()) {
+                log.warn("skip to create tube topic as no streams for groupId={}", groupId);

Review Comment:
   create tube topic? not consumer group?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@inlong.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [inlong] healchow closed pull request #6227: [INLONG-6226][Manager] Add a consumer group of TubeMQ for each sink

Posted by GitBox <gi...@apache.org>.
healchow closed pull request #6227: [INLONG-6226][Manager] Add a consumer group of TubeMQ for each sink
URL: https://github.com/apache/inlong/pull/6227


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@inlong.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [inlong] healchow commented on a diff in pull request #6227: [INLONG-6226][Manager] Add a consumer group of TubeMQ for each sink

Posted by GitBox <gi...@apache.org>.
healchow commented on code in PR #6227:
URL: https://github.com/apache/inlong/pull/6227#discussion_r1001288781


##########
inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/tubemq/TubeMQResourceOperator.java:
##########
@@ -66,24 +76,32 @@ public void createQueueForGroup(InlongGroupInfo groupInfo, String operator) {
         }
 
         try {
+            List<InlongStreamBriefInfo> streamInfoList = streamService.getTopicList(groupId);
+            if (streamInfoList == null || streamInfoList.isEmpty()) {
+                log.warn("skip to create tube topic as no streams for groupId={}", groupId);
+                return;
+            }
             // 1. create tubemq topic
             String clusterTag = groupInfo.getInlongClusterTag();
             TubeClusterInfo tubeCluster = (TubeClusterInfo) clusterService.getOne(clusterTag, null, ClusterType.TUBEMQ);
             String topicName = groupInfo.getMqResource();
             tubeMQOperator.createTopic(tubeCluster, topicName, operator);
             log.info("success to create tubemq topic for groupId={}", groupId);
-
-            // 2. create tubemq consumer group
-            // consumer naming rules: clusterTag_topicName_consumer_group
-            String consumeGroup = clusterTag + "_" + topicName + "_consumer_group";
-            tubeMQOperator.createConsumerGroup(tubeCluster, topicName, consumeGroup, operator);
-            log.info("success to create tubemq consumer group for groupId={}", groupId);
-
-            // insert the consumer group info
-            Integer id = consumeService.saveBySystem(groupInfo, topicName, consumeGroup);
-            log.info("success to save inlong consume [{}] for consumerGroup={}, groupId={}, topic={}",
-                    id, consumeGroup, groupId, topicName);
-
+            for (InlongStreamBriefInfo stream : streamInfoList) {
+                List<StreamSink> streamSinks = sinkService.listSink(groupId, stream.getInlongStreamId());
+                for (StreamSink sink : streamSinks) {
+                    // 2. create tubemq consumer group
+                    // consumer naming rules: clusterTag_topicName_consumer_group
+                    String consumeGroup = clusterTag + "_" + topicName + "_" + sink.getId() + "_consumer_group";

Review Comment:
   Suggest extracting a constant, as `PulsarResourceOperator#PULSAR_SUBSCRIPTION` did.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@inlong.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [inlong] vernedeng commented on a diff in pull request #6227: [INLONG-6226][Manager] Add a consumer group of TubeMQ for each sink

Posted by GitBox <gi...@apache.org>.
vernedeng commented on code in PR #6227:
URL: https://github.com/apache/inlong/pull/6227#discussion_r1001310721


##########
inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/tubemq/TubeMQResourceOperator.java:
##########
@@ -67,23 +83,31 @@ public void createQueueForGroup(InlongGroupInfo groupInfo, String operator) {
 
         try {
             // 1. create tubemq topic
+            List<InlongStreamBriefInfo> streamInfoList = streamService.getTopicList(groupId);
+            if (CollectionUtils.isEmpty(streamInfoList)) {
+                log.warn("skip to create tubemq topic as no streams for groupId={}", groupId);
+                return;
+            }
             String clusterTag = groupInfo.getInlongClusterTag();
             TubeClusterInfo tubeCluster = (TubeClusterInfo) clusterService.getOne(clusterTag, null, ClusterType.TUBEMQ);
             String topicName = groupInfo.getMqResource();
             tubeMQOperator.createTopic(tubeCluster, topicName, operator);
             log.info("success to create tubemq topic for groupId={}", groupId);
 
             // 2. create tubemq consumer group
-            // consumer naming rules: clusterTag_topicName_consumer_group
-            String consumeGroup = clusterTag + "_" + topicName + "_consumer_group";
-            tubeMQOperator.createConsumerGroup(tubeCluster, topicName, consumeGroup, operator);
-            log.info("success to create tubemq consumer group for groupId={}", groupId);
-
-            // insert the consumer group info
-            Integer id = consumeService.saveBySystem(groupInfo, topicName, consumeGroup);
-            log.info("success to save inlong consume [{}] for consumerGroup={}, groupId={}, topic={}",
-                    id, consumeGroup, groupId, topicName);
-
+            for (InlongStreamBriefInfo stream : streamInfoList) {
+                List<StreamSink> streamSinks = sinkService.listSink(groupId, stream.getInlongStreamId());
+                for (StreamSink sink : streamSinks) {
+                    String consumeGroup = String.format(TUBEMQ_CONSUMER_GROUP, clusterTag, topicName, sink.getId());
+                    tubeMQOperator.createConsumerGroup(tubeCluster, topicName, consumeGroup, operator);
+                    log.info("success to create tubemq consumer group for groupId={}", groupId);

Review Comment:
   it's seem like consumer group for each inlong stream, instead of inlong group



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@inlong.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [inlong] healchow commented on a diff in pull request #6227: [INLONG-6226][Manager] Add a consumer group of TubeMQ for each sink

Posted by GitBox <gi...@apache.org>.
healchow commented on code in PR #6227:
URL: https://github.com/apache/inlong/pull/6227#discussion_r1001287655


##########
inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/tubemq/TubeMQResourceOperator.java:
##########
@@ -66,24 +76,32 @@ public void createQueueForGroup(InlongGroupInfo groupInfo, String operator) {
         }
 
         try {
+            List<InlongStreamBriefInfo> streamInfoList = streamService.getTopicList(groupId);
+            if (streamInfoList == null || streamInfoList.isEmpty()) {

Review Comment:
   Suggest using `CollectionUtils.isEmpty()`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@inlong.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org