You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2022/09/02 06:14:47 UTC

[inlong] branch master updated: [INLONG-5611][Manager] Set the sink status after updating the sink info (#5758)

This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new c7d35a3f2 [INLONG-5611][Manager] Set the sink status after updating the sink info (#5758)
c7d35a3f2 is described below

commit c7d35a3f24ad7ca9097432c1c3429ad1ef13f6a2
Author: ciscozhou <45...@users.noreply.github.com>
AuthorDate: Fri Sep 2 14:14:42 2022 +0800

    [INLONG-5611][Manager] Set the sink status after updating the sink info (#5758)
---
 .../inlong/manager/common/enums/StreamStatus.java  | 21 ++++++++-----
 .../manager/dao/mapper/StreamSinkEntityMapper.java |  4 +--
 .../resources/mappers/StreamSinkEntityMapper.xml   | 33 ++------------------
 .../mappers/StreamSinkFieldEntityMapper.xml        |  4 +--
 .../sink/ck/ClickHouseResourceOperator.java        | 12 ++++----
 .../manager/service/sink/AbstractSinkOperator.java | 10 +++---
 .../manager/service/sink/StreamSinkOperator.java   |  4 ++-
 .../service/sink/StreamSinkServiceImpl.java        | 36 +++++++++++++---------
 .../service/stream/InlongStreamProcessService.java | 26 ++++++++--------
 9 files changed, 67 insertions(+), 83 deletions(-)

diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/StreamStatus.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/StreamStatus.java
index 151922eee..a522e7481 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/StreamStatus.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/StreamStatus.java
@@ -24,7 +24,6 @@ public enum StreamStatus {
 
     DRAFT(0, "draft"),
 
-    // Stream related status
     NEW(100, "new"),
     CONFIG_ING(110, "in configure"),
     CONFIG_FAILED(120, "configuration failed"),
@@ -47,12 +46,12 @@ public enum StreamStatus {
         this.description = description;
     }
 
-    public Integer getCode() {
-        return code;
-    }
-
-    public String getDescription() {
-        return description;
+    /**
+     * Checks whether the given status does not allow the update.
+     */
+    public static boolean notAllowedUpdate(StreamStatus status) {
+        return status == StreamStatus.CONFIG_ING || status == StreamStatus.SUSPENDING
+                || status == StreamStatus.RESTARTING || status == StreamStatus.DELETING;
     }
 
     public static StreamStatus forCode(int code) {
@@ -64,4 +63,12 @@ public enum StreamStatus {
         throw new IllegalStateException(String.format("Illegal code=%s for StreamStatus", code));
     }
 
+    public Integer getCode() {
+        return code;
+    }
+
+    public String getDescription() {
+        return description;
+    }
+
 }
\ No newline at end of file
diff --git a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamSinkEntityMapper.java b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamSinkEntityMapper.java
index 111197bbe..9999371ff 100644
--- a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamSinkEntityMapper.java
+++ b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StreamSinkEntityMapper.java
@@ -129,9 +129,7 @@ public interface StreamSinkEntityMapper {
      */
     List<SortSourceStreamInfo> selectAllStreams();
 
-    int updateByPrimaryKeySelective(StreamSinkEntity record);
-
-    int updateByPrimaryKey(StreamSinkEntity record);
+    int updateByIdSelective(StreamSinkEntity record);
 
     int updateStatus(StreamSinkEntity entity);
 
diff --git a/inlong-manager/manager-dao/src/main/resources/mappers/StreamSinkEntityMapper.xml b/inlong-manager/manager-dao/src/main/resources/mappers/StreamSinkEntityMapper.xml
index daacb78d2..bd9b72330 100644
--- a/inlong-manager/manager-dao/src/main/resources/mappers/StreamSinkEntityMapper.xml
+++ b/inlong-manager/manager-dao/src/main/resources/mappers/StreamSinkEntityMapper.xml
@@ -370,8 +370,7 @@
         where is_deleted = 0
     </select>
 
-    <update id="updateByPrimaryKeySelective"
-            parameterType="org.apache.inlong.manager.dao.entity.StreamSinkEntity">
+    <update id="updateByIdSelective" parameterType="org.apache.inlong.manager.dao.entity.StreamSinkEntity">
         update stream_sink
         <set>
             <if test="inlongGroupId != null">
@@ -411,17 +410,12 @@
                 operate_log = #{operateLog,jdbcType=VARCHAR},
             </if>
             <if test="status != null">
+                previous_status = status,
                 status = #{status,jdbcType=INTEGER},
             </if>
-            <if test="previousStatus != null">
-                previous_status = #{previousStatus,jdbcType=INTEGER},
-            </if>
             <if test="isDeleted != null">
                 is_deleted = #{isDeleted,jdbcType=INTEGER},
             </if>
-            <if test="creator != null">
-                creator = #{creator,jdbcType=VARCHAR},
-            </if>
             <if test="modifier != null">
                 modifier = #{modifier,jdbcType=VARCHAR},
             </if>
@@ -430,29 +424,6 @@
         where id = #{id,jdbcType=INTEGER}
         and version = #{version,jdbcType=INTEGER}
     </update>
-    <update id="updateByPrimaryKey" parameterType="org.apache.inlong.manager.dao.entity.StreamSinkEntity">
-        update stream_sink
-        set inlong_group_id        = #{inlongGroupId,jdbcType=VARCHAR},
-            inlong_stream_id       = #{inlongStreamId,jdbcType=VARCHAR},
-            sink_type              = #{sinkType,jdbcType=VARCHAR},
-            sink_name              = #{sinkName,jdbcType=VARCHAR},
-            description            = #{description,jdbcType=VARCHAR},
-            enable_create_resource = #{enableCreateResource,jdbcType=TINYINT},
-            inlong_cluster_name    = #{inlongClusterName,jdbcType=VARCHAR},
-            data_node_name         = #{dataNodeName,jdbcType=VARCHAR},
-            sort_task_name         = #{sortTaskName,jdbcType=VARCHAR},
-            sort_consumer_group    = #{sortConsumerGroup,jdbcType=VARCHAR},
-            ext_params             = #{extParams,jdbcType=VARCHAR},
-            operate_log            = #{operateLog,jdbcType=VARCHAR},
-            status                 = #{status,jdbcType=INTEGER},
-            previous_status        = #{previousStatus,jdbcType=INTEGER},
-            is_deleted             = #{isDeleted,jdbcType=INTEGER},
-            creator                = #{creator,jdbcType=VARCHAR},
-            modifier               = #{modifier,jdbcType=VARCHAR},
-            version                = #{version,jdbcType=INTEGER} + 1
-        where id = #{id,jdbcType=INTEGER}
-          and version = #{version,jdbcType=INTEGER}
-    </update>
     <update id="updateStatus" parameterType="org.apache.inlong.manager.dao.entity.StreamSinkEntity">
         update stream_sink
         set status          = #{status,jdbcType=INTEGER},
diff --git a/inlong-manager/manager-dao/src/main/resources/mappers/StreamSinkFieldEntityMapper.xml b/inlong-manager/manager-dao/src/main/resources/mappers/StreamSinkFieldEntityMapper.xml
index ece761b77..052b46556 100644
--- a/inlong-manager/manager-dao/src/main/resources/mappers/StreamSinkFieldEntityMapper.xml
+++ b/inlong-manager/manager-dao/src/main/resources/mappers/StreamSinkFieldEntityMapper.xml
@@ -69,7 +69,7 @@
     </insert>
     <insert id="insertAll">
         insert into stream_sink_field (
-        id, inlong_group_id,
+        inlong_group_id,
         inlong_stream_id, sink_id,
         sink_type, field_name,
         field_type, field_comment,
@@ -82,7 +82,7 @@
         values
         <foreach collection="list" index="index" item="item" separator=",">
             (
-            #{item.id,jdbcType=INTEGER}, #{item.inlongGroupId,jdbcType=VARCHAR},
+            #{item.inlongGroupId,jdbcType=VARCHAR},
             #{item.inlongStreamId,jdbcType=VARCHAR}, #{item.sinkId,jdbcType=INTEGER},
             #{item.sinkType,jdbcType=VARCHAR}, #{item.fieldName,jdbcType=VARCHAR},
             #{item.fieldType,jdbcType=VARCHAR}, #{item.fieldComment,jdbcType=VARCHAR},
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/ck/ClickHouseResourceOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/ck/ClickHouseResourceOperator.java
index e4d4ea84f..f8aa955e4 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/ck/ClickHouseResourceOperator.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/ck/ClickHouseResourceOperator.java
@@ -19,15 +19,15 @@ package org.apache.inlong.manager.service.resource.sink.ck;
 
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.inlong.manager.common.consts.InlongConstants;
-import org.apache.inlong.manager.common.enums.SinkStatus;
 import org.apache.inlong.manager.common.consts.SinkType;
+import org.apache.inlong.manager.common.enums.SinkStatus;
 import org.apache.inlong.manager.common.exceptions.WorkflowException;
-import org.apache.inlong.manager.pojo.sink.ck.ClickHouseColumnInfo;
-import org.apache.inlong.manager.pojo.sink.ck.ClickHouseTableInfo;
-import org.apache.inlong.manager.pojo.sink.SinkInfo;
-import org.apache.inlong.manager.pojo.sink.ck.ClickHouseSinkDTO;
 import org.apache.inlong.manager.dao.entity.StreamSinkFieldEntity;
 import org.apache.inlong.manager.dao.mapper.StreamSinkFieldEntityMapper;
+import org.apache.inlong.manager.pojo.sink.SinkInfo;
+import org.apache.inlong.manager.pojo.sink.ck.ClickHouseColumnInfo;
+import org.apache.inlong.manager.pojo.sink.ck.ClickHouseSinkDTO;
+import org.apache.inlong.manager.pojo.sink.ck.ClickHouseTableInfo;
 import org.apache.inlong.manager.service.resource.sink.SinkResourceOperator;
 import org.apache.inlong.manager.service.sink.StreamSinkService;
 import org.slf4j.Logger;
@@ -135,7 +135,7 @@ public class ClickHouseResourceOperator implements SinkResourceOperator {
             throw new WorkflowException(errMsg);
         }
 
-        LOGGER.info("success create ClickHouse table for data sind [" + sinkInfo.getId() + "]");
+        LOGGER.info("success create ClickHouse table for sink id [" + sinkInfo.getId() + "]");
     }
 
 }
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/AbstractSinkOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/AbstractSinkOperator.java
index ac227ca56..e4db70a34 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/AbstractSinkOperator.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/AbstractSinkOperator.java
@@ -110,7 +110,7 @@ public abstract class AbstractSinkOperator implements StreamSinkOperator {
     }
 
     @Override
-    public void updateOpt(SinkRequest request, String operator) {
+    public void updateOpt(SinkRequest request, SinkStatus nextStatus, String operator) {
         StreamSinkEntity entity = sinkMapper.selectByPrimaryKey(request.getId());
         Preconditions.checkNotNull(entity, ErrorCodeEnum.SINK_INFO_NOT_FOUND.getMessage());
 
@@ -123,9 +123,11 @@ public abstract class AbstractSinkOperator implements StreamSinkOperator {
         CommonBeanUtils.copyProperties(request, entity, true);
         setTargetEntity(request, entity);
         entity.setPreviousStatus(entity.getStatus());
-        entity.setStatus(SinkStatus.CONFIG_ING.getCode());
+        if (nextStatus != null) {
+            entity.setStatus(nextStatus.getCode());
+        }
         entity.setModifier(operator);
-        int rowCount = sinkMapper.updateByPrimaryKeySelective(entity);
+        int rowCount = sinkMapper.updateByIdSelective(entity);
         if (rowCount != InlongConstants.AFFECTED_ONE_ROW) {
             LOGGER.error(errMsg);
             throw new BusinessException(ErrorCodeEnum.CONFIG_EXPIRED);
@@ -199,7 +201,7 @@ public abstract class AbstractSinkOperator implements StreamSinkOperator {
         entity.setStatus(InlongConstants.DELETED_STATUS);
         entity.setIsDeleted(entity.getId());
         entity.setModifier(operator);
-        int rowCount = sinkMapper.updateByPrimaryKeySelective(entity);
+        int rowCount = sinkMapper.updateByIdSelective(entity);
         if (rowCount != InlongConstants.AFFECTED_ONE_ROW) {
             LOGGER.error("sink has already updated with groupId={}, streamId={}, name={}, curVersion={}",
                     entity.getInlongGroupId(), entity.getInlongStreamId(), entity.getSinkName(), entity.getVersion());
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkOperator.java
index e23b6cef2..71ed90842 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkOperator.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkOperator.java
@@ -18,6 +18,7 @@
 package org.apache.inlong.manager.service.sink;
 
 import com.github.pagehelper.Page;
+import org.apache.inlong.manager.common.enums.SinkStatus;
 import org.apache.inlong.manager.dao.entity.StreamSinkEntity;
 import org.apache.inlong.manager.pojo.common.PageResult;
 import org.apache.inlong.manager.pojo.sink.SinkField;
@@ -74,9 +75,10 @@ public interface StreamSinkOperator {
      * Update the sink info.
      *
      * @param request sink info needs to update
+     * @param nextStatus status to set on the sink after the update; when null, the status is not overwritten
      * @param operator name of the operator
      */
-    void updateOpt(SinkRequest request, String operator);
+    void updateOpt(SinkRequest request, SinkStatus nextStatus, String operator);
 
     /**
      * Update the sink fields.
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkServiceImpl.java
index 2c537e848..8360dc4df 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkServiceImpl.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkServiceImpl.java
@@ -25,12 +25,13 @@ import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.inlong.manager.common.consts.InlongConstants;
 import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
-import org.apache.inlong.manager.common.enums.GroupStatus;
 import org.apache.inlong.manager.common.enums.SinkStatus;
+import org.apache.inlong.manager.common.enums.StreamStatus;
 import org.apache.inlong.manager.common.exceptions.BusinessException;
 import org.apache.inlong.manager.common.util.Preconditions;
-import org.apache.inlong.manager.dao.entity.InlongGroupEntity;
+import org.apache.inlong.manager.dao.entity.InlongStreamEntity;
 import org.apache.inlong.manager.dao.entity.StreamSinkEntity;
+import org.apache.inlong.manager.dao.mapper.InlongStreamEntityMapper;
 import org.apache.inlong.manager.dao.mapper.StreamSinkEntityMapper;
 import org.apache.inlong.manager.dao.mapper.StreamSinkFieldEntityMapper;
 import org.apache.inlong.manager.pojo.common.OrderFieldEnum;
@@ -74,6 +75,8 @@ public class StreamSinkServiceImpl implements StreamSinkService {
     @Autowired
     private GroupCheckService groupCheckService;
     @Autowired
+    private InlongStreamEntityMapper streamMapper;
+    @Autowired
     private StreamSinkEntityMapper sinkMapper;
     @Autowired
     private StreamSinkFieldEntityMapper sinkFieldMapper;
@@ -212,7 +215,11 @@ public class StreamSinkServiceImpl implements StreamSinkService {
         String groupId = request.getInlongGroupId();
         String streamId = request.getInlongStreamId();
         String sinkName = request.getSinkName();
-        final InlongGroupEntity groupEntity = groupCheckService.checkGroupStatus(groupId, operator);
+        groupCheckService.checkGroupStatus(groupId, operator);
+
+        // Check whether the stream exists or not
+        InlongStreamEntity streamEntity = streamMapper.selectByIdentifier(groupId, streamId);
+        Preconditions.checkNotNull(streamEntity, ErrorCodeEnum.STREAM_NOT_FOUND.getMessage());
 
         // Check whether the sink name exists with the same groupId and streamId
         List<StreamSinkEntity> sinkList = sinkMapper.selectByRelatedId(groupId, streamId, sinkName);
@@ -223,24 +230,23 @@ public class StreamSinkServiceImpl implements StreamSinkService {
                 throw new BusinessException(String.format(err, sinkName, groupId, streamId));
             }
         }
-        List<SinkField> fields = request.getSinkFieldList();
-        // Remove id in sinkField when save
-        if (CollectionUtils.isNotEmpty(fields)) {
-            fields.forEach(sinkField -> sinkField.setId(null));
-        }
 
+        SinkStatus nextStatus = null;
+        boolean streamSuccess = StreamStatus.CONFIG_SUCCESSFUL.getCode().equals(streamEntity.getStatus());
+        if (streamSuccess || StreamStatus.CONFIG_FAILED.getCode().equals(streamEntity.getStatus())) {
+            nextStatus = SinkStatus.CONFIG_ING;
+        }
         StreamSinkOperator sinkOperator = operatorFactory.getInstance(request.getSinkType());
-        sinkOperator.updateOpt(request, operator);
+        sinkOperator.updateOpt(request, nextStatus, operator);
 
-        // The inlong group status is [Configuration successful], then asynchronously initiate
-        // the [Single inlong stream resource creation] workflow
-        if (GroupStatus.CONFIG_SUCCESSFUL.getCode().equals(groupEntity.getStatus())) {
+        // If the stream is [CONFIG_SUCCESSFUL], then asynchronously start the [CREATE_STREAM_RESOURCE] process
+        if (streamSuccess) {
             // To work around the circular reference check we manually instantiate and wire
             if (streamProcessOperation == null) {
                 streamProcessOperation = new InlongStreamProcessService();
                 autowireCapableBeanFactory.autowireBean(streamProcessOperation);
             }
-            streamProcessOperation.startProcess(groupId, streamId, operator, true);
+            streamProcessOperation.startProcess(groupId, streamId, operator, false);
         }
         LOGGER.info("success to update sink info: {}", request);
         return true;
@@ -289,7 +295,7 @@ public class StreamSinkServiceImpl implements StreamSinkService {
                 entity.setStatus(InlongConstants.DELETED_STATUS);
                 entity.setIsDeleted(id);
                 entity.setModifier(operator);
-                int rowCount = sinkMapper.updateByPrimaryKeySelective(entity);
+                int rowCount = sinkMapper.updateByIdSelective(entity);
                 if (rowCount != InlongConstants.AFFECTED_ONE_ROW) {
                     LOGGER.error("sink has already updated with groupId={}, streamId={}, name={}, curVersion={}",
                             entity.getInlongGroupId(), entity.getInlongStreamId(), entity.getSinkName(),
@@ -367,7 +373,7 @@ public class StreamSinkServiceImpl implements StreamSinkService {
             entity.setPreviousStatus(entity.getStatus());
             entity.setStatus(status);
             entity.setModifier(operator);
-            int rowCount = sinkMapper.updateByPrimaryKeySelective(entity);
+            int rowCount = sinkMapper.updateByIdSelective(entity);
             if (rowCount != InlongConstants.AFFECTED_ONE_ROW) {
                 LOGGER.error("sink has already updated with groupId={}, streamId={}, name={}, curVersion={}",
                         entity.getInlongGroupId(), entity.getInlongStreamId(), entity.getSinkName(),
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/stream/InlongStreamProcessService.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/stream/InlongStreamProcessService.java
index b95fdbbc6..06046eceb 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/stream/InlongStreamProcessService.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/stream/InlongStreamProcessService.java
@@ -26,6 +26,7 @@ import org.apache.inlong.manager.common.enums.ProcessName;
 import org.apache.inlong.manager.common.enums.ProcessStatus;
 import org.apache.inlong.manager.common.enums.StreamStatus;
 import org.apache.inlong.manager.common.exceptions.BusinessException;
+import org.apache.inlong.manager.common.util.Preconditions;
 import org.apache.inlong.manager.pojo.group.InlongGroupInfo;
 import org.apache.inlong.manager.pojo.stream.InlongStreamInfo;
 import org.apache.inlong.manager.pojo.workflow.WorkflowResult;
@@ -70,30 +71,27 @@ public class InlongStreamProcessService {
     public boolean startProcess(String groupId, String streamId, String operator, boolean sync) {
         log.info("begin to start stream process for groupId={} streamId={}", groupId, streamId);
         InlongGroupInfo groupInfo = groupService.get(groupId);
-        if (groupInfo == null) {
-            throw new BusinessException(ErrorCodeEnum.GROUP_NOT_FOUND);
-        }
+        Preconditions.checkNotNull(groupInfo, ErrorCodeEnum.GROUP_NOT_FOUND.getMessage());
         GroupStatus groupStatus = GroupStatus.forCode(groupInfo.getStatus());
         if (groupStatus != GroupStatus.CONFIG_SUCCESSFUL && groupStatus != GroupStatus.RESTARTED) {
             throw new BusinessException(
-                    String.format("group status =%s not support start stream for groupId=%s", groupStatus, groupId));
+                    String.format("group status=%s not support start stream for groupId=%s", groupStatus, groupId));
         }
+
         InlongStreamInfo streamInfo = streamService.get(groupId, streamId);
-        if (streamInfo == null) {
-            throw new BusinessException(ErrorCodeEnum.STREAM_NOT_FOUND);
-        }
+        Preconditions.checkNotNull(streamInfo, ErrorCodeEnum.STREAM_NOT_FOUND.getMessage());
         StreamStatus status = StreamStatus.forCode(streamInfo.getStatus());
         if (status == StreamStatus.CONFIG_ING) {
-            log.warn("stream status={}, no need restart for groupId={}, streamId={}", status, groupId, streamId);
+            log.warn("stream status={}, not need restart for groupId={} streamId={}", status, groupId, streamId);
             return true;
         }
-        // only new, failed, and success status support update
-        if (status != StreamStatus.NEW && status != StreamStatus.CONFIG_FAILED
-                && status != StreamStatus.CONFIG_SUCCESSFUL) {
-            throw new BusinessException(
-                    String.format("stream status=%s not support start stream for groupId=%s streamId=%s",
-                            status, groupId, streamId));
+        if (StreamStatus.notAllowedUpdate(status)) {
+            String errMsg = String.format("stream status=%s not support start stream for groupId=%s streamId=%s",
+                    status, groupId, streamId);
+            log.error(errMsg);
+            throw new BusinessException(errMsg);
         }
+
         StreamResourceProcessForm processForm = genStreamProcessForm(groupInfo, streamInfo, GroupOperateType.INIT);
         ProcessName processName = ProcessName.CREATE_STREAM_RESOURCE;
         if (sync) {