You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2022/07/18 09:22:55 UTC
[inlong] branch master updated: [INLONG-5066][Manager] Remove the LightGroup-related workflow listener and definitions (#5080)
This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 5928f1023 [INLONG-5066][Manager] Remove the LightGroup-related workflow listener and definitions (#5080)
5928f1023 is described below
commit 5928f1023d7489db056a26661cdfaa95bc1eddcf
Author: healchow <he...@gmail.com>
AuthorDate: Mon Jul 18 17:22:49 2022 +0800
[INLONG-5066][Manager] Remove the LightGroup-related workflow listener and definitions (#5080)
---
.../inlong/manager/client/ut/Kafka2HiveTest.java | 20 +-
.../manager/client/api/impl/InlongGroupImpl.java | 15 +-
.../client/api/inner/InnerGroupContext.java | 4 +-
.../client/api/inner/client/WorkflowClient.java | 16 +-
.../client/api/impl/InlongGroupImplTest.java | 14 +-
.../inlong/manager/common/enums/ErrorCodeEnum.java | 7 +-
.../inlong/manager/common/enums/GroupMode.java | 9 +-
...sForm.java => ApplyConsumptionProcessForm.java} | 4 +-
...ProcessForm.java => ApplyGroupProcessForm.java} | 6 +-
.../workflow/form/process/BaseProcessForm.java | 5 +-
.../process/LightGroupResourceProcessForm.java | 11 +
.../operation/ConsumptionProcessOperation.java | 12 +-
.../operation/InlongGroupProcessOperation.java | 241 +++++++++------------
.../manager/service/group/GroupCheckService.java | 20 +-
.../manager/service/workflow/ProcessName.java | 32 +--
...er.java => ApplyConsumptionProcessHandler.java} | 8 +-
...ava => ApplyConsumptionWorkflowDefinition.java} | 29 +--
.../listener/ConsumptionCancelProcessListener.java | 4 +-
.../ConsumptionCompleteProcessListener.java | 4 +-
.../listener/ConsumptionPassTaskListener.java | 4 +-
.../listener/ConsumptionRejectProcessListener.java | 4 +-
...tion.java => ApplyGroupWorkflowDefinition.java} | 39 ++--
.../group/CreateGroupWorkflowDefinition.java | 40 ++--
.../group/DeleteGroupWorkflowDefinition.java | 38 ++--
.../group/RestartGroupWorkflowDefinition.java | 53 +++--
.../group/SuspendGroupWorkflowDefinition.java | 51 +++--
.../light/CreateLightGroupWorkflowDefinition.java | 93 --------
.../light/DeleteLightGroupWorkflowDefinition.java | 92 --------
.../light/RestartLightGroupWorkflowDefinition.java | 92 --------
.../light/SuspendLightGroupWorkflowDefinition.java | 92 --------
.../group/listener/GroupInitProcessListener.java | 63 ------
.../group/listener/GroupUpdateListener.java | 75 -------
...istener.java => InitGroupCompleteListener.java} | 37 ++--
...sListener.java => InitGroupFailedListener.java} | 28 ++-
...oupInitListener.java => InitGroupListener.java} | 38 ++--
...tener.java => UpdateGroupCompleteListener.java} | 41 ++--
...istener.java => UpdateGroupFailedListener.java} | 17 +-
...pleteListener.java => UpdateGroupListener.java} | 36 +--
.../AfterApprovedTaskListener.java} | 27 +--
.../ApproveApplyProcessListener.java} | 52 ++---
.../CancelApplyProcessListener.java} | 32 +--
.../RejectApplyProcessListener.java} | 22 +-
.../listener/light/LightGroupCompleteListener.java | 68 ------
.../listener/light/LightGroupFailedListener.java | 62 ------
.../light/LightGroupUpdateFailedListener.java | 58 -----
.../listener/light/LightGroupUpdateListener.java | 80 -------
.../stream/CreateStreamWorkflowDefinition.java | 40 ++--
.../stream/DeleteStreamWorkflowDefinition.java | 32 +--
.../stream/RestartStreamWorkflowDefinition.java | 30 +--
.../stream/SuspendStreamWorkflowDefinition.java | 30 +--
...stener.java => InitStreamCompleteListener.java} | 4 +-
...Listener.java => InitStreamFailedListener.java} | 4 +-
...rocessListener.java => InitStreamListener.java} | 4 +-
...ener.java => UpdateStreamCompleteListener.java} | 4 +-
...stener.java => UpdateStreamFailedListener.java} | 4 +-
...dateListener.java => UpdateStreamListener.java} | 4 +-
.../inlong/manager/service/ServiceBaseTest.java | 104 ++++++++-
.../core/impl/InlongGroupProcessOperationTest.java | 60 ++---
.../manager/service/sort/DisableZkForSortTest.java | 7 +-
...enerTest.java => StreamSourceListenerTest.java} | 96 ++++----
.../service/workflow/WorkflowServiceImplTest.java | 126 ++---------
.../group/CreateGroupWorkflowDefinitionTest.java | 11 +-
.../main/resources/h2/apache_inlong_manager.sql | 4 +-
.../manager-web/sql/apache_inlong_manager.sql | 4 +-
64 files changed, 789 insertions(+), 1574 deletions(-)
diff --git a/inlong-manager/manager-client-examples/src/test/java/org/apache/inlong/manager/client/ut/Kafka2HiveTest.java b/inlong-manager/manager-client-examples/src/test/java/org/apache/inlong/manager/client/ut/Kafka2HiveTest.java
index a7d75ac56..20a12a072 100644
--- a/inlong-manager/manager-client-examples/src/test/java/org/apache/inlong/manager/client/ut/Kafka2HiveTest.java
+++ b/inlong-manager/manager-client-examples/src/test/java/org/apache/inlong/manager/client/ut/Kafka2HiveTest.java
@@ -112,14 +112,14 @@ class Kafka2HiveTest extends BaseTest {
initWorkflowResult.setProcessInfo(
ProcessResponse.builder()
.id(12)
- .name("NEW_GROUP_PROCESS")
- .displayName("New-Group")
- .type("New-Group")
+ .name("APPLY_GROUP_PROCESS")
+ .displayName("Apply-Group")
+ .type("Apply-Group")
.applicant("admin")
.status(PROCESSING)
.startTime(new Date())
.formData(JsonUtils.parseTree(
- "{\"formName\":\"NewGroupProcessForm\",\"groupInfo\":{\"mqType\":\"PULSAR\",\"id\":6,"
+ "{\"formName\":\"ApplyGroupProcessForm\",\"groupInfo\":{\"mqType\":\"PULSAR\",\"id\":6,"
+ "\"inlongGroupId\":\"test_group009\",\"name\":null,\"description\":null,"
+ "\"mqResource\":\"test_namespace\",\"enableZookeeper\":0,"
+ "\"enableCreateResource\":1,\"lightweight\":1,"
@@ -148,8 +148,8 @@ class Kafka2HiveTest extends BaseTest {
.id(12)
.type("UserTask")
.processId(12)
- .processName("NEW_GROUP_PROCESS")
- .processDisplayName("New-Group")
+ .processName("APPLY_GROUP_PROCESS")
+ .processDisplayName("Apply-Group")
.name("ut_admin")
.displayName("SystemAdmin")
.applicant("admin")
@@ -170,14 +170,14 @@ class Kafka2HiveTest extends BaseTest {
startWorkflowResult.setProcessInfo(
ProcessResponse.builder()
.id(12)
- .name("NEW_GROUP_PROCESS")
- .displayName("New-Group")
- .type("New-Group")
+ .name("APPLY_GROUP_PROCESS")
+ .displayName("Apply-Group")
+ .type("Apply-Group")
.applicant("admin")
.status(ProcessStatus.COMPLETED)
.startTime(new Date())
.endTime(new Date())
- .formData("{\"formName\":\"NewGroupProcessForm\",\"groupInfo\":{\"mqType\":\"PULSAR\","
+ .formData("{\"formName\":\"ApplyGroupProcessForm\",\"groupInfo\":{\"mqType\":\"PULSAR\","
+ "\"id\":8,\"inlongGroupId\":\"test_group011\",\"name\":null,\"description\":null,"
+ "\"mqResource\":\"test_namespace\",\"enableZookeeper\":0,\"enableCreateResource\":1,"
+ "\"lightweight\":1,\"inlongClusterTag\":\"default_cluster\","
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/InlongGroupImpl.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/InlongGroupImpl.java
index 560b7dee8..7ec6f9076 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/InlongGroupImpl.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/InlongGroupImpl.java
@@ -47,7 +47,7 @@ import org.apache.inlong.manager.common.pojo.workflow.EventLogView;
import org.apache.inlong.manager.common.pojo.workflow.ProcessResponse;
import org.apache.inlong.manager.common.pojo.workflow.TaskResponse;
import org.apache.inlong.manager.common.pojo.workflow.WorkflowResult;
-import org.apache.inlong.manager.common.pojo.workflow.form.process.NewGroupProcessForm;
+import org.apache.inlong.manager.common.pojo.workflow.form.process.ApplyGroupProcessForm;
import org.apache.inlong.manager.common.util.JsonUtils;
import org.apache.inlong.manager.common.util.Preconditions;
import org.springframework.boot.configurationprocessor.json.JSONObject;
@@ -116,7 +116,7 @@ public class InlongGroupImpl implements InlongGroup {
Preconditions.checkTrue(ProcessStatus.PROCESSING == processView.getStatus(),
String.format("process status %s is not corrected, should be PROCESSING", processView.getStatus()));
- // init must be NewGroupProcessForm
+ // init must be ApplyGroupProcessForm
// compile with old cluster
JSONObject formDataJson = JsonUtils.parseObject(
JsonUtils.toJsonString(JsonUtils.toJsonString(processView.getFormData())),
@@ -129,11 +129,12 @@ public class InlongGroupImpl implements InlongGroup {
}
}
String formDataNew = formDataJson.toString();
- NewGroupProcessForm newGroupProcessForm = JsonUtils.parseObject(
- formDataNew, NewGroupProcessForm.class);
- Preconditions.checkNotNull(newGroupProcessForm, "NewGroupProcessForm cannot be null");
- groupContext.setInitMsg(newGroupProcessForm);
- WorkflowResult startWorkflowResult = workFlowClient.startInlongGroup(taskId, newGroupProcessForm);
+ ApplyGroupProcessForm groupProcessForm = JsonUtils.parseObject(
+ formDataNew, ApplyGroupProcessForm.class);
+ Preconditions.checkNotNull(groupProcessForm, "ApplyGroupProcessForm cannot be null");
+ groupContext.setInitMsg(groupProcessForm);
+ assert groupProcessForm != null;
+ WorkflowResult startWorkflowResult = workFlowClient.startInlongGroup(taskId, groupProcessForm);
processView = startWorkflowResult.getProcessInfo();
Preconditions.checkTrue(ProcessStatus.COMPLETED == processView.getStatus(),
String.format("inlong group status %s is incorrect, should be COMPLETED", processView.getStatus()));
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/InnerGroupContext.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/InnerGroupContext.java
index c67260883..0abbd5e38 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/InnerGroupContext.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/InnerGroupContext.java
@@ -23,7 +23,7 @@ import lombok.NoArgsConstructor;
import org.apache.commons.collections.MapUtils;
import org.apache.inlong.manager.client.api.InlongStream;
import org.apache.inlong.manager.common.pojo.group.InlongGroupInfo;
-import org.apache.inlong.manager.common.pojo.workflow.form.process.NewGroupProcessForm;
+import org.apache.inlong.manager.common.pojo.workflow.form.process.ApplyGroupProcessForm;
import org.apache.inlong.manager.common.util.Preconditions;
import java.util.Map;
@@ -41,7 +41,7 @@ public class InnerGroupContext {
private Map<String, InlongStream> streamMap = Maps.newHashMap();
- private NewGroupProcessForm initMsg;
+ private ApplyGroupProcessForm initMsg;
public String getGroupId() {
Preconditions.checkNotNull(groupInfo, "inlong group info was not init");
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/client/WorkflowClient.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/client/WorkflowClient.java
index 82994129f..48e44ef0f 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/client/WorkflowClient.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/client/WorkflowClient.java
@@ -29,7 +29,7 @@ import org.apache.inlong.manager.client.api.util.ClientUtils;
import org.apache.inlong.manager.common.beans.Response;
import org.apache.inlong.manager.common.pojo.workflow.EventLogView;
import org.apache.inlong.manager.common.pojo.workflow.WorkflowResult;
-import org.apache.inlong.manager.common.pojo.workflow.form.process.NewGroupProcessForm;
+import org.apache.inlong.manager.common.pojo.workflow.form.process.ApplyGroupProcessForm;
import org.apache.inlong.manager.common.util.JsonUtils;
import java.util.List;
@@ -48,18 +48,18 @@ public class WorkflowClient {
workflowApi = ClientUtils.createRetrofit(configuration).create(WorkflowApi.class);
}
- public WorkflowResult startInlongGroup(int taskId, NewGroupProcessForm newGroupProcessForm) {
+ public WorkflowResult startInlongGroup(int taskId, ApplyGroupProcessForm groupProcessForm) {
ObjectNode workflowTaskOperation = objectMapper.createObjectNode();
workflowTaskOperation.putPOJO("transferTo", Lists.newArrayList());
workflowTaskOperation.put("remark", "approved by system");
- ObjectNode inlongGroupApproveForm = objectMapper.createObjectNode();
- inlongGroupApproveForm.putPOJO("groupApproveInfo", newGroupProcessForm.getGroupInfo());
- inlongGroupApproveForm.putPOJO("streamApproveInfoList", newGroupProcessForm.getStreamInfoList());
- inlongGroupApproveForm.put("formName", "InlongGroupApproveForm");
- workflowTaskOperation.set("form", inlongGroupApproveForm);
+ ObjectNode groupApproveForm = objectMapper.createObjectNode();
+ groupApproveForm.putPOJO("groupApproveInfo", groupProcessForm.getGroupInfo());
+ groupApproveForm.putPOJO("streamApproveInfoList", groupProcessForm.getStreamInfoList());
+ groupApproveForm.put("formName", "InlongGroupApproveForm");
+ workflowTaskOperation.set("form", groupApproveForm);
- log.info("startInlongGroup workflowTaskOperation: {}", inlongGroupApproveForm);
+ log.info("startInlongGroup workflowTaskOperation: {}", groupApproveForm);
Map<String, Object> requestMap = JsonUtils.OBJECT_MAPPER.convertValue(workflowTaskOperation,
new TypeReference<Map<String, Object>>() {
diff --git a/inlong-manager/manager-client/src/test/java/org/apache/inlong/manager/client/api/impl/InlongGroupImplTest.java b/inlong-manager/manager-client/src/test/java/org/apache/inlong/manager/client/api/impl/InlongGroupImplTest.java
index 1cf7ca6c1..ebdf9d1f9 100644
--- a/inlong-manager/manager-client/src/test/java/org/apache/inlong/manager/client/api/impl/InlongGroupImplTest.java
+++ b/inlong-manager/manager-client/src/test/java/org/apache/inlong/manager/client/api/impl/InlongGroupImplTest.java
@@ -17,7 +17,7 @@
package org.apache.inlong.manager.client.api.impl;
-import org.apache.inlong.manager.common.pojo.workflow.form.process.NewGroupProcessForm;
+import org.apache.inlong.manager.common.pojo.workflow.form.process.ApplyGroupProcessForm;
import org.apache.inlong.manager.common.util.JsonUtils;
import org.junit.jupiter.api.Test;
@@ -31,7 +31,7 @@ class InlongGroupImplTest {
@Test
void testParseForm() {
String json = "{\n"
- + " \"formName\" : \"NewGroupProcessForm\",\n"
+ + " \"formName\" : \"ApplyGroupProcessForm\",\n"
+ " \"groupInfo\" : {\n"
+ " \"mqType\" : \"PULSAR\",\n"
+ " \"id\" : 5,\n"
@@ -88,12 +88,12 @@ class InlongGroupImplTest {
+ " } ]\n"
+ "}";
- NewGroupProcessForm newGroupProcessForm =
- JsonUtils.parseObject(json, NewGroupProcessForm.class);
+ ApplyGroupProcessForm applyGroupProcessForm =
+ JsonUtils.parseObject(json, ApplyGroupProcessForm.class);
- assertNotNull(newGroupProcessForm);
- assertNotNull(newGroupProcessForm.getGroupInfo());
- assertNotNull(newGroupProcessForm.getStreamInfoList());
+ assertNotNull(applyGroupProcessForm);
+ assertNotNull(applyGroupProcessForm.getGroupInfo());
+ assertNotNull(applyGroupProcessForm.getStreamInfoList());
}
}
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/ErrorCodeEnum.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/ErrorCodeEnum.java
index b4ae91164..2d9b4a2a1 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/ErrorCodeEnum.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/ErrorCodeEnum.java
@@ -34,8 +34,8 @@ public enum ErrorCodeEnum {
GROUP_NOT_FOUND(1001, "Inlong group does not exist/no operation authority"),
GROUP_DUPLICATE(1002, "Inlong group already exists"),
GROUP_INFO_INCORRECT(1003, "Group info was incorrect"),
- GROUP_SAVE_FAILED(1003, "Failed to save/update inlong group information"),
- GROUP_PERMISSION_DENIED(1004, "No access to this inlong group"),
+ GROUP_SAVE_FAILED(1003, "Failed to save/update inlong group"),
+ GROUP_PERMISSION_DENIED(1004, "No permission to access this inlong group"),
GROUP_HAS_STREAM(1005, "There are some valid inlong stream for this inlong group"),
GROUP_UPDATE_NOT_ALLOWED(1006, "The current inlong group status does not support modification"),
GROUP_DELETE_NOT_ALLOWED(1007, "The current inlong group status does not support deletion"),
@@ -46,8 +46,7 @@ public enum ErrorCodeEnum {
GROUP_INFO_INCONSISTENT(1013, "The inlong group info is inconsistent, please contact the administrator"),
GROUP_MODE_UNSUPPORTED(1014, "The current inlong group mode only support light, normal"),
- OPT_NOT_ALLOWED_BY_STATUS(1021,
- "The current inlong group status does not allow adding/modifying/deleting related info"),
+ OPT_NOT_ALLOWED_BY_STATUS(1021, "InlongGroup status %s was not allowed to add/update/delete related info"),
MQ_TYPE_NOT_SUPPORTED(1022, "MQ type '%s' not supported"),
MQ_TYPE_NOT_SAME(1023, "Expected MQ type is '%s', but found '%s'"),
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/GroupMode.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/GroupMode.java
index 43e7606d4..4105def53 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/GroupMode.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/GroupMode.java
@@ -27,11 +27,12 @@ import java.util.Objects;
* Mode of inlong group
*/
public enum GroupMode {
+
/**
- * Normal group init with all components in Inlong Cluster
- * StreamSource -> Agent/SDK -> DataProxy -> Cache -> Sort -> StreamSink
+ * Standard group init with all components in Inlong Cluster
+ * StreamSource -> Agent/SDK -> DataProxy -> MQ Cache -> Sort -> StreamSink
*/
- NORMAL("normal"),
+ STANDARD("standard"),
/**
* Lightweight group init with sort in Inlong Cluster
@@ -59,6 +60,6 @@ public enum GroupMode {
if (Objects.equals(groupInfo.getLightweight(), InlongConstants.LIGHTWEIGHT_MODE)) {
return GroupMode.LIGHTWEIGHT;
}
- return GroupMode.NORMAL;
+ return GroupMode.STANDARD;
}
}
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/workflow/form/process/NewConsumptionProcessForm.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/workflow/form/process/ApplyConsumptionProcessForm.java
similarity index 93%
rename from inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/workflow/form/process/NewConsumptionProcessForm.java
rename to inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/workflow/form/process/ApplyConsumptionProcessForm.java
index 6d3eb61d7..2587baa63 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/workflow/form/process/NewConsumptionProcessForm.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/workflow/form/process/ApplyConsumptionProcessForm.java
@@ -32,9 +32,9 @@ import java.util.Map;
*/
@Data
@EqualsAndHashCode(callSuper = true)
-public class NewConsumptionProcessForm extends BaseProcessForm {
+public class ApplyConsumptionProcessForm extends BaseProcessForm {
- public static final String FORM_NAME = "NewConsumptionProcessForm";
+ public static final String FORM_NAME = "ApplyConsumptionProcessForm";
@ApiModelProperty(value = "Data consumption information")
private ConsumptionInfo consumptionInfo;
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/workflow/form/process/NewGroupProcessForm.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/workflow/form/process/ApplyGroupProcessForm.java
similarity index 93%
rename from inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/workflow/form/process/NewGroupProcessForm.java
rename to inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/workflow/form/process/ApplyGroupProcessForm.java
index 0edf76855..17a1645be 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/workflow/form/process/NewGroupProcessForm.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/workflow/form/process/ApplyGroupProcessForm.java
@@ -30,13 +30,13 @@ import java.util.List;
import java.util.Map;
/**
- * New inlong group process form
+ * Apply inlong group process form
*/
@Data
@EqualsAndHashCode(callSuper = false)
-public class NewGroupProcessForm extends BaseProcessForm {
+public class ApplyGroupProcessForm extends BaseProcessForm {
- public static final String FORM_NAME = "NewGroupProcessForm";
+ public static final String FORM_NAME = "ApplyGroupProcessForm";
@ApiModelProperty(value = "Inlong group info", required = true)
private InlongGroupInfo groupInfo;
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/workflow/form/process/BaseProcessForm.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/workflow/form/process/BaseProcessForm.java
index 48327122f..175b69c36 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/workflow/form/process/BaseProcessForm.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/workflow/form/process/BaseProcessForm.java
@@ -28,10 +28,9 @@ import lombok.Data;
@Data
@JsonTypeInfo(use = Id.NAME, property = "formName")
@JsonSubTypes({
- @JsonSubTypes.Type(value = NewGroupProcessForm.class, name = NewGroupProcessForm.FORM_NAME),
- @JsonSubTypes.Type(value = NewConsumptionProcessForm.class, name = NewConsumptionProcessForm.FORM_NAME),
+ @JsonSubTypes.Type(value = ApplyGroupProcessForm.class, name = ApplyGroupProcessForm.FORM_NAME),
+ @JsonSubTypes.Type(value = ApplyConsumptionProcessForm.class, name = ApplyConsumptionProcessForm.FORM_NAME),
@JsonSubTypes.Type(value = GroupResourceProcessForm.class, name = GroupResourceProcessForm.FORM_NAME),
- @JsonSubTypes.Type(value = LightGroupResourceProcessForm.class, name = LightGroupResourceProcessForm.FORM_NAME),
@JsonSubTypes.Type(value = StreamResourceProcessForm.class, name = StreamResourceProcessForm.FORM_NAME),
})
public abstract class BaseProcessForm implements ProcessForm {
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/workflow/form/process/LightGroupResourceProcessForm.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/workflow/form/process/LightGroupResourceProcessForm.java
index 634acb078..0aef09b27 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/workflow/form/process/LightGroupResourceProcessForm.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/workflow/form/process/LightGroupResourceProcessForm.java
@@ -26,7 +26,9 @@ import org.apache.inlong.manager.common.pojo.group.InlongGroupInfo;
import org.apache.inlong.manager.common.pojo.stream.InlongStreamInfo;
import org.apache.inlong.manager.common.util.Preconditions;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
/**
* Form of create lightweight inlong group resource
@@ -57,4 +59,13 @@ public class LightGroupResourceProcessForm extends BaseProcessForm {
public String getInlongGroupId() {
return groupInfo.getInlongGroupId();
}
+
+ @Override
+ public Map<String, Object> showInList() {
+ Map<String, Object> show = new HashMap<>();
+ show.put("inlongGroupId", groupInfo.getInlongGroupId());
+ show.put("groupOperateType", this.groupOperateType);
+ return show;
+ }
+
}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/operation/ConsumptionProcessOperation.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/operation/ConsumptionProcessOperation.java
index b1ef3881d..695618bfb 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/operation/ConsumptionProcessOperation.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/operation/ConsumptionProcessOperation.java
@@ -23,7 +23,7 @@ import org.apache.inlong.manager.common.enums.MQType;
import org.apache.inlong.manager.common.pojo.consumption.ConsumptionInfo;
import org.apache.inlong.manager.common.pojo.consumption.ConsumptionPulsarInfo;
import org.apache.inlong.manager.common.pojo.workflow.WorkflowResult;
-import org.apache.inlong.manager.common.pojo.workflow.form.process.NewConsumptionProcessForm;
+import org.apache.inlong.manager.common.pojo.workflow.form.process.ApplyConsumptionProcessForm;
import org.apache.inlong.manager.common.util.CommonBeanUtils;
import org.apache.inlong.manager.common.util.Preconditions;
import org.apache.inlong.manager.dao.entity.ConsumptionPulsarEntity;
@@ -52,15 +52,15 @@ public class ConsumptionProcessOperation {
"current status not allow start workflow");
consumptionInfo.setStatus(ConsumptionStatus.WAIT_APPROVE.getStatus());
- boolean isSuccess = consumptionService.update(consumptionInfo,operator);
+ boolean isSuccess = consumptionService.update(consumptionInfo, operator);
Preconditions.checkTrue(isSuccess, "update consumption failed");
- return workflowService.start(ProcessName.NEW_CONSUMPTION_PROCESS, operator,
- genNewConsumptionProcessForm(consumptionInfo));
+ return workflowService.start(ProcessName.APPLY_CONSUMPTION_PROCESS, operator,
+ genConsumptionProcessForm(consumptionInfo));
}
- private NewConsumptionProcessForm genNewConsumptionProcessForm(ConsumptionInfo consumptionInfo) {
- NewConsumptionProcessForm form = new NewConsumptionProcessForm();
+ private ApplyConsumptionProcessForm genConsumptionProcessForm(ConsumptionInfo consumptionInfo) {
+ ApplyConsumptionProcessForm form = new ApplyConsumptionProcessForm();
Integer id = consumptionInfo.getId();
MQType mqType = MQType.forType(consumptionInfo.getMqType());
if (mqType == MQType.PULSAR || mqType == MQType.TDMQ_PULSAR) {
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/operation/InlongGroupProcessOperation.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/operation/InlongGroupProcessOperation.java
index dd6ac6724..5f8146531 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/operation/InlongGroupProcessOperation.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/operation/InlongGroupProcessOperation.java
@@ -19,19 +19,16 @@ package org.apache.inlong.manager.service.core.operation;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
-import org.apache.inlong.manager.common.enums.GroupMode;
import org.apache.inlong.manager.common.enums.GroupOperateType;
import org.apache.inlong.manager.common.enums.GroupStatus;
-import org.apache.inlong.manager.common.exceptions.WorkflowListenerException;
import org.apache.inlong.manager.common.pojo.group.InlongGroupInfo;
import org.apache.inlong.manager.common.pojo.group.InlongGroupResetRequest;
import org.apache.inlong.manager.common.pojo.stream.InlongStreamBriefInfo;
import org.apache.inlong.manager.common.pojo.stream.InlongStreamInfo;
import org.apache.inlong.manager.common.pojo.workflow.ProcessQuery;
import org.apache.inlong.manager.common.pojo.workflow.WorkflowResult;
+import org.apache.inlong.manager.common.pojo.workflow.form.process.ApplyGroupProcessForm;
import org.apache.inlong.manager.common.pojo.workflow.form.process.GroupResourceProcessForm;
-import org.apache.inlong.manager.common.pojo.workflow.form.process.LightGroupResourceProcessForm;
-import org.apache.inlong.manager.common.pojo.workflow.form.process.NewGroupProcessForm;
import org.apache.inlong.manager.common.util.Preconditions;
import org.apache.inlong.manager.dao.entity.WorkflowProcessEntity;
import org.apache.inlong.manager.service.core.InlongStreamService;
@@ -44,7 +41,7 @@ import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
-import java.util.Collections;
+import java.util.Comparator;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
@@ -79,198 +76,185 @@ public class InlongGroupProcessOperation {
private InlongStreamService streamService;
/**
- * Allocate resource application groups for access services and initiate an approval process
+ * Start a New InlongGroup for the specified inlong group id.
*
- * @param groupId Inlong group id
- * @param operator Operator name
- * @return Workflow result
+ * @param groupId inlong group id
+ * @param operator name of operator
+ * @return workflow result
*/
public WorkflowResult startProcess(String groupId, String operator) {
- LOGGER.info("begin to start approve process, groupId = {}, operator = {}", groupId, operator);
+ LOGGER.info("begin to start approve process for groupId={} by operator={}", groupId, operator);
+
groupService.updateStatus(groupId, GroupStatus.TO_BE_APPROVAL.getCode(), operator);
- // Initiate the approval process
- NewGroupProcessForm form = genNewGroupProcessForm(groupId);
- return workflowService.start(ProcessName.NEW_GROUP_PROCESS, operator, form);
+ ApplyGroupProcessForm form = genApplyGroupProcessForm(groupId);
+ WorkflowResult result = workflowService.start(ProcessName.APPLY_GROUP_PROCESS, operator, form);
+
+ LOGGER.info("success to start approve process for groupId={} by operator={}", groupId, operator);
+ return result;
}
/**
- * Suspend resource application group in an asynchronous way,
- * stop source and sort task related to application group asynchronously,
- * persist the application status if necessary.
+ * Suspend InlongGroup in an asynchronous way.
*
- * @return groupId
+ * @param groupId inlong group id
+ * @param operator name of operator
+ * @return inlong group id
+ * @apiNote Stop source and sort task related to the inlong group asynchronously, persist the status if
+ * necessary.
*/
public String suspendProcessAsync(String groupId, String operator) {
- LOGGER.info("begin to suspend process asynchronously, groupId = {}, operator = {}", groupId, operator);
+ LOGGER.info("begin to suspend process asynchronously for groupId={} by operator={}", groupId, operator);
+
groupService.updateStatus(groupId, GroupStatus.SUSPENDING.getCode(), operator);
InlongGroupInfo groupInfo = groupService.get(groupId);
- GroupMode mode = GroupMode.parseGroupMode(groupInfo);
- switch (mode) {
- case NORMAL:
- GroupResourceProcessForm form = genGroupProcessForm(groupInfo, GroupOperateType.SUSPEND);
- executorService.execute(() -> workflowService.start(ProcessName.SUSPEND_GROUP_PROCESS, operator, form));
- break;
- case LIGHTWEIGHT:
- LightGroupResourceProcessForm lightForm = genLightGroupProcessForm(groupInfo, GroupOperateType.SUSPEND);
- executorService.execute(
- () -> workflowService.start(ProcessName.SUSPEND_LIGHT_GROUP_PROCESS, operator, lightForm));
- break;
- default:
- throw new WorkflowListenerException(ErrorCodeEnum.GROUP_MODE_UNSUPPORTED.getMessage());
- }
+ GroupResourceProcessForm form = genGroupResourceProcessForm(groupInfo, GroupOperateType.SUSPEND);
+ executorService.execute(() -> workflowService.start(ProcessName.SUSPEND_GROUP_PROCESS, operator, form));
+
+ LOGGER.info("success to suspend process asynchronously for groupId={} by operator={}", groupId, operator);
return groupId;
}
/**
- * Suspend resource application group which is started up successfully,
- * stop source and sort task related to application group asynchronously,
- * persist the application status if necessary.
+ * Suspend InlongGroup which is started up successfully.
*
- * @return Workflow result
+ * @param groupId inlong group id
+ * @param operator name of operator
+ * @return workflow result
+ * @apiNote Stop source and sort task related to the inlong group asynchronously, persist the status if
+ * necessary.
*/
public WorkflowResult suspendProcess(String groupId, String operator) {
- LOGGER.info("begin to suspend process, groupId = {}, operator = {}", groupId, operator);
+ LOGGER.info("begin to suspend process for groupId={} by operator={}", groupId, operator);
+
groupService.updateStatus(groupId, GroupStatus.SUSPENDING.getCode(), operator);
InlongGroupInfo groupInfo = groupService.get(groupId);
- GroupMode mode = GroupMode.parseGroupMode(groupInfo);
- WorkflowResult result;
- switch (mode) {
- case NORMAL:
- GroupResourceProcessForm form = genGroupProcessForm(groupInfo, GroupOperateType.SUSPEND);
- result = workflowService.start(ProcessName.SUSPEND_GROUP_PROCESS, operator, form);
- break;
- case LIGHTWEIGHT:
- LightGroupResourceProcessForm lightForm = genLightGroupProcessForm(groupInfo, GroupOperateType.SUSPEND);
- result = workflowService.start(ProcessName.SUSPEND_LIGHT_GROUP_PROCESS, operator, lightForm);
- break;
- default:
- throw new WorkflowListenerException(ErrorCodeEnum.GROUP_MODE_UNSUPPORTED.getMessage());
- }
+ GroupResourceProcessForm form = genGroupResourceProcessForm(groupInfo, GroupOperateType.SUSPEND);
+ WorkflowResult result = workflowService.start(ProcessName.SUSPEND_GROUP_PROCESS, operator, form);
+
+ LOGGER.info("success to suspend process for groupId={} by operator={}", groupId, operator);
return result;
}
/**
- * Restart resource application group in an asynchronous way,
- * starting from the last persist snapshot.
+ * Restart InlongGroup in an asynchronous way, starting from the last persisted snapshot.
*
- * @return Workflow result
+ * @param groupId inlong group id
+ * @param operator name of operator
+ * @return inlong group id
*/
public String restartProcessAsync(String groupId, String operator) {
- LOGGER.info("begin to restart process asynchronously, groupId = {}, operator = {}", groupId, operator);
+ LOGGER.info("begin to restart process asynchronously for groupId={} by operator={}", groupId, operator);
+
groupService.updateStatus(groupId, GroupStatus.RESTARTING.getCode(), operator);
InlongGroupInfo groupInfo = groupService.get(groupId);
- GroupMode mode = GroupMode.parseGroupMode(groupInfo);
- switch (mode) {
- case NORMAL:
- GroupResourceProcessForm form = genGroupProcessForm(groupInfo, GroupOperateType.RESTART);
- executorService.execute(() -> workflowService.start(ProcessName.RESTART_GROUP_PROCESS, operator, form));
- break;
- case LIGHTWEIGHT:
- LightGroupResourceProcessForm lightForm = genLightGroupProcessForm(groupInfo, GroupOperateType.RESTART);
- executorService.execute(
- () -> workflowService.start(ProcessName.RESTART_LIGHT_GROUP_PROCESS, operator, lightForm));
- break;
- default:
- throw new WorkflowListenerException(ErrorCodeEnum.GROUP_MODE_UNSUPPORTED.getMessage());
- }
+ GroupResourceProcessForm form = genGroupResourceProcessForm(groupInfo, GroupOperateType.RESTART);
+ executorService.execute(() -> workflowService.start(ProcessName.RESTART_GROUP_PROCESS, operator, form));
+
+ LOGGER.info("success to restart process asynchronously for groupId={} by operator={}", groupId, operator);
return groupId;
}
/**
- * Restart resource application group which is suspended successfully,
- * starting from the last persist snapshot.
+ * Restart InlongGroup which is suspended successfully, starting from the last persisted snapshot.
*
- * @return Workflow result
+ * @param groupId inlong group id
+ * @param operator name of operator
+ * @return workflow result
*/
public WorkflowResult restartProcess(String groupId, String operator) {
- LOGGER.info("begin to restart process, groupId = {}, operator = {}", groupId, operator);
+ LOGGER.info("begin to restart process for groupId={} by operator={}", groupId, operator);
+
groupService.updateStatus(groupId, GroupStatus.RESTARTING.getCode(), operator);
InlongGroupInfo groupInfo = groupService.get(groupId);
- GroupMode mode = GroupMode.parseGroupMode(groupInfo);
- WorkflowResult result;
- switch (mode) {
- case NORMAL:
- GroupResourceProcessForm form = genGroupProcessForm(groupInfo, GroupOperateType.RESTART);
- result = workflowService.start(ProcessName.RESTART_GROUP_PROCESS, operator, form);
- break;
- case LIGHTWEIGHT:
- LightGroupResourceProcessForm lightForm = genLightGroupProcessForm(groupInfo, GroupOperateType.RESTART);
- result = workflowService.start(ProcessName.RESTART_LIGHT_GROUP_PROCESS, operator, lightForm);
- break;
- default:
- throw new WorkflowListenerException(ErrorCodeEnum.GROUP_MODE_UNSUPPORTED.getMessage());
- }
+ GroupResourceProcessForm form = genGroupResourceProcessForm(groupInfo, GroupOperateType.RESTART);
+ WorkflowResult result = workflowService.start(ProcessName.RESTART_GROUP_PROCESS, operator, form);
+
+ LOGGER.info("success to restart process for groupId={} by operator={}", groupId, operator);
return result;
}
/**
- * Delete resource application group logically and delete related resource in an
+ * Delete InlongGroup logically and delete related resource in an asynchronous way.
+ *
+ * @param groupId inlong group id
+ * @param operator name of operator
+ * @return inlong group id
*/
public String deleteProcessAsync(String groupId, String operator) {
- LOGGER.info("begin to delete process asynchronously, groupId = {}, operator = {}", groupId, operator);
+ LOGGER.info("begin to delete process asynchronously for groupId={} by operator={}", groupId, operator);
executorService.execute(() -> {
try {
invokeDeleteProcess(groupId, operator);
} catch (Exception ex) {
- LOGGER.error("exception while delete process, groupId = {}, operator = {}", groupId, operator, ex);
+ LOGGER.error("exception while delete process for groupId={} by operator={}", groupId, operator, ex);
throw ex;
}
groupService.delete(groupId, operator);
});
+
+ LOGGER.info("success to delete process asynchronously for groupId={} by operator={}", groupId, operator);
return groupId;
}
/**
- * Delete resource application group logically and delete related resource in an asynchronous way
+ * Delete InlongGroup logically and delete related resource in a synchronous way.
*/
public boolean deleteProcess(String groupId, String operator) {
- LOGGER.info("begin to delete process, groupId = {}, operator = {}", groupId, operator);
+ LOGGER.info("begin to delete process for groupId={} by operator={}", groupId, operator);
try {
invokeDeleteProcess(groupId, operator);
} catch (Exception ex) {
- LOGGER.error("exception while delete process, groupId = {}, operator = {}", groupId, operator, ex);
+ LOGGER.error("exception while delete process for groupId={} by operator={}", groupId, operator, ex);
throw ex;
}
- return groupService.delete(groupId, operator);
+
+ boolean result = groupService.delete(groupId, operator);
+ LOGGER.info("success to delete process for groupId={} by operator={}", groupId, operator);
+ return result;
}
/**
- * Reset group status when group is staying CONFIG_ING|SUSPENDING|RESTARTING|DELETING for a long time
+ * Reset InlongGroup status when group is staying CONFIG_ING|SUSPENDING|RESTARTING|DELETING for a long time.
* This api is side effect, must be used carefully.
*
- * @param request
- * @param operator
- * @return
+ * @param request reset inlong group request
+ * @param operator name of operator
+ * @return true if success, false otherwise
*/
public boolean resetGroupStatus(InlongGroupResetRequest request, String operator) {
- LOGGER.info("begin to reset group status, request = {}, operator = {}", request, operator);
+ LOGGER.info("begin to reset group status by operator={} for request={}", operator, request);
final String groupId = request.getInlongGroupId();
InlongGroupInfo groupInfo = groupService.get(groupId);
Preconditions.checkNotNull(groupInfo, ErrorCodeEnum.GROUP_NOT_FOUND.getMessage());
+
GroupStatus status = GroupStatus.forCode(groupInfo.getStatus());
- final int rerunProcess = request.getRerunProcess();
- final int resetFinalStatus = request.getResetFinalStatus();
+ boolean result;
switch (status) {
case CONFIG_ING:
case SUSPENDING:
case RESTARTING:
case DELETING:
- return dealWithPendingGroup(groupInfo, operator, status, rerunProcess, resetFinalStatus);
+ final int rerunProcess = request.getRerunProcess();
+ final int resetFinalStatus = request.getResetFinalStatus();
+ result = pendingGroupOpt(groupInfo, operator, status, rerunProcess, resetFinalStatus);
+ break;
default:
- throw new IllegalStateException(
- String.format("Unsupported status to reset for group = %s and status = %s",
- request.getInlongGroupId(), status));
+ throw new IllegalStateException(String.format("Unsupported status to reset groupId=%s and status=%s",
+ request.getInlongGroupId(), status));
}
+
+ LOGGER.info("finish to reset group status by operator={}, result={} for request={}", operator, result, request);
+ return result;
}
- private boolean dealWithPendingGroup(InlongGroupInfo groupInfo, String operator, GroupStatus status,
+ private boolean pendingGroupOpt(InlongGroupInfo groupInfo, String operator, GroupStatus status,
int rerunProcess, int resetFinalStatus) {
final String groupId = groupInfo.getInlongGroupId();
if (rerunProcess == 1) {
ProcessQuery processQuery = new ProcessQuery();
processQuery.setInlongGroupId(groupId);
List<WorkflowProcessEntity> entities = workflowQueryService.listProcessEntity(processQuery);
- Collections.sort(entities, (process1, process2) -> process1.getId() - process2.getId());
+ entities.sort(Comparator.comparingInt(WorkflowProcessEntity::getId));
WorkflowProcessEntity lastProcess = entities.get(entities.size() - 1);
executorService.execute(() -> {
workflowService.continueProcess(lastProcess.getId(), operator, "Reset group status");
@@ -300,27 +284,15 @@ public class InlongGroupProcessOperation {
private void invokeDeleteProcess(String groupId, String operator) {
InlongGroupInfo groupInfo = groupService.get(groupId);
- GroupMode mode = GroupMode.parseGroupMode(groupInfo);
- switch (mode) {
- case NORMAL:
- GroupResourceProcessForm form = genGroupProcessForm(groupInfo, GroupOperateType.DELETE);
- workflowService.start(ProcessName.DELETE_GROUP_PROCESS, operator, form);
- break;
- case LIGHTWEIGHT:
- LightGroupResourceProcessForm lightForm = genLightGroupProcessForm(groupInfo,
- GroupOperateType.DELETE);
- workflowService.start(ProcessName.DELETE_LIGHT_GROUP_PROCESS, operator, lightForm);
- break;
- default:
- throw new WorkflowListenerException(ErrorCodeEnum.GROUP_MODE_UNSUPPORTED.getMessage());
- }
+ GroupResourceProcessForm form = genGroupResourceProcessForm(groupInfo, GroupOperateType.DELETE);
+ workflowService.start(ProcessName.DELETE_GROUP_PROCESS, operator, form);
}
/**
- * Generate the form of [New Group Workflow]
+ * Generate the form of [Apply Group Workflow]
*/
- private NewGroupProcessForm genNewGroupProcessForm(String groupId) {
- NewGroupProcessForm form = new NewGroupProcessForm();
+ private ApplyGroupProcessForm genApplyGroupProcessForm(String groupId) {
+ ApplyGroupProcessForm form = new ApplyGroupProcessForm();
InlongGroupInfo groupInfo = groupService.get(groupId);
form.setGroupInfo(groupInfo);
List<InlongStreamBriefInfo> infoList = streamService.getBriefList(groupInfo.getInlongGroupId());
@@ -328,25 +300,16 @@ public class InlongGroupProcessOperation {
return form;
}
- private GroupResourceProcessForm genGroupProcessForm(InlongGroupInfo groupInfo, GroupOperateType operateType) {
- GroupResourceProcessForm form = new GroupResourceProcessForm();
- String groupId = groupInfo.getInlongGroupId();
- if (GroupOperateType.RESTART == operateType) {
- List<InlongStreamInfo> streamList = streamService.list(groupId);
- form.setStreamInfos(streamList);
- }
- form.setGroupInfo(groupInfo);
- form.setGroupOperateType(operateType);
- return form;
- }
-
- private LightGroupResourceProcessForm genLightGroupProcessForm(InlongGroupInfo groupInfo,
+ /**
+ * Generate the form of [Group Resource Workflow]
+ */
+ private GroupResourceProcessForm genGroupResourceProcessForm(InlongGroupInfo groupInfo,
GroupOperateType operateType) {
- LightGroupResourceProcessForm form = new LightGroupResourceProcessForm();
- form.setGroupInfo(groupInfo);
+ GroupResourceProcessForm form = new GroupResourceProcessForm();
String groupId = groupInfo.getInlongGroupId();
List<InlongStreamInfo> streamList = streamService.list(groupId);
form.setStreamInfos(streamList);
+ form.setGroupInfo(groupInfo);
form.setGroupOperateType(operateType);
return form;
}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/GroupCheckService.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/GroupCheckService.java
index c3062c047..f87f9ecc2 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/GroupCheckService.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/GroupCheckService.java
@@ -23,8 +23,6 @@ import org.apache.inlong.manager.common.exceptions.BusinessException;
import org.apache.inlong.manager.common.util.Preconditions;
import org.apache.inlong.manager.dao.entity.InlongGroupEntity;
import org.apache.inlong.manager.dao.mapper.InlongGroupEntityMapper;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
@@ -37,30 +35,28 @@ import java.util.List;
@Service
public class GroupCheckService {
- private static final Logger LOGGER = LoggerFactory.getLogger(GroupCheckService.class);
-
@Autowired
private InlongGroupEntityMapper groupMapper;
/**
* Check whether the inlong group status is temporary
*
- * @param groupId Inlong group id
- * @return Inlong group entity, for caller reuse
+ * @param groupId inlong group id
+ * @return inlong group entity, for caller reuse
*/
public InlongGroupEntity checkGroupStatus(String groupId, String operator) {
InlongGroupEntity inlongGroupEntity = groupMapper.selectByGroupId(groupId);
- Preconditions.checkNotNull(inlongGroupEntity, "groupId is invalid");
+ if (inlongGroupEntity == null) {
+ throw new BusinessException(String.format("InlongGroup does not exist with InlongGroupId=%s", groupId));
+ }
List<String> managers = Arrays.asList(inlongGroupEntity.getInCharges().split(","));
Preconditions.checkTrue(managers.contains(operator),
String.format(ErrorCodeEnum.USER_IS_NOT_MANAGER.getMessage(), operator, managers));
- GroupStatus state = GroupStatus.forCode(inlongGroupEntity.getStatus());
- // Add/modify/delete is not allowed under certain group state
- if (GroupStatus.notAllowedUpdate(state)) {
- LOGGER.error("inlong group status was not allowed to add/update/delete related info");
- throw new BusinessException(ErrorCodeEnum.OPT_NOT_ALLOWED_BY_STATUS);
+ GroupStatus status = GroupStatus.forCode(inlongGroupEntity.getStatus());
+ if (GroupStatus.notAllowedUpdate(status)) {
+ throw new BusinessException(String.format(ErrorCodeEnum.OPT_NOT_ALLOWED_BY_STATUS.getMessage(), status));
}
return inlongGroupEntity;
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/ProcessName.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/ProcessName.java
index aacfbba1c..176c6ae0c 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/ProcessName.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/ProcessName.java
@@ -23,72 +23,72 @@ package org.apache.inlong.manager.service.workflow;
public enum ProcessName {
/**
- * New inlong group application process
+ * Apply inlong group process
*/
- NEW_GROUP_PROCESS("New-Group"),
+ APPLY_GROUP_PROCESS("Apply-Group"),
/**
- * Startup inlong group application process
+ * Create inlong group resources process
*/
CREATE_GROUP_RESOURCE("Create-Group"),
/**
- * Suspend inlong group application process
+ * Suspend inlong group process
*/
SUSPEND_GROUP_PROCESS("Suspend-Group"),
/**
- * Restart inlong group application process
+ * Restart inlong group process
*/
RESTART_GROUP_PROCESS("Restart-Group"),
/**
- * Delete inlong group application process
+ * Delete inlong group process
*/
DELETE_GROUP_PROCESS("Delete-Group"),
/**
- * Startup lightweight inlong group application process
+ * Startup lightweight inlong group process
*/
CREATE_LIGHT_GROUP_PROCESS("Create-Light-Group"),
/**
- * Suspend lightweight inlong group application process
+ * Suspend lightweight inlong group process
*/
SUSPEND_LIGHT_GROUP_PROCESS("Suspend-Light-Group"),
/**
- * Restart lightweight inlong group application process
+ * Restart lightweight inlong group process
*/
RESTART_LIGHT_GROUP_PROCESS("Restart-Light-Group"),
/**
- * Delete lightweight inlong group application process
+ * Delete lightweight inlong group process
*/
DELETE_LIGHT_GROUP_PROCESS("Delete-Light-Group"),
/**
- * New consumption application process
+ * Apply consumption process
*/
- NEW_CONSUMPTION_PROCESS("New-Consumption"),
+ APPLY_CONSUMPTION_PROCESS("Apply-Consumption"),
/**
- * Startup single stream process
+ * Create inlong stream process
*/
CREATE_STREAM_RESOURCE("Create-Stream"),
/**
- * Suspend single stream process
+ * Suspend inlong stream process
*/
SUSPEND_STREAM_RESOURCE("Suspend-Stream"),
/**
- * Restart single stream process
+ * Restart inlong stream process
*/
RESTART_STREAM_RESOURCE("Restart-Stream"),
/**
- * Delete single stream process
+ * Delete inlong stream process
*/
DELETE_STREAM_RESOURCE("Delete-Stream");
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/consumption/NewConsumptionProcessDetailHandler.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/consumption/ApplyConsumptionProcessHandler.java
similarity index 90%
rename from inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/consumption/NewConsumptionProcessDetailHandler.java
rename to inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/consumption/ApplyConsumptionProcessHandler.java
index 6e60e5a55..8e1f77046 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/consumption/NewConsumptionProcessDetailHandler.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/consumption/ApplyConsumptionProcessHandler.java
@@ -19,7 +19,7 @@ package org.apache.inlong.manager.service.workflow.consumption;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.inlong.manager.common.pojo.workflow.ProcessDetailResponse;
-import org.apache.inlong.manager.common.pojo.workflow.form.process.NewConsumptionProcessForm;
+import org.apache.inlong.manager.common.pojo.workflow.form.process.ApplyConsumptionProcessForm;
import org.apache.inlong.manager.workflow.core.ProcessDefinitionService;
import org.apache.inlong.manager.workflow.definition.ProcessDetailHandler;
import org.apache.inlong.manager.workflow.definition.WorkflowProcess;
@@ -28,10 +28,10 @@ import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
/**
- * New consumption process detail handler
+ * Apply consumption process handler
*/
@Component
-public class NewConsumptionProcessDetailHandler implements ProcessDetailHandler {
+public class ApplyConsumptionProcessHandler implements ProcessDetailHandler {
@Autowired
private ObjectMapper objectMapper;
@@ -41,7 +41,7 @@ public class NewConsumptionProcessDetailHandler implements ProcessDetailHandler
@Override
public ProcessDetailResponse handle(ProcessDetailResponse processResponse) {
WorkflowProcess process = processDefinitionService.getByName(processResponse.getWorkflow().getName());
- NewConsumptionProcessForm processForm = WorkflowFormParserUtils
+ ApplyConsumptionProcessForm processForm = WorkflowFormParserUtils
.parseProcessForm(objectMapper, processResponse.getProcessInfo().getFormData().toString(), process);
if (processForm == null) {
return processResponse;
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/consumption/NewConsumptionWorkflowDefinition.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/consumption/ApplyConsumptionWorkflowDefinition.java
similarity index 88%
rename from inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/consumption/NewConsumptionWorkflowDefinition.java
rename to inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/consumption/ApplyConsumptionWorkflowDefinition.java
index 94cd312ef..9e64f2886 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/consumption/NewConsumptionWorkflowDefinition.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/consumption/ApplyConsumptionWorkflowDefinition.java
@@ -17,15 +17,12 @@
package org.apache.inlong.manager.service.workflow.consumption;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.List;
import org.apache.inlong.manager.common.pojo.group.InlongGroupInfo;
import org.apache.inlong.manager.common.pojo.workflow.WorkflowApproverFilterContext;
+import org.apache.inlong.manager.common.pojo.workflow.form.process.ApplyConsumptionProcessForm;
import org.apache.inlong.manager.common.pojo.workflow.form.task.ConsumptionApproveForm;
-import org.apache.inlong.manager.common.pojo.workflow.form.process.NewConsumptionProcessForm;
-import org.apache.inlong.manager.service.group.InlongGroupService;
import org.apache.inlong.manager.service.core.WorkflowApproverService;
+import org.apache.inlong.manager.service.group.InlongGroupService;
import org.apache.inlong.manager.service.workflow.ProcessName;
import org.apache.inlong.manager.service.workflow.WorkflowDefinition;
import org.apache.inlong.manager.service.workflow.consumption.listener.ConsumptionCancelProcessListener;
@@ -40,11 +37,15 @@ import org.apache.inlong.manager.workflow.definition.WorkflowProcess;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
/**
* New data consumption workflow definition
*/
@Component
-public class NewConsumptionWorkflowDefinition implements WorkflowDefinition {
+public class ApplyConsumptionWorkflowDefinition implements WorkflowDefinition {
public static final String UT_ADMINT_NAME = "ut_admin";
public static final String UT_GROUP_OWNER_NAME = "ut_biz_owner";
@@ -65,7 +66,7 @@ public class NewConsumptionWorkflowDefinition implements WorkflowDefinition {
private WorkflowApproverService workflowApproverService;
@Autowired
- private NewConsumptionProcessDetailHandler newConsumptionProcessDetailHandler;
+ private ApplyConsumptionProcessHandler applyConsumptionProcessHandler;
@Autowired
private InlongGroupService groupService;
@@ -74,12 +75,12 @@ public class NewConsumptionWorkflowDefinition implements WorkflowDefinition {
public WorkflowProcess defineProcess() {
// Define process information
WorkflowProcess process = new WorkflowProcess();
- process.setType("Data Consumption Resource Creation");
process.setName(getProcessName().name());
+ process.setType(getProcessName().getDisplayName());
process.setDisplayName(getProcessName().getDisplayName());
- process.setFormClass(NewConsumptionProcessForm.class);
+ process.setFormClass(ApplyConsumptionProcessForm.class);
process.setVersion(1);
- process.setProcessDetailHandler(newConsumptionProcessDetailHandler);
+ process.setProcessDetailHandler(applyConsumptionProcessHandler);
// Start node
StartEvent startEvent = new StartEvent();
@@ -93,7 +94,7 @@ public class NewConsumptionWorkflowDefinition implements WorkflowDefinition {
UserTask groupOwnerUserTask = new UserTask();
groupOwnerUserTask.setName(UT_GROUP_OWNER_NAME);
groupOwnerUserTask.setDisplayName("Group Approval");
- groupOwnerUserTask.setApproverAssign(this::bizOwnerUserTaskApprover);
+ groupOwnerUserTask.setApproverAssign(this::groupOwnerUserTaskApprover);
process.addTask(groupOwnerUserTask);
// System administrator approval
@@ -123,8 +124,8 @@ public class NewConsumptionWorkflowDefinition implements WorkflowDefinition {
new WorkflowApproverFilterContext());
}
- private List<String> bizOwnerUserTaskApprover(WorkflowContext context) {
- NewConsumptionProcessForm form = (NewConsumptionProcessForm) context.getProcessForm();
+ private List<String> groupOwnerUserTaskApprover(WorkflowContext context) {
+ ApplyConsumptionProcessForm form = (ApplyConsumptionProcessForm) context.getProcessForm();
InlongGroupInfo groupInfo = groupService.get(form.getConsumptionInfo().getInlongGroupId());
if (groupInfo == null || groupInfo.getInCharges() == null) {
return Collections.emptyList();
@@ -135,7 +136,7 @@ public class NewConsumptionWorkflowDefinition implements WorkflowDefinition {
@Override
public ProcessName getProcessName() {
- return ProcessName.NEW_CONSUMPTION_PROCESS;
+ return ProcessName.APPLY_CONSUMPTION_PROCESS;
}
}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/consumption/listener/ConsumptionCancelProcessListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/consumption/listener/ConsumptionCancelProcessListener.java
index d678968a4..c01964e98 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/consumption/listener/ConsumptionCancelProcessListener.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/consumption/listener/ConsumptionCancelProcessListener.java
@@ -20,9 +20,9 @@ package org.apache.inlong.manager.service.workflow.consumption.listener;
import lombok.extern.slf4j.Slf4j;
import org.apache.inlong.manager.common.enums.ConsumptionStatus;
import org.apache.inlong.manager.common.exceptions.WorkflowListenerException;
+import org.apache.inlong.manager.common.pojo.workflow.form.process.ApplyConsumptionProcessForm;
import org.apache.inlong.manager.dao.entity.ConsumptionEntity;
import org.apache.inlong.manager.dao.mapper.ConsumptionEntityMapper;
-import org.apache.inlong.manager.common.pojo.workflow.form.process.NewConsumptionProcessForm;
import org.apache.inlong.manager.workflow.WorkflowContext;
import org.apache.inlong.manager.workflow.event.ListenerResult;
import org.apache.inlong.manager.workflow.event.process.ProcessEvent;
@@ -53,7 +53,7 @@ public class ConsumptionCancelProcessListener implements ProcessEventListener {
@Override
public ListenerResult listen(WorkflowContext context) throws WorkflowListenerException {
- NewConsumptionProcessForm processForm = (NewConsumptionProcessForm) context.getProcessForm();
+ ApplyConsumptionProcessForm processForm = (ApplyConsumptionProcessForm) context.getProcessForm();
ConsumptionEntity update = new ConsumptionEntity();
update.setId(processForm.getConsumptionInfo().getId());
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/consumption/listener/ConsumptionCompleteProcessListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/consumption/listener/ConsumptionCompleteProcessListener.java
index 43eb82ac1..6ad4ea7c5 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/consumption/listener/ConsumptionCompleteProcessListener.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/consumption/listener/ConsumptionCompleteProcessListener.java
@@ -28,7 +28,7 @@ import org.apache.inlong.manager.common.pojo.cluster.ClusterInfo;
import org.apache.inlong.manager.common.pojo.cluster.pulsar.PulsarClusterInfo;
import org.apache.inlong.manager.common.pojo.cluster.tube.TubeClusterInfo;
import org.apache.inlong.manager.common.pojo.pulsar.PulsarTopicBean;
-import org.apache.inlong.manager.common.pojo.workflow.form.process.NewConsumptionProcessForm;
+import org.apache.inlong.manager.common.pojo.workflow.form.process.ApplyConsumptionProcessForm;
import org.apache.inlong.manager.common.util.Preconditions;
import org.apache.inlong.manager.dao.entity.ConsumptionEntity;
import org.apache.inlong.manager.dao.entity.InlongGroupEntity;
@@ -75,7 +75,7 @@ public class ConsumptionCompleteProcessListener implements ProcessEventListener
@Override
public ListenerResult listen(WorkflowContext context) throws WorkflowListenerException {
- NewConsumptionProcessForm consumptionForm = (NewConsumptionProcessForm) context.getProcessForm();
+ ApplyConsumptionProcessForm consumptionForm = (ApplyConsumptionProcessForm) context.getProcessForm();
// Real-time query of consumption information
Integer consumptionId = consumptionForm.getConsumptionInfo().getId();
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/consumption/listener/ConsumptionPassTaskListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/consumption/listener/ConsumptionPassTaskListener.java
index d2c27421f..c50bafd26 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/consumption/listener/ConsumptionPassTaskListener.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/consumption/listener/ConsumptionPassTaskListener.java
@@ -24,7 +24,7 @@ import org.apache.inlong.manager.common.exceptions.BusinessException;
import org.apache.inlong.manager.common.exceptions.WorkflowListenerException;
import org.apache.inlong.manager.common.pojo.consumption.ConsumptionInfo;
import org.apache.inlong.manager.common.pojo.workflow.form.task.ConsumptionApproveForm;
-import org.apache.inlong.manager.common.pojo.workflow.form.process.NewConsumptionProcessForm;
+import org.apache.inlong.manager.common.pojo.workflow.form.process.ApplyConsumptionProcessForm;
import org.apache.inlong.manager.service.core.ConsumptionService;
import org.apache.inlong.manager.workflow.WorkflowContext;
import org.apache.inlong.manager.workflow.event.ListenerResult;
@@ -50,7 +50,7 @@ public class ConsumptionPassTaskListener implements TaskEventListener {
@Override
public ListenerResult listen(WorkflowContext context) throws WorkflowListenerException {
- NewConsumptionProcessForm form = (NewConsumptionProcessForm) context.getProcessForm();
+ ApplyConsumptionProcessForm form = (ApplyConsumptionProcessForm) context.getProcessForm();
ConsumptionApproveForm approveForm = (ConsumptionApproveForm) context.getActionContext().getForm();
ConsumptionInfo info = form.getConsumptionInfo();
if (StringUtils.equals(approveForm.getConsumerGroup(), info.getConsumerGroup())) {
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/consumption/listener/ConsumptionRejectProcessListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/consumption/listener/ConsumptionRejectProcessListener.java
index a9f9a16db..09ba29cc9 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/consumption/listener/ConsumptionRejectProcessListener.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/consumption/listener/ConsumptionRejectProcessListener.java
@@ -21,7 +21,7 @@ import org.apache.inlong.manager.common.enums.ConsumptionStatus;
import org.apache.inlong.manager.common.exceptions.WorkflowListenerException;
import org.apache.inlong.manager.dao.entity.ConsumptionEntity;
import org.apache.inlong.manager.dao.mapper.ConsumptionEntityMapper;
-import org.apache.inlong.manager.common.pojo.workflow.form.process.NewConsumptionProcessForm;
+import org.apache.inlong.manager.common.pojo.workflow.form.process.ApplyConsumptionProcessForm;
import org.apache.inlong.manager.workflow.WorkflowContext;
import org.apache.inlong.manager.workflow.event.ListenerResult;
import org.apache.inlong.manager.workflow.event.process.ProcessEvent;
@@ -51,7 +51,7 @@ public class ConsumptionRejectProcessListener implements ProcessEventListener {
@Override
public ListenerResult listen(WorkflowContext context) throws WorkflowListenerException {
- NewConsumptionProcessForm processForm = (NewConsumptionProcessForm) context.getProcessForm();
+ ApplyConsumptionProcessForm processForm = (ApplyConsumptionProcessForm) context.getProcessForm();
ConsumptionEntity update = new ConsumptionEntity();
update.setId(processForm.getConsumptionInfo().getId());
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/group/NewGroupWorkflowDefinition.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/group/ApplyGroupWorkflowDefinition.java
similarity index 81%
rename from inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/group/NewGroupWorkflowDefinition.java
rename to inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/group/ApplyGroupWorkflowDefinition.java
index 97b77d943..fc5990d04 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/group/NewGroupWorkflowDefinition.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/group/ApplyGroupWorkflowDefinition.java
@@ -18,15 +18,15 @@
package org.apache.inlong.manager.service.workflow.group;
import org.apache.inlong.manager.common.pojo.workflow.WorkflowApproverFilterContext;
+import org.apache.inlong.manager.common.pojo.workflow.form.process.ApplyGroupProcessForm;
import org.apache.inlong.manager.common.pojo.workflow.form.task.InlongGroupApproveForm;
-import org.apache.inlong.manager.common.pojo.workflow.form.process.NewGroupProcessForm;
import org.apache.inlong.manager.service.core.WorkflowApproverService;
import org.apache.inlong.manager.service.workflow.ProcessName;
import org.apache.inlong.manager.service.workflow.WorkflowDefinition;
-import org.apache.inlong.manager.service.workflow.group.listener.GroupCancelProcessListener;
-import org.apache.inlong.manager.service.workflow.group.listener.approve.GroupAfterApprovedListener;
-import org.apache.inlong.manager.service.workflow.group.listener.approve.GroupRejectProcessListener;
-import org.apache.inlong.manager.service.workflow.group.listener.approve.GroupApproveProcessListener;
+import org.apache.inlong.manager.service.workflow.group.listener.apply.AfterApprovedTaskListener;
+import org.apache.inlong.manager.service.workflow.group.listener.apply.ApproveApplyProcessListener;
+import org.apache.inlong.manager.service.workflow.group.listener.apply.CancelApplyProcessListener;
+import org.apache.inlong.manager.service.workflow.group.listener.apply.RejectApplyProcessListener;
import org.apache.inlong.manager.workflow.definition.EndEvent;
import org.apache.inlong.manager.workflow.definition.StartEvent;
import org.apache.inlong.manager.workflow.definition.UserTask;
@@ -40,16 +40,16 @@ import java.util.List;
* New inlong group process definition
*/
@Component
-public class NewGroupWorkflowDefinition implements WorkflowDefinition {
+public class ApplyGroupWorkflowDefinition implements WorkflowDefinition {
@Autowired
- private GroupAfterApprovedListener groupAfterApprovedListener;
+ private CancelApplyProcessListener cancelApplyProcessListener;
@Autowired
- private GroupCancelProcessListener groupCancelProcessListener;
+ private RejectApplyProcessListener rejectApplyProcessListener;
@Autowired
- private GroupRejectProcessListener approveRejectProcessListener;
+ private ApproveApplyProcessListener approveApplyProcessListener;
@Autowired
- private GroupApproveProcessListener groupApproveProcessListener;
+ private AfterApprovedTaskListener afterApprovedTaskListener;
@Autowired
private WorkflowApproverService workflowApproverService;
@@ -57,18 +57,18 @@ public class NewGroupWorkflowDefinition implements WorkflowDefinition {
public WorkflowProcess defineProcess() {
// Configuration process
WorkflowProcess process = new WorkflowProcess();
- process.setType(getProcessName().getDisplayName());
process.setName(getProcessName().name());
+ process.setType(getProcessName().getDisplayName());
process.setDisplayName(getProcessName().getDisplayName());
- process.setFormClass(NewGroupProcessForm.class);
+ process.setFormClass(ApplyGroupProcessForm.class);
process.setVersion(1);
// Set up the listener
- process.addListener(groupCancelProcessListener);
- process.addListener(approveRejectProcessListener);
+ process.addListener(cancelApplyProcessListener);
+ process.addListener(rejectApplyProcessListener);
// Initiate the process of creating inlong group resources, and set the inlong group status
// to [Configuration Successful]/[Configuration Failed] according to its completion
- process.addListener(groupApproveProcessListener);
+ process.addListener(approveApplyProcessListener);
// Start node
StartEvent startEvent = new StartEvent();
@@ -84,12 +84,13 @@ public class NewGroupWorkflowDefinition implements WorkflowDefinition {
adminUserTask.setDisplayName("SystemAdmin");
adminUserTask.setFormClass(InlongGroupApproveForm.class);
adminUserTask.setApproverAssign(context -> getTaskApprovers(adminUserTask.getName()));
- adminUserTask.addListener(groupAfterApprovedListener);
+ adminUserTask.addListener(afterApprovedTaskListener);
process.addTask(adminUserTask);
- // Configuration order relation
- startEvent.addNext(adminUserTask);
// If you need another approval process, you can add it here
+
+ // Configure the order of the tasks
+ startEvent.addNext(adminUserTask);
adminUserTask.addNext(endEvent);
return process;
@@ -97,7 +98,7 @@ public class NewGroupWorkflowDefinition implements WorkflowDefinition {
@Override
public ProcessName getProcessName() {
- return ProcessName.NEW_GROUP_PROCESS;
+ return ProcessName.APPLY_GROUP_PROCESS;
}
/**
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/group/CreateGroupWorkflowDefinition.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/group/CreateGroupWorkflowDefinition.java
index 94182b69a..862c1f99e 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/group/CreateGroupWorkflowDefinition.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/group/CreateGroupWorkflowDefinition.java
@@ -21,9 +21,9 @@ import lombok.extern.slf4j.Slf4j;
import org.apache.inlong.manager.common.pojo.workflow.form.process.GroupResourceProcessForm;
import org.apache.inlong.manager.service.workflow.ProcessName;
import org.apache.inlong.manager.service.workflow.WorkflowDefinition;
-import org.apache.inlong.manager.service.workflow.group.listener.GroupCompleteProcessListener;
-import org.apache.inlong.manager.service.workflow.group.listener.GroupFailedProcessListener;
-import org.apache.inlong.manager.service.workflow.group.listener.GroupInitProcessListener;
+import org.apache.inlong.manager.service.workflow.group.listener.InitGroupCompleteListener;
+import org.apache.inlong.manager.service.workflow.group.listener.InitGroupFailedListener;
+import org.apache.inlong.manager.service.workflow.group.listener.InitGroupListener;
import org.apache.inlong.manager.service.workflow.listener.GroupTaskListenerFactory;
import org.apache.inlong.manager.workflow.definition.EndEvent;
import org.apache.inlong.manager.workflow.definition.ServiceTask;
@@ -41,61 +41,61 @@ import org.springframework.stereotype.Component;
public class CreateGroupWorkflowDefinition implements WorkflowDefinition {
@Autowired
- private GroupInitProcessListener groupInitProcessListener;
+ private InitGroupListener initGroupListener;
@Autowired
- private GroupCompleteProcessListener groupCompleteProcessListener;
+ private InitGroupCompleteListener initGroupCompleteListener;
@Autowired
- private GroupFailedProcessListener groupFailedProcessListener;
+ private InitGroupFailedListener initGroupFailedListener;
@Autowired
private GroupTaskListenerFactory groupTaskListenerFactory;
@Override
public WorkflowProcess defineProcess() {
-
// Configuration process
WorkflowProcess process = new WorkflowProcess();
- process.addListener(groupInitProcessListener);
- process.addListener(groupCompleteProcessListener);
- process.addListener(groupFailedProcessListener);
-
- process.setType("Group Resource Creation");
process.setName(getProcessName().name());
+ process.setType(getProcessName().getDisplayName());
process.setDisplayName(getProcessName().getDisplayName());
process.setFormClass(GroupResourceProcessForm.class);
process.setVersion(1);
process.setHidden(1);
+ // Set up the listener
+ process.addListener(initGroupListener);
+ process.addListener(initGroupCompleteListener);
+ process.addListener(initGroupFailedListener);
+
// Start node
StartEvent startEvent = new StartEvent();
process.setStartEvent(startEvent);
- // init MQ
+ // Init MQ
ServiceTask initMQTask = new ServiceTask();
- initMQTask.setName("initMQ");
+ initMQTask.setName("InitMQ");
initMQTask.setDisplayName("Group-InitMQ");
initMQTask.addServiceTaskType(ServiceTaskType.INIT_MQ);
initMQTask.addListenerProvider(groupTaskListenerFactory);
process.addTask(initMQTask);
- // init Sink
+ // Init Sink
ServiceTask initSinkTask = new ServiceTask();
- initSinkTask.setName("initSink");
+ initSinkTask.setName("InitSink");
initSinkTask.setDisplayName("Group-InitSink");
initSinkTask.addServiceTaskType(ServiceTaskType.INIT_SINK);
initSinkTask.addListenerProvider(groupTaskListenerFactory);
process.addTask(initSinkTask);
- // init Sort
+ // Init Sort
ServiceTask initSortTask = new ServiceTask();
- initSortTask.setName("initSort");
+ initSortTask.setName("InitSort");
initSortTask.setDisplayName("Group-InitSort");
initSortTask.addServiceTaskType(ServiceTaskType.INIT_SORT);
initSortTask.addListenerProvider(groupTaskListenerFactory);
process.addTask(initSortTask);
- // init Source
+ // Init Source
ServiceTask initSourceTask = new ServiceTask();
- initSourceTask.setName("initSource");
+ initSourceTask.setName("InitSource");
initSourceTask.setDisplayName("Group-InitSource");
initSourceTask.addServiceTaskType(ServiceTaskType.INIT_SOURCE);
initSourceTask.addListenerProvider(groupTaskListenerFactory);
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/group/DeleteGroupWorkflowDefinition.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/group/DeleteGroupWorkflowDefinition.java
index 3d0f735df..8f813f6f0 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/group/DeleteGroupWorkflowDefinition.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/group/DeleteGroupWorkflowDefinition.java
@@ -19,12 +19,12 @@ package org.apache.inlong.manager.service.workflow.group;
import lombok.extern.slf4j.Slf4j;
import org.apache.inlong.manager.common.pojo.workflow.form.process.GroupResourceProcessForm;
-import org.apache.inlong.manager.service.workflow.listener.GroupTaskListenerFactory;
import org.apache.inlong.manager.service.workflow.ProcessName;
import org.apache.inlong.manager.service.workflow.WorkflowDefinition;
-import org.apache.inlong.manager.service.workflow.group.listener.GroupUpdateCompleteListener;
-import org.apache.inlong.manager.service.workflow.group.listener.GroupUpdateFailedListener;
-import org.apache.inlong.manager.service.workflow.group.listener.GroupUpdateListener;
+import org.apache.inlong.manager.service.workflow.group.listener.UpdateGroupCompleteListener;
+import org.apache.inlong.manager.service.workflow.group.listener.UpdateGroupFailedListener;
+import org.apache.inlong.manager.service.workflow.group.listener.UpdateGroupListener;
+import org.apache.inlong.manager.service.workflow.listener.GroupTaskListenerFactory;
import org.apache.inlong.manager.workflow.definition.EndEvent;
import org.apache.inlong.manager.workflow.definition.ServiceTask;
import org.apache.inlong.manager.workflow.definition.ServiceTaskType;
@@ -34,18 +34,18 @@ import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
/**
- * Delete workflow definitions for inlong group
+ * Delete workflow definition for inlong group
*/
@Slf4j
@Component
public class DeleteGroupWorkflowDefinition implements WorkflowDefinition {
@Autowired
- private GroupUpdateListener groupUpdateListener;
+ private UpdateGroupListener updateGroupListener;
@Autowired
- private GroupUpdateCompleteListener groupUpdateCompleteListener;
+ private UpdateGroupCompleteListener updateGroupCompleteListener;
@Autowired
- private GroupUpdateFailedListener groupUpdateFailedListener;
+ private UpdateGroupFailedListener updateGroupFailedListener;
@Autowired
private GroupTaskListenerFactory groupTaskListenerFactory;
@@ -53,39 +53,41 @@ public class DeleteGroupWorkflowDefinition implements WorkflowDefinition {
public WorkflowProcess defineProcess() {
// Configuration process
WorkflowProcess process = new WorkflowProcess();
- process.addListener(groupUpdateListener);
- process.addListener(groupUpdateCompleteListener);
- process.addListener(groupUpdateFailedListener);
- process.setType("Group Resource Delete");
process.setName(getProcessName().name());
+ process.setType(getProcessName().getDisplayName());
process.setDisplayName(getProcessName().getDisplayName());
process.setFormClass(GroupResourceProcessForm.class);
process.setVersion(1);
process.setHidden(1);
+ // Set up the listener
+ process.addListener(updateGroupListener);
+ process.addListener(updateGroupCompleteListener);
+ process.addListener(updateGroupFailedListener);
+
// Start node
StartEvent startEvent = new StartEvent();
process.setStartEvent(startEvent);
- //delete datasource
+ // Delete Source
ServiceTask deleteDataSourceTask = new ServiceTask();
- deleteDataSourceTask.setName("deleteSource");
+ deleteDataSourceTask.setName("DeleteSource");
deleteDataSourceTask.setDisplayName("Group-DeleteSource");
deleteDataSourceTask.addServiceTaskType(ServiceTaskType.DELETE_SOURCE);
deleteDataSourceTask.addListenerProvider(groupTaskListenerFactory);
process.addTask(deleteDataSourceTask);
- //delete MQ
+ // Delete MQ
ServiceTask deleteMqTask = new ServiceTask();
- deleteMqTask.setName("deleteMQ");
+ deleteMqTask.setName("DeleteMQ");
deleteMqTask.setDisplayName("Group-DeleteMQ");
deleteMqTask.addServiceTaskType(ServiceTaskType.DELETE_MQ);
deleteMqTask.addListenerProvider(groupTaskListenerFactory);
process.addTask(deleteMqTask);
- //delete sort
+ // Delete Sort
ServiceTask deleteSortTask = new ServiceTask();
- deleteSortTask.setName("deleteSort");
+ deleteSortTask.setName("DeleteSort");
deleteSortTask.setDisplayName("Group-DeleteSort");
deleteSortTask.addServiceTaskType(ServiceTaskType.DELETE_SORT);
deleteSortTask.addListenerProvider(groupTaskListenerFactory);
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/group/RestartGroupWorkflowDefinition.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/group/RestartGroupWorkflowDefinition.java
index e79af1a5b..ed9f22413 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/group/RestartGroupWorkflowDefinition.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/group/RestartGroupWorkflowDefinition.java
@@ -20,11 +20,11 @@ package org.apache.inlong.manager.service.workflow.group;
import lombok.extern.slf4j.Slf4j;
import org.apache.inlong.manager.common.pojo.workflow.form.process.GroupResourceProcessForm;
import org.apache.inlong.manager.service.workflow.ProcessName;
-import org.apache.inlong.manager.service.workflow.listener.GroupTaskListenerFactory;
import org.apache.inlong.manager.service.workflow.WorkflowDefinition;
-import org.apache.inlong.manager.service.workflow.group.listener.GroupUpdateCompleteListener;
-import org.apache.inlong.manager.service.workflow.group.listener.GroupUpdateFailedListener;
-import org.apache.inlong.manager.service.workflow.group.listener.GroupUpdateListener;
+import org.apache.inlong.manager.service.workflow.group.listener.UpdateGroupCompleteListener;
+import org.apache.inlong.manager.service.workflow.group.listener.UpdateGroupFailedListener;
+import org.apache.inlong.manager.service.workflow.group.listener.UpdateGroupListener;
+import org.apache.inlong.manager.service.workflow.listener.GroupTaskListenerFactory;
import org.apache.inlong.manager.workflow.definition.EndEvent;
import org.apache.inlong.manager.workflow.definition.ServiceTask;
import org.apache.inlong.manager.workflow.definition.ServiceTaskType;
@@ -41,58 +41,57 @@ import org.springframework.stereotype.Component;
public class RestartGroupWorkflowDefinition implements WorkflowDefinition {
@Autowired
- private GroupUpdateListener groupUpdateListener;
-
+ private UpdateGroupListener updateGroupListener;
@Autowired
- private GroupUpdateCompleteListener groupUpdateCompleteListener;
-
+ private UpdateGroupCompleteListener updateGroupCompleteListener;
@Autowired
- private GroupUpdateFailedListener groupUpdateFailedListener;
-
+ private UpdateGroupFailedListener updateGroupFailedListener;
@Autowired
- private GroupTaskListenerFactory groupTaskListenerFactory;
+ private GroupTaskListenerFactory taskListenerFactory;
@Override
public WorkflowProcess defineProcess() {
// Configuration process
WorkflowProcess process = new WorkflowProcess();
- process.addListener(groupUpdateListener);
- process.addListener(groupUpdateCompleteListener);
- process.addListener(groupUpdateFailedListener);
- process.setType("Group Resource Restart");
process.setName(getProcessName().name());
+ process.setType(getProcessName().getDisplayName());
process.setDisplayName(getProcessName().getDisplayName());
process.setFormClass(GroupResourceProcessForm.class);
process.setVersion(1);
process.setHidden(1);
+ // Set up the listener
+ process.addListener(updateGroupListener);
+ process.addListener(updateGroupCompleteListener);
+ process.addListener(updateGroupFailedListener);
+
// Start node
StartEvent startEvent = new StartEvent();
process.setStartEvent(startEvent);
- //restart sort
+ // Restart Sort
ServiceTask restartSortTask = new ServiceTask();
- restartSortTask.setName("restartSort");
+ restartSortTask.setName("RestartSort");
restartSortTask.setDisplayName("Group-RestartSort");
restartSortTask.addServiceTaskType(ServiceTaskType.RESTART_SORT);
- restartSortTask.addListenerProvider(groupTaskListenerFactory);
+ restartSortTask.addListenerProvider(taskListenerFactory);
process.addTask(restartSortTask);
- //restart datasource
- ServiceTask restartDataSourceTask = new ServiceTask();
- restartDataSourceTask.setName("restartSource");
- restartDataSourceTask.setDisplayName("Group-RestartSource");
- restartDataSourceTask.addServiceTaskType(ServiceTaskType.RESTART_SOURCE);
- restartDataSourceTask.addListenerProvider(groupTaskListenerFactory);
- process.addTask(restartDataSourceTask);
+ // Restart Source
+ ServiceTask restartSourceTask = new ServiceTask();
+ restartSourceTask.setName("RestartSource");
+ restartSourceTask.setDisplayName("Group-RestartSource");
+ restartSourceTask.addServiceTaskType(ServiceTaskType.RESTART_SOURCE);
+ restartSourceTask.addListenerProvider(taskListenerFactory);
+ process.addTask(restartSourceTask);
// End node
EndEvent endEvent = new EndEvent();
process.setEndEvent(endEvent);
startEvent.addNext(restartSortTask);
- restartSortTask.addNext(restartDataSourceTask);
- restartDataSourceTask.addNext(endEvent);
+ restartSortTask.addNext(restartSourceTask);
+ restartSourceTask.addNext(endEvent);
return process;
}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/group/SuspendGroupWorkflowDefinition.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/group/SuspendGroupWorkflowDefinition.java
index 94926298a..aabba5bf3 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/group/SuspendGroupWorkflowDefinition.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/group/SuspendGroupWorkflowDefinition.java
@@ -20,11 +20,11 @@ package org.apache.inlong.manager.service.workflow.group;
import lombok.extern.slf4j.Slf4j;
import org.apache.inlong.manager.common.pojo.workflow.form.process.GroupResourceProcessForm;
import org.apache.inlong.manager.service.workflow.ProcessName;
-import org.apache.inlong.manager.service.workflow.listener.GroupTaskListenerFactory;
import org.apache.inlong.manager.service.workflow.WorkflowDefinition;
-import org.apache.inlong.manager.service.workflow.group.listener.GroupUpdateCompleteListener;
-import org.apache.inlong.manager.service.workflow.group.listener.GroupUpdateFailedListener;
-import org.apache.inlong.manager.service.workflow.group.listener.GroupUpdateListener;
+import org.apache.inlong.manager.service.workflow.group.listener.UpdateGroupCompleteListener;
+import org.apache.inlong.manager.service.workflow.group.listener.UpdateGroupFailedListener;
+import org.apache.inlong.manager.service.workflow.group.listener.UpdateGroupListener;
+import org.apache.inlong.manager.service.workflow.listener.GroupTaskListenerFactory;
import org.apache.inlong.manager.workflow.definition.EndEvent;
import org.apache.inlong.manager.workflow.definition.ServiceTask;
import org.apache.inlong.manager.workflow.definition.ServiceTaskType;
@@ -34,21 +34,18 @@ import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
/**
- * Suspend inlong group process definition
+ * Suspend workflow definition for inlong group
*/
@Slf4j
@Component
public class SuspendGroupWorkflowDefinition implements WorkflowDefinition {
@Autowired
- private GroupUpdateListener groupUpdateListener;
-
+ private UpdateGroupListener updateGroupListener;
@Autowired
- private GroupUpdateCompleteListener groupUpdateCompleteListener;
-
+ private UpdateGroupCompleteListener updateGroupCompleteListener;
@Autowired
- private GroupUpdateFailedListener groupUpdateFailedListener;
-
+ private UpdateGroupFailedListener updateGroupFailedListener;
@Autowired
private GroupTaskListenerFactory groupTaskListenerFactory;
@@ -56,31 +53,33 @@ public class SuspendGroupWorkflowDefinition implements WorkflowDefinition {
public WorkflowProcess defineProcess() {
// Configuration process
WorkflowProcess process = new WorkflowProcess();
- process.addListener(groupUpdateListener);
- process.addListener(groupUpdateCompleteListener);
- process.addListener(groupUpdateFailedListener);
- process.setType("Group Resource Suspend");
process.setName(getProcessName().name());
+ process.setType(getProcessName().getDisplayName());
process.setDisplayName(getProcessName().getDisplayName());
process.setFormClass(GroupResourceProcessForm.class);
process.setVersion(1);
process.setHidden(1);
+ // Set up the listener
+ process.addListener(updateGroupListener);
+ process.addListener(updateGroupCompleteListener);
+ process.addListener(updateGroupFailedListener);
+
// Start node
StartEvent startEvent = new StartEvent();
process.setStartEvent(startEvent);
- //stop datasource
- ServiceTask stopDataSourceTask = new ServiceTask();
- stopDataSourceTask.setName("stopSource");
- stopDataSourceTask.setDisplayName("Group-StopSource");
- stopDataSourceTask.addServiceTaskType(ServiceTaskType.STOP_SOURCE);
- stopDataSourceTask.addListenerProvider(groupTaskListenerFactory);
- process.addTask(stopDataSourceTask);
+ // Stop Source
+ ServiceTask stopSourceTask = new ServiceTask();
+ stopSourceTask.setName("StopSource");
+ stopSourceTask.setDisplayName("Group-StopSource");
+ stopSourceTask.addServiceTaskType(ServiceTaskType.STOP_SOURCE);
+ stopSourceTask.addListenerProvider(groupTaskListenerFactory);
+ process.addTask(stopSourceTask);
- //stop sort
+ // Stop Sort
ServiceTask stopSortTask = new ServiceTask();
- stopSortTask.setName("stopSort");
+ stopSortTask.setName("StopSort");
stopSortTask.setDisplayName("Group-StopSort");
stopSortTask.addServiceTaskType(ServiceTaskType.STOP_SORT);
stopSortTask.addListenerProvider(groupTaskListenerFactory);
@@ -90,8 +89,8 @@ public class SuspendGroupWorkflowDefinition implements WorkflowDefinition {
EndEvent endEvent = new EndEvent();
process.setEndEvent(endEvent);
- startEvent.addNext(stopDataSourceTask);
- stopDataSourceTask.addNext(stopSortTask);
+ startEvent.addNext(stopSourceTask);
+ stopSourceTask.addNext(stopSortTask);
stopSortTask.addNext(endEvent);
return process;
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/group/light/CreateLightGroupWorkflowDefinition.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/group/light/CreateLightGroupWorkflowDefinition.java
deleted file mode 100644
index 82a24260e..000000000
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/group/light/CreateLightGroupWorkflowDefinition.java
+++ /dev/null
@@ -1,93 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.manager.service.workflow.group.light;
-
-import lombok.extern.slf4j.Slf4j;
-import org.apache.inlong.manager.common.pojo.workflow.form.process.LightGroupResourceProcessForm;
-import org.apache.inlong.manager.service.workflow.listener.GroupTaskListenerFactory;
-import org.apache.inlong.manager.service.workflow.ProcessName;
-import org.apache.inlong.manager.service.workflow.WorkflowDefinition;
-import org.apache.inlong.manager.service.workflow.group.listener.light.LightGroupCompleteListener;
-import org.apache.inlong.manager.service.workflow.group.listener.light.LightGroupFailedListener;
-import org.apache.inlong.manager.service.workflow.group.listener.light.LightGroupInitListener;
-import org.apache.inlong.manager.workflow.definition.EndEvent;
-import org.apache.inlong.manager.workflow.definition.ServiceTask;
-import org.apache.inlong.manager.workflow.definition.ServiceTaskType;
-import org.apache.inlong.manager.workflow.definition.StartEvent;
-import org.apache.inlong.manager.workflow.definition.WorkflowProcess;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.stereotype.Component;
-
-/**
- * Create light inlong group definition
- */
-@Slf4j
-@Component
-public class CreateLightGroupWorkflowDefinition implements WorkflowDefinition {
-
- @Autowired
- private LightGroupInitListener lightGroupInitListener;
- @Autowired
- private LightGroupCompleteListener lightGroupCompleteListener;
- @Autowired
- private LightGroupFailedListener lightGroupFailedListener;
- @Autowired
- private GroupTaskListenerFactory groupTaskListenerFactory;
-
- @Override
- public WorkflowProcess defineProcess() {
- // Configuration process
- WorkflowProcess process = new WorkflowProcess();
- process.addListener(lightGroupInitListener);
- process.addListener(lightGroupCompleteListener);
- process.addListener(lightGroupFailedListener);
-
- process.setType("Group Resource Creation");
- process.setName(getProcessName().name());
- process.setDisplayName(getProcessName().getDisplayName());
- process.setFormClass(LightGroupResourceProcessForm.class);
- process.setVersion(1);
- process.setHidden(1);
-
- // Start node
- StartEvent startEvent = new StartEvent();
- process.setStartEvent(startEvent);
-
- // init Sort resource
- ServiceTask initSortResourceTask = new ServiceTask();
- initSortResourceTask.setName("initSort");
- initSortResourceTask.setDisplayName("Group-InitSort");
- initSortResourceTask.addServiceTaskType(ServiceTaskType.INIT_SORT);
- initSortResourceTask.addListenerProvider(groupTaskListenerFactory);
- process.addTask(initSortResourceTask);
-
- // End node
- EndEvent endEvent = new EndEvent();
- process.setEndEvent(endEvent);
-
- startEvent.addNext(initSortResourceTask);
- initSortResourceTask.addNext(endEvent);
-
- return process;
- }
-
- @Override
- public ProcessName getProcessName() {
- return ProcessName.CREATE_LIGHT_GROUP_PROCESS;
- }
-}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/group/light/DeleteLightGroupWorkflowDefinition.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/group/light/DeleteLightGroupWorkflowDefinition.java
deleted file mode 100644
index 915f351ca..000000000
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/group/light/DeleteLightGroupWorkflowDefinition.java
+++ /dev/null
@@ -1,92 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.manager.service.workflow.group.light;
-
-import lombok.extern.slf4j.Slf4j;
-import org.apache.inlong.manager.common.pojo.workflow.form.process.LightGroupResourceProcessForm;
-import org.apache.inlong.manager.service.workflow.listener.GroupTaskListenerFactory;
-import org.apache.inlong.manager.service.workflow.ProcessName;
-import org.apache.inlong.manager.service.workflow.WorkflowDefinition;
-import org.apache.inlong.manager.service.workflow.group.listener.light.LightGroupUpdateCompleteListener;
-import org.apache.inlong.manager.service.workflow.group.listener.light.LightGroupUpdateFailedListener;
-import org.apache.inlong.manager.service.workflow.group.listener.light.LightGroupUpdateListener;
-import org.apache.inlong.manager.workflow.definition.EndEvent;
-import org.apache.inlong.manager.workflow.definition.ServiceTask;
-import org.apache.inlong.manager.workflow.definition.ServiceTaskType;
-import org.apache.inlong.manager.workflow.definition.StartEvent;
-import org.apache.inlong.manager.workflow.definition.WorkflowProcess;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.stereotype.Component;
-
-/**
- * Delete light workflow definition for inlong group
- */
-@Slf4j
-@Component
-public class DeleteLightGroupWorkflowDefinition implements WorkflowDefinition {
-
- @Autowired
- private LightGroupUpdateListener lightGroupUpdateListener;
- @Autowired
- private LightGroupUpdateCompleteListener lightGroupUpdateCompleteListener;
- @Autowired
- private LightGroupUpdateFailedListener lightGroupUpdateFailedListener;
- @Autowired
- private GroupTaskListenerFactory groupTaskListenerFactory;
-
- @Override
- public WorkflowProcess defineProcess() {
- // Configuration process
- WorkflowProcess process = new WorkflowProcess();
- process.addListener(lightGroupUpdateListener);
- process.addListener(lightGroupUpdateCompleteListener);
- process.addListener(lightGroupUpdateFailedListener);
- process.setType("Group Resource Delete");
- process.setName(getProcessName().name());
- process.setDisplayName(getProcessName().getDisplayName());
- process.setFormClass(LightGroupResourceProcessForm.class);
- process.setVersion(1);
- process.setHidden(1);
-
- // Start node
- StartEvent startEvent = new StartEvent();
- process.setStartEvent(startEvent);
-
- //delete sort
- ServiceTask deleteSortTask = new ServiceTask();
- deleteSortTask.setName("deleteSort");
- deleteSortTask.setDisplayName("Group-DeleteSort");
- deleteSortTask.addServiceTaskType(ServiceTaskType.DELETE_SORT);
- deleteSortTask.addListenerProvider(groupTaskListenerFactory);
- process.addTask(deleteSortTask);
-
- // End node
- EndEvent endEvent = new EndEvent();
- process.setEndEvent(endEvent);
-
- startEvent.addNext(deleteSortTask);
- deleteSortTask.addNext(endEvent);
-
- return process;
- }
-
- @Override
- public ProcessName getProcessName() {
- return ProcessName.DELETE_LIGHT_GROUP_PROCESS;
- }
-}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/group/light/RestartLightGroupWorkflowDefinition.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/group/light/RestartLightGroupWorkflowDefinition.java
deleted file mode 100644
index cd27b1719..000000000
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/group/light/RestartLightGroupWorkflowDefinition.java
+++ /dev/null
@@ -1,92 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.manager.service.workflow.group.light;
-
-import lombok.extern.slf4j.Slf4j;
-import org.apache.inlong.manager.common.pojo.workflow.form.process.LightGroupResourceProcessForm;
-import org.apache.inlong.manager.service.workflow.listener.GroupTaskListenerFactory;
-import org.apache.inlong.manager.service.workflow.ProcessName;
-import org.apache.inlong.manager.service.workflow.WorkflowDefinition;
-import org.apache.inlong.manager.service.workflow.group.listener.light.LightGroupUpdateCompleteListener;
-import org.apache.inlong.manager.service.workflow.group.listener.light.LightGroupUpdateFailedListener;
-import org.apache.inlong.manager.service.workflow.group.listener.light.LightGroupUpdateListener;
-import org.apache.inlong.manager.workflow.definition.EndEvent;
-import org.apache.inlong.manager.workflow.definition.ServiceTask;
-import org.apache.inlong.manager.workflow.definition.ServiceTaskType;
-import org.apache.inlong.manager.workflow.definition.StartEvent;
-import org.apache.inlong.manager.workflow.definition.WorkflowProcess;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.stereotype.Component;
-
-/**
- * Restart light inlong group process definition
- */
-@Slf4j
-@Component
-public class RestartLightGroupWorkflowDefinition implements WorkflowDefinition {
-
- @Autowired
- private LightGroupUpdateListener lightGroupUpdateListener;
- @Autowired
- private LightGroupUpdateCompleteListener lightGroupUpdateCompleteListener;
- @Autowired
- private LightGroupUpdateFailedListener lightGroupUpdateFailedListener;
- @Autowired
- private GroupTaskListenerFactory groupTaskListenerFactory;
-
- @Override
- public WorkflowProcess defineProcess() {
- // Configuration process
- WorkflowProcess process = new WorkflowProcess();
- process.addListener(lightGroupUpdateListener);
- process.addListener(lightGroupUpdateCompleteListener);
- process.addListener(lightGroupUpdateFailedListener);
- process.setType("Group Resource Restart");
- process.setName(getProcessName().name());
- process.setDisplayName(getProcessName().getDisplayName());
- process.setFormClass(LightGroupResourceProcessForm.class);
- process.setVersion(1);
- process.setHidden(1);
-
- // Start node
- StartEvent startEvent = new StartEvent();
- process.setStartEvent(startEvent);
-
- //restart sort
- ServiceTask restartSortTask = new ServiceTask();
- restartSortTask.setName("restartSort");
- restartSortTask.setDisplayName("Group-RestartSort");
- restartSortTask.addServiceTaskType(ServiceTaskType.RESTART_SORT);
- restartSortTask.addListenerProvider(groupTaskListenerFactory);
- process.addTask(restartSortTask);
-
- // End node
- EndEvent endEvent = new EndEvent();
- process.setEndEvent(endEvent);
-
- startEvent.addNext(restartSortTask);
- restartSortTask.addNext(endEvent);
-
- return process;
- }
-
- @Override
- public ProcessName getProcessName() {
- return ProcessName.RESTART_LIGHT_GROUP_PROCESS;
- }
-}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/group/light/SuspendLightGroupWorkflowDefinition.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/group/light/SuspendLightGroupWorkflowDefinition.java
deleted file mode 100644
index 14003dfad..000000000
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/group/light/SuspendLightGroupWorkflowDefinition.java
+++ /dev/null
@@ -1,92 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.manager.service.workflow.group.light;
-
-import lombok.extern.slf4j.Slf4j;
-import org.apache.inlong.manager.common.pojo.workflow.form.process.LightGroupResourceProcessForm;
-import org.apache.inlong.manager.service.workflow.ProcessName;
-import org.apache.inlong.manager.service.workflow.listener.GroupTaskListenerFactory;
-import org.apache.inlong.manager.service.workflow.WorkflowDefinition;
-import org.apache.inlong.manager.service.workflow.group.listener.light.LightGroupUpdateCompleteListener;
-import org.apache.inlong.manager.service.workflow.group.listener.light.LightGroupUpdateFailedListener;
-import org.apache.inlong.manager.service.workflow.group.listener.light.LightGroupUpdateListener;
-import org.apache.inlong.manager.workflow.definition.EndEvent;
-import org.apache.inlong.manager.workflow.definition.ServiceTask;
-import org.apache.inlong.manager.workflow.definition.ServiceTaskType;
-import org.apache.inlong.manager.workflow.definition.StartEvent;
-import org.apache.inlong.manager.workflow.definition.WorkflowProcess;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.stereotype.Component;
-
-/**
- * Suspend light workflow definition for inlong group
- */
-@Slf4j
-@Component
-public class SuspendLightGroupWorkflowDefinition implements WorkflowDefinition {
-
- @Autowired
- private LightGroupUpdateListener lightGroupUpdateListener;
- @Autowired
- private LightGroupUpdateCompleteListener lightGroupUpdateCompleteListener;
- @Autowired
- private LightGroupUpdateFailedListener lightGroupUpdateFailedListener;
- @Autowired
- private GroupTaskListenerFactory groupTaskListenerFactory;
-
- @Override
- public WorkflowProcess defineProcess() {
- // Configuration process
- WorkflowProcess process = new WorkflowProcess();
- process.addListener(lightGroupUpdateListener);
- process.addListener(lightGroupUpdateCompleteListener);
- process.addListener(lightGroupUpdateFailedListener);
- process.setType("Group Resource Suspend");
- process.setName(getProcessName().name());
- process.setDisplayName(getProcessName().getDisplayName());
- process.setFormClass(LightGroupResourceProcessForm.class);
- process.setVersion(1);
- process.setHidden(1);
-
- // Start node
- StartEvent startEvent = new StartEvent();
- process.setStartEvent(startEvent);
-
- //stop sort
- ServiceTask stopSortTask = new ServiceTask();
- stopSortTask.setName("stopSort");
- stopSortTask.setDisplayName("Group-StopSort");
- stopSortTask.addServiceTaskType(ServiceTaskType.STOP_SORT);
- stopSortTask.addListenerProvider(groupTaskListenerFactory);
- process.addTask(stopSortTask);
-
- // End node
- EndEvent endEvent = new EndEvent();
- process.setEndEvent(endEvent);
-
- startEvent.addNext(stopSortTask);
- stopSortTask.addNext(endEvent);
-
- return process;
- }
-
- @Override
- public ProcessName getProcessName() {
- return ProcessName.SUSPEND_LIGHT_GROUP_PROCESS;
- }
-}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/group/listener/GroupInitProcessListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/group/listener/GroupInitProcessListener.java
deleted file mode 100644
index 2902ca88c..000000000
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/group/listener/GroupInitProcessListener.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.manager.service.workflow.group.listener;
-
-import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
-import org.apache.inlong.manager.common.enums.GroupStatus;
-import org.apache.inlong.manager.common.exceptions.BusinessException;
-import org.apache.inlong.manager.common.exceptions.WorkflowListenerException;
-import org.apache.inlong.manager.common.pojo.group.InlongGroupInfo;
-import org.apache.inlong.manager.common.pojo.workflow.form.process.GroupResourceProcessForm;
-import org.apache.inlong.manager.service.group.InlongGroupService;
-import org.apache.inlong.manager.workflow.WorkflowContext;
-import org.apache.inlong.manager.workflow.event.ListenerResult;
-import org.apache.inlong.manager.workflow.event.process.ProcessEvent;
-import org.apache.inlong.manager.workflow.event.process.ProcessEventListener;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.stereotype.Service;
-
-/**
- * Initialize the listener for inlong group information
- */
-@Service
-public class GroupInitProcessListener implements ProcessEventListener {
-
- @Autowired
- private InlongGroupService groupService;
-
- @Override
- public ProcessEvent event() {
- return ProcessEvent.CREATE;
- }
-
- @Override
- public ListenerResult listen(WorkflowContext context) throws WorkflowListenerException {
- GroupResourceProcessForm form = (GroupResourceProcessForm) context.getProcessForm();
- InlongGroupInfo groupInfo = groupService.get(context.getProcessForm().getInlongGroupId());
- if (groupInfo != null) {
- final int status = GroupStatus.CONFIG_ING.getCode();
- final String username = context.getOperator();
- groupService.updateStatus(groupInfo.getInlongGroupId(), status, username);
- form.setGroupInfo(groupInfo);
- } else {
- throw new BusinessException(ErrorCodeEnum.GROUP_NOT_FOUND);
- }
- return ListenerResult.success();
- }
-
-}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/group/listener/GroupUpdateListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/group/listener/GroupUpdateListener.java
deleted file mode 100644
index 962b4b880..000000000
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/group/listener/GroupUpdateListener.java
+++ /dev/null
@@ -1,75 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.manager.service.workflow.group.listener;
-
-import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
-import org.apache.inlong.manager.common.enums.GroupOperateType;
-import org.apache.inlong.manager.common.enums.GroupStatus;
-import org.apache.inlong.manager.common.exceptions.BusinessException;
-import org.apache.inlong.manager.common.pojo.group.InlongGroupInfo;
-import org.apache.inlong.manager.common.pojo.workflow.form.process.GroupResourceProcessForm;
-import org.apache.inlong.manager.service.group.InlongGroupService;
-import org.apache.inlong.manager.workflow.WorkflowContext;
-import org.apache.inlong.manager.workflow.event.ListenerResult;
-import org.apache.inlong.manager.workflow.event.process.ProcessEvent;
-import org.apache.inlong.manager.workflow.event.process.ProcessEventListener;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.stereotype.Service;
-
-/**
- * Update listener for inlong group
- */
-@Service
-public class GroupUpdateListener implements ProcessEventListener {
-
- @Autowired
- private InlongGroupService groupService;
-
- @Override
- public ProcessEvent event() {
- return ProcessEvent.CREATE;
- }
-
- @Override
- public ListenerResult listen(WorkflowContext context) throws Exception {
- GroupResourceProcessForm form = (GroupResourceProcessForm) context.getProcessForm();
- InlongGroupInfo groupInfo = groupService.get(context.getProcessForm().getInlongGroupId());
- GroupOperateType groupOperateType = form.getGroupOperateType();
- String username = context.getOperator();
- if (groupInfo != null) {
- switch (groupOperateType) {
- case SUSPEND:
- groupService.updateStatus(groupInfo.getInlongGroupId(), GroupStatus.SUSPENDING.getCode(), username);
- break;
- case RESTART:
- groupService.updateStatus(groupInfo.getInlongGroupId(), GroupStatus.RESTARTING.getCode(), username);
- break;
- case DELETE:
- groupService.updateStatus(groupInfo.getInlongGroupId(), GroupStatus.DELETING.getCode(), username);
- break;
- default:
- break;
- }
- form.setGroupInfo(groupInfo);
- } else {
- throw new BusinessException(ErrorCodeEnum.GROUP_NOT_FOUND);
- }
- return ListenerResult.success();
- }
-
-}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/group/listener/GroupCompleteProcessListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/group/listener/InitGroupCompleteListener.java
similarity index 66%
rename from inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/group/listener/GroupCompleteProcessListener.java
rename to inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/group/listener/InitGroupCompleteListener.java
index 4e885b957..8154f2f7e 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/group/listener/GroupCompleteProcessListener.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/group/listener/InitGroupCompleteListener.java
@@ -18,13 +18,15 @@
package org.apache.inlong.manager.service.workflow.group.listener;
import lombok.extern.slf4j.Slf4j;
+import org.apache.inlong.manager.common.consts.InlongConstants;
import org.apache.inlong.manager.common.enums.GroupStatus;
import org.apache.inlong.manager.common.enums.SourceStatus;
import org.apache.inlong.manager.common.enums.StreamStatus;
import org.apache.inlong.manager.common.exceptions.WorkflowListenerException;
+import org.apache.inlong.manager.common.pojo.group.InlongGroupInfo;
import org.apache.inlong.manager.common.pojo.workflow.form.process.GroupResourceProcessForm;
-import org.apache.inlong.manager.service.group.InlongGroupService;
import org.apache.inlong.manager.service.core.InlongStreamService;
+import org.apache.inlong.manager.service.group.InlongGroupService;
import org.apache.inlong.manager.service.source.StreamSourceService;
import org.apache.inlong.manager.workflow.WorkflowContext;
import org.apache.inlong.manager.workflow.event.ListenerResult;
@@ -34,11 +36,11 @@ import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
/**
- * Create group resources [process completion] event listener
+ * The listener of InlongGroup when created resources successfully.
*/
@Slf4j
@Component
-public class GroupCompleteProcessListener implements ProcessEventListener {
+public class InitGroupCompleteListener implements ProcessEventListener {
@Autowired
private InlongGroupService groupService;
@@ -53,23 +55,32 @@ public class GroupCompleteProcessListener implements ProcessEventListener {
}
/**
- * After the process of creating inlong group resources is completed, modify the status of related
- * inlong group and all inlong stream to [Configuration Successful] [Configuration Failed]
- * <p/>{@link GroupFailedProcessListener#listen}
+ * After the process of creating InlongGroup resources is completed,
+ * modify the status of related InlongGroup to [Successful]
+ * <p/>
+ * {@link InitGroupFailedListener#listen}
*/
@Override
public ListenerResult listen(WorkflowContext context) throws WorkflowListenerException {
GroupResourceProcessForm form = (GroupResourceProcessForm) context.getProcessForm();
String groupId = form.getInlongGroupId();
- String applicant = context.getOperator();
- // Update inlong group status and other info
- groupService.updateStatus(groupId, GroupStatus.CONFIG_SUCCESSFUL.getCode(), applicant);
- groupService.update(form.getGroupInfo().genRequest(), applicant);
+ log.info("begin to execute InitGroupCompleteListener for groupId={}", groupId);
+
+ InlongGroupInfo groupInfo = form.getGroupInfo();
+ String operator = context.getOperator();
+ // update inlong group status and other info
+ groupService.updateStatus(groupId, GroupStatus.CONFIG_SUCCESSFUL.getCode(), operator);
+ groupService.update(groupInfo.genRequest(), operator);
- // Update status of other related configs
- streamService.updateStatus(groupId, null, StreamStatus.CONFIG_SUCCESSFUL.getCode(), applicant);
- sourceService.updateStatus(groupId, null, SourceStatus.TO_BE_ISSUED_ADD.getCode(), applicant);
+ // update status of other related configs
+ streamService.updateStatus(groupId, null, StreamStatus.CONFIG_SUCCESSFUL.getCode(), operator);
+ if (InlongConstants.LIGHTWEIGHT_MODE.equals(groupInfo.getLightweight())) {
+ sourceService.updateStatus(groupId, null, SourceStatus.SOURCE_NORMAL.getCode(), operator);
+ } else {
+ sourceService.updateStatus(groupId, null, SourceStatus.TO_BE_ISSUED_ADD.getCode(), operator);
+ }
+ log.info("success to execute InitGroupCompleteListener for groupId={}", groupId);
return ListenerResult.success();
}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/group/listener/GroupFailedProcessListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/group/listener/InitGroupFailedListener.java
similarity index 75%
rename from inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/group/listener/GroupFailedProcessListener.java
rename to inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/group/listener/InitGroupFailedListener.java
index 37ed3e672..fc0c1caa3 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/group/listener/GroupFailedProcessListener.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/group/listener/InitGroupFailedListener.java
@@ -22,8 +22,8 @@ import org.apache.inlong.manager.common.enums.GroupStatus;
import org.apache.inlong.manager.common.enums.StreamStatus;
import org.apache.inlong.manager.common.exceptions.WorkflowListenerException;
import org.apache.inlong.manager.common.pojo.workflow.form.process.GroupResourceProcessForm;
-import org.apache.inlong.manager.service.group.InlongGroupService;
import org.apache.inlong.manager.service.core.InlongStreamService;
+import org.apache.inlong.manager.service.group.InlongGroupService;
import org.apache.inlong.manager.workflow.WorkflowContext;
import org.apache.inlong.manager.workflow.event.ListenerResult;
import org.apache.inlong.manager.workflow.event.process.ProcessEvent;
@@ -32,11 +32,11 @@ import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
/**
- * Create group resources [process failed] event listener
+ * The listener of InlongGroup when created resources failed.
*/
@Slf4j
@Component
-public class GroupFailedProcessListener implements ProcessEventListener {
+public class InitGroupFailedListener implements ProcessEventListener {
@Autowired
private InlongGroupService groupService;
@@ -49,20 +49,26 @@ public class GroupFailedProcessListener implements ProcessEventListener {
}
/**
- * The process of creating inlong group resources is abnormal, and the inlong group status is changed to
- * [Configuration failed] [Configuration successful]
- * <p/>{@link GroupCompleteProcessListener#listen}
+ * After the process of creating InlongGroup resources is completed,
+ * modify the status of related InlongGroup and all InlongStream to [Failed]
+ * <p/>
+ * {@link InitGroupCompleteListener#listen}
*/
@Override
public ListenerResult listen(WorkflowContext context) throws WorkflowListenerException {
GroupResourceProcessForm form = (GroupResourceProcessForm) context.getProcessForm();
String groupId = form.getInlongGroupId();
- String username = context.getOperator();
+ log.info("begin to execute InitGroupFailedListener for groupId={}", groupId);
+
+ // update inlong group status
+ String operator = context.getOperator();
+ groupService.updateStatus(groupId, GroupStatus.CONFIG_FAILED.getCode(), operator);
+ groupService.update(form.getGroupInfo().genRequest(), operator);
+
+ // update inlong stream status
+ streamService.updateStatus(groupId, null, StreamStatus.CONFIG_FAILED.getCode(), operator);
- // Update inlong group status
- groupService.updateStatus(groupId, GroupStatus.CONFIG_FAILED.getCode(), username);
- // Update inlong stream status
- streamService.updateStatus(groupId, null, StreamStatus.CONFIG_FAILED.getCode(), username);
+ log.info("success to execute InitGroupFailedListener for groupId={}", groupId);
return ListenerResult.success();
}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/group/listener/light/LightGroupInitListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/group/listener/InitGroupListener.java
similarity index 67%
rename from inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/group/listener/light/LightGroupInitListener.java
rename to inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/group/listener/InitGroupListener.java
index 054f3c656..f2594bd6e 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/group/listener/light/LightGroupInitListener.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/group/listener/InitGroupListener.java
@@ -15,18 +15,15 @@
* limitations under the License.
*/
-package org.apache.inlong.manager.service.workflow.group.listener.light;
+package org.apache.inlong.manager.service.workflow.group.listener;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections.CollectionUtils;
-import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
import org.apache.inlong.manager.common.enums.GroupStatus;
import org.apache.inlong.manager.common.exceptions.WorkflowListenerException;
import org.apache.inlong.manager.common.pojo.group.InlongGroupInfo;
-import org.apache.inlong.manager.common.pojo.stream.InlongStreamInfo;
-import org.apache.inlong.manager.common.pojo.workflow.form.process.LightGroupResourceProcessForm;
+import org.apache.inlong.manager.common.pojo.workflow.form.process.GroupResourceProcessForm;
import org.apache.inlong.manager.service.group.InlongGroupService;
-import org.apache.inlong.manager.service.core.InlongStreamService;
import org.apache.inlong.manager.workflow.WorkflowContext;
import org.apache.inlong.manager.workflow.event.ListenerResult;
import org.apache.inlong.manager.workflow.event.process.ProcessEvent;
@@ -34,41 +31,40 @@ import org.apache.inlong.manager.workflow.event.process.ProcessEventListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
-import java.util.List;
-
/**
- * Listener of light group init.
+ * The listener for initial the InlongGroup information.
*/
@Slf4j
@Component
-public class LightGroupInitListener implements ProcessEventListener {
+public class InitGroupListener implements ProcessEventListener {
@Autowired
private InlongGroupService groupService;
- @Autowired
- private InlongStreamService streamService;
-
@Override
public ProcessEvent event() {
return ProcessEvent.CREATE;
}
+ /**
+ * Begin to execute the InlongGroup workflow, init the workflow context, and update other info if needed.
+ */
@Override
- public ListenerResult listen(WorkflowContext context) throws Exception {
- LightGroupResourceProcessForm form = (LightGroupResourceProcessForm) context.getProcessForm();
+ public ListenerResult listen(WorkflowContext context) throws WorkflowListenerException {
+ GroupResourceProcessForm form = (GroupResourceProcessForm) context.getProcessForm();
+ String groupId = form.getInlongGroupId();
+ log.info("begin to execute InitGroupListener for groupId={}", groupId);
+
InlongGroupInfo groupInfo = form.getGroupInfo();
if (groupInfo == null) {
- throw new WorkflowListenerException(ErrorCodeEnum.GROUP_NOT_FOUND.getMessage());
+ throw new WorkflowListenerException("inlong group info cannot be null for init group process");
}
- final String groupId = groupInfo.getInlongGroupId();
- final int status = GroupStatus.CONFIG_ING.getCode();
- final String username = context.getOperator();
- groupService.updateStatus(groupInfo.getInlongGroupId(), status, username);
if (CollectionUtils.isEmpty(form.getStreamInfos())) {
- List<InlongStreamInfo> streamInfos = streamService.list(groupId);
- form.setStreamInfos(streamInfos);
+ throw new WorkflowListenerException("inlong stream info list cannot be null for init group process");
}
+ groupService.updateStatus(groupId, GroupStatus.CONFIG_ING.getCode(), context.getOperator());
+
+ log.info("success to execute InitGroupListener for groupId={}", groupId);
return ListenerResult.success();
}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/group/listener/light/LightGroupUpdateCompleteListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/group/listener/UpdateGroupCompleteListener.java
similarity index 70%
rename from inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/group/listener/light/LightGroupUpdateCompleteListener.java
rename to inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/group/listener/UpdateGroupCompleteListener.java
index 7d1d93190..6df9863ff 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/group/listener/light/LightGroupUpdateCompleteListener.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/group/listener/UpdateGroupCompleteListener.java
@@ -15,14 +15,13 @@
* limitations under the License.
*/
-package org.apache.inlong.manager.service.workflow.group.listener.light;
+package org.apache.inlong.manager.service.workflow.group.listener;
import lombok.extern.slf4j.Slf4j;
import org.apache.inlong.manager.common.enums.GroupOperateType;
import org.apache.inlong.manager.common.enums.GroupStatus;
import org.apache.inlong.manager.common.enums.SourceStatus;
-import org.apache.inlong.manager.common.pojo.workflow.form.process.LightGroupResourceProcessForm;
-import org.apache.inlong.manager.service.core.InlongStreamService;
+import org.apache.inlong.manager.common.pojo.workflow.form.process.GroupResourceProcessForm;
import org.apache.inlong.manager.service.group.InlongGroupService;
import org.apache.inlong.manager.service.source.StreamSourceService;
import org.apache.inlong.manager.workflow.WorkflowContext;
@@ -33,17 +32,15 @@ import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
/**
- * Listener of light group update complete.
+ * The listener of InlongGroup that runs when an update operation completes successfully.
*/
@Slf4j
@Component
-public class LightGroupUpdateCompleteListener implements ProcessEventListener {
+public class UpdateGroupCompleteListener implements ProcessEventListener {
@Autowired
private InlongGroupService groupService;
@Autowired
- private InlongStreamService streamService;
- @Autowired
private StreamSourceService sourceService;
@Override
@@ -53,27 +50,33 @@ public class LightGroupUpdateCompleteListener implements ProcessEventListener {
@Override
public ListenerResult listen(WorkflowContext context) throws Exception {
- LightGroupResourceProcessForm form = (LightGroupResourceProcessForm) context.getProcessForm();
- final String groupId = form.getInlongGroupId();
- final String applicant = context.getOperator();
- GroupOperateType groupOperateType = form.getGroupOperateType();
- switch (groupOperateType) {
+ GroupResourceProcessForm form = (GroupResourceProcessForm) context.getProcessForm();
+ String groupId = form.getInlongGroupId();
+ GroupOperateType operateType = form.getGroupOperateType();
+ log.info("begin to execute UpdateGroupCompleteListener for groupId={}, operateType={}", groupId, operateType);
+
+ String operator = context.getOperator();
+ switch (operateType) {
case SUSPEND:
- groupService.updateStatus(groupId, GroupStatus.SUSPENDED.getCode(), applicant);
- sourceService.updateStatus(groupId, null, SourceStatus.SOURCE_FROZEN.getCode(), applicant);
+ groupService.updateStatus(groupId, GroupStatus.SUSPENDED.getCode(), operator);
+ sourceService.updateStatus(groupId, null, SourceStatus.SOURCE_FROZEN.getCode(), operator);
break;
case RESTART:
- groupService.updateStatus(groupId, GroupStatus.RESTARTED.getCode(), applicant);
- sourceService.updateStatus(groupId, null, SourceStatus.SOURCE_NORMAL.getCode(), applicant);
+ groupService.updateStatus(groupId, GroupStatus.RESTARTED.getCode(), operator);
+ sourceService.updateStatus(groupId, null, SourceStatus.SOURCE_NORMAL.getCode(), operator);
break;
case DELETE:
- groupService.updateStatus(groupId, GroupStatus.DELETED.getCode(), applicant);
- sourceService.logicDeleteAll(groupId, null, applicant);
+ groupService.updateStatus(groupId, GroupStatus.DELETED.getCode(), operator);
+ sourceService.logicDeleteAll(groupId, null, operator);
break;
default:
break;
}
- groupService.update(form.getGroupInfo().genRequest(), applicant);
+
+ // update inlong group status and other configs
+ groupService.update(form.getGroupInfo().genRequest(), operator);
+
+ log.info("success to execute UpdateGroupCompleteListener for groupId={}, operateType={}", groupId, operateType);
return ListenerResult.success();
}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/group/listener/GroupUpdateFailedListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/group/listener/UpdateGroupFailedListener.java
similarity index 77%
rename from inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/group/listener/GroupUpdateFailedListener.java
rename to inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/group/listener/UpdateGroupFailedListener.java
index 355ac185b..cb93afece 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/group/listener/GroupUpdateFailedListener.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/group/listener/UpdateGroupFailedListener.java
@@ -19,7 +19,6 @@ package org.apache.inlong.manager.service.workflow.group.listener;
import lombok.extern.slf4j.Slf4j;
import org.apache.inlong.manager.common.enums.GroupStatus;
-import org.apache.inlong.manager.common.pojo.group.InlongGroupInfo;
import org.apache.inlong.manager.common.pojo.workflow.form.process.GroupResourceProcessForm;
import org.apache.inlong.manager.service.group.InlongGroupService;
import org.apache.inlong.manager.workflow.WorkflowContext;
@@ -30,11 +29,11 @@ import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
/**
- * Update failed listener for inlong group
+ * The listener of InlongGroup that runs when an update operation fails.
*/
@Slf4j
@Component
-public class GroupUpdateFailedListener implements ProcessEventListener {
+public class UpdateGroupFailedListener implements ProcessEventListener {
@Autowired
private InlongGroupService groupService;
@@ -47,11 +46,15 @@ public class GroupUpdateFailedListener implements ProcessEventListener {
@Override
public ListenerResult listen(WorkflowContext context) throws Exception {
GroupResourceProcessForm form = (GroupResourceProcessForm) context.getProcessForm();
+ String groupId = form.getInlongGroupId();
+ log.info("begin to execute UpdateGroupFailedListener for groupId={}", groupId);
+
+ // update inlong group status and other info
String operator = context.getOperator();
- InlongGroupInfo groupInfo = form.getGroupInfo();
- // Update inlong group status and other info
- groupService.updateStatus(groupInfo.getInlongGroupId(), GroupStatus.CONFIG_FAILED.getCode(), operator);
- groupService.update(groupInfo.genRequest(), operator);
+ groupService.updateStatus(groupId, GroupStatus.CONFIG_FAILED.getCode(), operator);
+ groupService.update(form.getGroupInfo().genRequest(), operator);
+
+ log.info("success to execute UpdateGroupFailedListener for groupId={}", groupId);
return ListenerResult.success();
}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/group/listener/GroupUpdateCompleteListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/group/listener/UpdateGroupListener.java
similarity index 72%
rename from inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/group/listener/GroupUpdateCompleteListener.java
rename to inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/group/listener/UpdateGroupListener.java
index 3886b09f5..24442fa02 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/group/listener/GroupUpdateCompleteListener.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/group/listener/UpdateGroupListener.java
@@ -20,6 +20,7 @@ package org.apache.inlong.manager.service.workflow.group.listener;
import lombok.extern.slf4j.Slf4j;
import org.apache.inlong.manager.common.enums.GroupOperateType;
import org.apache.inlong.manager.common.enums.GroupStatus;
+import org.apache.inlong.manager.common.exceptions.BusinessException;
import org.apache.inlong.manager.common.pojo.group.InlongGroupInfo;
import org.apache.inlong.manager.common.pojo.workflow.form.process.GroupResourceProcessForm;
import org.apache.inlong.manager.service.group.InlongGroupService;
@@ -31,43 +32,48 @@ import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
/**
- * Update completed listener for inlong group
+ * The listener that runs when the update process of an InlongGroup is created.
*/
@Slf4j
@Component
-public class GroupUpdateCompleteListener implements ProcessEventListener {
+public class UpdateGroupListener implements ProcessEventListener {
@Autowired
private InlongGroupService groupService;
@Override
public ProcessEvent event() {
- return ProcessEvent.COMPLETE;
+ return ProcessEvent.CREATE;
}
@Override
public ListenerResult listen(WorkflowContext context) throws Exception {
GroupResourceProcessForm form = (GroupResourceProcessForm) context.getProcessForm();
- String operator = context.getOperator();
- GroupOperateType operateType = form.getGroupOperateType();
+ String groupId = form.getInlongGroupId();
+ log.info("begin to execute UpdateGroupListener for groupId={}", groupId);
+
InlongGroupInfo groupInfo = form.getGroupInfo();
- Integer nextStatus;
+ if (groupInfo == null) {
+ throw new BusinessException("InlongGroupInfo cannot be null for update group process");
+ }
+
+ GroupOperateType operateType = form.getGroupOperateType();
+ String operator = context.getOperator();
switch (operateType) {
- case RESTART:
- nextStatus = GroupStatus.RESTARTED.getCode();
- break;
case SUSPEND:
- nextStatus = GroupStatus.SUSPENDED.getCode();
+ groupService.updateStatus(groupId, GroupStatus.SUSPENDING.getCode(), operator);
+ break;
+ case RESTART:
+ groupService.updateStatus(groupId, GroupStatus.RESTARTING.getCode(), operator);
break;
case DELETE:
- nextStatus = GroupStatus.DELETED.getCode();
+ groupService.updateStatus(groupId, GroupStatus.DELETING.getCode(), operator);
break;
default:
- throw new RuntimeException(String.format("Unsupported operation=%s for inlong group", operateType));
+ break;
}
- // Update inlong group status and other info
- groupService.updateStatus(groupInfo.getInlongGroupId(), nextStatus, operator);
- groupService.update(groupInfo.genRequest(), operator);
+
+ log.info("success to execute UpdateGroupListener for groupId={}, operateType={}", groupId, operateType);
return ListenerResult.success();
}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/group/listener/approve/GroupAfterApprovedListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/group/listener/apply/AfterApprovedTaskListener.java
similarity index 80%
rename from inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/group/listener/approve/GroupAfterApprovedListener.java
rename to inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/group/listener/apply/AfterApprovedTaskListener.java
index 15d002e6e..df2cad08f 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/group/listener/approve/GroupAfterApprovedListener.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/group/listener/apply/AfterApprovedTaskListener.java
@@ -15,18 +15,17 @@
* limitations under the License.
*/
-package org.apache.inlong.manager.service.workflow.group.listener.approve;
+package org.apache.inlong.manager.service.workflow.group.listener.apply;
import lombok.extern.slf4j.Slf4j;
import org.apache.inlong.manager.common.enums.GroupStatus;
import org.apache.inlong.manager.common.exceptions.WorkflowListenerException;
import org.apache.inlong.manager.common.pojo.group.InlongGroupApproveRequest;
-import org.apache.inlong.manager.common.pojo.stream.InlongStreamApproveRequest;
import org.apache.inlong.manager.common.pojo.workflow.form.task.InlongGroupApproveForm;
import org.apache.inlong.manager.dao.entity.InlongGroupEntity;
import org.apache.inlong.manager.dao.mapper.InlongGroupEntityMapper;
-import org.apache.inlong.manager.service.group.InlongGroupService;
import org.apache.inlong.manager.service.core.InlongStreamService;
+import org.apache.inlong.manager.service.group.InlongGroupService;
import org.apache.inlong.manager.workflow.WorkflowContext;
import org.apache.inlong.manager.workflow.event.ListenerResult;
import org.apache.inlong.manager.workflow.event.task.TaskEvent;
@@ -34,15 +33,14 @@ import org.apache.inlong.manager.workflow.event.task.TaskEventListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
-import java.util.List;
import java.util.Objects;
/**
- * Approve pass listener for new inlong group
+ * The listener that persists the modified InlongGroup info after the InlongGroup application process is approved.
*/
@Slf4j
@Component
-public class GroupAfterApprovedListener implements TaskEventListener {
+public class AfterApprovedTaskListener implements TaskEventListener {
@Autowired
private InlongGroupService groupService;
@@ -56,14 +54,17 @@ public class GroupAfterApprovedListener implements TaskEventListener {
return TaskEvent.APPROVE;
}
+ /**
+ * After approval, persist the modified info during the approval in DB.
+ */
@Override
public ListenerResult listen(WorkflowContext context) throws WorkflowListenerException {
- // Save the data format selected at the time of approval and the cluster information of the inlong stream
InlongGroupApproveForm form = (InlongGroupApproveForm) context.getActionContext().getForm();
-
InlongGroupApproveRequest approveInfo = form.getGroupApproveInfo();
- // Only the [Wait approval] status allowed the passing operation
String groupId = approveInfo.getInlongGroupId();
+ log.info("begin to execute AfterApprovedTaskListener for groupId={}", groupId);
+
+ // only the [TO_BE_APPROVAL] status allows the approval operation
InlongGroupEntity entity = groupMapper.selectByGroupId(groupId);
if (entity == null) {
throw new WorkflowListenerException("inlong group not found with group id=" + groupId);
@@ -72,12 +73,12 @@ public class GroupAfterApprovedListener implements TaskEventListener {
throw new WorkflowListenerException("inlong group status is [wait_approval], not allowed to approve again");
}
- // Save the inlong group information after approval
+ // save the inlong group and other info after approval
groupService.updateAfterApprove(approveInfo, context.getOperator());
+ streamService.updateAfterApprove(form.getStreamApproveInfoList(), context.getOperator());
- // Save inlong stream information after approval
- List<InlongStreamApproveRequest> streamApproveInfoList = form.getStreamApproveInfoList();
- streamService.updateAfterApprove(streamApproveInfoList, context.getOperator());
+ log.info("success to execute AfterApprovedTaskListener for groupId={}", groupId);
return ListenerResult.success();
}
+
}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/group/listener/approve/GroupApproveProcessListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/group/listener/apply/ApproveApplyProcessListener.java
similarity index 62%
rename from inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/group/listener/approve/GroupApproveProcessListener.java
rename to inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/group/listener/apply/ApproveApplyProcessListener.java
index cc560e436..8f1890621 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/group/listener/approve/GroupApproveProcessListener.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/group/listener/apply/ApproveApplyProcessListener.java
@@ -15,17 +15,14 @@
* limitations under the License.
*/
-package org.apache.inlong.manager.service.workflow.group.listener.approve;
+package org.apache.inlong.manager.service.workflow.group.listener.apply;
import lombok.extern.slf4j.Slf4j;
-import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
-import org.apache.inlong.manager.common.enums.GroupMode;
import org.apache.inlong.manager.common.exceptions.WorkflowListenerException;
import org.apache.inlong.manager.common.pojo.group.InlongGroupInfo;
import org.apache.inlong.manager.common.pojo.stream.InlongStreamInfo;
+import org.apache.inlong.manager.common.pojo.workflow.form.process.ApplyGroupProcessForm;
import org.apache.inlong.manager.common.pojo.workflow.form.process.GroupResourceProcessForm;
-import org.apache.inlong.manager.common.pojo.workflow.form.process.LightGroupResourceProcessForm;
-import org.apache.inlong.manager.common.pojo.workflow.form.process.NewGroupProcessForm;
import org.apache.inlong.manager.service.core.InlongStreamService;
import org.apache.inlong.manager.service.group.InlongGroupService;
import org.apache.inlong.manager.service.workflow.ProcessName;
@@ -40,65 +37,42 @@ import org.springframework.stereotype.Component;
import java.util.List;
/**
- * After the new inlong group is approved, initiate a listener for other processes
+ * The listener that runs when the application for an InlongGroup is approved.
+ * <p>
+ * After approval, start other follow-up processes.
*/
@Slf4j
@Component
-public class GroupApproveProcessListener implements ProcessEventListener {
+public class ApproveApplyProcessListener implements ProcessEventListener {
@Autowired
private InlongGroupService groupService;
@Autowired
- private WorkflowService workflowService;
- @Autowired
private InlongStreamService streamService;
+ @Autowired
+ private WorkflowService workflowService;
@Override
public ProcessEvent event() {
return ProcessEvent.COMPLETE;
}
- /**
- * Initiate the process of creating inlong group resources after new inlong group access approved
- */
@Override
public ListenerResult listen(WorkflowContext context) throws WorkflowListenerException {
- NewGroupProcessForm form = (NewGroupProcessForm) context.getProcessForm();
+ ApplyGroupProcessForm form = (ApplyGroupProcessForm) context.getProcessForm();
String groupId = form.getInlongGroupId();
- InlongGroupInfo groupInfo = groupService.get(groupId);
- GroupMode mode = GroupMode.parseGroupMode(groupInfo);
- switch (mode) {
- case NORMAL:
- createGroupResource(context, groupInfo);
- break;
- case LIGHTWEIGHT:
- createLightGroupResource(context, groupInfo);
- break;
- default:
- throw new WorkflowListenerException(ErrorCodeEnum.GROUP_MODE_UNSUPPORTED.getMessage());
- }
-
- return ListenerResult.success();
- }
+ log.info("begin to execute ApproveApplyProcessListener for groupId={}", groupId);
- private void createGroupResource(WorkflowContext context, InlongGroupInfo groupInfo) {
+ InlongGroupInfo groupInfo = groupService.get(groupId);
GroupResourceProcessForm processForm = new GroupResourceProcessForm();
processForm.setGroupInfo(groupInfo);
String username = context.getOperator();
- String groupId = groupInfo.getInlongGroupId();
List<InlongStreamInfo> streamList = streamService.list(groupId);
processForm.setStreamInfos(streamList);
workflowService.start(ProcessName.CREATE_GROUP_RESOURCE, username, processForm);
- }
- private void createLightGroupResource(WorkflowContext context, InlongGroupInfo groupInfo) {
- LightGroupResourceProcessForm processForm = new LightGroupResourceProcessForm();
- processForm.setGroupInfo(groupInfo);
- String username = context.getOperator();
- String groupId = groupInfo.getInlongGroupId();
- List<InlongStreamInfo> streamList = streamService.list(groupId);
- processForm.setStreamInfos(streamList);
- workflowService.start(ProcessName.CREATE_LIGHT_GROUP_PROCESS, username, processForm);
+ log.info("success to execute ApproveApplyProcessListener for groupId={}", groupId);
+ return ListenerResult.success();
}
}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/group/listener/GroupCancelProcessListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/group/listener/apply/CancelApplyProcessListener.java
similarity index 71%
rename from inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/group/listener/GroupCancelProcessListener.java
rename to inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/group/listener/apply/CancelApplyProcessListener.java
index c1fd225f9..582d95d37 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/group/listener/GroupCancelProcessListener.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/group/listener/apply/CancelApplyProcessListener.java
@@ -15,13 +15,12 @@
* limitations under the License.
*/
-package org.apache.inlong.manager.service.workflow.group.listener;
+package org.apache.inlong.manager.service.workflow.group.listener.apply;
-import java.util.Objects;
import lombok.extern.slf4j.Slf4j;
import org.apache.inlong.manager.common.enums.GroupStatus;
import org.apache.inlong.manager.common.exceptions.WorkflowListenerException;
-import org.apache.inlong.manager.common.pojo.workflow.form.process.NewGroupProcessForm;
+import org.apache.inlong.manager.common.pojo.workflow.form.process.ApplyGroupProcessForm;
import org.apache.inlong.manager.dao.entity.InlongGroupEntity;
import org.apache.inlong.manager.dao.mapper.InlongGroupEntityMapper;
import org.apache.inlong.manager.workflow.WorkflowContext;
@@ -31,12 +30,14 @@ import org.apache.inlong.manager.workflow.event.process.ProcessEventListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
+import java.util.Objects;
+
/**
- * Event listener for new group access, the initiator cancels the approval
+ * The listener that runs when the application for an InlongGroup is canceled.
*/
@Slf4j
@Component
-public class GroupCancelProcessListener implements ProcessEventListener {
+public class CancelApplyProcessListener implements ProcessEventListener {
@Autowired
private InlongGroupEntityMapper groupMapper;
@@ -46,25 +47,28 @@ public class GroupCancelProcessListener implements ProcessEventListener {
return ProcessEvent.CANCEL;
}
+ /**
+ * After canceling the approval, the status becomes [ToBeSubmit]
+ */
@Override
public ListenerResult listen(WorkflowContext context) throws WorkflowListenerException {
- NewGroupProcessForm form = (NewGroupProcessForm) context.getProcessForm();
- // After canceling the approval, the status becomes [Waiting to submit]
+ ApplyGroupProcessForm form = (ApplyGroupProcessForm) context.getProcessForm();
String groupId = form.getInlongGroupId();
+ log.info("begin to execute CancelApplyProcessListener for groupId={}", groupId);
- // Only the [Wait approval] status allowed the canceling operation
+ // only the [ToBeApproval] status allows the cancel operation
InlongGroupEntity entity = groupMapper.selectByGroupId(groupId);
if (entity == null) {
- throw new WorkflowListenerException("inlong group not found with group id=" + groupId);
+ throw new WorkflowListenerException("InlongGroup not found with groupId=" + groupId);
}
if (!Objects.equals(GroupStatus.TO_BE_APPROVAL.getCode(), entity.getStatus())) {
- throw new WorkflowListenerException("current status was not allowed to cancel business");
+ throw new WorkflowListenerException(String.format("Current status [%s] was not allowed to cancel",
+ GroupStatus.forCode(entity.getStatus())));
}
+ String operator = context.getOperator();
+ groupMapper.updateStatus(groupId, GroupStatus.TO_BE_SUBMIT.getCode(), operator);
- // After canceling the approval, the status becomes [Waiting to submit]
- String username = context.getOperator();
- groupMapper.updateStatus(groupId, GroupStatus.TO_BE_SUBMIT.getCode(), username);
-
+ log.info("success to execute CancelApplyProcessListener for groupId={}", groupId);
return ListenerResult.success();
}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/group/listener/approve/GroupRejectProcessListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/group/listener/apply/RejectApplyProcessListener.java
similarity index 83%
rename from inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/group/listener/approve/GroupRejectProcessListener.java
rename to inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/group/listener/apply/RejectApplyProcessListener.java
index bcbf2046a..1a903537c 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/group/listener/approve/GroupRejectProcessListener.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/group/listener/apply/RejectApplyProcessListener.java
@@ -15,13 +15,12 @@
* limitations under the License.
*/
-package org.apache.inlong.manager.service.workflow.group.listener.approve;
+package org.apache.inlong.manager.service.workflow.group.listener.apply;
-import java.util.Objects;
import lombok.extern.slf4j.Slf4j;
import org.apache.inlong.manager.common.enums.GroupStatus;
import org.apache.inlong.manager.common.exceptions.WorkflowListenerException;
-import org.apache.inlong.manager.common.pojo.workflow.form.process.NewGroupProcessForm;
+import org.apache.inlong.manager.common.pojo.workflow.form.process.ApplyGroupProcessForm;
import org.apache.inlong.manager.dao.entity.InlongGroupEntity;
import org.apache.inlong.manager.dao.mapper.InlongGroupEntityMapper;
import org.apache.inlong.manager.service.group.InlongGroupService;
@@ -32,12 +31,14 @@ import org.apache.inlong.manager.workflow.event.process.ProcessEventListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
+import java.util.Objects;
+
/**
- * Approve reject listener for new inlong group
+ * The listener that runs when the application for an InlongGroup is rejected.
*/
@Slf4j
@Component
-public class GroupRejectProcessListener implements ProcessEventListener {
+public class RejectApplyProcessListener implements ProcessEventListener {
@Autowired
private InlongGroupService groupService;
@@ -51,19 +52,20 @@ public class GroupRejectProcessListener implements ProcessEventListener {
@Override
public ListenerResult listen(WorkflowContext context) throws WorkflowListenerException {
- NewGroupProcessForm form = (NewGroupProcessForm) context.getProcessForm();
-
- // Only the [Wait approval] status allowed the rejecting operation
+ ApplyGroupProcessForm form = (ApplyGroupProcessForm) context.getProcessForm();
String groupId = form.getInlongGroupId();
+ log.info("begin to execute RejectApplyProcessListener for groupId={}", groupId);
+
+ // only the [TO_BE_APPROVAL] status allows the reject operation
InlongGroupEntity entity = groupMapper.selectByGroupId(groupId);
if (entity == null) {
- throw new WorkflowListenerException("inlong group not found with group id=" + groupId);
+ throw new WorkflowListenerException("inlong group not found with groupId=" + groupId);
}
if (!Objects.equals(GroupStatus.TO_BE_APPROVAL.getCode(), entity.getStatus())) {
throw new WorkflowListenerException("current status was not allowed to reject inlong group");
}
- // After reject, update inlong group status to [GROUP_APPROVE_REJECT]
+ // after reject, update InlongGroup status to [APPROVE_REJECTED]
String username = context.getOperator();
groupService.updateStatus(groupId, GroupStatus.APPROVE_REJECTED.getCode(), username);
return ListenerResult.success();
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/group/listener/light/LightGroupCompleteListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/group/listener/light/LightGroupCompleteListener.java
deleted file mode 100644
index 764b3ca44..000000000
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/group/listener/light/LightGroupCompleteListener.java
+++ /dev/null
@@ -1,68 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.manager.service.workflow.group.listener.light;
-
-import lombok.extern.slf4j.Slf4j;
-import org.apache.inlong.manager.common.enums.GroupStatus;
-import org.apache.inlong.manager.common.enums.SourceStatus;
-import org.apache.inlong.manager.common.enums.StreamStatus;
-import org.apache.inlong.manager.common.pojo.workflow.form.process.LightGroupResourceProcessForm;
-import org.apache.inlong.manager.service.group.InlongGroupService;
-import org.apache.inlong.manager.service.core.InlongStreamService;
-import org.apache.inlong.manager.service.source.StreamSourceService;
-import org.apache.inlong.manager.workflow.WorkflowContext;
-import org.apache.inlong.manager.workflow.event.ListenerResult;
-import org.apache.inlong.manager.workflow.event.process.ProcessEvent;
-import org.apache.inlong.manager.workflow.event.process.ProcessEventListener;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.stereotype.Component;
-
-/**
- * Listener of light group complete.
- */
-@Slf4j
-@Component
-public class LightGroupCompleteListener implements ProcessEventListener {
-
- @Autowired
- private InlongGroupService groupService;
- @Autowired
- private InlongStreamService streamService;
- @Autowired
- private StreamSourceService sourceService;
-
- @Override
- public ProcessEvent event() {
- return ProcessEvent.COMPLETE;
- }
-
- @Override
- public ListenerResult listen(WorkflowContext context) throws Exception {
- LightGroupResourceProcessForm form = (LightGroupResourceProcessForm) context.getProcessForm();
- final String groupId = form.getGroupInfo().getInlongGroupId();
- final String applicant = context.getOperator();
- // Update inlong group status and other info
- groupService.updateStatus(groupId, GroupStatus.CONFIG_SUCCESSFUL.getCode(), applicant);
- groupService.update(form.getGroupInfo().genRequest(), applicant);
- // Update status of other related configs
- streamService.updateStatus(groupId, null, StreamStatus.CONFIG_SUCCESSFUL.getCode(), applicant);
- sourceService.updateStatus(groupId, null, SourceStatus.SOURCE_NORMAL.getCode(), applicant);
- return ListenerResult.success();
- }
-
-}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/group/listener/light/LightGroupFailedListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/group/listener/light/LightGroupFailedListener.java
deleted file mode 100644
index f821bf1b1..000000000
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/group/listener/light/LightGroupFailedListener.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.manager.service.workflow.group.listener.light;
-
-import lombok.extern.slf4j.Slf4j;
-import org.apache.inlong.manager.common.enums.GroupStatus;
-import org.apache.inlong.manager.common.enums.StreamStatus;
-import org.apache.inlong.manager.common.pojo.workflow.form.process.LightGroupResourceProcessForm;
-import org.apache.inlong.manager.service.group.InlongGroupService;
-import org.apache.inlong.manager.service.core.InlongStreamService;
-import org.apache.inlong.manager.workflow.WorkflowContext;
-import org.apache.inlong.manager.workflow.event.ListenerResult;
-import org.apache.inlong.manager.workflow.event.process.ProcessEvent;
-import org.apache.inlong.manager.workflow.event.process.ProcessEventListener;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.stereotype.Component;
-
-/**
- * Listener of light group failed.
- */
-@Slf4j
-@Component
-public class LightGroupFailedListener implements ProcessEventListener {
-
- @Autowired
- private InlongGroupService groupService;
- @Autowired
- private InlongStreamService streamService;
-
- @Override
- public ProcessEvent event() {
- return ProcessEvent.FAIL;
- }
-
- @Override
- public ListenerResult listen(WorkflowContext context) throws Exception {
- LightGroupResourceProcessForm form = (LightGroupResourceProcessForm) context.getProcessForm();
- final String groupId = form.getGroupInfo().getInlongGroupId();
- final String applicant = context.getOperator();
- // Update inlong group status
- groupService.updateStatus(groupId, GroupStatus.CONFIG_FAILED.getCode(), applicant);
- groupService.update(form.getGroupInfo().genRequest(), applicant);
- // Update inlong stream status
- streamService.updateStatus(groupId, null, StreamStatus.CONFIG_FAILED.getCode(), applicant);
- return ListenerResult.fail();
- }
-}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/group/listener/light/LightGroupUpdateFailedListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/group/listener/light/LightGroupUpdateFailedListener.java
deleted file mode 100644
index ffb6872c3..000000000
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/group/listener/light/LightGroupUpdateFailedListener.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.manager.service.workflow.group.listener.light;
-
-import lombok.extern.slf4j.Slf4j;
-import org.apache.inlong.manager.common.enums.GroupStatus;
-import org.apache.inlong.manager.common.pojo.group.InlongGroupInfo;
-import org.apache.inlong.manager.common.pojo.workflow.form.process.LightGroupResourceProcessForm;
-import org.apache.inlong.manager.service.group.InlongGroupService;
-import org.apache.inlong.manager.workflow.WorkflowContext;
-import org.apache.inlong.manager.workflow.event.ListenerResult;
-import org.apache.inlong.manager.workflow.event.process.ProcessEvent;
-import org.apache.inlong.manager.workflow.event.process.ProcessEventListener;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.stereotype.Component;
-
-/**
- * Listener of light group update failed.
- */
-@Slf4j
-@Component
-public class LightGroupUpdateFailedListener implements ProcessEventListener {
-
- @Autowired
- private InlongGroupService groupService;
-
- @Override
- public ProcessEvent event() {
- return ProcessEvent.FAIL;
- }
-
- @Override
- public ListenerResult listen(WorkflowContext context) throws Exception {
- LightGroupResourceProcessForm form = (LightGroupResourceProcessForm) context.getProcessForm();
- InlongGroupInfo groupInfo = form.getGroupInfo();
- final String groupId = groupInfo.getInlongGroupId();
- final String applicant = context.getOperator();
- // Update inlong group status
- groupService.updateStatus(groupId, GroupStatus.CONFIG_FAILED.getCode(), applicant);
- groupService.update(groupInfo.genRequest(), applicant);
- return ListenerResult.success();
- }
-}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/group/listener/light/LightGroupUpdateListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/group/listener/light/LightGroupUpdateListener.java
deleted file mode 100644
index df53430b5..000000000
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/group/listener/light/LightGroupUpdateListener.java
+++ /dev/null
@@ -1,80 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.manager.service.workflow.group.listener.light;
-
-import lombok.extern.slf4j.Slf4j;
-import org.apache.commons.collections.CollectionUtils;
-import org.apache.inlong.manager.common.enums.GroupOperateType;
-import org.apache.inlong.manager.common.enums.GroupStatus;
-import org.apache.inlong.manager.common.pojo.stream.InlongStreamInfo;
-import org.apache.inlong.manager.common.pojo.workflow.form.process.LightGroupResourceProcessForm;
-import org.apache.inlong.manager.service.group.InlongGroupService;
-import org.apache.inlong.manager.service.core.InlongStreamService;
-import org.apache.inlong.manager.workflow.WorkflowContext;
-import org.apache.inlong.manager.workflow.event.ListenerResult;
-import org.apache.inlong.manager.workflow.event.process.ProcessEvent;
-import org.apache.inlong.manager.workflow.event.process.ProcessEventListener;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.stereotype.Component;
-
-import java.util.List;
-
-/**
- * Listener of light group update.
- */
-@Slf4j
-@Component
-public class LightGroupUpdateListener implements ProcessEventListener {
-
- @Autowired
- private InlongGroupService groupService;
- @Autowired
- private InlongStreamService streamService;
-
- @Override
- public ProcessEvent event() {
- return ProcessEvent.CREATE;
- }
-
- @Override
- public ListenerResult listen(WorkflowContext context) throws Exception {
- LightGroupResourceProcessForm form = (LightGroupResourceProcessForm) context.getProcessForm();
- final String groupId = form.getGroupInfo().getInlongGroupId();
- final String applicant = context.getOperator();
- GroupOperateType groupOperateType = form.getGroupOperateType();
- switch (groupOperateType) {
- case SUSPEND:
- groupService.updateStatus(groupId, GroupStatus.SUSPENDING.getCode(), applicant);
- break;
- case RESTART:
- groupService.updateStatus(groupId, GroupStatus.RESTARTING.getCode(), applicant);
- break;
- case DELETE:
- groupService.updateStatus(groupId, GroupStatus.DELETING.getCode(), applicant);
- break;
- default:
- break;
- }
- if (CollectionUtils.isEmpty(form.getStreamInfos())) {
- List<InlongStreamInfo> streamInfos = streamService.list(groupId);
- form.setStreamInfos(streamInfos);
- }
- return ListenerResult.success();
- }
-
-}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/stream/CreateStreamWorkflowDefinition.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/stream/CreateStreamWorkflowDefinition.java
index 8876d79eb..070d4fc76 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/stream/CreateStreamWorkflowDefinition.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/stream/CreateStreamWorkflowDefinition.java
@@ -22,9 +22,9 @@ import org.apache.inlong.manager.common.pojo.workflow.form.process.StreamResourc
import org.apache.inlong.manager.service.workflow.ProcessName;
import org.apache.inlong.manager.service.workflow.WorkflowDefinition;
import org.apache.inlong.manager.service.workflow.listener.StreamTaskListenerFactory;
-import org.apache.inlong.manager.service.workflow.stream.listener.StreamCompleteProcessListener;
-import org.apache.inlong.manager.service.workflow.stream.listener.StreamFailedProcessListener;
-import org.apache.inlong.manager.service.workflow.stream.listener.StreamInitProcessListener;
+import org.apache.inlong.manager.service.workflow.stream.listener.InitStreamCompleteListener;
+import org.apache.inlong.manager.service.workflow.stream.listener.InitStreamFailedListener;
+import org.apache.inlong.manager.service.workflow.stream.listener.InitStreamListener;
import org.apache.inlong.manager.workflow.definition.EndEvent;
import org.apache.inlong.manager.workflow.definition.ServiceTask;
import org.apache.inlong.manager.workflow.definition.ServiceTaskType;
@@ -41,61 +41,61 @@ import org.springframework.stereotype.Component;
public class CreateStreamWorkflowDefinition implements WorkflowDefinition {
@Autowired
- private StreamInitProcessListener streamInitProcessListener;
+ private InitStreamListener initStreamListener;
@Autowired
- private StreamFailedProcessListener streamFailedProcessListener;
+ private InitStreamCompleteListener initStreamCompleteListener;
@Autowired
- private StreamCompleteProcessListener streamCompleteProcessListener;
+ private InitStreamFailedListener initStreamFailedListener;
@Autowired
private StreamTaskListenerFactory streamTaskListenerFactory;
@Override
public WorkflowProcess defineProcess() {
-
// Configuration process
WorkflowProcess process = new WorkflowProcess();
- process.addListener(streamInitProcessListener);
- process.addListener(streamFailedProcessListener);
- process.addListener(streamCompleteProcessListener);
-
- process.setType("Stream Resource Creation");
process.setName(getProcessName().name());
+ process.setType(getProcessName().getDisplayName());
process.setDisplayName(getProcessName().getDisplayName());
process.setFormClass(StreamResourceProcessForm.class);
process.setVersion(1);
process.setHidden(1);
+ // Set up the listeners
+ process.addListener(initStreamListener);
+ process.addListener(initStreamFailedListener);
+ process.addListener(initStreamCompleteListener);
+
// Start node
StartEvent startEvent = new StartEvent();
process.setStartEvent(startEvent);
- // init MQ
+ // Init MQ
ServiceTask initMQTask = new ServiceTask();
- initMQTask.setName("initMQ");
+ initMQTask.setName("InitMQ");
initMQTask.setDisplayName("Stream-InitMQ");
initMQTask.addServiceTaskType(ServiceTaskType.INIT_MQ);
initMQTask.addListenerProvider(streamTaskListenerFactory);
process.addTask(initMQTask);
- // init Sink
+ // Init Sink
ServiceTask initSinkTask = new ServiceTask();
- initSinkTask.setName("initSink");
+ initSinkTask.setName("InitSink");
initSinkTask.setDisplayName("Stream-InitSink");
initSinkTask.addServiceTaskType(ServiceTaskType.INIT_SINK);
initSinkTask.addListenerProvider(streamTaskListenerFactory);
process.addTask(initSinkTask);
- // init Sort
+ // Init Sort
ServiceTask initSortTask = new ServiceTask();
- initSortTask.setName("initSort");
+ initSortTask.setName("InitSort");
initSortTask.setDisplayName("Stream-InitSort");
initSortTask.addServiceTaskType(ServiceTaskType.INIT_SORT);
initSortTask.addListenerProvider(streamTaskListenerFactory);
process.addTask(initSortTask);
- // init Source
+ // Init Source
ServiceTask initSourceTask = new ServiceTask();
- initSourceTask.setName("initSource");
+ initSourceTask.setName("InitSource");
initSourceTask.setDisplayName("Stream-InitSource");
initSourceTask.addServiceTaskType(ServiceTaskType.INIT_SOURCE);
initSourceTask.addListenerProvider(streamTaskListenerFactory);
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/stream/DeleteStreamWorkflowDefinition.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/stream/DeleteStreamWorkflowDefinition.java
index 257e14adc..c87cc760d 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/stream/DeleteStreamWorkflowDefinition.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/stream/DeleteStreamWorkflowDefinition.java
@@ -22,9 +22,9 @@ import org.apache.inlong.manager.common.pojo.workflow.form.process.StreamResourc
import org.apache.inlong.manager.service.workflow.ProcessName;
import org.apache.inlong.manager.service.workflow.WorkflowDefinition;
import org.apache.inlong.manager.service.workflow.listener.StreamTaskListenerFactory;
-import org.apache.inlong.manager.service.workflow.stream.listener.StreamUpdateCompleteListener;
-import org.apache.inlong.manager.service.workflow.stream.listener.StreamUpdateFailedListener;
-import org.apache.inlong.manager.service.workflow.stream.listener.StreamUpdateListener;
+import org.apache.inlong.manager.service.workflow.stream.listener.UpdateStreamCompleteListener;
+import org.apache.inlong.manager.service.workflow.stream.listener.UpdateStreamFailedListener;
+import org.apache.inlong.manager.service.workflow.stream.listener.UpdateStreamListener;
import org.apache.inlong.manager.workflow.definition.EndEvent;
import org.apache.inlong.manager.workflow.definition.ServiceTask;
import org.apache.inlong.manager.workflow.definition.ServiceTaskType;
@@ -41,11 +41,11 @@ import org.springframework.stereotype.Component;
public class DeleteStreamWorkflowDefinition implements WorkflowDefinition {
@Autowired
- private StreamUpdateListener streamUpdateListener;
+ private UpdateStreamListener updateStreamListener;
@Autowired
- private StreamUpdateCompleteListener streamUpdateCompleteListener;
+ private UpdateStreamCompleteListener updateStreamCompleteListener;
@Autowired
- private StreamUpdateFailedListener streamUpdateFailedListener;
+ private UpdateStreamFailedListener updateStreamFailedListener;
@Autowired
private StreamTaskListenerFactory streamTaskListenerFactory;
@@ -53,23 +53,25 @@ public class DeleteStreamWorkflowDefinition implements WorkflowDefinition {
public WorkflowProcess defineProcess() {
// Configuration process
WorkflowProcess process = new WorkflowProcess();
- process.addListener(streamUpdateListener);
- process.addListener(streamUpdateCompleteListener);
- process.addListener(streamUpdateFailedListener);
- process.setType("Stream Resource Delete");
process.setName(getProcessName().name());
+ process.setType(getProcessName().getDisplayName());
process.setDisplayName(getProcessName().getDisplayName());
process.setFormClass(StreamResourceProcessForm.class);
process.setVersion(1);
process.setHidden(1);
+ // Set up the listeners
+ process.addListener(updateStreamListener);
+ process.addListener(updateStreamCompleteListener);
+ process.addListener(updateStreamFailedListener);
+
// Start node
StartEvent startEvent = new StartEvent();
process.setStartEvent(startEvent);
- // Delete datasource
+ // Delete Source
ServiceTask deleteDataSourceTask = new ServiceTask();
- deleteDataSourceTask.setName("deleteSource");
+ deleteDataSourceTask.setName("DeleteSource");
deleteDataSourceTask.setDisplayName("Stream-DeleteSource");
deleteDataSourceTask.addServiceTaskType(ServiceTaskType.DELETE_SOURCE);
deleteDataSourceTask.addListenerProvider(streamTaskListenerFactory);
@@ -77,15 +79,15 @@ public class DeleteStreamWorkflowDefinition implements WorkflowDefinition {
// Delete MQ
ServiceTask deleteMQTask = new ServiceTask();
- deleteMQTask.setName("deleteMQ");
+ deleteMQTask.setName("DeleteMQ");
deleteMQTask.setDisplayName("Stream-DeleteMQ");
deleteMQTask.addServiceTaskType(ServiceTaskType.DELETE_MQ);
deleteMQTask.addListenerProvider(streamTaskListenerFactory);
process.addTask(deleteMQTask);
- // Delete sort
+ // Delete Sort
ServiceTask deleteSortTask = new ServiceTask();
- deleteSortTask.setName("deleteSort");
+ deleteSortTask.setName("DeleteSort");
deleteSortTask.setDisplayName("Stream-DeleteSort");
deleteSortTask.addServiceTaskType(ServiceTaskType.DELETE_SORT);
deleteSortTask.addListenerProvider(streamTaskListenerFactory);
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/stream/RestartStreamWorkflowDefinition.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/stream/RestartStreamWorkflowDefinition.java
index 8407c003a..fa0495098 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/stream/RestartStreamWorkflowDefinition.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/stream/RestartStreamWorkflowDefinition.java
@@ -22,9 +22,9 @@ import org.apache.inlong.manager.common.pojo.workflow.form.process.StreamResourc
import org.apache.inlong.manager.service.workflow.ProcessName;
import org.apache.inlong.manager.service.workflow.WorkflowDefinition;
import org.apache.inlong.manager.service.workflow.listener.StreamTaskListenerFactory;
-import org.apache.inlong.manager.service.workflow.stream.listener.StreamUpdateCompleteListener;
-import org.apache.inlong.manager.service.workflow.stream.listener.StreamUpdateFailedListener;
-import org.apache.inlong.manager.service.workflow.stream.listener.StreamUpdateListener;
+import org.apache.inlong.manager.service.workflow.stream.listener.UpdateStreamCompleteListener;
+import org.apache.inlong.manager.service.workflow.stream.listener.UpdateStreamFailedListener;
+import org.apache.inlong.manager.service.workflow.stream.listener.UpdateStreamListener;
import org.apache.inlong.manager.workflow.definition.EndEvent;
import org.apache.inlong.manager.workflow.definition.ServiceTask;
import org.apache.inlong.manager.workflow.definition.ServiceTaskType;
@@ -41,11 +41,11 @@ import org.springframework.stereotype.Component;
public class RestartStreamWorkflowDefinition implements WorkflowDefinition {
@Autowired
- private StreamUpdateListener streamUpdateListener;
+ private UpdateStreamListener updateStreamListener;
@Autowired
- private StreamUpdateFailedListener streamUpdateFailedListener;
+ private UpdateStreamCompleteListener updateStreamCompleteListener;
@Autowired
- private StreamUpdateCompleteListener streamUpdateCompleteListener;
+ private UpdateStreamFailedListener updateStreamFailedListener;
@Autowired
private StreamTaskListenerFactory streamTaskListenerFactory;
@@ -53,31 +53,33 @@ public class RestartStreamWorkflowDefinition implements WorkflowDefinition {
public WorkflowProcess defineProcess() {
// Configuration process
WorkflowProcess process = new WorkflowProcess();
- process.addListener(streamUpdateListener);
- process.addListener(streamUpdateCompleteListener);
- process.addListener(streamUpdateFailedListener);
- process.setType("Stream Resource Restart");
process.setName(getProcessName().name());
+ process.setType(getProcessName().getDisplayName());
process.setDisplayName(getProcessName().getDisplayName());
process.setFormClass(StreamResourceProcessForm.class);
process.setVersion(1);
process.setHidden(1);
+ // Set up the listeners
+ process.addListener(updateStreamListener);
+ process.addListener(updateStreamCompleteListener);
+ process.addListener(updateStreamFailedListener);
+
// Start node
StartEvent startEvent = new StartEvent();
process.setStartEvent(startEvent);
- //restart sort
+ // Restart Sort
ServiceTask restartSortTask = new ServiceTask();
- restartSortTask.setName("restartSort");
+ restartSortTask.setName("RestartSort");
restartSortTask.setDisplayName("Stream-RestartSort");
restartSortTask.addServiceTaskType(ServiceTaskType.RESTART_SORT);
restartSortTask.addListenerProvider(streamTaskListenerFactory);
process.addTask(restartSortTask);
- //restart datasource
+ // Restart Source
ServiceTask restartDataSourceTask = new ServiceTask();
- restartDataSourceTask.setName("restartSource");
+ restartDataSourceTask.setName("RestartSource");
restartDataSourceTask.setDisplayName("Stream-RestartSource");
restartDataSourceTask.addServiceTaskType(ServiceTaskType.RESTART_SOURCE);
restartDataSourceTask.addListenerProvider(streamTaskListenerFactory);
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/stream/SuspendStreamWorkflowDefinition.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/stream/SuspendStreamWorkflowDefinition.java
index 38d23efda..9eb8591c8 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/stream/SuspendStreamWorkflowDefinition.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/stream/SuspendStreamWorkflowDefinition.java
@@ -22,9 +22,9 @@ import org.apache.inlong.manager.common.pojo.workflow.form.process.StreamResourc
import org.apache.inlong.manager.service.workflow.ProcessName;
import org.apache.inlong.manager.service.workflow.WorkflowDefinition;
import org.apache.inlong.manager.service.workflow.listener.StreamTaskListenerFactory;
-import org.apache.inlong.manager.service.workflow.stream.listener.StreamUpdateCompleteListener;
-import org.apache.inlong.manager.service.workflow.stream.listener.StreamUpdateFailedListener;
-import org.apache.inlong.manager.service.workflow.stream.listener.StreamUpdateListener;
+import org.apache.inlong.manager.service.workflow.stream.listener.UpdateStreamCompleteListener;
+import org.apache.inlong.manager.service.workflow.stream.listener.UpdateStreamFailedListener;
+import org.apache.inlong.manager.service.workflow.stream.listener.UpdateStreamListener;
import org.apache.inlong.manager.workflow.definition.EndEvent;
import org.apache.inlong.manager.workflow.definition.ServiceTask;
import org.apache.inlong.manager.workflow.definition.ServiceTaskType;
@@ -41,11 +41,11 @@ import org.springframework.stereotype.Component;
public class SuspendStreamWorkflowDefinition implements WorkflowDefinition {
@Autowired
- private StreamUpdateListener streamUpdateListener;
+ private UpdateStreamListener updateStreamListener;
@Autowired
- private StreamUpdateCompleteListener streamUpdateCompleteListener;
+ private UpdateStreamCompleteListener updateStreamCompleteListener;
@Autowired
- private StreamUpdateFailedListener streamUpdateFailedListener;
+ private UpdateStreamFailedListener updateStreamFailedListener;
@Autowired
private StreamTaskListenerFactory streamTaskListenerFactory;
@@ -53,31 +53,33 @@ public class SuspendStreamWorkflowDefinition implements WorkflowDefinition {
public WorkflowProcess defineProcess() {
// Configuration process
WorkflowProcess process = new WorkflowProcess();
- process.addListener(streamUpdateListener);
- process.addListener(streamUpdateFailedListener);
- process.addListener(streamUpdateCompleteListener);
- process.setType("Stream Resource Suspend");
process.setName(getProcessName().name());
+ process.setType(getProcessName().getDisplayName());
process.setDisplayName(getProcessName().getDisplayName());
process.setFormClass(StreamResourceProcessForm.class);
process.setVersion(1);
process.setHidden(1);
+ // Set up the listeners
+ process.addListener(updateStreamListener);
+ process.addListener(updateStreamFailedListener);
+ process.addListener(updateStreamCompleteListener);
+
// Start node
StartEvent startEvent = new StartEvent();
process.setStartEvent(startEvent);
- //stop datasource
+ // Stop Source
ServiceTask stopDataSourceTask = new ServiceTask();
- stopDataSourceTask.setName("stopSource");
+ stopDataSourceTask.setName("StopSource");
stopDataSourceTask.setDisplayName("Stream-StopSource");
stopDataSourceTask.addServiceTaskType(ServiceTaskType.STOP_SOURCE);
stopDataSourceTask.addListenerProvider(streamTaskListenerFactory);
process.addTask(stopDataSourceTask);
- //stop sort
+ // Stop Sort
ServiceTask stopSortTask = new ServiceTask();
- stopSortTask.setName("stopSort");
+ stopSortTask.setName("StopSort");
stopSortTask.setDisplayName("Stream-StopSort");
stopSortTask.addServiceTaskType(ServiceTaskType.STOP_SORT);
stopSortTask.addListenerProvider(streamTaskListenerFactory);
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/stream/listener/StreamCompleteProcessListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/stream/listener/InitStreamCompleteListener.java
similarity index 95%
rename from inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/stream/listener/StreamCompleteProcessListener.java
rename to inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/stream/listener/InitStreamCompleteListener.java
index 3378eb63d..e890f64d0 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/stream/listener/StreamCompleteProcessListener.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/stream/listener/InitStreamCompleteListener.java
@@ -33,11 +33,11 @@ import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
/**
- * Event listener for completed creation of inlong stream resource
+ * The listener for InlongStream when resources are created successfully.
*/
@Slf4j
@Component
-public class StreamCompleteProcessListener implements ProcessEventListener {
+public class InitStreamCompleteListener implements ProcessEventListener {
@Autowired
private InlongStreamService streamService;
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/stream/listener/StreamFailedProcessListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/stream/listener/InitStreamFailedListener.java
similarity index 94%
rename from inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/stream/listener/StreamFailedProcessListener.java
rename to inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/stream/listener/InitStreamFailedListener.java
index 3928c5d83..9ebb4e31c 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/stream/listener/StreamFailedProcessListener.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/stream/listener/InitStreamFailedListener.java
@@ -31,11 +31,11 @@ import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
/**
- * Event listener for failed creation of inlong stream resource
+ * The listener for InlongStream when resource creation fails.
*/
@Slf4j
@Component
-public class StreamFailedProcessListener implements ProcessEventListener {
+public class InitStreamFailedListener implements ProcessEventListener {
@Autowired
private InlongStreamService streamService;
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/stream/listener/StreamInitProcessListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/stream/listener/InitStreamListener.java
similarity index 95%
rename from inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/stream/listener/StreamInitProcessListener.java
rename to inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/stream/listener/InitStreamListener.java
index c24a39428..2cc1a3246 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/stream/listener/StreamInitProcessListener.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/stream/listener/InitStreamListener.java
@@ -32,10 +32,10 @@ import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
/**
- * Initialize the listener for inlong group information
+ * The listener for initializing the InlongStream information.
*/
@Service
-public class StreamInitProcessListener implements ProcessEventListener {
+public class InitStreamListener implements ProcessEventListener {
@Autowired
private InlongStreamService streamService;
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/stream/listener/StreamUpdateCompleteListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/stream/listener/UpdateStreamCompleteListener.java
similarity index 95%
rename from inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/stream/listener/StreamUpdateCompleteListener.java
rename to inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/stream/listener/UpdateStreamCompleteListener.java
index cf9034fde..10fcc65bd 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/stream/listener/StreamUpdateCompleteListener.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/stream/listener/UpdateStreamCompleteListener.java
@@ -31,11 +31,11 @@ import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
/**
- * Update completed listener for inlong stream
+ * The listener for InlongStream when an update operation succeeds.
*/
@Slf4j
@Component
-public class StreamUpdateCompleteListener implements ProcessEventListener {
+public class UpdateStreamCompleteListener implements ProcessEventListener {
@Autowired
private InlongStreamService streamService;
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/stream/listener/StreamUpdateFailedListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/stream/listener/UpdateStreamFailedListener.java
similarity index 95%
rename from inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/stream/listener/StreamUpdateFailedListener.java
rename to inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/stream/listener/UpdateStreamFailedListener.java
index f6fc212a9..c57f871c0 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/stream/listener/StreamUpdateFailedListener.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/stream/listener/UpdateStreamFailedListener.java
@@ -30,11 +30,11 @@ import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
/**
- * Update failed listener for inlong stream
+ * The listener for InlongStream when the update operation fails.
*/
@Slf4j
@Component
-public class StreamUpdateFailedListener implements ProcessEventListener {
+public class UpdateStreamFailedListener implements ProcessEventListener {
@Autowired
private InlongStreamService streamService;
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/stream/listener/StreamUpdateListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/stream/listener/UpdateStreamListener.java
similarity index 96%
rename from inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/stream/listener/StreamUpdateListener.java
rename to inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/stream/listener/UpdateStreamListener.java
index 75a408e99..4d1917640 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/stream/listener/StreamUpdateListener.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/stream/listener/UpdateStreamListener.java
@@ -30,10 +30,10 @@ import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
/**
- * Update listener for inlong stream
+ * The listener for updating the InlongStream.
*/
@Service
-public class StreamUpdateListener implements ProcessEventListener {
+public class UpdateStreamListener implements ProcessEventListener {
@Autowired
private InlongStreamService streamService;
diff --git a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/ServiceBaseTest.java b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/ServiceBaseTest.java
index 6b7866e4b..0789d63c5 100644
--- a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/ServiceBaseTest.java
+++ b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/ServiceBaseTest.java
@@ -17,13 +17,29 @@
package org.apache.inlong.manager.service;
+import org.apache.inlong.manager.common.consts.InlongConstants;
+import org.apache.inlong.manager.common.enums.FieldType;
+import org.apache.inlong.manager.common.enums.GroupStatus;
+import org.apache.inlong.manager.common.enums.MQType;
+import org.apache.inlong.manager.common.pojo.group.InlongGroupInfo;
+import org.apache.inlong.manager.common.pojo.group.none.InlongNoneMqInfo;
+import org.apache.inlong.manager.common.pojo.group.pulsar.InlongPulsarInfo;
+import org.apache.inlong.manager.common.pojo.stream.InlongStreamInfo;
+import org.apache.inlong.manager.common.pojo.stream.InlongStreamRequest;
+import org.apache.inlong.manager.common.pojo.stream.StreamField;
+import org.apache.inlong.manager.service.core.InlongStreamService;
+import org.apache.inlong.manager.service.group.InlongGroupService;
import org.apache.inlong.manager.test.BaseTest;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.test.context.SpringBootTest;
+import java.util.ArrayList;
+import java.util.List;
+
/**
* Test class for base test service.
*/
@@ -31,14 +47,98 @@ import org.springframework.boot.test.context.SpringBootTest;
@SpringBootTest(classes = ServiceBaseTest.class)
public class ServiceBaseTest extends BaseTest {
- private static final Logger LOGGER = LoggerFactory.getLogger(ServiceBaseTest.class);
-
public static final String GLOBAL_GROUP_ID = "global_group";
public static final String GLOBAL_STREAM_ID = "global_stream";
public static final String GLOBAL_OPERATOR = "admin";
+ private static final Logger LOGGER = LoggerFactory.getLogger(ServiceBaseTest.class);
+
+ @Autowired
+ protected InlongGroupService groupService;
+ @Autowired
+ protected InlongStreamService streamService;
@Test
public void test() {
LOGGER.info("The test class cannot be empty, otherwise 'No runnable methods exception' will be reported");
}
+
+ /**
+ * Create an InlongGroup with the specified InlongGroupId and MQ type
+ *
+ * @return InlongGroupInfo after saving
+ */
+ public InlongGroupInfo createInlongGroup(String inlongGroupId, String mqType) {
+ try {
+ streamService.logicDeleteAll(inlongGroupId, GLOBAL_OPERATOR);
+ groupService.delete(inlongGroupId, GLOBAL_OPERATOR);
+ } catch (Exception e) {
+ // ignore
+ }
+
+ InlongGroupInfo groupInfo;
+ if (MQType.forType(mqType) == MQType.PULSAR || MQType.forType(mqType) == MQType.TDMQ_PULSAR) {
+ groupInfo = new InlongPulsarInfo();
+ } else if (MQType.forType(mqType) == MQType.TUBE) {
+ groupInfo = new InlongPulsarInfo();
+ } else {
+ groupInfo = new InlongNoneMqInfo();
+ }
+
+ groupInfo.setInlongGroupId(inlongGroupId);
+ groupInfo.setMqType(mqType);
+ groupInfo.setMqResource("test-queue");
+ groupInfo.setInCharges(GLOBAL_OPERATOR);
+ groupInfo.setEnableCreateResource(InlongConstants.ENABLE_CREATE_RESOURCE);
+ groupService.save(groupInfo.genRequest(), GLOBAL_OPERATOR);
+
+ groupService.updateStatus(inlongGroupId, GroupStatus.TO_BE_APPROVAL.getCode(), GLOBAL_OPERATOR);
+ groupService.updateStatus(inlongGroupId, GroupStatus.APPROVE_PASSED.getCode(), GLOBAL_OPERATOR);
+ groupService.update(groupInfo.genRequest(), GLOBAL_OPERATOR);
+
+ return groupInfo;
+ }
+
+ /**
+ * Create InlongStream from the given InlongGroupInfo and specified InlongStreamId
+ *
+ * @return InlongStreamInfo after saving
+ */
+ public InlongStreamInfo createStreamInfo(InlongGroupInfo groupInfo, String inlongStreamId) {
+ String inlongGroupId = groupInfo.getInlongGroupId();
+ // delete first
+ try {
+ streamService.delete(inlongGroupId, inlongStreamId, GLOBAL_OPERATOR);
+ } catch (Exception e) {
+ // ignore
+ }
+
+ InlongStreamRequest request = new InlongStreamRequest();
+ request.setInlongGroupId(inlongGroupId);
+ request.setInlongStreamId(inlongStreamId);
+ request.setMqResource(inlongStreamId);
+ request.setDataSeparator("124");
+ request.setDataEncoding("UTF-8");
+ request.setFieldList(createStreamFields(inlongGroupId, inlongStreamId));
+ streamService.save(request, GLOBAL_OPERATOR);
+
+ return streamService.get(request.getInlongGroupId(), request.getInlongStreamId());
+ }
+
+ /**
+ * Create a StreamField list for the given groupId and streamId
+ *
+ * @return list of StreamField
+ */
+ public List<StreamField> createStreamFields(String groupId, String streamId) {
+ final List<StreamField> streamFields = new ArrayList<>();
+ StreamField fieldInfo = new StreamField();
+ fieldInfo.setInlongGroupId(groupId);
+ fieldInfo.setInlongStreamId(streamId);
+ fieldInfo.setFieldName("id");
+ fieldInfo.setFieldType(FieldType.INT.toString());
+ fieldInfo.setFieldComment("idx");
+ streamFields.add(fieldInfo);
+ return streamFields;
+ }
+
}
diff --git a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/impl/InlongGroupProcessOperationTest.java b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/impl/InlongGroupProcessOperationTest.java
index fbfad559c..23b6add8b 100644
--- a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/impl/InlongGroupProcessOperationTest.java
+++ b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/impl/InlongGroupProcessOperationTest.java
@@ -40,69 +40,64 @@ import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
@EnableAutoConfiguration
public class InlongGroupProcessOperationTest extends ServiceBaseTest {
+ private static final String GROUP_ID = "test_group_process";
private static final String OPERATOR = "operator";
- private static final String GROUP_NAME = "test_biz";
-
- private static final String GROUP_ID = "test_biz";
-
@Autowired
private InlongGroupService groupService;
-
@Autowired
private InlongGroupProcessOperation groupProcessOperation;
-
@Autowired
private GroupTaskListenerFactory groupTaskListenerFactory;
- /**
- * Set some base information before start process.
- */
- public void before() {
+ @Test
+ public void testAllProcess() {
+ before();
+ testStartProcess();
+ testSuspendProcess();
+ testRestartProcess();
+
+ // delete the process, will delete the Pulsar resource
+ // TODO Mock the cluster-related operations
+ // boolean result = groupProcessOperation.deleteProcess(GROUP_ID, OPERATOR);
+ // Assertions.assertTrue(result);
+ }
+
+ private void before() {
MockPlugin mockPlugin = new MockPlugin();
groupTaskListenerFactory.acceptPlugin(mockPlugin);
InlongPulsarRequest groupInfo = new InlongPulsarRequest();
groupInfo.setInlongGroupId(GROUP_ID);
- groupInfo.setName(GROUP_NAME);
groupInfo.setInCharges(OPERATOR);
groupInfo.setMqType(MQType.PULSAR.getType());
groupService.save(groupInfo, OPERATOR);
}
- // There will be concurrency problems in the overall operation, and the testDeleteProcess() method will call
- // @Test
- public void testStartProcess() {
- before();
+ private void testStartProcess() {
+ InlongGroupInfo groupInfo = groupService.get(GROUP_ID);
+ groupInfo.setInlongClusterTag("default_cluster");
+ groupService.update(groupInfo.genRequest(), OPERATOR);
+
WorkflowResult result = groupProcessOperation.startProcess(GROUP_ID, OPERATOR);
ProcessResponse response = result.getProcessInfo();
Assertions.assertSame(response.getStatus(), ProcessStatus.PROCESSING);
- InlongGroupInfo groupInfo = groupService.get(GROUP_ID);
+ groupInfo = groupService.get(GROUP_ID);
Assertions.assertEquals(groupInfo.getStatus(), GroupStatus.TO_BE_APPROVAL.getCode());
}
- // There will be concurrency problems in the overall operation, and the testDeleteProcess() method will call
- // @Test
- public void testSuspendProcess() {
- testStartProcess();
- InlongGroupInfo groupInfo = groupService.get(GROUP_ID);
+ private void testSuspendProcess() {
groupService.updateStatus(GROUP_ID, GroupStatus.APPROVE_PASSED.getCode(), OPERATOR);
- groupService.update(groupInfo.genRequest(), OPERATOR);
groupService.updateStatus(GROUP_ID, GroupStatus.CONFIG_ING.getCode(), OPERATOR);
- groupService.update(groupInfo.genRequest(), OPERATOR);
groupService.updateStatus(GROUP_ID, GroupStatus.CONFIG_SUCCESSFUL.getCode(), OPERATOR);
- groupService.update(groupInfo.genRequest(), OPERATOR);
WorkflowResult result = groupProcessOperation.suspendProcess(GROUP_ID, OPERATOR);
ProcessResponse response = result.getProcessInfo();
Assertions.assertSame(response.getStatus(), ProcessStatus.COMPLETED);
- groupInfo = groupService.get(GROUP_ID);
+ InlongGroupInfo groupInfo = groupService.get(GROUP_ID);
Assertions.assertEquals(groupInfo.getStatus(), GroupStatus.SUSPENDED.getCode());
}
- // There will be concurrency problems in the overall operation, and the testDeleteProcess() method will call
- // @Test
- public void testRestartProcess() {
- testSuspendProcess();
+ private void testRestartProcess() {
WorkflowResult result = groupProcessOperation.restartProcess(GROUP_ID, OPERATOR);
ProcessResponse response = result.getProcessInfo();
Assertions.assertSame(response.getStatus(), ProcessStatus.COMPLETED);
@@ -110,12 +105,5 @@ public class InlongGroupProcessOperationTest extends ServiceBaseTest {
Assertions.assertEquals(groupInfo.getStatus(), GroupStatus.RESTARTED.getCode());
}
- @Test
- public void testDeleteProcess() {
- testStartProcess();
- // testRestartProcess();
- // boolean result = groupProcessOperation.deleteProcess(GROUP_ID, OPERATOR);
- // Assertions.assertTrue(result);
- }
}
diff --git a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/sort/DisableZkForSortTest.java b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/sort/DisableZkForSortTest.java
index adc4dd159..e0ed84739 100644
--- a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/sort/DisableZkForSortTest.java
+++ b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/sort/DisableZkForSortTest.java
@@ -21,6 +21,7 @@ import com.google.common.collect.Lists;
import org.apache.inlong.manager.common.consts.InlongConstants;
import org.apache.inlong.manager.common.enums.GroupOperateType;
import org.apache.inlong.manager.common.enums.GroupStatus;
+import org.apache.inlong.manager.common.enums.MQType;
import org.apache.inlong.manager.common.enums.ProcessStatus;
import org.apache.inlong.manager.common.pojo.group.InlongGroupInfo;
import org.apache.inlong.manager.common.pojo.sink.SinkField;
@@ -122,13 +123,13 @@ public class DisableZkForSortTest extends WorkflowServiceImplTest {
// @Test
public void testCreateSortConfigInUpdateWorkflow() {
- InlongGroupInfo groupInfo = initGroupForm("PULSAR", "test20");
+ InlongGroupInfo groupInfo = createInlongGroup("test20", MQType.MQ_PULSAR);
groupInfo.setEnableZookeeper(InlongConstants.ENABLE_ZK);
groupInfo.setEnableCreateResource(InlongConstants.ENABLE_CREATE_RESOURCE);
groupService.updateStatus(GROUP_ID, GroupStatus.CONFIG_SUCCESSFUL.getCode(), OPERATOR);
groupService.update(groupInfo.genRequest(), OPERATOR);
- InlongStreamInfo streamInfo = createStreamInfo(groupInfo);
+ InlongStreamInfo streamInfo = createStreamInfo(groupInfo, "test_stream_info");
createHiveSink(streamInfo);
createKafkaSource(streamInfo);
GroupResourceProcessForm form = new GroupResourceProcessForm();
@@ -141,7 +142,7 @@ public class DisableZkForSortTest extends WorkflowServiceImplTest {
ProcessResponse response = result.getProcessInfo();
Assertions.assertSame(response.getStatus(), ProcessStatus.COMPLETED);
WorkflowProcess process = context.getProcess();
- WorkflowTask task = process.getTaskByName("stopSort");
+ WorkflowTask task = process.getTaskByName("StopSort");
Assertions.assertTrue(task instanceof ServiceTask);
Assertions.assertEquals(2, task.getNameToListenerMap().size());
List<TaskEventListener> listeners = Lists.newArrayList(task.getNameToListenerMap().values());
diff --git a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/source/listener/DataSourceListenerTest.java b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/source/listener/StreamSourceListenerTest.java
similarity index 60%
rename from inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/source/listener/DataSourceListenerTest.java
rename to inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/source/listener/StreamSourceListenerTest.java
index 72f9cc52f..e922b9218 100644
--- a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/source/listener/DataSourceListenerTest.java
+++ b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/source/listener/StreamSourceListenerTest.java
@@ -19,6 +19,7 @@ package org.apache.inlong.manager.service.source.listener;
import org.apache.inlong.manager.common.enums.GroupOperateType;
import org.apache.inlong.manager.common.enums.GroupStatus;
+import org.apache.inlong.manager.common.enums.MQType;
import org.apache.inlong.manager.common.enums.ProcessStatus;
import org.apache.inlong.manager.common.enums.SourceStatus;
import org.apache.inlong.manager.common.pojo.group.InlongGroupInfo;
@@ -28,97 +29,98 @@ import org.apache.inlong.manager.common.pojo.stream.InlongStreamInfo;
import org.apache.inlong.manager.common.pojo.workflow.ProcessResponse;
import org.apache.inlong.manager.common.pojo.workflow.WorkflowResult;
import org.apache.inlong.manager.common.pojo.workflow.form.process.GroupResourceProcessForm;
+import org.apache.inlong.manager.service.ServiceBaseTest;
import org.apache.inlong.manager.service.source.StreamSourceService;
import org.apache.inlong.manager.service.workflow.ProcessName;
-import org.apache.inlong.manager.service.workflow.WorkflowServiceImplTest;
import org.apache.inlong.manager.workflow.WorkflowContext;
+import org.apache.inlong.manager.workflow.core.ProcessService;
import org.apache.inlong.manager.workflow.definition.ServiceTask;
import org.apache.inlong.manager.workflow.definition.WorkflowProcess;
import org.apache.inlong.manager.workflow.definition.WorkflowTask;
import org.apache.inlong.manager.workflow.util.WorkflowBeanUtils;
import org.junit.jupiter.api.Assertions;
-import org.junit.jupiter.api.BeforeEach;
import org.springframework.beans.factory.annotation.Autowired;
/**
- * Test class for operate binlog source, such as frozen or restart.
+ * Test class for operating a StreamSource, such as freezing or restarting it.
*/
-public class DataSourceListenerTest extends WorkflowServiceImplTest {
+public class StreamSourceListenerTest extends ServiceBaseTest {
- public GroupResourceProcessForm form;
+ private static final String GROUP_ID = "test-source-group-id";
+ private static final String STREAM_ID = "test-source-stream-id";
- public InlongGroupInfo groupInfo;
@Autowired
- private StreamSourceService streamSourceService;
+ private ProcessService processService;
+ @Autowired
+ private StreamSourceService sourceService;
+
+ private InlongGroupInfo groupInfo;
- @BeforeEach
- public void init() {
- subType = "data_source";
+ /**
+ * There are concurrency problems in the overall operation, so this test temporarily fails and is disabled.
+ */
+ // @Test
+ public void testAllOperate() {
+ groupInfo = createInlongGroup(GROUP_ID, MQType.MQ_PULSAR);
+ groupService.updateStatus(GROUP_ID, GroupStatus.CONFIG_ING.getCode(), GLOBAL_OPERATOR);
+ groupService.updateStatus(GROUP_ID, GroupStatus.CONFIG_SUCCESSFUL.getCode(), GLOBAL_OPERATOR);
+ groupService.update(groupInfo.genRequest(), GLOBAL_OPERATOR);
+
+ Integer sourceId = this.createBinlogSource(groupInfo);
+ testFrozenSource(sourceId);
+ testRestartSource(sourceId);
}
- public Integer createBinlogSource(InlongGroupInfo groupInfo) {
- final InlongStreamInfo stream = createStreamInfo(groupInfo);
+ private Integer createBinlogSource(InlongGroupInfo groupInfo) {
+ InlongStreamInfo stream = createStreamInfo(groupInfo, STREAM_ID);
MySQLBinlogSourceRequest sourceRequest = new MySQLBinlogSourceRequest();
sourceRequest.setInlongGroupId(stream.getInlongGroupId());
sourceRequest.setInlongStreamId(stream.getInlongStreamId());
sourceRequest.setSourceName("binlog-collect");
- return streamSourceService.save(sourceRequest, OPERATOR);
+ return sourceService.save(sourceRequest, GLOBAL_OPERATOR);
}
- /**
- * There will be concurrency problems in the overall operation,This method temporarily fails the test
- */
- //@Test
- public void testFrozenSource() {
- groupInfo = initGroupForm("PULSAR", "test1");
- groupService.updateStatus(GROUP_ID, GroupStatus.CONFIG_SUCCESSFUL.getCode(), OPERATOR);
- groupService.update(groupInfo.genRequest(), OPERATOR);
-
- final int sourceId = createBinlogSource(groupInfo);
- streamSourceService.updateStatus(groupInfo.getInlongGroupId(), null,
- SourceStatus.SOURCE_NORMAL.getCode(), OPERATOR);
+ private void testFrozenSource(Integer sourceId) {
+ sourceService.updateStatus(GROUP_ID, null, SourceStatus.SOURCE_NORMAL.getCode(), GLOBAL_OPERATOR);
- form = new GroupResourceProcessForm();
+ GroupResourceProcessForm form = new GroupResourceProcessForm();
form.setGroupInfo(groupInfo);
form.setGroupOperateType(GroupOperateType.SUSPEND);
- WorkflowContext context = processService.start(ProcessName.SUSPEND_GROUP_PROCESS.name(), applicant, form);
+ WorkflowContext context = processService.start(ProcessName.SUSPEND_GROUP_PROCESS.name(), GLOBAL_OPERATOR, form);
WorkflowResult result = WorkflowBeanUtils.result(context);
ProcessResponse response = result.getProcessInfo();
Assertions.assertSame(response.getStatus(), ProcessStatus.COMPLETED);
WorkflowProcess process = context.getProcess();
- WorkflowTask task = process.getTaskByName("stopSource");
+ WorkflowTask task = process.getTaskByName("StopSource");
Assertions.assertTrue(task instanceof ServiceTask);
- StreamSource streamSource = streamSourceService.get(sourceId);
+
+ StreamSource streamSource = sourceService.get(sourceId);
Assertions.assertSame(SourceStatus.forCode(streamSource.getStatus()), SourceStatus.TO_BE_ISSUED_FROZEN);
}
- // @Test
- public void testRestartSource() {
- // testFrozenSource();
- groupInfo = initGroupForm("PULSAR", "test2");
- groupService.updateStatus(groupInfo.getInlongGroupId(), GroupStatus.CONFIG_SUCCESSFUL.getCode(), OPERATOR);
- groupService.update(groupInfo.genRequest(), OPERATOR);
- groupService.updateStatus(groupInfo.getInlongGroupId(), GroupStatus.SUSPENDING.getCode(), OPERATOR);
- groupService.update(groupInfo.genRequest(), OPERATOR);
- groupService.updateStatus(groupInfo.getInlongGroupId(), GroupStatus.SUSPENDED.getCode(), OPERATOR);
- groupService.update(groupInfo.genRequest(), OPERATOR);
-
- final int sourceId = createBinlogSource(groupInfo);
- streamSourceService.updateStatus(groupInfo.getInlongGroupId(), null,
- SourceStatus.SOURCE_NORMAL.getCode(), OPERATOR);
-
- form = new GroupResourceProcessForm();
+ private void testRestartSource(Integer sourceId) {
+ groupService.updateStatus(GROUP_ID, GroupStatus.SUSPENDING.getCode(), GLOBAL_OPERATOR);
+ groupService.update(groupInfo.genRequest(), GLOBAL_OPERATOR);
+ groupService.updateStatus(GROUP_ID, GroupStatus.SUSPENDED.getCode(), GLOBAL_OPERATOR);
+ groupService.update(groupInfo.genRequest(), GLOBAL_OPERATOR);
+
+ sourceService.updateStatus(GROUP_ID, null, SourceStatus.SOURCE_NORMAL.getCode(), GLOBAL_OPERATOR);
+
+ GroupResourceProcessForm form = new GroupResourceProcessForm();
form.setGroupInfo(groupInfo);
form.setGroupOperateType(GroupOperateType.RESTART);
- WorkflowContext context = processService.start(ProcessName.RESTART_GROUP_PROCESS.name(), applicant, form);
+ WorkflowContext context = processService.start(ProcessName.RESTART_GROUP_PROCESS.name(), GLOBAL_OPERATOR, form);
WorkflowResult result = WorkflowBeanUtils.result(context);
ProcessResponse response = result.getProcessInfo();
Assertions.assertSame(response.getStatus(), ProcessStatus.COMPLETED);
WorkflowProcess process = context.getProcess();
- WorkflowTask task = process.getTaskByName("restartSource");
+ WorkflowTask task = process.getTaskByName("RestartSource");
Assertions.assertTrue(task instanceof ServiceTask);
+
+ StreamSource streamSource = sourceService.get(sourceId);
+ Assertions.assertSame(SourceStatus.forCode(streamSource.getStatus()), SourceStatus.TO_BE_ISSUED_RETRY);
}
}
diff --git a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/workflow/WorkflowServiceImplTest.java b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/workflow/WorkflowServiceImplTest.java
index 433b95d42..0257456f8 100644
--- a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/workflow/WorkflowServiceImplTest.java
+++ b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/workflow/WorkflowServiceImplTest.java
@@ -18,17 +18,9 @@
package org.apache.inlong.manager.service.workflow;
import com.google.common.collect.Lists;
-import org.apache.inlong.manager.common.enums.FieldType;
-import org.apache.inlong.manager.common.enums.GroupStatus;
import org.apache.inlong.manager.common.enums.MQType;
import org.apache.inlong.manager.common.enums.ProcessStatus;
-import org.apache.inlong.manager.common.exceptions.WorkflowListenerException;
import org.apache.inlong.manager.common.pojo.group.InlongGroupInfo;
-import org.apache.inlong.manager.common.pojo.group.none.InlongNoneMqInfo;
-import org.apache.inlong.manager.common.pojo.group.pulsar.InlongPulsarInfo;
-import org.apache.inlong.manager.common.pojo.stream.InlongStreamInfo;
-import org.apache.inlong.manager.common.pojo.stream.InlongStreamRequest;
-import org.apache.inlong.manager.common.pojo.stream.StreamField;
import org.apache.inlong.manager.common.pojo.workflow.ProcessResponse;
import org.apache.inlong.manager.common.pojo.workflow.WorkflowResult;
import org.apache.inlong.manager.common.pojo.workflow.form.process.GroupResourceProcessForm;
@@ -57,9 +49,7 @@ import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
-import java.util.ArrayList;
import java.util.List;
-import java.util.concurrent.atomic.AtomicInteger;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
@@ -71,14 +61,9 @@ import static org.mockito.Mockito.when;
public class WorkflowServiceImplTest extends ServiceBaseTest {
public static final String OPERATOR = "admin";
-
public static final String GROUP_ID = "test_group";
-
public static final String STREAM_ID = "test_stream";
-
- public static final String DATA_ENCODING = "UTF-8";
-
- protected String subType = "default";
+ private static final String DATA_ENCODING = "UTF-8";
@Autowired
protected WorkflowServiceImpl workflowService;
@@ -95,90 +80,25 @@ public class WorkflowServiceImplTest extends ServiceBaseTest {
@Autowired
protected InlongStreamService streamService;
- protected ProcessName processName;
-
protected String applicant;
-
- protected GroupResourceProcessForm form;
-
- protected final AtomicInteger tryTime = new AtomicInteger(0);
+ protected String subType = "default";
+ private ProcessName processName;
+ private GroupResourceProcessForm form;
/**
* Init inlong group form
*/
- public InlongGroupInfo initGroupForm(String mqType, String groupId) {
+ public InlongGroupInfo createInlongGroup(String inlongGroupId, String mqType) {
processName = ProcessName.CREATE_GROUP_RESOURCE;
applicant = OPERATOR;
-
- try {
- streamService.logicDeleteAll(groupId, OPERATOR);
- groupService.delete(groupId, OPERATOR);
- } catch (Exception e) {
- // ignore
- }
-
- InlongGroupInfo groupInfo;
- if (MQType.forType(mqType) == MQType.PULSAR || MQType.forType(mqType) == MQType.TDMQ_PULSAR) {
- groupInfo = new InlongPulsarInfo();
- } else if (MQType.forType(mqType) == MQType.TUBE) {
- groupInfo = new InlongPulsarInfo();
- } else {
- groupInfo = new InlongNoneMqInfo();
- }
-
- groupInfo.setName(groupId);
- groupInfo.setInCharges(OPERATOR);
- groupInfo.setInlongGroupId(groupId);
- groupInfo.setMqType(mqType);
- groupInfo.setMqResource("test-queue");
- groupInfo.setEnableCreateResource(1);
- groupService.save(groupInfo.genRequest(), OPERATOR);
-
- groupService.updateStatus(groupId, GroupStatus.TO_BE_APPROVAL.getCode(), OPERATOR);
- groupService.updateStatus(groupId, GroupStatus.APPROVE_PASSED.getCode(), OPERATOR);
- groupService.update(groupInfo.genRequest(), OPERATOR);
-
form = new GroupResourceProcessForm();
+
+ InlongGroupInfo groupInfo = super.createInlongGroup(inlongGroupId, mqType);
form.setGroupInfo(groupInfo);
- form.setStreamInfos(Lists.newArrayList(createStreamInfo(groupInfo)));
+ form.setStreamInfos(Lists.newArrayList(super.createStreamInfo(groupInfo, STREAM_ID)));
return groupInfo;
}
- /**
- * Create inlong stream
- */
- public InlongStreamInfo createStreamInfo(InlongGroupInfo groupInfo) {
- // delete first
- try {
- streamService.delete(GROUP_ID, OPERATOR, OPERATOR);
- } catch (Exception e) {
- // ignore
- }
-
- InlongStreamRequest request = new InlongStreamRequest();
- request.setInlongGroupId(groupInfo.getInlongGroupId());
- request.setInlongStreamId(STREAM_ID);
- request.setMqResource(STREAM_ID);
- request.setDataSeparator("124");
- request.setDataEncoding(DATA_ENCODING);
- request.setFieldList(createStreamFields(groupInfo.getInlongGroupId(), STREAM_ID));
- streamService.save(request, OPERATOR);
-
- return streamService.get(request.getInlongGroupId(), request.getInlongStreamId());
- }
-
- public List<StreamField> createStreamFields(String groupId, String streamId) {
- final List<StreamField> streamFields = new ArrayList<>();
- StreamField fieldInfo = new StreamField();
- fieldInfo.setInlongGroupId(groupId);
- fieldInfo.setInlongStreamId(streamId);
- fieldInfo.setFieldName("id");
- fieldInfo.setFieldType(FieldType.INT.toString());
- fieldInfo.setFieldComment("idx");
- streamFields.add(fieldInfo);
- return streamFields;
- }
-
/**
* Mock the task listener factory
*/
@@ -230,28 +150,24 @@ public class WorkflowServiceImplTest extends ServiceBaseTest {
}
@Override
- public ListenerResult listen(WorkflowContext context) throws Exception {
- int tryTimes = tryTime.addAndGet(1);
- if (tryTimes % 2 == 1) {
- throw new WorkflowListenerException();
- } else {
- return ListenerResult.success();
- }
+ public ListenerResult listen(WorkflowContext context) {
+ return ListenerResult.success();
}
};
}
@Test
- public void testStartCreatePulsarWorkflow() throws Exception {
- initGroupForm(MQType.PULSAR.getType(), "test14" + subType);
+ public void testStartCreatePulsarWorkflow() {
+ createInlongGroup("test14" + subType, MQType.MQ_PULSAR);
mockTaskListenerFactory();
+
WorkflowContext context = processService.start(processName.name(), applicant, form);
WorkflowResult result = WorkflowBeanUtils.result(context);
ProcessResponse processResponse = result.getProcessInfo();
- // This method temporarily fails the test, so comment it out first
- Assertions.assertSame(processResponse.getStatus(), ProcessStatus.PROCESSING);
+ Assertions.assertSame(processResponse.getStatus(), ProcessStatus.COMPLETED);
+
WorkflowProcess process = context.getProcess();
- WorkflowTask task = process.getTaskByName("initMQ");
+ WorkflowTask task = process.getTaskByName("InitMQ");
Assertions.assertTrue(task instanceof ServiceTask);
Assertions.assertEquals(2, task.getNameToListenerMap().size());
@@ -259,11 +175,11 @@ public class WorkflowServiceImplTest extends ServiceBaseTest {
Assertions.assertTrue(listeners.get(0) instanceof CreatePulsarGroupTaskListener);
Assertions.assertTrue(listeners.get(1) instanceof CreatePulsarResourceTaskListener);
- Integer processId = processResponse.getId();
- context = processService.continueProcess(processId, applicant, "continue Process");
- result = WorkflowBeanUtils.result(context);
- processResponse = result.getProcessInfo();
- Assertions.assertSame(processResponse.getStatus(), ProcessStatus.COMPLETED);
+ // Integer processId = processResponse.getId();
+ // context = processService.continueProcess(processId, applicant, "continue process");
+ // result = WorkflowBeanUtils.result(context);
+ // processResponse = result.getProcessInfo();
+ // Assertions.assertSame(processResponse.getStatus(), ProcessStatus.COMPLETED);
}
}
diff --git a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/workflow/group/CreateGroupWorkflowDefinitionTest.java b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/workflow/group/CreateGroupWorkflowDefinitionTest.java
index 26ae370e2..e428fbe1c 100644
--- a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/workflow/group/CreateGroupWorkflowDefinitionTest.java
+++ b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/workflow/group/CreateGroupWorkflowDefinitionTest.java
@@ -36,12 +36,11 @@ public class CreateGroupWorkflowDefinitionTest extends ServiceBaseTest {
WorkflowProcess process = createGroupWorkflowDefinition.defineProcess();
WorkflowProcess cloneProcess1 = process.clone();
WorkflowProcess cloneProcess2 = cloneProcess1.clone();
- Assertions.assertTrue(cloneProcess2 != cloneProcess1);
- Assertions.assertEquals("Group Resource Creation", process.getType());
- Assertions.assertNotNull(process.getTaskByName("initSource"));
- Assertions.assertNotNull(process.getTaskByName("initMQ"));
- Assertions.assertNotNull(process.getTaskByName("initSort"));
- Assertions.assertNotNull(process.getTaskByName("initSink"));
+ Assertions.assertNotSame(cloneProcess2, cloneProcess1);
+ Assertions.assertNotNull(process.getTaskByName("InitSource"));
+ Assertions.assertNotNull(process.getTaskByName("InitMQ"));
+ Assertions.assertNotNull(process.getTaskByName("InitSort"));
+ Assertions.assertNotNull(process.getTaskByName("InitSink"));
Assertions.assertEquals(4, process.getNameToTaskMap().size());
}
diff --git a/inlong-manager/manager-test/src/main/resources/h2/apache_inlong_manager.sql b/inlong-manager/manager-test/src/main/resources/h2/apache_inlong_manager.sql
index 64eb3ff03..d19693e92 100644
--- a/inlong-manager/manager-test/src/main/resources/h2/apache_inlong_manager.sql
+++ b/inlong-manager/manager-test/src/main/resources/h2/apache_inlong_manager.sql
@@ -615,9 +615,9 @@ CREATE TABLE IF NOT EXISTS `workflow_approver`
-- create default approver for new consumption and new inlong group
INSERT INTO `workflow_approver`(`process_name`, `task_name`, `filter_key`, `filter_value`, `approvers`,
`creator`, `modifier`, `create_time`, `modify_time`, `is_deleted`)
-VALUES ('NEW_CONSUMPTION_PROCESS', 'ut_admin', 'DEFAULT', NULL, 'admin',
+VALUES ('APPLY_CONSUMPTION_PROCESS', 'ut_admin', 'DEFAULT', NULL, 'admin',
'inlong_init', 'inlong_init', CURRENT_TIMESTAMP, CURRENT_TIMESTAMP, 0),
- ('NEW_GROUP_PROCESS', 'ut_admin', 'DEFAULT', NULL, 'admin',
+ ('APPLY_GROUP_PROCESS', 'ut_admin', 'DEFAULT', NULL, 'admin',
'inlong_init', 'inlong_init', CURRENT_TIMESTAMP, CURRENT_TIMESTAMP, 0);
-- ----------------------------
diff --git a/inlong-manager/manager-web/sql/apache_inlong_manager.sql b/inlong-manager/manager-web/sql/apache_inlong_manager.sql
index 536263d79..d90678dda 100644
--- a/inlong-manager/manager-web/sql/apache_inlong_manager.sql
+++ b/inlong-manager/manager-web/sql/apache_inlong_manager.sql
@@ -652,9 +652,9 @@ CREATE TABLE IF NOT EXISTS `workflow_approver`
-- create default approver for new consumption and new inlong group
INSERT INTO `workflow_approver`(`process_name`, `task_name`, `filter_key`, `filter_value`, `approvers`,
`creator`, `modifier`, `create_time`, `modify_time`, `is_deleted`)
-VALUES ('NEW_CONSUMPTION_PROCESS', 'ut_admin', 'DEFAULT', NULL, 'admin',
+VALUES ('APPLY_CONSUMPTION_PROCESS', 'ut_admin', 'DEFAULT', NULL, 'admin',
'inlong_init', 'inlong_init', CURRENT_TIMESTAMP, CURRENT_TIMESTAMP, 0),
- ('NEW_GROUP_PROCESS', 'ut_admin', 'DEFAULT', NULL, 'admin',
+ ('APPLY_GROUP_PROCESS', 'ut_admin', 'DEFAULT', NULL, 'admin',
'inlong_init', 'inlong_init', CURRENT_TIMESTAMP, CURRENT_TIMESTAMP, 0);
-- ----------------------------