You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2022/09/25 03:23:48 UTC
[inlong] branch master updated: [INLONG-5984][Manager] Optimize the logic related to the InlongConsume process (#5985)
This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 4f84cfb23 [INLONG-5984][Manager] Optimize the logic related to the InlongConsume process (#5985)
4f84cfb23 is described below
commit 4f84cfb23c00d02d6a8e72850d7ecf3106200812
Author: ciscozhou <45...@users.noreply.github.com>
AuthorDate: Sun Sep 25 11:23:44 2022 +0800
[INLONG-5984][Manager] Optimize the logic related to the InlongConsume process (#5985)
---
.../inlong/manager/common/enums/ConsumeStatus.java | 114 +++++++++++++--------
.../inlong/manager/common/enums/ProcessName.java | 4 +-
.../dao/mapper/InlongConsumeEntityMapper.java | 2 +
.../mappers/InlongConsumeEntityMapper.xml | 11 ++
...ocessForm.java => ApplyConsumeProcessForm.java} | 18 ++--
.../workflow/form/process/BaseProcessForm.java | 2 +-
.../pojo/workflow/form/task/BaseTaskForm.java | 2 +-
...ionApproveForm.java => ConsumeApproveForm.java} | 6 +-
.../service/consume/AbstractConsumeOperator.java | 2 +-
.../consume/InlongConsumeProcessService.java | 41 ++------
.../service/consume/InlongConsumeService.java | 10 ++
.../service/consume/InlongConsumeServiceImpl.java | 41 ++++++--
.../service/core/impl/ConsumptionServiceImpl.java | 14 +--
.../service/group/InlongPulsarOperator.java | 20 ++--
.../OperateConsumeTaskListener.java} | 33 +++---
.../apply/ApproveConsumeProcessListener.java} | 47 +++++----
.../apply/CancelConsumeProcessListener.java} | 30 +++---
.../apply/RejectConsumeProcessListener.java} | 36 +++----
.../ApplyConsumeProcessHandler.java} | 10 +-
.../ApplyConsumeWorkflowDefinition.java} | 53 +++++-----
.../main/resources/h2/apache_inlong_manager.sql | 6 +-
.../manager-web/sql/apache_inlong_manager.sql | 6 +-
.../web/controller/InlongConsumeController.java | 4 +-
23 files changed, 283 insertions(+), 229 deletions(-)
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/ConsumeStatus.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/ConsumeStatus.java
index 0b9013df2..065b225f0 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/ConsumeStatus.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/ConsumeStatus.java
@@ -17,71 +17,101 @@
package org.apache.inlong.manager.common.enums;
-import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Lists;
-import io.swagger.annotations.ApiModel;
-import io.swagger.annotations.ApiModelProperty;
-import org.apache.inlong.manager.common.util.InlongCollectionUtils;
-import org.apache.inlong.manager.common.util.Preconditions;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import org.apache.inlong.manager.common.exceptions.BusinessException;
import java.util.Map;
import java.util.Set;
-import java.util.function.Function;
/**
* Inlong consume status
*/
-@ApiModel("Inlong consume status")
public enum ConsumeStatus {
- @ApiModelProperty(value = "To be allocated: 10")
- WAIT_ASSIGN(10),
+ TO_BE_SUBMIT(100, "waiting for submit"),
+ TO_BE_APPROVAL(101, "waiting for approval"),
- @ApiModelProperty(value = "Pending approval: 11")
- WAIT_APPROVE(11),
+ APPROVE_REJECTED(102, "approval rejected"),
+ APPROVE_PASSED(103, "approval passed"),
+ APPROVE_CANCELED(104, "approval canceled"),
- @ApiModelProperty(value = "Approval rejected: 20")
- REJECTED(20),
-
- @ApiModelProperty(value = "Approval and approval: 21")
- APPROVED(21),
-
- @ApiModelProperty(value = "Cancel application: 22")
- CANCELED(22),
-
- @ApiModelProperty(value = "Deleting: 41")
- DELETING(41),
-
- @ApiModelProperty(value = "Deleted: 40")
- DELETED(40),
+ DELETING(41, "deleting"),
+ DELETED(40, "deleted"),
;
- public static final Set<ConsumeStatus> ALLOW_SAVE_UPDATE_STATUS = ImmutableSet
- .of(WAIT_ASSIGN, REJECTED, CANCELED);
+ /**
+ * State automaton for InlongConsume
+ */
+ private static final Map<ConsumeStatus, Set<ConsumeStatus>> CONSUME_STATE_AUTOMATON = Maps.newHashMap();
+
+ /*
+ * Init consume finite status automaton
+ */
+ static {
+ CONSUME_STATE_AUTOMATON.put(TO_BE_SUBMIT, Sets.newHashSet(TO_BE_SUBMIT, TO_BE_APPROVAL, DELETING));
+ CONSUME_STATE_AUTOMATON.put(TO_BE_APPROVAL,
+ Sets.newHashSet(TO_BE_APPROVAL, APPROVE_REJECTED, APPROVE_PASSED, APPROVE_CANCELED, DELETING));
+
+ CONSUME_STATE_AUTOMATON.put(APPROVE_REJECTED, Sets.newHashSet(APPROVE_REJECTED, TO_BE_APPROVAL, DELETING));
+ CONSUME_STATE_AUTOMATON.put(APPROVE_CANCELED, Sets.newHashSet(APPROVE_CANCELED, TO_BE_APPROVAL, DELETING));
+ CONSUME_STATE_AUTOMATON.put(APPROVE_PASSED, Sets.newHashSet(APPROVE_PASSED, TO_BE_APPROVAL, DELETING));
+
+ CONSUME_STATE_AUTOMATON.put(DELETING, Sets.newHashSet(DELETING, DELETED));
+ CONSUME_STATE_AUTOMATON.put(DELETED, Sets.newHashSet(DELETED));
+ }
- public static final Set<ConsumeStatus> ALLOW_START_WORKFLOW_STATUS = ImmutableSet.of(WAIT_ASSIGN);
+ private final Integer code;
+ private final String description;
- private static final Map<Integer, ConsumeStatus> STATUS_MAP = InlongCollectionUtils.transformToImmutableMap(
- Lists.newArrayList(ConsumeStatus.values()),
- ConsumeStatus::getCode,
- Function.identity()
- );
+ ConsumeStatus(Integer code, String description) {
+ this.code = code;
+ this.description = description;
+ }
- private final int code;
+ /**
+ * Get the ConsumeStatus instance from the given code
+ *
+ * @param code status code
+ * @return instance of ConsumeStatus
+ */
+ public static ConsumeStatus forCode(int code) {
+ for (ConsumeStatus status : values()) {
+ if (status.getCode() == code) {
+ return status;
+ }
+ }
+ throw new BusinessException(String.format("Illegal code=%s for ConsumeStatus", code));
+ }
- ConsumeStatus(int code) {
- this.code = code;
+ /**
+ * Check whether the current status can be transferred to the next status
+ *
+ * @param cur current status
+ * @param next next status
+ * @return true if the transfer from cur to next is NOT allowed, false if it is allowed
+ */
+ public static boolean notAllowedTransfer(ConsumeStatus cur, ConsumeStatus next) {
+ Set<ConsumeStatus> nextSet = CONSUME_STATE_AUTOMATON.get(cur);
+ return nextSet == null || !nextSet.contains(next);
}
- public static ConsumeStatus fromStatus(int status) {
- ConsumeStatus consumeStatus = STATUS_MAP.get(status);
- Preconditions.checkNotNull(consumeStatus, "consume status is invalid for " + status);
- return consumeStatus;
+ /**
+ * Checks whether the given status allows the update.
+ */
+ public static boolean allowedUpdate(ConsumeStatus status) {
+ return status == ConsumeStatus.TO_BE_SUBMIT
+ || status == ConsumeStatus.APPROVE_REJECTED
+ || status == ConsumeStatus.APPROVE_CANCELED;
}
- public int getCode() {
+ public Integer getCode() {
return code;
}
+ public String getDescription() {
+ return description;
+ }
+
}
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/ProcessName.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/ProcessName.java
index bf604f2ef..997226594 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/ProcessName.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/ProcessName.java
@@ -48,9 +48,9 @@ public enum ProcessName {
DELETE_GROUP_PROCESS("Delete-Group"),
/**
- * Apply consumption process
+ * Apply inlong consume process
*/
- APPLY_CONSUMPTION_PROCESS("Apply-Consumption"),
+ APPLY_CONSUME_PROCESS("Apply-Consume"),
/**
* Create inlong stream process
diff --git a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/InlongConsumeEntityMapper.java b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/InlongConsumeEntityMapper.java
index 1739c2c8c..6b0db6617 100644
--- a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/InlongConsumeEntityMapper.java
+++ b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/InlongConsumeEntityMapper.java
@@ -43,6 +43,8 @@ public interface InlongConsumeEntityMapper {
int updateByIdSelective(InlongConsumeEntity record);
+ void updateStatus(@Param("id") Integer id, @Param("status") Integer status, @Param("modifier") String modifier);
+
int deleteById(Integer id);
}
diff --git a/inlong-manager/manager-dao/src/main/resources/mappers/InlongConsumeEntityMapper.xml b/inlong-manager/manager-dao/src/main/resources/mappers/InlongConsumeEntityMapper.xml
index d17dc3b87..c38c53028 100644
--- a/inlong-manager/manager-dao/src/main/resources/mappers/InlongConsumeEntityMapper.xml
+++ b/inlong-manager/manager-dao/src/main/resources/mappers/InlongConsumeEntityMapper.xml
@@ -189,6 +189,9 @@
<if test="status != null">
status = #{status, jdbcType=INTEGER},
</if>
+ <if test="isDeleted != null">
+ is_deleted = #{isDeleted,jdbcType=INTEGER},
+ </if>
<if test="modifier != null">
modifier = #{modifier, jdbcType=VARCHAR},
</if>
@@ -198,6 +201,14 @@
and is_deleted = 0
and version = #{version, jdbcType=INTEGER}
</update>
+ <update id="updateStatus">
+ update inlong_consume
+ set previous_status = status,
+ status = #{status, jdbcType=INTEGER},
+ modifier = #{modifier, jdbcType=VARCHAR}
+ where id = #{id,jdbcType=INTEGER}
+ and is_deleted = 0
+ </update>
<delete id="deleteById" parameterType="java.lang.Integer">
delete
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/workflow/form/process/ApplyConsumptionProcessForm.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/workflow/form/process/ApplyConsumeProcessForm.java
similarity index 73%
rename from inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/workflow/form/process/ApplyConsumptionProcessForm.java
rename to inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/workflow/form/process/ApplyConsumeProcessForm.java
index 584118848..85d657c45 100644
--- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/workflow/form/process/ApplyConsumptionProcessForm.java
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/workflow/form/process/ApplyConsumeProcessForm.java
@@ -22,8 +22,8 @@ import io.swagger.annotations.ApiModelProperty;
import lombok.Data;
import lombok.EqualsAndHashCode;
import org.apache.inlong.manager.common.exceptions.FormValidateException;
-import org.apache.inlong.manager.pojo.consumption.ConsumptionInfo;
import org.apache.inlong.manager.common.util.Preconditions;
+import org.apache.inlong.manager.pojo.consume.InlongConsumeInfo;
import java.util.Map;
@@ -32,16 +32,16 @@ import java.util.Map;
*/
@Data
@EqualsAndHashCode(callSuper = true)
-public class ApplyConsumptionProcessForm extends BaseProcessForm {
+public class ApplyConsumeProcessForm extends BaseProcessForm {
- public static final String FORM_NAME = "ApplyConsumptionProcessForm";
+ public static final String FORM_NAME = "ApplyConsumeProcessForm";
- @ApiModelProperty(value = "Data consumption information")
- private ConsumptionInfo consumptionInfo;
+ @ApiModelProperty(value = "Inlong consume info")
+ private InlongConsumeInfo consumeInfo;
@Override
public void validate() throws FormValidateException {
- Preconditions.checkNotNull(consumptionInfo, "Data consumption information cannot be empty");
+ Preconditions.checkNotNull(consumeInfo, "Inlong consume cannot be empty");
}
@Override
@@ -51,14 +51,14 @@ public class ApplyConsumptionProcessForm extends BaseProcessForm {
@Override
public String getInlongGroupId() {
- return consumptionInfo.getConsumerGroup();
+ return consumeInfo.getInlongGroupId();
}
@Override
public Map<String, Object> showInList() {
Map<String, Object> show = Maps.newHashMap();
- if (consumptionInfo != null) {
- show.put("inlongGroupId", consumptionInfo.getInlongGroupId());
+ if (consumeInfo != null) {
+ show.put("inlongGroupId", consumeInfo.getInlongGroupId());
}
return show;
}
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/workflow/form/process/BaseProcessForm.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/workflow/form/process/BaseProcessForm.java
index 6cbbbbac2..810a203f8 100644
--- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/workflow/form/process/BaseProcessForm.java
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/workflow/form/process/BaseProcessForm.java
@@ -29,7 +29,7 @@ import lombok.Data;
@JsonTypeInfo(use = Id.NAME, property = "formName")
@JsonSubTypes({
@JsonSubTypes.Type(value = ApplyGroupProcessForm.class, name = ApplyGroupProcessForm.FORM_NAME),
- @JsonSubTypes.Type(value = ApplyConsumptionProcessForm.class, name = ApplyConsumptionProcessForm.FORM_NAME),
+ @JsonSubTypes.Type(value = ApplyConsumeProcessForm.class, name = ApplyConsumeProcessForm.FORM_NAME),
@JsonSubTypes.Type(value = GroupResourceProcessForm.class, name = GroupResourceProcessForm.FORM_NAME),
@JsonSubTypes.Type(value = StreamResourceProcessForm.class, name = StreamResourceProcessForm.FORM_NAME),
})
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/workflow/form/task/BaseTaskForm.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/workflow/form/task/BaseTaskForm.java
index fe209d1cd..c35e2cd7e 100644
--- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/workflow/form/task/BaseTaskForm.java
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/workflow/form/task/BaseTaskForm.java
@@ -29,7 +29,7 @@ import lombok.Data;
@JsonTypeInfo(use = Id.NAME, property = "formName")
@JsonSubTypes({
@JsonSubTypes.Type(value = InlongGroupApproveForm.class, name = InlongGroupApproveForm.FORM_NAME),
- @JsonSubTypes.Type(value = ConsumptionApproveForm.class, name = ConsumptionApproveForm.FORM_NAME),
+ @JsonSubTypes.Type(value = ConsumeApproveForm.class, name = ConsumeApproveForm.FORM_NAME),
@JsonSubTypes.Type(value = ServiceTaskForm.class, name = ServiceTaskForm.FORM_NAME),
})
public abstract class BaseTaskForm implements TaskForm {
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/workflow/form/task/ConsumptionApproveForm.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/workflow/form/task/ConsumeApproveForm.java
similarity index 89%
rename from inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/workflow/form/task/ConsumptionApproveForm.java
rename to inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/workflow/form/task/ConsumeApproveForm.java
index ccbcec9d4..ef2c0c952 100644
--- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/workflow/form/task/ConsumptionApproveForm.java
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/workflow/form/task/ConsumeApproveForm.java
@@ -24,13 +24,13 @@ import org.apache.inlong.manager.common.exceptions.FormValidateException;
import org.apache.inlong.manager.common.util.Preconditions;
/**
- * The approval form of the consumption
+ * The approval form of the inlong consume
*/
@Data
@EqualsAndHashCode(callSuper = true)
-public class ConsumptionApproveForm extends BaseTaskForm {
+public class ConsumeApproveForm extends BaseTaskForm {
- public static final String FORM_NAME = "ConsumptionApproveForm";
+ public static final String FORM_NAME = "ConsumeApproveForm";
@ApiModelProperty("Consumer group")
private String consumerGroup;
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/consume/AbstractConsumeOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/consume/AbstractConsumeOperator.java
index e9f886909..8840701bc 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/consume/AbstractConsumeOperator.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/consume/AbstractConsumeOperator.java
@@ -50,7 +50,7 @@ public abstract class AbstractConsumeOperator implements InlongConsumeOperator {
// set the ext params, init status, and other info
InlongConsumeEntity entity = CommonBeanUtils.copyProperties(request, InlongConsumeEntity::new);
this.setTargetEntity(request, entity);
- entity.setStatus(ConsumeStatus.WAIT_ASSIGN.getCode());
+ entity.setStatus(ConsumeStatus.TO_BE_SUBMIT.getCode());
entity.setCreator(operator);
entity.setModifier(operator);
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/consume/InlongConsumeProcessService.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/consume/InlongConsumeProcessService.java
index d4be887e4..ddd942dc4 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/consume/InlongConsumeProcessService.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/consume/InlongConsumeProcessService.java
@@ -18,18 +18,10 @@
package org.apache.inlong.manager.service.consume;
import lombok.extern.slf4j.Slf4j;
-import org.apache.inlong.manager.common.consts.MQType;
import org.apache.inlong.manager.common.enums.ConsumeStatus;
import org.apache.inlong.manager.common.enums.ProcessName;
-import org.apache.inlong.manager.common.util.CommonBeanUtils;
-import org.apache.inlong.manager.common.util.Preconditions;
-import org.apache.inlong.manager.dao.entity.ConsumptionPulsarEntity;
-import org.apache.inlong.manager.dao.mapper.ConsumptionPulsarEntityMapper;
-import org.apache.inlong.manager.pojo.consumption.ConsumptionInfo;
-import org.apache.inlong.manager.pojo.consumption.ConsumptionPulsarInfo;
import org.apache.inlong.manager.pojo.workflow.WorkflowResult;
-import org.apache.inlong.manager.pojo.workflow.form.process.ApplyConsumptionProcessForm;
-import org.apache.inlong.manager.service.core.ConsumptionService;
+import org.apache.inlong.manager.pojo.workflow.form.process.ApplyConsumeProcessForm;
import org.apache.inlong.manager.service.workflow.WorkflowService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
@@ -42,11 +34,9 @@ import org.springframework.stereotype.Service;
public class InlongConsumeProcessService {
@Autowired
- private ConsumptionService consumptionService;
+ private InlongConsumeService consumeService;
@Autowired
private WorkflowService workflowService;
- @Autowired
- private ConsumptionPulsarEntityMapper consumptionPulsarMapper;
/**
* Start the process for the specified ID.
@@ -56,30 +46,13 @@ public class InlongConsumeProcessService {
* @return workflow result
*/
public WorkflowResult startProcess(Integer id, String operator) {
- ConsumptionInfo consumptionInfo = consumptionService.get(id);
- Preconditions.checkTrue(ConsumeStatus.ALLOW_START_WORKFLOW_STATUS.contains(
- ConsumeStatus.fromStatus(consumptionInfo.getStatus())),
- "current status not allowed to start workflow");
-
- consumptionInfo.setStatus(ConsumeStatus.WAIT_APPROVE.getCode());
- boolean rowCount = consumptionService.update(consumptionInfo, operator);
- Preconditions.checkTrue(rowCount, "update consumption failed");
-
- return workflowService.start(ProcessName.APPLY_CONSUMPTION_PROCESS, operator,
- genConsumptionProcessForm(consumptionInfo));
+ consumeService.updateStatus(id, ConsumeStatus.TO_BE_APPROVAL.getCode(), operator);
+ return workflowService.start(ProcessName.APPLY_CONSUME_PROCESS, operator, genApplyConsumeProcessForm(id));
}
- private ApplyConsumptionProcessForm genConsumptionProcessForm(ConsumptionInfo consumptionInfo) {
- ApplyConsumptionProcessForm form = new ApplyConsumptionProcessForm();
- Integer id = consumptionInfo.getId();
- String mqType = consumptionInfo.getMqType();
- if (MQType.PULSAR.equals(mqType) || MQType.TDMQ_PULSAR.equals(mqType)) {
- ConsumptionPulsarEntity consumptionPulsarEntity = consumptionPulsarMapper.selectByConsumptionId(id);
- ConsumptionPulsarInfo pulsarInfo = CommonBeanUtils.copyProperties(consumptionPulsarEntity,
- ConsumptionPulsarInfo::new);
- consumptionInfo.setMqExtInfo(pulsarInfo);
- }
- form.setConsumptionInfo(consumptionInfo);
+ private ApplyConsumeProcessForm genApplyConsumeProcessForm(Integer id) {
+ ApplyConsumeProcessForm form = new ApplyConsumeProcessForm();
+ form.setConsumeInfo(consumeService.get(id));
return form;
}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/consume/InlongConsumeService.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/consume/InlongConsumeService.java
index 41cbe89ed..cb295cb22 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/consume/InlongConsumeService.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/consume/InlongConsumeService.java
@@ -92,6 +92,16 @@ public interface InlongConsumeService {
Integer update(@Valid @NotNull(message = "inlong consume request cannot be null") InlongConsumeRequest request,
String operator);
+ /**
+ * Update the inlong consume status to the specified status
+ *
+ * @param id inlong consume id
+ * @param status modified status
+ * @param operator name of operator
+ * @return whether the status update succeeded
+ */
+ Boolean updateStatus(Integer id, Integer status, String operator);
+
/**
* Delete the inlong consume by the id
*
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/consume/InlongConsumeServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/consume/InlongConsumeServiceImpl.java
index 43d73ea56..51b2868f5 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/consume/InlongConsumeServiceImpl.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/consume/InlongConsumeServiceImpl.java
@@ -104,7 +104,7 @@ public class InlongConsumeServiceImpl implements InlongConsumeService {
entity.setFilterEnabled(0);
entity.setInCharges(groupInfo.getInCharges());
- entity.setStatus(ConsumeStatus.APPROVED.getCode());
+ entity.setStatus(ConsumeStatus.APPROVE_PASSED.getCode());
String operator = groupInfo.getCreator();
entity.setCreator(operator);
entity.setModifier(operator);
@@ -153,11 +153,11 @@ public class InlongConsumeServiceImpl implements InlongConsumeService {
int status = Integer.parseInt(countInfo.getKey());
int count = countInfo.getValue();
result.setTotalCount(result.getTotalCount() + count);
- if (status == ConsumeStatus.WAIT_ASSIGN.getCode()) {
+ if (status == ConsumeStatus.TO_BE_SUBMIT.getCode()) {
result.setWaitAssignCount(result.getWaitAssignCount() + count);
- } else if (status == ConsumeStatus.WAIT_APPROVE.getCode()) {
+ } else if (status == ConsumeStatus.TO_BE_APPROVAL.getCode()) {
result.setWaitApproveCount(result.getWaitApproveCount() + count);
- } else if (status == ConsumeStatus.REJECTED.getCode()) {
+ } else if (status == ConsumeStatus.APPROVE_REJECTED.getCode()) {
result.setRejectCount(result.getRejectCount() + count);
}
}
@@ -186,7 +186,8 @@ public class InlongConsumeServiceImpl implements InlongConsumeService {
}
@Override
- @Transactional(rollbackFor = Throwable.class, isolation = Isolation.REPEATABLE_READ,
+ @Transactional(rollbackFor = Throwable.class,
+ isolation = Isolation.REPEATABLE_READ,
propagation = Propagation.REQUIRES_NEW)
public Integer update(InlongConsumeRequest request, String operator) {
LOGGER.debug("begin to update inlong consume={} by user={}", request, operator);
@@ -205,8 +206,8 @@ public class InlongConsumeServiceImpl implements InlongConsumeService {
throw new BusinessException(ErrorCodeEnum.CONFIG_EXPIRED);
}
- ConsumeStatus consumeStatus = ConsumeStatus.fromStatus(existEntity.getStatus());
- Preconditions.checkTrue(ConsumeStatus.ALLOW_SAVE_UPDATE_STATUS.contains(consumeStatus),
+ ConsumeStatus consumeStatus = ConsumeStatus.forCode(existEntity.getStatus());
+ Preconditions.checkTrue(ConsumeStatus.allowedUpdate(consumeStatus),
"inlong consume not allowed update when status is " + consumeStatus.name());
InlongConsumeOperator consumeOperator = consumeOperatorFactory.getInstance(request.getMqType());
@@ -216,6 +217,32 @@ public class InlongConsumeServiceImpl implements InlongConsumeService {
return consumeId;
}
+ @Override
+ @Transactional(rollbackFor = Throwable.class,
+ isolation = Isolation.REPEATABLE_READ,
+ propagation = Propagation.REQUIRES_NEW)
+ public Boolean updateStatus(Integer id, Integer status, String operator) {
+ LOGGER.info("begin to update consume status to [{}] for id={} by user={}", status, id, operator);
+ Preconditions.checkNotNull(id, ErrorCodeEnum.ID_IS_EMPTY.getMessage());
+ InlongConsumeEntity entity = consumeMapper.selectById(id);
+ if (entity == null) {
+ LOGGER.error("inlong consume not found by id={}", id);
+ throw new BusinessException(ErrorCodeEnum.CONSUME_NOT_FOUND);
+ }
+
+ ConsumeStatus curStatus = ConsumeStatus.forCode(entity.getStatus());
+ ConsumeStatus nextStatus = ConsumeStatus.forCode(status);
+ if (ConsumeStatus.notAllowedTransfer(curStatus, nextStatus)) {
+ String errorMsg = String.format("Current status=%s cannot transfer to status=%s", curStatus, nextStatus);
+ LOGGER.error(errorMsg);
+ throw new BusinessException(errorMsg);
+ }
+
+ consumeMapper.updateStatus(id, status, operator);
+ LOGGER.info("success to update consume status to [{}] for id={} by user={}", status, id, operator);
+ return true;
+ }
+
@Override
public Boolean delete(Integer id, String operator) {
LOGGER.info("begin to delete inlong consume for id={} by user={}", id, operator);
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/ConsumptionServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/ConsumptionServiceImpl.java
index 0de16b5c6..469309c05 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/ConsumptionServiceImpl.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/ConsumptionServiceImpl.java
@@ -103,9 +103,9 @@ public class ConsumptionServiceImpl implements ConsumptionService {
return ConsumptionSummary.builder()
.totalCount(countMap.values().stream().mapToInt(c -> c).sum())
- .waitingAssignCount(countMap.getOrDefault(ConsumeStatus.WAIT_ASSIGN.getCode() + "", 0))
- .waitingApproveCount(countMap.getOrDefault(ConsumeStatus.WAIT_APPROVE.getCode() + "", 0))
- .rejectedCount(countMap.getOrDefault(ConsumeStatus.REJECTED.getCode() + "", 0)).build();
+ .waitingAssignCount(countMap.getOrDefault(ConsumeStatus.TO_BE_SUBMIT.getCode() + "", 0))
+ .waitingApproveCount(countMap.getOrDefault(ConsumeStatus.TO_BE_APPROVAL.getCode() + "", 0))
+ .rejectedCount(countMap.getOrDefault(ConsumeStatus.APPROVE_REJECTED.getCode() + "", 0)).build();
}
@Override
@@ -165,8 +165,8 @@ public class ConsumptionServiceImpl implements ConsumptionService {
if (info.getId() != null) {
ConsumptionEntity consumptionEntity = consumptionMapper.selectByPrimaryKey(info.getId());
Preconditions.checkNotNull(consumptionEntity, "consumption not exist with id: " + info.getId());
- ConsumeStatus consumeStatus = ConsumeStatus.fromStatus(consumptionEntity.getStatus());
- Preconditions.checkTrue(ConsumeStatus.ALLOW_SAVE_UPDATE_STATUS.contains(consumeStatus),
+ ConsumeStatus consumeStatus = ConsumeStatus.forCode(consumptionEntity.getStatus());
+ Preconditions.checkTrue(ConsumeStatus.allowedUpdate(consumeStatus),
"consumption not allow update when status is " + consumeStatus.name());
}
@@ -283,7 +283,7 @@ public class ConsumptionServiceImpl implements ConsumptionService {
// If the consumption has been approved, then close/open DLQ or RLQ, it is necessary to
// add/remove inlong streams in the inlong group
- if (ConsumeStatus.APPROVED.getCode() == exists.getStatus()) {
+ if (ConsumeStatus.APPROVE_PASSED.getCode() == exists.getStatus()) {
String groupId = info.getInlongGroupId();
String dlqNameOld = pulsarEntity.getDeadLetterTopic();
String dlqNameNew = update.getDeadLetterTopic();
@@ -337,7 +337,7 @@ public class ConsumptionServiceImpl implements ConsumptionService {
private ConsumptionEntity saveConsumption(ConsumptionInfo info, String operator) {
ConsumptionEntity entity = CommonBeanUtils.copyProperties(info, ConsumptionEntity::new);
- entity.setStatus(ConsumeStatus.WAIT_ASSIGN.getCode());
+ entity.setStatus(ConsumeStatus.TO_BE_SUBMIT.getCode());
entity.setCreator(operator);
entity.setModifier(operator);
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongPulsarOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongPulsarOperator.java
index b9c98faad..d00fe64b4 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongPulsarOperator.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongPulsarOperator.java
@@ -31,11 +31,15 @@ import org.apache.inlong.manager.pojo.group.pulsar.InlongPulsarRequest;
import org.apache.inlong.manager.common.util.CommonBeanUtils;
import org.apache.inlong.manager.common.util.Preconditions;
import org.apache.inlong.manager.dao.entity.InlongGroupEntity;
+import org.apache.inlong.manager.pojo.stream.InlongStreamBriefInfo;
+import org.apache.inlong.manager.service.stream.InlongStreamService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
+import java.util.List;
+
/**
* Inlong group operator for Pulsar.
*/
@@ -46,6 +50,8 @@ public class InlongPulsarOperator extends AbstractGroupOperator {
@Autowired
private ObjectMapper objectMapper;
+ @Autowired
+ private InlongStreamService streamService;
@Override
public Boolean accept(String mqType) {
@@ -107,13 +113,13 @@ public class InlongPulsarOperator extends AbstractGroupOperator {
@Override
public InlongGroupTopicInfo getTopic(InlongGroupInfo groupInfo) {
InlongGroupTopicInfo topicInfo = super.getTopic(groupInfo);
- // TODO add cache for cluster info
- // pulsar topic corresponds to the inlong stream one-to-one
- // topicInfo.setDsTopicList(streamService.getTopicList(groupInfo.getInlongGroupId()));
- // commonOperateService.getSpecifiedParam(InlongConstants.TUBE_MASTER_URL);
- // groupInfo.setTenant();
- // groupInfo.setAdminUrl();
- // groupInfo.setServiceUrl();
+ // Pulsar topic corresponds to the inlong stream one-to-one
+ List<InlongStreamBriefInfo> streamTopics = streamService.getTopicList(groupInfo.getInlongGroupId());
+ topicInfo.setStreamTopics(streamTopics);
+ // TODO add cache for cluster info, and support extending to different MQs
+ // topicInfo.setTenant();
+ // topicInfo.setAdminUrl();
+ // topicInfo.setServiceUrl();
return topicInfo;
}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/consumption/ConsumptionPassTaskListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/consume/OperateConsumeTaskListener.java
similarity index 61%
rename from inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/consumption/ConsumptionPassTaskListener.java
rename to inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/consume/OperateConsumeTaskListener.java
index 924b90352..6a4146a3f 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/consumption/ConsumptionPassTaskListener.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/consume/OperateConsumeTaskListener.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.inlong.manager.service.listener.consumption;
+package org.apache.inlong.manager.service.listener.consume;
import com.alibaba.druid.util.StringUtils;
import lombok.extern.slf4j.Slf4j;
@@ -23,10 +23,10 @@ import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
import org.apache.inlong.manager.common.enums.TaskEvent;
import org.apache.inlong.manager.common.exceptions.BusinessException;
import org.apache.inlong.manager.common.exceptions.WorkflowListenerException;
-import org.apache.inlong.manager.pojo.consumption.ConsumptionInfo;
-import org.apache.inlong.manager.pojo.workflow.form.process.ApplyConsumptionProcessForm;
-import org.apache.inlong.manager.pojo.workflow.form.task.ConsumptionApproveForm;
-import org.apache.inlong.manager.service.core.ConsumptionService;
+import org.apache.inlong.manager.pojo.consume.InlongConsumeInfo;
+import org.apache.inlong.manager.pojo.workflow.form.process.ApplyConsumeProcessForm;
+import org.apache.inlong.manager.pojo.workflow.form.task.ConsumeApproveForm;
+import org.apache.inlong.manager.service.consume.InlongConsumeService;
import org.apache.inlong.manager.workflow.WorkflowContext;
import org.apache.inlong.manager.workflow.event.ListenerResult;
import org.apache.inlong.manager.workflow.event.task.TaskEventListener;
@@ -34,14 +34,14 @@ import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
/**
- * New data consumption-system administrator approval task event listener
+ * New inlong consume approval task event listener
*/
@Slf4j
@Component
-public class ConsumptionPassTaskListener implements TaskEventListener {
+public class OperateConsumeTaskListener implements TaskEventListener {
@Autowired
- private ConsumptionService consumptionService;
+ private InlongConsumeService consumeService;
@Override
public TaskEvent event() {
@@ -50,19 +50,20 @@ public class ConsumptionPassTaskListener implements TaskEventListener {
@Override
public ListenerResult listen(WorkflowContext context) throws WorkflowListenerException {
- ApplyConsumptionProcessForm form = (ApplyConsumptionProcessForm) context.getProcessForm();
- ConsumptionApproveForm approveForm = (ConsumptionApproveForm) context.getActionContext().getForm();
- ConsumptionInfo info = form.getConsumptionInfo();
- if (StringUtils.equals(approveForm.getConsumerGroup(), info.getConsumerGroup())) {
- return ListenerResult.success("The consumer group has not been modified");
+ ApplyConsumeProcessForm consumeForm = (ApplyConsumeProcessForm) context.getProcessForm();
+ ConsumeApproveForm approveForm = (ConsumeApproveForm) context.getActionContext().getForm();
+ InlongConsumeInfo consumeInfo = consumeForm.getConsumeInfo();
+ if (StringUtils.equals(approveForm.getConsumerGroup(), consumeInfo.getConsumerGroup())) {
+ return ListenerResult.success("Consumer group has not been modified");
}
- boolean exist = consumptionService.isConsumerGroupExists(approveForm.getConsumerGroup(), info.getId());
+
+ boolean exist = consumeService.consumerGroupExists(approveForm.getConsumerGroup(), consumeInfo.getId());
if (exist) {
log.error("consumer group {} already exist", approveForm.getConsumerGroup());
throw new BusinessException(ErrorCodeEnum.CONSUMER_GROUP_DUPLICATED);
}
- return ListenerResult.success("Consumer group from " + info.getConsumerGroup()
- + " change to " + approveForm.getConsumerGroup());
+ return ListenerResult.success(String.format("Consumer group %s change to %s",
+ consumeInfo.getConsumerGroup(), approveForm.getConsumerGroup()));
}
}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/consumption/ConsumptionCompleteProcessListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/consume/apply/ApproveConsumeProcessListener.java
similarity index 82%
rename from inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/consumption/ConsumptionCompleteProcessListener.java
rename to inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/consume/apply/ApproveConsumeProcessListener.java
index 61de6bb5d..07d9ed0b3 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/consumption/ConsumptionCompleteProcessListener.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/consume/apply/ApproveConsumeProcessListener.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.inlong.manager.service.listener.consumption;
+package org.apache.inlong.manager.service.listener.consume.apply;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
@@ -28,15 +28,15 @@ import org.apache.inlong.manager.common.enums.ProcessEvent;
import org.apache.inlong.manager.common.exceptions.BusinessException;
import org.apache.inlong.manager.common.exceptions.WorkflowListenerException;
import org.apache.inlong.manager.common.util.Preconditions;
-import org.apache.inlong.manager.dao.entity.ConsumptionEntity;
+import org.apache.inlong.manager.dao.entity.InlongConsumeEntity;
import org.apache.inlong.manager.dao.entity.InlongGroupEntity;
-import org.apache.inlong.manager.dao.mapper.ConsumptionEntityMapper;
+import org.apache.inlong.manager.dao.mapper.InlongConsumeEntityMapper;
import org.apache.inlong.manager.dao.mapper.InlongGroupEntityMapper;
import org.apache.inlong.manager.pojo.cluster.ClusterInfo;
import org.apache.inlong.manager.pojo.cluster.pulsar.PulsarClusterInfo;
import org.apache.inlong.manager.pojo.cluster.tubemq.TubeClusterInfo;
import org.apache.inlong.manager.pojo.queue.pulsar.PulsarTopicInfo;
-import org.apache.inlong.manager.pojo.workflow.form.process.ApplyConsumptionProcessForm;
+import org.apache.inlong.manager.pojo.workflow.form.process.ApplyConsumeProcessForm;
import org.apache.inlong.manager.service.cluster.InlongClusterService;
import org.apache.inlong.manager.service.resource.queue.pulsar.PulsarOperator;
import org.apache.inlong.manager.service.resource.queue.pulsar.PulsarUtils;
@@ -52,16 +52,16 @@ import java.util.Arrays;
import java.util.List;
/**
- * Added data consumption process complete archive event listener
+ * Inlong consume process complete archive event listener
*/
@Slf4j
@Component
-public class ConsumptionCompleteProcessListener implements ProcessEventListener {
+public class ApproveConsumeProcessListener implements ProcessEventListener {
@Autowired
private InlongGroupEntityMapper groupMapper;
@Autowired
- private ConsumptionEntityMapper consumptionMapper;
+ private InlongConsumeEntityMapper consumeMapper;
@Autowired
private InlongClusterService clusterService;
@Autowired
@@ -76,13 +76,12 @@ public class ConsumptionCompleteProcessListener implements ProcessEventListener
@Override
public ListenerResult listen(WorkflowContext context) throws WorkflowListenerException {
- ApplyConsumptionProcessForm consumptionForm = (ApplyConsumptionProcessForm) context.getProcessForm();
-
- // Real-time query of consumption information
- Integer consumptionId = consumptionForm.getConsumptionInfo().getId();
- ConsumptionEntity entity = consumptionMapper.selectByPrimaryKey(consumptionId);
+ // query the inlong consume info from DB
+ ApplyConsumeProcessForm consumeForm = (ApplyConsumeProcessForm) context.getProcessForm();
+ Integer consumeId = consumeForm.getConsumeInfo().getId();
+ InlongConsumeEntity entity = consumeMapper.selectById(consumeId);
if (entity == null) {
- throw new WorkflowListenerException("consumption not exits for id=" + consumptionId);
+ throw new WorkflowListenerException("inlong consume not exits for id=" + consumeId);
}
String mqType = entity.getMqType();
@@ -92,13 +91,13 @@ public class ConsumptionCompleteProcessListener implements ProcessEventListener
} else if (MQType.PULSAR.equals(mqType) || MQType.TDMQ_PULSAR.equals(mqType)) {
this.createPulsarSubscription(entity);
} else if (MQType.KAFKA.equals(mqType)) {
- //TODO add kakfa
-
+ // TODO add Kafka
} else {
throw new WorkflowListenerException("Unsupported MQ type " + mqType);
}
- this.updateConsumerInfo(consumptionId, entity.getConsumerGroup());
+ // the consumer group may be changed when approving
+ this.updateConsumerInfo(consumeId, entity.getConsumerGroup());
return ListenerResult.success("Create MQ consumer group successful");
}
@@ -106,13 +105,13 @@ public class ConsumptionCompleteProcessListener implements ProcessEventListener
* Update consumption after approve
*/
private void updateConsumerInfo(Integer consumptionId, String consumerGroup) {
- ConsumptionEntity update = consumptionMapper.selectByPrimaryKey(consumptionId);
- update.setStatus(ConsumeStatus.APPROVED.getCode());
- update.setConsumerGroup(consumerGroup);
- int rowCount = consumptionMapper.updateByPrimaryKeySelective(update);
+ InlongConsumeEntity existEntity = consumeMapper.selectById(consumptionId);
+ existEntity.setStatus(ConsumeStatus.APPROVE_PASSED.getCode());
+ existEntity.setConsumerGroup(consumerGroup);
+ int rowCount = consumeMapper.updateByIdSelective(existEntity);
if (rowCount != InlongConstants.AFFECTED_ONE_ROW) {
- log.error("consumption information has already updated, id={}, curVersion={}",
- update.getId(), update.getVersion());
+ log.error("inlong consume has already updated, id={}, curVersion={}",
+ existEntity.getId(), existEntity.getVersion());
throw new BusinessException(ErrorCodeEnum.CONFIG_EXPIRED);
}
}
@@ -120,7 +119,7 @@ public class ConsumptionCompleteProcessListener implements ProcessEventListener
/**
* Create Pulsar subscription
*/
- private void createPulsarSubscription(ConsumptionEntity entity) {
+ private void createPulsarSubscription(InlongConsumeEntity entity) {
String groupId = entity.getInlongGroupId();
InlongGroupEntity groupEntity = groupMapper.selectByGroupId(groupId);
Preconditions.checkNotNull(groupEntity, "inlong group not found for groupId=" + groupId);
@@ -162,7 +161,7 @@ public class ConsumptionCompleteProcessListener implements ProcessEventListener
/**
* Create TubeMQ consumer group
*/
- private void createTubeConsumerGroup(ConsumptionEntity entity, String operator) {
+ private void createTubeConsumerGroup(InlongConsumeEntity entity, String operator) {
String groupId = entity.getInlongGroupId();
InlongGroupEntity groupEntity = groupMapper.selectByGroupId(groupId);
Preconditions.checkNotNull(groupEntity, "inlong group not found for groupId=" + groupId);
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/consumption/ConsumptionCancelProcessListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/consume/apply/CancelConsumeProcessListener.java
similarity index 65%
rename from inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/consumption/ConsumptionCancelProcessListener.java
rename to inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/consume/apply/CancelConsumeProcessListener.java
index 4d1c09e6c..3a8cd8c62 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/consumption/ConsumptionCancelProcessListener.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/consume/apply/CancelConsumeProcessListener.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.inlong.manager.service.listener.consumption;
+package org.apache.inlong.manager.service.listener.consume.apply;
import lombok.extern.slf4j.Slf4j;
import org.apache.inlong.manager.common.consts.InlongConstants;
@@ -24,9 +24,9 @@ import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
import org.apache.inlong.manager.common.enums.ProcessEvent;
import org.apache.inlong.manager.common.exceptions.BusinessException;
import org.apache.inlong.manager.common.exceptions.WorkflowListenerException;
-import org.apache.inlong.manager.dao.entity.ConsumptionEntity;
-import org.apache.inlong.manager.dao.mapper.ConsumptionEntityMapper;
-import org.apache.inlong.manager.pojo.workflow.form.process.ApplyConsumptionProcessForm;
+import org.apache.inlong.manager.dao.entity.InlongConsumeEntity;
+import org.apache.inlong.manager.dao.mapper.InlongConsumeEntityMapper;
+import org.apache.inlong.manager.pojo.workflow.form.process.ApplyConsumeProcessForm;
import org.apache.inlong.manager.workflow.WorkflowContext;
import org.apache.inlong.manager.workflow.event.ListenerResult;
import org.apache.inlong.manager.workflow.event.process.ProcessEventListener;
@@ -34,18 +34,18 @@ import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
/**
- * Added data consumption process cancellation event listener
+ * Inlong consume process cancellation event listener
*/
@Slf4j
@Component
-public class ConsumptionCancelProcessListener implements ProcessEventListener {
+public class CancelConsumeProcessListener implements ProcessEventListener {
@Autowired
- private ConsumptionEntityMapper consumptionEntityMapper;
+ private InlongConsumeEntityMapper consumeMapper;
@Autowired
- public ConsumptionCancelProcessListener(ConsumptionEntityMapper consumptionEntityMapper) {
- this.consumptionEntityMapper = consumptionEntityMapper;
+ public CancelConsumeProcessListener(InlongConsumeEntityMapper consumeMapper) {
+ this.consumeMapper = consumeMapper;
}
@Override
@@ -55,14 +55,14 @@ public class ConsumptionCancelProcessListener implements ProcessEventListener {
@Override
public ListenerResult listen(WorkflowContext context) throws WorkflowListenerException {
- ApplyConsumptionProcessForm processForm = (ApplyConsumptionProcessForm) context.getProcessForm();
+ ApplyConsumeProcessForm processForm = (ApplyConsumeProcessForm) context.getProcessForm();
- ConsumptionEntity update = consumptionEntityMapper.selectByPrimaryKey(processForm.getConsumptionInfo().getId());
- update.setStatus(ConsumeStatus.CANCELED.getCode());
- int rowCount = consumptionEntityMapper.updateByPrimaryKeySelective(update);
+ InlongConsumeEntity consumeEntity = consumeMapper.selectById(processForm.getConsumeInfo().getId());
+ consumeEntity.setStatus(ConsumeStatus.APPROVE_CANCELED.getCode());
+ int rowCount = consumeMapper.updateByIdSelective(consumeEntity);
if (rowCount != InlongConstants.AFFECTED_ONE_ROW) {
- log.error("consumption information has already updated, id={}, curVersion={}",
- update.getId(), update.getVersion());
+ log.error("inlong consume has already updated, id={}, curVersion={}",
+ consumeEntity.getId(), consumeEntity.getVersion());
throw new BusinessException(ErrorCodeEnum.CONFIG_EXPIRED);
}
return ListenerResult.success("Application process is cancelled");
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/consumption/ConsumptionRejectProcessListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/consume/apply/RejectConsumeProcessListener.java
similarity index 63%
rename from inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/consumption/ConsumptionRejectProcessListener.java
rename to inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/consume/apply/RejectConsumeProcessListener.java
index 0161cb4e9..cf96224bc 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/consumption/ConsumptionRejectProcessListener.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/consume/apply/RejectConsumeProcessListener.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.inlong.manager.service.listener.consumption;
+package org.apache.inlong.manager.service.listener.consume.apply;
import org.apache.inlong.manager.common.consts.InlongConstants;
import org.apache.inlong.manager.common.enums.ConsumeStatus;
@@ -23,9 +23,9 @@ import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
import org.apache.inlong.manager.common.enums.ProcessEvent;
import org.apache.inlong.manager.common.exceptions.BusinessException;
import org.apache.inlong.manager.common.exceptions.WorkflowListenerException;
-import org.apache.inlong.manager.dao.entity.ConsumptionEntity;
-import org.apache.inlong.manager.dao.mapper.ConsumptionEntityMapper;
-import org.apache.inlong.manager.pojo.workflow.form.process.ApplyConsumptionProcessForm;
+import org.apache.inlong.manager.dao.entity.InlongConsumeEntity;
+import org.apache.inlong.manager.dao.mapper.InlongConsumeEntityMapper;
+import org.apache.inlong.manager.pojo.workflow.form.process.ApplyConsumeProcessForm;
import org.apache.inlong.manager.workflow.WorkflowContext;
import org.apache.inlong.manager.workflow.event.ListenerResult;
import org.apache.inlong.manager.workflow.event.process.ProcessEventListener;
@@ -35,19 +35,19 @@ import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
/**
- * Added data consumption process rejection event listener
+ * Inlong consume process rejection event listener
*/
@Component
-public class ConsumptionRejectProcessListener implements ProcessEventListener {
+public class RejectConsumeProcessListener implements ProcessEventListener {
- private static final Logger LOGGER = LoggerFactory.getLogger(ConsumptionRejectProcessListener.class);
+ private static final Logger LOGGER = LoggerFactory.getLogger(RejectConsumeProcessListener.class);
@Autowired
- private ConsumptionEntityMapper consumptionEntityMapper;
+ private InlongConsumeEntityMapper consumeMapper;
@Autowired
- public ConsumptionRejectProcessListener(ConsumptionEntityMapper consumptionEntityMapper) {
- this.consumptionEntityMapper = consumptionEntityMapper;
+ public RejectConsumeProcessListener(InlongConsumeEntityMapper consumeMapper) {
+ this.consumeMapper = consumeMapper;
}
@Override
@@ -57,17 +57,17 @@ public class ConsumptionRejectProcessListener implements ProcessEventListener {
@Override
public ListenerResult listen(WorkflowContext context) throws WorkflowListenerException {
- ApplyConsumptionProcessForm processForm = (ApplyConsumptionProcessForm) context.getProcessForm();
+ ApplyConsumeProcessForm processForm = (ApplyConsumeProcessForm) context.getProcessForm();
+ InlongConsumeEntity consumeEntity = consumeMapper.selectById(processForm.getConsumeInfo().getId());
+ consumeEntity.setStatus(ConsumeStatus.APPROVE_REJECTED.getCode());
- ConsumptionEntity update = consumptionEntityMapper.selectByPrimaryKey(processForm.getConsumptionInfo().getId());
- update.setStatus(ConsumeStatus.REJECTED.getCode());
-
- int rowCount = consumptionEntityMapper.updateByPrimaryKeySelective(update);
+ int rowCount = consumeMapper.updateByIdSelective(consumeEntity);
if (rowCount != InlongConstants.AFFECTED_ONE_ROW) {
- LOGGER.error("consumption information has already updated, id={}, curVersion={}",
- update.getId(), update.getVersion());
+ LOGGER.error("inlong consume has already updated, id={}, curVersion={}",
+ consumeEntity.getId(), consumeEntity.getVersion());
throw new BusinessException(ErrorCodeEnum.CONFIG_EXPIRED);
}
- return ListenerResult.success("The application process was rejected");
+
+ return ListenerResult.success("The consume application was rejected");
}
}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/consumption/ApplyConsumptionProcessHandler.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/consume/ApplyConsumeProcessHandler.java
similarity index 87%
rename from inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/consumption/ApplyConsumptionProcessHandler.java
rename to inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/consume/ApplyConsumeProcessHandler.java
index 7579b5a85..d5e1d63d7 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/consumption/ApplyConsumptionProcessHandler.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/consume/ApplyConsumeProcessHandler.java
@@ -15,11 +15,11 @@
* limitations under the License.
*/
-package org.apache.inlong.manager.service.workflow.consumption;
+package org.apache.inlong.manager.service.workflow.consume;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.inlong.manager.pojo.workflow.ProcessDetailResponse;
-import org.apache.inlong.manager.pojo.workflow.form.process.ApplyConsumptionProcessForm;
+import org.apache.inlong.manager.pojo.workflow.form.process.ApplyConsumeProcessForm;
import org.apache.inlong.manager.workflow.core.ProcessDefinitionService;
import org.apache.inlong.manager.workflow.definition.ProcessDetailHandler;
import org.apache.inlong.manager.workflow.definition.WorkflowProcess;
@@ -28,10 +28,10 @@ import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
/**
- * Apply consumption process handler
+ * Apply inlong consume process handler
*/
@Component
-public class ApplyConsumptionProcessHandler implements ProcessDetailHandler {
+public class ApplyConsumeProcessHandler implements ProcessDetailHandler {
@Autowired
private ObjectMapper objectMapper;
@@ -41,7 +41,7 @@ public class ApplyConsumptionProcessHandler implements ProcessDetailHandler {
@Override
public ProcessDetailResponse handle(ProcessDetailResponse processResponse) {
WorkflowProcess process = processDefinitionService.getByName(processResponse.getWorkflow().getName());
- ApplyConsumptionProcessForm processForm = WorkflowUtils.parseProcessForm(objectMapper,
+ ApplyConsumeProcessForm processForm = WorkflowUtils.parseProcessForm(objectMapper,
processResponse.getProcessInfo().getFormData().toString(), process);
if (processForm == null) {
return processResponse;
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/consumption/ApplyConsumptionWorkflowDefinition.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/consume/ApplyConsumeWorkflowDefinition.java
similarity index 71%
rename from inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/consumption/ApplyConsumptionWorkflowDefinition.java
rename to inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/consume/ApplyConsumeWorkflowDefinition.java
index a5fa8a2d2..1dc368e68 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/consumption/ApplyConsumptionWorkflowDefinition.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/consume/ApplyConsumeWorkflowDefinition.java
@@ -15,20 +15,20 @@
* limitations under the License.
*/
-package org.apache.inlong.manager.service.workflow.consumption;
+package org.apache.inlong.manager.service.workflow.consume;
import org.apache.inlong.manager.common.consts.InlongConstants;
import org.apache.inlong.manager.common.enums.ProcessName;
import org.apache.inlong.manager.pojo.group.InlongGroupInfo;
-import org.apache.inlong.manager.pojo.workflow.form.process.ApplyConsumptionProcessForm;
-import org.apache.inlong.manager.pojo.workflow.form.task.ConsumptionApproveForm;
+import org.apache.inlong.manager.pojo.workflow.form.process.ApplyConsumeProcessForm;
+import org.apache.inlong.manager.pojo.workflow.form.task.ConsumeApproveForm;
import org.apache.inlong.manager.service.core.WorkflowApproverService;
import org.apache.inlong.manager.service.group.InlongGroupService;
+import org.apache.inlong.manager.service.listener.consume.OperateConsumeTaskListener;
+import org.apache.inlong.manager.service.listener.consume.apply.ApproveConsumeProcessListener;
+import org.apache.inlong.manager.service.listener.consume.apply.CancelConsumeProcessListener;
+import org.apache.inlong.manager.service.listener.consume.apply.RejectConsumeProcessListener;
import org.apache.inlong.manager.service.workflow.WorkflowDefinition;
-import org.apache.inlong.manager.service.listener.consumption.ConsumptionCancelProcessListener;
-import org.apache.inlong.manager.service.listener.consumption.ConsumptionCompleteProcessListener;
-import org.apache.inlong.manager.service.listener.consumption.ConsumptionPassTaskListener;
-import org.apache.inlong.manager.service.listener.consumption.ConsumptionRejectProcessListener;
import org.apache.inlong.manager.workflow.WorkflowContext;
import org.apache.inlong.manager.workflow.definition.EndEvent;
import org.apache.inlong.manager.workflow.definition.StartEvent;
@@ -42,29 +42,24 @@ import java.util.Collections;
import java.util.List;
/**
- * New data consumption workflow definition
+ * New inlong consume workflow definition
*/
@Component
-public class ApplyConsumptionWorkflowDefinition implements WorkflowDefinition {
+public class ApplyConsumeWorkflowDefinition implements WorkflowDefinition {
@Autowired
- private ConsumptionCompleteProcessListener consumptionCompleteProcessListener;
-
+ private CancelConsumeProcessListener cancelConsumeProcessListener;
@Autowired
- private ConsumptionPassTaskListener consumptionPassTaskListener;
-
+ private RejectConsumeProcessListener rejectConsumeProcessListener;
@Autowired
- private ConsumptionRejectProcessListener consumptionRejectProcessListener;
-
+ private ApproveConsumeProcessListener approveConsumeProcessListener;
@Autowired
- private ConsumptionCancelProcessListener consumptionCancelProcessListener;
+ private OperateConsumeTaskListener operateConsumeTaskListener;
@Autowired
private WorkflowApproverService workflowApproverService;
-
@Autowired
- private ApplyConsumptionProcessHandler applyConsumptionProcessHandler;
-
+ private ApplyConsumeProcessHandler applyConsumeProcessHandler;
@Autowired
private InlongGroupService groupService;
@@ -75,9 +70,9 @@ public class ApplyConsumptionWorkflowDefinition implements WorkflowDefinition {
process.setName(getProcessName().name());
process.setType(getProcessName().getDisplayName());
process.setDisplayName(getProcessName().getDisplayName());
- process.setFormClass(ApplyConsumptionProcessForm.class);
+ process.setFormClass(ApplyConsumeProcessForm.class);
process.setVersion(1);
- process.setProcessDetailHandler(applyConsumptionProcessHandler);
+ process.setProcessDetailHandler(applyConsumeProcessHandler);
// Start node
StartEvent startEvent = new StartEvent();
@@ -98,9 +93,9 @@ public class ApplyConsumptionWorkflowDefinition implements WorkflowDefinition {
UserTask adminUserTask = new UserTask();
adminUserTask.setName(UT_ADMIN_NAME);
adminUserTask.setDisplayName("SystemAdmin");
- adminUserTask.setFormClass(ConsumptionApproveForm.class);
+ adminUserTask.setFormClass(ConsumeApproveForm.class);
adminUserTask.setApproverAssign(context -> getTaskApprovers(adminUserTask.getName()));
- adminUserTask.addListener(consumptionPassTaskListener);
+ adminUserTask.addListener(operateConsumeTaskListener);
process.addTask(adminUserTask);
// Set order relation
@@ -109,16 +104,16 @@ public class ApplyConsumptionWorkflowDefinition implements WorkflowDefinition {
adminUserTask.addNext(endEvent);
// Set up the listener
- process.addListener(consumptionCompleteProcessListener);
- process.addListener(consumptionRejectProcessListener);
- process.addListener(consumptionCancelProcessListener);
+ process.addListener(approveConsumeProcessListener);
+ process.addListener(rejectConsumeProcessListener);
+ process.addListener(cancelConsumeProcessListener);
return process;
}
private List<String> groupOwnerUserTaskApprover(WorkflowContext context) {
- ApplyConsumptionProcessForm form = (ApplyConsumptionProcessForm) context.getProcessForm();
- InlongGroupInfo groupInfo = groupService.get(form.getConsumptionInfo().getInlongGroupId());
+ ApplyConsumeProcessForm form = (ApplyConsumeProcessForm) context.getProcessForm();
+ InlongGroupInfo groupInfo = groupService.get(form.getConsumeInfo().getInlongGroupId());
if (groupInfo == null || groupInfo.getInCharges() == null) {
return Collections.emptyList();
}
@@ -128,7 +123,7 @@ public class ApplyConsumptionWorkflowDefinition implements WorkflowDefinition {
@Override
public ProcessName getProcessName() {
- return ProcessName.APPLY_CONSUMPTION_PROCESS;
+ return ProcessName.APPLY_CONSUME_PROCESS;
}
/**
diff --git a/inlong-manager/manager-test/src/main/resources/h2/apache_inlong_manager.sql b/inlong-manager/manager-test/src/main/resources/h2/apache_inlong_manager.sql
index af3cb0549..760d93316 100644
--- a/inlong-manager/manager-test/src/main/resources/h2/apache_inlong_manager.sql
+++ b/inlong-manager/manager-test/src/main/resources/h2/apache_inlong_manager.sql
@@ -594,10 +594,10 @@ CREATE TABLE IF NOT EXISTS `workflow_approver`
KEY `process_name_task_name_index` (`process_name`, `task_name`)
);
--- create default approver for new consumption and new inlong group
+-- create workflow approvers for the new inlong group and new inlong consume processes.
INSERT INTO `workflow_approver`(`process_name`, `task_name`, `approvers`, `creator`, `modifier`)
-VALUES ('APPLY_CONSUMPTION_PROCESS', 'ut_admin', 'admin', 'inlong_init', 'inlong_init'),
- ('APPLY_GROUP_PROCESS', 'ut_admin', 'admin', 'inlong_init', 'inlong_init');
+VALUES ('APPLY_GROUP_PROCESS', 'ut_admin', 'admin', 'inlong_init', 'inlong_init'),
+ ('APPLY_CONSUME_PROCESS', 'ut_admin', 'admin', 'inlong_init', 'inlong_init');
-- ----------------------------
-- Table structure for workflow_event_log
diff --git a/inlong-manager/manager-web/sql/apache_inlong_manager.sql b/inlong-manager/manager-web/sql/apache_inlong_manager.sql
index c594d9519..825e6841e 100644
--- a/inlong-manager/manager-web/sql/apache_inlong_manager.sql
+++ b/inlong-manager/manager-web/sql/apache_inlong_manager.sql
@@ -631,10 +631,10 @@ CREATE TABLE IF NOT EXISTS `workflow_approver`
) ENGINE = InnoDB
DEFAULT CHARSET = utf8mb4 COMMENT ='Workflow approver table';
--- create default approver for new consumption and new inlong group
+-- create workflow approvers for the new inlong group and new inlong consume processes.
INSERT INTO `workflow_approver`(`process_name`, `task_name`, `approvers`, `creator`, `modifier`)
-VALUES ('APPLY_CONSUMPTION_PROCESS', 'ut_admin', 'admin', 'inlong_init', 'inlong_init'),
- ('APPLY_GROUP_PROCESS', 'ut_admin', 'admin', 'inlong_init', 'inlong_init');
+VALUES ('APPLY_GROUP_PROCESS', 'ut_admin', 'admin', 'inlong_init', 'inlong_init'),
+ ('APPLY_CONSUME_PROCESS', 'ut_admin', 'admin', 'inlong_init', 'inlong_init');
-- ----------------------------
-- Table structure for workflow_event_log
diff --git a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/InlongConsumeController.java b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/InlongConsumeController.java
index 6c3f0bad5..475ebda07 100644
--- a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/InlongConsumeController.java
+++ b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/InlongConsumeController.java
@@ -62,7 +62,7 @@ public class InlongConsumeController {
@RequestMapping(value = "/consume/save", method = RequestMethod.POST)
@OperationLog(operation = OperationType.CREATE)
@ApiOperation(value = "Save inlong consume")
- public Response<Integer> save(@Validated(UpdateValidation.class) @RequestBody InlongConsumeRequest request) {
+ public Response<Integer> save(@RequestBody InlongConsumeRequest request) {
String operator = LoginUserUtils.getLoginUser().getName();
return Response.success(consumeService.save(request, operator));
}
@@ -100,7 +100,7 @@ public class InlongConsumeController {
@ApiOperation(value = "Delete inlong consume by ID")
@ApiImplicitParam(name = "id", value = "Inlong consume ID", dataTypeClass = Integer.class, required = true)
public Response<Object> delete(@PathVariable(name = "id") Integer id) {
- this.consumeService.delete(id, LoginUserUtils.getLoginUser().getName());
+ consumeService.delete(id, LoginUserUtils.getLoginUser().getName());
return Response.success();
}