You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by he...@apache.org on 2022/11/15 02:34:42 UTC

[inlong] branch master updated: [INLONG-6517][Manager] Support delete sinks when the inlong group was config_failed (#6524)

This is an automated email from the ASF dual-hosted git repository.

healchow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 5c9fd9f5c [INLONG-6517][Manager] Support delete sinks when the inlong group was config_failed (#6524)
5c9fd9f5c is described below

commit 5c9fd9f5c15c10fbf51f912e68455c0023b94db3
Author: healchow <he...@gmail.com>
AuthorDate: Tue Nov 15 10:34:34 2022 +0800

    [INLONG-6517][Manager] Support delete sinks when the inlong group was config_failed (#6524)
---
 .../inlong/manager/common/enums/GroupStatus.java   | 62 +++++++++++-----
 .../manager/common/enums/SimpleGroupStatus.java    |  4 +-
 .../inlong/manager/common/enums/SinkStatus.java    |  3 -
 .../inlong/manager/common/enums/StreamStatus.java  | 13 +++-
 .../service/group/InlongGroupServiceImpl.java      | 13 ++--
 .../service/stream/InlongStreamProcessService.java | 84 ++++++++++------------
 .../service/stream/InlongStreamServiceImpl.java    |  2 +-
 7 files changed, 102 insertions(+), 79 deletions(-)

diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/GroupStatus.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/GroupStatus.java
index e60b898d5..0f836d46c 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/GroupStatus.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/GroupStatus.java
@@ -29,7 +29,6 @@ import java.util.Set;
  */
 public enum GroupStatus {
 
-    DRAFT(0, "draft"),
     TO_BE_SUBMIT(100, "waiting for submit"),
     TO_BE_APPROVAL(101, "waiting for approval"),
 
@@ -58,7 +57,6 @@ public enum GroupStatus {
      * Init group finite status automaton
      */
     static {
-        GROUP_STATE_AUTOMATON.put(DRAFT, Sets.newHashSet(DRAFT, TO_BE_SUBMIT, DELETING));
         GROUP_STATE_AUTOMATON.put(TO_BE_SUBMIT, Sets.newHashSet(TO_BE_SUBMIT, TO_BE_APPROVAL, DELETING));
         GROUP_STATE_AUTOMATON.put(TO_BE_APPROVAL, Sets.newHashSet(TO_BE_APPROVAL, APPROVE_REJECTED, APPROVE_PASSED));
 
@@ -106,36 +104,68 @@ public enum GroupStatus {
     }
 
     /**
-     * Checks whether the given status allows the update.
+     * Checks whether the given status does not allow update operations.
      */
     public static boolean notAllowedUpdate(GroupStatus status) {
-        return status == GroupStatus.CONFIG_ING || status == GroupStatus.SUSPENDING
-                || status == GroupStatus.RESTARTING || status == GroupStatus.DELETING;
+        return status == GroupStatus.TO_BE_APPROVAL
+                || status == GroupStatus.CONFIG_ING
+                || status == GroupStatus.SUSPENDING
+                || status == GroupStatus.RESTARTING
+                || status == GroupStatus.DELETING;
     }
 
     /**
-     * Checks whether the given status allows the logical delete
+     * Checks whether the given status allows updating the MQ type of inlong group.
      */
-    public static boolean allowedLogicDelete(GroupStatus status) {
-        return status == GroupStatus.DRAFT || status == GroupStatus.TO_BE_SUBMIT
-                || status == GroupStatus.DELETED || status == GroupStatus.FINISH;
+    public static boolean allowedUpdateMQ(GroupStatus status) {
+        return status == GroupStatus.TO_BE_SUBMIT
+                || status == GroupStatus.TO_BE_APPROVAL
+                || status == GroupStatus.APPROVE_REJECTED
+                || status == GroupStatus.CONFIG_FAILED;
     }
 
     /**
-     * Only the {@link GroupStatus#DRAFT} and {@link GroupStatus#TO_BE_SUBMIT} status
-     * allows change the MQ type of inlong group.
+     * Checks whether the given status needs to delete the inlong stream first.
      */
-    public static boolean notAllowedUpdateMQ(GroupStatus status) {
-        return status != GroupStatus.DRAFT && status != GroupStatus.TO_BE_SUBMIT
-                && status != GroupStatus.TO_BE_APPROVAL && status != GroupStatus.APPROVE_REJECTED
-                && status != GroupStatus.CONFIG_FAILED;
+    public static boolean deleteStreamFirst(GroupStatus status) {
+        return status == GroupStatus.APPROVE_PASSED
+                || status == GroupStatus.CONFIG_FAILED
+                || status == GroupStatus.CONFIG_SUCCESSFUL
+                || status == GroupStatus.SUSPENDED
+                || status == GroupStatus.RESTARTED
+                || status == GroupStatus.FINISH;
+    }
+
+    /**
+     * Checks whether the given status allows deleting the related sub-information.
+     * <p>
+     * If true, all related info (streams, sources, sinks, etc.) will be logically deleted.
+     */
+    public static boolean allowedDeleteSubInfos(GroupStatus status) {
+        return status == GroupStatus.TO_BE_SUBMIT
+                || status == GroupStatus.APPROVE_REJECTED
+                || status == GroupStatus.DELETED;
+    }
+
+    /**
+     * Checks whether the given status allows the suspend operation.
+     */
+    public static boolean allowedSuspend(GroupStatus status) {
+        return status == GroupStatus.CONFIG_SUCCESSFUL
+                || status == GroupStatus.RESTARTED
+                || status == GroupStatus.SUSPENDED
+                || status == GroupStatus.FINISH;
     }
 
     /**
      * Temporary group status, adding, deleting and modifying operations are not allowed
      */
     public static boolean isTempStatus(GroupStatus status) {
-        return status == TO_BE_APPROVAL || status == CONFIG_ING;
+        return status == TO_BE_APPROVAL
+                || status == CONFIG_ING
+                || status == SUSPENDING
+                || status == RESTARTING
+                || status == DELETING;
     }
 
     public Integer getCode() {
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/SimpleGroupStatus.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/SimpleGroupStatus.java
index 29edeec89..30dbbacc3 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/SimpleGroupStatus.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/SimpleGroupStatus.java
@@ -36,7 +36,6 @@ public enum SimpleGroupStatus {
     public static SimpleGroupStatus parseStatusByCode(int code) {
         GroupStatus groupStatus = GroupStatus.forCode(code);
         switch (groupStatus) {
-            case DRAFT:
             case TO_BE_SUBMIT:
             case TO_BE_APPROVAL:
                 return CREATE;
@@ -81,7 +80,6 @@ public enum SimpleGroupStatus {
         List<Integer> statusList = new ArrayList<>();
         switch (groupStatus) {
             case CREATE:
-                statusList.add(GroupStatus.DRAFT.getCode());
                 statusList.add(GroupStatus.TO_BE_SUBMIT.getCode());
                 return statusList;
             case OPERATING:
@@ -114,7 +112,7 @@ public enum SimpleGroupStatus {
                 statusList.add(GroupStatus.DELETED.getCode());
                 return statusList;
             default:
-                throw new IllegalArgumentException(String.format("Unsupported status %s for group", status));
+                throw new IllegalArgumentException(String.format("Unsupported status %s for inlong group", status));
         }
     }
 
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/SinkStatus.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/SinkStatus.java
index 5d1aaffc6..0328c018f 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/SinkStatus.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/SinkStatus.java
@@ -22,9 +22,6 @@ package org.apache.inlong.manager.common.enums;
  */
 public enum SinkStatus {
 
-    DRAFT(0, "draft"),
-    DELETED(10, "deleted"),
-
     // Stream sink related status
     NEW(100, "new"),
     CONFIG_ING(110, "in configure"),
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/StreamStatus.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/StreamStatus.java
index a522e7481..383563db3 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/StreamStatus.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/StreamStatus.java
@@ -22,8 +22,6 @@ package org.apache.inlong.manager.common.enums;
  */
 public enum StreamStatus {
 
-    DRAFT(0, "draft"),
-
     NEW(100, "new"),
     CONFIG_ING(110, "in configure"),
     CONFIG_FAILED(120, "configuration failed"),
@@ -47,13 +45,22 @@ public enum StreamStatus {
     }
 
     /**
-     * Checks whether the given status allows the update.
+     * Checks whether the given status does not allow update operations.
      */
     public static boolean notAllowedUpdate(StreamStatus status) {
         return status == StreamStatus.CONFIG_ING || status == StreamStatus.SUSPENDING
                 || status == StreamStatus.RESTARTING || status == StreamStatus.DELETING;
     }
 
+    /**
+     * Checks whether the given status does not allow the delete operation.
+     */
+    public static boolean notAllowedDelete(StreamStatus status) {
+        return status == StreamStatus.CONFIG_ING
+                || status == StreamStatus.RESTARTING
+                || status == StreamStatus.SUSPENDING;
+    }
+
     public static StreamStatus forCode(int code) {
         for (StreamStatus status : values()) {
             if (status.getCode() == code) {
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupServiceImpl.java
index 9c143fd67..4a7a7d56d 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupServiceImpl.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupServiceImpl.java
@@ -138,7 +138,7 @@ public class InlongGroupServiceImpl implements InlongGroupService {
         }
 
         // mq type cannot be changed
-        if (!entity.getMqType().equals(request.getMqType()) && GroupStatus.notAllowedUpdateMQ(curStatus)) {
+        if (!entity.getMqType().equals(request.getMqType()) && !GroupStatus.allowedUpdateMQ(curStatus)) {
             String errMsg = String.format("Current status=%s is not allowed to update MQ type", curStatus);
             LOGGER.error(errMsg);
             throw new BusinessException(ErrorCodeEnum.GROUP_UPDATE_NOT_ALLOWED, errMsg);
@@ -154,7 +154,7 @@ public class InlongGroupServiceImpl implements InlongGroupService {
         String groupId = request.getInlongGroupId();
         InlongGroupEntity entity = groupMapper.selectByGroupId(groupId);
         if (entity != null) {
-            LOGGER.error("groupId {} has already exists", groupId);
+            LOGGER.error("groupId={} has already exists", groupId);
             throw new BusinessException(ErrorCodeEnum.GROUP_DUPLICATE);
         }
 
@@ -433,9 +433,9 @@ public class InlongGroupServiceImpl implements InlongGroupService {
             throw new BusinessException(ErrorCodeEnum.GROUP_DELETE_NOT_ALLOWED, errMsg);
         }
 
-        // If the status allowed logic delete, all associated info will be logically deleted.
-        // In other status, you need to delete the related "inlong_stream" first.
-        if (!GroupStatus.allowedLogicDelete(curState)) {
+        // If the status does not allow deleting directly, the related "inlong_stream" must be deleted first;
+        // otherwise, all associated info will be logically deleted.
+        if (GroupStatus.deleteStreamFirst(curState)) {
             int count = streamService.selectCountByGroupId(groupId);
             if (count >= 1) {
                 LOGGER.error("groupId={} have [{}] inlong streams, deleted failed", groupId, count);
@@ -464,8 +464,7 @@ public class InlongGroupServiceImpl implements InlongGroupService {
 
         // logically delete the associated extension info
         groupExtMapper.logicDeleteAllByGroupId(groupId);
-
-        if (GroupStatus.allowedLogicDelete(GroupStatus.forCode(entity.getStatus()))) {
+        if (GroupStatus.allowedDeleteSubInfos(GroupStatus.forCode(entity.getStatus()))) {
             streamService.logicDeleteAll(groupId, operator);
         }
 
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/stream/InlongStreamProcessService.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/stream/InlongStreamProcessService.java
index 43fff8cdd..abc8d79de 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/stream/InlongStreamProcessService.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/stream/InlongStreamProcessService.java
@@ -75,12 +75,13 @@ public class InlongStreamProcessService {
      */
     public boolean startProcess(String groupId, String streamId, String operator, boolean sync) {
         log.info("begin to start stream process for groupId={} streamId={}", groupId, streamId);
+
         InlongGroupInfo groupInfo = groupService.get(groupId);
         Preconditions.checkNotNull(groupInfo, ErrorCodeEnum.GROUP_NOT_FOUND.getMessage());
         GroupStatus groupStatus = GroupStatus.forCode(groupInfo.getStatus());
         if (groupStatus != GroupStatus.CONFIG_SUCCESSFUL && groupStatus != GroupStatus.RESTARTED) {
-            throw new BusinessException(
-                    String.format("group status=%s not support start stream for groupId=%s", groupStatus, groupId));
+            throw new BusinessException(String.format("group status=%s not support start stream"
+                    + " for groupId=%s", groupStatus, groupId));
         }
 
         InlongStreamInfo streamInfo = streamService.get(groupId, streamId);
@@ -90,6 +91,7 @@ public class InlongStreamProcessService {
             log.warn("stream status={}, not need restart for groupId={} streamId={}", status, groupId, streamId);
             return true;
         }
+
         if (StreamStatus.notAllowedUpdate(status)) {
             String errMsg = String.format("stream status=%s not support start stream for groupId=%s streamId=%s",
                     status, groupId, streamId);
@@ -115,31 +117,28 @@ public class InlongStreamProcessService {
      */
     public boolean suspendProcess(String groupId, String streamId, String operator, boolean sync) {
         log.info("begin to suspend stream process for groupId={} streamId={}", groupId, streamId);
+
         InlongGroupInfo groupInfo = groupService.get(groupId);
-        if (groupInfo == null) {
-            throw new BusinessException(ErrorCodeEnum.GROUP_NOT_FOUND);
-        }
+        Preconditions.checkNotNull(groupInfo, ErrorCodeEnum.GROUP_NOT_FOUND.getMessage());
         GroupStatus groupStatus = GroupStatus.forCode(groupInfo.getStatus());
-        if (groupStatus != GroupStatus.CONFIG_SUCCESSFUL
-                && groupStatus != GroupStatus.RESTARTED
-                && groupStatus != GroupStatus.SUSPENDED) {
-            throw new BusinessException(
-                    String.format("group status=%s not support suspend stream for groupId=%s", groupStatus, groupId));
+        if (!GroupStatus.allowedSuspend(groupStatus)) {
+            throw new BusinessException(String.format("group status=%s not support suspend stream"
+                    + " for groupId=%s", groupStatus, groupId));
         }
+
         InlongStreamInfo streamInfo = streamService.get(groupId, streamId);
-        if (streamInfo == null) {
-            throw new BusinessException(ErrorCodeEnum.STREAM_NOT_FOUND);
-        }
+        Preconditions.checkNotNull(streamInfo, ErrorCodeEnum.STREAM_NOT_FOUND.getMessage());
         StreamStatus status = StreamStatus.forCode(streamInfo.getStatus());
         if (status == StreamStatus.SUSPENDED || status == StreamStatus.SUSPENDING) {
-            log.warn("GroupId={}, StreamId={} is already in {}", groupId, streamId, status);
+            log.warn("groupId={}, streamId={} is already in {}", groupId, streamId, status);
             return true;
         }
+
         if (status != StreamStatus.CONFIG_SUCCESSFUL && status != StreamStatus.RESTARTED) {
-            throw new BusinessException(
-                    String.format("stream status=%s not support suspend stream for groupId=%s streamId=%s",
-                            status, groupId, streamId));
+            throw new BusinessException(String.format("stream status=%s not support suspend stream"
+                    + " for groupId=%s streamId=%s", status, groupId, streamId));
         }
+
         StreamResourceProcessForm processForm = StreamResourceProcessForm.getProcessForm(groupInfo, streamInfo,
                 GroupOperateType.SUSPEND);
         ProcessName processName = ProcessName.SUSPEND_STREAM_RESOURCE;
@@ -158,29 +157,28 @@ public class InlongStreamProcessService {
      */
     public boolean restartProcess(String groupId, String streamId, String operator, boolean sync) {
         log.info("begin to restart stream process for groupId={} streamId={}", groupId, streamId);
+
         InlongGroupInfo groupInfo = groupService.get(groupId);
-        if (groupInfo == null) {
-            throw new BusinessException(ErrorCodeEnum.GROUP_NOT_FOUND);
-        }
+        Preconditions.checkNotNull(groupInfo, ErrorCodeEnum.GROUP_NOT_FOUND.getMessage());
         GroupStatus groupStatus = GroupStatus.forCode(groupInfo.getStatus());
         if (groupStatus != GroupStatus.CONFIG_SUCCESSFUL && groupStatus != GroupStatus.RESTARTED) {
             throw new BusinessException(
                     String.format("group status=%s not support restart stream for groupId=%s", groupStatus, groupId));
         }
+
         InlongStreamInfo streamInfo = streamService.get(groupId, streamId);
-        if (streamInfo == null) {
-            throw new BusinessException(ErrorCodeEnum.STREAM_NOT_FOUND);
-        }
+        Preconditions.checkNotNull(streamInfo, ErrorCodeEnum.STREAM_NOT_FOUND.getMessage());
         StreamStatus status = StreamStatus.forCode(streamInfo.getStatus());
         if (status == StreamStatus.RESTARTED || status == StreamStatus.RESTARTING) {
-            log.warn("GroupId={}, StreamId={} is already in {}", groupId, streamId, status);
+            log.warn("inlong stream was already in {} for groupId={}, streamId={}", status, groupId, streamId);
             return true;
         }
+
         if (status != StreamStatus.SUSPENDED) {
-            throw new BusinessException(
-                    String.format("stream status=%s not support restart stream for groupId=%s streamId=%s",
-                            status, groupId, streamId));
+            throw new BusinessException(String.format("stream status=%s not support restart stream"
+                    + " for groupId=%s streamId=%s", status, groupId, streamId));
         }
+
         StreamResourceProcessForm processForm = StreamResourceProcessForm.getProcessForm(groupInfo, streamInfo,
                 GroupOperateType.RESTART);
         ProcessName processName = ProcessName.RESTART_STREAM_RESOURCE;
@@ -199,34 +197,28 @@ public class InlongStreamProcessService {
      */
     public boolean deleteProcess(String groupId, String streamId, String operator, boolean sync) {
         log.info("begin to delete stream process for groupId={} streamId={}", groupId, streamId);
+
         InlongGroupInfo groupInfo = groupService.get(groupId);
-        if (groupInfo == null) {
-            throw new BusinessException(ErrorCodeEnum.GROUP_NOT_FOUND);
-        }
+        Preconditions.checkNotNull(groupInfo, ErrorCodeEnum.GROUP_NOT_FOUND.getMessage());
         GroupStatus groupStatus = GroupStatus.forCode(groupInfo.getStatus());
-        if (groupStatus != GroupStatus.CONFIG_SUCCESSFUL
-                && groupStatus != GroupStatus.RESTARTED
-                && groupStatus != GroupStatus.SUSPENDED
-                && groupStatus != GroupStatus.DELETING) {
-            throw new BusinessException(
-                    String.format("group status=%s not support delete stream for groupId=%s", groupStatus, groupId));
+        if (GroupStatus.notAllowedTransition(groupStatus, GroupStatus.DELETING)) {
+            throw new BusinessException(String.format("group status=%s not support delete stream"
+                    + " for groupId=%s", groupStatus, groupId));
         }
+
         InlongStreamInfo streamInfo = streamService.get(groupId, streamId);
-        if (streamInfo == null) {
-            throw new BusinessException(ErrorCodeEnum.STREAM_NOT_FOUND);
-        }
+        Preconditions.checkNotNull(streamInfo, ErrorCodeEnum.STREAM_NOT_FOUND.getMessage());
         StreamStatus status = StreamStatus.forCode(streamInfo.getStatus());
         if (status == StreamStatus.DELETED || status == StreamStatus.DELETING) {
-            log.warn("GroupId={}, StreamId={} is already in {}", groupId, streamId, status);
+            log.warn("groupId={}, streamId={} is already in {}", groupId, streamId, status);
             return true;
         }
-        if (status == StreamStatus.CONFIG_ING
-                || status == StreamStatus.RESTARTING
-                || status == StreamStatus.SUSPENDING) {
-            throw new BusinessException(
-                    String.format("stream status=%s not support delete stream for groupId=%s streamId=%s",
-                            status, groupId, streamId));
+
+        if (StreamStatus.notAllowedDelete(status)) {
+            throw new BusinessException(String.format("stream status=%s not support delete stream"
+                    + " for groupId=%s streamId=%s", status, groupId, streamId));
         }
+
         StreamResourceProcessForm processForm = StreamResourceProcessForm.getProcessForm(groupInfo, streamInfo,
                 GroupOperateType.DELETE);
         ProcessName processName = ProcessName.DELETE_STREAM_RESOURCE;
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/stream/InlongStreamServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/stream/InlongStreamServiceImpl.java
index 043df26df..d8a49c30b 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/stream/InlongStreamServiceImpl.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/stream/InlongStreamServiceImpl.java
@@ -389,7 +389,7 @@ public class InlongStreamServiceImpl implements InlongStreamService {
         }
 
         for (InlongStreamEntity entity : entityList) {
-            entity.setIsDeleted(1);
+            entity.setIsDeleted(entity.getId());
             entity.setModifier(operator);
 
             int rowCount = streamMapper.updateByIdentifierSelective(entity);