You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2022/11/15 09:37:48 UTC
[inlong] 01/02: [INLONG-6525][Manager] Update the status of stream and source after the InlongGroup is completed (#6527)
This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch branch-1.4
in repository https://gitbox.apache.org/repos/asf/inlong.git
commit 4fb36f3299e681052c9057a54d39e2522c901165
Author: fuweng11 <76...@users.noreply.github.com>
AuthorDate: Mon Nov 14 15:14:29 2022 +0800
[INLONG-6525][Manager] Update the status of stream and source after the InlongGroup is completed (#6527)
---
.../listener/group/InitGroupCompleteListener.java | 24 ++++++++++++++--------
1 file changed, 16 insertions(+), 8 deletions(-)
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/group/InitGroupCompleteListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/group/InitGroupCompleteListener.java
index ca5a375fb..a16562fd8 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/group/InitGroupCompleteListener.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/group/InitGroupCompleteListener.java
@@ -18,26 +18,27 @@
package org.apache.inlong.manager.service.listener.group;
import lombok.extern.slf4j.Slf4j;
+import org.apache.inlong.manager.common.consts.InlongConstants;
import org.apache.inlong.manager.common.enums.GroupStatus;
import org.apache.inlong.manager.common.enums.ProcessEvent;
+import org.apache.inlong.manager.common.enums.SourceStatus;
+import org.apache.inlong.manager.common.enums.StreamStatus;
import org.apache.inlong.manager.common.exceptions.WorkflowListenerException;
import org.apache.inlong.manager.dao.entity.InlongGroupEntity;
import org.apache.inlong.manager.dao.mapper.InlongGroupEntityMapper;
import org.apache.inlong.manager.pojo.group.InlongGroupInfo;
import org.apache.inlong.manager.pojo.group.InlongGroupRequest;
import org.apache.inlong.manager.pojo.group.InlongGroupUtils;
-import org.apache.inlong.manager.pojo.stream.InlongStreamInfo;
import org.apache.inlong.manager.pojo.workflow.form.process.GroupResourceProcessForm;
import org.apache.inlong.manager.service.group.InlongGroupService;
-import org.apache.inlong.manager.service.stream.InlongStreamProcessService;
+import org.apache.inlong.manager.service.source.StreamSourceService;
+import org.apache.inlong.manager.service.stream.InlongStreamService;
import org.apache.inlong.manager.workflow.WorkflowContext;
import org.apache.inlong.manager.workflow.event.ListenerResult;
import org.apache.inlong.manager.workflow.event.process.ProcessEventListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
-import java.util.List;
-
/**
* The listener of InlongGroup when created resources successfully.
*/
@@ -50,7 +51,9 @@ public class InitGroupCompleteListener implements ProcessEventListener {
@Autowired
private InlongGroupEntityMapper groupMapper;
@Autowired
- private InlongStreamProcessService streamProcessService;
+ private InlongStreamService streamService;
+ @Autowired
+ private StreamSourceService sourceService;
@Override
public ProcessEvent event() {
@@ -81,9 +84,14 @@ public class InitGroupCompleteListener implements ProcessEventListener {
updateGroupRequest.setVersion(existGroup.getVersion());
groupService.update(updateGroupRequest, operator);
- List<InlongStreamInfo> streamList = form.getStreamInfos();
- for (InlongStreamInfo streamInfo : streamList) {
- streamProcessService.startProcess(groupId, streamInfo.getInlongStreamId(), operator, false);
+ // update status of other related configs
+ if (InlongConstants.DISABLE_CREATE_RESOURCE.equals(groupInfo.getEnableCreateResource())) {
+ streamService.updateStatus(groupId, null, StreamStatus.CONFIG_SUCCESSFUL.getCode(), operator);
+ if (InlongConstants.LIGHTWEIGHT_MODE.equals(groupInfo.getLightweight())) {
+ sourceService.updateStatus(groupId, null, SourceStatus.SOURCE_NORMAL.getCode(), operator);
+ } else {
+ sourceService.updateStatus(groupId, null, SourceStatus.TO_BE_ISSUED_ADD.getCode(), operator);
+ }
}
log.info("success to execute InitGroupCompleteListener for groupId={}", groupId);