You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2022/07/18 09:22:55 UTC

[inlong] branch master updated: [INLONG-5066][Manager] Remove the LightGroup-related workflow listener and definitions (#5080)

This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 5928f1023 [INLONG-5066][Manager] Remove the LightGroup-related workflow listener and definitions (#5080)
5928f1023 is described below

commit 5928f1023d7489db056a26661cdfaa95bc1eddcf
Author: healchow <he...@gmail.com>
AuthorDate: Mon Jul 18 17:22:49 2022 +0800

    [INLONG-5066][Manager] Remove the LightGroup-related workflow listener and definitions (#5080)
---
 .../inlong/manager/client/ut/Kafka2HiveTest.java   |  20 +-
 .../manager/client/api/impl/InlongGroupImpl.java   |  15 +-
 .../client/api/inner/InnerGroupContext.java        |   4 +-
 .../client/api/inner/client/WorkflowClient.java    |  16 +-
 .../client/api/impl/InlongGroupImplTest.java       |  14 +-
 .../inlong/manager/common/enums/ErrorCodeEnum.java |   7 +-
 .../inlong/manager/common/enums/GroupMode.java     |   9 +-
 ...sForm.java => ApplyConsumptionProcessForm.java} |   4 +-
 ...ProcessForm.java => ApplyGroupProcessForm.java} |   6 +-
 .../workflow/form/process/BaseProcessForm.java     |   5 +-
 .../process/LightGroupResourceProcessForm.java     |  11 +
 .../operation/ConsumptionProcessOperation.java     |  12 +-
 .../operation/InlongGroupProcessOperation.java     | 241 +++++++++------------
 .../manager/service/group/GroupCheckService.java   |  20 +-
 .../manager/service/workflow/ProcessName.java      |  32 +--
 ...er.java => ApplyConsumptionProcessHandler.java} |   8 +-
 ...ava => ApplyConsumptionWorkflowDefinition.java} |  29 +--
 .../listener/ConsumptionCancelProcessListener.java |   4 +-
 .../ConsumptionCompleteProcessListener.java        |   4 +-
 .../listener/ConsumptionPassTaskListener.java      |   4 +-
 .../listener/ConsumptionRejectProcessListener.java |   4 +-
 ...tion.java => ApplyGroupWorkflowDefinition.java} |  39 ++--
 .../group/CreateGroupWorkflowDefinition.java       |  40 ++--
 .../group/DeleteGroupWorkflowDefinition.java       |  38 ++--
 .../group/RestartGroupWorkflowDefinition.java      |  53 +++--
 .../group/SuspendGroupWorkflowDefinition.java      |  51 +++--
 .../light/CreateLightGroupWorkflowDefinition.java  |  93 --------
 .../light/DeleteLightGroupWorkflowDefinition.java  |  92 --------
 .../light/RestartLightGroupWorkflowDefinition.java |  92 --------
 .../light/SuspendLightGroupWorkflowDefinition.java |  92 --------
 .../group/listener/GroupInitProcessListener.java   |  63 ------
 .../group/listener/GroupUpdateListener.java        |  75 -------
 ...istener.java => InitGroupCompleteListener.java} |  37 ++--
 ...sListener.java => InitGroupFailedListener.java} |  28 ++-
 ...oupInitListener.java => InitGroupListener.java} |  38 ++--
 ...tener.java => UpdateGroupCompleteListener.java} |  41 ++--
 ...istener.java => UpdateGroupFailedListener.java} |  17 +-
 ...pleteListener.java => UpdateGroupListener.java} |  36 +--
 .../AfterApprovedTaskListener.java}                |  27 +--
 .../ApproveApplyProcessListener.java}              |  52 ++---
 .../CancelApplyProcessListener.java}               |  32 +--
 .../RejectApplyProcessListener.java}               |  22 +-
 .../listener/light/LightGroupCompleteListener.java |  68 ------
 .../listener/light/LightGroupFailedListener.java   |  62 ------
 .../light/LightGroupUpdateFailedListener.java      |  58 -----
 .../listener/light/LightGroupUpdateListener.java   |  80 -------
 .../stream/CreateStreamWorkflowDefinition.java     |  40 ++--
 .../stream/DeleteStreamWorkflowDefinition.java     |  32 +--
 .../stream/RestartStreamWorkflowDefinition.java    |  30 +--
 .../stream/SuspendStreamWorkflowDefinition.java    |  30 +--
 ...stener.java => InitStreamCompleteListener.java} |   4 +-
 ...Listener.java => InitStreamFailedListener.java} |   4 +-
 ...rocessListener.java => InitStreamListener.java} |   4 +-
 ...ener.java => UpdateStreamCompleteListener.java} |   4 +-
 ...stener.java => UpdateStreamFailedListener.java} |   4 +-
 ...dateListener.java => UpdateStreamListener.java} |   4 +-
 .../inlong/manager/service/ServiceBaseTest.java    | 104 ++++++++-
 .../core/impl/InlongGroupProcessOperationTest.java |  60 ++---
 .../manager/service/sort/DisableZkForSortTest.java |   7 +-
 ...enerTest.java => StreamSourceListenerTest.java} |  96 ++++----
 .../service/workflow/WorkflowServiceImplTest.java  | 126 ++---------
 .../group/CreateGroupWorkflowDefinitionTest.java   |  11 +-
 .../main/resources/h2/apache_inlong_manager.sql    |   4 +-
 .../manager-web/sql/apache_inlong_manager.sql      |   4 +-
 64 files changed, 789 insertions(+), 1574 deletions(-)

diff --git a/inlong-manager/manager-client-examples/src/test/java/org/apache/inlong/manager/client/ut/Kafka2HiveTest.java b/inlong-manager/manager-client-examples/src/test/java/org/apache/inlong/manager/client/ut/Kafka2HiveTest.java
index a7d75ac56..20a12a072 100644
--- a/inlong-manager/manager-client-examples/src/test/java/org/apache/inlong/manager/client/ut/Kafka2HiveTest.java
+++ b/inlong-manager/manager-client-examples/src/test/java/org/apache/inlong/manager/client/ut/Kafka2HiveTest.java
@@ -112,14 +112,14 @@ class Kafka2HiveTest extends BaseTest {
         initWorkflowResult.setProcessInfo(
                 ProcessResponse.builder()
                         .id(12)
-                        .name("NEW_GROUP_PROCESS")
-                        .displayName("New-Group")
-                        .type("New-Group")
+                        .name("APPLY_GROUP_PROCESS")
+                        .displayName("Apply-Group")
+                        .type("Apply-Group")
                         .applicant("admin")
                         .status(PROCESSING)
                         .startTime(new Date())
                         .formData(JsonUtils.parseTree(
-                                "{\"formName\":\"NewGroupProcessForm\",\"groupInfo\":{\"mqType\":\"PULSAR\",\"id\":6,"
+                                "{\"formName\":\"ApplyGroupProcessForm\",\"groupInfo\":{\"mqType\":\"PULSAR\",\"id\":6,"
                                         + "\"inlongGroupId\":\"test_group009\",\"name\":null,\"description\":null,"
                                         + "\"mqResource\":\"test_namespace\",\"enableZookeeper\":0,"
                                         + "\"enableCreateResource\":1,\"lightweight\":1,"
@@ -148,8 +148,8 @@ class Kafka2HiveTest extends BaseTest {
                                 .id(12)
                                 .type("UserTask")
                                 .processId(12)
-                                .processName("NEW_GROUP_PROCESS")
-                                .processDisplayName("New-Group")
+                                .processName("APPLY_GROUP_PROCESS")
+                                .processDisplayName("Apply-Group")
                                 .name("ut_admin")
                                 .displayName("SystemAdmin")
                                 .applicant("admin")
@@ -170,14 +170,14 @@ class Kafka2HiveTest extends BaseTest {
         startWorkflowResult.setProcessInfo(
                 ProcessResponse.builder()
                         .id(12)
-                        .name("NEW_GROUP_PROCESS")
-                        .displayName("New-Group")
-                        .type("New-Group")
+                        .name("APPLY_GROUP_PROCESS")
+                        .displayName("Apply-Group")
+                        .type("Apply-Group")
                         .applicant("admin")
                         .status(ProcessStatus.COMPLETED)
                         .startTime(new Date())
                         .endTime(new Date())
-                        .formData("{\"formName\":\"NewGroupProcessForm\",\"groupInfo\":{\"mqType\":\"PULSAR\","
+                        .formData("{\"formName\":\"ApplyGroupProcessForm\",\"groupInfo\":{\"mqType\":\"PULSAR\","
                                 + "\"id\":8,\"inlongGroupId\":\"test_group011\",\"name\":null,\"description\":null,"
                                 + "\"mqResource\":\"test_namespace\",\"enableZookeeper\":0,\"enableCreateResource\":1,"
                                 + "\"lightweight\":1,\"inlongClusterTag\":\"default_cluster\","
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/InlongGroupImpl.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/InlongGroupImpl.java
index 560b7dee8..7ec6f9076 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/InlongGroupImpl.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/impl/InlongGroupImpl.java
@@ -47,7 +47,7 @@ import org.apache.inlong.manager.common.pojo.workflow.EventLogView;
 import org.apache.inlong.manager.common.pojo.workflow.ProcessResponse;
 import org.apache.inlong.manager.common.pojo.workflow.TaskResponse;
 import org.apache.inlong.manager.common.pojo.workflow.WorkflowResult;
-import org.apache.inlong.manager.common.pojo.workflow.form.process.NewGroupProcessForm;
+import org.apache.inlong.manager.common.pojo.workflow.form.process.ApplyGroupProcessForm;
 import org.apache.inlong.manager.common.util.JsonUtils;
 import org.apache.inlong.manager.common.util.Preconditions;
 import org.springframework.boot.configurationprocessor.json.JSONObject;
@@ -116,7 +116,7 @@ public class InlongGroupImpl implements InlongGroup {
         Preconditions.checkTrue(ProcessStatus.PROCESSING == processView.getStatus(),
                 String.format("process status %s is not corrected, should be PROCESSING", processView.getStatus()));
 
-        // init must be NewGroupProcessForm
+        // init must be ApplyGroupProcessForm
         // compile with old cluster
         JSONObject formDataJson = JsonUtils.parseObject(
                 JsonUtils.toJsonString(JsonUtils.toJsonString(processView.getFormData())),
@@ -129,11 +129,12 @@ public class InlongGroupImpl implements InlongGroup {
             }
         }
         String formDataNew = formDataJson.toString();
-        NewGroupProcessForm newGroupProcessForm = JsonUtils.parseObject(
-                formDataNew, NewGroupProcessForm.class);
-        Preconditions.checkNotNull(newGroupProcessForm, "NewGroupProcessForm cannot be null");
-        groupContext.setInitMsg(newGroupProcessForm);
-        WorkflowResult startWorkflowResult = workFlowClient.startInlongGroup(taskId, newGroupProcessForm);
+        ApplyGroupProcessForm groupProcessForm = JsonUtils.parseObject(
+                formDataNew, ApplyGroupProcessForm.class);
+        Preconditions.checkNotNull(groupProcessForm, "ApplyGroupProcessForm cannot be null");
+        groupContext.setInitMsg(groupProcessForm);
+        assert groupProcessForm != null;
+        WorkflowResult startWorkflowResult = workFlowClient.startInlongGroup(taskId, groupProcessForm);
         processView = startWorkflowResult.getProcessInfo();
         Preconditions.checkTrue(ProcessStatus.COMPLETED == processView.getStatus(),
                 String.format("inlong group status %s is incorrect, should be COMPLETED", processView.getStatus()));
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/InnerGroupContext.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/InnerGroupContext.java
index c67260883..0abbd5e38 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/InnerGroupContext.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/InnerGroupContext.java
@@ -23,7 +23,7 @@ import lombok.NoArgsConstructor;
 import org.apache.commons.collections.MapUtils;
 import org.apache.inlong.manager.client.api.InlongStream;
 import org.apache.inlong.manager.common.pojo.group.InlongGroupInfo;
-import org.apache.inlong.manager.common.pojo.workflow.form.process.NewGroupProcessForm;
+import org.apache.inlong.manager.common.pojo.workflow.form.process.ApplyGroupProcessForm;
 import org.apache.inlong.manager.common.util.Preconditions;
 
 import java.util.Map;
@@ -41,7 +41,7 @@ public class InnerGroupContext {
 
     private Map<String, InlongStream> streamMap = Maps.newHashMap();
 
-    private NewGroupProcessForm initMsg;
+    private ApplyGroupProcessForm initMsg;
 
     public String getGroupId() {
         Preconditions.checkNotNull(groupInfo, "inlong group info was not init");
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/client/WorkflowClient.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/client/WorkflowClient.java
index 82994129f..48e44ef0f 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/client/WorkflowClient.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/inner/client/WorkflowClient.java
@@ -29,7 +29,7 @@ import org.apache.inlong.manager.client.api.util.ClientUtils;
 import org.apache.inlong.manager.common.beans.Response;
 import org.apache.inlong.manager.common.pojo.workflow.EventLogView;
 import org.apache.inlong.manager.common.pojo.workflow.WorkflowResult;
-import org.apache.inlong.manager.common.pojo.workflow.form.process.NewGroupProcessForm;
+import org.apache.inlong.manager.common.pojo.workflow.form.process.ApplyGroupProcessForm;
 import org.apache.inlong.manager.common.util.JsonUtils;
 
 import java.util.List;
@@ -48,18 +48,18 @@ public class WorkflowClient {
         workflowApi = ClientUtils.createRetrofit(configuration).create(WorkflowApi.class);
     }
 
-    public WorkflowResult startInlongGroup(int taskId, NewGroupProcessForm newGroupProcessForm) {
+    public WorkflowResult startInlongGroup(int taskId, ApplyGroupProcessForm groupProcessForm) {
         ObjectNode workflowTaskOperation = objectMapper.createObjectNode();
         workflowTaskOperation.putPOJO("transferTo", Lists.newArrayList());
         workflowTaskOperation.put("remark", "approved by system");
 
-        ObjectNode inlongGroupApproveForm = objectMapper.createObjectNode();
-        inlongGroupApproveForm.putPOJO("groupApproveInfo", newGroupProcessForm.getGroupInfo());
-        inlongGroupApproveForm.putPOJO("streamApproveInfoList", newGroupProcessForm.getStreamInfoList());
-        inlongGroupApproveForm.put("formName", "InlongGroupApproveForm");
-        workflowTaskOperation.set("form", inlongGroupApproveForm);
+        ObjectNode groupApproveForm = objectMapper.createObjectNode();
+        groupApproveForm.putPOJO("groupApproveInfo", groupProcessForm.getGroupInfo());
+        groupApproveForm.putPOJO("streamApproveInfoList", groupProcessForm.getStreamInfoList());
+        groupApproveForm.put("formName", "InlongGroupApproveForm");
+        workflowTaskOperation.set("form", groupApproveForm);
 
-        log.info("startInlongGroup workflowTaskOperation: {}", inlongGroupApproveForm);
+        log.info("startInlongGroup workflowTaskOperation: {}", groupApproveForm);
 
         Map<String, Object> requestMap = JsonUtils.OBJECT_MAPPER.convertValue(workflowTaskOperation,
                 new TypeReference<Map<String, Object>>() {
diff --git a/inlong-manager/manager-client/src/test/java/org/apache/inlong/manager/client/api/impl/InlongGroupImplTest.java b/inlong-manager/manager-client/src/test/java/org/apache/inlong/manager/client/api/impl/InlongGroupImplTest.java
index 1cf7ca6c1..ebdf9d1f9 100644
--- a/inlong-manager/manager-client/src/test/java/org/apache/inlong/manager/client/api/impl/InlongGroupImplTest.java
+++ b/inlong-manager/manager-client/src/test/java/org/apache/inlong/manager/client/api/impl/InlongGroupImplTest.java
@@ -17,7 +17,7 @@
 
 package org.apache.inlong.manager.client.api.impl;
 
-import org.apache.inlong.manager.common.pojo.workflow.form.process.NewGroupProcessForm;
+import org.apache.inlong.manager.common.pojo.workflow.form.process.ApplyGroupProcessForm;
 import org.apache.inlong.manager.common.util.JsonUtils;
 import org.junit.jupiter.api.Test;
 
@@ -31,7 +31,7 @@ class InlongGroupImplTest {
     @Test
     void testParseForm() {
         String json = "{\n"
-                + "  \"formName\" : \"NewGroupProcessForm\",\n"
+                + "  \"formName\" : \"ApplyGroupProcessForm\",\n"
                 + "  \"groupInfo\" : {\n"
                 + "    \"mqType\" : \"PULSAR\",\n"
                 + "    \"id\" : 5,\n"
@@ -88,12 +88,12 @@ class InlongGroupImplTest {
                 + "  } ]\n"
                 + "}";
 
-        NewGroupProcessForm newGroupProcessForm =
-                JsonUtils.parseObject(json, NewGroupProcessForm.class);
+        ApplyGroupProcessForm applyGroupProcessForm =
+                JsonUtils.parseObject(json, ApplyGroupProcessForm.class);
 
-        assertNotNull(newGroupProcessForm);
-        assertNotNull(newGroupProcessForm.getGroupInfo());
-        assertNotNull(newGroupProcessForm.getStreamInfoList());
+        assertNotNull(applyGroupProcessForm);
+        assertNotNull(applyGroupProcessForm.getGroupInfo());
+        assertNotNull(applyGroupProcessForm.getStreamInfoList());
     }
 
 }
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/ErrorCodeEnum.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/ErrorCodeEnum.java
index b4ae91164..2d9b4a2a1 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/ErrorCodeEnum.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/ErrorCodeEnum.java
@@ -34,8 +34,8 @@ public enum ErrorCodeEnum {
     GROUP_NOT_FOUND(1001, "Inlong group does not exist/no operation authority"),
     GROUP_DUPLICATE(1002, "Inlong group already exists"),
     GROUP_INFO_INCORRECT(1003, "Group info was incorrect"),
-    GROUP_SAVE_FAILED(1003, "Failed to save/update inlong group information"),
-    GROUP_PERMISSION_DENIED(1004, "No access to this inlong group"),
+    GROUP_SAVE_FAILED(1003, "Failed to save/update inlong group"),
+    GROUP_PERMISSION_DENIED(1004, "No permission to access this inlong group"),
     GROUP_HAS_STREAM(1005, "There are some valid inlong stream for this inlong group"),
     GROUP_UPDATE_NOT_ALLOWED(1006, "The current inlong group status does not support modification"),
     GROUP_DELETE_NOT_ALLOWED(1007, "The current inlong group status does not support deletion"),
@@ -46,8 +46,7 @@ public enum ErrorCodeEnum {
     GROUP_INFO_INCONSISTENT(1013, "The inlong group info is inconsistent, please contact the administrator"),
     GROUP_MODE_UNSUPPORTED(1014, "The current inlong group mode only support light, normal"),
 
-    OPT_NOT_ALLOWED_BY_STATUS(1021,
-            "The current inlong group status does not allow adding/modifying/deleting related info"),
+    OPT_NOT_ALLOWED_BY_STATUS(1021, "InlongGroup status %s was not allowed to add/update/delete related info"),
 
     MQ_TYPE_NOT_SUPPORTED(1022, "MQ type '%s' not supported"),
     MQ_TYPE_NOT_SAME(1023, "Expected MQ type is '%s', but found '%s'"),
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/GroupMode.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/GroupMode.java
index 43e7606d4..4105def53 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/GroupMode.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/GroupMode.java
@@ -27,11 +27,12 @@ import java.util.Objects;
  * Mode of inlong group
  */
 public enum GroupMode {
+
     /**
-     * Normal group init with all components in Inlong Cluster
-     * StreamSource -> Agent/SDK -> DataProxy -> Cache -> Sort -> StreamSink
+     * Standard group init with all components in Inlong Cluster
+     * StreamSource -> Agent/SDK -> DataProxy -> MQ Cache -> Sort -> StreamSink
      */
-    NORMAL("normal"),
+    STANDARD("standard"),
 
     /**
      * Lightweight group init with sort in Inlong Cluster
@@ -59,6 +60,6 @@ public enum GroupMode {
         if (Objects.equals(groupInfo.getLightweight(), InlongConstants.LIGHTWEIGHT_MODE)) {
             return GroupMode.LIGHTWEIGHT;
         }
-        return GroupMode.NORMAL;
+        return GroupMode.STANDARD;
     }
 }
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/workflow/form/process/NewConsumptionProcessForm.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/workflow/form/process/ApplyConsumptionProcessForm.java
similarity index 93%
rename from inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/workflow/form/process/NewConsumptionProcessForm.java
rename to inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/workflow/form/process/ApplyConsumptionProcessForm.java
index 6d3eb61d7..2587baa63 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/workflow/form/process/NewConsumptionProcessForm.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/workflow/form/process/ApplyConsumptionProcessForm.java
@@ -32,9 +32,9 @@ import java.util.Map;
  */
 @Data
 @EqualsAndHashCode(callSuper = true)
-public class NewConsumptionProcessForm extends BaseProcessForm {
+public class ApplyConsumptionProcessForm extends BaseProcessForm {
 
-    public static final String FORM_NAME = "NewConsumptionProcessForm";
+    public static final String FORM_NAME = "ApplyConsumptionProcessForm";
 
     @ApiModelProperty(value = "Data consumption information")
     private ConsumptionInfo consumptionInfo;
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/workflow/form/process/NewGroupProcessForm.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/workflow/form/process/ApplyGroupProcessForm.java
similarity index 93%
rename from inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/workflow/form/process/NewGroupProcessForm.java
rename to inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/workflow/form/process/ApplyGroupProcessForm.java
index 0edf76855..17a1645be 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/workflow/form/process/NewGroupProcessForm.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/workflow/form/process/ApplyGroupProcessForm.java
@@ -30,13 +30,13 @@ import java.util.List;
 import java.util.Map;
 
 /**
- * New inlong group process form
+ * Apply inlong group process form
  */
 @Data
 @EqualsAndHashCode(callSuper = false)
-public class NewGroupProcessForm extends BaseProcessForm {
+public class ApplyGroupProcessForm extends BaseProcessForm {
 
-    public static final String FORM_NAME = "NewGroupProcessForm";
+    public static final String FORM_NAME = "ApplyGroupProcessForm";
 
     @ApiModelProperty(value = "Inlong group info", required = true)
     private InlongGroupInfo groupInfo;
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/workflow/form/process/BaseProcessForm.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/workflow/form/process/BaseProcessForm.java
index 48327122f..175b69c36 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/workflow/form/process/BaseProcessForm.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/workflow/form/process/BaseProcessForm.java
@@ -28,10 +28,9 @@ import lombok.Data;
 @Data
 @JsonTypeInfo(use = Id.NAME, property = "formName")
 @JsonSubTypes({
-        @JsonSubTypes.Type(value = NewGroupProcessForm.class, name = NewGroupProcessForm.FORM_NAME),
-        @JsonSubTypes.Type(value = NewConsumptionProcessForm.class, name = NewConsumptionProcessForm.FORM_NAME),
+        @JsonSubTypes.Type(value = ApplyGroupProcessForm.class, name = ApplyGroupProcessForm.FORM_NAME),
+        @JsonSubTypes.Type(value = ApplyConsumptionProcessForm.class, name = ApplyConsumptionProcessForm.FORM_NAME),
         @JsonSubTypes.Type(value = GroupResourceProcessForm.class, name = GroupResourceProcessForm.FORM_NAME),
-        @JsonSubTypes.Type(value = LightGroupResourceProcessForm.class, name = LightGroupResourceProcessForm.FORM_NAME),
         @JsonSubTypes.Type(value = StreamResourceProcessForm.class, name = StreamResourceProcessForm.FORM_NAME),
 })
 public abstract class BaseProcessForm implements ProcessForm {
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/workflow/form/process/LightGroupResourceProcessForm.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/workflow/form/process/LightGroupResourceProcessForm.java
index 634acb078..0aef09b27 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/workflow/form/process/LightGroupResourceProcessForm.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/workflow/form/process/LightGroupResourceProcessForm.java
@@ -26,7 +26,9 @@ import org.apache.inlong.manager.common.pojo.group.InlongGroupInfo;
 import org.apache.inlong.manager.common.pojo.stream.InlongStreamInfo;
 import org.apache.inlong.manager.common.util.Preconditions;
 
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 
 /**
  * Form of create lightweight inlong group resource
@@ -57,4 +59,13 @@ public class LightGroupResourceProcessForm extends BaseProcessForm {
     public String getInlongGroupId() {
         return groupInfo.getInlongGroupId();
     }
+
+    @Override
+    public Map<String, Object> showInList() {
+        Map<String, Object> show = new HashMap<>();
+        show.put("inlongGroupId", groupInfo.getInlongGroupId());
+        show.put("groupOperateType", this.groupOperateType);
+        return show;
+    }
+
 }
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/operation/ConsumptionProcessOperation.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/operation/ConsumptionProcessOperation.java
index b1ef3881d..695618bfb 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/operation/ConsumptionProcessOperation.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/operation/ConsumptionProcessOperation.java
@@ -23,7 +23,7 @@ import org.apache.inlong.manager.common.enums.MQType;
 import org.apache.inlong.manager.common.pojo.consumption.ConsumptionInfo;
 import org.apache.inlong.manager.common.pojo.consumption.ConsumptionPulsarInfo;
 import org.apache.inlong.manager.common.pojo.workflow.WorkflowResult;
-import org.apache.inlong.manager.common.pojo.workflow.form.process.NewConsumptionProcessForm;
+import org.apache.inlong.manager.common.pojo.workflow.form.process.ApplyConsumptionProcessForm;
 import org.apache.inlong.manager.common.util.CommonBeanUtils;
 import org.apache.inlong.manager.common.util.Preconditions;
 import org.apache.inlong.manager.dao.entity.ConsumptionPulsarEntity;
@@ -52,15 +52,15 @@ public class ConsumptionProcessOperation {
                 "current status not allow start workflow");
 
         consumptionInfo.setStatus(ConsumptionStatus.WAIT_APPROVE.getStatus());
-        boolean isSuccess = consumptionService.update(consumptionInfo,operator);
+        boolean isSuccess = consumptionService.update(consumptionInfo, operator);
         Preconditions.checkTrue(isSuccess, "update consumption failed");
 
-        return workflowService.start(ProcessName.NEW_CONSUMPTION_PROCESS, operator,
-                genNewConsumptionProcessForm(consumptionInfo));
+        return workflowService.start(ProcessName.APPLY_CONSUMPTION_PROCESS, operator,
+                genConsumptionProcessForm(consumptionInfo));
     }
 
-    private NewConsumptionProcessForm genNewConsumptionProcessForm(ConsumptionInfo consumptionInfo) {
-        NewConsumptionProcessForm form = new NewConsumptionProcessForm();
+    private ApplyConsumptionProcessForm genConsumptionProcessForm(ConsumptionInfo consumptionInfo) {
+        ApplyConsumptionProcessForm form = new ApplyConsumptionProcessForm();
         Integer id = consumptionInfo.getId();
         MQType mqType = MQType.forType(consumptionInfo.getMqType());
         if (mqType == MQType.PULSAR || mqType == MQType.TDMQ_PULSAR) {
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/operation/InlongGroupProcessOperation.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/operation/InlongGroupProcessOperation.java
index dd6ac6724..5f8146531 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/operation/InlongGroupProcessOperation.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/operation/InlongGroupProcessOperation.java
@@ -19,19 +19,16 @@ package org.apache.inlong.manager.service.core.operation;
 
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
-import org.apache.inlong.manager.common.enums.GroupMode;
 import org.apache.inlong.manager.common.enums.GroupOperateType;
 import org.apache.inlong.manager.common.enums.GroupStatus;
-import org.apache.inlong.manager.common.exceptions.WorkflowListenerException;
 import org.apache.inlong.manager.common.pojo.group.InlongGroupInfo;
 import org.apache.inlong.manager.common.pojo.group.InlongGroupResetRequest;
 import org.apache.inlong.manager.common.pojo.stream.InlongStreamBriefInfo;
 import org.apache.inlong.manager.common.pojo.stream.InlongStreamInfo;
 import org.apache.inlong.manager.common.pojo.workflow.ProcessQuery;
 import org.apache.inlong.manager.common.pojo.workflow.WorkflowResult;
+import org.apache.inlong.manager.common.pojo.workflow.form.process.ApplyGroupProcessForm;
 import org.apache.inlong.manager.common.pojo.workflow.form.process.GroupResourceProcessForm;
-import org.apache.inlong.manager.common.pojo.workflow.form.process.LightGroupResourceProcessForm;
-import org.apache.inlong.manager.common.pojo.workflow.form.process.NewGroupProcessForm;
 import org.apache.inlong.manager.common.util.Preconditions;
 import org.apache.inlong.manager.dao.entity.WorkflowProcessEntity;
 import org.apache.inlong.manager.service.core.InlongStreamService;
@@ -44,7 +41,7 @@ import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Service;
 
-import java.util.Collections;
+import java.util.Comparator;
 import java.util.List;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.LinkedBlockingQueue;
@@ -79,198 +76,185 @@ public class InlongGroupProcessOperation {
     private InlongStreamService streamService;
 
     /**
-     * Allocate resource application groups for access services and initiate an approval process
+     * Start the approval process for the InlongGroup with the specified inlong group id.
      *
-     * @param groupId Inlong group id
-     * @param operator Operator name
-     * @return Workflow result
+     * @param groupId inlong group id
+     * @param operator name of operator
+     * @return workflow result
      */
     public WorkflowResult startProcess(String groupId, String operator) {
-        LOGGER.info("begin to start approve process, groupId = {}, operator = {}", groupId, operator);
+        LOGGER.info("begin to start approve process for groupId={} by operator={}", groupId, operator);
+
         groupService.updateStatus(groupId, GroupStatus.TO_BE_APPROVAL.getCode(), operator);
-        // Initiate the approval process
-        NewGroupProcessForm form = genNewGroupProcessForm(groupId);
-        return workflowService.start(ProcessName.NEW_GROUP_PROCESS, operator, form);
+        ApplyGroupProcessForm form = genApplyGroupProcessForm(groupId);
+        WorkflowResult result = workflowService.start(ProcessName.APPLY_GROUP_PROCESS, operator, form);
+
+        LOGGER.info("success to start approve process for groupId={} by operator={}", groupId, operator);
+        return result;
     }
 
     /**
-     * Suspend resource application group in an asynchronous way,
-     * stop source and sort task related to application group asynchronously,
-     * persist the application status if necessary.
+     * Suspend InlongGroup in an asynchronous way.
      *
-     * @return groupId
+     * @param groupId inlong group id
+     * @param operator name of operator
+     * @return inlong group id
+     * @apiNote Stop source and sort task related to the inlong group asynchronously, persist the status if
+     *         necessary.
      */
     public String suspendProcessAsync(String groupId, String operator) {
-        LOGGER.info("begin to suspend process asynchronously, groupId = {}, operator = {}", groupId, operator);
+        LOGGER.info("begin to suspend process asynchronously for groupId={} by operator={}", groupId, operator);
+
         groupService.updateStatus(groupId, GroupStatus.SUSPENDING.getCode(), operator);
         InlongGroupInfo groupInfo = groupService.get(groupId);
-        GroupMode mode = GroupMode.parseGroupMode(groupInfo);
-        switch (mode) {
-            case NORMAL:
-                GroupResourceProcessForm form = genGroupProcessForm(groupInfo, GroupOperateType.SUSPEND);
-                executorService.execute(() -> workflowService.start(ProcessName.SUSPEND_GROUP_PROCESS, operator, form));
-                break;
-            case LIGHTWEIGHT:
-                LightGroupResourceProcessForm lightForm = genLightGroupProcessForm(groupInfo, GroupOperateType.SUSPEND);
-                executorService.execute(
-                        () -> workflowService.start(ProcessName.SUSPEND_LIGHT_GROUP_PROCESS, operator, lightForm));
-                break;
-            default:
-                throw new WorkflowListenerException(ErrorCodeEnum.GROUP_MODE_UNSUPPORTED.getMessage());
-        }
+        GroupResourceProcessForm form = genGroupResourceProcessForm(groupInfo, GroupOperateType.SUSPEND);
+        executorService.execute(() -> workflowService.start(ProcessName.SUSPEND_GROUP_PROCESS, operator, form));
+
+        LOGGER.info("success to suspend process asynchronously for groupId={} by operator={}", groupId, operator);
         return groupId;
     }
 
     /**
-     * Suspend resource application group which is started up successfully,
-     * stop source and sort task related to application group asynchronously,
-     * persist the application status if necessary.
+     * Suspend InlongGroup which is started up successfully.
      *
-     * @return Workflow result
+     * @param groupId inlong group id
+     * @param operator name of operator
+     * @return workflow result
+     * @apiNote Stop source and sort task related to the inlong group asynchronously, persist the status if
+     *         necessary.
      */
     public WorkflowResult suspendProcess(String groupId, String operator) {
-        LOGGER.info("begin to suspend process, groupId = {}, operator = {}", groupId, operator);
+        LOGGER.info("begin to suspend process for groupId={} by operator={}", groupId, operator);
+
         groupService.updateStatus(groupId, GroupStatus.SUSPENDING.getCode(), operator);
         InlongGroupInfo groupInfo = groupService.get(groupId);
-        GroupMode mode = GroupMode.parseGroupMode(groupInfo);
-        WorkflowResult result;
-        switch (mode) {
-            case NORMAL:
-                GroupResourceProcessForm form = genGroupProcessForm(groupInfo, GroupOperateType.SUSPEND);
-                result = workflowService.start(ProcessName.SUSPEND_GROUP_PROCESS, operator, form);
-                break;
-            case LIGHTWEIGHT:
-                LightGroupResourceProcessForm lightForm = genLightGroupProcessForm(groupInfo, GroupOperateType.SUSPEND);
-                result = workflowService.start(ProcessName.SUSPEND_LIGHT_GROUP_PROCESS, operator, lightForm);
-                break;
-            default:
-                throw new WorkflowListenerException(ErrorCodeEnum.GROUP_MODE_UNSUPPORTED.getMessage());
-        }
+        GroupResourceProcessForm form = genGroupResourceProcessForm(groupInfo, GroupOperateType.SUSPEND);
+        WorkflowResult result = workflowService.start(ProcessName.SUSPEND_GROUP_PROCESS, operator, form);
+
+        LOGGER.info("success to suspend process for groupId={} by operator={}", groupId, operator);
         return result;
     }
 
     /**
-     * Restart resource application group in an asynchronous way,
-     * starting from the last persist snapshot.
+     * Restart InlongGroup in an asynchronous way, starting from the last persist snapshot.
      *
-     * @return Workflow result
+     * @param groupId inlong group id
+     * @param operator name of operator
+     * @return inlong group id
      */
     public String restartProcessAsync(String groupId, String operator) {
-        LOGGER.info("begin to restart process asynchronously, groupId = {}, operator = {}", groupId, operator);
+        LOGGER.info("begin to restart process asynchronously for groupId={} by operator={}", groupId, operator);
+
         groupService.updateStatus(groupId, GroupStatus.RESTARTING.getCode(), operator);
         InlongGroupInfo groupInfo = groupService.get(groupId);
-        GroupMode mode = GroupMode.parseGroupMode(groupInfo);
-        switch (mode) {
-            case NORMAL:
-                GroupResourceProcessForm form = genGroupProcessForm(groupInfo, GroupOperateType.RESTART);
-                executorService.execute(() -> workflowService.start(ProcessName.RESTART_GROUP_PROCESS, operator, form));
-                break;
-            case LIGHTWEIGHT:
-                LightGroupResourceProcessForm lightForm = genLightGroupProcessForm(groupInfo, GroupOperateType.RESTART);
-                executorService.execute(
-                        () -> workflowService.start(ProcessName.RESTART_LIGHT_GROUP_PROCESS, operator, lightForm));
-                break;
-            default:
-                throw new WorkflowListenerException(ErrorCodeEnum.GROUP_MODE_UNSUPPORTED.getMessage());
-        }
+        GroupResourceProcessForm form = genGroupResourceProcessForm(groupInfo, GroupOperateType.RESTART);
+        executorService.execute(() -> workflowService.start(ProcessName.RESTART_GROUP_PROCESS, operator, form));
+
+        LOGGER.info("success to restart process asynchronously for groupId={} by operator={}", groupId, operator);
         return groupId;
     }
 
     /**
-     * Restart resource application group which is suspended successfully,
-     * starting from the last persist snapshot.
+     * Restart InlongGroup which is suspended successfully, starting from the last persist snapshot.
      *
-     * @return Workflow result
+     * @param groupId inlong group id
+     * @param operator name of operator
+     * @return workflow result
      */
     public WorkflowResult restartProcess(String groupId, String operator) {
-        LOGGER.info("begin to restart process, groupId = {}, operator = {}", groupId, operator);
+        LOGGER.info("begin to restart process for groupId={} by operator={}", groupId, operator);
+
         groupService.updateStatus(groupId, GroupStatus.RESTARTING.getCode(), operator);
         InlongGroupInfo groupInfo = groupService.get(groupId);
-        GroupMode mode = GroupMode.parseGroupMode(groupInfo);
-        WorkflowResult result;
-        switch (mode) {
-            case NORMAL:
-                GroupResourceProcessForm form = genGroupProcessForm(groupInfo, GroupOperateType.RESTART);
-                result = workflowService.start(ProcessName.RESTART_GROUP_PROCESS, operator, form);
-                break;
-            case LIGHTWEIGHT:
-                LightGroupResourceProcessForm lightForm = genLightGroupProcessForm(groupInfo, GroupOperateType.RESTART);
-                result = workflowService.start(ProcessName.RESTART_LIGHT_GROUP_PROCESS, operator, lightForm);
-                break;
-            default:
-                throw new WorkflowListenerException(ErrorCodeEnum.GROUP_MODE_UNSUPPORTED.getMessage());
-        }
+        GroupResourceProcessForm form = genGroupResourceProcessForm(groupInfo, GroupOperateType.RESTART);
+        WorkflowResult result = workflowService.start(ProcessName.RESTART_GROUP_PROCESS, operator, form);
+
+        LOGGER.info("success to restart process for groupId={} by operator={}", groupId, operator);
         return result;
     }
 
     /**
-     * Delete resource application group logically and delete related resource in an
+     * Delete InlongGroup logically and delete related resource in an asynchronous way.
+     *
+     * @param groupId inlong group id
+     * @param operator name of operator
+     * @return inlong group id
      */
     public String deleteProcessAsync(String groupId, String operator) {
-        LOGGER.info("begin to delete process asynchronously, groupId = {}, operator = {}", groupId, operator);
+        LOGGER.info("begin to delete process asynchronously for groupId={} by operator={}", groupId, operator);
         executorService.execute(() -> {
             try {
                 invokeDeleteProcess(groupId, operator);
             } catch (Exception ex) {
-                LOGGER.error("exception while delete process, groupId = {}, operator = {}", groupId, operator, ex);
+                LOGGER.error("exception while delete process for groupId={} by operator={}", groupId, operator, ex);
                 throw ex;
             }
             groupService.delete(groupId, operator);
         });
+
+        LOGGER.info("success to delete process asynchronously for groupId={} by operator={}", groupId, operator);
         return groupId;
     }
 
     /**
-     * Delete resource application group logically and delete related resource in an asynchronous way
+     * Delete InlongGroup logically and delete related resource in a synchronous way.
      */
     public boolean deleteProcess(String groupId, String operator) {
-        LOGGER.info("begin to delete process, groupId = {}, operator = {}", groupId, operator);
+        LOGGER.info("begin to delete process for groupId={} by operator={}", groupId, operator);
         try {
             invokeDeleteProcess(groupId, operator);
         } catch (Exception ex) {
-            LOGGER.error("exception while delete process, groupId = {}, operator = {}", groupId, operator, ex);
+            LOGGER.error("exception while delete process for groupId={} by operator={}", groupId, operator, ex);
             throw ex;
         }
-        return groupService.delete(groupId, operator);
+
+        boolean result = groupService.delete(groupId, operator);
+        LOGGER.info("success to delete process for groupId={} by operator={}", groupId, operator);
+        return result;
     }
 
     /**
-     * Reset group status when group is staying CONFIG_ING|SUSPENDING|RESTARTING|DELETING for a long time
+     * Reset InlongGroup status when group is staying CONFIG_ING|SUSPENDING|RESTARTING|DELETING for a long time.
      * This api is side effect, must be used carefully.
      *
-     * @param request
-     * @param operator
-     * @return
+     * @param request reset inlong group request
+     * @param operator name of operator
+     * @return true if success, false otherwise
      */
     public boolean resetGroupStatus(InlongGroupResetRequest request, String operator) {
-        LOGGER.info("begin to reset group status, request = {}, operator = {}", request, operator);
+        LOGGER.info("begin to reset group status by operator={} for request={}", operator, request);
         final String groupId = request.getInlongGroupId();
         InlongGroupInfo groupInfo = groupService.get(groupId);
         Preconditions.checkNotNull(groupInfo, ErrorCodeEnum.GROUP_NOT_FOUND.getMessage());
+
         GroupStatus status = GroupStatus.forCode(groupInfo.getStatus());
-        final int rerunProcess = request.getRerunProcess();
-        final int resetFinalStatus = request.getResetFinalStatus();
+        boolean result;
         switch (status) {
             case CONFIG_ING:
             case SUSPENDING:
             case RESTARTING:
             case DELETING:
-                return dealWithPendingGroup(groupInfo, operator, status, rerunProcess, resetFinalStatus);
+                final int rerunProcess = request.getRerunProcess();
+                final int resetFinalStatus = request.getResetFinalStatus();
+                result = pendingGroupOpt(groupInfo, operator, status, rerunProcess, resetFinalStatus);
+                break;
             default:
-                throw new IllegalStateException(
-                        String.format("Unsupported status to reset for group = %s and status = %s",
-                                request.getInlongGroupId(), status));
+                throw new IllegalStateException(String.format("Unsupported status to reset groupId=%s and status=%s",
+                        request.getInlongGroupId(), status));
         }
+
+        LOGGER.info("finish to reset group status by operator={}, result={} for request={}", operator, result, request);
+        return result;
     }
 
-    private boolean dealWithPendingGroup(InlongGroupInfo groupInfo, String operator, GroupStatus status,
+    private boolean pendingGroupOpt(InlongGroupInfo groupInfo, String operator, GroupStatus status,
             int rerunProcess, int resetFinalStatus) {
         final String groupId = groupInfo.getInlongGroupId();
         if (rerunProcess == 1) {
             ProcessQuery processQuery = new ProcessQuery();
             processQuery.setInlongGroupId(groupId);
             List<WorkflowProcessEntity> entities = workflowQueryService.listProcessEntity(processQuery);
-            Collections.sort(entities, (process1, process2) -> process1.getId() - process2.getId());
+            entities.sort(Comparator.comparingInt(WorkflowProcessEntity::getId));
             WorkflowProcessEntity lastProcess = entities.get(entities.size() - 1);
             executorService.execute(() -> {
                 workflowService.continueProcess(lastProcess.getId(), operator, "Reset group status");
@@ -300,27 +284,15 @@ public class InlongGroupProcessOperation {
 
     private void invokeDeleteProcess(String groupId, String operator) {
         InlongGroupInfo groupInfo = groupService.get(groupId);
-        GroupMode mode = GroupMode.parseGroupMode(groupInfo);
-        switch (mode) {
-            case NORMAL:
-                GroupResourceProcessForm form = genGroupProcessForm(groupInfo, GroupOperateType.DELETE);
-                workflowService.start(ProcessName.DELETE_GROUP_PROCESS, operator, form);
-                break;
-            case LIGHTWEIGHT:
-                LightGroupResourceProcessForm lightForm = genLightGroupProcessForm(groupInfo,
-                        GroupOperateType.DELETE);
-                workflowService.start(ProcessName.DELETE_LIGHT_GROUP_PROCESS, operator, lightForm);
-                break;
-            default:
-                throw new WorkflowListenerException(ErrorCodeEnum.GROUP_MODE_UNSUPPORTED.getMessage());
-        }
+        GroupResourceProcessForm form = genGroupResourceProcessForm(groupInfo, GroupOperateType.DELETE);
+        workflowService.start(ProcessName.DELETE_GROUP_PROCESS, operator, form);
     }
 
     /**
-     * Generate the form of [New Group Workflow]
+     * Generate the form of [Apply Group Workflow]
      */
-    private NewGroupProcessForm genNewGroupProcessForm(String groupId) {
-        NewGroupProcessForm form = new NewGroupProcessForm();
+    private ApplyGroupProcessForm genApplyGroupProcessForm(String groupId) {
+        ApplyGroupProcessForm form = new ApplyGroupProcessForm();
         InlongGroupInfo groupInfo = groupService.get(groupId);
         form.setGroupInfo(groupInfo);
         List<InlongStreamBriefInfo> infoList = streamService.getBriefList(groupInfo.getInlongGroupId());
@@ -328,25 +300,16 @@ public class InlongGroupProcessOperation {
         return form;
     }
 
-    private GroupResourceProcessForm genGroupProcessForm(InlongGroupInfo groupInfo, GroupOperateType operateType) {
-        GroupResourceProcessForm form = new GroupResourceProcessForm();
-        String groupId = groupInfo.getInlongGroupId();
-        if (GroupOperateType.RESTART == operateType) {
-            List<InlongStreamInfo> streamList = streamService.list(groupId);
-            form.setStreamInfos(streamList);
-        }
-        form.setGroupInfo(groupInfo);
-        form.setGroupOperateType(operateType);
-        return form;
-    }
-
-    private LightGroupResourceProcessForm genLightGroupProcessForm(InlongGroupInfo groupInfo,
+    /**
+     * Generate the form of [Group Resource Workflow]
+     */
+    private GroupResourceProcessForm genGroupResourceProcessForm(InlongGroupInfo groupInfo,
             GroupOperateType operateType) {
-        LightGroupResourceProcessForm form = new LightGroupResourceProcessForm();
-        form.setGroupInfo(groupInfo);
+        GroupResourceProcessForm form = new GroupResourceProcessForm();
         String groupId = groupInfo.getInlongGroupId();
         List<InlongStreamInfo> streamList = streamService.list(groupId);
         form.setStreamInfos(streamList);
+        form.setGroupInfo(groupInfo);
         form.setGroupOperateType(operateType);
         return form;
     }
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/GroupCheckService.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/GroupCheckService.java
index c3062c047..f87f9ecc2 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/GroupCheckService.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/GroupCheckService.java
@@ -23,8 +23,6 @@ import org.apache.inlong.manager.common.exceptions.BusinessException;
 import org.apache.inlong.manager.common.util.Preconditions;
 import org.apache.inlong.manager.dao.entity.InlongGroupEntity;
 import org.apache.inlong.manager.dao.mapper.InlongGroupEntityMapper;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Service;
 
@@ -37,30 +35,28 @@ import java.util.List;
 @Service
 public class GroupCheckService {
 
-    private static final Logger LOGGER = LoggerFactory.getLogger(GroupCheckService.class);
-
     @Autowired
     private InlongGroupEntityMapper groupMapper;
 
     /**
      * Check whether the inlong group status is temporary
      *
-     * @param groupId Inlong group id
-     * @return Inlong group entity, for caller reuse
+     * @param groupId inlong group id
+     * @return inlong group entity, for caller reuse
      */
     public InlongGroupEntity checkGroupStatus(String groupId, String operator) {
         InlongGroupEntity inlongGroupEntity = groupMapper.selectByGroupId(groupId);
-        Preconditions.checkNotNull(inlongGroupEntity, "groupId is invalid");
+        if (inlongGroupEntity == null) {
+            throw new BusinessException(String.format("InlongGroup does not exist with InlongGroupId=%s", groupId));
+        }
 
         List<String> managers = Arrays.asList(inlongGroupEntity.getInCharges().split(","));
         Preconditions.checkTrue(managers.contains(operator),
                 String.format(ErrorCodeEnum.USER_IS_NOT_MANAGER.getMessage(), operator, managers));
 
-        GroupStatus state = GroupStatus.forCode(inlongGroupEntity.getStatus());
-        // Add/modify/delete is not allowed under certain group state
-        if (GroupStatus.notAllowedUpdate(state)) {
-            LOGGER.error("inlong group status was not allowed to add/update/delete related info");
-            throw new BusinessException(ErrorCodeEnum.OPT_NOT_ALLOWED_BY_STATUS);
+        GroupStatus status = GroupStatus.forCode(inlongGroupEntity.getStatus());
+        if (GroupStatus.notAllowedUpdate(status)) {
+            throw new BusinessException(String.format(ErrorCodeEnum.OPT_NOT_ALLOWED_BY_STATUS.getMessage(), status));
         }
 
         return inlongGroupEntity;
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/ProcessName.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/ProcessName.java
index aacfbba1c..176c6ae0c 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/ProcessName.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/ProcessName.java
@@ -23,72 +23,72 @@ package org.apache.inlong.manager.service.workflow;
 public enum ProcessName {
 
     /**
-     * New inlong group application process
+     * Apply inlong group process
      */
-    NEW_GROUP_PROCESS("New-Group"),
+    APPLY_GROUP_PROCESS("Apply-Group"),
 
     /**
-     * Startup inlong group application process
+     * Create inlong group resources process
      */
     CREATE_GROUP_RESOURCE("Create-Group"),
 
     /**
-     * Suspend inlong group application process
+     * Suspend inlong group process
      */
     SUSPEND_GROUP_PROCESS("Suspend-Group"),
 
     /**
-     * Restart inlong group application process
+     * Restart inlong group process
      */
     RESTART_GROUP_PROCESS("Restart-Group"),
 
     /**
-     * Delete inlong group application process
+     * Delete inlong group process
      */
     DELETE_GROUP_PROCESS("Delete-Group"),
 
     /**
-     * Startup lightweight inlong group application process
+     * Startup lightweight inlong group process
      */
     CREATE_LIGHT_GROUP_PROCESS("Create-Light-Group"),
 
     /**
-     * Suspend lightweight inlong group application process
+     * Suspend lightweight inlong group process
      */
     SUSPEND_LIGHT_GROUP_PROCESS("Suspend-Light-Group"),
 
     /**
-     * Restart lightweight inlong group application process
+     * Restart lightweight inlong group process
      */
     RESTART_LIGHT_GROUP_PROCESS("Restart-Light-Group"),
 
     /**
-     * Delete lightweight inlong group application process
+     * Delete lightweight inlong group process
      */
     DELETE_LIGHT_GROUP_PROCESS("Delete-Light-Group"),
 
     /**
-     * New consumption application process
+     * Apply consumption process
      */
-    NEW_CONSUMPTION_PROCESS("New-Consumption"),
+    APPLY_CONSUMPTION_PROCESS("Apply-Consumption"),
 
     /**
-     * Startup single stream process
+     * Create inlong stream process
      */
     CREATE_STREAM_RESOURCE("Create-Stream"),
 
     /**
-     * Suspend single stream process
+     * Suspend inlong stream process
      */
     SUSPEND_STREAM_RESOURCE("Suspend-Stream"),
 
     /**
-     * Restart single stream process
+     * Restart inlong stream process
      */
     RESTART_STREAM_RESOURCE("Restart-Stream"),
 
     /**
-     * Delete single stream process
+     * Delete inlong stream process
      */
     DELETE_STREAM_RESOURCE("Delete-Stream");
 
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/consumption/NewConsumptionProcessDetailHandler.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/consumption/ApplyConsumptionProcessHandler.java
similarity index 90%
rename from inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/consumption/NewConsumptionProcessDetailHandler.java
rename to inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/consumption/ApplyConsumptionProcessHandler.java
index 6e60e5a55..8e1f77046 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/consumption/NewConsumptionProcessDetailHandler.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/consumption/ApplyConsumptionProcessHandler.java
@@ -19,7 +19,7 @@ package org.apache.inlong.manager.service.workflow.consumption;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.inlong.manager.common.pojo.workflow.ProcessDetailResponse;
-import org.apache.inlong.manager.common.pojo.workflow.form.process.NewConsumptionProcessForm;
+import org.apache.inlong.manager.common.pojo.workflow.form.process.ApplyConsumptionProcessForm;
 import org.apache.inlong.manager.workflow.core.ProcessDefinitionService;
 import org.apache.inlong.manager.workflow.definition.ProcessDetailHandler;
 import org.apache.inlong.manager.workflow.definition.WorkflowProcess;
@@ -28,10 +28,10 @@ import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Component;
 
 /**
- * New consumption process detail handler
+ * Apply consumption process handler
  */
 @Component
-public class NewConsumptionProcessDetailHandler implements ProcessDetailHandler {
+public class ApplyConsumptionProcessHandler implements ProcessDetailHandler {
 
     @Autowired
     private ObjectMapper objectMapper;
@@ -41,7 +41,7 @@ public class NewConsumptionProcessDetailHandler implements ProcessDetailHandler
     @Override
     public ProcessDetailResponse handle(ProcessDetailResponse processResponse) {
         WorkflowProcess process = processDefinitionService.getByName(processResponse.getWorkflow().getName());
-        NewConsumptionProcessForm processForm = WorkflowFormParserUtils
+        ApplyConsumptionProcessForm processForm = WorkflowFormParserUtils
                 .parseProcessForm(objectMapper, processResponse.getProcessInfo().getFormData().toString(), process);
         if (processForm == null) {
             return processResponse;
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/consumption/NewConsumptionWorkflowDefinition.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/consumption/ApplyConsumptionWorkflowDefinition.java
similarity index 88%
rename from inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/consumption/NewConsumptionWorkflowDefinition.java
rename to inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/consumption/ApplyConsumptionWorkflowDefinition.java
index 94cd312ef..9e64f2886 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/consumption/NewConsumptionWorkflowDefinition.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/consumption/ApplyConsumptionWorkflowDefinition.java
@@ -17,15 +17,12 @@
 
 package org.apache.inlong.manager.service.workflow.consumption;
 
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.List;
 import org.apache.inlong.manager.common.pojo.group.InlongGroupInfo;
 import org.apache.inlong.manager.common.pojo.workflow.WorkflowApproverFilterContext;
+import org.apache.inlong.manager.common.pojo.workflow.form.process.ApplyConsumptionProcessForm;
 import org.apache.inlong.manager.common.pojo.workflow.form.task.ConsumptionApproveForm;
-import org.apache.inlong.manager.common.pojo.workflow.form.process.NewConsumptionProcessForm;
-import org.apache.inlong.manager.service.group.InlongGroupService;
 import org.apache.inlong.manager.service.core.WorkflowApproverService;
+import org.apache.inlong.manager.service.group.InlongGroupService;
 import org.apache.inlong.manager.service.workflow.ProcessName;
 import org.apache.inlong.manager.service.workflow.WorkflowDefinition;
 import org.apache.inlong.manager.service.workflow.consumption.listener.ConsumptionCancelProcessListener;
@@ -40,11 +37,15 @@ import org.apache.inlong.manager.workflow.definition.WorkflowProcess;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Component;
 
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
 /**
  * New data consumption workflow definition
  */
 @Component
-public class NewConsumptionWorkflowDefinition implements WorkflowDefinition {
+public class ApplyConsumptionWorkflowDefinition implements WorkflowDefinition {
 
     public static final String UT_ADMINT_NAME = "ut_admin";
     public static final String UT_GROUP_OWNER_NAME = "ut_biz_owner";
@@ -65,7 +66,7 @@ public class NewConsumptionWorkflowDefinition implements WorkflowDefinition {
     private WorkflowApproverService workflowApproverService;
 
     @Autowired
-    private NewConsumptionProcessDetailHandler newConsumptionProcessDetailHandler;
+    private ApplyConsumptionProcessHandler applyConsumptionProcessHandler;
 
     @Autowired
     private InlongGroupService groupService;
@@ -74,12 +75,12 @@ public class NewConsumptionWorkflowDefinition implements WorkflowDefinition {
     public WorkflowProcess defineProcess() {
         // Define process information
         WorkflowProcess process = new WorkflowProcess();
-        process.setType("Data Consumption Resource Creation");
         process.setName(getProcessName().name());
+        process.setType(getProcessName().getDisplayName());
         process.setDisplayName(getProcessName().getDisplayName());
-        process.setFormClass(NewConsumptionProcessForm.class);
+        process.setFormClass(ApplyConsumptionProcessForm.class);
         process.setVersion(1);
-        process.setProcessDetailHandler(newConsumptionProcessDetailHandler);
+        process.setProcessDetailHandler(applyConsumptionProcessHandler);
 
         // Start node
         StartEvent startEvent = new StartEvent();
@@ -93,7 +94,7 @@ public class NewConsumptionWorkflowDefinition implements WorkflowDefinition {
         UserTask groupOwnerUserTask = new UserTask();
         groupOwnerUserTask.setName(UT_GROUP_OWNER_NAME);
         groupOwnerUserTask.setDisplayName("Group Approval");
-        groupOwnerUserTask.setApproverAssign(this::bizOwnerUserTaskApprover);
+        groupOwnerUserTask.setApproverAssign(this::groupOwnerUserTaskApprover);
         process.addTask(groupOwnerUserTask);
 
         // System administrator approval
@@ -123,8 +124,8 @@ public class NewConsumptionWorkflowDefinition implements WorkflowDefinition {
                 new WorkflowApproverFilterContext());
     }
 
-    private List<String> bizOwnerUserTaskApprover(WorkflowContext context) {
-        NewConsumptionProcessForm form = (NewConsumptionProcessForm) context.getProcessForm();
+    private List<String> groupOwnerUserTaskApprover(WorkflowContext context) {
+        ApplyConsumptionProcessForm form = (ApplyConsumptionProcessForm) context.getProcessForm();
         InlongGroupInfo groupInfo = groupService.get(form.getConsumptionInfo().getInlongGroupId());
         if (groupInfo == null || groupInfo.getInCharges() == null) {
             return Collections.emptyList();
@@ -135,7 +136,7 @@ public class NewConsumptionWorkflowDefinition implements WorkflowDefinition {
 
     @Override
     public ProcessName getProcessName() {
-        return ProcessName.NEW_CONSUMPTION_PROCESS;
+        return ProcessName.APPLY_CONSUMPTION_PROCESS;
     }
 
 }
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/consumption/listener/ConsumptionCancelProcessListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/consumption/listener/ConsumptionCancelProcessListener.java
index d678968a4..c01964e98 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/consumption/listener/ConsumptionCancelProcessListener.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/consumption/listener/ConsumptionCancelProcessListener.java
@@ -20,9 +20,9 @@ package org.apache.inlong.manager.service.workflow.consumption.listener;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.inlong.manager.common.enums.ConsumptionStatus;
 import org.apache.inlong.manager.common.exceptions.WorkflowListenerException;
+import org.apache.inlong.manager.common.pojo.workflow.form.process.ApplyConsumptionProcessForm;
 import org.apache.inlong.manager.dao.entity.ConsumptionEntity;
 import org.apache.inlong.manager.dao.mapper.ConsumptionEntityMapper;
-import org.apache.inlong.manager.common.pojo.workflow.form.process.NewConsumptionProcessForm;
 import org.apache.inlong.manager.workflow.WorkflowContext;
 import org.apache.inlong.manager.workflow.event.ListenerResult;
 import org.apache.inlong.manager.workflow.event.process.ProcessEvent;
@@ -53,7 +53,7 @@ public class ConsumptionCancelProcessListener implements ProcessEventListener {
 
     @Override
     public ListenerResult listen(WorkflowContext context) throws WorkflowListenerException {
-        NewConsumptionProcessForm processForm = (NewConsumptionProcessForm) context.getProcessForm();
+        ApplyConsumptionProcessForm processForm = (ApplyConsumptionProcessForm) context.getProcessForm();
 
         ConsumptionEntity update = new ConsumptionEntity();
         update.setId(processForm.getConsumptionInfo().getId());
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/consumption/listener/ConsumptionCompleteProcessListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/consumption/listener/ConsumptionCompleteProcessListener.java
index 43eb82ac1..6ad4ea7c5 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/consumption/listener/ConsumptionCompleteProcessListener.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/consumption/listener/ConsumptionCompleteProcessListener.java
@@ -28,7 +28,7 @@ import org.apache.inlong.manager.common.pojo.cluster.ClusterInfo;
 import org.apache.inlong.manager.common.pojo.cluster.pulsar.PulsarClusterInfo;
 import org.apache.inlong.manager.common.pojo.cluster.tube.TubeClusterInfo;
 import org.apache.inlong.manager.common.pojo.pulsar.PulsarTopicBean;
-import org.apache.inlong.manager.common.pojo.workflow.form.process.NewConsumptionProcessForm;
+import org.apache.inlong.manager.common.pojo.workflow.form.process.ApplyConsumptionProcessForm;
 import org.apache.inlong.manager.common.util.Preconditions;
 import org.apache.inlong.manager.dao.entity.ConsumptionEntity;
 import org.apache.inlong.manager.dao.entity.InlongGroupEntity;
@@ -75,7 +75,7 @@ public class ConsumptionCompleteProcessListener implements ProcessEventListener
 
     @Override
     public ListenerResult listen(WorkflowContext context) throws WorkflowListenerException {
-        NewConsumptionProcessForm consumptionForm = (NewConsumptionProcessForm) context.getProcessForm();
+        ApplyConsumptionProcessForm consumptionForm = (ApplyConsumptionProcessForm) context.getProcessForm();
 
         // Real-time query of consumption information
         Integer consumptionId = consumptionForm.getConsumptionInfo().getId();
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/consumption/listener/ConsumptionPassTaskListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/consumption/listener/ConsumptionPassTaskListener.java
index d2c27421f..c50bafd26 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/consumption/listener/ConsumptionPassTaskListener.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/consumption/listener/ConsumptionPassTaskListener.java
@@ -24,7 +24,7 @@ import org.apache.inlong.manager.common.exceptions.BusinessException;
 import org.apache.inlong.manager.common.exceptions.WorkflowListenerException;
 import org.apache.inlong.manager.common.pojo.consumption.ConsumptionInfo;
 import org.apache.inlong.manager.common.pojo.workflow.form.task.ConsumptionApproveForm;
-import org.apache.inlong.manager.common.pojo.workflow.form.process.NewConsumptionProcessForm;
+import org.apache.inlong.manager.common.pojo.workflow.form.process.ApplyConsumptionProcessForm;
 import org.apache.inlong.manager.service.core.ConsumptionService;
 import org.apache.inlong.manager.workflow.WorkflowContext;
 import org.apache.inlong.manager.workflow.event.ListenerResult;
@@ -50,7 +50,7 @@ public class ConsumptionPassTaskListener implements TaskEventListener {
 
     @Override
     public ListenerResult listen(WorkflowContext context) throws WorkflowListenerException {
-        NewConsumptionProcessForm form = (NewConsumptionProcessForm) context.getProcessForm();
+        ApplyConsumptionProcessForm form = (ApplyConsumptionProcessForm) context.getProcessForm();
         ConsumptionApproveForm approveForm = (ConsumptionApproveForm) context.getActionContext().getForm();
         ConsumptionInfo info = form.getConsumptionInfo();
         if (StringUtils.equals(approveForm.getConsumerGroup(), info.getConsumerGroup())) {
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/consumption/listener/ConsumptionRejectProcessListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/consumption/listener/ConsumptionRejectProcessListener.java
index a9f9a16db..09ba29cc9 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/consumption/listener/ConsumptionRejectProcessListener.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/consumption/listener/ConsumptionRejectProcessListener.java
@@ -21,7 +21,7 @@ import org.apache.inlong.manager.common.enums.ConsumptionStatus;
 import org.apache.inlong.manager.common.exceptions.WorkflowListenerException;
 import org.apache.inlong.manager.dao.entity.ConsumptionEntity;
 import org.apache.inlong.manager.dao.mapper.ConsumptionEntityMapper;
-import org.apache.inlong.manager.common.pojo.workflow.form.process.NewConsumptionProcessForm;
+import org.apache.inlong.manager.common.pojo.workflow.form.process.ApplyConsumptionProcessForm;
 import org.apache.inlong.manager.workflow.WorkflowContext;
 import org.apache.inlong.manager.workflow.event.ListenerResult;
 import org.apache.inlong.manager.workflow.event.process.ProcessEvent;
@@ -51,7 +51,7 @@ public class ConsumptionRejectProcessListener implements ProcessEventListener {
 
     @Override
     public ListenerResult listen(WorkflowContext context) throws WorkflowListenerException {
-        NewConsumptionProcessForm processForm = (NewConsumptionProcessForm) context.getProcessForm();
+        ApplyConsumptionProcessForm processForm = (ApplyConsumptionProcessForm) context.getProcessForm();
 
         ConsumptionEntity update = new ConsumptionEntity();
         update.setId(processForm.getConsumptionInfo().getId());
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/group/NewGroupWorkflowDefinition.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/group/ApplyGroupWorkflowDefinition.java
similarity index 81%
rename from inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/group/NewGroupWorkflowDefinition.java
rename to inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/group/ApplyGroupWorkflowDefinition.java
index 97b77d943..fc5990d04 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/group/NewGroupWorkflowDefinition.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/group/ApplyGroupWorkflowDefinition.java
@@ -18,15 +18,15 @@
 package org.apache.inlong.manager.service.workflow.group;
 
 import org.apache.inlong.manager.common.pojo.workflow.WorkflowApproverFilterContext;
+import org.apache.inlong.manager.common.pojo.workflow.form.process.ApplyGroupProcessForm;
 import org.apache.inlong.manager.common.pojo.workflow.form.task.InlongGroupApproveForm;
-import org.apache.inlong.manager.common.pojo.workflow.form.process.NewGroupProcessForm;
 import org.apache.inlong.manager.service.core.WorkflowApproverService;
 import org.apache.inlong.manager.service.workflow.ProcessName;
 import org.apache.inlong.manager.service.workflow.WorkflowDefinition;
-import org.apache.inlong.manager.service.workflow.group.listener.GroupCancelProcessListener;
-import org.apache.inlong.manager.service.workflow.group.listener.approve.GroupAfterApprovedListener;
-import org.apache.inlong.manager.service.workflow.group.listener.approve.GroupRejectProcessListener;
-import org.apache.inlong.manager.service.workflow.group.listener.approve.GroupApproveProcessListener;
+import org.apache.inlong.manager.service.workflow.group.listener.apply.AfterApprovedTaskListener;
+import org.apache.inlong.manager.service.workflow.group.listener.apply.ApproveApplyProcessListener;
+import org.apache.inlong.manager.service.workflow.group.listener.apply.CancelApplyProcessListener;
+import org.apache.inlong.manager.service.workflow.group.listener.apply.RejectApplyProcessListener;
 import org.apache.inlong.manager.workflow.definition.EndEvent;
 import org.apache.inlong.manager.workflow.definition.StartEvent;
 import org.apache.inlong.manager.workflow.definition.UserTask;
@@ -40,16 +40,16 @@ import java.util.List;
  * New inlong group process definition
  */
 @Component
-public class NewGroupWorkflowDefinition implements WorkflowDefinition {
+public class ApplyGroupWorkflowDefinition implements WorkflowDefinition {
 
     @Autowired
-    private GroupAfterApprovedListener groupAfterApprovedListener;
+    private CancelApplyProcessListener cancelApplyProcessListener;
     @Autowired
-    private GroupCancelProcessListener groupCancelProcessListener;
+    private RejectApplyProcessListener rejectApplyProcessListener;
     @Autowired
-    private GroupRejectProcessListener approveRejectProcessListener;
+    private ApproveApplyProcessListener approveApplyProcessListener;
     @Autowired
-    private GroupApproveProcessListener groupApproveProcessListener;
+    private AfterApprovedTaskListener afterApprovedTaskListener;
     @Autowired
     private WorkflowApproverService workflowApproverService;
 
@@ -57,18 +57,18 @@ public class NewGroupWorkflowDefinition implements WorkflowDefinition {
     public WorkflowProcess defineProcess() {
         // Configuration process
         WorkflowProcess process = new WorkflowProcess();
-        process.setType(getProcessName().getDisplayName());
         process.setName(getProcessName().name());
+        process.setType(getProcessName().getDisplayName());
         process.setDisplayName(getProcessName().getDisplayName());
-        process.setFormClass(NewGroupProcessForm.class);
+        process.setFormClass(ApplyGroupProcessForm.class);
         process.setVersion(1);
 
         // Set up the listener
-        process.addListener(groupCancelProcessListener);
-        process.addListener(approveRejectProcessListener);
+        process.addListener(cancelApplyProcessListener);
+        process.addListener(rejectApplyProcessListener);
         // Initiate the process of creating inlong group resources, and set the inlong group status
         // to [Configuration Successful]/[Configuration Failed] according to its completion
-        process.addListener(groupApproveProcessListener);
+        process.addListener(approveApplyProcessListener);
 
         // Start node
         StartEvent startEvent = new StartEvent();
@@ -84,12 +84,13 @@ public class NewGroupWorkflowDefinition implements WorkflowDefinition {
         adminUserTask.setDisplayName("SystemAdmin");
         adminUserTask.setFormClass(InlongGroupApproveForm.class);
         adminUserTask.setApproverAssign(context -> getTaskApprovers(adminUserTask.getName()));
-        adminUserTask.addListener(groupAfterApprovedListener);
+        adminUserTask.addListener(afterApprovedTaskListener);
         process.addTask(adminUserTask);
 
-        // Configuration order relation
-        startEvent.addNext(adminUserTask);
         // If you need another approval process, you can add it here
+
+        // Configure the order of the tasks
+        startEvent.addNext(adminUserTask);
         adminUserTask.addNext(endEvent);
 
         return process;
@@ -97,7 +98,7 @@ public class NewGroupWorkflowDefinition implements WorkflowDefinition {
 
     @Override
     public ProcessName getProcessName() {
-        return ProcessName.NEW_GROUP_PROCESS;
+        return ProcessName.APPLY_GROUP_PROCESS;
     }
 
     /**
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/group/CreateGroupWorkflowDefinition.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/group/CreateGroupWorkflowDefinition.java
index 94182b69a..862c1f99e 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/group/CreateGroupWorkflowDefinition.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/group/CreateGroupWorkflowDefinition.java
@@ -21,9 +21,9 @@ import lombok.extern.slf4j.Slf4j;
 import org.apache.inlong.manager.common.pojo.workflow.form.process.GroupResourceProcessForm;
 import org.apache.inlong.manager.service.workflow.ProcessName;
 import org.apache.inlong.manager.service.workflow.WorkflowDefinition;
-import org.apache.inlong.manager.service.workflow.group.listener.GroupCompleteProcessListener;
-import org.apache.inlong.manager.service.workflow.group.listener.GroupFailedProcessListener;
-import org.apache.inlong.manager.service.workflow.group.listener.GroupInitProcessListener;
+import org.apache.inlong.manager.service.workflow.group.listener.InitGroupCompleteListener;
+import org.apache.inlong.manager.service.workflow.group.listener.InitGroupFailedListener;
+import org.apache.inlong.manager.service.workflow.group.listener.InitGroupListener;
 import org.apache.inlong.manager.service.workflow.listener.GroupTaskListenerFactory;
 import org.apache.inlong.manager.workflow.definition.EndEvent;
 import org.apache.inlong.manager.workflow.definition.ServiceTask;
@@ -41,61 +41,61 @@ import org.springframework.stereotype.Component;
 public class CreateGroupWorkflowDefinition implements WorkflowDefinition {
 
     @Autowired
-    private GroupInitProcessListener groupInitProcessListener;
+    private InitGroupListener initGroupListener;
     @Autowired
-    private GroupCompleteProcessListener groupCompleteProcessListener;
+    private InitGroupCompleteListener initGroupCompleteListener;
     @Autowired
-    private GroupFailedProcessListener groupFailedProcessListener;
+    private InitGroupFailedListener initGroupFailedListener;
     @Autowired
     private GroupTaskListenerFactory groupTaskListenerFactory;
 
     @Override
     public WorkflowProcess defineProcess() {
-
         // Configuration process
         WorkflowProcess process = new WorkflowProcess();
-        process.addListener(groupInitProcessListener);
-        process.addListener(groupCompleteProcessListener);
-        process.addListener(groupFailedProcessListener);
-
-        process.setType("Group Resource Creation");
         process.setName(getProcessName().name());
+        process.setType(getProcessName().getDisplayName());
         process.setDisplayName(getProcessName().getDisplayName());
         process.setFormClass(GroupResourceProcessForm.class);
         process.setVersion(1);
         process.setHidden(1);
 
+        // Set up the listener
+        process.addListener(initGroupListener);
+        process.addListener(initGroupCompleteListener);
+        process.addListener(initGroupFailedListener);
+
         // Start node
         StartEvent startEvent = new StartEvent();
         process.setStartEvent(startEvent);
 
-        // init MQ
+        // Init MQ
         ServiceTask initMQTask = new ServiceTask();
-        initMQTask.setName("initMQ");
+        initMQTask.setName("InitMQ");
         initMQTask.setDisplayName("Group-InitMQ");
         initMQTask.addServiceTaskType(ServiceTaskType.INIT_MQ);
         initMQTask.addListenerProvider(groupTaskListenerFactory);
         process.addTask(initMQTask);
 
-        // init Sink
+        // Init Sink
         ServiceTask initSinkTask = new ServiceTask();
-        initSinkTask.setName("initSink");
+        initSinkTask.setName("InitSink");
         initSinkTask.setDisplayName("Group-InitSink");
         initSinkTask.addServiceTaskType(ServiceTaskType.INIT_SINK);
         initSinkTask.addListenerProvider(groupTaskListenerFactory);
         process.addTask(initSinkTask);
 
-        // init Sort
+        // Init Sort
         ServiceTask initSortTask = new ServiceTask();
-        initSortTask.setName("initSort");
+        initSortTask.setName("InitSort");
         initSortTask.setDisplayName("Group-InitSort");
         initSortTask.addServiceTaskType(ServiceTaskType.INIT_SORT);
         initSortTask.addListenerProvider(groupTaskListenerFactory);
         process.addTask(initSortTask);
 
-        // init Source
+        // Init Source
         ServiceTask initSourceTask = new ServiceTask();
-        initSourceTask.setName("initSource");
+        initSourceTask.setName("InitSource");
         initSourceTask.setDisplayName("Group-InitSource");
         initSourceTask.addServiceTaskType(ServiceTaskType.INIT_SOURCE);
         initSourceTask.addListenerProvider(groupTaskListenerFactory);
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/group/DeleteGroupWorkflowDefinition.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/group/DeleteGroupWorkflowDefinition.java
index 3d0f735df..8f813f6f0 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/group/DeleteGroupWorkflowDefinition.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/group/DeleteGroupWorkflowDefinition.java
@@ -19,12 +19,12 @@ package org.apache.inlong.manager.service.workflow.group;
 
 import lombok.extern.slf4j.Slf4j;
 import org.apache.inlong.manager.common.pojo.workflow.form.process.GroupResourceProcessForm;
-import org.apache.inlong.manager.service.workflow.listener.GroupTaskListenerFactory;
 import org.apache.inlong.manager.service.workflow.ProcessName;
 import org.apache.inlong.manager.service.workflow.WorkflowDefinition;
-import org.apache.inlong.manager.service.workflow.group.listener.GroupUpdateCompleteListener;
-import org.apache.inlong.manager.service.workflow.group.listener.GroupUpdateFailedListener;
-import org.apache.inlong.manager.service.workflow.group.listener.GroupUpdateListener;
+import org.apache.inlong.manager.service.workflow.group.listener.UpdateGroupCompleteListener;
+import org.apache.inlong.manager.service.workflow.group.listener.UpdateGroupFailedListener;
+import org.apache.inlong.manager.service.workflow.group.listener.UpdateGroupListener;
+import org.apache.inlong.manager.service.workflow.listener.GroupTaskListenerFactory;
 import org.apache.inlong.manager.workflow.definition.EndEvent;
 import org.apache.inlong.manager.workflow.definition.ServiceTask;
 import org.apache.inlong.manager.workflow.definition.ServiceTaskType;
@@ -34,18 +34,18 @@ import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Component;
 
 /**
- * Delete workflow definitions for inlong group
+ * Delete workflow definition for inlong group
  */
 @Slf4j
 @Component
 public class DeleteGroupWorkflowDefinition implements WorkflowDefinition {
 
     @Autowired
-    private GroupUpdateListener groupUpdateListener;
+    private UpdateGroupListener updateGroupListener;
     @Autowired
-    private GroupUpdateCompleteListener groupUpdateCompleteListener;
+    private UpdateGroupCompleteListener updateGroupCompleteListener;
     @Autowired
-    private GroupUpdateFailedListener groupUpdateFailedListener;
+    private UpdateGroupFailedListener updateGroupFailedListener;
     @Autowired
     private GroupTaskListenerFactory groupTaskListenerFactory;
 
@@ -53,39 +53,41 @@ public class DeleteGroupWorkflowDefinition implements WorkflowDefinition {
     public WorkflowProcess defineProcess() {
         // Configuration process
         WorkflowProcess process = new WorkflowProcess();
-        process.addListener(groupUpdateListener);
-        process.addListener(groupUpdateCompleteListener);
-        process.addListener(groupUpdateFailedListener);
-        process.setType("Group Resource Delete");
         process.setName(getProcessName().name());
+        process.setType(getProcessName().getDisplayName());
         process.setDisplayName(getProcessName().getDisplayName());
         process.setFormClass(GroupResourceProcessForm.class);
         process.setVersion(1);
         process.setHidden(1);
 
+        // Set up the listener
+        process.addListener(updateGroupListener);
+        process.addListener(updateGroupCompleteListener);
+        process.addListener(updateGroupFailedListener);
+
         // Start node
         StartEvent startEvent = new StartEvent();
         process.setStartEvent(startEvent);
 
-        //delete datasource
+        // Delete Source
         ServiceTask deleteDataSourceTask = new ServiceTask();
-        deleteDataSourceTask.setName("deleteSource");
+        deleteDataSourceTask.setName("DeleteSource");
         deleteDataSourceTask.setDisplayName("Group-DeleteSource");
         deleteDataSourceTask.addServiceTaskType(ServiceTaskType.DELETE_SOURCE);
         deleteDataSourceTask.addListenerProvider(groupTaskListenerFactory);
         process.addTask(deleteDataSourceTask);
 
-        //delete MQ
+        // Delete MQ
         ServiceTask deleteMqTask = new ServiceTask();
-        deleteMqTask.setName("deleteMQ");
+        deleteMqTask.setName("DeleteMQ");
         deleteMqTask.setDisplayName("Group-DeleteMQ");
         deleteMqTask.addServiceTaskType(ServiceTaskType.DELETE_MQ);
         deleteMqTask.addListenerProvider(groupTaskListenerFactory);
         process.addTask(deleteMqTask);
 
-        //delete sort
+        // Delete Sort
         ServiceTask deleteSortTask = new ServiceTask();
-        deleteSortTask.setName("deleteSort");
+        deleteSortTask.setName("DeleteSort");
         deleteSortTask.setDisplayName("Group-DeleteSort");
         deleteSortTask.addServiceTaskType(ServiceTaskType.DELETE_SORT);
         deleteSortTask.addListenerProvider(groupTaskListenerFactory);
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/group/RestartGroupWorkflowDefinition.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/group/RestartGroupWorkflowDefinition.java
index e79af1a5b..ed9f22413 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/group/RestartGroupWorkflowDefinition.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/group/RestartGroupWorkflowDefinition.java
@@ -20,11 +20,11 @@ package org.apache.inlong.manager.service.workflow.group;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.inlong.manager.common.pojo.workflow.form.process.GroupResourceProcessForm;
 import org.apache.inlong.manager.service.workflow.ProcessName;
-import org.apache.inlong.manager.service.workflow.listener.GroupTaskListenerFactory;
 import org.apache.inlong.manager.service.workflow.WorkflowDefinition;
-import org.apache.inlong.manager.service.workflow.group.listener.GroupUpdateCompleteListener;
-import org.apache.inlong.manager.service.workflow.group.listener.GroupUpdateFailedListener;
-import org.apache.inlong.manager.service.workflow.group.listener.GroupUpdateListener;
+import org.apache.inlong.manager.service.workflow.group.listener.UpdateGroupCompleteListener;
+import org.apache.inlong.manager.service.workflow.group.listener.UpdateGroupFailedListener;
+import org.apache.inlong.manager.service.workflow.group.listener.UpdateGroupListener;
+import org.apache.inlong.manager.service.workflow.listener.GroupTaskListenerFactory;
 import org.apache.inlong.manager.workflow.definition.EndEvent;
 import org.apache.inlong.manager.workflow.definition.ServiceTask;
 import org.apache.inlong.manager.workflow.definition.ServiceTaskType;
@@ -41,58 +41,57 @@ import org.springframework.stereotype.Component;
 public class RestartGroupWorkflowDefinition implements WorkflowDefinition {
 
     @Autowired
-    private GroupUpdateListener groupUpdateListener;
-
+    private UpdateGroupListener updateGroupListener;
     @Autowired
-    private GroupUpdateCompleteListener groupUpdateCompleteListener;
-
+    private UpdateGroupCompleteListener updateGroupCompleteListener;
     @Autowired
-    private GroupUpdateFailedListener groupUpdateFailedListener;
-
+    private UpdateGroupFailedListener updateGroupFailedListener;
     @Autowired
-    private GroupTaskListenerFactory groupTaskListenerFactory;
+    private GroupTaskListenerFactory taskListenerFactory;
 
     @Override
     public WorkflowProcess defineProcess() {
         // Configuration process
         WorkflowProcess process = new WorkflowProcess();
-        process.addListener(groupUpdateListener);
-        process.addListener(groupUpdateCompleteListener);
-        process.addListener(groupUpdateFailedListener);
-        process.setType("Group Resource Restart");
         process.setName(getProcessName().name());
+        process.setType(getProcessName().getDisplayName());
         process.setDisplayName(getProcessName().getDisplayName());
         process.setFormClass(GroupResourceProcessForm.class);
         process.setVersion(1);
         process.setHidden(1);
 
+        // Set up the listener
+        process.addListener(updateGroupListener);
+        process.addListener(updateGroupCompleteListener);
+        process.addListener(updateGroupFailedListener);
+
         // Start node
         StartEvent startEvent = new StartEvent();
         process.setStartEvent(startEvent);
 
-        //restart sort
+        // Restart Sort
         ServiceTask restartSortTask = new ServiceTask();
-        restartSortTask.setName("restartSort");
+        restartSortTask.setName("RestartSort");
         restartSortTask.setDisplayName("Group-RestartSort");
         restartSortTask.addServiceTaskType(ServiceTaskType.RESTART_SORT);
-        restartSortTask.addListenerProvider(groupTaskListenerFactory);
+        restartSortTask.addListenerProvider(taskListenerFactory);
         process.addTask(restartSortTask);
 
-        //restart datasource
-        ServiceTask restartDataSourceTask = new ServiceTask();
-        restartDataSourceTask.setName("restartSource");
-        restartDataSourceTask.setDisplayName("Group-RestartSource");
-        restartDataSourceTask.addServiceTaskType(ServiceTaskType.RESTART_SOURCE);
-        restartDataSourceTask.addListenerProvider(groupTaskListenerFactory);
-        process.addTask(restartDataSourceTask);
+        // Restart Source
+        ServiceTask restartSourceTask = new ServiceTask();
+        restartSourceTask.setName("RestartSource");
+        restartSourceTask.setDisplayName("Group-RestartSource");
+        restartSourceTask.addServiceTaskType(ServiceTaskType.RESTART_SOURCE);
+        restartSourceTask.addListenerProvider(taskListenerFactory);
+        process.addTask(restartSourceTask);
 
         // End node
         EndEvent endEvent = new EndEvent();
         process.setEndEvent(endEvent);
 
         startEvent.addNext(restartSortTask);
-        restartSortTask.addNext(restartDataSourceTask);
-        restartDataSourceTask.addNext(endEvent);
+        restartSortTask.addNext(restartSourceTask);
+        restartSourceTask.addNext(endEvent);
 
         return process;
     }
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/group/SuspendGroupWorkflowDefinition.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/group/SuspendGroupWorkflowDefinition.java
index 94926298a..aabba5bf3 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/group/SuspendGroupWorkflowDefinition.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/group/SuspendGroupWorkflowDefinition.java
@@ -20,11 +20,11 @@ package org.apache.inlong.manager.service.workflow.group;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.inlong.manager.common.pojo.workflow.form.process.GroupResourceProcessForm;
 import org.apache.inlong.manager.service.workflow.ProcessName;
-import org.apache.inlong.manager.service.workflow.listener.GroupTaskListenerFactory;
 import org.apache.inlong.manager.service.workflow.WorkflowDefinition;
-import org.apache.inlong.manager.service.workflow.group.listener.GroupUpdateCompleteListener;
-import org.apache.inlong.manager.service.workflow.group.listener.GroupUpdateFailedListener;
-import org.apache.inlong.manager.service.workflow.group.listener.GroupUpdateListener;
+import org.apache.inlong.manager.service.workflow.group.listener.UpdateGroupCompleteListener;
+import org.apache.inlong.manager.service.workflow.group.listener.UpdateGroupFailedListener;
+import org.apache.inlong.manager.service.workflow.group.listener.UpdateGroupListener;
+import org.apache.inlong.manager.service.workflow.listener.GroupTaskListenerFactory;
 import org.apache.inlong.manager.workflow.definition.EndEvent;
 import org.apache.inlong.manager.workflow.definition.ServiceTask;
 import org.apache.inlong.manager.workflow.definition.ServiceTaskType;
@@ -34,21 +34,18 @@ import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Component;
 
 /**
- * Suspend inlong group process definition
+ * Suspend workflow definition for inlong group
  */
 @Slf4j
 @Component
 public class SuspendGroupWorkflowDefinition implements WorkflowDefinition {
 
     @Autowired
-    private GroupUpdateListener groupUpdateListener;
-
+    private UpdateGroupListener updateGroupListener;
     @Autowired
-    private GroupUpdateCompleteListener groupUpdateCompleteListener;
-
+    private UpdateGroupCompleteListener updateGroupCompleteListener;
     @Autowired
-    private GroupUpdateFailedListener groupUpdateFailedListener;
-
+    private UpdateGroupFailedListener updateGroupFailedListener;
     @Autowired
     private GroupTaskListenerFactory groupTaskListenerFactory;
 
@@ -56,31 +53,33 @@ public class SuspendGroupWorkflowDefinition implements WorkflowDefinition {
     public WorkflowProcess defineProcess() {
         // Configuration process
         WorkflowProcess process = new WorkflowProcess();
-        process.addListener(groupUpdateListener);
-        process.addListener(groupUpdateCompleteListener);
-        process.addListener(groupUpdateFailedListener);
-        process.setType("Group Resource Suspend");
         process.setName(getProcessName().name());
+        process.setType(getProcessName().getDisplayName());
         process.setDisplayName(getProcessName().getDisplayName());
         process.setFormClass(GroupResourceProcessForm.class);
         process.setVersion(1);
         process.setHidden(1);
 
+        // Set up the listener
+        process.addListener(updateGroupListener);
+        process.addListener(updateGroupCompleteListener);
+        process.addListener(updateGroupFailedListener);
+
         // Start node
         StartEvent startEvent = new StartEvent();
         process.setStartEvent(startEvent);
 
-        //stop datasource
-        ServiceTask stopDataSourceTask = new ServiceTask();
-        stopDataSourceTask.setName("stopSource");
-        stopDataSourceTask.setDisplayName("Group-StopSource");
-        stopDataSourceTask.addServiceTaskType(ServiceTaskType.STOP_SOURCE);
-        stopDataSourceTask.addListenerProvider(groupTaskListenerFactory);
-        process.addTask(stopDataSourceTask);
+        // Stop Source
+        ServiceTask stopSourceTask = new ServiceTask();
+        stopSourceTask.setName("StopSource");
+        stopSourceTask.setDisplayName("Group-StopSource");
+        stopSourceTask.addServiceTaskType(ServiceTaskType.STOP_SOURCE);
+        stopSourceTask.addListenerProvider(groupTaskListenerFactory);
+        process.addTask(stopSourceTask);
 
-        //stop sort
+        // Stop Sort
         ServiceTask stopSortTask = new ServiceTask();
-        stopSortTask.setName("stopSort");
+        stopSortTask.setName("StopSort");
         stopSortTask.setDisplayName("Group-StopSort");
         stopSortTask.addServiceTaskType(ServiceTaskType.STOP_SORT);
         stopSortTask.addListenerProvider(groupTaskListenerFactory);
@@ -90,8 +89,8 @@ public class SuspendGroupWorkflowDefinition implements WorkflowDefinition {
         EndEvent endEvent = new EndEvent();
         process.setEndEvent(endEvent);
 
-        startEvent.addNext(stopDataSourceTask);
-        stopDataSourceTask.addNext(stopSortTask);
+        startEvent.addNext(stopSourceTask);
+        stopSourceTask.addNext(stopSortTask);
         stopSortTask.addNext(endEvent);
 
         return process;
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/group/light/CreateLightGroupWorkflowDefinition.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/group/light/CreateLightGroupWorkflowDefinition.java
deleted file mode 100644
index 82a24260e..000000000
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/group/light/CreateLightGroupWorkflowDefinition.java
+++ /dev/null
@@ -1,93 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.manager.service.workflow.group.light;
-
-import lombok.extern.slf4j.Slf4j;
-import org.apache.inlong.manager.common.pojo.workflow.form.process.LightGroupResourceProcessForm;
-import org.apache.inlong.manager.service.workflow.listener.GroupTaskListenerFactory;
-import org.apache.inlong.manager.service.workflow.ProcessName;
-import org.apache.inlong.manager.service.workflow.WorkflowDefinition;
-import org.apache.inlong.manager.service.workflow.group.listener.light.LightGroupCompleteListener;
-import org.apache.inlong.manager.service.workflow.group.listener.light.LightGroupFailedListener;
-import org.apache.inlong.manager.service.workflow.group.listener.light.LightGroupInitListener;
-import org.apache.inlong.manager.workflow.definition.EndEvent;
-import org.apache.inlong.manager.workflow.definition.ServiceTask;
-import org.apache.inlong.manager.workflow.definition.ServiceTaskType;
-import org.apache.inlong.manager.workflow.definition.StartEvent;
-import org.apache.inlong.manager.workflow.definition.WorkflowProcess;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.stereotype.Component;
-
-/**
- * Create light inlong group definition
- */
-@Slf4j
-@Component
-public class CreateLightGroupWorkflowDefinition implements WorkflowDefinition {
-
-    @Autowired
-    private LightGroupInitListener lightGroupInitListener;
-    @Autowired
-    private LightGroupCompleteListener lightGroupCompleteListener;
-    @Autowired
-    private LightGroupFailedListener lightGroupFailedListener;
-    @Autowired
-    private GroupTaskListenerFactory groupTaskListenerFactory;
-
-    @Override
-    public WorkflowProcess defineProcess() {
-        // Configuration process
-        WorkflowProcess process = new WorkflowProcess();
-        process.addListener(lightGroupInitListener);
-        process.addListener(lightGroupCompleteListener);
-        process.addListener(lightGroupFailedListener);
-
-        process.setType("Group Resource Creation");
-        process.setName(getProcessName().name());
-        process.setDisplayName(getProcessName().getDisplayName());
-        process.setFormClass(LightGroupResourceProcessForm.class);
-        process.setVersion(1);
-        process.setHidden(1);
-
-        // Start node
-        StartEvent startEvent = new StartEvent();
-        process.setStartEvent(startEvent);
-
-        // init Sort resource
-        ServiceTask initSortResourceTask = new ServiceTask();
-        initSortResourceTask.setName("initSort");
-        initSortResourceTask.setDisplayName("Group-InitSort");
-        initSortResourceTask.addServiceTaskType(ServiceTaskType.INIT_SORT);
-        initSortResourceTask.addListenerProvider(groupTaskListenerFactory);
-        process.addTask(initSortResourceTask);
-
-        // End node
-        EndEvent endEvent = new EndEvent();
-        process.setEndEvent(endEvent);
-
-        startEvent.addNext(initSortResourceTask);
-        initSortResourceTask.addNext(endEvent);
-
-        return process;
-    }
-
-    @Override
-    public ProcessName getProcessName() {
-        return ProcessName.CREATE_LIGHT_GROUP_PROCESS;
-    }
-}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/group/light/DeleteLightGroupWorkflowDefinition.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/group/light/DeleteLightGroupWorkflowDefinition.java
deleted file mode 100644
index 915f351ca..000000000
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/group/light/DeleteLightGroupWorkflowDefinition.java
+++ /dev/null
@@ -1,92 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.manager.service.workflow.group.light;
-
-import lombok.extern.slf4j.Slf4j;
-import org.apache.inlong.manager.common.pojo.workflow.form.process.LightGroupResourceProcessForm;
-import org.apache.inlong.manager.service.workflow.listener.GroupTaskListenerFactory;
-import org.apache.inlong.manager.service.workflow.ProcessName;
-import org.apache.inlong.manager.service.workflow.WorkflowDefinition;
-import org.apache.inlong.manager.service.workflow.group.listener.light.LightGroupUpdateCompleteListener;
-import org.apache.inlong.manager.service.workflow.group.listener.light.LightGroupUpdateFailedListener;
-import org.apache.inlong.manager.service.workflow.group.listener.light.LightGroupUpdateListener;
-import org.apache.inlong.manager.workflow.definition.EndEvent;
-import org.apache.inlong.manager.workflow.definition.ServiceTask;
-import org.apache.inlong.manager.workflow.definition.ServiceTaskType;
-import org.apache.inlong.manager.workflow.definition.StartEvent;
-import org.apache.inlong.manager.workflow.definition.WorkflowProcess;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.stereotype.Component;
-
-/**
- * Delete light workflow definition for inlong group
- */
-@Slf4j
-@Component
-public class DeleteLightGroupWorkflowDefinition implements WorkflowDefinition {
-
-    @Autowired
-    private LightGroupUpdateListener lightGroupUpdateListener;
-    @Autowired
-    private LightGroupUpdateCompleteListener lightGroupUpdateCompleteListener;
-    @Autowired
-    private LightGroupUpdateFailedListener lightGroupUpdateFailedListener;
-    @Autowired
-    private GroupTaskListenerFactory groupTaskListenerFactory;
-
-    @Override
-    public WorkflowProcess defineProcess() {
-        // Configuration process
-        WorkflowProcess process = new WorkflowProcess();
-        process.addListener(lightGroupUpdateListener);
-        process.addListener(lightGroupUpdateCompleteListener);
-        process.addListener(lightGroupUpdateFailedListener);
-        process.setType("Group Resource Delete");
-        process.setName(getProcessName().name());
-        process.setDisplayName(getProcessName().getDisplayName());
-        process.setFormClass(LightGroupResourceProcessForm.class);
-        process.setVersion(1);
-        process.setHidden(1);
-
-        // Start node
-        StartEvent startEvent = new StartEvent();
-        process.setStartEvent(startEvent);
-
-        //delete sort
-        ServiceTask deleteSortTask = new ServiceTask();
-        deleteSortTask.setName("deleteSort");
-        deleteSortTask.setDisplayName("Group-DeleteSort");
-        deleteSortTask.addServiceTaskType(ServiceTaskType.DELETE_SORT);
-        deleteSortTask.addListenerProvider(groupTaskListenerFactory);
-        process.addTask(deleteSortTask);
-
-        // End node
-        EndEvent endEvent = new EndEvent();
-        process.setEndEvent(endEvent);
-
-        startEvent.addNext(deleteSortTask);
-        deleteSortTask.addNext(endEvent);
-
-        return process;
-    }
-
-    @Override
-    public ProcessName getProcessName() {
-        return ProcessName.DELETE_LIGHT_GROUP_PROCESS;
-    }
-}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/group/light/RestartLightGroupWorkflowDefinition.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/group/light/RestartLightGroupWorkflowDefinition.java
deleted file mode 100644
index cd27b1719..000000000
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/group/light/RestartLightGroupWorkflowDefinition.java
+++ /dev/null
@@ -1,92 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.manager.service.workflow.group.light;
-
-import lombok.extern.slf4j.Slf4j;
-import org.apache.inlong.manager.common.pojo.workflow.form.process.LightGroupResourceProcessForm;
-import org.apache.inlong.manager.service.workflow.listener.GroupTaskListenerFactory;
-import org.apache.inlong.manager.service.workflow.ProcessName;
-import org.apache.inlong.manager.service.workflow.WorkflowDefinition;
-import org.apache.inlong.manager.service.workflow.group.listener.light.LightGroupUpdateCompleteListener;
-import org.apache.inlong.manager.service.workflow.group.listener.light.LightGroupUpdateFailedListener;
-import org.apache.inlong.manager.service.workflow.group.listener.light.LightGroupUpdateListener;
-import org.apache.inlong.manager.workflow.definition.EndEvent;
-import org.apache.inlong.manager.workflow.definition.ServiceTask;
-import org.apache.inlong.manager.workflow.definition.ServiceTaskType;
-import org.apache.inlong.manager.workflow.definition.StartEvent;
-import org.apache.inlong.manager.workflow.definition.WorkflowProcess;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.stereotype.Component;
-
-/**
- * Restart light inlong group process definition
- */
-@Slf4j
-@Component
-public class RestartLightGroupWorkflowDefinition implements WorkflowDefinition {
-
-    @Autowired
-    private LightGroupUpdateListener lightGroupUpdateListener;
-    @Autowired
-    private LightGroupUpdateCompleteListener lightGroupUpdateCompleteListener;
-    @Autowired
-    private LightGroupUpdateFailedListener lightGroupUpdateFailedListener;
-    @Autowired
-    private GroupTaskListenerFactory groupTaskListenerFactory;
-
-    @Override
-    public WorkflowProcess defineProcess() {
-        // Configuration process
-        WorkflowProcess process = new WorkflowProcess();
-        process.addListener(lightGroupUpdateListener);
-        process.addListener(lightGroupUpdateCompleteListener);
-        process.addListener(lightGroupUpdateFailedListener);
-        process.setType("Group Resource Restart");
-        process.setName(getProcessName().name());
-        process.setDisplayName(getProcessName().getDisplayName());
-        process.setFormClass(LightGroupResourceProcessForm.class);
-        process.setVersion(1);
-        process.setHidden(1);
-
-        // Start node
-        StartEvent startEvent = new StartEvent();
-        process.setStartEvent(startEvent);
-
-        //restart sort
-        ServiceTask restartSortTask = new ServiceTask();
-        restartSortTask.setName("restartSort");
-        restartSortTask.setDisplayName("Group-RestartSort");
-        restartSortTask.addServiceTaskType(ServiceTaskType.RESTART_SORT);
-        restartSortTask.addListenerProvider(groupTaskListenerFactory);
-        process.addTask(restartSortTask);
-
-        // End node
-        EndEvent endEvent = new EndEvent();
-        process.setEndEvent(endEvent);
-
-        startEvent.addNext(restartSortTask);
-        restartSortTask.addNext(endEvent);
-
-        return process;
-    }
-
-    @Override
-    public ProcessName getProcessName() {
-        return ProcessName.RESTART_LIGHT_GROUP_PROCESS;
-    }
-}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/group/light/SuspendLightGroupWorkflowDefinition.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/group/light/SuspendLightGroupWorkflowDefinition.java
deleted file mode 100644
index 14003dfad..000000000
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/group/light/SuspendLightGroupWorkflowDefinition.java
+++ /dev/null
@@ -1,92 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.manager.service.workflow.group.light;
-
-import lombok.extern.slf4j.Slf4j;
-import org.apache.inlong.manager.common.pojo.workflow.form.process.LightGroupResourceProcessForm;
-import org.apache.inlong.manager.service.workflow.ProcessName;
-import org.apache.inlong.manager.service.workflow.listener.GroupTaskListenerFactory;
-import org.apache.inlong.manager.service.workflow.WorkflowDefinition;
-import org.apache.inlong.manager.service.workflow.group.listener.light.LightGroupUpdateCompleteListener;
-import org.apache.inlong.manager.service.workflow.group.listener.light.LightGroupUpdateFailedListener;
-import org.apache.inlong.manager.service.workflow.group.listener.light.LightGroupUpdateListener;
-import org.apache.inlong.manager.workflow.definition.EndEvent;
-import org.apache.inlong.manager.workflow.definition.ServiceTask;
-import org.apache.inlong.manager.workflow.definition.ServiceTaskType;
-import org.apache.inlong.manager.workflow.definition.StartEvent;
-import org.apache.inlong.manager.workflow.definition.WorkflowProcess;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.stereotype.Component;
-
-/**
- * Suspend light workflow definition for inlong group
- */
-@Slf4j
-@Component
-public class SuspendLightGroupWorkflowDefinition implements WorkflowDefinition {
-
-    @Autowired
-    private LightGroupUpdateListener lightGroupUpdateListener;
-    @Autowired
-    private LightGroupUpdateCompleteListener lightGroupUpdateCompleteListener;
-    @Autowired
-    private LightGroupUpdateFailedListener lightGroupUpdateFailedListener;
-    @Autowired
-    private GroupTaskListenerFactory groupTaskListenerFactory;
-
-    @Override
-    public WorkflowProcess defineProcess() {
-        // Configuration process
-        WorkflowProcess process = new WorkflowProcess();
-        process.addListener(lightGroupUpdateListener);
-        process.addListener(lightGroupUpdateCompleteListener);
-        process.addListener(lightGroupUpdateFailedListener);
-        process.setType("Group Resource Suspend");
-        process.setName(getProcessName().name());
-        process.setDisplayName(getProcessName().getDisplayName());
-        process.setFormClass(LightGroupResourceProcessForm.class);
-        process.setVersion(1);
-        process.setHidden(1);
-
-        // Start node
-        StartEvent startEvent = new StartEvent();
-        process.setStartEvent(startEvent);
-
-        //stop sort
-        ServiceTask stopSortTask = new ServiceTask();
-        stopSortTask.setName("stopSort");
-        stopSortTask.setDisplayName("Group-StopSort");
-        stopSortTask.addServiceTaskType(ServiceTaskType.STOP_SORT);
-        stopSortTask.addListenerProvider(groupTaskListenerFactory);
-        process.addTask(stopSortTask);
-
-        // End node
-        EndEvent endEvent = new EndEvent();
-        process.setEndEvent(endEvent);
-
-        startEvent.addNext(stopSortTask);
-        stopSortTask.addNext(endEvent);
-
-        return process;
-    }
-
-    @Override
-    public ProcessName getProcessName() {
-        return ProcessName.SUSPEND_LIGHT_GROUP_PROCESS;
-    }
-}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/group/listener/GroupInitProcessListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/group/listener/GroupInitProcessListener.java
deleted file mode 100644
index 2902ca88c..000000000
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/group/listener/GroupInitProcessListener.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.manager.service.workflow.group.listener;
-
-import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
-import org.apache.inlong.manager.common.enums.GroupStatus;
-import org.apache.inlong.manager.common.exceptions.BusinessException;
-import org.apache.inlong.manager.common.exceptions.WorkflowListenerException;
-import org.apache.inlong.manager.common.pojo.group.InlongGroupInfo;
-import org.apache.inlong.manager.common.pojo.workflow.form.process.GroupResourceProcessForm;
-import org.apache.inlong.manager.service.group.InlongGroupService;
-import org.apache.inlong.manager.workflow.WorkflowContext;
-import org.apache.inlong.manager.workflow.event.ListenerResult;
-import org.apache.inlong.manager.workflow.event.process.ProcessEvent;
-import org.apache.inlong.manager.workflow.event.process.ProcessEventListener;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.stereotype.Service;
-
-/**
- * Initialize the listener for inlong group information
- */
-@Service
-public class GroupInitProcessListener implements ProcessEventListener {
-
-    @Autowired
-    private InlongGroupService groupService;
-
-    @Override
-    public ProcessEvent event() {
-        return ProcessEvent.CREATE;
-    }
-
-    @Override
-    public ListenerResult listen(WorkflowContext context) throws WorkflowListenerException {
-        GroupResourceProcessForm form = (GroupResourceProcessForm) context.getProcessForm();
-        InlongGroupInfo groupInfo = groupService.get(context.getProcessForm().getInlongGroupId());
-        if (groupInfo != null) {
-            final int status = GroupStatus.CONFIG_ING.getCode();
-            final String username = context.getOperator();
-            groupService.updateStatus(groupInfo.getInlongGroupId(), status, username);
-            form.setGroupInfo(groupInfo);
-        } else {
-            throw new BusinessException(ErrorCodeEnum.GROUP_NOT_FOUND);
-        }
-        return ListenerResult.success();
-    }
-
-}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/group/listener/GroupUpdateListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/group/listener/GroupUpdateListener.java
deleted file mode 100644
index 962b4b880..000000000
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/group/listener/GroupUpdateListener.java
+++ /dev/null
@@ -1,75 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.manager.service.workflow.group.listener;
-
-import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
-import org.apache.inlong.manager.common.enums.GroupOperateType;
-import org.apache.inlong.manager.common.enums.GroupStatus;
-import org.apache.inlong.manager.common.exceptions.BusinessException;
-import org.apache.inlong.manager.common.pojo.group.InlongGroupInfo;
-import org.apache.inlong.manager.common.pojo.workflow.form.process.GroupResourceProcessForm;
-import org.apache.inlong.manager.service.group.InlongGroupService;
-import org.apache.inlong.manager.workflow.WorkflowContext;
-import org.apache.inlong.manager.workflow.event.ListenerResult;
-import org.apache.inlong.manager.workflow.event.process.ProcessEvent;
-import org.apache.inlong.manager.workflow.event.process.ProcessEventListener;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.stereotype.Service;
-
-/**
- * Update listener for inlong group
- */
-@Service
-public class GroupUpdateListener implements ProcessEventListener {
-
-    @Autowired
-    private InlongGroupService groupService;
-
-    @Override
-    public ProcessEvent event() {
-        return ProcessEvent.CREATE;
-    }
-
-    @Override
-    public ListenerResult listen(WorkflowContext context) throws Exception {
-        GroupResourceProcessForm form = (GroupResourceProcessForm) context.getProcessForm();
-        InlongGroupInfo groupInfo = groupService.get(context.getProcessForm().getInlongGroupId());
-        GroupOperateType groupOperateType = form.getGroupOperateType();
-        String username = context.getOperator();
-        if (groupInfo != null) {
-            switch (groupOperateType) {
-                case SUSPEND:
-                    groupService.updateStatus(groupInfo.getInlongGroupId(), GroupStatus.SUSPENDING.getCode(), username);
-                    break;
-                case RESTART:
-                    groupService.updateStatus(groupInfo.getInlongGroupId(), GroupStatus.RESTARTING.getCode(), username);
-                    break;
-                case DELETE:
-                    groupService.updateStatus(groupInfo.getInlongGroupId(), GroupStatus.DELETING.getCode(), username);
-                    break;
-                default:
-                    break;
-            }
-            form.setGroupInfo(groupInfo);
-        } else {
-            throw new BusinessException(ErrorCodeEnum.GROUP_NOT_FOUND);
-        }
-        return ListenerResult.success();
-    }
-
-}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/group/listener/GroupCompleteProcessListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/group/listener/InitGroupCompleteListener.java
similarity index 66%
rename from inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/group/listener/GroupCompleteProcessListener.java
rename to inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/group/listener/InitGroupCompleteListener.java
index 4e885b957..8154f2f7e 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/group/listener/GroupCompleteProcessListener.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/group/listener/InitGroupCompleteListener.java
@@ -18,13 +18,15 @@
 package org.apache.inlong.manager.service.workflow.group.listener;
 
 import lombok.extern.slf4j.Slf4j;
+import org.apache.inlong.manager.common.consts.InlongConstants;
 import org.apache.inlong.manager.common.enums.GroupStatus;
 import org.apache.inlong.manager.common.enums.SourceStatus;
 import org.apache.inlong.manager.common.enums.StreamStatus;
 import org.apache.inlong.manager.common.exceptions.WorkflowListenerException;
+import org.apache.inlong.manager.common.pojo.group.InlongGroupInfo;
 import org.apache.inlong.manager.common.pojo.workflow.form.process.GroupResourceProcessForm;
-import org.apache.inlong.manager.service.group.InlongGroupService;
 import org.apache.inlong.manager.service.core.InlongStreamService;
+import org.apache.inlong.manager.service.group.InlongGroupService;
 import org.apache.inlong.manager.service.source.StreamSourceService;
 import org.apache.inlong.manager.workflow.WorkflowContext;
 import org.apache.inlong.manager.workflow.event.ListenerResult;
@@ -34,11 +36,11 @@ import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Component;
 
 /**
- * Create group resources [process completion] event listener
+ * The listener of InlongGroup, triggered when its resources are created successfully.
  */
 @Slf4j
 @Component
-public class GroupCompleteProcessListener implements ProcessEventListener {
+public class InitGroupCompleteListener implements ProcessEventListener {
 
     @Autowired
     private InlongGroupService groupService;
@@ -53,23 +55,32 @@ public class GroupCompleteProcessListener implements ProcessEventListener {
     }
 
     /**
-     * After the process of creating inlong group resources is completed, modify the status of related
-     * inlong group and all inlong stream to [Configuration Successful] [Configuration Failed]
-     * <p/>{@link GroupFailedProcessListener#listen}
+     * After the process of creating InlongGroup resources is completed,
+     * modify the status of related InlongGroup and all InlongStream to [Successful]
+     * <p/>
+     * {@link InitGroupFailedListener#listen}
      */
     @Override
     public ListenerResult listen(WorkflowContext context) throws WorkflowListenerException {
         GroupResourceProcessForm form = (GroupResourceProcessForm) context.getProcessForm();
         String groupId = form.getInlongGroupId();
-        String applicant = context.getOperator();
-        // Update inlong group status and other info
-        groupService.updateStatus(groupId, GroupStatus.CONFIG_SUCCESSFUL.getCode(), applicant);
-        groupService.update(form.getGroupInfo().genRequest(), applicant);
+        log.info("begin to execute InitGroupCompleteListener for groupId={}", groupId);
+
+        InlongGroupInfo groupInfo = form.getGroupInfo();
+        String operator = context.getOperator();
+        // update inlong group status and other info
+        groupService.updateStatus(groupId, GroupStatus.CONFIG_SUCCESSFUL.getCode(), operator);
+        groupService.update(groupInfo.genRequest(), operator);
 
-        // Update status of other related configs
-        streamService.updateStatus(groupId, null, StreamStatus.CONFIG_SUCCESSFUL.getCode(), applicant);
-        sourceService.updateStatus(groupId, null, SourceStatus.TO_BE_ISSUED_ADD.getCode(), applicant);
+        // update status of other related configs
+        streamService.updateStatus(groupId, null, StreamStatus.CONFIG_SUCCESSFUL.getCode(), operator);
+        if (InlongConstants.LIGHTWEIGHT_MODE.equals(groupInfo.getLightweight())) {
+            sourceService.updateStatus(groupId, null, SourceStatus.SOURCE_NORMAL.getCode(), operator);
+        } else {
+            sourceService.updateStatus(groupId, null, SourceStatus.TO_BE_ISSUED_ADD.getCode(), operator);
+        }
 
+        log.info("success to execute InitGroupCompleteListener for groupId={}", groupId);
         return ListenerResult.success();
     }
 
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/group/listener/GroupFailedProcessListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/group/listener/InitGroupFailedListener.java
similarity index 75%
rename from inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/group/listener/GroupFailedProcessListener.java
rename to inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/group/listener/InitGroupFailedListener.java
index 37ed3e672..fc0c1caa3 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/group/listener/GroupFailedProcessListener.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/group/listener/InitGroupFailedListener.java
@@ -22,8 +22,8 @@ import org.apache.inlong.manager.common.enums.GroupStatus;
 import org.apache.inlong.manager.common.enums.StreamStatus;
 import org.apache.inlong.manager.common.exceptions.WorkflowListenerException;
 import org.apache.inlong.manager.common.pojo.workflow.form.process.GroupResourceProcessForm;
-import org.apache.inlong.manager.service.group.InlongGroupService;
 import org.apache.inlong.manager.service.core.InlongStreamService;
+import org.apache.inlong.manager.service.group.InlongGroupService;
 import org.apache.inlong.manager.workflow.WorkflowContext;
 import org.apache.inlong.manager.workflow.event.ListenerResult;
 import org.apache.inlong.manager.workflow.event.process.ProcessEvent;
@@ -32,11 +32,11 @@ import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Component;
 
 /**
- * Create group resources [process failed] event listener
+ * The listener of InlongGroup when creating resources fails.
  */
 @Slf4j
 @Component
-public class GroupFailedProcessListener implements ProcessEventListener {
+public class InitGroupFailedListener implements ProcessEventListener {
 
     @Autowired
     private InlongGroupService groupService;
@@ -49,20 +49,26 @@ public class GroupFailedProcessListener implements ProcessEventListener {
     }
 
     /**
-     * The process of creating inlong group resources is abnormal, and the inlong group status is changed to
-     * [Configuration failed] [Configuration successful]
-     * <p/>{@link GroupCompleteProcessListener#listen}
+     * After the process of creating InlongGroup resources fails,
+     * modify the status of related InlongGroup and all InlongStream to [Failed]
+     * <p/>
+     * {@link InitGroupCompleteListener#listen}
      */
     @Override
     public ListenerResult listen(WorkflowContext context) throws WorkflowListenerException {
         GroupResourceProcessForm form = (GroupResourceProcessForm) context.getProcessForm();
         String groupId = form.getInlongGroupId();
-        String username = context.getOperator();
+        log.info("begin to execute InitGroupFailedListener for groupId={}", groupId);
+
+        // update inlong group status
+        String operator = context.getOperator();
+        groupService.updateStatus(groupId, GroupStatus.CONFIG_FAILED.getCode(), operator);
+        groupService.update(form.getGroupInfo().genRequest(), operator);
+
+        // update inlong stream status
+        streamService.updateStatus(groupId, null, StreamStatus.CONFIG_FAILED.getCode(), operator);
 
-        // Update inlong group status
-        groupService.updateStatus(groupId, GroupStatus.CONFIG_FAILED.getCode(), username);
-        // Update inlong stream status
-        streamService.updateStatus(groupId, null, StreamStatus.CONFIG_FAILED.getCode(), username);
+        log.info("success to execute InitGroupFailedListener for groupId={}", groupId);
         return ListenerResult.success();
     }
 
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/group/listener/light/LightGroupInitListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/group/listener/InitGroupListener.java
similarity index 67%
rename from inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/group/listener/light/LightGroupInitListener.java
rename to inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/group/listener/InitGroupListener.java
index 054f3c656..f2594bd6e 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/group/listener/light/LightGroupInitListener.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/group/listener/InitGroupListener.java
@@ -15,18 +15,15 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.manager.service.workflow.group.listener.light;
+package org.apache.inlong.manager.service.workflow.group.listener;
 
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.collections.CollectionUtils;
-import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
 import org.apache.inlong.manager.common.enums.GroupStatus;
 import org.apache.inlong.manager.common.exceptions.WorkflowListenerException;
 import org.apache.inlong.manager.common.pojo.group.InlongGroupInfo;
-import org.apache.inlong.manager.common.pojo.stream.InlongStreamInfo;
-import org.apache.inlong.manager.common.pojo.workflow.form.process.LightGroupResourceProcessForm;
+import org.apache.inlong.manager.common.pojo.workflow.form.process.GroupResourceProcessForm;
 import org.apache.inlong.manager.service.group.InlongGroupService;
-import org.apache.inlong.manager.service.core.InlongStreamService;
 import org.apache.inlong.manager.workflow.WorkflowContext;
 import org.apache.inlong.manager.workflow.event.ListenerResult;
 import org.apache.inlong.manager.workflow.event.process.ProcessEvent;
@@ -34,41 +31,40 @@ import org.apache.inlong.manager.workflow.event.process.ProcessEventListener;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Component;
 
-import java.util.List;
-
 /**
- * Listener of light group init.
+ * The listener for initializing the InlongGroup information.
  */
 @Slf4j
 @Component
-public class LightGroupInitListener implements ProcessEventListener {
+public class InitGroupListener implements ProcessEventListener {
 
     @Autowired
     private InlongGroupService groupService;
 
-    @Autowired
-    private InlongStreamService streamService;
-
     @Override
     public ProcessEvent event() {
         return ProcessEvent.CREATE;
     }
 
+    /**
+     * Begin to execute the InlongGroup workflow, init the workflow context, and update other info if needed.
+     */
     @Override
-    public ListenerResult listen(WorkflowContext context) throws Exception {
-        LightGroupResourceProcessForm form = (LightGroupResourceProcessForm) context.getProcessForm();
+    public ListenerResult listen(WorkflowContext context) throws WorkflowListenerException {
+        GroupResourceProcessForm form = (GroupResourceProcessForm) context.getProcessForm();
+        String groupId = form.getInlongGroupId();
+        log.info("begin to execute InitGroupListener for groupId={}", groupId);
+
         InlongGroupInfo groupInfo = form.getGroupInfo();
         if (groupInfo == null) {
-            throw new WorkflowListenerException(ErrorCodeEnum.GROUP_NOT_FOUND.getMessage());
+            throw new WorkflowListenerException("inlong group info cannot be null for init group process");
         }
-        final String groupId = groupInfo.getInlongGroupId();
-        final int status = GroupStatus.CONFIG_ING.getCode();
-        final String username = context.getOperator();
-        groupService.updateStatus(groupInfo.getInlongGroupId(), status, username);
         if (CollectionUtils.isEmpty(form.getStreamInfos())) {
-            List<InlongStreamInfo> streamInfos = streamService.list(groupId);
-            form.setStreamInfos(streamInfos);
+            throw new WorkflowListenerException("inlong stream info list cannot be null for init group process");
         }
+        groupService.updateStatus(groupId, GroupStatus.CONFIG_ING.getCode(), context.getOperator());
+
+        log.info("success to execute InitGroupListener for groupId={}", groupId);
         return ListenerResult.success();
     }
 
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/group/listener/light/LightGroupUpdateCompleteListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/group/listener/UpdateGroupCompleteListener.java
similarity index 70%
rename from inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/group/listener/light/LightGroupUpdateCompleteListener.java
rename to inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/group/listener/UpdateGroupCompleteListener.java
index 7d1d93190..6df9863ff 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/group/listener/light/LightGroupUpdateCompleteListener.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/group/listener/UpdateGroupCompleteListener.java
@@ -15,14 +15,13 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.manager.service.workflow.group.listener.light;
+package org.apache.inlong.manager.service.workflow.group.listener;
 
 import lombok.extern.slf4j.Slf4j;
 import org.apache.inlong.manager.common.enums.GroupOperateType;
 import org.apache.inlong.manager.common.enums.GroupStatus;
 import org.apache.inlong.manager.common.enums.SourceStatus;
-import org.apache.inlong.manager.common.pojo.workflow.form.process.LightGroupResourceProcessForm;
-import org.apache.inlong.manager.service.core.InlongStreamService;
+import org.apache.inlong.manager.common.pojo.workflow.form.process.GroupResourceProcessForm;
 import org.apache.inlong.manager.service.group.InlongGroupService;
 import org.apache.inlong.manager.service.source.StreamSourceService;
 import org.apache.inlong.manager.workflow.WorkflowContext;
@@ -33,17 +32,15 @@ import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Component;
 
 /**
- * Listener of light group update complete.
+ * The listener of InlongGroup when the update operation succeeds.
  */
 @Slf4j
 @Component
-public class LightGroupUpdateCompleteListener implements ProcessEventListener {
+public class UpdateGroupCompleteListener implements ProcessEventListener {
 
     @Autowired
     private InlongGroupService groupService;
     @Autowired
-    private InlongStreamService streamService;
-    @Autowired
     private StreamSourceService sourceService;
 
     @Override
@@ -53,27 +50,33 @@ public class LightGroupUpdateCompleteListener implements ProcessEventListener {
 
     @Override
     public ListenerResult listen(WorkflowContext context) throws Exception {
-        LightGroupResourceProcessForm form = (LightGroupResourceProcessForm) context.getProcessForm();
-        final String groupId = form.getInlongGroupId();
-        final String applicant = context.getOperator();
-        GroupOperateType groupOperateType = form.getGroupOperateType();
-        switch (groupOperateType) {
+        GroupResourceProcessForm form = (GroupResourceProcessForm) context.getProcessForm();
+        String groupId = form.getInlongGroupId();
+        GroupOperateType operateType = form.getGroupOperateType();
+        log.info("begin to execute UpdateGroupCompleteListener for groupId={}, operateType={}", groupId, operateType);
+
+        String operator = context.getOperator();
+        switch (operateType) {
             case SUSPEND:
-                groupService.updateStatus(groupId, GroupStatus.SUSPENDED.getCode(), applicant);
-                sourceService.updateStatus(groupId, null, SourceStatus.SOURCE_FROZEN.getCode(), applicant);
+                groupService.updateStatus(groupId, GroupStatus.SUSPENDED.getCode(), operator);
+                sourceService.updateStatus(groupId, null, SourceStatus.SOURCE_FROZEN.getCode(), operator);
                 break;
             case RESTART:
-                groupService.updateStatus(groupId, GroupStatus.RESTARTED.getCode(), applicant);
-                sourceService.updateStatus(groupId, null, SourceStatus.SOURCE_NORMAL.getCode(), applicant);
+                groupService.updateStatus(groupId, GroupStatus.RESTARTED.getCode(), operator);
+                sourceService.updateStatus(groupId, null, SourceStatus.SOURCE_NORMAL.getCode(), operator);
                 break;
             case DELETE:
-                groupService.updateStatus(groupId, GroupStatus.DELETED.getCode(), applicant);
-                sourceService.logicDeleteAll(groupId, null, applicant);
+                groupService.updateStatus(groupId, GroupStatus.DELETED.getCode(), operator);
+                sourceService.logicDeleteAll(groupId, null, operator);
                 break;
             default:
                 break;
         }
-        groupService.update(form.getGroupInfo().genRequest(), applicant);
+
+        // update inlong group status and other configs
+        groupService.update(form.getGroupInfo().genRequest(), operator);
+
+        log.info("success to execute UpdateGroupCompleteListener for groupId={}, operateType={}", groupId, operateType);
         return ListenerResult.success();
     }
 
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/group/listener/GroupUpdateFailedListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/group/listener/UpdateGroupFailedListener.java
similarity index 77%
rename from inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/group/listener/GroupUpdateFailedListener.java
rename to inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/group/listener/UpdateGroupFailedListener.java
index 355ac185b..cb93afece 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/group/listener/GroupUpdateFailedListener.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/group/listener/UpdateGroupFailedListener.java
@@ -19,7 +19,6 @@ package org.apache.inlong.manager.service.workflow.group.listener;
 
 import lombok.extern.slf4j.Slf4j;
 import org.apache.inlong.manager.common.enums.GroupStatus;
-import org.apache.inlong.manager.common.pojo.group.InlongGroupInfo;
 import org.apache.inlong.manager.common.pojo.workflow.form.process.GroupResourceProcessForm;
 import org.apache.inlong.manager.service.group.InlongGroupService;
 import org.apache.inlong.manager.workflow.WorkflowContext;
@@ -30,11 +29,11 @@ import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Component;
 
 /**
- * Update failed listener for inlong group
+ * The listener of InlongGroup when the update operation fails.
  */
 @Slf4j
 @Component
-public class GroupUpdateFailedListener implements ProcessEventListener {
+public class UpdateGroupFailedListener implements ProcessEventListener {
 
     @Autowired
     private InlongGroupService groupService;
@@ -47,11 +46,15 @@ public class GroupUpdateFailedListener implements ProcessEventListener {
     @Override
     public ListenerResult listen(WorkflowContext context) throws Exception {
         GroupResourceProcessForm form = (GroupResourceProcessForm) context.getProcessForm();
+        String groupId = form.getInlongGroupId();
+        log.info("begin to execute UpdateGroupFailedListener for groupId={}", groupId);
+
+        // update inlong group status and other info
         String operator = context.getOperator();
-        InlongGroupInfo groupInfo = form.getGroupInfo();
-        // Update inlong group status and other info
-        groupService.updateStatus(groupInfo.getInlongGroupId(), GroupStatus.CONFIG_FAILED.getCode(), operator);
-        groupService.update(groupInfo.genRequest(), operator);
+        groupService.updateStatus(groupId, GroupStatus.CONFIG_FAILED.getCode(), operator);
+        groupService.update(form.getGroupInfo().genRequest(), operator);
+
+        log.info("success to execute UpdateGroupFailedListener for groupId={}", groupId);
         return ListenerResult.success();
     }
 
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/group/listener/GroupUpdateCompleteListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/group/listener/UpdateGroupListener.java
similarity index 72%
rename from inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/group/listener/GroupUpdateCompleteListener.java
rename to inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/group/listener/UpdateGroupListener.java
index 3886b09f5..24442fa02 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/group/listener/GroupUpdateCompleteListener.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/group/listener/UpdateGroupListener.java
@@ -20,6 +20,7 @@ package org.apache.inlong.manager.service.workflow.group.listener;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.inlong.manager.common.enums.GroupOperateType;
 import org.apache.inlong.manager.common.enums.GroupStatus;
+import org.apache.inlong.manager.common.exceptions.BusinessException;
 import org.apache.inlong.manager.common.pojo.group.InlongGroupInfo;
 import org.apache.inlong.manager.common.pojo.workflow.form.process.GroupResourceProcessForm;
 import org.apache.inlong.manager.service.group.InlongGroupService;
@@ -31,43 +32,48 @@ import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Component;
 
 /**
- * Update completed listener for inlong group
+ * The listener for updating the InlongGroup.
  */
 @Slf4j
 @Component
-public class GroupUpdateCompleteListener implements ProcessEventListener {
+public class UpdateGroupListener implements ProcessEventListener {
 
     @Autowired
     private InlongGroupService groupService;
 
     @Override
     public ProcessEvent event() {
-        return ProcessEvent.COMPLETE;
+        return ProcessEvent.CREATE;
     }
 
     @Override
     public ListenerResult listen(WorkflowContext context) throws Exception {
         GroupResourceProcessForm form = (GroupResourceProcessForm) context.getProcessForm();
-        String operator = context.getOperator();
-        GroupOperateType operateType = form.getGroupOperateType();
+        String groupId = form.getInlongGroupId();
+        log.info("begin to execute UpdateGroupListener for groupId={}", groupId);
+
         InlongGroupInfo groupInfo = form.getGroupInfo();
-        Integer nextStatus;
+        if (groupInfo == null) {
+            throw new BusinessException("InlongGroupInfo cannot be null for update group process");
+        }
+
+        GroupOperateType operateType = form.getGroupOperateType();
+        String operator = context.getOperator();
         switch (operateType) {
-            case RESTART:
-                nextStatus = GroupStatus.RESTARTED.getCode();
-                break;
             case SUSPEND:
-                nextStatus = GroupStatus.SUSPENDED.getCode();
+                groupService.updateStatus(groupId, GroupStatus.SUSPENDING.getCode(), operator);
+                break;
+            case RESTART:
+                groupService.updateStatus(groupId, GroupStatus.RESTARTING.getCode(), operator);
                 break;
             case DELETE:
-                nextStatus = GroupStatus.DELETED.getCode();
+                groupService.updateStatus(groupId, GroupStatus.DELETING.getCode(), operator);
                 break;
             default:
-                throw new RuntimeException(String.format("Unsupported operation=%s for inlong group", operateType));
+                break;
         }
-        // Update inlong group status and other info
-        groupService.updateStatus(groupInfo.getInlongGroupId(), nextStatus, operator);
-        groupService.update(groupInfo.genRequest(), operator);
+
+        log.info("success to execute UpdateGroupListener for groupId={}, operateType={}", groupId, operateType);
         return ListenerResult.success();
     }
 
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/group/listener/approve/GroupAfterApprovedListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/group/listener/apply/AfterApprovedTaskListener.java
similarity index 80%
rename from inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/group/listener/approve/GroupAfterApprovedListener.java
rename to inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/group/listener/apply/AfterApprovedTaskListener.java
index 15d002e6e..df2cad08f 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/group/listener/approve/GroupAfterApprovedListener.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/group/listener/apply/AfterApprovedTaskListener.java
@@ -15,18 +15,17 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.manager.service.workflow.group.listener.approve;
+package org.apache.inlong.manager.service.workflow.group.listener.apply;
 
 import lombok.extern.slf4j.Slf4j;
 import org.apache.inlong.manager.common.enums.GroupStatus;
 import org.apache.inlong.manager.common.exceptions.WorkflowListenerException;
 import org.apache.inlong.manager.common.pojo.group.InlongGroupApproveRequest;
-import org.apache.inlong.manager.common.pojo.stream.InlongStreamApproveRequest;
 import org.apache.inlong.manager.common.pojo.workflow.form.task.InlongGroupApproveForm;
 import org.apache.inlong.manager.dao.entity.InlongGroupEntity;
 import org.apache.inlong.manager.dao.mapper.InlongGroupEntityMapper;
-import org.apache.inlong.manager.service.group.InlongGroupService;
 import org.apache.inlong.manager.service.core.InlongStreamService;
+import org.apache.inlong.manager.service.group.InlongGroupService;
 import org.apache.inlong.manager.workflow.WorkflowContext;
 import org.apache.inlong.manager.workflow.event.ListenerResult;
 import org.apache.inlong.manager.workflow.event.task.TaskEvent;
@@ -34,15 +33,14 @@ import org.apache.inlong.manager.workflow.event.task.TaskEventListener;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Component;
 
-import java.util.List;
 import java.util.Objects;
 
 /**
- * Approve pass listener for new inlong group
+ * The listener for modifying InlongGroup info after the InlongGroup application process is approved.
  */
 @Slf4j
 @Component
-public class GroupAfterApprovedListener implements TaskEventListener {
+public class AfterApprovedTaskListener implements TaskEventListener {
 
     @Autowired
     private InlongGroupService groupService;
@@ -56,14 +54,17 @@ public class GroupAfterApprovedListener implements TaskEventListener {
         return TaskEvent.APPROVE;
     }
 
+    /**
+     * After approval, persist the modified info during the approval in DB.
+     */
     @Override
     public ListenerResult listen(WorkflowContext context) throws WorkflowListenerException {
-        // Save the data format selected at the time of approval and the cluster information of the inlong stream
         InlongGroupApproveForm form = (InlongGroupApproveForm) context.getActionContext().getForm();
-
         InlongGroupApproveRequest approveInfo = form.getGroupApproveInfo();
-        // Only the [Wait approval] status allowed the passing operation
         String groupId = approveInfo.getInlongGroupId();
+        log.info("begin to execute AfterApprovedTaskListener for groupId={}", groupId);
+
+        // only the [TO_BE_APPROVAL] status allows the approval operation
         InlongGroupEntity entity = groupMapper.selectByGroupId(groupId);
         if (entity == null) {
             throw new WorkflowListenerException("inlong group not found with group id=" + groupId);
@@ -72,12 +73,12 @@ public class GroupAfterApprovedListener implements TaskEventListener {
             throw new WorkflowListenerException("inlong group status is [wait_approval], not allowed to approve again");
         }
 
-        // Save the inlong group information after approval
+        // save the inlong group and other info after approval
         groupService.updateAfterApprove(approveInfo, context.getOperator());
+        streamService.updateAfterApprove(form.getStreamApproveInfoList(), context.getOperator());
 
-        // Save inlong stream information after approval
-        List<InlongStreamApproveRequest> streamApproveInfoList = form.getStreamApproveInfoList();
-        streamService.updateAfterApprove(streamApproveInfoList, context.getOperator());
+        log.info("success to execute AfterApprovedTaskListener for groupId={}", groupId);
         return ListenerResult.success();
     }
+
 }
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/group/listener/approve/GroupApproveProcessListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/group/listener/apply/ApproveApplyProcessListener.java
similarity index 62%
rename from inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/group/listener/approve/GroupApproveProcessListener.java
rename to inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/group/listener/apply/ApproveApplyProcessListener.java
index cc560e436..8f1890621 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/group/listener/approve/GroupApproveProcessListener.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/group/listener/apply/ApproveApplyProcessListener.java
@@ -15,17 +15,14 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.manager.service.workflow.group.listener.approve;
+package org.apache.inlong.manager.service.workflow.group.listener.apply;
 
 import lombok.extern.slf4j.Slf4j;
-import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
-import org.apache.inlong.manager.common.enums.GroupMode;
 import org.apache.inlong.manager.common.exceptions.WorkflowListenerException;
 import org.apache.inlong.manager.common.pojo.group.InlongGroupInfo;
 import org.apache.inlong.manager.common.pojo.stream.InlongStreamInfo;
+import org.apache.inlong.manager.common.pojo.workflow.form.process.ApplyGroupProcessForm;
 import org.apache.inlong.manager.common.pojo.workflow.form.process.GroupResourceProcessForm;
-import org.apache.inlong.manager.common.pojo.workflow.form.process.LightGroupResourceProcessForm;
-import org.apache.inlong.manager.common.pojo.workflow.form.process.NewGroupProcessForm;
 import org.apache.inlong.manager.service.core.InlongStreamService;
 import org.apache.inlong.manager.service.group.InlongGroupService;
 import org.apache.inlong.manager.service.workflow.ProcessName;
@@ -40,65 +37,42 @@ import org.springframework.stereotype.Component;
 import java.util.List;
 
 /**
- * After the new inlong group is approved, initiate a listener for other processes
+ * The listener that approves to apply for InlongGroup.
+ * <p/>
+ * After approval, start other follow-up processes.
  */
 @Slf4j
 @Component
-public class GroupApproveProcessListener implements ProcessEventListener {
+public class ApproveApplyProcessListener implements ProcessEventListener {
 
     @Autowired
     private InlongGroupService groupService;
     @Autowired
-    private WorkflowService workflowService;
-    @Autowired
     private InlongStreamService streamService;
+    @Autowired
+    private WorkflowService workflowService;
 
     @Override
     public ProcessEvent event() {
         return ProcessEvent.COMPLETE;
     }
 
-    /**
-     * Initiate the process of creating inlong group resources after new inlong group access approved
-     */
     @Override
     public ListenerResult listen(WorkflowContext context) throws WorkflowListenerException {
-        NewGroupProcessForm form = (NewGroupProcessForm) context.getProcessForm();
+        ApplyGroupProcessForm form = (ApplyGroupProcessForm) context.getProcessForm();
         String groupId = form.getInlongGroupId();
-        InlongGroupInfo groupInfo = groupService.get(groupId);
-        GroupMode mode = GroupMode.parseGroupMode(groupInfo);
-        switch (mode) {
-            case NORMAL:
-                createGroupResource(context, groupInfo);
-                break;
-            case LIGHTWEIGHT:
-                createLightGroupResource(context, groupInfo);
-                break;
-            default:
-                throw new WorkflowListenerException(ErrorCodeEnum.GROUP_MODE_UNSUPPORTED.getMessage());
-        }
-
-        return ListenerResult.success();
-    }
+        log.info("begin to execute ApproveApplyProcessListener for groupId={}", groupId);
 
-    private void createGroupResource(WorkflowContext context, InlongGroupInfo groupInfo) {
+        InlongGroupInfo groupInfo = groupService.get(groupId);
         GroupResourceProcessForm processForm = new GroupResourceProcessForm();
         processForm.setGroupInfo(groupInfo);
         String username = context.getOperator();
-        String groupId = groupInfo.getInlongGroupId();
         List<InlongStreamInfo> streamList = streamService.list(groupId);
         processForm.setStreamInfos(streamList);
         workflowService.start(ProcessName.CREATE_GROUP_RESOURCE, username, processForm);
-    }
 
-    private void createLightGroupResource(WorkflowContext context, InlongGroupInfo groupInfo) {
-        LightGroupResourceProcessForm processForm = new LightGroupResourceProcessForm();
-        processForm.setGroupInfo(groupInfo);
-        String username = context.getOperator();
-        String groupId = groupInfo.getInlongGroupId();
-        List<InlongStreamInfo> streamList = streamService.list(groupId);
-        processForm.setStreamInfos(streamList);
-        workflowService.start(ProcessName.CREATE_LIGHT_GROUP_PROCESS, username, processForm);
+        log.info("success to execute ApproveApplyProcessListener for groupId={}", groupId);
+        return ListenerResult.success();
     }
 
 }
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/group/listener/GroupCancelProcessListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/group/listener/apply/CancelApplyProcessListener.java
similarity index 71%
rename from inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/group/listener/GroupCancelProcessListener.java
rename to inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/group/listener/apply/CancelApplyProcessListener.java
index c1fd225f9..582d95d37 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/group/listener/GroupCancelProcessListener.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/group/listener/apply/CancelApplyProcessListener.java
@@ -15,13 +15,12 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.manager.service.workflow.group.listener;
+package org.apache.inlong.manager.service.workflow.group.listener.apply;
 
-import java.util.Objects;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.inlong.manager.common.enums.GroupStatus;
 import org.apache.inlong.manager.common.exceptions.WorkflowListenerException;
-import org.apache.inlong.manager.common.pojo.workflow.form.process.NewGroupProcessForm;
+import org.apache.inlong.manager.common.pojo.workflow.form.process.ApplyGroupProcessForm;
 import org.apache.inlong.manager.dao.entity.InlongGroupEntity;
 import org.apache.inlong.manager.dao.mapper.InlongGroupEntityMapper;
 import org.apache.inlong.manager.workflow.WorkflowContext;
@@ -31,12 +30,14 @@ import org.apache.inlong.manager.workflow.event.process.ProcessEventListener;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Component;
 
+import java.util.Objects;
+
 /**
- * Event listener for new group access, the initiator cancels the approval
+ * The listener of InlongGroup, invoked when the application is canceled.
  */
 @Slf4j
 @Component
-public class GroupCancelProcessListener implements ProcessEventListener {
+public class CancelApplyProcessListener implements ProcessEventListener {
 
     @Autowired
     private InlongGroupEntityMapper groupMapper;
@@ -46,25 +47,28 @@ public class GroupCancelProcessListener implements ProcessEventListener {
         return ProcessEvent.CANCEL;
     }
 
+    /**
+     * After canceling the approval, the status becomes [ToBeSubmit]
+     */
     @Override
     public ListenerResult listen(WorkflowContext context) throws WorkflowListenerException {
-        NewGroupProcessForm form = (NewGroupProcessForm) context.getProcessForm();
-        // After canceling the approval, the status becomes [Waiting to submit]
+        ApplyGroupProcessForm form = (ApplyGroupProcessForm) context.getProcessForm();
         String groupId = form.getInlongGroupId();
+        log.info("begin to execute CancelApplyProcessListener for groupId={}", groupId);
 
-        // Only the [Wait approval] status allowed the canceling operation
+        // canceling is only allowed while the group is in the [ToBeApproval] status
         InlongGroupEntity entity = groupMapper.selectByGroupId(groupId);
         if (entity == null) {
-            throw new WorkflowListenerException("inlong group not found with group id=" + groupId);
+            throw new WorkflowListenerException("InlongGroup not found with groupId=" + groupId);
         }
         if (!Objects.equals(GroupStatus.TO_BE_APPROVAL.getCode(), entity.getStatus())) {
-            throw new WorkflowListenerException("current status was not allowed to cancel business");
+            throw new WorkflowListenerException(String.format("Current status [%s] was not allowed to cancel",
+                    GroupStatus.forCode(entity.getStatus())));
         }
+        String operator = context.getOperator();
+        groupMapper.updateStatus(groupId, GroupStatus.TO_BE_SUBMIT.getCode(), operator);
 
-        // After canceling the approval, the status becomes [Waiting to submit]
-        String username = context.getOperator();
-        groupMapper.updateStatus(groupId, GroupStatus.TO_BE_SUBMIT.getCode(), username);
-
+        log.info("success to execute CancelApplyProcessListener for groupId={}", groupId);
         return ListenerResult.success();
     }
 
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/group/listener/approve/GroupRejectProcessListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/group/listener/apply/RejectApplyProcessListener.java
similarity index 83%
rename from inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/group/listener/approve/GroupRejectProcessListener.java
rename to inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/group/listener/apply/RejectApplyProcessListener.java
index bcbf2046a..1a903537c 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/group/listener/approve/GroupRejectProcessListener.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/group/listener/apply/RejectApplyProcessListener.java
@@ -15,13 +15,12 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.manager.service.workflow.group.listener.approve;
+package org.apache.inlong.manager.service.workflow.group.listener.apply;
 
-import java.util.Objects;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.inlong.manager.common.enums.GroupStatus;
 import org.apache.inlong.manager.common.exceptions.WorkflowListenerException;
-import org.apache.inlong.manager.common.pojo.workflow.form.process.NewGroupProcessForm;
+import org.apache.inlong.manager.common.pojo.workflow.form.process.ApplyGroupProcessForm;
 import org.apache.inlong.manager.dao.entity.InlongGroupEntity;
 import org.apache.inlong.manager.dao.mapper.InlongGroupEntityMapper;
 import org.apache.inlong.manager.service.group.InlongGroupService;
@@ -32,12 +31,14 @@ import org.apache.inlong.manager.workflow.event.process.ProcessEventListener;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Component;
 
+import java.util.Objects;
+
 /**
- * Approve reject listener for new inlong group
+ * The listener invoked when the application for an InlongGroup is rejected.
  */
 @Slf4j
 @Component
-public class GroupRejectProcessListener implements ProcessEventListener {
+public class RejectApplyProcessListener implements ProcessEventListener {
 
     @Autowired
     private InlongGroupService groupService;
@@ -51,19 +52,20 @@ public class GroupRejectProcessListener implements ProcessEventListener {
 
     @Override
     public ListenerResult listen(WorkflowContext context) throws WorkflowListenerException {
-        NewGroupProcessForm form = (NewGroupProcessForm) context.getProcessForm();
-
-        // Only the [Wait approval] status allowed the rejecting operation
+        ApplyGroupProcessForm form = (ApplyGroupProcessForm) context.getProcessForm();
         String groupId = form.getInlongGroupId();
+        log.info("begin to execute RejectApplyProcessListener for groupId={}", groupId);
+
+        // rejecting is only allowed while the group is in the [TO_BE_APPROVAL] status
         InlongGroupEntity entity = groupMapper.selectByGroupId(groupId);
         if (entity == null) {
-            throw new WorkflowListenerException("inlong group not found with group id=" + groupId);
+            throw new WorkflowListenerException("inlong group not found with groupId=" + groupId);
         }
         if (!Objects.equals(GroupStatus.TO_BE_APPROVAL.getCode(), entity.getStatus())) {
             throw new WorkflowListenerException("current status was not allowed to reject inlong group");
         }
 
-        // After reject, update inlong group status to [GROUP_APPROVE_REJECT]
+        // after rejection, update the InlongGroup status to [APPROVE_REJECTED]
         String username = context.getOperator();
         groupService.updateStatus(groupId, GroupStatus.APPROVE_REJECTED.getCode(), username);
         return ListenerResult.success();
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/group/listener/light/LightGroupCompleteListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/group/listener/light/LightGroupCompleteListener.java
deleted file mode 100644
index 764b3ca44..000000000
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/group/listener/light/LightGroupCompleteListener.java
+++ /dev/null
@@ -1,68 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.manager.service.workflow.group.listener.light;
-
-import lombok.extern.slf4j.Slf4j;
-import org.apache.inlong.manager.common.enums.GroupStatus;
-import org.apache.inlong.manager.common.enums.SourceStatus;
-import org.apache.inlong.manager.common.enums.StreamStatus;
-import org.apache.inlong.manager.common.pojo.workflow.form.process.LightGroupResourceProcessForm;
-import org.apache.inlong.manager.service.group.InlongGroupService;
-import org.apache.inlong.manager.service.core.InlongStreamService;
-import org.apache.inlong.manager.service.source.StreamSourceService;
-import org.apache.inlong.manager.workflow.WorkflowContext;
-import org.apache.inlong.manager.workflow.event.ListenerResult;
-import org.apache.inlong.manager.workflow.event.process.ProcessEvent;
-import org.apache.inlong.manager.workflow.event.process.ProcessEventListener;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.stereotype.Component;
-
-/**
- * Listener of light group complete.
- */
-@Slf4j
-@Component
-public class LightGroupCompleteListener implements ProcessEventListener {
-
-    @Autowired
-    private InlongGroupService groupService;
-    @Autowired
-    private InlongStreamService streamService;
-    @Autowired
-    private StreamSourceService sourceService;
-
-    @Override
-    public ProcessEvent event() {
-        return ProcessEvent.COMPLETE;
-    }
-
-    @Override
-    public ListenerResult listen(WorkflowContext context) throws Exception {
-        LightGroupResourceProcessForm form = (LightGroupResourceProcessForm) context.getProcessForm();
-        final String groupId = form.getGroupInfo().getInlongGroupId();
-        final String applicant = context.getOperator();
-        // Update inlong group status and other info
-        groupService.updateStatus(groupId, GroupStatus.CONFIG_SUCCESSFUL.getCode(), applicant);
-        groupService.update(form.getGroupInfo().genRequest(), applicant);
-        // Update status of other related configs
-        streamService.updateStatus(groupId, null, StreamStatus.CONFIG_SUCCESSFUL.getCode(), applicant);
-        sourceService.updateStatus(groupId, null, SourceStatus.SOURCE_NORMAL.getCode(), applicant);
-        return ListenerResult.success();
-    }
-
-}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/group/listener/light/LightGroupFailedListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/group/listener/light/LightGroupFailedListener.java
deleted file mode 100644
index f821bf1b1..000000000
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/group/listener/light/LightGroupFailedListener.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.manager.service.workflow.group.listener.light;
-
-import lombok.extern.slf4j.Slf4j;
-import org.apache.inlong.manager.common.enums.GroupStatus;
-import org.apache.inlong.manager.common.enums.StreamStatus;
-import org.apache.inlong.manager.common.pojo.workflow.form.process.LightGroupResourceProcessForm;
-import org.apache.inlong.manager.service.group.InlongGroupService;
-import org.apache.inlong.manager.service.core.InlongStreamService;
-import org.apache.inlong.manager.workflow.WorkflowContext;
-import org.apache.inlong.manager.workflow.event.ListenerResult;
-import org.apache.inlong.manager.workflow.event.process.ProcessEvent;
-import org.apache.inlong.manager.workflow.event.process.ProcessEventListener;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.stereotype.Component;
-
-/**
- * Listener of light group failed.
- */
-@Slf4j
-@Component
-public class LightGroupFailedListener implements ProcessEventListener {
-
-    @Autowired
-    private InlongGroupService groupService;
-    @Autowired
-    private InlongStreamService streamService;
-
-    @Override
-    public ProcessEvent event() {
-        return ProcessEvent.FAIL;
-    }
-
-    @Override
-    public ListenerResult listen(WorkflowContext context) throws Exception {
-        LightGroupResourceProcessForm form = (LightGroupResourceProcessForm) context.getProcessForm();
-        final String groupId = form.getGroupInfo().getInlongGroupId();
-        final String applicant = context.getOperator();
-        // Update inlong group status
-        groupService.updateStatus(groupId, GroupStatus.CONFIG_FAILED.getCode(), applicant);
-        groupService.update(form.getGroupInfo().genRequest(), applicant);
-        // Update inlong stream status
-        streamService.updateStatus(groupId, null, StreamStatus.CONFIG_FAILED.getCode(), applicant);
-        return ListenerResult.fail();
-    }
-}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/group/listener/light/LightGroupUpdateFailedListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/group/listener/light/LightGroupUpdateFailedListener.java
deleted file mode 100644
index ffb6872c3..000000000
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/group/listener/light/LightGroupUpdateFailedListener.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.manager.service.workflow.group.listener.light;
-
-import lombok.extern.slf4j.Slf4j;
-import org.apache.inlong.manager.common.enums.GroupStatus;
-import org.apache.inlong.manager.common.pojo.group.InlongGroupInfo;
-import org.apache.inlong.manager.common.pojo.workflow.form.process.LightGroupResourceProcessForm;
-import org.apache.inlong.manager.service.group.InlongGroupService;
-import org.apache.inlong.manager.workflow.WorkflowContext;
-import org.apache.inlong.manager.workflow.event.ListenerResult;
-import org.apache.inlong.manager.workflow.event.process.ProcessEvent;
-import org.apache.inlong.manager.workflow.event.process.ProcessEventListener;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.stereotype.Component;
-
-/**
- * Listener of light group update failed.
- */
-@Slf4j
-@Component
-public class LightGroupUpdateFailedListener implements ProcessEventListener {
-
-    @Autowired
-    private InlongGroupService groupService;
-
-    @Override
-    public ProcessEvent event() {
-        return ProcessEvent.FAIL;
-    }
-
-    @Override
-    public ListenerResult listen(WorkflowContext context) throws Exception {
-        LightGroupResourceProcessForm form = (LightGroupResourceProcessForm) context.getProcessForm();
-        InlongGroupInfo groupInfo = form.getGroupInfo();
-        final String groupId = groupInfo.getInlongGroupId();
-        final String applicant = context.getOperator();
-        // Update inlong group status
-        groupService.updateStatus(groupId, GroupStatus.CONFIG_FAILED.getCode(), applicant);
-        groupService.update(groupInfo.genRequest(), applicant);
-        return ListenerResult.success();
-    }
-}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/group/listener/light/LightGroupUpdateListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/group/listener/light/LightGroupUpdateListener.java
deleted file mode 100644
index df53430b5..000000000
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/group/listener/light/LightGroupUpdateListener.java
+++ /dev/null
@@ -1,80 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.manager.service.workflow.group.listener.light;
-
-import lombok.extern.slf4j.Slf4j;
-import org.apache.commons.collections.CollectionUtils;
-import org.apache.inlong.manager.common.enums.GroupOperateType;
-import org.apache.inlong.manager.common.enums.GroupStatus;
-import org.apache.inlong.manager.common.pojo.stream.InlongStreamInfo;
-import org.apache.inlong.manager.common.pojo.workflow.form.process.LightGroupResourceProcessForm;
-import org.apache.inlong.manager.service.group.InlongGroupService;
-import org.apache.inlong.manager.service.core.InlongStreamService;
-import org.apache.inlong.manager.workflow.WorkflowContext;
-import org.apache.inlong.manager.workflow.event.ListenerResult;
-import org.apache.inlong.manager.workflow.event.process.ProcessEvent;
-import org.apache.inlong.manager.workflow.event.process.ProcessEventListener;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.stereotype.Component;
-
-import java.util.List;
-
-/**
- * Listener of light group update.
- */
-@Slf4j
-@Component
-public class LightGroupUpdateListener implements ProcessEventListener {
-
-    @Autowired
-    private InlongGroupService groupService;
-    @Autowired
-    private InlongStreamService streamService;
-
-    @Override
-    public ProcessEvent event() {
-        return ProcessEvent.CREATE;
-    }
-
-    @Override
-    public ListenerResult listen(WorkflowContext context) throws Exception {
-        LightGroupResourceProcessForm form = (LightGroupResourceProcessForm) context.getProcessForm();
-        final String groupId = form.getGroupInfo().getInlongGroupId();
-        final String applicant = context.getOperator();
-        GroupOperateType groupOperateType = form.getGroupOperateType();
-        switch (groupOperateType) {
-            case SUSPEND:
-                groupService.updateStatus(groupId, GroupStatus.SUSPENDING.getCode(), applicant);
-                break;
-            case RESTART:
-                groupService.updateStatus(groupId, GroupStatus.RESTARTING.getCode(), applicant);
-                break;
-            case DELETE:
-                groupService.updateStatus(groupId, GroupStatus.DELETING.getCode(), applicant);
-                break;
-            default:
-                break;
-        }
-        if (CollectionUtils.isEmpty(form.getStreamInfos())) {
-            List<InlongStreamInfo> streamInfos = streamService.list(groupId);
-            form.setStreamInfos(streamInfos);
-        }
-        return ListenerResult.success();
-    }
-
-}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/stream/CreateStreamWorkflowDefinition.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/stream/CreateStreamWorkflowDefinition.java
index 8876d79eb..070d4fc76 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/stream/CreateStreamWorkflowDefinition.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/stream/CreateStreamWorkflowDefinition.java
@@ -22,9 +22,9 @@ import org.apache.inlong.manager.common.pojo.workflow.form.process.StreamResourc
 import org.apache.inlong.manager.service.workflow.ProcessName;
 import org.apache.inlong.manager.service.workflow.WorkflowDefinition;
 import org.apache.inlong.manager.service.workflow.listener.StreamTaskListenerFactory;
-import org.apache.inlong.manager.service.workflow.stream.listener.StreamCompleteProcessListener;
-import org.apache.inlong.manager.service.workflow.stream.listener.StreamFailedProcessListener;
-import org.apache.inlong.manager.service.workflow.stream.listener.StreamInitProcessListener;
+import org.apache.inlong.manager.service.workflow.stream.listener.InitStreamCompleteListener;
+import org.apache.inlong.manager.service.workflow.stream.listener.InitStreamFailedListener;
+import org.apache.inlong.manager.service.workflow.stream.listener.InitStreamListener;
 import org.apache.inlong.manager.workflow.definition.EndEvent;
 import org.apache.inlong.manager.workflow.definition.ServiceTask;
 import org.apache.inlong.manager.workflow.definition.ServiceTaskType;
@@ -41,61 +41,61 @@ import org.springframework.stereotype.Component;
 public class CreateStreamWorkflowDefinition implements WorkflowDefinition {
 
     @Autowired
-    private StreamInitProcessListener streamInitProcessListener;
+    private InitStreamListener initStreamListener;
     @Autowired
-    private StreamFailedProcessListener streamFailedProcessListener;
+    private InitStreamCompleteListener initStreamCompleteListener;
     @Autowired
-    private StreamCompleteProcessListener streamCompleteProcessListener;
+    private InitStreamFailedListener initStreamFailedListener;
     @Autowired
     private StreamTaskListenerFactory streamTaskListenerFactory;
 
     @Override
     public WorkflowProcess defineProcess() {
-
         // Configuration process
         WorkflowProcess process = new WorkflowProcess();
-        process.addListener(streamInitProcessListener);
-        process.addListener(streamFailedProcessListener);
-        process.addListener(streamCompleteProcessListener);
-
-        process.setType("Stream Resource Creation");
         process.setName(getProcessName().name());
+        process.setType(getProcessName().getDisplayName());
         process.setDisplayName(getProcessName().getDisplayName());
         process.setFormClass(StreamResourceProcessForm.class);
         process.setVersion(1);
         process.setHidden(1);
 
+        // Set up the listener
+        process.addListener(initStreamListener);
+        process.addListener(initStreamFailedListener);
+        process.addListener(initStreamCompleteListener);
+
         // Start node
         StartEvent startEvent = new StartEvent();
         process.setStartEvent(startEvent);
 
-        // init MQ
+        // Init MQ
         ServiceTask initMQTask = new ServiceTask();
-        initMQTask.setName("initMQ");
+        initMQTask.setName("InitMQ");
         initMQTask.setDisplayName("Stream-InitMQ");
         initMQTask.addServiceTaskType(ServiceTaskType.INIT_MQ);
         initMQTask.addListenerProvider(streamTaskListenerFactory);
         process.addTask(initMQTask);
 
-        // init Sink
+        // Init Sink
         ServiceTask initSinkTask = new ServiceTask();
-        initSinkTask.setName("initSink");
+        initSinkTask.setName("InitSink");
         initSinkTask.setDisplayName("Stream-InitSink");
         initSinkTask.addServiceTaskType(ServiceTaskType.INIT_SINK);
         initSinkTask.addListenerProvider(streamTaskListenerFactory);
         process.addTask(initSinkTask);
 
-        // init Sort
+        // Init Sort
         ServiceTask initSortTask = new ServiceTask();
-        initSortTask.setName("initSort");
+        initSortTask.setName("InitSort");
         initSortTask.setDisplayName("Stream-InitSort");
         initSortTask.addServiceTaskType(ServiceTaskType.INIT_SORT);
         initSortTask.addListenerProvider(streamTaskListenerFactory);
         process.addTask(initSortTask);
 
-        // init Source
+        // Init Source
         ServiceTask initSourceTask = new ServiceTask();
-        initSourceTask.setName("initSource");
+        initSourceTask.setName("InitSource");
         initSourceTask.setDisplayName("Stream-InitSource");
         initSourceTask.addServiceTaskType(ServiceTaskType.INIT_SOURCE);
         initSourceTask.addListenerProvider(streamTaskListenerFactory);
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/stream/DeleteStreamWorkflowDefinition.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/stream/DeleteStreamWorkflowDefinition.java
index 257e14adc..c87cc760d 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/stream/DeleteStreamWorkflowDefinition.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/stream/DeleteStreamWorkflowDefinition.java
@@ -22,9 +22,9 @@ import org.apache.inlong.manager.common.pojo.workflow.form.process.StreamResourc
 import org.apache.inlong.manager.service.workflow.ProcessName;
 import org.apache.inlong.manager.service.workflow.WorkflowDefinition;
 import org.apache.inlong.manager.service.workflow.listener.StreamTaskListenerFactory;
-import org.apache.inlong.manager.service.workflow.stream.listener.StreamUpdateCompleteListener;
-import org.apache.inlong.manager.service.workflow.stream.listener.StreamUpdateFailedListener;
-import org.apache.inlong.manager.service.workflow.stream.listener.StreamUpdateListener;
+import org.apache.inlong.manager.service.workflow.stream.listener.UpdateStreamCompleteListener;
+import org.apache.inlong.manager.service.workflow.stream.listener.UpdateStreamFailedListener;
+import org.apache.inlong.manager.service.workflow.stream.listener.UpdateStreamListener;
 import org.apache.inlong.manager.workflow.definition.EndEvent;
 import org.apache.inlong.manager.workflow.definition.ServiceTask;
 import org.apache.inlong.manager.workflow.definition.ServiceTaskType;
@@ -41,11 +41,11 @@ import org.springframework.stereotype.Component;
 public class DeleteStreamWorkflowDefinition implements WorkflowDefinition {
 
     @Autowired
-    private StreamUpdateListener streamUpdateListener;
+    private UpdateStreamListener updateStreamListener;
     @Autowired
-    private StreamUpdateCompleteListener streamUpdateCompleteListener;
+    private UpdateStreamCompleteListener updateStreamCompleteListener;
     @Autowired
-    private StreamUpdateFailedListener streamUpdateFailedListener;
+    private UpdateStreamFailedListener updateStreamFailedListener;
     @Autowired
     private StreamTaskListenerFactory streamTaskListenerFactory;
 
@@ -53,23 +53,25 @@ public class DeleteStreamWorkflowDefinition implements WorkflowDefinition {
     public WorkflowProcess defineProcess() {
         // Configuration process
         WorkflowProcess process = new WorkflowProcess();
-        process.addListener(streamUpdateListener);
-        process.addListener(streamUpdateCompleteListener);
-        process.addListener(streamUpdateFailedListener);
-        process.setType("Stream Resource Delete");
         process.setName(getProcessName().name());
+        process.setType(getProcessName().getDisplayName());
         process.setDisplayName(getProcessName().getDisplayName());
         process.setFormClass(StreamResourceProcessForm.class);
         process.setVersion(1);
         process.setHidden(1);
 
+        // Set up the listener
+        process.addListener(updateStreamListener);
+        process.addListener(updateStreamCompleteListener);
+        process.addListener(updateStreamFailedListener);
+
         // Start node
         StartEvent startEvent = new StartEvent();
         process.setStartEvent(startEvent);
 
-        // Delete datasource
+        // Delete Source
         ServiceTask deleteDataSourceTask = new ServiceTask();
-        deleteDataSourceTask.setName("deleteSource");
+        deleteDataSourceTask.setName("DeleteSource");
         deleteDataSourceTask.setDisplayName("Stream-DeleteSource");
         deleteDataSourceTask.addServiceTaskType(ServiceTaskType.DELETE_SOURCE);
         deleteDataSourceTask.addListenerProvider(streamTaskListenerFactory);
@@ -77,15 +79,15 @@ public class DeleteStreamWorkflowDefinition implements WorkflowDefinition {
 
         // Delete MQ
         ServiceTask deleteMQTask = new ServiceTask();
-        deleteMQTask.setName("deleteMQ");
+        deleteMQTask.setName("DeleteMQ");
         deleteMQTask.setDisplayName("Stream-DeleteMQ");
         deleteMQTask.addServiceTaskType(ServiceTaskType.DELETE_MQ);
         deleteMQTask.addListenerProvider(streamTaskListenerFactory);
         process.addTask(deleteMQTask);
 
-        // Delete sort
+        // Delete Sort
         ServiceTask deleteSortTask = new ServiceTask();
-        deleteSortTask.setName("deleteSort");
+        deleteSortTask.setName("DeleteSort");
         deleteSortTask.setDisplayName("Stream-DeleteSort");
         deleteSortTask.addServiceTaskType(ServiceTaskType.DELETE_SORT);
         deleteSortTask.addListenerProvider(streamTaskListenerFactory);
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/stream/RestartStreamWorkflowDefinition.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/stream/RestartStreamWorkflowDefinition.java
index 8407c003a..fa0495098 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/stream/RestartStreamWorkflowDefinition.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/stream/RestartStreamWorkflowDefinition.java
@@ -22,9 +22,9 @@ import org.apache.inlong.manager.common.pojo.workflow.form.process.StreamResourc
 import org.apache.inlong.manager.service.workflow.ProcessName;
 import org.apache.inlong.manager.service.workflow.WorkflowDefinition;
 import org.apache.inlong.manager.service.workflow.listener.StreamTaskListenerFactory;
-import org.apache.inlong.manager.service.workflow.stream.listener.StreamUpdateCompleteListener;
-import org.apache.inlong.manager.service.workflow.stream.listener.StreamUpdateFailedListener;
-import org.apache.inlong.manager.service.workflow.stream.listener.StreamUpdateListener;
+import org.apache.inlong.manager.service.workflow.stream.listener.UpdateStreamCompleteListener;
+import org.apache.inlong.manager.service.workflow.stream.listener.UpdateStreamFailedListener;
+import org.apache.inlong.manager.service.workflow.stream.listener.UpdateStreamListener;
 import org.apache.inlong.manager.workflow.definition.EndEvent;
 import org.apache.inlong.manager.workflow.definition.ServiceTask;
 import org.apache.inlong.manager.workflow.definition.ServiceTaskType;
@@ -41,11 +41,11 @@ import org.springframework.stereotype.Component;
 public class RestartStreamWorkflowDefinition implements WorkflowDefinition {
 
     @Autowired
-    private StreamUpdateListener streamUpdateListener;
+    private UpdateStreamListener updateStreamListener;
     @Autowired
-    private StreamUpdateFailedListener streamUpdateFailedListener;
+    private UpdateStreamCompleteListener updateStreamCompleteListener;
     @Autowired
-    private StreamUpdateCompleteListener streamUpdateCompleteListener;
+    private UpdateStreamFailedListener updateStreamFailedListener;
     @Autowired
     private StreamTaskListenerFactory streamTaskListenerFactory;
 
@@ -53,31 +53,33 @@ public class RestartStreamWorkflowDefinition implements WorkflowDefinition {
     public WorkflowProcess defineProcess() {
         // Configuration process
         WorkflowProcess process = new WorkflowProcess();
-        process.addListener(streamUpdateListener);
-        process.addListener(streamUpdateCompleteListener);
-        process.addListener(streamUpdateFailedListener);
-        process.setType("Stream Resource Restart");
         process.setName(getProcessName().name());
+        process.setType(getProcessName().getDisplayName());
         process.setDisplayName(getProcessName().getDisplayName());
         process.setFormClass(StreamResourceProcessForm.class);
         process.setVersion(1);
         process.setHidden(1);
 
+        // Set up the listener
+        process.addListener(updateStreamListener);
+        process.addListener(updateStreamCompleteListener);
+        process.addListener(updateStreamFailedListener);
+
         // Start node
         StartEvent startEvent = new StartEvent();
         process.setStartEvent(startEvent);
 
-        //restart sort
+        // Restart Sort
         ServiceTask restartSortTask = new ServiceTask();
-        restartSortTask.setName("restartSort");
+        restartSortTask.setName("RestartSort");
         restartSortTask.setDisplayName("Stream-RestartSort");
         restartSortTask.addServiceTaskType(ServiceTaskType.RESTART_SORT);
         restartSortTask.addListenerProvider(streamTaskListenerFactory);
         process.addTask(restartSortTask);
 
-        //restart datasource
+        // Restart Source
         ServiceTask restartDataSourceTask = new ServiceTask();
-        restartDataSourceTask.setName("restartSource");
+        restartDataSourceTask.setName("RestartSource");
         restartDataSourceTask.setDisplayName("Stream-RestartSource");
         restartDataSourceTask.addServiceTaskType(ServiceTaskType.RESTART_SOURCE);
         restartDataSourceTask.addListenerProvider(streamTaskListenerFactory);
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/stream/SuspendStreamWorkflowDefinition.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/stream/SuspendStreamWorkflowDefinition.java
index 38d23efda..9eb8591c8 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/stream/SuspendStreamWorkflowDefinition.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/stream/SuspendStreamWorkflowDefinition.java
@@ -22,9 +22,9 @@ import org.apache.inlong.manager.common.pojo.workflow.form.process.StreamResourc
 import org.apache.inlong.manager.service.workflow.ProcessName;
 import org.apache.inlong.manager.service.workflow.WorkflowDefinition;
 import org.apache.inlong.manager.service.workflow.listener.StreamTaskListenerFactory;
-import org.apache.inlong.manager.service.workflow.stream.listener.StreamUpdateCompleteListener;
-import org.apache.inlong.manager.service.workflow.stream.listener.StreamUpdateFailedListener;
-import org.apache.inlong.manager.service.workflow.stream.listener.StreamUpdateListener;
+import org.apache.inlong.manager.service.workflow.stream.listener.UpdateStreamCompleteListener;
+import org.apache.inlong.manager.service.workflow.stream.listener.UpdateStreamFailedListener;
+import org.apache.inlong.manager.service.workflow.stream.listener.UpdateStreamListener;
 import org.apache.inlong.manager.workflow.definition.EndEvent;
 import org.apache.inlong.manager.workflow.definition.ServiceTask;
 import org.apache.inlong.manager.workflow.definition.ServiceTaskType;
@@ -41,11 +41,11 @@ import org.springframework.stereotype.Component;
 public class SuspendStreamWorkflowDefinition implements WorkflowDefinition {
 
     @Autowired
-    private StreamUpdateListener streamUpdateListener;
+    private UpdateStreamListener updateStreamListener;
     @Autowired
-    private StreamUpdateCompleteListener streamUpdateCompleteListener;
+    private UpdateStreamCompleteListener updateStreamCompleteListener;
     @Autowired
-    private StreamUpdateFailedListener streamUpdateFailedListener;
+    private UpdateStreamFailedListener updateStreamFailedListener;
     @Autowired
     private StreamTaskListenerFactory streamTaskListenerFactory;
 
@@ -53,31 +53,33 @@ public class SuspendStreamWorkflowDefinition implements WorkflowDefinition {
     public WorkflowProcess defineProcess() {
         // Configuration process
         WorkflowProcess process = new WorkflowProcess();
-        process.addListener(streamUpdateListener);
-        process.addListener(streamUpdateFailedListener);
-        process.addListener(streamUpdateCompleteListener);
-        process.setType("Stream Resource Suspend");
         process.setName(getProcessName().name());
+        process.setType(getProcessName().getDisplayName());
         process.setDisplayName(getProcessName().getDisplayName());
         process.setFormClass(StreamResourceProcessForm.class);
         process.setVersion(1);
         process.setHidden(1);
 
+        // Set up the listener
+        process.addListener(updateStreamListener);
+        process.addListener(updateStreamFailedListener);
+        process.addListener(updateStreamCompleteListener);
+
         // Start node
         StartEvent startEvent = new StartEvent();
         process.setStartEvent(startEvent);
 
-        //stop datasource
+        // Stop Source
         ServiceTask stopDataSourceTask = new ServiceTask();
-        stopDataSourceTask.setName("stopSource");
+        stopDataSourceTask.setName("StopSource");
         stopDataSourceTask.setDisplayName("Stream-StopSource");
         stopDataSourceTask.addServiceTaskType(ServiceTaskType.STOP_SOURCE);
         stopDataSourceTask.addListenerProvider(streamTaskListenerFactory);
         process.addTask(stopDataSourceTask);
 
-        //stop sort
+        // Stop Sort
         ServiceTask stopSortTask = new ServiceTask();
-        stopSortTask.setName("stopSort");
+        stopSortTask.setName("StopSort");
         stopSortTask.setDisplayName("Stream-StopSort");
         stopSortTask.addServiceTaskType(ServiceTaskType.STOP_SORT);
         stopSortTask.addListenerProvider(streamTaskListenerFactory);
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/stream/listener/StreamCompleteProcessListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/stream/listener/InitStreamCompleteListener.java
similarity index 95%
rename from inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/stream/listener/StreamCompleteProcessListener.java
rename to inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/stream/listener/InitStreamCompleteListener.java
index 3378eb63d..e890f64d0 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/stream/listener/StreamCompleteProcessListener.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/stream/listener/InitStreamCompleteListener.java
@@ -33,11 +33,11 @@ import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Component;
 
 /**
- * Event listener for completed creation of inlong stream resource
+ * The listener of InlongStream when resources are created successfully.
  */
 @Slf4j
 @Component
-public class StreamCompleteProcessListener implements ProcessEventListener {
+public class InitStreamCompleteListener implements ProcessEventListener {
 
     @Autowired
     private InlongStreamService streamService;
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/stream/listener/StreamFailedProcessListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/stream/listener/InitStreamFailedListener.java
similarity index 94%
rename from inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/stream/listener/StreamFailedProcessListener.java
rename to inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/stream/listener/InitStreamFailedListener.java
index 3928c5d83..9ebb4e31c 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/stream/listener/StreamFailedProcessListener.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/stream/listener/InitStreamFailedListener.java
@@ -31,11 +31,11 @@ import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Component;
 
 /**
- * Event listener for failed creation of inlong stream resource
+ * The listener of InlongStream when resource creation fails.
  */
 @Slf4j
 @Component
-public class StreamFailedProcessListener implements ProcessEventListener {
+public class InitStreamFailedListener implements ProcessEventListener {
 
     @Autowired
     private InlongStreamService streamService;
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/stream/listener/StreamInitProcessListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/stream/listener/InitStreamListener.java
similarity index 95%
rename from inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/stream/listener/StreamInitProcessListener.java
rename to inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/stream/listener/InitStreamListener.java
index c24a39428..2cc1a3246 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/stream/listener/StreamInitProcessListener.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/stream/listener/InitStreamListener.java
@@ -32,10 +32,10 @@ import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Service;
 
 /**
- * Initialize the listener for inlong group information
+ * The listener for initializing the InlongStream information.
  */
 @Service
-public class StreamInitProcessListener implements ProcessEventListener {
+public class InitStreamListener implements ProcessEventListener {
 
     @Autowired
     private InlongStreamService streamService;
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/stream/listener/StreamUpdateCompleteListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/stream/listener/UpdateStreamCompleteListener.java
similarity index 95%
rename from inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/stream/listener/StreamUpdateCompleteListener.java
rename to inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/stream/listener/UpdateStreamCompleteListener.java
index cf9034fde..10fcc65bd 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/stream/listener/StreamUpdateCompleteListener.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/stream/listener/UpdateStreamCompleteListener.java
@@ -31,11 +31,11 @@ import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Component;
 
 /**
- * Update completed listener for inlong stream
+ * The listener of InlongStream when the update operation succeeds.
  */
 @Slf4j
 @Component
-public class StreamUpdateCompleteListener implements ProcessEventListener {
+public class UpdateStreamCompleteListener implements ProcessEventListener {
 
     @Autowired
     private InlongStreamService streamService;
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/stream/listener/StreamUpdateFailedListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/stream/listener/UpdateStreamFailedListener.java
similarity index 95%
rename from inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/stream/listener/StreamUpdateFailedListener.java
rename to inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/stream/listener/UpdateStreamFailedListener.java
index f6fc212a9..c57f871c0 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/stream/listener/StreamUpdateFailedListener.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/stream/listener/UpdateStreamFailedListener.java
@@ -30,11 +30,11 @@ import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Component;
 
 /**
- * Update failed listener for inlong stream
+ * The listener of InlongStream when the update operation fails.
  */
 @Slf4j
 @Component
-public class StreamUpdateFailedListener implements ProcessEventListener {
+public class UpdateStreamFailedListener implements ProcessEventListener {
 
     @Autowired
     private InlongStreamService streamService;
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/stream/listener/StreamUpdateListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/stream/listener/UpdateStreamListener.java
similarity index 96%
rename from inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/stream/listener/StreamUpdateListener.java
rename to inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/stream/listener/UpdateStreamListener.java
index 75a408e99..4d1917640 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/stream/listener/StreamUpdateListener.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/stream/listener/UpdateStreamListener.java
@@ -30,10 +30,10 @@ import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Service;
 
 /**
- * Update listener for inlong stream
+ * The listener for updating the InlongStream.
  */
 @Service
-public class StreamUpdateListener implements ProcessEventListener {
+public class UpdateStreamListener implements ProcessEventListener {
 
     @Autowired
     private InlongStreamService streamService;
diff --git a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/ServiceBaseTest.java b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/ServiceBaseTest.java
index 6b7866e4b..0789d63c5 100644
--- a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/ServiceBaseTest.java
+++ b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/ServiceBaseTest.java
@@ -17,13 +17,29 @@
 
 package org.apache.inlong.manager.service;
 
+import org.apache.inlong.manager.common.consts.InlongConstants;
+import org.apache.inlong.manager.common.enums.FieldType;
+import org.apache.inlong.manager.common.enums.GroupStatus;
+import org.apache.inlong.manager.common.enums.MQType;
+import org.apache.inlong.manager.common.pojo.group.InlongGroupInfo;
+import org.apache.inlong.manager.common.pojo.group.none.InlongNoneMqInfo;
+import org.apache.inlong.manager.common.pojo.group.pulsar.InlongPulsarInfo;
+import org.apache.inlong.manager.common.pojo.stream.InlongStreamInfo;
+import org.apache.inlong.manager.common.pojo.stream.InlongStreamRequest;
+import org.apache.inlong.manager.common.pojo.stream.StreamField;
+import org.apache.inlong.manager.service.core.InlongStreamService;
+import org.apache.inlong.manager.service.group.InlongGroupService;
 import org.apache.inlong.manager.test.BaseTest;
 import org.junit.jupiter.api.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.boot.autoconfigure.SpringBootApplication;
 import org.springframework.boot.test.context.SpringBootTest;
 
+import java.util.ArrayList;
+import java.util.List;
+
 /**
  * Test class for base test service.
  */
@@ -31,14 +47,98 @@ import org.springframework.boot.test.context.SpringBootTest;
 @SpringBootTest(classes = ServiceBaseTest.class)
 public class ServiceBaseTest extends BaseTest {
 
-    private static final Logger LOGGER = LoggerFactory.getLogger(ServiceBaseTest.class);
-
     public static final String GLOBAL_GROUP_ID = "global_group";
     public static final String GLOBAL_STREAM_ID = "global_stream";
     public static final String GLOBAL_OPERATOR = "admin";
+    private static final Logger LOGGER = LoggerFactory.getLogger(ServiceBaseTest.class);
+
+    @Autowired
+    protected InlongGroupService groupService;
+    @Autowired
+    protected InlongStreamService streamService;
 
     @Test
     public void test() {
         LOGGER.info("The test class cannot be empty, otherwise 'No runnable methods exception' will be reported");
     }
+
+    /**
+     * Create InlongGroup from the specified InlongGroupId and MQ type
+     *
+     * @return InlongGroupInfo after saving
+     */
+    public InlongGroupInfo createInlongGroup(String inlongGroupId, String mqType) {
+        try {
+            streamService.logicDeleteAll(inlongGroupId, GLOBAL_OPERATOR);
+            groupService.delete(inlongGroupId, GLOBAL_OPERATOR);
+        } catch (Exception e) {
+            // ignore
+        }
+
+        InlongGroupInfo groupInfo;
+        if (MQType.forType(mqType) == MQType.PULSAR || MQType.forType(mqType) == MQType.TDMQ_PULSAR) {
+            groupInfo = new InlongPulsarInfo();
+        } else if (MQType.forType(mqType) == MQType.TUBE) {
+            groupInfo = new InlongPulsarInfo();
+        } else {
+            groupInfo = new InlongNoneMqInfo();
+        }
+
+        groupInfo.setInlongGroupId(inlongGroupId);
+        groupInfo.setMqType(mqType);
+        groupInfo.setMqResource("test-queue");
+        groupInfo.setInCharges(GLOBAL_OPERATOR);
+        groupInfo.setEnableCreateResource(InlongConstants.ENABLE_CREATE_RESOURCE);
+        groupService.save(groupInfo.genRequest(), GLOBAL_OPERATOR);
+
+        groupService.updateStatus(inlongGroupId, GroupStatus.TO_BE_APPROVAL.getCode(), GLOBAL_OPERATOR);
+        groupService.updateStatus(inlongGroupId, GroupStatus.APPROVE_PASSED.getCode(), GLOBAL_OPERATOR);
+        groupService.update(groupInfo.genRequest(), GLOBAL_OPERATOR);
+
+        return groupInfo;
+    }
+
+    /**
+     * Create InlongStream from the given InlongGroupInfo and specified InlongStreamId
+     *
+     * @return InlongStreamInfo after saving
+     */
+    public InlongStreamInfo createStreamInfo(InlongGroupInfo groupInfo, String inlongStreamId) {
+        String inlongGroupId = groupInfo.getInlongGroupId();
+        // delete first
+        try {
+            streamService.delete(inlongGroupId, inlongStreamId, GLOBAL_OPERATOR);
+        } catch (Exception e) {
+            // ignore
+        }
+
+        InlongStreamRequest request = new InlongStreamRequest();
+        request.setInlongGroupId(inlongGroupId);
+        request.setInlongStreamId(inlongStreamId);
+        request.setMqResource(inlongStreamId);
+        request.setDataSeparator("124");
+        request.setDataEncoding("UTF-8");
+        request.setFieldList(createStreamFields(inlongGroupId, inlongStreamId));
+        streamService.save(request, GLOBAL_OPERATOR);
+
+        return streamService.get(request.getInlongGroupId(), request.getInlongStreamId());
+    }
+
+    /**
+     * Get StreamField list from the given groupId and streamId
+     *
+     * @return list of StreamField
+     */
+    public List<StreamField> createStreamFields(String groupId, String streamId) {
+        final List<StreamField> streamFields = new ArrayList<>();
+        StreamField fieldInfo = new StreamField();
+        fieldInfo.setInlongGroupId(groupId);
+        fieldInfo.setInlongStreamId(streamId);
+        fieldInfo.setFieldName("id");
+        fieldInfo.setFieldType(FieldType.INT.toString());
+        fieldInfo.setFieldComment("idx");
+        streamFields.add(fieldInfo);
+        return streamFields;
+    }
+
 }
diff --git a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/impl/InlongGroupProcessOperationTest.java b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/impl/InlongGroupProcessOperationTest.java
index fbfad559c..23b6add8b 100644
--- a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/impl/InlongGroupProcessOperationTest.java
+++ b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/impl/InlongGroupProcessOperationTest.java
@@ -40,69 +40,64 @@ import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
 @EnableAutoConfiguration
 public class InlongGroupProcessOperationTest extends ServiceBaseTest {
 
+    private static final String GROUP_ID = "test_group_process";
     private static final String OPERATOR = "operator";
 
-    private static final String GROUP_NAME = "test_biz";
-
-    private static final String GROUP_ID = "test_biz";
-
     @Autowired
     private InlongGroupService groupService;
-
     @Autowired
     private InlongGroupProcessOperation groupProcessOperation;
-
     @Autowired
     private GroupTaskListenerFactory groupTaskListenerFactory;
 
-    /**
-     * Set some base information before start process.
-     */
-    public void before() {
+    @Test
+    public void testAllProcess() {
+        before();
+        testStartProcess();
+        testSuspendProcess();
+        testRestartProcess();
+
+        // delete the process, will delete the Pulsar resource
+        // TODO Mock the cluster-related operations
+        // boolean result = groupProcessOperation.deleteProcess(GROUP_ID, OPERATOR);
+        // Assertions.assertTrue(result);
+    }
+
+    private void before() {
         MockPlugin mockPlugin = new MockPlugin();
         groupTaskListenerFactory.acceptPlugin(mockPlugin);
         InlongPulsarRequest groupInfo = new InlongPulsarRequest();
         groupInfo.setInlongGroupId(GROUP_ID);
-        groupInfo.setName(GROUP_NAME);
         groupInfo.setInCharges(OPERATOR);
         groupInfo.setMqType(MQType.PULSAR.getType());
         groupService.save(groupInfo, OPERATOR);
     }
 
-    // There will be concurrency problems in the overall operation, and the testDeleteProcess() method will call
-    // @Test
-    public void testStartProcess() {
-        before();
+    private void testStartProcess() {
+        InlongGroupInfo groupInfo = groupService.get(GROUP_ID);
+        groupInfo.setInlongClusterTag("default_cluster");
+        groupService.update(groupInfo.genRequest(), OPERATOR);
+
         WorkflowResult result = groupProcessOperation.startProcess(GROUP_ID, OPERATOR);
         ProcessResponse response = result.getProcessInfo();
         Assertions.assertSame(response.getStatus(), ProcessStatus.PROCESSING);
-        InlongGroupInfo groupInfo = groupService.get(GROUP_ID);
+        groupInfo = groupService.get(GROUP_ID);
         Assertions.assertEquals(groupInfo.getStatus(), GroupStatus.TO_BE_APPROVAL.getCode());
     }
 
-    // There will be concurrency problems in the overall operation, and the testDeleteProcess() method will call
-    // @Test
-    public void testSuspendProcess() {
-        testStartProcess();
-        InlongGroupInfo groupInfo = groupService.get(GROUP_ID);
+    private void testSuspendProcess() {
         groupService.updateStatus(GROUP_ID, GroupStatus.APPROVE_PASSED.getCode(), OPERATOR);
-        groupService.update(groupInfo.genRequest(), OPERATOR);
         groupService.updateStatus(GROUP_ID, GroupStatus.CONFIG_ING.getCode(), OPERATOR);
-        groupService.update(groupInfo.genRequest(), OPERATOR);
         groupService.updateStatus(GROUP_ID, GroupStatus.CONFIG_SUCCESSFUL.getCode(), OPERATOR);
-        groupService.update(groupInfo.genRequest(), OPERATOR);
 
         WorkflowResult result = groupProcessOperation.suspendProcess(GROUP_ID, OPERATOR);
         ProcessResponse response = result.getProcessInfo();
         Assertions.assertSame(response.getStatus(), ProcessStatus.COMPLETED);
-        groupInfo = groupService.get(GROUP_ID);
+        InlongGroupInfo groupInfo = groupService.get(GROUP_ID);
         Assertions.assertEquals(groupInfo.getStatus(), GroupStatus.SUSPENDED.getCode());
     }
 
-    // There will be concurrency problems in the overall operation, and the testDeleteProcess() method will call
-    // @Test
-    public void testRestartProcess() {
-        testSuspendProcess();
+    private void testRestartProcess() {
         WorkflowResult result = groupProcessOperation.restartProcess(GROUP_ID, OPERATOR);
         ProcessResponse response = result.getProcessInfo();
         Assertions.assertSame(response.getStatus(), ProcessStatus.COMPLETED);
@@ -110,12 +105,5 @@ public class InlongGroupProcessOperationTest extends ServiceBaseTest {
         Assertions.assertEquals(groupInfo.getStatus(), GroupStatus.RESTARTED.getCode());
     }
 
-    @Test
-    public void testDeleteProcess() {
-        testStartProcess();
-        // testRestartProcess();
-        // boolean result = groupProcessOperation.deleteProcess(GROUP_ID, OPERATOR);
-        // Assertions.assertTrue(result);
-    }
 }
 
diff --git a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/sort/DisableZkForSortTest.java b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/sort/DisableZkForSortTest.java
index adc4dd159..e0ed84739 100644
--- a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/sort/DisableZkForSortTest.java
+++ b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/sort/DisableZkForSortTest.java
@@ -21,6 +21,7 @@ import com.google.common.collect.Lists;
 import org.apache.inlong.manager.common.consts.InlongConstants;
 import org.apache.inlong.manager.common.enums.GroupOperateType;
 import org.apache.inlong.manager.common.enums.GroupStatus;
+import org.apache.inlong.manager.common.enums.MQType;
 import org.apache.inlong.manager.common.enums.ProcessStatus;
 import org.apache.inlong.manager.common.pojo.group.InlongGroupInfo;
 import org.apache.inlong.manager.common.pojo.sink.SinkField;
@@ -122,13 +123,13 @@ public class DisableZkForSortTest extends WorkflowServiceImplTest {
 
     //    @Test
     public void testCreateSortConfigInUpdateWorkflow() {
-        InlongGroupInfo groupInfo = initGroupForm("PULSAR", "test20");
+        InlongGroupInfo groupInfo = createInlongGroup("test20", MQType.MQ_PULSAR);
         groupInfo.setEnableZookeeper(InlongConstants.ENABLE_ZK);
         groupInfo.setEnableCreateResource(InlongConstants.ENABLE_CREATE_RESOURCE);
         groupService.updateStatus(GROUP_ID, GroupStatus.CONFIG_SUCCESSFUL.getCode(), OPERATOR);
         groupService.update(groupInfo.genRequest(), OPERATOR);
 
-        InlongStreamInfo streamInfo = createStreamInfo(groupInfo);
+        InlongStreamInfo streamInfo = createStreamInfo(groupInfo, "test_stream_info");
         createHiveSink(streamInfo);
         createKafkaSource(streamInfo);
         GroupResourceProcessForm form = new GroupResourceProcessForm();
@@ -141,7 +142,7 @@ public class DisableZkForSortTest extends WorkflowServiceImplTest {
         ProcessResponse response = result.getProcessInfo();
         Assertions.assertSame(response.getStatus(), ProcessStatus.COMPLETED);
         WorkflowProcess process = context.getProcess();
-        WorkflowTask task = process.getTaskByName("stopSort");
+        WorkflowTask task = process.getTaskByName("StopSort");
         Assertions.assertTrue(task instanceof ServiceTask);
         Assertions.assertEquals(2, task.getNameToListenerMap().size());
         List<TaskEventListener> listeners = Lists.newArrayList(task.getNameToListenerMap().values());
diff --git a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/source/listener/DataSourceListenerTest.java b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/source/listener/StreamSourceListenerTest.java
similarity index 60%
rename from inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/source/listener/DataSourceListenerTest.java
rename to inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/source/listener/StreamSourceListenerTest.java
index 72f9cc52f..e922b9218 100644
--- a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/source/listener/DataSourceListenerTest.java
+++ b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/source/listener/StreamSourceListenerTest.java
@@ -19,6 +19,7 @@ package org.apache.inlong.manager.service.source.listener;
 
 import org.apache.inlong.manager.common.enums.GroupOperateType;
 import org.apache.inlong.manager.common.enums.GroupStatus;
+import org.apache.inlong.manager.common.enums.MQType;
 import org.apache.inlong.manager.common.enums.ProcessStatus;
 import org.apache.inlong.manager.common.enums.SourceStatus;
 import org.apache.inlong.manager.common.pojo.group.InlongGroupInfo;
@@ -28,97 +29,98 @@ import org.apache.inlong.manager.common.pojo.stream.InlongStreamInfo;
 import org.apache.inlong.manager.common.pojo.workflow.ProcessResponse;
 import org.apache.inlong.manager.common.pojo.workflow.WorkflowResult;
 import org.apache.inlong.manager.common.pojo.workflow.form.process.GroupResourceProcessForm;
+import org.apache.inlong.manager.service.ServiceBaseTest;
 import org.apache.inlong.manager.service.source.StreamSourceService;
 import org.apache.inlong.manager.service.workflow.ProcessName;
-import org.apache.inlong.manager.service.workflow.WorkflowServiceImplTest;
 import org.apache.inlong.manager.workflow.WorkflowContext;
+import org.apache.inlong.manager.workflow.core.ProcessService;
 import org.apache.inlong.manager.workflow.definition.ServiceTask;
 import org.apache.inlong.manager.workflow.definition.WorkflowProcess;
 import org.apache.inlong.manager.workflow.definition.WorkflowTask;
 import org.apache.inlong.manager.workflow.util.WorkflowBeanUtils;
 import org.junit.jupiter.api.Assertions;
-import org.junit.jupiter.api.BeforeEach;
 import org.springframework.beans.factory.annotation.Autowired;
 
 /**
- * Test class for operate binlog source, such as frozen or restart.
+ * Test class for operate StreamSource, such as frozen or restart.
  */
-public class DataSourceListenerTest extends WorkflowServiceImplTest {
+public class StreamSourceListenerTest extends ServiceBaseTest {
 
-    public GroupResourceProcessForm form;
+    private static final String GROUP_ID = "test-source-group-id";
+    private static final String STREAM_ID = "test-source-stream-id";
 
-    public InlongGroupInfo groupInfo;
     @Autowired
-    private StreamSourceService streamSourceService;
+    private ProcessService processService;
+    @Autowired
+    private StreamSourceService sourceService;
+
+    private InlongGroupInfo groupInfo;
 
-    @BeforeEach
-    public void init() {
-        subType = "data_source";
+    /**
+     * There will be concurrency problems in the overall operation,This method temporarily fails the test
+     */
+    // @Test
+    public void testAllOperate() {
+        groupInfo = createInlongGroup(GROUP_ID, MQType.MQ_PULSAR);
+        groupService.updateStatus(GROUP_ID, GroupStatus.CONFIG_ING.getCode(), GLOBAL_OPERATOR);
+        groupService.updateStatus(GROUP_ID, GroupStatus.CONFIG_SUCCESSFUL.getCode(), GLOBAL_OPERATOR);
+        groupService.update(groupInfo.genRequest(), GLOBAL_OPERATOR);
+
+        Integer sourceId = this.createBinlogSource(groupInfo);
+        testFrozenSource(sourceId);
+        testRestartSource(sourceId);
     }
 
-    public Integer createBinlogSource(InlongGroupInfo groupInfo) {
-        final InlongStreamInfo stream = createStreamInfo(groupInfo);
+    private Integer createBinlogSource(InlongGroupInfo groupInfo) {
+        InlongStreamInfo stream = createStreamInfo(groupInfo, STREAM_ID);
         MySQLBinlogSourceRequest sourceRequest = new MySQLBinlogSourceRequest();
         sourceRequest.setInlongGroupId(stream.getInlongGroupId());
         sourceRequest.setInlongStreamId(stream.getInlongStreamId());
         sourceRequest.setSourceName("binlog-collect");
-        return streamSourceService.save(sourceRequest, OPERATOR);
+        return sourceService.save(sourceRequest, GLOBAL_OPERATOR);
     }
 
-    /**
-     * There will be concurrency problems in the overall operation,This method temporarily fails the test
-     */
-    //@Test
-    public void testFrozenSource() {
-        groupInfo = initGroupForm("PULSAR", "test1");
-        groupService.updateStatus(GROUP_ID, GroupStatus.CONFIG_SUCCESSFUL.getCode(), OPERATOR);
-        groupService.update(groupInfo.genRequest(), OPERATOR);
-
-        final int sourceId = createBinlogSource(groupInfo);
-        streamSourceService.updateStatus(groupInfo.getInlongGroupId(), null,
-                SourceStatus.SOURCE_NORMAL.getCode(), OPERATOR);
+    private void testFrozenSource(Integer sourceId) {
+        sourceService.updateStatus(GROUP_ID, null, SourceStatus.SOURCE_NORMAL.getCode(), GLOBAL_OPERATOR);
 
-        form = new GroupResourceProcessForm();
+        GroupResourceProcessForm form = new GroupResourceProcessForm();
         form.setGroupInfo(groupInfo);
         form.setGroupOperateType(GroupOperateType.SUSPEND);
-        WorkflowContext context = processService.start(ProcessName.SUSPEND_GROUP_PROCESS.name(), applicant, form);
+        WorkflowContext context = processService.start(ProcessName.SUSPEND_GROUP_PROCESS.name(), GLOBAL_OPERATOR, form);
         WorkflowResult result = WorkflowBeanUtils.result(context);
         ProcessResponse response = result.getProcessInfo();
         Assertions.assertSame(response.getStatus(), ProcessStatus.COMPLETED);
 
         WorkflowProcess process = context.getProcess();
-        WorkflowTask task = process.getTaskByName("stopSource");
+        WorkflowTask task = process.getTaskByName("StopSource");
         Assertions.assertTrue(task instanceof ServiceTask);
-        StreamSource streamSource = streamSourceService.get(sourceId);
+
+        StreamSource streamSource = sourceService.get(sourceId);
         Assertions.assertSame(SourceStatus.forCode(streamSource.getStatus()), SourceStatus.TO_BE_ISSUED_FROZEN);
     }
 
-    // @Test
-    public void testRestartSource() {
-        // testFrozenSource();
-        groupInfo = initGroupForm("PULSAR", "test2");
-        groupService.updateStatus(groupInfo.getInlongGroupId(), GroupStatus.CONFIG_SUCCESSFUL.getCode(), OPERATOR);
-        groupService.update(groupInfo.genRequest(), OPERATOR);
-        groupService.updateStatus(groupInfo.getInlongGroupId(), GroupStatus.SUSPENDING.getCode(), OPERATOR);
-        groupService.update(groupInfo.genRequest(), OPERATOR);
-        groupService.updateStatus(groupInfo.getInlongGroupId(), GroupStatus.SUSPENDED.getCode(), OPERATOR);
-        groupService.update(groupInfo.genRequest(), OPERATOR);
-
-        final int sourceId = createBinlogSource(groupInfo);
-        streamSourceService.updateStatus(groupInfo.getInlongGroupId(), null,
-                SourceStatus.SOURCE_NORMAL.getCode(), OPERATOR);
-
-        form = new GroupResourceProcessForm();
+    private void testRestartSource(Integer sourceId) {
+        groupService.updateStatus(GROUP_ID, GroupStatus.SUSPENDING.getCode(), GLOBAL_OPERATOR);
+        groupService.update(groupInfo.genRequest(), GLOBAL_OPERATOR);
+        groupService.updateStatus(GROUP_ID, GroupStatus.SUSPENDED.getCode(), GLOBAL_OPERATOR);
+        groupService.update(groupInfo.genRequest(), GLOBAL_OPERATOR);
+
+        sourceService.updateStatus(GROUP_ID, null, SourceStatus.SOURCE_NORMAL.getCode(), GLOBAL_OPERATOR);
+
+        GroupResourceProcessForm form = new GroupResourceProcessForm();
         form.setGroupInfo(groupInfo);
         form.setGroupOperateType(GroupOperateType.RESTART);
-        WorkflowContext context = processService.start(ProcessName.RESTART_GROUP_PROCESS.name(), applicant, form);
+        WorkflowContext context = processService.start(ProcessName.RESTART_GROUP_PROCESS.name(), GLOBAL_OPERATOR, form);
         WorkflowResult result = WorkflowBeanUtils.result(context);
         ProcessResponse response = result.getProcessInfo();
         Assertions.assertSame(response.getStatus(), ProcessStatus.COMPLETED);
 
         WorkflowProcess process = context.getProcess();
-        WorkflowTask task = process.getTaskByName("restartSource");
+        WorkflowTask task = process.getTaskByName("RestartSource");
         Assertions.assertTrue(task instanceof ServiceTask);
+
+        StreamSource streamSource = sourceService.get(sourceId);
+        Assertions.assertSame(SourceStatus.forCode(streamSource.getStatus()), SourceStatus.TO_BE_ISSUED_RETRY);
     }
 
 }
diff --git a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/workflow/WorkflowServiceImplTest.java b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/workflow/WorkflowServiceImplTest.java
index 433b95d42..0257456f8 100644
--- a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/workflow/WorkflowServiceImplTest.java
+++ b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/workflow/WorkflowServiceImplTest.java
@@ -18,17 +18,9 @@
 package org.apache.inlong.manager.service.workflow;
 
 import com.google.common.collect.Lists;
-import org.apache.inlong.manager.common.enums.FieldType;
-import org.apache.inlong.manager.common.enums.GroupStatus;
 import org.apache.inlong.manager.common.enums.MQType;
 import org.apache.inlong.manager.common.enums.ProcessStatus;
-import org.apache.inlong.manager.common.exceptions.WorkflowListenerException;
 import org.apache.inlong.manager.common.pojo.group.InlongGroupInfo;
-import org.apache.inlong.manager.common.pojo.group.none.InlongNoneMqInfo;
-import org.apache.inlong.manager.common.pojo.group.pulsar.InlongPulsarInfo;
-import org.apache.inlong.manager.common.pojo.stream.InlongStreamInfo;
-import org.apache.inlong.manager.common.pojo.stream.InlongStreamRequest;
-import org.apache.inlong.manager.common.pojo.stream.StreamField;
 import org.apache.inlong.manager.common.pojo.workflow.ProcessResponse;
 import org.apache.inlong.manager.common.pojo.workflow.WorkflowResult;
 import org.apache.inlong.manager.common.pojo.workflow.form.process.GroupResourceProcessForm;
@@ -57,9 +49,7 @@ import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
 import org.springframework.beans.factory.annotation.Autowired;
 
-import java.util.ArrayList;
 import java.util.List;
-import java.util.concurrent.atomic.AtomicInteger;
 
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.Mockito.mock;
@@ -71,14 +61,9 @@ import static org.mockito.Mockito.when;
 public class WorkflowServiceImplTest extends ServiceBaseTest {
 
     public static final String OPERATOR = "admin";
-
     public static final String GROUP_ID = "test_group";
-
     public static final String STREAM_ID = "test_stream";
-
-    public static final String DATA_ENCODING = "UTF-8";
-
-    protected String subType = "default";
+    private static final String DATA_ENCODING = "UTF-8";
 
     @Autowired
     protected WorkflowServiceImpl workflowService;
@@ -95,90 +80,25 @@ public class WorkflowServiceImplTest extends ServiceBaseTest {
     @Autowired
     protected InlongStreamService streamService;
 
-    protected ProcessName processName;
-
     protected String applicant;
-
-    protected GroupResourceProcessForm form;
-
-    protected final AtomicInteger tryTime = new AtomicInteger(0);
+    protected String subType = "default";
+    private ProcessName processName;
+    private GroupResourceProcessForm form;
 
     /**
      * Init inlong group form
      */
-    public InlongGroupInfo initGroupForm(String mqType, String groupId) {
+    public InlongGroupInfo createInlongGroup(String inlongGroupId, String mqType) {
         processName = ProcessName.CREATE_GROUP_RESOURCE;
         applicant = OPERATOR;
-
-        try {
-            streamService.logicDeleteAll(groupId, OPERATOR);
-            groupService.delete(groupId, OPERATOR);
-        } catch (Exception e) {
-            // ignore
-        }
-
-        InlongGroupInfo groupInfo;
-        if (MQType.forType(mqType) == MQType.PULSAR || MQType.forType(mqType) == MQType.TDMQ_PULSAR) {
-            groupInfo = new InlongPulsarInfo();
-        } else if (MQType.forType(mqType) == MQType.TUBE) {
-            groupInfo = new InlongPulsarInfo();
-        } else {
-            groupInfo = new InlongNoneMqInfo();
-        }
-
-        groupInfo.setName(groupId);
-        groupInfo.setInCharges(OPERATOR);
-        groupInfo.setInlongGroupId(groupId);
-        groupInfo.setMqType(mqType);
-        groupInfo.setMqResource("test-queue");
-        groupInfo.setEnableCreateResource(1);
-        groupService.save(groupInfo.genRequest(), OPERATOR);
-
-        groupService.updateStatus(groupId, GroupStatus.TO_BE_APPROVAL.getCode(), OPERATOR);
-        groupService.updateStatus(groupId, GroupStatus.APPROVE_PASSED.getCode(), OPERATOR);
-        groupService.update(groupInfo.genRequest(), OPERATOR);
-
         form = new GroupResourceProcessForm();
+
+        InlongGroupInfo groupInfo = super.createInlongGroup(inlongGroupId, mqType);
         form.setGroupInfo(groupInfo);
-        form.setStreamInfos(Lists.newArrayList(createStreamInfo(groupInfo)));
+        form.setStreamInfos(Lists.newArrayList(super.createStreamInfo(groupInfo, STREAM_ID)));
         return groupInfo;
     }
 
-    /**
-     * Create inlong stream
-     */
-    public InlongStreamInfo createStreamInfo(InlongGroupInfo groupInfo) {
-        // delete first
-        try {
-            streamService.delete(GROUP_ID, OPERATOR, OPERATOR);
-        } catch (Exception e) {
-            // ignore
-        }
-
-        InlongStreamRequest request = new InlongStreamRequest();
-        request.setInlongGroupId(groupInfo.getInlongGroupId());
-        request.setInlongStreamId(STREAM_ID);
-        request.setMqResource(STREAM_ID);
-        request.setDataSeparator("124");
-        request.setDataEncoding(DATA_ENCODING);
-        request.setFieldList(createStreamFields(groupInfo.getInlongGroupId(), STREAM_ID));
-        streamService.save(request, OPERATOR);
-
-        return streamService.get(request.getInlongGroupId(), request.getInlongStreamId());
-    }
-
-    public List<StreamField> createStreamFields(String groupId, String streamId) {
-        final List<StreamField> streamFields = new ArrayList<>();
-        StreamField fieldInfo = new StreamField();
-        fieldInfo.setInlongGroupId(groupId);
-        fieldInfo.setInlongStreamId(streamId);
-        fieldInfo.setFieldName("id");
-        fieldInfo.setFieldType(FieldType.INT.toString());
-        fieldInfo.setFieldComment("idx");
-        streamFields.add(fieldInfo);
-        return streamFields;
-    }
-
     /**
      * Mock the task listener factory
      */
@@ -230,28 +150,24 @@ public class WorkflowServiceImplTest extends ServiceBaseTest {
             }
 
             @Override
-            public ListenerResult listen(WorkflowContext context) throws Exception {
-                int tryTimes = tryTime.addAndGet(1);
-                if (tryTimes % 2 == 1) {
-                    throw new WorkflowListenerException();
-                } else {
-                    return ListenerResult.success();
-                }
+            public ListenerResult listen(WorkflowContext context) {
+                return ListenerResult.success();
             }
         };
     }
 
     @Test
-    public void testStartCreatePulsarWorkflow() throws Exception {
-        initGroupForm(MQType.PULSAR.getType(), "test14" + subType);
+    public void testStartCreatePulsarWorkflow() {
+        createInlongGroup("test14" + subType, MQType.MQ_PULSAR);
         mockTaskListenerFactory();
+
         WorkflowContext context = processService.start(processName.name(), applicant, form);
         WorkflowResult result = WorkflowBeanUtils.result(context);
         ProcessResponse processResponse = result.getProcessInfo();
-        // This method temporarily fails the test, so comment it out first
-        Assertions.assertSame(processResponse.getStatus(), ProcessStatus.PROCESSING);
+        Assertions.assertSame(processResponse.getStatus(), ProcessStatus.COMPLETED);
+
         WorkflowProcess process = context.getProcess();
-        WorkflowTask task = process.getTaskByName("initMQ");
+        WorkflowTask task = process.getTaskByName("InitMQ");
         Assertions.assertTrue(task instanceof ServiceTask);
         Assertions.assertEquals(2, task.getNameToListenerMap().size());
 
@@ -259,11 +175,11 @@ public class WorkflowServiceImplTest extends ServiceBaseTest {
         Assertions.assertTrue(listeners.get(0) instanceof CreatePulsarGroupTaskListener);
         Assertions.assertTrue(listeners.get(1) instanceof CreatePulsarResourceTaskListener);
 
-        Integer processId = processResponse.getId();
-        context = processService.continueProcess(processId, applicant, "continue Process");
-        result = WorkflowBeanUtils.result(context);
-        processResponse = result.getProcessInfo();
-        Assertions.assertSame(processResponse.getStatus(), ProcessStatus.COMPLETED);
+        // Integer processId = processResponse.getId();
+        // context = processService.continueProcess(processId, applicant, "continue process");
+        // result = WorkflowBeanUtils.result(context);
+        // processResponse = result.getProcessInfo();
+        // Assertions.assertSame(processResponse.getStatus(), ProcessStatus.COMPLETED);
     }
 
 }
diff --git a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/workflow/group/CreateGroupWorkflowDefinitionTest.java b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/workflow/group/CreateGroupWorkflowDefinitionTest.java
index 26ae370e2..e428fbe1c 100644
--- a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/workflow/group/CreateGroupWorkflowDefinitionTest.java
+++ b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/workflow/group/CreateGroupWorkflowDefinitionTest.java
@@ -36,12 +36,11 @@ public class CreateGroupWorkflowDefinitionTest extends ServiceBaseTest {
         WorkflowProcess process = createGroupWorkflowDefinition.defineProcess();
         WorkflowProcess cloneProcess1 = process.clone();
         WorkflowProcess cloneProcess2 = cloneProcess1.clone();
-        Assertions.assertTrue(cloneProcess2 != cloneProcess1);
-        Assertions.assertEquals("Group Resource Creation", process.getType());
-        Assertions.assertNotNull(process.getTaskByName("initSource"));
-        Assertions.assertNotNull(process.getTaskByName("initMQ"));
-        Assertions.assertNotNull(process.getTaskByName("initSort"));
-        Assertions.assertNotNull(process.getTaskByName("initSink"));
+        Assertions.assertNotSame(cloneProcess2, cloneProcess1);
+        Assertions.assertNotNull(process.getTaskByName("InitSource"));
+        Assertions.assertNotNull(process.getTaskByName("InitMQ"));
+        Assertions.assertNotNull(process.getTaskByName("InitSort"));
+        Assertions.assertNotNull(process.getTaskByName("InitSink"));
         Assertions.assertEquals(4, process.getNameToTaskMap().size());
     }
 
diff --git a/inlong-manager/manager-test/src/main/resources/h2/apache_inlong_manager.sql b/inlong-manager/manager-test/src/main/resources/h2/apache_inlong_manager.sql
index 64eb3ff03..d19693e92 100644
--- a/inlong-manager/manager-test/src/main/resources/h2/apache_inlong_manager.sql
+++ b/inlong-manager/manager-test/src/main/resources/h2/apache_inlong_manager.sql
@@ -615,9 +615,9 @@ CREATE TABLE IF NOT EXISTS `workflow_approver`
 -- create default approver for new consumption and new inlong group
 INSERT INTO `workflow_approver`(`process_name`, `task_name`, `filter_key`, `filter_value`, `approvers`,
                                 `creator`, `modifier`, `create_time`, `modify_time`, `is_deleted`)
-VALUES ('NEW_CONSUMPTION_PROCESS', 'ut_admin', 'DEFAULT', NULL, 'admin',
+VALUES ('APPLY_CONSUMPTION_PROCESS', 'ut_admin', 'DEFAULT', NULL, 'admin',
         'inlong_init', 'inlong_init', CURRENT_TIMESTAMP, CURRENT_TIMESTAMP, 0),
-       ('NEW_GROUP_PROCESS', 'ut_admin', 'DEFAULT', NULL, 'admin',
+       ('APPLY_GROUP_PROCESS', 'ut_admin', 'DEFAULT', NULL, 'admin',
         'inlong_init', 'inlong_init', CURRENT_TIMESTAMP, CURRENT_TIMESTAMP, 0);
 
 -- ----------------------------
diff --git a/inlong-manager/manager-web/sql/apache_inlong_manager.sql b/inlong-manager/manager-web/sql/apache_inlong_manager.sql
index 536263d79..d90678dda 100644
--- a/inlong-manager/manager-web/sql/apache_inlong_manager.sql
+++ b/inlong-manager/manager-web/sql/apache_inlong_manager.sql
@@ -652,9 +652,9 @@ CREATE TABLE IF NOT EXISTS `workflow_approver`
 -- create default approver for new consumption and new inlong group
 INSERT INTO `workflow_approver`(`process_name`, `task_name`, `filter_key`, `filter_value`, `approvers`,
                                 `creator`, `modifier`, `create_time`, `modify_time`, `is_deleted`)
-VALUES ('NEW_CONSUMPTION_PROCESS', 'ut_admin', 'DEFAULT', NULL, 'admin',
+VALUES ('APPLY_CONSUMPTION_PROCESS', 'ut_admin', 'DEFAULT', NULL, 'admin',
         'inlong_init', 'inlong_init', CURRENT_TIMESTAMP, CURRENT_TIMESTAMP, 0),
-       ('NEW_GROUP_PROCESS', 'ut_admin', 'DEFAULT', NULL, 'admin',
+       ('APPLY_GROUP_PROCESS', 'ut_admin', 'DEFAULT', NULL, 'admin',
         'inlong_init', 'inlong_init', CURRENT_TIMESTAMP, CURRENT_TIMESTAMP, 0);
 
 -- ----------------------------