You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by he...@apache.org on 2022/11/15 12:20:55 UTC

[inlong] branch master updated: [INLONG-6544][Manager] Should update the source status after the inlong stream is completed (#6547)

This is an automated email from the ASF dual-hosted git repository.

healchow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new e64cbd96c [INLONG-6544][Manager] Should update the source status after the inlong stream is completed (#6547)
e64cbd96c is described below

commit e64cbd96cf5a8771c3f617531230e26f33508e10
Author: healchow <he...@gmail.com>
AuthorDate: Tue Nov 15 20:20:48 2022 +0800

    [INLONG-6544][Manager] Should update the source status after the inlong stream is completed (#6547)
---
 .../listener/group/InitGroupCompleteListener.java  | 46 ++++++------
 .../stream/InitStreamCompleteListener.java         | 21 +++---
 .../resource/sort/DefaultSortConfigOperator.java   | 21 ------
 .../service/stream/InlongStreamService.java        | 11 ++-
 .../service/stream/InlongStreamServiceImpl.java    | 87 +++++-----------------
 5 files changed, 67 insertions(+), 119 deletions(-)

diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/group/InitGroupCompleteListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/group/InitGroupCompleteListener.java
index a16562fd8..bd848f996 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/group/InitGroupCompleteListener.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/group/InitGroupCompleteListener.java
@@ -72,30 +72,34 @@ public class InitGroupCompleteListener implements ProcessEventListener {
         String groupId = form.getInlongGroupId();
         log.info("begin to execute InitGroupCompleteListener for groupId={}", groupId);
 
-        // update inlong group status and other info
-        InlongGroupInfo groupInfo = form.getGroupInfo();
-        String operator = context.getOperator();
-        groupService.updateStatus(groupId, GroupStatus.CONFIG_SUCCESSFUL.getCode(), operator);
-        if (InlongGroupUtils.isBatchTask(form.getGroupInfo())) {
-            groupService.updateStatus(groupId, GroupStatus.FINISH.getCode(), operator);
-        }
-        InlongGroupEntity existGroup = groupMapper.selectByGroupId(groupId);
-        InlongGroupRequest updateGroupRequest = groupInfo.genRequest();
-        updateGroupRequest.setVersion(existGroup.getVersion());
-        groupService.update(updateGroupRequest, operator);
+        try {
+            // update inlong group status and other info
+            InlongGroupInfo groupInfo = form.getGroupInfo();
+            String operator = context.getOperator();
+            Integer nextStatus = InlongGroupUtils.isBatchTask(form.getGroupInfo())
+                    ? GroupStatus.FINISH.getCode() : GroupStatus.CONFIG_SUCCESSFUL.getCode();
+            groupService.updateStatus(groupId, nextStatus, operator);
+
+            InlongGroupEntity existGroup = groupMapper.selectByGroupId(groupId);
+            InlongGroupRequest updateGroupRequest = groupInfo.genRequest();
+            updateGroupRequest.setVersion(existGroup.getVersion());
+            groupService.update(updateGroupRequest, operator);
 
-        // update status of other related configs
-        if (InlongConstants.DISABLE_CREATE_RESOURCE.equals(groupInfo.getEnableCreateResource())) {
-            streamService.updateStatus(groupId, null, StreamStatus.CONFIG_SUCCESSFUL.getCode(), operator);
-            if (InlongConstants.LIGHTWEIGHT_MODE.equals(groupInfo.getLightweight())) {
-                sourceService.updateStatus(groupId, null, SourceStatus.SOURCE_NORMAL.getCode(), operator);
-            } else {
-                sourceService.updateStatus(groupId, null, SourceStatus.TO_BE_ISSUED_ADD.getCode(), operator);
+            // update status of other related configs
+            if (InlongConstants.DISABLE_CREATE_RESOURCE.equals(groupInfo.getEnableCreateResource())) {
+                streamService.updateStatus(groupId, null, StreamStatus.CONFIG_SUCCESSFUL.getCode(), operator);
+                if (InlongConstants.LIGHTWEIGHT_MODE.equals(groupInfo.getLightweight())) {
+                    sourceService.updateStatus(groupId, null, SourceStatus.SOURCE_NORMAL.getCode(), operator);
+                } else {
+                    sourceService.updateStatus(groupId, null, SourceStatus.TO_BE_ISSUED_ADD.getCode(), operator);
+                }
             }
-        }
 
-        log.info("success to execute InitGroupCompleteListener for groupId={}", groupId);
-        return ListenerResult.success();
+            log.info("success to execute InitGroupCompleteListener for groupId={}", groupId);
+            return ListenerResult.success();
+        } catch (Exception e) {
+            throw new WorkflowListenerException(e);
+        }
     }
 
 }
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/stream/InitStreamCompleteListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/stream/InitStreamCompleteListener.java
index 141635c46..2796bde0b 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/stream/InitStreamCompleteListener.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/stream/InitStreamCompleteListener.java
@@ -61,16 +61,19 @@ public class InitStreamCompleteListener implements ProcessEventListener {
         final String streamId = streamInfo.getInlongStreamId();
         final String operator = context.getOperator();
 
-        // Update status of other related configs
-        streamService.updateStatus(groupId, streamId, StreamStatus.CONFIG_SUCCESSFUL.getCode(), operator);
-        streamService.update(streamInfo.genRequest(), operator);
-        if (InlongConstants.LIGHTWEIGHT_MODE.equals(form.getGroupInfo().getLightweight())) {
-            sourceService.updateStatus(groupId, streamId, SourceStatus.SOURCE_NORMAL.getCode(), operator);
-        } else {
-            sourceService.updateStatus(groupId, streamId, SourceStatus.TO_BE_ISSUED_ADD.getCode(), operator);
+        try {
+            // Update status of other related configs
+            streamService.updateStatus(groupId, streamId, StreamStatus.CONFIG_SUCCESSFUL.getCode(), operator);
+            streamService.updateWithoutCheck(streamInfo.genRequest(), operator);
+            if (InlongConstants.LIGHTWEIGHT_MODE.equals(form.getGroupInfo().getLightweight())) {
+                sourceService.updateStatus(groupId, streamId, SourceStatus.SOURCE_NORMAL.getCode(), operator);
+            } else {
+                sourceService.updateStatus(groupId, streamId, SourceStatus.TO_BE_ISSUED_ADD.getCode(), operator);
+            }
+            return ListenerResult.success();
+        } catch (Exception e) {
+            throw new WorkflowListenerException(e);
         }
-
-        return ListenerResult.success();
     }
 
 }
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sort/DefaultSortConfigOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sort/DefaultSortConfigOperator.java
index 08ec23226..2664cee1e 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sort/DefaultSortConfigOperator.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sort/DefaultSortConfigOperator.java
@@ -28,7 +28,6 @@ import org.apache.inlong.manager.pojo.sort.util.LoadNodeUtils;
 import org.apache.inlong.manager.pojo.sort.util.NodeRelationUtils;
 import org.apache.inlong.manager.pojo.sort.util.TransformNodeUtils;
 import org.apache.inlong.manager.pojo.source.StreamSource;
-import org.apache.inlong.manager.pojo.stream.InlongStreamExtInfo;
 import org.apache.inlong.manager.pojo.stream.InlongStreamInfo;
 import org.apache.inlong.manager.pojo.stream.StreamField;
 import org.apache.inlong.manager.pojo.transform.TransformResponse;
@@ -242,24 +241,4 @@ public class DefaultSortConfigOperator implements SortConfigOperator {
         groupInfo.getExtList().add(extInfo);
     }
 
-    /**
-     * Add config into inlong stream ext info
-     */
-    private void addToStreamExt(List<InlongStreamInfo> streamInfos, String value) {
-        streamInfos.forEach(streamInfo -> {
-            if (streamInfo.getExtList() == null) {
-                streamInfo.setExtList(new ArrayList<>());
-            }
-
-            InlongStreamExtInfo extInfo = new InlongStreamExtInfo();
-            extInfo.setInlongGroupId(streamInfo.getInlongGroupId());
-            extInfo.setInlongStreamId(streamInfo.getInlongStreamId());
-            extInfo.setKeyName(InlongConstants.DATAFLOW);
-            extInfo.setKeyValue(value);
-
-            streamInfo.getExtList().removeIf(ext -> extInfo.getKeyName().equals(ext.getKeyName()));
-            streamInfo.getExtList().add(extInfo);
-        });
-    }
-
 }
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/stream/InlongStreamService.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/stream/InlongStreamService.java
index 3d17b3972..5a9b5b144 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/stream/InlongStreamService.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/stream/InlongStreamService.java
@@ -94,7 +94,7 @@ public interface InlongStreamService {
     List<InlongStreamBriefInfo> listBriefWithSink(String groupId);
 
     /**
-     * InlongStream info that needs to be modified
+     * Update the InlongStream info
      *
      * @param request inlong stream info that needs to be modified
      * @param operator Edit person's name
@@ -102,6 +102,15 @@ public interface InlongStreamService {
      */
     Boolean update(InlongStreamRequest request, String operator);
 
+    /**
+     * Update the InlongStream without checking the status of the InlongGroup to which the stream belongs.
+     *
+     * @param request inlong stream info that needs to be modified
+     * @param operator Edit person's name
+     * @return whether succeed
+     */
+    Boolean updateWithoutCheck(InlongStreamRequest request, String operator);
+
     /**
      * Delete the specified inlong stream.
      * <p/>
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/stream/InlongStreamServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/stream/InlongStreamServiceImpl.java
index d8a49c30b..93146e278 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/stream/InlongStreamServiceImpl.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/stream/InlongStreamServiceImpl.java
@@ -59,7 +59,6 @@ import org.springframework.transaction.annotation.Propagation;
 import org.springframework.transaction.annotation.Transactional;
 
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
@@ -281,8 +280,8 @@ public class InlongStreamServiceImpl implements InlongStreamService {
         return briefInfoList;
     }
 
-    @Transactional(rollbackFor = Throwable.class)
     @Override
+    @Transactional(rollbackFor = Throwable.class)
     public Boolean update(InlongStreamRequest request, String operator) {
         LOGGER.debug("begin to update inlong stream info={}", request);
         Preconditions.checkNotNull(request, "inlong stream request is empty");
@@ -292,22 +291,30 @@ public class InlongStreamServiceImpl implements InlongStreamService {
         Preconditions.checkNotNull(streamId, ErrorCodeEnum.STREAM_ID_IS_EMPTY.getMessage());
 
         // Check if it can be modified
-        InlongGroupEntity inlongGroupEntity = this.checkGroupStatusIsTemp(groupId);
+        this.checkGroupStatusIsTemp(groupId);
+
+        return this.updateWithoutCheck(request, operator);
+    }
 
-        // Make sure the stream was exists
+    @Override
+    @Transactional(rollbackFor = Throwable.class)
+    public Boolean updateWithoutCheck(InlongStreamRequest request, String operator) {
+        LOGGER.debug("begin to update inlong stream without check, request={}", request);
+        // make sure the stream exists
+        String groupId = request.getInlongGroupId();
+        String streamId = request.getInlongStreamId();
         InlongStreamEntity streamEntity = streamMapper.selectByIdentifier(groupId, streamId);
         if (streamEntity == null) {
             LOGGER.error("inlong stream not found by groupId={}, streamId={}", groupId, streamId);
             throw new BusinessException(ErrorCodeEnum.STREAM_NOT_FOUND);
         }
-        String errMsg = String.format("stream has already updated with group id=%s, stream id=%s, curVersion=%s",
+
+        String errMsg = String.format("stream has already updated with groupId=%s, streamId=%s, curVersion=%s",
                 streamEntity.getInlongGroupId(), streamEntity.getInlongStreamId(), request.getVersion());
         if (!Objects.equals(streamEntity.getVersion(), request.getVersion())) {
             LOGGER.error(errMsg);
             throw new BusinessException(ErrorCodeEnum.CONFIG_EXPIRED);
         }
-        // Check whether the current inlong group status supports modification
-        this.doUpdateCheck(inlongGroupEntity.getStatus(), streamEntity, request);
 
         CommonBeanUtils.copyProperties(request, streamEntity, true);
         streamEntity.setModifier(operator);
@@ -316,13 +323,12 @@ public class InlongStreamServiceImpl implements InlongStreamService {
             LOGGER.error(errMsg);
             throw new BusinessException(ErrorCodeEnum.CONFIG_EXPIRED);
         }
-        // Update field information
+        // update stream fields
         updateField(groupId, streamId, request.getFieldList());
-        // Update extension info
-        List<InlongStreamExtInfo> extInfos = request.getExtList();
-        saveOrUpdateExt(groupId, streamId, extInfos);
+        // update stream extension infos
+        saveOrUpdateExt(groupId, streamId, request.getExtList());
 
-        LOGGER.info("success to update inlong stream for groupId={}", groupId);
+        LOGGER.info("success to update inlong stream without check for groupId={} streamId={}", groupId, streamId);
         return true;
     }
 
@@ -465,7 +471,6 @@ public class InlongStreamServiceImpl implements InlongStreamService {
     @Override
     @Transactional(propagation = Propagation.REQUIRES_NEW)
     public boolean updateStatus(String groupId, String streamId, Integer status, String operator) {
-        LOGGER.debug("begin to update status by groupId={}, streamId={}", groupId, streamId);
         streamMapper.updateStatusByIdentifier(groupId, streamId, status, operator);
         LOGGER.info("success to update stream after approve for groupId=" + groupId + ", streamId=" + streamId);
         return true;
@@ -563,66 +568,14 @@ public class InlongStreamServiceImpl implements InlongStreamService {
     private InlongGroupEntity checkGroupStatusIsTemp(String groupId) {
         InlongGroupEntity entity = groupMapper.selectByGroupId(groupId);
         Preconditions.checkNotNull(entity, "groupId is invalid");
-        // Add/modify/delete is not allowed under certain inlong group status
+        // Add/modify/delete is not allowed under temporary inlong group status
         GroupStatus curState = GroupStatus.forCode(entity.getStatus());
         if (GroupStatus.isTempStatus(curState)) {
-            LOGGER.error("inlong group status was not allowed to add/update/delete inlong stream");
+            LOGGER.error("inlong groupId={} status={} was not allowed to add/update/delete stream", groupId, curState);
             throw new BusinessException(ErrorCodeEnum.STREAM_OPT_NOT_ALLOWED);
         }
 
         return entity;
     }
 
-    /**
-     * Verify the fields that cannot be modified in the current inlong group status
-     *
-     * @param groupStatus Inlong group status
-     * @param streamEntity Original inlong stream entity
-     * @param request New inlong stream information
-     */
-    private void doUpdateCheck(Integer groupStatus, InlongStreamEntity streamEntity, InlongStreamRequest request) {
-        if (streamEntity == null || request == null) {
-            return;
-        }
-
-        // Fields that are not allowed to be modified when the inlong group [configuration is successful]
-        if (GroupStatus.CONFIG_SUCCESSFUL.getCode().equals(groupStatus)) {
-            checkUpdatedFields(streamEntity, request);
-        }
-
-        // Inlong group [Waiting to submit] [Approval rejected] [Configuration failed], if there is a
-        // stream source/stream sink, the fields that are not allowed to be modified
-        List<Integer> statusList = Arrays.asList(
-                GroupStatus.TO_BE_SUBMIT.getCode(),
-                GroupStatus.APPROVE_REJECTED.getCode(),
-                GroupStatus.CONFIG_FAILED.getCode());
-        if (statusList.contains(groupStatus)) {
-            String groupId = request.getInlongGroupId();
-            String streamId = request.getInlongStreamId();
-            // Whether there is undeleted stream source and sink
-            int sourceCount = sourceService.getCount(groupId, streamId);
-            int sinkCount = sinkService.getCount(groupId, streamId);
-            if (sourceCount > 0 || sinkCount > 0) {
-                checkUpdatedFields(streamEntity, request);
-            }
-        }
-    }
-
-    /**
-     * Check that groupId, streamId  are not allowed to be modified
-     */
-    private void checkUpdatedFields(InlongStreamEntity streamEntity, InlongStreamRequest request) {
-        String newGroupId = request.getInlongGroupId();
-        if (newGroupId != null && !newGroupId.equals(streamEntity.getInlongGroupId())) {
-            LOGGER.error("current status was not allowed to update inlong group id");
-            throw new BusinessException(ErrorCodeEnum.STREAM_ID_UPDATE_NOT_ALLOWED);
-        }
-
-        String newStreamId = request.getInlongStreamId();
-        if (newStreamId != null && !newStreamId.equals(streamEntity.getInlongStreamId())) {
-            LOGGER.error("current status was not allowed to update inlong stream id");
-            throw new BusinessException(ErrorCodeEnum.STREAM_ID_UPDATE_NOT_ALLOWED);
-        }
-    }
-
 }