You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by he...@apache.org on 2022/09/09 06:14:03 UTC

[inlong] branch release-1.3.0 updated: [INLONG-5836][Manager] Fix the source status related behavior problems (#5837)

This is an automated email from the ASF dual-hosted git repository.

healchow pushed a commit to branch release-1.3.0
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/release-1.3.0 by this push:
     new 872b7de60 [INLONG-5836][Manager] Fix the source status related behavior problems (#5837)
872b7de60 is described below

commit 872b7de6049f73ee8ec795a73c1e585a1f0ef104
Author: woofyzhao <49...@qq.com>
AuthorDate: Fri Sep 9 12:45:51 2022 +0800

    [INLONG-5836][Manager] Fix the source status related behavior problems (#5837)
---
 .../inlong/manager/common/enums/SourceStatus.java  | 54 +++++++++++-----------
 .../dao/mapper/StreamSourceEntityMapper.java       |  2 +-
 .../resources/mappers/StreamSourceEntityMapper.xml |  2 +-
 .../inlong/manager/pojo/source/SourceRequest.java  |  5 ++
 .../service/core/impl/AgentServiceImpl.java        |  6 +--
 .../source/AbstractSourceOperateListener.java      |  9 ++--
 .../listener/source/SourceRestartListener.java     |  6 +++
 .../listener/source/SourceStopListener.java        |  6 +++
 .../service/source/AbstractSourceOperator.java     | 24 ++++++----
 .../service/source/StreamSourceServiceImpl.java    |  7 ++-
 10 files changed, 71 insertions(+), 50 deletions(-)

diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/SourceStatus.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/SourceStatus.java
index 1561af0ec..5d5182d41 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/SourceStatus.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/SourceStatus.java
@@ -104,33 +104,33 @@ public enum SourceStatus {
         SOURCE_STATE_AUTOMATON.put(SOURCE_FROZEN, Sets.newHashSet(SOURCE_DISABLE, SOURCE_FROZEN, TO_BE_ISSUED_ACTIVE));
 
         // [xxx] bo be issued
-        HashSet<SourceStatus> tobeAdd = Sets.newHashSet(BEEN_ISSUED_ADD);
-        tobeAdd.addAll(TOBE_ISSUED_SET);
-        SOURCE_STATE_AUTOMATON.put(TO_BE_ISSUED_ADD, tobeAdd);
-        HashSet<SourceStatus> tobeDelete = Sets.newHashSet(BEEN_ISSUED_DELETE);
-        tobeDelete.addAll(TOBE_ISSUED_SET);
-        SOURCE_STATE_AUTOMATON.put(TO_BE_ISSUED_DELETE, Sets.newHashSet(tobeDelete));
-        HashSet<SourceStatus> tobeRetry = Sets.newHashSet(BEEN_ISSUED_RETRY);
-        tobeRetry.addAll(TOBE_ISSUED_SET);
-        SOURCE_STATE_AUTOMATON.put(TO_BE_ISSUED_RETRY, Sets.newHashSet(tobeRetry));
-        HashSet<SourceStatus> tobeBacktrack = Sets.newHashSet(BEEN_ISSUED_BACKTRACK);
-        tobeBacktrack.addAll(TOBE_ISSUED_SET);
-        SOURCE_STATE_AUTOMATON.put(TO_BE_ISSUED_BACKTRACK, Sets.newHashSet(tobeBacktrack));
-        HashSet<SourceStatus> tobeFrozen = Sets.newHashSet(BEEN_ISSUED_FROZEN);
-        tobeFrozen.addAll(TOBE_ISSUED_SET);
-        SOURCE_STATE_AUTOMATON.put(TO_BE_ISSUED_FROZEN, Sets.newHashSet(tobeFrozen));
-        HashSet<SourceStatus> tobeActive = Sets.newHashSet(BEEN_ISSUED_ACTIVE);
-        tobeActive.addAll(TOBE_ISSUED_SET);
-        SOURCE_STATE_AUTOMATON.put(TO_BE_ISSUED_ACTIVE, Sets.newHashSet(tobeActive));
-        HashSet<SourceStatus> tobeCheck = Sets.newHashSet(BEEN_ISSUED_CHECK);
-        tobeCheck.addAll(TOBE_ISSUED_SET);
-        SOURCE_STATE_AUTOMATON.put(TO_BE_ISSUED_CHECK, Sets.newHashSet(tobeCheck));
-        HashSet<SourceStatus> tobeRedoMetric = Sets.newHashSet(BEEN_ISSUED_REDO_METRIC);
-        tobeRedoMetric.addAll(TOBE_ISSUED_SET);
-        SOURCE_STATE_AUTOMATON.put(TO_BE_ISSUED_REDO_METRIC, Sets.newHashSet(tobeRedoMetric));
-        HashSet<SourceStatus> tobeMakeup = Sets.newHashSet(BEEN_ISSUED_MAKEUP);
-        tobeMakeup.addAll(TOBE_ISSUED_SET);
-        SOURCE_STATE_AUTOMATON.put(TO_BE_ISSUED_MAKEUP, Sets.newHashSet(tobeMakeup));
+        HashSet<SourceStatus> tobeAddNext = Sets.newHashSet(BEEN_ISSUED_ADD, SOURCE_DISABLE);
+        tobeAddNext.addAll(TOBE_ISSUED_SET);
+        SOURCE_STATE_AUTOMATON.put(TO_BE_ISSUED_ADD, tobeAddNext);
+        HashSet<SourceStatus> tobeDeleteNext = Sets.newHashSet(BEEN_ISSUED_DELETE);
+        tobeDeleteNext.addAll(TOBE_ISSUED_SET);
+        SOURCE_STATE_AUTOMATON.put(TO_BE_ISSUED_DELETE, Sets.newHashSet(tobeDeleteNext));
+        HashSet<SourceStatus> tobeRetryNext = Sets.newHashSet(BEEN_ISSUED_RETRY);
+        tobeRetryNext.addAll(TOBE_ISSUED_SET);
+        SOURCE_STATE_AUTOMATON.put(TO_BE_ISSUED_RETRY, Sets.newHashSet(tobeRetryNext));
+        HashSet<SourceStatus> tobeBacktrackNext = Sets.newHashSet(BEEN_ISSUED_BACKTRACK);
+        tobeBacktrackNext.addAll(TOBE_ISSUED_SET);
+        SOURCE_STATE_AUTOMATON.put(TO_BE_ISSUED_BACKTRACK, Sets.newHashSet(tobeBacktrackNext));
+        HashSet<SourceStatus> tobeFrozenNext = Sets.newHashSet(BEEN_ISSUED_FROZEN);
+        tobeFrozenNext.addAll(TOBE_ISSUED_SET);
+        SOURCE_STATE_AUTOMATON.put(TO_BE_ISSUED_FROZEN, Sets.newHashSet(tobeFrozenNext));
+        HashSet<SourceStatus> tobeActiveNext = Sets.newHashSet(BEEN_ISSUED_ACTIVE);
+        tobeActiveNext.addAll(TOBE_ISSUED_SET);
+        SOURCE_STATE_AUTOMATON.put(TO_BE_ISSUED_ACTIVE, Sets.newHashSet(tobeActiveNext));
+        HashSet<SourceStatus> tobeCheckNext = Sets.newHashSet(BEEN_ISSUED_CHECK);
+        tobeCheckNext.addAll(TOBE_ISSUED_SET);
+        SOURCE_STATE_AUTOMATON.put(TO_BE_ISSUED_CHECK, Sets.newHashSet(tobeCheckNext));
+        HashSet<SourceStatus> tobeRedoMetricNext = Sets.newHashSet(BEEN_ISSUED_REDO_METRIC);
+        tobeRedoMetricNext.addAll(TOBE_ISSUED_SET);
+        SOURCE_STATE_AUTOMATON.put(TO_BE_ISSUED_REDO_METRIC, Sets.newHashSet(tobeRedoMetricNext));
+        HashSet<SourceStatus> tobeMakeupNext = Sets.newHashSet(BEEN_ISSUED_MAKEUP);
+        tobeMakeupNext.addAll(TOBE_ISSUED_SET);
+        SOURCE_STATE_AUTOMATON.put(TO_BE_ISSUED_MAKEUP, Sets.newHashSet(tobeMakeupNext));
 
         // [xxx] been issued
         SOURCE_STATE_AUTOMATON.put(BEEN_ISSUED_ADD, Sets.newHashSet(SOURCE_NORMAL, SOURCE_FAILED));
diff --git a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamSourceEntityMapper.java b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamSourceEntityMapper.java
index 3b0144d3d..8cc63e4e3 100644
--- a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamSourceEntityMapper.java
+++ b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamSourceEntityMapper.java
@@ -78,7 +78,7 @@ public interface StreamSourceEntityMapper {
      */
     List<StreamSourceEntity> selectByAgentIpOrCluster(@Param("statusList") List<Integer> statusList,
             @Param("sourceTypeList") List<String> sourceTypeList, @Param("agentIp") String agentIp,
-            @Param("clusterName") String clusterName, @Param("limit") int limit);
+            @Param("clusterName") String clusterName);
 
     /**
      * Query the sources with status 20x by the given agent IP and agent UUID.
diff --git a/inlong-manager/manager-dao/src/main/resources/mappers/StreamSourceEntityMapper.xml b/inlong-manager/manager-dao/src/main/resources/mappers/StreamSourceEntityMapper.xml
index c1b955f57..07bb4e8d4 100644
--- a/inlong-manager/manager-dao/src/main/resources/mappers/StreamSourceEntityMapper.xml
+++ b/inlong-manager/manager-dao/src/main/resources/mappers/StreamSourceEntityMapper.xml
@@ -222,7 +222,6 @@
                 </foreach>
             </if>
             and (agent_ip = #{agentIp, jdbcType=VARCHAR} or inlong_cluster_name = #{clusterName, jdbcType=VARCHAR})
-            limit #{limit, jdbcType=INTEGER}
         </where>
     </select>
     <select id="selectByStatusAndIp" resultType="org.apache.inlong.manager.dao.entity.StreamSourceEntity">
@@ -372,6 +371,7 @@
             <if test="streamId != null">
                 and inlong_stream_id = #{streamId, jdbcType=VARCHAR}
             </if>
+            and source_type != 'AUTO_PUSH'
         </where>
     </update>
     <update id="updateIpAndUuid">
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/SourceRequest.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/SourceRequest.java
index 42735478e..46b7c338d 100644
--- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/SourceRequest.java
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/SourceRequest.java
@@ -17,6 +17,7 @@
 
 package org.apache.inlong.manager.pojo.source;
 
+import com.fasterxml.jackson.annotation.JsonIgnore;
 import com.fasterxml.jackson.annotation.JsonTypeInfo;
 import io.swagger.annotations.ApiModel;
 import io.swagger.annotations.ApiModelProperty;
@@ -90,4 +91,8 @@ public class SourceRequest {
     @ApiModelProperty("Other properties if needed")
     private Map<String, Object> properties = new LinkedHashMap<>();
 
+    @JsonIgnore
+    @ApiModelProperty("Sub source information of existing agents")
+    private List<SubSourceDTO> subSourceList;
+
 }
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentServiceImpl.java
index 91b3f7238..5e56c385d 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentServiceImpl.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentServiceImpl.java
@@ -215,15 +215,15 @@ public class AgentServiceImpl implements AgentService {
         Preconditions.checkTrue(StringUtils.isNotBlank(agentIp) || StringUtils.isNotBlank(agentClusterName),
                 "both agent ip and cluster name are blank when fetching file task");
         List<StreamSourceEntity> sourceEntities = sourceMapper.selectByAgentIpOrCluster(needAddStatusList,
-                Lists.newArrayList(SourceType.FILE), agentIp, agentClusterName,TASK_FETCH_SIZE * 10);
+                Lists.newArrayList(SourceType.FILE), agentIp, agentClusterName);
         List<DataConfig> fileTasks = Lists.newArrayList();
         for (StreamSourceEntity sourceEntity : sourceEntities) {
             FileSourceDTO fileSourceDTO = FileSourceDTO.getFromJson(sourceEntity.getExtParams());
             final String destIp = sourceEntity.getAgentIp();
             final String destClusterName = sourceEntity.getInlongClusterName();
 
-            // Cluster name is blank
-            if (StringUtils.isNotBlank(destIp) && StringUtils.isBlank(destClusterName)) {
+            // Use ip directly if it is not empty
+            if (StringUtils.isNotBlank(destIp)) {
                 if (destIp.equals(agentIp)) {
                     int op = getOp(sourceEntity.getStatus());
                     int nextStatus = getNextStatus(sourceEntity.getStatus());
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/source/AbstractSourceOperateListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/source/AbstractSourceOperateListener.java
index c07c86905..01eca1ba7 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/source/AbstractSourceOperateListener.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/source/AbstractSourceOperateListener.java
@@ -99,15 +99,12 @@ public abstract class AbstractSourceOperateListener implements SourceOperateList
      */
     @SneakyThrows
     public boolean checkIfOp(StreamSource streamSource, List<StreamSource> unOperatedSources) {
-        // if a source has sub-sources, it is considered a template source.
-        // template sources do not need to be operated, its sub-sources will be processed in this method later.
-        if (CollectionUtils.isNotEmpty(streamSource.getSubSourceList())) {
-            return false;
-        }
         for (int retry = 0; retry < 60; retry++) {
             int status = streamSource.getStatus();
             SourceStatus sourceStatus = SourceStatus.forCode(status);
-            if (sourceStatus == SourceStatus.SOURCE_NORMAL || sourceStatus == SourceStatus.SOURCE_FROZEN) {
+            // template sources are filtered and processed in corresponding subclass listeners
+            if (sourceStatus == SourceStatus.SOURCE_NORMAL || sourceStatus == SourceStatus.SOURCE_FROZEN
+                || CollectionUtils.isNotEmpty(streamSource.getSubSourceList())) {
                 return true;
             } else if (sourceStatus == SourceStatus.SOURCE_FAILED || sourceStatus == SourceStatus.SOURCE_DISABLE) {
                 return false;
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/source/SourceRestartListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/source/SourceRestartListener.java
index ec7170f48..54229273d 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/source/SourceRestartListener.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/source/SourceRestartListener.java
@@ -17,6 +17,7 @@
 
 package org.apache.inlong.manager.service.listener.source;
 
+import org.apache.commons.collections.CollectionUtils;
 import org.apache.inlong.manager.common.consts.InlongConstants;
 import org.apache.inlong.manager.common.enums.GroupOperateType;
 import org.apache.inlong.manager.pojo.source.SourceRequest;
@@ -47,6 +48,11 @@ public class SourceRestartListener extends AbstractSourceOperateListener {
 
     @Override
     public void operateStreamSource(SourceRequest sourceRequest, String operator) {
+        // if a source has sub-sources, it is considered a template source.
+        // template sources do not need to be restarted, its sub-sources will be processed in this method later.
+        if (CollectionUtils.isNotEmpty(sourceRequest.getSubSourceList())) {
+            return;
+        }
         streamSourceService.restart(sourceRequest.getId(), operator);
     }
 }
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/source/SourceStopListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/source/SourceStopListener.java
index 0ccdbc5b3..7e2aa08f8 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/source/SourceStopListener.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/source/SourceStopListener.java
@@ -17,6 +17,7 @@
 
 package org.apache.inlong.manager.service.listener.source;
 
+import org.apache.commons.collections.CollectionUtils;
 import org.apache.inlong.manager.common.consts.InlongConstants;
 import org.apache.inlong.manager.common.enums.GroupOperateType;
 import org.apache.inlong.manager.pojo.source.SourceRequest;
@@ -47,6 +48,11 @@ public class SourceStopListener extends AbstractSourceOperateListener {
 
     @Override
     public void operateStreamSource(SourceRequest sourceRequest, String operator) {
+        // if a source has sub-sources, it is considered a template source.
+        // template sources do not need to be stopped, its sub-sources will be processed in this method later.
+        if (CollectionUtils.isNotEmpty(sourceRequest.getSubSourceList())) {
+            return;
+        }
         streamSourceService.stop(sourceRequest.getId(), operator);
     }
 }
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/AbstractSourceOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/AbstractSourceOperator.java
index 096a57deb..0d4a3f0b3 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/AbstractSourceOperator.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/AbstractSourceOperator.java
@@ -78,13 +78,11 @@ public abstract class AbstractSourceOperator implements StreamSourceOperator {
     @Transactional(rollbackFor = Throwable.class)
     public Integer saveOpt(SourceRequest request, Integer groupStatus, String operator) {
         StreamSourceEntity entity = CommonBeanUtils.copyProperties(request, StreamSourceEntity::new);
-        if (GroupStatus.forCode(groupStatus).equals(GroupStatus.CONFIG_SUCCESSFUL)) {
-            if (request.getSourceType().equals(SourceType.AUTO_PUSH)) {
-                // auto push task needs not be issued to agent
-                entity.setStatus(SourceStatus.SOURCE_NORMAL.getCode());
-            } else {
-                entity.setStatus(SourceStatus.TO_BE_ISSUED_ADD.getCode());
-            }
+        if (SourceType.AUTO_PUSH.equals(request.getSourceType())) {
+            // auto push task needs not be issued to agent
+            entity.setStatus(SourceStatus.SOURCE_NORMAL.getCode());
+        } else if (GroupStatus.forCode(groupStatus).equals(GroupStatus.CONFIG_SUCCESSFUL)) {
+            entity.setStatus(SourceStatus.TO_BE_ISSUED_ADD.getCode());
         } else {
             entity.setStatus(SourceStatus.SOURCE_NEW.getCode());
         }
@@ -121,6 +119,11 @@ public abstract class AbstractSourceOperator implements StreamSourceOperator {
     public void updateOpt(SourceRequest request, Integer groupStatus, String operator) {
         StreamSourceEntity entity = sourceMapper.selectByIdForUpdate(request.getId());
         Preconditions.checkNotNull(entity, ErrorCodeEnum.SOURCE_INFO_NOT_FOUND.getMessage());
+
+        if (SourceType.AUTO_PUSH.equals(entity.getSourceType())) {
+            LOGGER.warn("auto push source {} can not be updated", entity.getSourceName());
+            return;
+        }
         if (!SourceStatus.ALLOWED_UPDATE.contains(entity.getStatus())) {
             throw new BusinessException(String.format("source=%s is not allowed to update, "
                     + "please wait until its changed to final status or stop / frozen / delete it firstly", entity));
@@ -132,7 +135,7 @@ public abstract class AbstractSourceOperator implements StreamSourceOperator {
             throw new BusinessException(ErrorCodeEnum.CONFIG_EXPIRED);
         }
 
-        // Source type cannot be changed
+        // source type cannot be changed
         if (!Objects.equals(entity.getSourceType(), request.getSourceType())) {
             throw new BusinessException(String.format("source type=%s cannot change to %s",
                     entity.getSourceType(), request.getSourceType()));
@@ -150,11 +153,11 @@ public abstract class AbstractSourceOperator implements StreamSourceOperator {
             }
         }
 
-        // Setting updated parameters of stream source entity.
+        // setting updated parameters of stream source entity.
         setTargetEntity(request, entity);
         entity.setModifier(operator);
 
-        // Re-issue task if necessary
+        // re-issue task if necessary
         entity.setPreviousStatus(entity.getStatus());
         if (GroupStatus.forCode(groupStatus).equals(GroupStatus.CONFIG_SUCCESSFUL)) {
             entity.setStatus(SourceStatus.TO_BE_ISSUED_ADD.getCode());
@@ -171,6 +174,7 @@ public abstract class AbstractSourceOperator implements StreamSourceOperator {
                     break;
             }
         }
+
         int rowCount = sourceMapper.updateByPrimaryKeySelective(entity);
         if (rowCount != InlongConstants.AFFECTED_ONE_ROW) {
             LOGGER.warn(errMsg);
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceServiceImpl.java
index 5f785fe2c..c805ed1f4 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceServiceImpl.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceServiceImpl.java
@@ -24,6 +24,7 @@ import com.google.common.collect.Maps;
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.inlong.manager.common.consts.InlongConstants;
+import org.apache.inlong.manager.common.consts.SourceType;
 import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
 import org.apache.inlong.manager.common.enums.GroupStatus;
 import org.apache.inlong.manager.common.enums.SourceStatus;
@@ -237,12 +238,14 @@ public class StreamSourceServiceImpl implements StreamSourceService {
 
         StreamSourceEntity entity = sourceMapper.selectByIdForUpdate(id);
         Preconditions.checkNotNull(entity, ErrorCodeEnum.SOURCE_INFO_NOT_FOUND.getMessage());
+        boolean isTemplateSource = CollectionUtils.isNotEmpty(sourceMapper.selectByTemplateId(id));
 
         SourceStatus curStatus = SourceStatus.forCode(entity.getStatus());
         SourceStatus nextStatus = SourceStatus.TO_BE_ISSUED_DELETE;
-        // if source is frozen|failed|new , delete directly
+        // if source is frozen|failed|new, or if it is a template source or auto push source, delete directly
         if (curStatus == SourceStatus.SOURCE_FROZEN || curStatus == SourceStatus.SOURCE_FAILED
-                || curStatus == SourceStatus.SOURCE_NEW) {
+                || curStatus == SourceStatus.SOURCE_NEW || isTemplateSource
+                || SourceType.AUTO_PUSH.equals(entity.getSourceType())) {
             nextStatus = SourceStatus.SOURCE_DISABLE;
         }
         if (!SourceStatus.isAllowedTransition(curStatus, nextStatus)) {