You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by he...@apache.org on 2022/03/04 07:16:25 UTC

[incubator-inlong] branch master updated: [INLONG-2892][Manager] Update status of StreamSource after approving the InlongGroup or InlongStream (#2893)

This is an automated email from the ASF dual-hosted git repository.

healchow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new a6eaee8  [INLONG-2892][Manager] Update status of StreamSource after approving the InlongGroup or InlongStream (#2893)
a6eaee8 is described below

commit a6eaee85895d44bf41a7990ca03fd74d788c4e56
Author: healchow <he...@gmail.com>
AuthorDate: Fri Mar 4 15:16:20 2022 +0800

    [INLONG-2892][Manager] Update status of StreamSource after approving the InlongGroup or InlongStream (#2893)
---
 .../service/core/impl/InlongStreamServiceImpl.java      |  6 ++----
 .../group/listener/GroupCompleteProcessListener.java    | 15 +++++++++++----
 .../workflow/stream/StreamCompleteProcessListener.java  | 17 +++++++++++------
 3 files changed, 24 insertions(+), 14 deletions(-)

diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/InlongStreamServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/InlongStreamServiceImpl.java
index 18afdea..ef6ba02 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/InlongStreamServiceImpl.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/InlongStreamServiceImpl.java
@@ -599,17 +599,15 @@ public class InlongStreamServiceImpl implements InlongStreamService {
 
         String groupId = null;
         for (InlongStreamApproveRequest info : streamApproveList) {
-            // Modify mqResourceObj
+            // Modify the inlong stream info after approve
             InlongStreamEntity streamEntity = new InlongStreamEntity();
             groupId = info.getInlongGroupId(); // these groupIds are all the same
             streamEntity.setInlongGroupId(groupId);
             streamEntity.setInlongStreamId(info.getInlongStreamId());
-            // Update status to [STREAM_CONFIG_ING]
             streamEntity.setStatus(EntityStatus.STREAM_CONFIG_ING.getCode());
             streamMapper.updateByIdentifierSelective(streamEntity);
-            // If you need to change inlong stream info after approve, just do in here
 
-            // Modify the sink information
+            // Modify the sink info after approve, such as update cluster info
             sinkService.updateAfterApprove(info.getSinkList(), operator);
         }
 
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/group/listener/GroupCompleteProcessListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/group/listener/GroupCompleteProcessListener.java
index cde992d..b08a76b 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/group/listener/GroupCompleteProcessListener.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/group/listener/GroupCompleteProcessListener.java
@@ -20,12 +20,14 @@ package org.apache.inlong.manager.service.workflow.group.listener;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.inlong.manager.common.enums.EntityStatus;
 import org.apache.inlong.manager.common.enums.GroupState;
+import org.apache.inlong.manager.common.enums.SourceState;
 import org.apache.inlong.manager.common.exceptions.WorkflowListenerException;
 import org.apache.inlong.manager.common.pojo.group.InlongGroupInfo;
 import org.apache.inlong.manager.common.pojo.workflow.form.GroupResourceProcessForm;
 import org.apache.inlong.manager.dao.mapper.SourceFileDetailEntityMapper;
 import org.apache.inlong.manager.service.core.InlongGroupService;
 import org.apache.inlong.manager.service.core.InlongStreamService;
+import org.apache.inlong.manager.service.source.StreamSourceService;
 import org.apache.inlong.manager.workflow.WorkflowContext;
 import org.apache.inlong.manager.workflow.event.ListenerResult;
 import org.apache.inlong.manager.workflow.event.process.ProcessEvent;
@@ -45,6 +47,8 @@ public class GroupCompleteProcessListener implements ProcessEventListener {
     @Autowired
     private InlongStreamService streamService;
     @Autowired
+    private StreamSourceService sourceService;
+    @Autowired
     private SourceFileDetailEntityMapper fileDetailMapper;
 
     @Override
@@ -62,20 +66,23 @@ public class GroupCompleteProcessListener implements ProcessEventListener {
         GroupResourceProcessForm form = (GroupResourceProcessForm) context.getProcessForm();
 
         String groupId = form.getInlongGroupId();
-        String username = context.getApplicant();
+        String applicant = context.getApplicant();
 
         InlongGroupInfo groupInfo = form.getGroupInfo();
         groupInfo.setStatus(GroupState.GROUP_CONFIG_SUCCESSFUL.getCode());
 
         // Update inlong group
-        groupService.update(groupInfo.genRequest(), username);
+        groupService.update(groupInfo.genRequest(), applicant);
         // Update inlong stream status
-        streamService.updateStatus(groupId, null, EntityStatus.STREAM_CONFIG_SUCCESSFUL.getCode(), username);
+        streamService.updateStatus(groupId, null, EntityStatus.STREAM_CONFIG_SUCCESSFUL.getCode(), applicant);
         // Update file data source status
-        fileDetailMapper.updateStatusAfterApprove(groupId, null, EntityStatus.AGENT_ADD.getCode(), username);
+        fileDetailMapper.updateStatusAfterApprove(groupId, null, EntityStatus.AGENT_ADD.getCode(), applicant);
         // TODO Update source db detail status
         // dbDetailMapper.updateStatusAfterApprove(bid, null, EntityStatus.AGENT_WAIT_CREATE.getCode(), username);
 
+        // Update stream source status
+        sourceService.updateStatus(groupId, null, SourceState.TO_BE_ISSUED_ADD.getCode(), applicant);
+
         return ListenerResult.success();
     }
 
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/stream/StreamCompleteProcessListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/stream/StreamCompleteProcessListener.java
index 570356c..6b61f54 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/stream/StreamCompleteProcessListener.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/stream/StreamCompleteProcessListener.java
@@ -19,11 +19,13 @@ package org.apache.inlong.manager.service.workflow.stream;
 
 import lombok.extern.slf4j.Slf4j;
 import org.apache.inlong.manager.common.enums.EntityStatus;
+import org.apache.inlong.manager.common.enums.SourceState;
 import org.apache.inlong.manager.common.exceptions.WorkflowListenerException;
 import org.apache.inlong.manager.common.pojo.workflow.form.GroupResourceProcessForm;
 import org.apache.inlong.manager.dao.mapper.SourceFileDetailEntityMapper;
 import org.apache.inlong.manager.service.core.InlongGroupService;
 import org.apache.inlong.manager.service.core.InlongStreamService;
+import org.apache.inlong.manager.service.source.StreamSourceService;
 import org.apache.inlong.manager.workflow.WorkflowContext;
 import org.apache.inlong.manager.workflow.event.ListenerResult;
 import org.apache.inlong.manager.workflow.event.process.ProcessEvent;
@@ -43,6 +45,8 @@ public class StreamCompleteProcessListener implements ProcessEventListener {
     @Autowired
     private InlongStreamService streamService;
     @Autowired
+    private StreamSourceService sourceService;
+    @Autowired
     private SourceFileDetailEntityMapper fileDetailMapper;
 
     @Override
@@ -51,22 +55,23 @@ public class StreamCompleteProcessListener implements ProcessEventListener {
     }
 
     /**
-     * The creation process ends normally, modify the status of inlong group and all inlong stream
-     * belong to the inlong group to [CONFIG_SUCCESSFUL]
+     * The creation process ends normally, modify the status of inlong group and other related info.
      */
     @Override
     public ListenerResult listen(WorkflowContext context) throws WorkflowListenerException {
         GroupResourceProcessForm form = (GroupResourceProcessForm) context.getProcessForm();
         String groupId = form.getInlongGroupId();
         String streamId = form.getInlongStreamId();
-        String user = context.getApplicant();
+        String applicant = context.getApplicant();
 
         // update inlong group status
-        groupService.updateStatus(groupId, EntityStatus.GROUP_CONFIG_SUCCESSFUL.getCode(), user);
+        groupService.updateStatus(groupId, EntityStatus.GROUP_CONFIG_SUCCESSFUL.getCode(), applicant);
         // update inlong stream status
-        streamService.updateStatus(groupId, streamId, EntityStatus.STREAM_CONFIG_SUCCESSFUL.getCode(), user);
+        streamService.updateStatus(groupId, streamId, EntityStatus.STREAM_CONFIG_SUCCESSFUL.getCode(), applicant);
         // update file data source status
-        fileDetailMapper.updateStatusAfterApprove(groupId, streamId, EntityStatus.AGENT_ADD.getCode(), user);
+        fileDetailMapper.updateStatusAfterApprove(groupId, streamId, EntityStatus.AGENT_ADD.getCode(), applicant);
+        // Update stream source status
+        sourceService.updateStatus(groupId, streamId, SourceState.TO_BE_ISSUED_ADD.getCode(), applicant);
 
         return ListenerResult.success();
     }