You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2022/09/25 03:23:48 UTC

[inlong] branch master updated: [INLONG-5984][Manager] Optimize the logic related to the InlongConsume process (#5985)

This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 4f84cfb23 [INLONG-5984][Manager] Optimize the logic related to the InlongConsume process (#5985)
4f84cfb23 is described below

commit 4f84cfb23c00d02d6a8e72850d7ecf3106200812
Author: ciscozhou <45...@users.noreply.github.com>
AuthorDate: Sun Sep 25 11:23:44 2022 +0800

    [INLONG-5984][Manager] Optimize the logic related to the InlongConsume process (#5985)
---
 .../inlong/manager/common/enums/ConsumeStatus.java | 114 +++++++++++++--------
 .../inlong/manager/common/enums/ProcessName.java   |   4 +-
 .../dao/mapper/InlongConsumeEntityMapper.java      |   2 +
 .../mappers/InlongConsumeEntityMapper.xml          |  11 ++
 ...ocessForm.java => ApplyConsumeProcessForm.java} |  18 ++--
 .../workflow/form/process/BaseProcessForm.java     |   2 +-
 .../pojo/workflow/form/task/BaseTaskForm.java      |   2 +-
 ...ionApproveForm.java => ConsumeApproveForm.java} |   6 +-
 .../service/consume/AbstractConsumeOperator.java   |   2 +-
 .../consume/InlongConsumeProcessService.java       |  41 ++------
 .../service/consume/InlongConsumeService.java      |  10 ++
 .../service/consume/InlongConsumeServiceImpl.java  |  41 ++++++--
 .../service/core/impl/ConsumptionServiceImpl.java  |  14 +--
 .../service/group/InlongPulsarOperator.java        |  20 ++--
 .../OperateConsumeTaskListener.java}               |  33 +++---
 .../apply/ApproveConsumeProcessListener.java}      |  47 +++++----
 .../apply/CancelConsumeProcessListener.java}       |  30 +++---
 .../apply/RejectConsumeProcessListener.java}       |  36 +++----
 .../ApplyConsumeProcessHandler.java}               |  10 +-
 .../ApplyConsumeWorkflowDefinition.java}           |  53 +++++-----
 .../main/resources/h2/apache_inlong_manager.sql    |   6 +-
 .../manager-web/sql/apache_inlong_manager.sql      |   6 +-
 .../web/controller/InlongConsumeController.java    |   4 +-
 23 files changed, 283 insertions(+), 229 deletions(-)

diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/ConsumeStatus.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/ConsumeStatus.java
index 0b9013df2..065b225f0 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/ConsumeStatus.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/ConsumeStatus.java
@@ -17,71 +17,101 @@
 
 package org.apache.inlong.manager.common.enums;
 
-import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Lists;
-import io.swagger.annotations.ApiModel;
-import io.swagger.annotations.ApiModelProperty;
-import org.apache.inlong.manager.common.util.InlongCollectionUtils;
-import org.apache.inlong.manager.common.util.Preconditions;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import org.apache.inlong.manager.common.exceptions.BusinessException;
 
 import java.util.Map;
 import java.util.Set;
-import java.util.function.Function;
 
 /**
  * Inlong consume status
  */
-@ApiModel("Inlong consume status")
 public enum ConsumeStatus {
 
-    @ApiModelProperty(value = "To be allocated: 10")
-    WAIT_ASSIGN(10),
+    TO_BE_SUBMIT(100, "waiting for submit"),
+    TO_BE_APPROVAL(101, "waiting for approval"),
 
-    @ApiModelProperty(value = "Pending approval: 11")
-    WAIT_APPROVE(11),
+    APPROVE_REJECTED(102, "approval rejected"),
+    APPROVE_PASSED(103, "approval passed"),
+    APPROVE_CANCELED(104, "approval canceled"),
 
-    @ApiModelProperty(value = "Approval rejected: 20")
-    REJECTED(20),
-
-    @ApiModelProperty(value = "Approval and approval: 21")
-    APPROVED(21),
-
-    @ApiModelProperty(value = "Cancel application: 22")
-    CANCELED(22),
-
-    @ApiModelProperty(value = "Deleting: 41")
-    DELETING(41),
-
-    @ApiModelProperty(value = "Deleted: 40")
-    DELETED(40),
+    DELETING(41, "deleting"),
+    DELETED(40, "deleted"),
 
     ;
 
-    public static final Set<ConsumeStatus> ALLOW_SAVE_UPDATE_STATUS = ImmutableSet
-            .of(WAIT_ASSIGN, REJECTED, CANCELED);
+    /**
+     * State automaton for InlongConsume
+     */
+    private static final Map<ConsumeStatus, Set<ConsumeStatus>> CONSUME_STATE_AUTOMATON = Maps.newHashMap();
+
+    /*
+     * Init consume finite status automaton
+     */
+    static {
+        CONSUME_STATE_AUTOMATON.put(TO_BE_SUBMIT, Sets.newHashSet(TO_BE_SUBMIT, TO_BE_APPROVAL, DELETING));
+        CONSUME_STATE_AUTOMATON.put(TO_BE_APPROVAL,
+                Sets.newHashSet(TO_BE_APPROVAL, APPROVE_REJECTED, APPROVE_PASSED, APPROVE_CANCELED, DELETING));
+
+        CONSUME_STATE_AUTOMATON.put(APPROVE_REJECTED, Sets.newHashSet(APPROVE_REJECTED, TO_BE_APPROVAL, DELETING));
+        CONSUME_STATE_AUTOMATON.put(APPROVE_CANCELED, Sets.newHashSet(APPROVE_CANCELED, TO_BE_APPROVAL, DELETING));
+        CONSUME_STATE_AUTOMATON.put(APPROVE_PASSED, Sets.newHashSet(APPROVE_PASSED, TO_BE_APPROVAL, DELETING));
+
+        CONSUME_STATE_AUTOMATON.put(DELETING, Sets.newHashSet(DELETING, DELETED));
+        CONSUME_STATE_AUTOMATON.put(DELETED, Sets.newHashSet(DELETED));
+    }
 
-    public static final Set<ConsumeStatus> ALLOW_START_WORKFLOW_STATUS = ImmutableSet.of(WAIT_ASSIGN);
+    private final Integer code;
+    private final String description;
 
-    private static final Map<Integer, ConsumeStatus> STATUS_MAP = InlongCollectionUtils.transformToImmutableMap(
-            Lists.newArrayList(ConsumeStatus.values()),
-            ConsumeStatus::getCode,
-            Function.identity()
-    );
+    ConsumeStatus(Integer code, String description) {
+        this.code = code;
+        this.description = description;
+    }
 
-    private final int code;
+    /**
+     * Get the ConsumeStatus instance from the given code
+     *
+     * @param code status code
+     * @return instance of ConsumeStatus
+     */
+    public static ConsumeStatus forCode(int code) {
+        for (ConsumeStatus status : values()) {
+            if (status.getCode() == code) {
+                return status;
+            }
+        }
+        throw new BusinessException(String.format("Illegal code=%s for ConsumeStatus", code));
+    }
 
-    ConsumeStatus(int code) {
-        this.code = code;
+    /**
+     * Check whether the current status is NOT allowed to be transferred to the next status
+     *
+     * @param cur current status
+     * @param next next status
+     * @return true if the transfer is not allowed, false if it is allowed
+     */
+    public static boolean notAllowedTransfer(ConsumeStatus cur, ConsumeStatus next) {
+        Set<ConsumeStatus> nextSet = CONSUME_STATE_AUTOMATON.get(cur);
+        return nextSet == null || !nextSet.contains(next);
     }
 
-    public static ConsumeStatus fromStatus(int status) {
-        ConsumeStatus consumeStatus = STATUS_MAP.get(status);
-        Preconditions.checkNotNull(consumeStatus, "consume status is invalid for " + status);
-        return consumeStatus;
+    /**
+     * Checks whether the given status allows the update.
+     */
+    public static boolean allowedUpdate(ConsumeStatus status) {
+        return status == ConsumeStatus.TO_BE_SUBMIT
+                || status == ConsumeStatus.APPROVE_REJECTED
+                || status == ConsumeStatus.APPROVE_CANCELED;
     }
 
-    public int getCode() {
+    public Integer getCode() {
         return code;
     }
 
+    public String getDescription() {
+        return description;
+    }
+
 }
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/ProcessName.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/ProcessName.java
index bf604f2ef..997226594 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/ProcessName.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/ProcessName.java
@@ -48,9 +48,9 @@ public enum ProcessName {
     DELETE_GROUP_PROCESS("Delete-Group"),
 
     /**
-     * Apply consumption process
+     * Apply inlong consume process
      */
-    APPLY_CONSUMPTION_PROCESS("Apply-Consumption"),
+    APPLY_CONSUME_PROCESS("Apply-Consume"),
 
     /**
      * Create inlong stream process
diff --git a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/InlongConsumeEntityMapper.java b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/InlongConsumeEntityMapper.java
index 1739c2c8c..6b0db6617 100644
--- a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/InlongConsumeEntityMapper.java
+++ b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/InlongConsumeEntityMapper.java
@@ -43,6 +43,8 @@ public interface InlongConsumeEntityMapper {
 
     int updateByIdSelective(InlongConsumeEntity record);
 
+    void updateStatus(@Param("id") Integer id, @Param("status") Integer status, @Param("modifier") String modifier);
+
     int deleteById(Integer id);
 
 }
diff --git a/inlong-manager/manager-dao/src/main/resources/mappers/InlongConsumeEntityMapper.xml b/inlong-manager/manager-dao/src/main/resources/mappers/InlongConsumeEntityMapper.xml
index d17dc3b87..c38c53028 100644
--- a/inlong-manager/manager-dao/src/main/resources/mappers/InlongConsumeEntityMapper.xml
+++ b/inlong-manager/manager-dao/src/main/resources/mappers/InlongConsumeEntityMapper.xml
@@ -189,6 +189,9 @@
             <if test="status != null">
                 status = #{status, jdbcType=INTEGER},
             </if>
+            <if test="isDeleted != null">
+                is_deleted = #{isDeleted,jdbcType=INTEGER},
+            </if>
             <if test="modifier != null">
                 modifier = #{modifier, jdbcType=VARCHAR},
             </if>
@@ -198,6 +201,14 @@
         and is_deleted = 0
         and version = #{version, jdbcType=INTEGER}
     </update>
+    <update id="updateStatus">
+        update inlong_consume
+        set previous_status = status,
+            status          = #{status, jdbcType=INTEGER},
+            modifier        = #{modifier, jdbcType=VARCHAR}
+        where id = #{id,jdbcType=INTEGER}
+          and is_deleted = 0
+    </update>
 
     <delete id="deleteById" parameterType="java.lang.Integer">
         delete
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/workflow/form/process/ApplyConsumptionProcessForm.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/workflow/form/process/ApplyConsumeProcessForm.java
similarity index 73%
rename from inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/workflow/form/process/ApplyConsumptionProcessForm.java
rename to inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/workflow/form/process/ApplyConsumeProcessForm.java
index 584118848..85d657c45 100644
--- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/workflow/form/process/ApplyConsumptionProcessForm.java
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/workflow/form/process/ApplyConsumeProcessForm.java
@@ -22,8 +22,8 @@ import io.swagger.annotations.ApiModelProperty;
 import lombok.Data;
 import lombok.EqualsAndHashCode;
 import org.apache.inlong.manager.common.exceptions.FormValidateException;
-import org.apache.inlong.manager.pojo.consumption.ConsumptionInfo;
 import org.apache.inlong.manager.common.util.Preconditions;
+import org.apache.inlong.manager.pojo.consume.InlongConsumeInfo;
 
 import java.util.Map;
 
@@ -32,16 +32,16 @@ import java.util.Map;
  */
 @Data
 @EqualsAndHashCode(callSuper = true)
-public class ApplyConsumptionProcessForm extends BaseProcessForm {
+public class ApplyConsumeProcessForm extends BaseProcessForm {
 
-    public static final String FORM_NAME = "ApplyConsumptionProcessForm";
+    public static final String FORM_NAME = "ApplyConsumeProcessForm";
 
-    @ApiModelProperty(value = "Data consumption information")
-    private ConsumptionInfo consumptionInfo;
+    @ApiModelProperty(value = "Inlong consume info")
+    private InlongConsumeInfo consumeInfo;
 
     @Override
     public void validate() throws FormValidateException {
-        Preconditions.checkNotNull(consumptionInfo, "Data consumption information cannot be empty");
+        Preconditions.checkNotNull(consumeInfo, "Inlong consume cannot be empty");
     }
 
     @Override
@@ -51,14 +51,14 @@ public class ApplyConsumptionProcessForm extends BaseProcessForm {
 
     @Override
     public String getInlongGroupId() {
-        return consumptionInfo.getConsumerGroup();
+        return consumeInfo.getInlongGroupId();
     }
 
     @Override
     public Map<String, Object> showInList() {
         Map<String, Object> show = Maps.newHashMap();
-        if (consumptionInfo != null) {
-            show.put("inlongGroupId", consumptionInfo.getInlongGroupId());
+        if (consumeInfo != null) {
+            show.put("inlongGroupId", consumeInfo.getInlongGroupId());
         }
         return show;
     }
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/workflow/form/process/BaseProcessForm.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/workflow/form/process/BaseProcessForm.java
index 6cbbbbac2..810a203f8 100644
--- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/workflow/form/process/BaseProcessForm.java
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/workflow/form/process/BaseProcessForm.java
@@ -29,7 +29,7 @@ import lombok.Data;
 @JsonTypeInfo(use = Id.NAME, property = "formName")
 @JsonSubTypes({
         @JsonSubTypes.Type(value = ApplyGroupProcessForm.class, name = ApplyGroupProcessForm.FORM_NAME),
-        @JsonSubTypes.Type(value = ApplyConsumptionProcessForm.class, name = ApplyConsumptionProcessForm.FORM_NAME),
+        @JsonSubTypes.Type(value = ApplyConsumeProcessForm.class, name = ApplyConsumeProcessForm.FORM_NAME),
         @JsonSubTypes.Type(value = GroupResourceProcessForm.class, name = GroupResourceProcessForm.FORM_NAME),
         @JsonSubTypes.Type(value = StreamResourceProcessForm.class, name = StreamResourceProcessForm.FORM_NAME),
 })
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/workflow/form/task/BaseTaskForm.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/workflow/form/task/BaseTaskForm.java
index fe209d1cd..c35e2cd7e 100644
--- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/workflow/form/task/BaseTaskForm.java
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/workflow/form/task/BaseTaskForm.java
@@ -29,7 +29,7 @@ import lombok.Data;
 @JsonTypeInfo(use = Id.NAME, property = "formName")
 @JsonSubTypes({
         @JsonSubTypes.Type(value = InlongGroupApproveForm.class, name = InlongGroupApproveForm.FORM_NAME),
-        @JsonSubTypes.Type(value = ConsumptionApproveForm.class, name = ConsumptionApproveForm.FORM_NAME),
+        @JsonSubTypes.Type(value = ConsumeApproveForm.class, name = ConsumeApproveForm.FORM_NAME),
         @JsonSubTypes.Type(value = ServiceTaskForm.class, name = ServiceTaskForm.FORM_NAME),
 })
 public abstract class BaseTaskForm implements TaskForm {
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/workflow/form/task/ConsumptionApproveForm.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/workflow/form/task/ConsumeApproveForm.java
similarity index 89%
rename from inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/workflow/form/task/ConsumptionApproveForm.java
rename to inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/workflow/form/task/ConsumeApproveForm.java
index ccbcec9d4..ef2c0c952 100644
--- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/workflow/form/task/ConsumptionApproveForm.java
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/workflow/form/task/ConsumeApproveForm.java
@@ -24,13 +24,13 @@ import org.apache.inlong.manager.common.exceptions.FormValidateException;
 import org.apache.inlong.manager.common.util.Preconditions;
 
 /**
- * The approval form of the consumption
+ * The approval form of the inlong consume
  */
 @Data
 @EqualsAndHashCode(callSuper = true)
-public class ConsumptionApproveForm extends BaseTaskForm {
+public class ConsumeApproveForm extends BaseTaskForm {
 
-    public static final String FORM_NAME = "ConsumptionApproveForm";
+    public static final String FORM_NAME = "ConsumeApproveForm";
 
     @ApiModelProperty("Consumer group")
     private String consumerGroup;
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/consume/AbstractConsumeOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/consume/AbstractConsumeOperator.java
index e9f886909..8840701bc 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/consume/AbstractConsumeOperator.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/consume/AbstractConsumeOperator.java
@@ -50,7 +50,7 @@ public abstract class AbstractConsumeOperator implements InlongConsumeOperator {
         // set the ext params, init status, and other info
         InlongConsumeEntity entity = CommonBeanUtils.copyProperties(request, InlongConsumeEntity::new);
         this.setTargetEntity(request, entity);
-        entity.setStatus(ConsumeStatus.WAIT_ASSIGN.getCode());
+        entity.setStatus(ConsumeStatus.TO_BE_SUBMIT.getCode());
         entity.setCreator(operator);
         entity.setModifier(operator);
 
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/consume/InlongConsumeProcessService.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/consume/InlongConsumeProcessService.java
index d4be887e4..ddd942dc4 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/consume/InlongConsumeProcessService.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/consume/InlongConsumeProcessService.java
@@ -18,18 +18,10 @@
 package org.apache.inlong.manager.service.consume;
 
 import lombok.extern.slf4j.Slf4j;
-import org.apache.inlong.manager.common.consts.MQType;
 import org.apache.inlong.manager.common.enums.ConsumeStatus;
 import org.apache.inlong.manager.common.enums.ProcessName;
-import org.apache.inlong.manager.common.util.CommonBeanUtils;
-import org.apache.inlong.manager.common.util.Preconditions;
-import org.apache.inlong.manager.dao.entity.ConsumptionPulsarEntity;
-import org.apache.inlong.manager.dao.mapper.ConsumptionPulsarEntityMapper;
-import org.apache.inlong.manager.pojo.consumption.ConsumptionInfo;
-import org.apache.inlong.manager.pojo.consumption.ConsumptionPulsarInfo;
 import org.apache.inlong.manager.pojo.workflow.WorkflowResult;
-import org.apache.inlong.manager.pojo.workflow.form.process.ApplyConsumptionProcessForm;
-import org.apache.inlong.manager.service.core.ConsumptionService;
+import org.apache.inlong.manager.pojo.workflow.form.process.ApplyConsumeProcessForm;
 import org.apache.inlong.manager.service.workflow.WorkflowService;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Service;
@@ -42,11 +34,9 @@ import org.springframework.stereotype.Service;
 public class InlongConsumeProcessService {
 
     @Autowired
-    private ConsumptionService consumptionService;
+    private InlongConsumeService consumeService;
     @Autowired
     private WorkflowService workflowService;
-    @Autowired
-    private ConsumptionPulsarEntityMapper consumptionPulsarMapper;
 
     /**
      * Start the process for the specified ID.
@@ -56,30 +46,13 @@ public class InlongConsumeProcessService {
      * @return workflow result
      */
     public WorkflowResult startProcess(Integer id, String operator) {
-        ConsumptionInfo consumptionInfo = consumptionService.get(id);
-        Preconditions.checkTrue(ConsumeStatus.ALLOW_START_WORKFLOW_STATUS.contains(
-                        ConsumeStatus.fromStatus(consumptionInfo.getStatus())),
-                "current status not allowed to start workflow");
-
-        consumptionInfo.setStatus(ConsumeStatus.WAIT_APPROVE.getCode());
-        boolean rowCount = consumptionService.update(consumptionInfo, operator);
-        Preconditions.checkTrue(rowCount, "update consumption failed");
-
-        return workflowService.start(ProcessName.APPLY_CONSUMPTION_PROCESS, operator,
-                genConsumptionProcessForm(consumptionInfo));
+        consumeService.updateStatus(id, ConsumeStatus.TO_BE_APPROVAL.getCode(), operator);
+        return workflowService.start(ProcessName.APPLY_CONSUME_PROCESS, operator, genApplyConsumeProcessForm(id));
     }
 
-    private ApplyConsumptionProcessForm genConsumptionProcessForm(ConsumptionInfo consumptionInfo) {
-        ApplyConsumptionProcessForm form = new ApplyConsumptionProcessForm();
-        Integer id = consumptionInfo.getId();
-        String mqType = consumptionInfo.getMqType();
-        if (MQType.PULSAR.equals(mqType) || MQType.TDMQ_PULSAR.equals(mqType)) {
-            ConsumptionPulsarEntity consumptionPulsarEntity = consumptionPulsarMapper.selectByConsumptionId(id);
-            ConsumptionPulsarInfo pulsarInfo = CommonBeanUtils.copyProperties(consumptionPulsarEntity,
-                    ConsumptionPulsarInfo::new);
-            consumptionInfo.setMqExtInfo(pulsarInfo);
-        }
-        form.setConsumptionInfo(consumptionInfo);
+    private ApplyConsumeProcessForm genApplyConsumeProcessForm(Integer id) {
+        ApplyConsumeProcessForm form = new ApplyConsumeProcessForm();
+        form.setConsumeInfo(consumeService.get(id));
         return form;
     }
 
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/consume/InlongConsumeService.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/consume/InlongConsumeService.java
index 41cbe89ed..cb295cb22 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/consume/InlongConsumeService.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/consume/InlongConsumeService.java
@@ -92,6 +92,16 @@ public interface InlongConsumeService {
     Integer update(@Valid @NotNull(message = "inlong consume request cannot be null") InlongConsumeRequest request,
             String operator);
 
+    /**
+     * Update the inlong consume status to the specified status
+     *
+     * @param id inlong consume id
+     * @param status modified status
+     * @param operator name of operator
+     * @return whether the status update succeeded
+     */
+    Boolean updateStatus(Integer id, Integer status, String operator);
+
     /**
      * Delete the inlong consume by the id
      *
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/consume/InlongConsumeServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/consume/InlongConsumeServiceImpl.java
index 43d73ea56..51b2868f5 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/consume/InlongConsumeServiceImpl.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/consume/InlongConsumeServiceImpl.java
@@ -104,7 +104,7 @@ public class InlongConsumeServiceImpl implements InlongConsumeService {
         entity.setFilterEnabled(0);
 
         entity.setInCharges(groupInfo.getInCharges());
-        entity.setStatus(ConsumeStatus.APPROVED.getCode());
+        entity.setStatus(ConsumeStatus.APPROVE_PASSED.getCode());
         String operator = groupInfo.getCreator();
         entity.setCreator(operator);
         entity.setModifier(operator);
@@ -153,11 +153,11 @@ public class InlongConsumeServiceImpl implements InlongConsumeService {
             int status = Integer.parseInt(countInfo.getKey());
             int count = countInfo.getValue();
             result.setTotalCount(result.getTotalCount() + count);
-            if (status == ConsumeStatus.WAIT_ASSIGN.getCode()) {
+            if (status == ConsumeStatus.TO_BE_SUBMIT.getCode()) {
                 result.setWaitAssignCount(result.getWaitAssignCount() + count);
-            } else if (status == ConsumeStatus.WAIT_APPROVE.getCode()) {
+            } else if (status == ConsumeStatus.TO_BE_APPROVAL.getCode()) {
                 result.setWaitApproveCount(result.getWaitApproveCount() + count);
-            } else if (status == ConsumeStatus.REJECTED.getCode()) {
+            } else if (status == ConsumeStatus.APPROVE_REJECTED.getCode()) {
                 result.setRejectCount(result.getRejectCount() + count);
             }
         }
@@ -186,7 +186,8 @@ public class InlongConsumeServiceImpl implements InlongConsumeService {
     }
 
     @Override
-    @Transactional(rollbackFor = Throwable.class, isolation = Isolation.REPEATABLE_READ,
+    @Transactional(rollbackFor = Throwable.class,
+            isolation = Isolation.REPEATABLE_READ,
             propagation = Propagation.REQUIRES_NEW)
     public Integer update(InlongConsumeRequest request, String operator) {
         LOGGER.debug("begin to update inlong consume={} by user={}", request, operator);
@@ -205,8 +206,8 @@ public class InlongConsumeServiceImpl implements InlongConsumeService {
             throw new BusinessException(ErrorCodeEnum.CONFIG_EXPIRED);
         }
 
-        ConsumeStatus consumeStatus = ConsumeStatus.fromStatus(existEntity.getStatus());
-        Preconditions.checkTrue(ConsumeStatus.ALLOW_SAVE_UPDATE_STATUS.contains(consumeStatus),
+        ConsumeStatus consumeStatus = ConsumeStatus.forCode(existEntity.getStatus());
+        Preconditions.checkTrue(ConsumeStatus.allowedUpdate(consumeStatus),
                 "inlong consume not allowed update when status is " + consumeStatus.name());
 
         InlongConsumeOperator consumeOperator = consumeOperatorFactory.getInstance(request.getMqType());
@@ -216,6 +217,32 @@ public class InlongConsumeServiceImpl implements InlongConsumeService {
         return consumeId;
     }
 
+    @Override
+    @Transactional(rollbackFor = Throwable.class,
+            isolation = Isolation.REPEATABLE_READ,
+            propagation = Propagation.REQUIRES_NEW)
+    public Boolean updateStatus(Integer id, Integer status, String operator) {
+        LOGGER.info("begin to update consume status to [{}] for id={} by user={}", status, id, operator);
+        Preconditions.checkNotNull(id, ErrorCodeEnum.ID_IS_EMPTY.getMessage());
+        InlongConsumeEntity entity = consumeMapper.selectById(id);
+        if (entity == null) {
+            LOGGER.error("inlong consume not found by id={}", id);
+            throw new BusinessException(ErrorCodeEnum.CONSUME_NOT_FOUND);
+        }
+
+        ConsumeStatus curStatus = ConsumeStatus.forCode(entity.getStatus());
+        ConsumeStatus nextStatus = ConsumeStatus.forCode(status);
+        if (ConsumeStatus.notAllowedTransfer(curStatus, nextStatus)) {
+            String errorMsg = String.format("Current status=%s cannot transfer to status=%s", curStatus, nextStatus);
+            LOGGER.error(errorMsg);
+            throw new BusinessException(errorMsg);
+        }
+
+        consumeMapper.updateStatus(id, status, operator);
+        LOGGER.info("success to update consume status to [{}] for id={} by user={}", status, id, operator);
+        return true;
+    }
+
     @Override
     public Boolean delete(Integer id, String operator) {
         LOGGER.info("begin to delete inlong consume for id={} by user={}", id, operator);
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/ConsumptionServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/ConsumptionServiceImpl.java
index 0de16b5c6..469309c05 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/ConsumptionServiceImpl.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/ConsumptionServiceImpl.java
@@ -103,9 +103,9 @@ public class ConsumptionServiceImpl implements ConsumptionService {
 
         return ConsumptionSummary.builder()
                 .totalCount(countMap.values().stream().mapToInt(c -> c).sum())
-                .waitingAssignCount(countMap.getOrDefault(ConsumeStatus.WAIT_ASSIGN.getCode() + "", 0))
-                .waitingApproveCount(countMap.getOrDefault(ConsumeStatus.WAIT_APPROVE.getCode() + "", 0))
-                .rejectedCount(countMap.getOrDefault(ConsumeStatus.REJECTED.getCode() + "", 0)).build();
+                .waitingAssignCount(countMap.getOrDefault(ConsumeStatus.TO_BE_SUBMIT.getCode() + "", 0))
+                .waitingApproveCount(countMap.getOrDefault(ConsumeStatus.TO_BE_APPROVAL.getCode() + "", 0))
+                .rejectedCount(countMap.getOrDefault(ConsumeStatus.APPROVE_REJECTED.getCode() + "", 0)).build();
     }
 
     @Override
@@ -165,8 +165,8 @@ public class ConsumptionServiceImpl implements ConsumptionService {
         if (info.getId() != null) {
             ConsumptionEntity consumptionEntity = consumptionMapper.selectByPrimaryKey(info.getId());
             Preconditions.checkNotNull(consumptionEntity, "consumption not exist with id: " + info.getId());
-            ConsumeStatus consumeStatus = ConsumeStatus.fromStatus(consumptionEntity.getStatus());
-            Preconditions.checkTrue(ConsumeStatus.ALLOW_SAVE_UPDATE_STATUS.contains(consumeStatus),
+            ConsumeStatus consumeStatus = ConsumeStatus.forCode(consumptionEntity.getStatus());
+            Preconditions.checkTrue(ConsumeStatus.allowedUpdate(consumeStatus),
                     "consumption not allow update when status is " + consumeStatus.name());
         }
 
@@ -283,7 +283,7 @@ public class ConsumptionServiceImpl implements ConsumptionService {
 
             // If the consumption has been approved, then close/open DLQ or RLQ, it is necessary to
             // add/remove inlong streams in the inlong group
-            if (ConsumeStatus.APPROVED.getCode() == exists.getStatus()) {
+            if (ConsumeStatus.APPROVE_PASSED.getCode() == exists.getStatus()) {
                 String groupId = info.getInlongGroupId();
                 String dlqNameOld = pulsarEntity.getDeadLetterTopic();
                 String dlqNameNew = update.getDeadLetterTopic();
@@ -337,7 +337,7 @@ public class ConsumptionServiceImpl implements ConsumptionService {
 
     private ConsumptionEntity saveConsumption(ConsumptionInfo info, String operator) {
         ConsumptionEntity entity = CommonBeanUtils.copyProperties(info, ConsumptionEntity::new);
-        entity.setStatus(ConsumeStatus.WAIT_ASSIGN.getCode());
+        entity.setStatus(ConsumeStatus.TO_BE_SUBMIT.getCode());
         entity.setCreator(operator);
         entity.setModifier(operator);
 
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongPulsarOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongPulsarOperator.java
index b9c98faad..d00fe64b4 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongPulsarOperator.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongPulsarOperator.java
@@ -31,11 +31,15 @@ import org.apache.inlong.manager.pojo.group.pulsar.InlongPulsarRequest;
 import org.apache.inlong.manager.common.util.CommonBeanUtils;
 import org.apache.inlong.manager.common.util.Preconditions;
 import org.apache.inlong.manager.dao.entity.InlongGroupEntity;
+import org.apache.inlong.manager.pojo.stream.InlongStreamBriefInfo;
+import org.apache.inlong.manager.service.stream.InlongStreamService;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Service;
 
+import java.util.List;
+
 /**
  * Inlong group operator for Pulsar.
  */
@@ -46,6 +50,8 @@ public class InlongPulsarOperator extends AbstractGroupOperator {
 
     @Autowired
     private ObjectMapper objectMapper;
+    @Autowired
+    private InlongStreamService streamService;
 
     @Override
     public Boolean accept(String mqType) {
@@ -107,13 +113,13 @@ public class InlongPulsarOperator extends AbstractGroupOperator {
     @Override
     public InlongGroupTopicInfo getTopic(InlongGroupInfo groupInfo) {
         InlongGroupTopicInfo topicInfo = super.getTopic(groupInfo);
-        // TODO add cache for cluster info
-        // pulsar topic corresponds to the inlong stream one-to-one
-        // topicInfo.setDsTopicList(streamService.getTopicList(groupInfo.getInlongGroupId()));
-        // commonOperateService.getSpecifiedParam(InlongConstants.TUBE_MASTER_URL);
-        // groupInfo.setTenant();
-        // groupInfo.setAdminUrl();
-        // groupInfo.setServiceUrl();
+        // Pulsar topic corresponds to the inlong stream one-to-one
+        List<InlongStreamBriefInfo> streamTopics = streamService.getTopicList(groupInfo.getInlongGroupId());
+        topicInfo.setStreamTopics(streamTopics);
+        // TODO add cache for cluster info, and support extends different MQs
+        // topicInfo.setTenant();
+        // topicInfo.setAdminUrl();
+        // topicInfo.setServiceUrl();
         return topicInfo;
     }
 
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/consumption/ConsumptionPassTaskListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/consume/OperateConsumeTaskListener.java
similarity index 61%
rename from inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/consumption/ConsumptionPassTaskListener.java
rename to inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/consume/OperateConsumeTaskListener.java
index 924b90352..6a4146a3f 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/consumption/ConsumptionPassTaskListener.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/consume/OperateConsumeTaskListener.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.manager.service.listener.consumption;
+package org.apache.inlong.manager.service.listener.consume;
 
 import com.alibaba.druid.util.StringUtils;
 import lombok.extern.slf4j.Slf4j;
@@ -23,10 +23,10 @@ import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
 import org.apache.inlong.manager.common.enums.TaskEvent;
 import org.apache.inlong.manager.common.exceptions.BusinessException;
 import org.apache.inlong.manager.common.exceptions.WorkflowListenerException;
-import org.apache.inlong.manager.pojo.consumption.ConsumptionInfo;
-import org.apache.inlong.manager.pojo.workflow.form.process.ApplyConsumptionProcessForm;
-import org.apache.inlong.manager.pojo.workflow.form.task.ConsumptionApproveForm;
-import org.apache.inlong.manager.service.core.ConsumptionService;
+import org.apache.inlong.manager.pojo.consume.InlongConsumeInfo;
+import org.apache.inlong.manager.pojo.workflow.form.process.ApplyConsumeProcessForm;
+import org.apache.inlong.manager.pojo.workflow.form.task.ConsumeApproveForm;
+import org.apache.inlong.manager.service.consume.InlongConsumeService;
 import org.apache.inlong.manager.workflow.WorkflowContext;
 import org.apache.inlong.manager.workflow.event.ListenerResult;
 import org.apache.inlong.manager.workflow.event.task.TaskEventListener;
@@ -34,14 +34,14 @@ import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Component;
 
 /**
- * New data consumption-system administrator approval task event listener
+ * New inlong consume approval task event listener
  */
 @Slf4j
 @Component
-public class ConsumptionPassTaskListener implements TaskEventListener {
+public class OperateConsumeTaskListener implements TaskEventListener {
 
     @Autowired
-    private ConsumptionService consumptionService;
+    private InlongConsumeService consumeService;
 
     @Override
     public TaskEvent event() {
@@ -50,19 +50,20 @@ public class ConsumptionPassTaskListener implements TaskEventListener {
 
     @Override
     public ListenerResult listen(WorkflowContext context) throws WorkflowListenerException {
-        ApplyConsumptionProcessForm form = (ApplyConsumptionProcessForm) context.getProcessForm();
-        ConsumptionApproveForm approveForm = (ConsumptionApproveForm) context.getActionContext().getForm();
-        ConsumptionInfo info = form.getConsumptionInfo();
-        if (StringUtils.equals(approveForm.getConsumerGroup(), info.getConsumerGroup())) {
-            return ListenerResult.success("The consumer group has not been modified");
+        ApplyConsumeProcessForm consumeForm = (ApplyConsumeProcessForm) context.getProcessForm();
+        ConsumeApproveForm approveForm = (ConsumeApproveForm) context.getActionContext().getForm();
+        InlongConsumeInfo consumeInfo = consumeForm.getConsumeInfo();
+        if (StringUtils.equals(approveForm.getConsumerGroup(), consumeInfo.getConsumerGroup())) {
+            return ListenerResult.success("Consumer group has not been modified");
         }
-        boolean exist = consumptionService.isConsumerGroupExists(approveForm.getConsumerGroup(), info.getId());
+
+        boolean exist = consumeService.consumerGroupExists(approveForm.getConsumerGroup(), consumeInfo.getId());
         if (exist) {
             log.error("consumer group {} already exist", approveForm.getConsumerGroup());
             throw new BusinessException(ErrorCodeEnum.CONSUMER_GROUP_DUPLICATED);
         }
-        return ListenerResult.success("Consumer group from " + info.getConsumerGroup()
-                + " change to " + approveForm.getConsumerGroup());
+        return ListenerResult.success(String.format("Consumer group %s change to %s",
+                consumeInfo.getConsumerGroup(), approveForm.getConsumerGroup()));
     }
 
 }
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/consumption/ConsumptionCompleteProcessListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/consume/apply/ApproveConsumeProcessListener.java
similarity index 82%
rename from inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/consumption/ConsumptionCompleteProcessListener.java
rename to inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/consume/apply/ApproveConsumeProcessListener.java
index 61de6bb5d..07d9ed0b3 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/consumption/ConsumptionCompleteProcessListener.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/consume/apply/ApproveConsumeProcessListener.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.manager.service.listener.consumption;
+package org.apache.inlong.manager.service.listener.consume.apply;
 
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang3.StringUtils;
@@ -28,15 +28,15 @@ import org.apache.inlong.manager.common.enums.ProcessEvent;
 import org.apache.inlong.manager.common.exceptions.BusinessException;
 import org.apache.inlong.manager.common.exceptions.WorkflowListenerException;
 import org.apache.inlong.manager.common.util.Preconditions;
-import org.apache.inlong.manager.dao.entity.ConsumptionEntity;
+import org.apache.inlong.manager.dao.entity.InlongConsumeEntity;
 import org.apache.inlong.manager.dao.entity.InlongGroupEntity;
-import org.apache.inlong.manager.dao.mapper.ConsumptionEntityMapper;
+import org.apache.inlong.manager.dao.mapper.InlongConsumeEntityMapper;
 import org.apache.inlong.manager.dao.mapper.InlongGroupEntityMapper;
 import org.apache.inlong.manager.pojo.cluster.ClusterInfo;
 import org.apache.inlong.manager.pojo.cluster.pulsar.PulsarClusterInfo;
 import org.apache.inlong.manager.pojo.cluster.tubemq.TubeClusterInfo;
 import org.apache.inlong.manager.pojo.queue.pulsar.PulsarTopicInfo;
-import org.apache.inlong.manager.pojo.workflow.form.process.ApplyConsumptionProcessForm;
+import org.apache.inlong.manager.pojo.workflow.form.process.ApplyConsumeProcessForm;
 import org.apache.inlong.manager.service.cluster.InlongClusterService;
 import org.apache.inlong.manager.service.resource.queue.pulsar.PulsarOperator;
 import org.apache.inlong.manager.service.resource.queue.pulsar.PulsarUtils;
@@ -52,16 +52,16 @@ import java.util.Arrays;
 import java.util.List;
 
 /**
- * Added data consumption process complete archive event listener
+ * Inlong consume process complete archive event listener
  */
 @Slf4j
 @Component
-public class ConsumptionCompleteProcessListener implements ProcessEventListener {
+public class ApproveConsumeProcessListener implements ProcessEventListener {
 
     @Autowired
     private InlongGroupEntityMapper groupMapper;
     @Autowired
-    private ConsumptionEntityMapper consumptionMapper;
+    private InlongConsumeEntityMapper consumeMapper;
     @Autowired
     private InlongClusterService clusterService;
     @Autowired
@@ -76,13 +76,12 @@ public class ConsumptionCompleteProcessListener implements ProcessEventListener
 
     @Override
     public ListenerResult listen(WorkflowContext context) throws WorkflowListenerException {
-        ApplyConsumptionProcessForm consumptionForm = (ApplyConsumptionProcessForm) context.getProcessForm();
-
-        // Real-time query of consumption information
-        Integer consumptionId = consumptionForm.getConsumptionInfo().getId();
-        ConsumptionEntity entity = consumptionMapper.selectByPrimaryKey(consumptionId);
+        // query of inlong consume info from DB
+        ApplyConsumeProcessForm consumeForm = (ApplyConsumeProcessForm) context.getProcessForm();
+        Integer consumeId = consumeForm.getConsumeInfo().getId();
+        InlongConsumeEntity entity = consumeMapper.selectById(consumeId);
         if (entity == null) {
-            throw new WorkflowListenerException("consumption not exits for id=" + consumptionId);
+            throw new WorkflowListenerException("inlong consume not exits for id=" + consumeId);
         }
 
         String mqType = entity.getMqType();
@@ -92,13 +91,13 @@ public class ConsumptionCompleteProcessListener implements ProcessEventListener
         } else if (MQType.PULSAR.equals(mqType) || MQType.TDMQ_PULSAR.equals(mqType)) {
             this.createPulsarSubscription(entity);
         } else if (MQType.KAFKA.equals(mqType)) {
-            //TODO add kakfa
-
+            // TODO add Kafka
         } else {
             throw new WorkflowListenerException("Unsupported MQ type " + mqType);
         }
 
-        this.updateConsumerInfo(consumptionId, entity.getConsumerGroup());
+        // the consumer group may be changed when approving
+        this.updateConsumerInfo(consumeId, entity.getConsumerGroup());
         return ListenerResult.success("Create MQ consumer group successful");
     }
 
@@ -106,13 +105,13 @@ public class ConsumptionCompleteProcessListener implements ProcessEventListener
      * Update consumption after approve
      */
     private void updateConsumerInfo(Integer consumptionId, String consumerGroup) {
-        ConsumptionEntity update = consumptionMapper.selectByPrimaryKey(consumptionId);
-        update.setStatus(ConsumeStatus.APPROVED.getCode());
-        update.setConsumerGroup(consumerGroup);
-        int rowCount = consumptionMapper.updateByPrimaryKeySelective(update);
+        InlongConsumeEntity existEntity = consumeMapper.selectById(consumptionId);
+        existEntity.setStatus(ConsumeStatus.APPROVE_PASSED.getCode());
+        existEntity.setConsumerGroup(consumerGroup);
+        int rowCount = consumeMapper.updateByIdSelective(existEntity);
         if (rowCount != InlongConstants.AFFECTED_ONE_ROW) {
-            log.error("consumption information has already updated, id={}, curVersion={}",
-                    update.getId(), update.getVersion());
+            log.error("inlong consume has already updated, id={}, curVersion={}",
+                    existEntity.getId(), existEntity.getVersion());
             throw new BusinessException(ErrorCodeEnum.CONFIG_EXPIRED);
         }
     }
@@ -120,7 +119,7 @@ public class ConsumptionCompleteProcessListener implements ProcessEventListener
     /**
      * Create Pulsar subscription
      */
-    private void createPulsarSubscription(ConsumptionEntity entity) {
+    private void createPulsarSubscription(InlongConsumeEntity entity) {
         String groupId = entity.getInlongGroupId();
         InlongGroupEntity groupEntity = groupMapper.selectByGroupId(groupId);
         Preconditions.checkNotNull(groupEntity, "inlong group not found for groupId=" + groupId);
@@ -162,7 +161,7 @@ public class ConsumptionCompleteProcessListener implements ProcessEventListener
     /**
      * Create TubeMQ consumer group
      */
-    private void createTubeConsumerGroup(ConsumptionEntity entity, String operator) {
+    private void createTubeConsumerGroup(InlongConsumeEntity entity, String operator) {
         String groupId = entity.getInlongGroupId();
         InlongGroupEntity groupEntity = groupMapper.selectByGroupId(groupId);
         Preconditions.checkNotNull(groupEntity, "inlong group not found for groupId=" + groupId);
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/consumption/ConsumptionCancelProcessListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/consume/apply/CancelConsumeProcessListener.java
similarity index 65%
rename from inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/consumption/ConsumptionCancelProcessListener.java
rename to inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/consume/apply/CancelConsumeProcessListener.java
index 4d1c09e6c..3a8cd8c62 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/consumption/ConsumptionCancelProcessListener.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/consume/apply/CancelConsumeProcessListener.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.manager.service.listener.consumption;
+package org.apache.inlong.manager.service.listener.consume.apply;
 
 import lombok.extern.slf4j.Slf4j;
 import org.apache.inlong.manager.common.consts.InlongConstants;
@@ -24,9 +24,9 @@ import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
 import org.apache.inlong.manager.common.enums.ProcessEvent;
 import org.apache.inlong.manager.common.exceptions.BusinessException;
 import org.apache.inlong.manager.common.exceptions.WorkflowListenerException;
-import org.apache.inlong.manager.dao.entity.ConsumptionEntity;
-import org.apache.inlong.manager.dao.mapper.ConsumptionEntityMapper;
-import org.apache.inlong.manager.pojo.workflow.form.process.ApplyConsumptionProcessForm;
+import org.apache.inlong.manager.dao.entity.InlongConsumeEntity;
+import org.apache.inlong.manager.dao.mapper.InlongConsumeEntityMapper;
+import org.apache.inlong.manager.pojo.workflow.form.process.ApplyConsumeProcessForm;
 import org.apache.inlong.manager.workflow.WorkflowContext;
 import org.apache.inlong.manager.workflow.event.ListenerResult;
 import org.apache.inlong.manager.workflow.event.process.ProcessEventListener;
@@ -34,18 +34,18 @@ import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Component;
 
 /**
- * Added data consumption process cancellation event listener
+ * Inlong consume process cancellation event listener
  */
 @Slf4j
 @Component
-public class ConsumptionCancelProcessListener implements ProcessEventListener {
+public class CancelConsumeProcessListener implements ProcessEventListener {
 
     @Autowired
-    private ConsumptionEntityMapper consumptionEntityMapper;
+    private InlongConsumeEntityMapper consumeMapper;
 
     @Autowired
-    public ConsumptionCancelProcessListener(ConsumptionEntityMapper consumptionEntityMapper) {
-        this.consumptionEntityMapper = consumptionEntityMapper;
+    public CancelConsumeProcessListener(InlongConsumeEntityMapper consumeMapper) {
+        this.consumeMapper = consumeMapper;
     }
 
     @Override
@@ -55,14 +55,14 @@ public class ConsumptionCancelProcessListener implements ProcessEventListener {
 
     @Override
     public ListenerResult listen(WorkflowContext context) throws WorkflowListenerException {
-        ApplyConsumptionProcessForm processForm = (ApplyConsumptionProcessForm) context.getProcessForm();
+        ApplyConsumeProcessForm processForm = (ApplyConsumeProcessForm) context.getProcessForm();
 
-        ConsumptionEntity update = consumptionEntityMapper.selectByPrimaryKey(processForm.getConsumptionInfo().getId());
-        update.setStatus(ConsumeStatus.CANCELED.getCode());
-        int rowCount = consumptionEntityMapper.updateByPrimaryKeySelective(update);
+        InlongConsumeEntity consumeEntity = consumeMapper.selectById(processForm.getConsumeInfo().getId());
+        consumeEntity.setStatus(ConsumeStatus.APPROVE_CANCELED.getCode());
+        int rowCount = consumeMapper.updateByIdSelective(consumeEntity);
         if (rowCount != InlongConstants.AFFECTED_ONE_ROW) {
-            log.error("consumption information has already updated, id={}, curVersion={}",
-                    update.getId(), update.getVersion());
+            log.error("inlong consume has already updated, id={}, curVersion={}",
+                    consumeEntity.getId(), consumeEntity.getVersion());
             throw new BusinessException(ErrorCodeEnum.CONFIG_EXPIRED);
         }
         return ListenerResult.success("Application process is cancelled");
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/consumption/ConsumptionRejectProcessListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/consume/apply/RejectConsumeProcessListener.java
similarity index 63%
rename from inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/consumption/ConsumptionRejectProcessListener.java
rename to inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/consume/apply/RejectConsumeProcessListener.java
index 0161cb4e9..cf96224bc 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/consumption/ConsumptionRejectProcessListener.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/consume/apply/RejectConsumeProcessListener.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.manager.service.listener.consumption;
+package org.apache.inlong.manager.service.listener.consume.apply;
 
 import org.apache.inlong.manager.common.consts.InlongConstants;
 import org.apache.inlong.manager.common.enums.ConsumeStatus;
@@ -23,9 +23,9 @@ import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
 import org.apache.inlong.manager.common.enums.ProcessEvent;
 import org.apache.inlong.manager.common.exceptions.BusinessException;
 import org.apache.inlong.manager.common.exceptions.WorkflowListenerException;
-import org.apache.inlong.manager.dao.entity.ConsumptionEntity;
-import org.apache.inlong.manager.dao.mapper.ConsumptionEntityMapper;
-import org.apache.inlong.manager.pojo.workflow.form.process.ApplyConsumptionProcessForm;
+import org.apache.inlong.manager.dao.entity.InlongConsumeEntity;
+import org.apache.inlong.manager.dao.mapper.InlongConsumeEntityMapper;
+import org.apache.inlong.manager.pojo.workflow.form.process.ApplyConsumeProcessForm;
 import org.apache.inlong.manager.workflow.WorkflowContext;
 import org.apache.inlong.manager.workflow.event.ListenerResult;
 import org.apache.inlong.manager.workflow.event.process.ProcessEventListener;
@@ -35,19 +35,19 @@ import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Component;
 
 /**
- * Added data consumption process rejection event listener
+ * Inlong consume process rejection event listener
  */
 @Component
-public class ConsumptionRejectProcessListener implements ProcessEventListener {
+public class RejectConsumeProcessListener implements ProcessEventListener {
 
-    private static final Logger LOGGER = LoggerFactory.getLogger(ConsumptionRejectProcessListener.class);
+    private static final Logger LOGGER = LoggerFactory.getLogger(RejectConsumeProcessListener.class);
 
     @Autowired
-    private ConsumptionEntityMapper consumptionEntityMapper;
+    private InlongConsumeEntityMapper consumeMapper;
 
     @Autowired
-    public ConsumptionRejectProcessListener(ConsumptionEntityMapper consumptionEntityMapper) {
-        this.consumptionEntityMapper = consumptionEntityMapper;
+    public RejectConsumeProcessListener(InlongConsumeEntityMapper consumeMapper) {
+        this.consumeMapper = consumeMapper;
     }
 
     @Override
@@ -57,17 +57,17 @@ public class ConsumptionRejectProcessListener implements ProcessEventListener {
 
     @Override
     public ListenerResult listen(WorkflowContext context) throws WorkflowListenerException {
-        ApplyConsumptionProcessForm processForm = (ApplyConsumptionProcessForm) context.getProcessForm();
+        ApplyConsumeProcessForm processForm = (ApplyConsumeProcessForm) context.getProcessForm();
+        InlongConsumeEntity consumeEntity = consumeMapper.selectById(processForm.getConsumeInfo().getId());
+        consumeEntity.setStatus(ConsumeStatus.APPROVE_REJECTED.getCode());
 
-        ConsumptionEntity update = consumptionEntityMapper.selectByPrimaryKey(processForm.getConsumptionInfo().getId());
-        update.setStatus(ConsumeStatus.REJECTED.getCode());
-
-        int rowCount = consumptionEntityMapper.updateByPrimaryKeySelective(update);
+        int rowCount = consumeMapper.updateByIdSelective(consumeEntity);
         if (rowCount != InlongConstants.AFFECTED_ONE_ROW) {
-            LOGGER.error("consumption information has already updated, id={}, curVersion={}",
-                    update.getId(), update.getVersion());
+            LOGGER.error("inlong consume has already updated, id={}, curVersion={}",
+                    consumeEntity.getId(), consumeEntity.getVersion());
             throw new BusinessException(ErrorCodeEnum.CONFIG_EXPIRED);
         }
-        return ListenerResult.success("The application process was rejected");
+
+        return ListenerResult.success("The consume application was rejected");
     }
 }
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/consumption/ApplyConsumptionProcessHandler.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/consume/ApplyConsumeProcessHandler.java
similarity index 87%
rename from inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/consumption/ApplyConsumptionProcessHandler.java
rename to inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/consume/ApplyConsumeProcessHandler.java
index 7579b5a85..d5e1d63d7 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/consumption/ApplyConsumptionProcessHandler.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/consume/ApplyConsumeProcessHandler.java
@@ -15,11 +15,11 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.manager.service.workflow.consumption;
+package org.apache.inlong.manager.service.workflow.consume;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.inlong.manager.pojo.workflow.ProcessDetailResponse;
-import org.apache.inlong.manager.pojo.workflow.form.process.ApplyConsumptionProcessForm;
+import org.apache.inlong.manager.pojo.workflow.form.process.ApplyConsumeProcessForm;
 import org.apache.inlong.manager.workflow.core.ProcessDefinitionService;
 import org.apache.inlong.manager.workflow.definition.ProcessDetailHandler;
 import org.apache.inlong.manager.workflow.definition.WorkflowProcess;
@@ -28,10 +28,10 @@ import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Component;
 
 /**
- * Apply consumption process handler
+ * Apply inlong consume process handler
  */
 @Component
-public class ApplyConsumptionProcessHandler implements ProcessDetailHandler {
+public class ApplyConsumeProcessHandler implements ProcessDetailHandler {
 
     @Autowired
     private ObjectMapper objectMapper;
@@ -41,7 +41,7 @@ public class ApplyConsumptionProcessHandler implements ProcessDetailHandler {
     @Override
     public ProcessDetailResponse handle(ProcessDetailResponse processResponse) {
         WorkflowProcess process = processDefinitionService.getByName(processResponse.getWorkflow().getName());
-        ApplyConsumptionProcessForm processForm = WorkflowUtils.parseProcessForm(objectMapper,
+        ApplyConsumeProcessForm processForm = WorkflowUtils.parseProcessForm(objectMapper,
                 processResponse.getProcessInfo().getFormData().toString(), process);
         if (processForm == null) {
             return processResponse;
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/consumption/ApplyConsumptionWorkflowDefinition.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/consume/ApplyConsumeWorkflowDefinition.java
similarity index 71%
rename from inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/consumption/ApplyConsumptionWorkflowDefinition.java
rename to inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/consume/ApplyConsumeWorkflowDefinition.java
index a5fa8a2d2..1dc368e68 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/consumption/ApplyConsumptionWorkflowDefinition.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/consume/ApplyConsumeWorkflowDefinition.java
@@ -15,20 +15,20 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.manager.service.workflow.consumption;
+package org.apache.inlong.manager.service.workflow.consume;
 
 import org.apache.inlong.manager.common.consts.InlongConstants;
 import org.apache.inlong.manager.common.enums.ProcessName;
 import org.apache.inlong.manager.pojo.group.InlongGroupInfo;
-import org.apache.inlong.manager.pojo.workflow.form.process.ApplyConsumptionProcessForm;
-import org.apache.inlong.manager.pojo.workflow.form.task.ConsumptionApproveForm;
+import org.apache.inlong.manager.pojo.workflow.form.process.ApplyConsumeProcessForm;
+import org.apache.inlong.manager.pojo.workflow.form.task.ConsumeApproveForm;
 import org.apache.inlong.manager.service.core.WorkflowApproverService;
 import org.apache.inlong.manager.service.group.InlongGroupService;
+import org.apache.inlong.manager.service.listener.consume.OperateConsumeTaskListener;
+import org.apache.inlong.manager.service.listener.consume.apply.ApproveConsumeProcessListener;
+import org.apache.inlong.manager.service.listener.consume.apply.CancelConsumeProcessListener;
+import org.apache.inlong.manager.service.listener.consume.apply.RejectConsumeProcessListener;
 import org.apache.inlong.manager.service.workflow.WorkflowDefinition;
-import org.apache.inlong.manager.service.listener.consumption.ConsumptionCancelProcessListener;
-import org.apache.inlong.manager.service.listener.consumption.ConsumptionCompleteProcessListener;
-import org.apache.inlong.manager.service.listener.consumption.ConsumptionPassTaskListener;
-import org.apache.inlong.manager.service.listener.consumption.ConsumptionRejectProcessListener;
 import org.apache.inlong.manager.workflow.WorkflowContext;
 import org.apache.inlong.manager.workflow.definition.EndEvent;
 import org.apache.inlong.manager.workflow.definition.StartEvent;
@@ -42,29 +42,24 @@ import java.util.Collections;
 import java.util.List;
 
 /**
- * New data consumption workflow definition
+ * New inlong consume workflow definition
  */
 @Component
-public class ApplyConsumptionWorkflowDefinition implements WorkflowDefinition {
+public class ApplyConsumeWorkflowDefinition implements WorkflowDefinition {
 
     @Autowired
-    private ConsumptionCompleteProcessListener consumptionCompleteProcessListener;
-
+    private CancelConsumeProcessListener cancelConsumeProcessListener;
     @Autowired
-    private ConsumptionPassTaskListener consumptionPassTaskListener;
-
+    private RejectConsumeProcessListener rejectConsumeProcessListener;
     @Autowired
-    private ConsumptionRejectProcessListener consumptionRejectProcessListener;
-
+    private ApproveConsumeProcessListener approveConsumeProcessListener;
     @Autowired
-    private ConsumptionCancelProcessListener consumptionCancelProcessListener;
+    private OperateConsumeTaskListener operateConsumeTaskListener;
 
     @Autowired
     private WorkflowApproverService workflowApproverService;
-
     @Autowired
-    private ApplyConsumptionProcessHandler applyConsumptionProcessHandler;
-
+    private ApplyConsumeProcessHandler applyConsumeProcessHandler;
     @Autowired
     private InlongGroupService groupService;
 
@@ -75,9 +70,9 @@ public class ApplyConsumptionWorkflowDefinition implements WorkflowDefinition {
         process.setName(getProcessName().name());
         process.setType(getProcessName().getDisplayName());
         process.setDisplayName(getProcessName().getDisplayName());
-        process.setFormClass(ApplyConsumptionProcessForm.class);
+        process.setFormClass(ApplyConsumeProcessForm.class);
         process.setVersion(1);
-        process.setProcessDetailHandler(applyConsumptionProcessHandler);
+        process.setProcessDetailHandler(applyConsumeProcessHandler);
 
         // Start node
         StartEvent startEvent = new StartEvent();
@@ -98,9 +93,9 @@ public class ApplyConsumptionWorkflowDefinition implements WorkflowDefinition {
         UserTask adminUserTask = new UserTask();
         adminUserTask.setName(UT_ADMIN_NAME);
         adminUserTask.setDisplayName("SystemAdmin");
-        adminUserTask.setFormClass(ConsumptionApproveForm.class);
+        adminUserTask.setFormClass(ConsumeApproveForm.class);
         adminUserTask.setApproverAssign(context -> getTaskApprovers(adminUserTask.getName()));
-        adminUserTask.addListener(consumptionPassTaskListener);
+        adminUserTask.addListener(operateConsumeTaskListener);
         process.addTask(adminUserTask);
 
         // Set order relation
@@ -109,16 +104,16 @@ public class ApplyConsumptionWorkflowDefinition implements WorkflowDefinition {
         adminUserTask.addNext(endEvent);
 
         // Set up the listener
-        process.addListener(consumptionCompleteProcessListener);
-        process.addListener(consumptionRejectProcessListener);
-        process.addListener(consumptionCancelProcessListener);
+        process.addListener(approveConsumeProcessListener);
+        process.addListener(rejectConsumeProcessListener);
+        process.addListener(cancelConsumeProcessListener);
 
         return process;
     }
 
     private List<String> groupOwnerUserTaskApprover(WorkflowContext context) {
-        ApplyConsumptionProcessForm form = (ApplyConsumptionProcessForm) context.getProcessForm();
-        InlongGroupInfo groupInfo = groupService.get(form.getConsumptionInfo().getInlongGroupId());
+        ApplyConsumeProcessForm form = (ApplyConsumeProcessForm) context.getProcessForm();
+        InlongGroupInfo groupInfo = groupService.get(form.getConsumeInfo().getInlongGroupId());
         if (groupInfo == null || groupInfo.getInCharges() == null) {
             return Collections.emptyList();
         }
@@ -128,7 +123,7 @@ public class ApplyConsumptionWorkflowDefinition implements WorkflowDefinition {
 
     @Override
     public ProcessName getProcessName() {
-        return ProcessName.APPLY_CONSUMPTION_PROCESS;
+        return ProcessName.APPLY_CONSUME_PROCESS;
     }
 
     /**
diff --git a/inlong-manager/manager-test/src/main/resources/h2/apache_inlong_manager.sql b/inlong-manager/manager-test/src/main/resources/h2/apache_inlong_manager.sql
index af3cb0549..760d93316 100644
--- a/inlong-manager/manager-test/src/main/resources/h2/apache_inlong_manager.sql
+++ b/inlong-manager/manager-test/src/main/resources/h2/apache_inlong_manager.sql
@@ -594,10 +594,10 @@ CREATE TABLE IF NOT EXISTS `workflow_approver`
     KEY `process_name_task_name_index` (`process_name`, `task_name`)
 );
 
--- create default approver for new consumption and new inlong group
+-- Create the default workflow approvers for new inlong groups and new inlong consumes.
 INSERT INTO `workflow_approver`(`process_name`, `task_name`, `approvers`, `creator`, `modifier`)
-VALUES ('APPLY_CONSUMPTION_PROCESS', 'ut_admin', 'admin', 'inlong_init', 'inlong_init'),
-       ('APPLY_GROUP_PROCESS', 'ut_admin', 'admin', 'inlong_init', 'inlong_init');
+VALUES ('APPLY_GROUP_PROCESS', 'ut_admin', 'admin', 'inlong_init', 'inlong_init'),
+       ('APPLY_CONSUME_PROCESS', 'ut_admin', 'admin', 'inlong_init', 'inlong_init');
 
 -- ----------------------------
 -- Table structure for workflow_event_log
diff --git a/inlong-manager/manager-web/sql/apache_inlong_manager.sql b/inlong-manager/manager-web/sql/apache_inlong_manager.sql
index c594d9519..825e6841e 100644
--- a/inlong-manager/manager-web/sql/apache_inlong_manager.sql
+++ b/inlong-manager/manager-web/sql/apache_inlong_manager.sql
@@ -631,10 +631,10 @@ CREATE TABLE IF NOT EXISTS `workflow_approver`
 ) ENGINE = InnoDB
   DEFAULT CHARSET = utf8mb4 COMMENT ='Workflow approver table';
 
--- create default approver for new consumption and new inlong group
+-- Create the default workflow approvers for new inlong groups and new inlong consumes.
 INSERT INTO `workflow_approver`(`process_name`, `task_name`, `approvers`, `creator`, `modifier`)
-VALUES ('APPLY_CONSUMPTION_PROCESS', 'ut_admin', 'admin', 'inlong_init', 'inlong_init'),
-       ('APPLY_GROUP_PROCESS', 'ut_admin', 'admin', 'inlong_init', 'inlong_init');
+VALUES ('APPLY_GROUP_PROCESS', 'ut_admin', 'admin', 'inlong_init', 'inlong_init'),
+       ('APPLY_CONSUME_PROCESS', 'ut_admin', 'admin', 'inlong_init', 'inlong_init');
 
 -- ----------------------------
 -- Table structure for workflow_event_log
diff --git a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/InlongConsumeController.java b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/InlongConsumeController.java
index 6c3f0bad5..475ebda07 100644
--- a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/InlongConsumeController.java
+++ b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/InlongConsumeController.java
@@ -62,7 +62,7 @@ public class InlongConsumeController {
     @RequestMapping(value = "/consume/save", method = RequestMethod.POST)
     @OperationLog(operation = OperationType.CREATE)
     @ApiOperation(value = "Save inlong consume")
-    public Response<Integer> save(@Validated(UpdateValidation.class) @RequestBody InlongConsumeRequest request) {
+    public Response<Integer> save(@RequestBody InlongConsumeRequest request) {
         String operator = LoginUserUtils.getLoginUser().getName();
         return Response.success(consumeService.save(request, operator));
     }
@@ -100,7 +100,7 @@ public class InlongConsumeController {
     @ApiOperation(value = "Delete inlong consume by ID")
     @ApiImplicitParam(name = "id", value = "Inlong consume ID", dataTypeClass = Integer.class, required = true)
     public Response<Object> delete(@PathVariable(name = "id") Integer id) {
-        this.consumeService.delete(id, LoginUserUtils.getLoginUser().getName());
+        consumeService.delete(id, LoginUserUtils.getLoginUser().getName());
         return Response.success();
     }