You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2022/09/07 05:58:00 UTC

[inlong] branch master updated: [INLONG-5799][Manager] Fix the source delete/stop/restart bug (#5807)

This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 0c79889b4 [INLONG-5799][Manager] Fix the source delete/stop/restart bug (#5807)
0c79889b4 is described below

commit 0c79889b4fa1262a111f5d66bcef7468bcbd602e
Author: woofyzhao <49...@qq.com>
AuthorDate: Wed Sep 7 13:57:54 2022 +0800

    [INLONG-5799][Manager] Fix the source delete/stop/restart bug (#5807)
    
    Co-authored-by: healchow <he...@gmail.com>
---
 .../service/listener/source/AbstractSourceOperateListener.java       | 5 +++++
 .../inlong/manager/service/source/StreamSourceServiceImpl.java       | 3 ---
 2 files changed, 5 insertions(+), 3 deletions(-)

diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/source/AbstractSourceOperateListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/source/AbstractSourceOperateListener.java
index 7f1097e18..c07c86905 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/source/AbstractSourceOperateListener.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/source/AbstractSourceOperateListener.java
@@ -99,6 +99,11 @@ public abstract class AbstractSourceOperateListener implements SourceOperateList
      */
     @SneakyThrows
     public boolean checkIfOp(StreamSource streamSource, List<StreamSource> unOperatedSources) {
+        // if a source has sub-sources, it is considered a template source.
+        // template sources do not need to be operated, its sub-sources will be processed in this method later.
+        if (CollectionUtils.isNotEmpty(streamSource.getSubSourceList())) {
+            return false;
+        }
         for (int retry = 0; retry < 60; retry++) {
             int status = streamSource.getStatus();
             SourceStatus sourceStatus = SourceStatus.forCode(status);
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceServiceImpl.java
index 3f14d75a8..7ef31f223 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceServiceImpl.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceServiceImpl.java
@@ -237,7 +237,6 @@ public class StreamSourceServiceImpl implements StreamSourceService {
 
         StreamSourceEntity entity = sourceMapper.selectByIdForUpdate(id);
         Preconditions.checkNotNull(entity, ErrorCodeEnum.SOURCE_INFO_NOT_FOUND.getMessage());
-        groupCheckService.checkGroupStatus(entity.getInlongGroupId(), operator);
 
         SourceStatus curStatus = SourceStatus.forCode(entity.getStatus());
         SourceStatus nextStatus = SourceStatus.TO_BE_ISSUED_DELETE;
@@ -272,7 +271,6 @@ public class StreamSourceServiceImpl implements StreamSourceService {
         LOGGER.info("begin to restart source by id={}", id);
         StreamSourceEntity entity = sourceMapper.selectByIdForUpdate(id);
         Preconditions.checkNotNull(entity, ErrorCodeEnum.SOURCE_INFO_NOT_FOUND.getMessage());
-        groupCheckService.checkGroupStatus(entity.getInlongGroupId(), operator);
 
         StreamSourceOperator sourceOperator = operatorFactory.getInstance(entity.getSourceType());
         SourceRequest sourceRequest = new SourceRequest();
@@ -290,7 +288,6 @@ public class StreamSourceServiceImpl implements StreamSourceService {
         LOGGER.info("begin to stop source by id={}", id);
         StreamSourceEntity entity = sourceMapper.selectByIdForUpdate(id);
         Preconditions.checkNotNull(entity, ErrorCodeEnum.SOURCE_INFO_NOT_FOUND.getMessage());
-        groupCheckService.checkGroupStatus(entity.getInlongGroupId(), operator);
 
         StreamSourceOperator sourceOperator = operatorFactory.getInstance(entity.getSourceType());
         SourceRequest sourceRequest = new SourceRequest();