Posted to commits@inlong.apache.org by GitBox <gi...@apache.org> on 2022/09/28 03:02:30 UTC

[GitHub] [inlong] fuweng11 opened a new pull request, #6046: [INLONG-6044][Manager] Distinguish between group and stream configuration processes

fuweng11 opened a new pull request, #6046:
URL: https://github.com/apache/inlong/pull/6046

   
   ### Prepare a Pull Request
   - Fixes #6044 
   
   ### Motivation
   
   Distinguish between group and stream configuration processes.

   ### Modifications
   
   Separate the configuration processes of group and stream. The group configuration process now configures only group-level information, and stream-level configuration moves into the stream configuration process.
   
   ### Verifying this change
   
   Separate the configuration processes of group and stream. The group configuration process now configures only group-level information, and stream-level configuration moves into the stream configuration process.
   
   
   - [X] This change is a trivial rework/code cleanup without any test coverage.
   
   - [ ] This change is already covered by existing tests, such as:
     *(please describe tests)*
   
   - [ ] This change added tests and can be verified as follows:
   
   
   ### Documentation
   
     - Does this pull request introduce a new feature? (no)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@inlong.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [inlong] healchow commented on a diff in pull request #6046: [INLONG-6044][Manager] Distinguish between group and stream configuration processes

Posted by GitBox <gi...@apache.org>.
healchow commented on code in PR #6046:
URL: https://github.com/apache/inlong/pull/6046#discussion_r984376910


##########
inlong-manager/manager-test/src/main/resources/h2/apache_inlong_manager.sql:
##########
@@ -637,6 +637,7 @@ CREATE TABLE IF NOT EXISTS `workflow_process`
     `type`            varchar(256)          DEFAULT NULL COMMENT 'Process classification',
     `title`           varchar(256)          DEFAULT NULL COMMENT 'Process title',
     `inlong_group_id` varchar(256)          DEFAULT NULL COMMENT 'Inlong group id: to facilitate related inlong group',
+    `inlong_stream_id`varchar(256)          DEFAULT NULL COMMENT 'Inlong stream id: to facilitate related inlong stream',

Review Comment:
   ```suggestion
       `inlong_group_id`  varchar(256)          DEFAULT NULL COMMENT 'Inlong group id to which this process belongs',
       `inlong_stream_id` varchar(256)          DEFAULT NULL COMMENT 'Inlong stream id to which this process belongs',
   ```





[GitHub] [inlong] fuweng11 commented on a diff in pull request #6046: [INLONG-6044][Manager] Distinguish between group and stream configuration processes

Posted by GitBox <gi...@apache.org>.
fuweng11 commented on code in PR #6046:
URL: https://github.com/apache/inlong/pull/6046#discussion_r1018583409


##########
inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/group/DeleteGroupWorkflowDefinition.java:
##########
@@ -85,22 +77,12 @@ public WorkflowProcess defineProcess() {
         deleteMQTask.setListenerFactory(groupTaskListenerFactory);
         process.addTask(deleteMQTask);
 
-        // Delete Sort

Review Comment:
   The current sort configuration belongs to the group. The sort configuration will not be performed until the stream configuration under the current group is successful.





[GitHub] [inlong] woofyzhao commented on a diff in pull request #6046: [INLONG-6044][Manager] Distinguish between group and stream configuration processes

Posted by GitBox <gi...@apache.org>.
woofyzhao commented on code in PR #6046:
URL: https://github.com/apache/inlong/pull/6046#discussion_r996413174


##########
inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/group/UpdateGroupCompleteListener.java:
##########
@@ -75,7 +84,14 @@ public ListenerResult listen(WorkflowContext context) {
         }
         InlongGroupInfo groupInfo = form.getGroupInfo();
         groupService.update(groupInfo.genRequest(), operator);
-
+        if (Objects.equals(operateType, GroupOperateType.DELETE)) {

Review Comment:
   Should we allow deleting a group directly while there are still streams under it?





[GitHub] [inlong] chestnut-c commented on a diff in pull request #6046: [INLONG-6044][Manager] Distinguish between group and stream configuration processes

Posted by GitBox <gi...@apache.org>.
chestnut-c commented on code in PR #6046:
URL: https://github.com/apache/inlong/pull/6046#discussion_r1018684017


##########
inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/queue/QueueResourceListener.java:
##########
@@ -96,4 +128,39 @@ public ListenerResult listen(WorkflowContext context) throws WorkflowListenerExc
         return ListenerResult.success("success");
     }
 
+    private void createQueueForStreams(InlongGroupInfo groupInfo, List<InlongStreamInfo> streamInfos, String operator) {
+        String groupId = groupInfo.getInlongGroupId();
+        log.info("success to start stream process for groupId={}", groupId);
+
+        for (InlongStreamInfo stream : streamInfos) {
+            StreamResourceProcessForm form = StreamResourceProcessForm.getProcessForm(groupInfo, stream, INIT);
+            String streamId = stream.getInlongStreamId();
+            final String errMsg = "failed to start stream process for groupId=" + groupId + " streamId=" + streamId;
+
+            CompletableFuture<WorkflowResult> future = CompletableFuture
+                    .supplyAsync(() -> workflowService.start(CREATE_STREAM_RESOURCE, operator, form), EXECUTOR_SERVICE)
+                    .whenComplete((result, ex) -> {
+                        if (ex != null) {
+                            log.error(errMsg + ": " + ex.getMessage());
+                            throw new WorkflowListenerException(errMsg, ex);
+                        } else {
+                            List<TaskResponse> tasks = result.getNewTasks();
+                            if (TaskStatus.FAILED == tasks.get(tasks.size() - 1).getStatus()) {
+                                log.error(errMsg);
+                                throw new WorkflowListenerException(errMsg);
+                            }
+                        }
+                    });
+            try {
+                future.get(180, TimeUnit.SECONDS);

Review Comment:
   Waiting for the result inside the for loop is equivalent to a synchronous call. Why use the asynchronous method?
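
The point of this comment can be illustrated with a minimal, self-contained sketch. It is not the InLong code: `SequentialAsyncDemo`, `runOneByOne`, and the string "stream ids" are hypothetical stand-ins, and the task body only records events. It shows that submitting a task with `CompletableFuture.supplyAsync` and then calling `get` inside the same loop iteration runs the tasks strictly one after another.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class; names are illustrative and not part of InLong.
class SequentialAsyncDemo {

    // Submits one task per stream id and waits for it before submitting the next.
    // Returns the recorded start/end events in the order they happened.
    static List<String> runOneByOne(List<String> streamIds) {
        ExecutorService executor = Executors.newFixedThreadPool(4);
        List<String> events = Collections.synchronizedList(new ArrayList<>());
        try {
            for (String streamId : streamIds) {
                CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
                    events.add("start " + streamId);
                    events.add("end " + streamId);
                    return streamId;
                }, executor);
                // Blocking here means the next task is not even submitted
                // until this one has finished, despite the thread pool.
                future.get(5, TimeUnit.SECONDS);
            }
        } catch (Exception e) {
            throw new IllegalStateException("stream task did not finish", e);
        } finally {
            executor.shutdown();
        }
        return events;
    }

    public static void main(String[] args) {
        System.out.println(runOneByOne(Arrays.asList("s1", "s2", "s3")));
    }
}
```

The events never interleave, even though the pool has four threads, because each iteration blocks on `get` before the next task is submitted.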





[GitHub] [inlong] fuweng11 commented on a diff in pull request #6046: [INLONG-6044][Manager] Distinguish between group and stream configuration processes

Posted by GitBox <gi...@apache.org>.
fuweng11 commented on code in PR #6046:
URL: https://github.com/apache/inlong/pull/6046#discussion_r1018691530


##########
inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/queue/QueueResourceListener.java:
##########
@@ -96,4 +128,39 @@ public ListenerResult listen(WorkflowContext context) throws WorkflowListenerExc
         return ListenerResult.success("success");
     }
 
+    private void createQueueForStreams(InlongGroupInfo groupInfo, List<InlongStreamInfo> streamInfos, String operator) {
+        String groupId = groupInfo.getInlongGroupId();
+        log.info("success to start stream process for groupId={}", groupId);
+
+        for (InlongStreamInfo stream : streamInfos) {
+            StreamResourceProcessForm form = StreamResourceProcessForm.getProcessForm(groupInfo, stream, INIT);
+            String streamId = stream.getInlongStreamId();
+            final String errMsg = "failed to start stream process for groupId=" + groupId + " streamId=" + streamId;
+
+            CompletableFuture<WorkflowResult> future = CompletableFuture
+                    .supplyAsync(() -> workflowService.start(CREATE_STREAM_RESOURCE, operator, form), EXECUTOR_SERVICE)
+                    .whenComplete((result, ex) -> {
+                        if (ex != null) {
+                            log.error(errMsg + ": " + ex.getMessage());
+                            throw new WorkflowListenerException(errMsg, ex);
+                        } else {
+                            List<TaskResponse> tasks = result.getNewTasks();
+                            if (TaskStatus.FAILED == tasks.get(tasks.size() - 1).getStatus()) {
+                                log.error(errMsg);
+                                throw new WorkflowListenerException(errMsg);
+                            }
+                        }
+                    });
+            try {
+                future.get(180, TimeUnit.SECONDS);

Review Comment:
   Here we need to wait for each stream configuration to complete before configuring the next stream. If it fails, an exception is thrown and the remaining streams are not configured.
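
The behaviour described in this reply can be sketched as follows. This is an illustration under stated assumptions, not the PR's implementation: `StopOnFirstFailureDemo`, `startStream`, and the `"FAILED"` / `"COMPLETED"` strings are hypothetical stand-ins for the workflow call and `TaskStatus`. The sketch relies on documented `CompletableFuture` behaviour: an exception thrown inside `whenComplete` completes the returned future exceptionally, and `get` reports it as an `ExecutionException`.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical demo class; names are illustrative and not part of InLong.
class StopOnFirstFailureDemo {

    // Stand-in for starting one stream workflow; ids starting with "bad" fail.
    static String startStream(String streamId) {
        return streamId.startsWith("bad") ? "FAILED" : "COMPLETED";
    }

    // Configures streams in order and stops at the first failure.
    // Returns the ids that were configured successfully before that point.
    static List<String> configureInOrder(List<String> streamIds) {
        List<String> configured = new ArrayList<>();
        for (String streamId : streamIds) {
            CompletableFuture<String> future = CompletableFuture
                    .supplyAsync(() -> startStream(streamId))
                    .whenComplete((status, ex) -> {
                        if (ex != null || "FAILED".equals(status)) {
                            // Throwing here makes the returned future complete exceptionally.
                            throw new IllegalStateException(
                                    "failed to start stream process for streamId=" + streamId);
                        }
                    });
            try {
                future.get(5, TimeUnit.SECONDS);
                configured.add(streamId);
            } catch (ExecutionException | TimeoutException e) {
                // The failure surfaces here, so later streams are never started.
                break;
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                break;
            }
        }
        return configured;
    }

    public static void main(String[] args) {
        System.out.println(configureInOrder(Arrays.asList("s1", "bad2", "s3")));
    }
}
```

With the input above, only `s1` is configured: `bad2` fails, and `s3` is never attempted.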





[GitHub] [inlong] woofyzhao commented on a diff in pull request #6046: [INLONG-6044][Manager] Distinguish between group and stream configuration processes

Posted by GitBox <gi...@apache.org>.
woofyzhao commented on code in PR #6046:
URL: https://github.com/apache/inlong/pull/6046#discussion_r983393423


##########
inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/group/InitGroupCompleteListener.java:
##########
@@ -84,12 +87,9 @@ public ListenerResult listen(WorkflowContext context) throws WorkflowListenerExc
         updateGroupRequest.setVersion(existGroup.getVersion());
         groupService.update(updateGroupRequest, operator);
 
-        // update status of other related configs
-        streamService.updateStatus(groupId, null, StreamStatus.CONFIG_SUCCESSFUL.getCode(), operator);
-        if (InlongConstants.LIGHTWEIGHT_MODE.equals(groupInfo.getLightweight())) {

Review Comment:
   This conditional branch seems to be missing from its counterpart on the stream side.





[GitHub] [inlong] vernedeng commented on a diff in pull request #6046: [INLONG-6044][Manager] Distinguish between group and stream configuration processes

Posted by GitBox <gi...@apache.org>.
vernedeng commented on code in PR #6046:
URL: https://github.com/apache/inlong/pull/6046#discussion_r1018587699


##########
inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/kafka/KafkaResourceOperators.java:
##########
@@ -67,49 +62,12 @@ public boolean accept(String mqType) {
 
     @Override
     public void createQueueForGroup(@NotNull InlongGroupInfo groupInfo, @NotBlank String operator) {
-        String groupId = groupInfo.getInlongGroupId();
-        log.info("begin to create kafka resource for groupId={}", groupId);
-
-        InlongKafkaInfo inlongKafkaInfo = (InlongKafkaInfo) groupInfo;
-        try {
-            // 1. create kafka Topic - each Inlong Stream corresponds to a Kafka Topic
-            List<InlongStreamBriefInfo> streamInfoList = streamService.getTopicList(groupId);
-            if (streamInfoList == null || streamInfoList.isEmpty()) {
-                log.warn("skip to create kafka topic and subscription as no streams for groupId={}", groupId);
-                return;
-            }
-            for (InlongStreamBriefInfo streamInfo : streamInfoList) {
-                this.createKafkaTopic(inlongKafkaInfo, streamInfo.getMqResource());
-            }
-        } catch (Exception e) {
-            String msg = String.format("failed to create kafka resource for groupId=%s", groupId);
-            log.error(msg, e);
-            throw new WorkflowListenerException(msg + ": " + e.getMessage());
-        }
-        log.info("success to create kafka resource for groupId={}, cluster={}", groupId, inlongKafkaInfo);
+        log.info("skip to create kafka topic for groupId={}", groupInfo.getInlongGroupId());
     }
 
     @Override
     public void deleteQueueForGroup(InlongGroupInfo groupInfo, String operator) {
-        Preconditions.checkNotNull(groupInfo, "inlong group info cannot be null");
-
-        String groupId = groupInfo.getInlongGroupId();
-        log.info("begin to delete kafka resource for groupId={}", groupId);
-        ClusterInfo clusterInfo = clusterService.getOne(groupInfo.getInlongClusterTag(), null, ClusterType.KAFKA);
-        try {
-            List<InlongStreamBriefInfo> streamInfoList = streamService.getTopicList(groupId);
-            if (streamInfoList == null || streamInfoList.isEmpty()) {
-                log.warn("skip to create kafka topic and subscription as no streams for groupId={}", groupId);
-                return;
-            }
-            for (InlongStreamBriefInfo streamInfo : streamInfoList) {
-                this.deleteKafkaTopic(groupInfo, streamInfo.getInlongStreamId());
-            }
-        } catch (Exception e) {
-            log.error("failed to delete kafka resource for groupId=" + groupId, e);
-            throw new WorkflowListenerException("failed to delete kafka resource: " + e.getMessage());
-        }
-        log.info("success to delete kafka resource for groupId={}, cluster={}", groupId, clusterInfo);
+        log.info("skip to delete kafka topic for groupId={}", groupInfo.getInlongGroupId());

Review Comment:
   Why skip deleting the Kafka topic directly?



##########
inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/sort/SortConfigListener.java:
##########
@@ -81,6 +82,10 @@ public ListenerResult listen(WorkflowContext context) throws WorkflowListenerExc
         }
         InlongGroupInfo groupInfo = form.getGroupInfo();
         List<InlongStreamInfo> streamInfos = form.getStreamInfos();
+        if (CollectionUtils.isEmpty(streamInfos)) {
+            LOGGER.warn("not build sort config for groupId={}, as the stream is empty", groupId);

Review Comment:
   
   ```suggestion
               LOGGER.warn("do not build sort config for groupId={}, as the stream is empty", groupId);
   ```



##########
inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/queue/QueueResourceListener.java:
##########
@@ -96,4 +128,45 @@ public ListenerResult listen(WorkflowContext context) throws WorkflowListenerExc
         return ListenerResult.success("success");
     }
 
+    private void createQueueForStreams(InlongGroupInfo groupInfo, List<InlongStreamInfo> streamInfos, String operator) {
+        String groupId = groupInfo.getInlongGroupId();
+        log.info("success to start stream process for groupId={}", groupId);
+
+        for (InlongStreamInfo stream : streamInfos) {
+            StreamResourceProcessForm form = StreamResourceProcessForm.getProcessForm(groupInfo, stream, INIT);
+            String streamId = stream.getInlongStreamId();
+            final String errMsg = "failed to start stream process for groupId=" + groupId + " streamId=" + streamId;
+
+            CompletableFuture<WorkflowResult> future = CompletableFuture
+                    .supplyAsync(() -> workflowService.start(CREATE_STREAM_RESOURCE, operator, form), EXECUTOR_SERVICE)
+                    .whenComplete((result, ex) -> {
+                        if (ex != null) {
+                            log.error(errMsg + ": " + ex.getMessage());
+                            throw new WorkflowListenerException(errMsg, ex);
+                        } else {
+                            List<TaskResponse> tasks = result.getNewTasks();
+                            if (TaskStatus.FAILED == tasks.get(tasks.size() - 1).getStatus()) {
+                                log.error(errMsg);
+                                throw new WorkflowListenerException(errMsg);
+                            }
+                        }
+                    });
+            try {
+                future.get(180, TimeUnit.SECONDS);
+                /*WorkflowResult result = future.get(180, TimeUnit.SECONDS);
+                List<TaskResponse> tasks = result.getNewTasks();
+                if (TaskStatus.FAILED == tasks.get(tasks.size() - 1).getStatus()) {
+                    log.error(errMsg);
+                    throw new WorkflowListenerException(errMsg);
+                }*/

Review Comment:
   Please remove this commented-out code.





[GitHub] [inlong] fuweng11 commented on a diff in pull request #6046: [INLONG-6044][Manager] Distinguish between group and stream configuration processes

Posted by GitBox <gi...@apache.org>.
fuweng11 commented on code in PR #6046:
URL: https://github.com/apache/inlong/pull/6046#discussion_r1018697052


##########
inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/queue/QueueResourceListener.java:
##########
@@ -96,4 +128,39 @@ public ListenerResult listen(WorkflowContext context) throws WorkflowListenerExc
         return ListenerResult.success("success");
     }
 
+    private void createQueueForStreams(InlongGroupInfo groupInfo, List<InlongStreamInfo> streamInfos, String operator) {
+        String groupId = groupInfo.getInlongGroupId();
+        log.info("success to start stream process for groupId={}", groupId);
+
+        for (InlongStreamInfo stream : streamInfos) {
+            StreamResourceProcessForm form = StreamResourceProcessForm.getProcessForm(groupInfo, stream, INIT);
+            String streamId = stream.getInlongStreamId();
+            final String errMsg = "failed to start stream process for groupId=" + groupId + " streamId=" + streamId;
+
+            CompletableFuture<WorkflowResult> future = CompletableFuture
+                    .supplyAsync(() -> workflowService.start(CREATE_STREAM_RESOURCE, operator, form), EXECUTOR_SERVICE)
+                    .whenComplete((result, ex) -> {
+                        if (ex != null) {
+                            log.error(errMsg + ": " + ex.getMessage());
+                            throw new WorkflowListenerException(errMsg, ex);
+                        } else {
+                            List<TaskResponse> tasks = result.getNewTasks();
+                            if (TaskStatus.FAILED == tasks.get(tasks.size() - 1).getStatus()) {
+                                log.error(errMsg);
+                                throw new WorkflowListenerException(errMsg);
+                            }
+                        }
+                    });
+            try {
+                future.get(180, TimeUnit.SECONDS);

Review Comment:
   This is because the stream must be configured through the stream workflow. If a stream is added later, the group workflow does not need to run again; the stream workflow is executed directly.





[GitHub] [inlong] healchow commented on a diff in pull request #6046: [INLONG-6044][Manager] Distinguish between group and stream configuration processes

Posted by GitBox <gi...@apache.org>.
healchow commented on code in PR #6046:
URL: https://github.com/apache/inlong/pull/6046#discussion_r984376651


##########
inlong-manager/manager-web/sql/apache_inlong_manager.sql:
##########
@@ -675,6 +675,7 @@ CREATE TABLE IF NOT EXISTS `workflow_process`
     `type`            varchar(256)          DEFAULT NULL COMMENT 'Process classification',
     `title`           varchar(256)          DEFAULT NULL COMMENT 'Process title',
     `inlong_group_id` varchar(256)          DEFAULT NULL COMMENT 'Inlong group id: to facilitate related inlong group',
+    `inlong_stream_id`varchar(256)          DEFAULT NULL COMMENT 'Inlong stream id: to facilitate related inlong stream',

Review Comment:
   ```suggestion
       `inlong_group_id`  varchar(256)          DEFAULT NULL COMMENT 'Inlong group id to which this process belongs',
       `inlong_stream_id` varchar(256)          DEFAULT NULL COMMENT 'Inlong stream id to which this process belongs',
   ```





[GitHub] [inlong] healchow commented on a diff in pull request #6046: [INLONG-6044][Manager] Distinguish between group and stream configuration processes

Posted by GitBox <gi...@apache.org>.
healchow commented on code in PR #6046:
URL: https://github.com/apache/inlong/pull/6046#discussion_r984354127


##########
inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/group/InitGroupCompleteListener.java:
##########
@@ -84,12 +81,9 @@ public ListenerResult listen(WorkflowContext context) throws WorkflowListenerExc
         updateGroupRequest.setVersion(existGroup.getVersion());
         groupService.update(updateGroupRequest, operator);
 
-        // update status of other related configs
-        streamService.updateStatus(groupId, null, StreamStatus.CONFIG_SUCCESSFUL.getCode(), operator);
-        if (InlongConstants.LIGHTWEIGHT_MODE.equals(groupInfo.getLightweight())) {
-            sourceService.updateStatus(groupId, null, SourceStatus.SOURCE_NORMAL.getCode(), operator);
-        } else {
-            sourceService.updateStatus(groupId, null, SourceStatus.TO_BE_ISSUED_ADD.getCode(), operator);
+        List<InlongStreamInfo> streamList = form.getStreamInfos();
+        for (InlongStreamInfo streamInfo : streamList) {
+            streamProcessOperation.startProcess(groupId, streamInfo.getInlongStreamId(), operator, false);

Review Comment:
   Will the for loop take a long time? If yes, it is recommended to make it asynchronous.





[GitHub] [inlong] fuweng11 commented on a diff in pull request #6046: [INLONG-6044][Manager] Distinguish between group and stream configuration processes

Posted by GitBox <gi...@apache.org>.
fuweng11 commented on code in PR #6046:
URL: https://github.com/apache/inlong/pull/6046#discussion_r984410336


##########
inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/group/InitGroupCompleteListener.java:
##########
@@ -84,12 +81,9 @@ public ListenerResult listen(WorkflowContext context) throws WorkflowListenerExc
         updateGroupRequest.setVersion(existGroup.getVersion());
         groupService.update(updateGroupRequest, operator);
 
-        // update status of other related configs
-        streamService.updateStatus(groupId, null, StreamStatus.CONFIG_SUCCESSFUL.getCode(), operator);
-        if (InlongConstants.LIGHTWEIGHT_MODE.equals(groupInfo.getLightweight())) {
-            sourceService.updateStatus(groupId, null, SourceStatus.SOURCE_NORMAL.getCode(), operator);
-        } else {
-            sourceService.updateStatus(groupId, null, SourceStatus.TO_BE_ISSUED_ADD.getCode(), operator);
+        List<InlongStreamInfo> streamList = form.getStreamInfos();
+        for (InlongStreamInfo streamInfo : streamList) {
+            streamProcessOperation.startProcess(groupId, streamInfo.getInlongStreamId(), operator, false);

Review Comment:
   Yes. The `false` argument passed to `streamProcessOperation.startProcess` means the process is started asynchronously.
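
The idea of a loop that only submits work and does not wait per iteration can be sketched as below. The real `streamProcessOperation.startProcess` is not shown in this thread, so the `startProcess` method here is a hypothetical stand-in whose `sync` flag is an assumption about how such a switch might behave. `AsyncStartDemo` and `startAll` are likewise illustrative names.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical demo class; names are illustrative and not part of InLong.
class AsyncStartDemo {

    // Stand-in for starting a stream process. When sync is false, the work is
    // handed to the executor and the caller gets the future back immediately.
    static CompletableFuture<String> startProcess(String streamId, boolean sync,
            ExecutorService executor) {
        CompletableFuture<String> future =
                CompletableFuture.supplyAsync(() -> "configured " + streamId, executor);
        if (sync) {
            future.join(); // block the caller until this stream is done
        }
        return future;
    }

    // Starts every stream without waiting inside the loop, then waits once.
    static List<String> startAll(List<String> streamIds) {
        ExecutorService executor = Executors.newFixedThreadPool(4);
        try {
            List<CompletableFuture<String>> futures = new ArrayList<>();
            for (String streamId : streamIds) {
                futures.add(startProcess(streamId, false, executor));
            }
            // Wait for all submitted processes after the loop has finished.
            CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
            List<String> results = new ArrayList<>();
            for (CompletableFuture<String> future : futures) {
                results.add(future.join());
            }
            return results;
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(startAll(Arrays.asList("s1", "s2", "s3")));
    }
}
```

Whether the caller should wait at all, as `startAll` does with `allOf`, depends on whether later steps need the stream results. A listener that only triggers the stream processes could return right after the loop.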





[GitHub] [inlong] chestnut-c commented on a diff in pull request #6046: [INLONG-6044][Manager] Distinguish between group and stream configuration processes

Posted by GitBox <gi...@apache.org>.
chestnut-c commented on code in PR #6046:
URL: https://github.com/apache/inlong/pull/6046#discussion_r1018693753


##########
inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/queue/QueueResourceListener.java:
##########
@@ -96,4 +128,39 @@ public ListenerResult listen(WorkflowContext context) throws WorkflowListenerExc
         return ListenerResult.success("success");
     }
 
+    private void createQueueForStreams(InlongGroupInfo groupInfo, List<InlongStreamInfo> streamInfos, String operator) {
+        String groupId = groupInfo.getInlongGroupId();
+        log.info("success to start stream process for groupId={}", groupId);
+
+        for (InlongStreamInfo stream : streamInfos) {
+            StreamResourceProcessForm form = StreamResourceProcessForm.getProcessForm(groupInfo, stream, INIT);
+            String streamId = stream.getInlongStreamId();
+            final String errMsg = "failed to start stream process for groupId=" + groupId + " streamId=" + streamId;
+
+            CompletableFuture<WorkflowResult> future = CompletableFuture
+                    .supplyAsync(() -> workflowService.start(CREATE_STREAM_RESOURCE, operator, form), EXECUTOR_SERVICE)
+                    .whenComplete((result, ex) -> {
+                        if (ex != null) {
+                            log.error(errMsg + ": " + ex.getMessage());
+                            throw new WorkflowListenerException(errMsg, ex);
+                        } else {
+                            List<TaskResponse> tasks = result.getNewTasks();
+                            if (TaskStatus.FAILED == tasks.get(tasks.size() - 1).getStatus()) {
+                                log.error(errMsg);
+                                throw new WorkflowListenerException(errMsg);
+                            }
+                        }
+                    });
+            try {
+                future.get(180, TimeUnit.SECONDS);

Review Comment:
   Why not just use the synchronous method?





[GitHub] [inlong] healchow merged pull request #6046: [INLONG-6044][Manager] Distinguish between group and stream configuration processes

Posted by GitBox <gi...@apache.org>.
healchow merged PR #6046:
URL: https://github.com/apache/inlong/pull/6046




[GitHub] [inlong] fuweng11 commented on a diff in pull request #6046: [INLONG-6044][Manager] Distinguish between group and stream configuration processes

Posted by GitBox <gi...@apache.org>.
fuweng11 commented on code in PR #6046:
URL: https://github.com/apache/inlong/pull/6046#discussion_r1018602157


##########
inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/kafka/KafkaResourceOperators.java:
##########
@@ -67,49 +62,12 @@ public boolean accept(String mqType) {
 
     @Override
     public void createQueueForGroup(@NotNull InlongGroupInfo groupInfo, @NotBlank String operator) {
-        String groupId = groupInfo.getInlongGroupId();
-        log.info("begin to create kafka resource for groupId={}", groupId);
-
-        InlongKafkaInfo inlongKafkaInfo = (InlongKafkaInfo) groupInfo;
-        try {
-            // 1. create kafka Topic - each Inlong Stream corresponds to a Kafka Topic
-            List<InlongStreamBriefInfo> streamInfoList = streamService.getTopicList(groupId);
-            if (streamInfoList == null || streamInfoList.isEmpty()) {
-                log.warn("skip to create kafka topic and subscription as no streams for groupId={}", groupId);
-                return;
-            }
-            for (InlongStreamBriefInfo streamInfo : streamInfoList) {
-                this.createKafkaTopic(inlongKafkaInfo, streamInfo.getMqResource());
-            }
-        } catch (Exception e) {
-            String msg = String.format("failed to create kafka resource for groupId=%s", groupId);
-            log.error(msg, e);
-            throw new WorkflowListenerException(msg + ": " + e.getMessage());
-        }
-        log.info("success to create kafka resource for groupId={}, cluster={}", groupId, inlongKafkaInfo);
+        log.info("skip to create kafka topic for groupId={}", groupInfo.getInlongGroupId());
     }
 
     @Override
     public void deleteQueueForGroup(InlongGroupInfo groupInfo, String operator) {
-        Preconditions.checkNotNull(groupInfo, "inlong group info cannot be null");
-
-        String groupId = groupInfo.getInlongGroupId();
-        log.info("begin to delete kafka resource for groupId={}", groupId);
-        ClusterInfo clusterInfo = clusterService.getOne(groupInfo.getInlongClusterTag(), null, ClusterType.KAFKA);
-        try {
-            List<InlongStreamBriefInfo> streamInfoList = streamService.getTopicList(groupId);
-            if (streamInfoList == null || streamInfoList.isEmpty()) {
-                log.warn("skip to create kafka topic and subscription as no streams for groupId={}", groupId);
-                return;
-            }
-            for (InlongStreamBriefInfo streamInfo : streamInfoList) {
-                this.deleteKafkaTopic(groupInfo, streamInfo.getInlongStreamId());
-            }
-        } catch (Exception e) {
-            log.error("failed to delete kafka resource for groupId=" + groupId, e);
-            throw new WorkflowListenerException("failed to delete kafka resource: " + e.getMessage());
-        }
-        log.info("success to delete kafka resource for groupId={}, cluster={}", groupId, clusterInfo);
+        log.info("skip to delete kafka topic for groupId={}", groupInfo.getInlongGroupId());

Review Comment:
   I fixed it.





[GitHub] [inlong] healchow commented on a diff in pull request #6046: [INLONG-6044][Manager] Distinguish between group and stream configuration processes

Posted by GitBox <gi...@apache.org>.
healchow commented on code in PR #6046:
URL: https://github.com/apache/inlong/pull/6046#discussion_r1018600385


##########
inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/sort/SortConfigListener.java:
##########
@@ -81,6 +82,10 @@ public ListenerResult listen(WorkflowContext context) throws WorkflowListenerExc
         }
         InlongGroupInfo groupInfo = form.getGroupInfo();
         List<InlongStreamInfo> streamInfos = form.getStreamInfos();
+        if (CollectionUtils.isEmpty(streamInfos)) {
+            LOGGER.warn("not build sort config for groupId={}, as the stream is empty", groupId);

Review Comment:
   Not necessary.





[GitHub] [inlong] healchow commented on a diff in pull request #6046: [INLONG-6044][Manager] Distinguish between group and stream configuration processes

Posted by GitBox <gi...@apache.org>.
healchow commented on code in PR #6046:
URL: https://github.com/apache/inlong/pull/6046#discussion_r993346660


##########
inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/group/DeleteGroupWorkflowDefinition.java:
##########
@@ -85,22 +77,12 @@ public WorkflowProcess defineProcess() {
         deleteMQTask.setListenerFactory(groupTaskListenerFactory);
         process.addTask(deleteMQTask);
 
-        // Delete Sort

Review Comment:
   When do we need to delete the Sort task?
   
   As far as I know, a Sort task may process data of multiple Pairs<StreamSource, StreamSink>. To modify any StreamSource or StreamSink, we stop the Sort task first, push all updated and non-updated StreamSource or StreamSink information to the Sort task, and finally restart it.
   
   In this case, how should the Sort task be maintained?
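
The stop / push / restart cycle described in this comment can be sketched as follows. This is only an illustration under stated assumptions, not code from the PR or from InLong: `SortTaskSketch`, `pushAll`, and `updatePair` are hypothetical names, and a source/sink pair is reduced to two strings.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for a Sort task; none of these names come from InLong.
class SortTaskSketch {
    // sourceId -> sinkId, standing in for Pair<StreamSource, StreamSink>
    private final Map<String, String> pairs = new LinkedHashMap<>();
    // Records the order of lifecycle calls so the cycle can be inspected.
    private final List<String> events = new ArrayList<>();
    private boolean running;

    void start() {
        running = true;
        events.add("start");
    }

    void stop() {
        running = false;
        events.add("stop");
    }

    // Replaces the whole configuration; refuses to do so while the task runs.
    void pushAll(Map<String, String> allPairs) {
        if (running) {
            throw new IllegalStateException("stop the task before pushing configuration");
        }
        Map<String, String> copy = new LinkedHashMap<>(allPairs);
        pairs.clear();
        pairs.putAll(copy);
        events.add("push:" + copy.size());
    }

    Map<String, String> pairs() {
        return pairs;
    }

    List<String> events() {
        return events;
    }

    // Changing one pair still pushes every pair, updated or not.
    static void updatePair(SortTaskSketch task, String sourceId, String newSinkId) {
        Map<String, String> merged = new LinkedHashMap<>(task.pairs());
        merged.put(sourceId, newSinkId);
        task.stop();
        task.pushAll(merged);
        task.start();
    }

    public static void main(String[] args) {
        SortTaskSketch task = new SortTaskSketch();
        Map<String, String> initial = new LinkedHashMap<>();
        initial.put("source-a", "sink-a");
        initial.put("source-b", "sink-b");
        task.pushAll(initial);
        task.start();

        updatePair(task, "source-b", "sink-b2");
        System.out.println(task.events());
        System.out.println(task.pairs());
    }
}
```

The point of the sketch is that one task serving several pairs cannot simply be deleted when a single stream changes, which is why the question of who owns the Sort task matters here.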





[GitHub] [inlong] woofyzhao commented on a diff in pull request #6046: [INLONG-6044][Manager] Distinguish between group and stream configuration processes

Posted by GitBox <gi...@apache.org>.
woofyzhao commented on code in PR #6046:
URL: https://github.com/apache/inlong/pull/6046#discussion_r996413174


##########
inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/group/UpdateGroupCompleteListener.java:
##########
@@ -75,7 +84,14 @@ public ListenerResult listen(WorkflowContext context) {
         }
         InlongGroupInfo groupInfo = form.getGroupInfo();
         groupService.update(groupInfo.genRequest(), operator);
-
+        if (Objects.equals(operateType, GroupOperateType.DELETE)) {

Review Comment:
   Should we allow deleting a group directly while there are streams under it?





[GitHub] [inlong] woofyzhao commented on a diff in pull request #6046: [INLONG-6044][Manager] Distinguish between group and stream configuration processes

Posted by GitBox <gi...@apache.org>.
woofyzhao commented on code in PR #6046:
URL: https://github.com/apache/inlong/pull/6046#discussion_r996962818


##########
inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/group/DeleteGroupWorkflowDefinition.java:
##########
@@ -85,22 +77,12 @@ public WorkflowProcess defineProcess() {
         deleteMQTask.setListenerFactory(groupTaskListenerFactory);
         process.addTask(deleteMQTask);
 
-        // Delete Sort

Review Comment:
   The sort job id is recorded at the stream level. Within the same group, each stream should have its own unique sort job id.





[GitHub] [inlong] fuweng11 commented on a diff in pull request #6046: [INLONG-6044][Manager] Distinguish between group and stream configuration processes

Posted by GitBox <gi...@apache.org>.
fuweng11 commented on code in PR #6046:
URL: https://github.com/apache/inlong/pull/6046#discussion_r1018604107


##########
inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/queue/QueueResourceListener.java:
##########
@@ -96,4 +128,45 @@ public ListenerResult listen(WorkflowContext context) throws WorkflowListenerExc
         return ListenerResult.success("success");
     }
 
+    private void createQueueForStreams(InlongGroupInfo groupInfo, List<InlongStreamInfo> streamInfos, String operator) {
+        String groupId = groupInfo.getInlongGroupId();
+        log.info("success to start stream process for groupId={}", groupId);
+
+        for (InlongStreamInfo stream : streamInfos) {
+            StreamResourceProcessForm form = StreamResourceProcessForm.getProcessForm(groupInfo, stream, INIT);
+            String streamId = stream.getInlongStreamId();
+            final String errMsg = "failed to start stream process for groupId=" + groupId + " streamId=" + streamId;
+
+            CompletableFuture<WorkflowResult> future = CompletableFuture
+                    .supplyAsync(() -> workflowService.start(CREATE_STREAM_RESOURCE, operator, form), EXECUTOR_SERVICE)
+                    .whenComplete((result, ex) -> {
+                        if (ex != null) {
+                            log.error(errMsg + ": " + ex.getMessage());
+                            throw new WorkflowListenerException(errMsg, ex);
+                        } else {
+                            List<TaskResponse> tasks = result.getNewTasks();
+                            if (TaskStatus.FAILED == tasks.get(tasks.size() - 1).getStatus()) {
+                                log.error(errMsg);
+                                throw new WorkflowListenerException(errMsg);
+                            }
+                        }
+                    });
+            try {
+                future.get(180, TimeUnit.SECONDS);
+                /*WorkflowResult result = future.get(180, TimeUnit.SECONDS);
+                List<TaskResponse> tasks = result.getNewTasks();
+                if (TaskStatus.FAILED == tasks.get(tasks.size() - 1).getStatus()) {
+                    log.error(errMsg);
+                    throw new WorkflowListenerException(errMsg);
+                }*/

Review Comment:
   I fixed it.





[GitHub] [inlong] fuweng11 commented on a diff in pull request #6046: [INLONG-6044][Manager] Distinguish between group and stream configuration processes

Posted by GitBox <gi...@apache.org>.
fuweng11 commented on code in PR #6046:
URL: https://github.com/apache/inlong/pull/6046#discussion_r1018697052


##########
inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/queue/QueueResourceListener.java:
##########
@@ -96,4 +128,39 @@ public ListenerResult listen(WorkflowContext context) throws WorkflowListenerExc
         return ListenerResult.success("success");
     }
 
+    private void createQueueForStreams(InlongGroupInfo groupInfo, List<InlongStreamInfo> streamInfos, String operator) {
+        String groupId = groupInfo.getInlongGroupId();
+        log.info("success to start stream process for groupId={}", groupId);
+
+        for (InlongStreamInfo stream : streamInfos) {
+            StreamResourceProcessForm form = StreamResourceProcessForm.getProcessForm(groupInfo, stream, INIT);
+            String streamId = stream.getInlongStreamId();
+            final String errMsg = "failed to start stream process for groupId=" + groupId + " streamId=" + streamId;
+
+            CompletableFuture<WorkflowResult> future = CompletableFuture
+                    .supplyAsync(() -> workflowService.start(CREATE_STREAM_RESOURCE, operator, form), EXECUTOR_SERVICE)
+                    .whenComplete((result, ex) -> {
+                        if (ex != null) {
+                            log.error(errMsg + ": " + ex.getMessage());
+                            throw new WorkflowListenerException(errMsg, ex);
+                        } else {
+                            List<TaskResponse> tasks = result.getNewTasks();
+                            if (TaskStatus.FAILED == tasks.get(tasks.size() - 1).getStatus()) {
+                                log.error(errMsg);
+                                throw new WorkflowListenerException(errMsg);
+                            }
+                        }
+                    });
+            try {
+                future.get(180, TimeUnit.SECONDS);

Review Comment:
   This is because the group workflow needs to call the stream workflow to configure each stream. If a stream is added later, there is no need to run the group workflow again; the stream workflow is executed directly.
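
A minimal sketch of that split, for illustration only: the group-level step runs one stream workflow per existing stream and waits on each with a timeout, while a stream added later goes through the same per-stream entry point on its own. `StreamWorkflowSketch`, `runStreamWorkflow`, and `initGroup` are hypothetical names, the workflow is modelled as a plain `Function`, and only the 180-second wait is taken from the diff above.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Function;

// Hypothetical sketch; these names are assumptions, not InLong APIs.
class StreamWorkflowSketch {

    // Runs the stream workflow for one stream and waits up to timeoutSeconds.
    static String runStreamWorkflow(String streamId, Function<String, String> workflow,
            ExecutorService pool, long timeoutSeconds) {
        CompletableFuture<String> future =
                CompletableFuture.supplyAsync(() -> workflow.apply(streamId), pool);
        try {
            return future.get(timeoutSeconds, TimeUnit.SECONDS);
        } catch (ExecutionException e) {
            // get() reports the workflow's own exception as the cause.
            throw new IllegalStateException("stream workflow failed for streamId=" + streamId, e.getCause());
        } catch (TimeoutException e) {
            future.cancel(true);
            throw new IllegalStateException("stream workflow timed out for streamId=" + streamId, e);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for streamId=" + streamId, e);
        }
    }

    // Group-level step: configure every stream that already exists in the group.
    static List<String> initGroup(List<String> streamIds, Function<String, String> workflow,
            ExecutorService pool) {
        List<String> results = new ArrayList<>();
        for (String streamId : streamIds) {
            results.add(runStreamWorkflow(streamId, workflow, pool, 180));
        }
        return results;
    }

    public static void main(String[] args) {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        try {
            Function<String, String> workflow = id -> "configured:" + id;
            // Group creation: all current streams are configured through the stream workflow.
            System.out.println(initGroup(Arrays.asList("stream-1", "stream-2"), workflow, pool));
            // A stream added later: only the stream workflow runs, not the group workflow.
            System.out.println(runStreamWorkflow("stream-3", workflow, pool, 180));
        } finally {
            pool.shutdownNow();
        }
    }
}
```

In this sketch the failure check lives in the code that calls `get`, so a failed or timed-out stream surfaces as one exception type at the point where the group step is waiting.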


