You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by he...@apache.org on 2022/11/15 02:34:42 UTC
[inlong] branch master updated: [INLONG-6517][Manager] Support delete sinks when the inlong group was config_failed (#6524)
This is an automated email from the ASF dual-hosted git repository.
healchow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 5c9fd9f5c [INLONG-6517][Manager] Support delete sinks when the inlong group was config_failed (#6524)
5c9fd9f5c is described below
commit 5c9fd9f5c15c10fbf51f912e68455c0023b94db3
Author: healchow <he...@gmail.com>
AuthorDate: Tue Nov 15 10:34:34 2022 +0800
[INLONG-6517][Manager] Support delete sinks when the inlong group was config_failed (#6524)
---
.../inlong/manager/common/enums/GroupStatus.java | 62 +++++++++++-----
.../manager/common/enums/SimpleGroupStatus.java | 4 +-
.../inlong/manager/common/enums/SinkStatus.java | 3 -
.../inlong/manager/common/enums/StreamStatus.java | 13 +++-
.../service/group/InlongGroupServiceImpl.java | 13 ++--
.../service/stream/InlongStreamProcessService.java | 84 ++++++++++------------
.../service/stream/InlongStreamServiceImpl.java | 2 +-
7 files changed, 102 insertions(+), 79 deletions(-)
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/GroupStatus.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/GroupStatus.java
index e60b898d5..0f836d46c 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/GroupStatus.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/GroupStatus.java
@@ -29,7 +29,6 @@ import java.util.Set;
*/
public enum GroupStatus {
- DRAFT(0, "draft"),
TO_BE_SUBMIT(100, "waiting for submit"),
TO_BE_APPROVAL(101, "waiting for approval"),
@@ -58,7 +57,6 @@ public enum GroupStatus {
* Init group finite status automaton
*/
static {
- GROUP_STATE_AUTOMATON.put(DRAFT, Sets.newHashSet(DRAFT, TO_BE_SUBMIT, DELETING));
GROUP_STATE_AUTOMATON.put(TO_BE_SUBMIT, Sets.newHashSet(TO_BE_SUBMIT, TO_BE_APPROVAL, DELETING));
GROUP_STATE_AUTOMATON.put(TO_BE_APPROVAL, Sets.newHashSet(TO_BE_APPROVAL, APPROVE_REJECTED, APPROVE_PASSED));
@@ -106,36 +104,68 @@ public enum GroupStatus {
}
/**
- * Checks whether the given status allows the update.
+ * Checks whether the given status allows updating operate.
*/
public static boolean notAllowedUpdate(GroupStatus status) {
- return status == GroupStatus.CONFIG_ING || status == GroupStatus.SUSPENDING
- || status == GroupStatus.RESTARTING || status == GroupStatus.DELETING;
+ return status == GroupStatus.TO_BE_APPROVAL
+ || status == GroupStatus.CONFIG_ING
+ || status == GroupStatus.SUSPENDING
+ || status == GroupStatus.RESTARTING
+ || status == GroupStatus.DELETING;
}
/**
- * Checks whether the given status allows the logical delete
+ * Checks whether the given status allows updating the MQ type of inlong group.
*/
- public static boolean allowedLogicDelete(GroupStatus status) {
- return status == GroupStatus.DRAFT || status == GroupStatus.TO_BE_SUBMIT
- || status == GroupStatus.DELETED || status == GroupStatus.FINISH;
+ public static boolean allowedUpdateMQ(GroupStatus status) {
+ return status == GroupStatus.TO_BE_SUBMIT
+ || status == GroupStatus.TO_BE_APPROVAL
+ || status == GroupStatus.APPROVE_REJECTED
+ || status == GroupStatus.CONFIG_FAILED;
}
/**
- * Only the {@link GroupStatus#DRAFT} and {@link GroupStatus#TO_BE_SUBMIT} status
- * allows change the MQ type of inlong group.
+ * Checks whether the given status needs to delete the inlong stream first.
*/
- public static boolean notAllowedUpdateMQ(GroupStatus status) {
- return status != GroupStatus.DRAFT && status != GroupStatus.TO_BE_SUBMIT
- && status != GroupStatus.TO_BE_APPROVAL && status != GroupStatus.APPROVE_REJECTED
- && status != GroupStatus.CONFIG_FAILED;
+ public static boolean deleteStreamFirst(GroupStatus status) {
+ return status == GroupStatus.APPROVE_PASSED
+ || status == GroupStatus.CONFIG_FAILED
+ || status == GroupStatus.CONFIG_SUCCESSFUL
+ || status == GroupStatus.SUSPENDED
+ || status == GroupStatus.RESTARTED
+ || status == GroupStatus.FINISH;
+ }
+
+ /**
+ * Checks whether the given status allows deleting other infos,
+ * <p/>
+ * If true, will logically delete all related infos, including streams, sources, sinks, etc.
+ */
+ public static boolean allowedDeleteSubInfos(GroupStatus status) {
+ return status == GroupStatus.TO_BE_SUBMIT
+ || status == GroupStatus.APPROVE_REJECTED
+ || status == GroupStatus.DELETED;
+ }
+
+ /**
+ * Checks whether the given status allows suspending operate.
+ */
+ public static boolean allowedSuspend(GroupStatus status) {
+ return status == GroupStatus.CONFIG_SUCCESSFUL
+ || status == GroupStatus.RESTARTED
+ || status == GroupStatus.SUSPENDED
+ || status == GroupStatus.FINISH;
}
/**
* Temporary group status, adding, deleting and modifying operations are not allowed
*/
public static boolean isTempStatus(GroupStatus status) {
- return status == TO_BE_APPROVAL || status == CONFIG_ING;
+ return status == TO_BE_APPROVAL
+ || status == CONFIG_ING
+ || status == SUSPENDING
+ || status == RESTARTING
+ || status == DELETING;
}
public Integer getCode() {
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/SimpleGroupStatus.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/SimpleGroupStatus.java
index 29edeec89..30dbbacc3 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/SimpleGroupStatus.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/SimpleGroupStatus.java
@@ -36,7 +36,6 @@ public enum SimpleGroupStatus {
public static SimpleGroupStatus parseStatusByCode(int code) {
GroupStatus groupStatus = GroupStatus.forCode(code);
switch (groupStatus) {
- case DRAFT:
case TO_BE_SUBMIT:
case TO_BE_APPROVAL:
return CREATE;
@@ -81,7 +80,6 @@ public enum SimpleGroupStatus {
List<Integer> statusList = new ArrayList<>();
switch (groupStatus) {
case CREATE:
- statusList.add(GroupStatus.DRAFT.getCode());
statusList.add(GroupStatus.TO_BE_SUBMIT.getCode());
return statusList;
case OPERATING:
@@ -114,7 +112,7 @@ public enum SimpleGroupStatus {
statusList.add(GroupStatus.DELETED.getCode());
return statusList;
default:
- throw new IllegalArgumentException(String.format("Unsupported status %s for group", status));
+ throw new IllegalArgumentException(String.format("Unsupported status %s for inlong group", status));
}
}
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/SinkStatus.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/SinkStatus.java
index 5d1aaffc6..0328c018f 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/SinkStatus.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/SinkStatus.java
@@ -22,9 +22,6 @@ package org.apache.inlong.manager.common.enums;
*/
public enum SinkStatus {
- DRAFT(0, "draft"),
- DELETED(10, "deleted"),
-
// Stream sink related status
NEW(100, "new"),
CONFIG_ING(110, "in configure"),
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/StreamStatus.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/StreamStatus.java
index a522e7481..383563db3 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/StreamStatus.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/StreamStatus.java
@@ -22,8 +22,6 @@ package org.apache.inlong.manager.common.enums;
*/
public enum StreamStatus {
- DRAFT(0, "draft"),
-
NEW(100, "new"),
CONFIG_ING(110, "in configure"),
CONFIG_FAILED(120, "configuration failed"),
@@ -47,13 +45,22 @@ public enum StreamStatus {
}
/**
- * Checks whether the given status allows the update.
+ * Checks whether the given status allows updating operate.
*/
public static boolean notAllowedUpdate(StreamStatus status) {
return status == StreamStatus.CONFIG_ING || status == StreamStatus.SUSPENDING
|| status == StreamStatus.RESTARTING || status == StreamStatus.DELETING;
}
+ /**
+ * Checks whether the given status allows deleting operate.
+ */
+ public static boolean notAllowedDelete(StreamStatus status) {
+ return status == StreamStatus.CONFIG_ING
+ || status == StreamStatus.RESTARTING
+ || status == StreamStatus.SUSPENDING;
+ }
+
public static StreamStatus forCode(int code) {
for (StreamStatus status : values()) {
if (status.getCode() == code) {
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupServiceImpl.java
index 9c143fd67..4a7a7d56d 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupServiceImpl.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupServiceImpl.java
@@ -138,7 +138,7 @@ public class InlongGroupServiceImpl implements InlongGroupService {
}
// mq type cannot be changed
- if (!entity.getMqType().equals(request.getMqType()) && GroupStatus.notAllowedUpdateMQ(curStatus)) {
+ if (!entity.getMqType().equals(request.getMqType()) && !GroupStatus.allowedUpdateMQ(curStatus)) {
String errMsg = String.format("Current status=%s is not allowed to update MQ type", curStatus);
LOGGER.error(errMsg);
throw new BusinessException(ErrorCodeEnum.GROUP_UPDATE_NOT_ALLOWED, errMsg);
@@ -154,7 +154,7 @@ public class InlongGroupServiceImpl implements InlongGroupService {
String groupId = request.getInlongGroupId();
InlongGroupEntity entity = groupMapper.selectByGroupId(groupId);
if (entity != null) {
- LOGGER.error("groupId {} has already exists", groupId);
+ LOGGER.error("groupId={} has already exists", groupId);
throw new BusinessException(ErrorCodeEnum.GROUP_DUPLICATE);
}
@@ -433,9 +433,9 @@ public class InlongGroupServiceImpl implements InlongGroupService {
throw new BusinessException(ErrorCodeEnum.GROUP_DELETE_NOT_ALLOWED, errMsg);
}
- // If the status allowed logic delete, all associated info will be logically deleted.
- // In other status, you need to delete the related "inlong_stream" first.
- if (!GroupStatus.allowedLogicDelete(curState)) {
+ // If the status not allowed deleting directly, you need to delete the related "inlong_stream" first,
+ // otherwise, all associated info will be logically deleted.
+ if (GroupStatus.deleteStreamFirst(curState)) {
int count = streamService.selectCountByGroupId(groupId);
if (count >= 1) {
LOGGER.error("groupId={} have [{}] inlong streams, deleted failed", groupId, count);
@@ -464,8 +464,7 @@ public class InlongGroupServiceImpl implements InlongGroupService {
// logically delete the associated extension info
groupExtMapper.logicDeleteAllByGroupId(groupId);
-
- if (GroupStatus.allowedLogicDelete(GroupStatus.forCode(entity.getStatus()))) {
+ if (GroupStatus.allowedDeleteSubInfos(GroupStatus.forCode(entity.getStatus()))) {
streamService.logicDeleteAll(groupId, operator);
}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/stream/InlongStreamProcessService.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/stream/InlongStreamProcessService.java
index 43fff8cdd..abc8d79de 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/stream/InlongStreamProcessService.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/stream/InlongStreamProcessService.java
@@ -75,12 +75,13 @@ public class InlongStreamProcessService {
*/
public boolean startProcess(String groupId, String streamId, String operator, boolean sync) {
log.info("begin to start stream process for groupId={} streamId={}", groupId, streamId);
+
InlongGroupInfo groupInfo = groupService.get(groupId);
Preconditions.checkNotNull(groupInfo, ErrorCodeEnum.GROUP_NOT_FOUND.getMessage());
GroupStatus groupStatus = GroupStatus.forCode(groupInfo.getStatus());
if (groupStatus != GroupStatus.CONFIG_SUCCESSFUL && groupStatus != GroupStatus.RESTARTED) {
- throw new BusinessException(
- String.format("group status=%s not support start stream for groupId=%s", groupStatus, groupId));
+ throw new BusinessException(String.format("group status=%s not support start stream"
+ + " for groupId=%s", groupStatus, groupId));
}
InlongStreamInfo streamInfo = streamService.get(groupId, streamId);
@@ -90,6 +91,7 @@ public class InlongStreamProcessService {
log.warn("stream status={}, not need restart for groupId={} streamId={}", status, groupId, streamId);
return true;
}
+
if (StreamStatus.notAllowedUpdate(status)) {
String errMsg = String.format("stream status=%s not support start stream for groupId=%s streamId=%s",
status, groupId, streamId);
@@ -115,31 +117,28 @@ public class InlongStreamProcessService {
*/
public boolean suspendProcess(String groupId, String streamId, String operator, boolean sync) {
log.info("begin to suspend stream process for groupId={} streamId={}", groupId, streamId);
+
InlongGroupInfo groupInfo = groupService.get(groupId);
- if (groupInfo == null) {
- throw new BusinessException(ErrorCodeEnum.GROUP_NOT_FOUND);
- }
+ Preconditions.checkNotNull(groupInfo, ErrorCodeEnum.GROUP_NOT_FOUND.getMessage());
GroupStatus groupStatus = GroupStatus.forCode(groupInfo.getStatus());
- if (groupStatus != GroupStatus.CONFIG_SUCCESSFUL
- && groupStatus != GroupStatus.RESTARTED
- && groupStatus != GroupStatus.SUSPENDED) {
- throw new BusinessException(
- String.format("group status=%s not support suspend stream for groupId=%s", groupStatus, groupId));
+ if (!GroupStatus.allowedSuspend(groupStatus)) {
+ throw new BusinessException(String.format("group status=%s not support suspend stream"
+ + " for groupId=%s", groupStatus, groupId));
}
+
InlongStreamInfo streamInfo = streamService.get(groupId, streamId);
- if (streamInfo == null) {
- throw new BusinessException(ErrorCodeEnum.STREAM_NOT_FOUND);
- }
+ Preconditions.checkNotNull(streamInfo, ErrorCodeEnum.STREAM_NOT_FOUND.getMessage());
StreamStatus status = StreamStatus.forCode(streamInfo.getStatus());
if (status == StreamStatus.SUSPENDED || status == StreamStatus.SUSPENDING) {
- log.warn("GroupId={}, StreamId={} is already in {}", groupId, streamId, status);
+ log.warn("groupId={}, streamId={} is already in {}", groupId, streamId, status);
return true;
}
+
if (status != StreamStatus.CONFIG_SUCCESSFUL && status != StreamStatus.RESTARTED) {
- throw new BusinessException(
- String.format("stream status=%s not support suspend stream for groupId=%s streamId=%s",
- status, groupId, streamId));
+ throw new BusinessException(String.format("stream status=%s not support suspend stream"
+ + " for groupId=%s streamId=%s", status, groupId, streamId));
}
+
StreamResourceProcessForm processForm = StreamResourceProcessForm.getProcessForm(groupInfo, streamInfo,
GroupOperateType.SUSPEND);
ProcessName processName = ProcessName.SUSPEND_STREAM_RESOURCE;
@@ -158,29 +157,28 @@ public class InlongStreamProcessService {
*/
public boolean restartProcess(String groupId, String streamId, String operator, boolean sync) {
log.info("begin to restart stream process for groupId={} streamId={}", groupId, streamId);
+
InlongGroupInfo groupInfo = groupService.get(groupId);
- if (groupInfo == null) {
- throw new BusinessException(ErrorCodeEnum.GROUP_NOT_FOUND);
- }
+ Preconditions.checkNotNull(groupInfo, ErrorCodeEnum.GROUP_NOT_FOUND.getMessage());
GroupStatus groupStatus = GroupStatus.forCode(groupInfo.getStatus());
if (groupStatus != GroupStatus.CONFIG_SUCCESSFUL && groupStatus != GroupStatus.RESTARTED) {
throw new BusinessException(
String.format("group status=%s not support restart stream for groupId=%s", groupStatus, groupId));
}
+
InlongStreamInfo streamInfo = streamService.get(groupId, streamId);
- if (streamInfo == null) {
- throw new BusinessException(ErrorCodeEnum.STREAM_NOT_FOUND);
- }
+ Preconditions.checkNotNull(streamInfo, ErrorCodeEnum.STREAM_NOT_FOUND.getMessage());
StreamStatus status = StreamStatus.forCode(streamInfo.getStatus());
if (status == StreamStatus.RESTARTED || status == StreamStatus.RESTARTING) {
- log.warn("GroupId={}, StreamId={} is already in {}", groupId, streamId, status);
+ log.warn("inlong stream was already in {} for groupId={}, streamId={}", status, groupId, streamId);
return true;
}
+
if (status != StreamStatus.SUSPENDED) {
- throw new BusinessException(
- String.format("stream status=%s not support restart stream for groupId=%s streamId=%s",
- status, groupId, streamId));
+ throw new BusinessException(String.format("stream status=%s not support restart stream"
+ + " for groupId=%s streamId=%s", status, groupId, streamId));
}
+
StreamResourceProcessForm processForm = StreamResourceProcessForm.getProcessForm(groupInfo, streamInfo,
GroupOperateType.RESTART);
ProcessName processName = ProcessName.RESTART_STREAM_RESOURCE;
@@ -199,34 +197,28 @@ public class InlongStreamProcessService {
*/
public boolean deleteProcess(String groupId, String streamId, String operator, boolean sync) {
log.info("begin to delete stream process for groupId={} streamId={}", groupId, streamId);
+
InlongGroupInfo groupInfo = groupService.get(groupId);
- if (groupInfo == null) {
- throw new BusinessException(ErrorCodeEnum.GROUP_NOT_FOUND);
- }
+ Preconditions.checkNotNull(groupInfo, ErrorCodeEnum.GROUP_NOT_FOUND.getMessage());
GroupStatus groupStatus = GroupStatus.forCode(groupInfo.getStatus());
- if (groupStatus != GroupStatus.CONFIG_SUCCESSFUL
- && groupStatus != GroupStatus.RESTARTED
- && groupStatus != GroupStatus.SUSPENDED
- && groupStatus != GroupStatus.DELETING) {
- throw new BusinessException(
- String.format("group status=%s not support delete stream for groupId=%s", groupStatus, groupId));
+ if (GroupStatus.notAllowedTransition(groupStatus, GroupStatus.DELETING)) {
+ throw new BusinessException(String.format("group status=%s not support delete stream"
+ + " for groupId=%s", groupStatus, groupId));
}
+
InlongStreamInfo streamInfo = streamService.get(groupId, streamId);
- if (streamInfo == null) {
- throw new BusinessException(ErrorCodeEnum.STREAM_NOT_FOUND);
- }
+ Preconditions.checkNotNull(streamInfo, ErrorCodeEnum.STREAM_NOT_FOUND.getMessage());
StreamStatus status = StreamStatus.forCode(streamInfo.getStatus());
if (status == StreamStatus.DELETED || status == StreamStatus.DELETING) {
- log.warn("GroupId={}, StreamId={} is already in {}", groupId, streamId, status);
+ log.warn("groupId={}, streamId={} is already in {}", groupId, streamId, status);
return true;
}
- if (status == StreamStatus.CONFIG_ING
- || status == StreamStatus.RESTARTING
- || status == StreamStatus.SUSPENDING) {
- throw new BusinessException(
- String.format("stream status=%s not support delete stream for groupId=%s streamId=%s",
- status, groupId, streamId));
+
+ if (StreamStatus.notAllowedDelete(status)) {
+ throw new BusinessException(String.format("stream status=%s not support delete stream"
+ + " for groupId=%s streamId=%s", status, groupId, streamId));
}
+
StreamResourceProcessForm processForm = StreamResourceProcessForm.getProcessForm(groupInfo, streamInfo,
GroupOperateType.DELETE);
ProcessName processName = ProcessName.DELETE_STREAM_RESOURCE;
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/stream/InlongStreamServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/stream/InlongStreamServiceImpl.java
index 043df26df..d8a49c30b 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/stream/InlongStreamServiceImpl.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/stream/InlongStreamServiceImpl.java
@@ -389,7 +389,7 @@ public class InlongStreamServiceImpl implements InlongStreamService {
}
for (InlongStreamEntity entity : entityList) {
- entity.setIsDeleted(1);
+ entity.setIsDeleted(entity.getId());
entity.setModifier(operator);
int rowCount = streamMapper.updateByIdentifierSelective(entity);