You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2021/07/09 13:56:09 UTC

[incubator-inlong] branch master updated: [INLONG-696] modify the status of the entities after approval

This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 9e856b1  [INLONG-696] modify the status of the entities after approval
9e856b1 is described below

commit 9e856b19240d24bd1eaed740b769af5fa1d34294
Author: healzhou <he...@tencent.com>
AuthorDate: Fri Jul 9 21:40:33 2021 +0800

    [INLONG-696] modify the status of the entities after approval
---
 .../manager/common/enums/BizErrorCodeEnum.java     |  5 --
 .../inlong/manager/common/enums/EntityStatus.java  | 76 +++++++++-------------
 .../pojo/datastorage/BaseStorageClusterInfo.java   | 34 ----------
 inlong-manager/manager-dao/pom.xml                 | 14 ++++
 .../manager/dao/mapper/DataStreamEntityMapper.java |  3 +
 .../resources/mappers/DataStreamEntityMapper.xml   | 13 +++-
 .../resources/mappers/StorageHiveEntityMapper.xml  |  6 +-
 inlong-manager/manager-service/pom.xml             | 19 ++++++
 .../manager/service/core/BusinessService.java      |  2 +-
 .../manager/service/core/DataStreamService.java    | 12 ++++
 .../core/impl/BusinessProcessOperation.java        |  2 +-
 .../service/core/impl/BusinessServiceImpl.java     | 26 ++++----
 .../service/core/impl/DataStreamServiceImpl.java   | 19 ++++--
 .../service/core/impl/StorageHiveOperation.java    |  2 +-
 .../service/core/impl/StorageServiceImpl.java      |  4 +-
 .../service/thirdpart/hive/HiveTableOperator.java  |  7 +-
 .../sort/PushHiveConfigToSortEventListener.java    | 52 ++++++++-------
 .../CreateResourceWorkflowDefinition.java          | 20 +++---
 .../newbusiness/NewBusinessApproveForm.java        | 14 ++--
 .../newbusiness/NewBusinessWorkflowDefinition.java | 22 +++----
 ...ener.java => ApproveCancelProcessListener.java} |  4 +-
 ...kListener.java => ApprovePassTaskListener.java} | 10 +--
 ...ener.java => ApproveRejectProcessListener.java} |  4 +-
 .../listener/CompleteProcessListener.java          | 66 -------------------
 .../CreateResourceCompleteProcessListener.java     | 14 +++-
 .../CreateResourceFailedProcessListener.java       |  9 ++-
 .../NewConsumptionWorkflowDefinition.java          | 24 +++----
 ...er.java => ConsumptionApproveTaskListener.java} |  2 +-
 ....java => ConsumptionCancelProcessListener.java} |  4 +-
 ...ava => ConsumptionCompleteProcessListener.java} |  2 +-
 ....java => ConsumptionRejectProcessListener.java} | 12 +---
 .../SingleStreamCompleteProcessListener.java       | 13 +++-
 .../SingleStreamFailedProcessListener.java         | 14 +++-
 .../manager/workflow/core/TransactionHelper.java   | 11 ++--
 inlong-manager/pom.xml                             |  6 ++
 35 files changed, 258 insertions(+), 289 deletions(-)

diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/BizErrorCodeEnum.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/BizErrorCodeEnum.java
index 3084971..0955664 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/BizErrorCodeEnum.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/BizErrorCodeEnum.java
@@ -38,7 +38,6 @@ public enum BizErrorCodeEnum {
 
     CLUSTER_NOT_FOUND(1101, "Cluster information does not exist"),
 
-
     DATA_STREAM_NOT_FOUND(1201, "Data stream does not exist/no operation permission"),
     DATA_STREAM_ID_DUPLICATE(1202, "The current business has a data stream with the same ID"),
     DATA_STREAM_OPT_NOT_ALLOWED(1203,
@@ -72,10 +71,6 @@ public enum BizErrorCodeEnum {
     STORAGE_HIVE_FIELD_SAVE_FAILED(1404, "Failed to save/update HIVE data storage field"),
     STORAGE_OPT_NOT_ALLOWED(1405,
             "The current business status does not allow adding/modifying/deleting data storage information"),
-    STORAGE_CLUSTER_UPDATE_NOT_ALLOWED(1406,
-            "Current business status does not allow modification of storage cluster information"),
-    STORAGE_APP_GROUP_UPDATE_NOT_ALLOWED(1407,
-            "Current business status does not allow modification of storage application group information"),
     STORAGE_DB_NAME_UPDATE_NOT_ALLOWED(1408,
             "The current business status does not allow modification of the storage target database name"),
     STORAGE_TB_NAME_UPDATE_NOT_ALLOWED(1409,
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/EntityStatus.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/EntityStatus.java
index 3a2dce0..fb3ed22 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/EntityStatus.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/EntityStatus.java
@@ -36,25 +36,25 @@ public enum EntityStatus {
     DELETED(40, "deleted"),
 
     // Business related status
-    BIZ_WAIT_APPLYING(100, "waiting to applying"),
-    BIZ_WAIT_APPROVE(101, "waiting to approve"),
-    BIZ_APPROVE_REJECT(102, "approval reject"),
-    BIZ_APPROVE_PASS(103, "approval pass"),
-    BIZ_CONFIG_ING(110, "configuring"),
-    BIZ_CONFIG_FAILURE(120, "failed to config"),
-    BIZ_CONFIG_SUCCESS(130, "successfully config"),
+    BIZ_WAIT_SUBMIT(100, "waiting for submit"),
+    BIZ_WAIT_APPROVAL(101, "waiting for approval"),
+    BIZ_APPROVE_REJECTED(102, "approval rejected"),
+    BIZ_APPROVE_PASSED(103, "approval passed"),
+    BIZ_CONFIG_ING(110, "in configure"),
+    BIZ_CONFIG_FAILED(120, "configuration failed"),
+    BIZ_CONFIG_SUCCESSFUL(130, "configuration successful"),
 
     // Data stream related status
     DATA_STREAM_NEW(100, "new"),
-    DATA_STREAM_CONFIG_ING(110, "configuring"),
-    DATA_STREAM_CONFIG_FAILURE(120, "failed to config"),
-    DATA_STREAM_CONFIG_SUCCESS(130, "successfully config"),
+    DATA_STREAM_CONFIG_ING(110, "in configure"),
+    DATA_STREAM_CONFIG_FAILED(120, "configuration failed"),
+    DATA_STREAM_CONFIG_SUCCESSFUL(130, "configuration successful"),
 
     // Data storage related status
     DATA_STORAGE_NEW(100, "new"),
-    DATA_STORAGE_CONFIG_ING(110, "configuring"),
-    DATA_STORAGE_CONFIG_FAILURE(120, "failed to config"),
-    DATA_STORAGE_CONFIG_SUCCESS(130, "successfully config"),
+    DATA_STORAGE_CONFIG_ING(110, "in configure"),
+    DATA_STORAGE_CONFIG_FAILED(120, "configuration failed"),
+    DATA_STORAGE_CONFIG_SUCCESSFUL(130, "configuration successful"),
 
     // Data source related status
     DATA_RESOURCE_NEW(200, "new"),
@@ -67,69 +67,51 @@ public enum EntityStatus {
     AGENT_WAIT_DELETE(204, "wait delete"),
     AGENT_WAIT_UPDATE(205, "wait update"),
 
-    AGENT_ISSUED_CREATE(300, "created and issued"),
-    AGENT_ISSUED_STOP(302, "stop has been issued"),
-    AGENT_ISSUED_START(303, "start has been issued"),
-    AGENT_ISSUED_DELETE(304, "deletion has been issued"),
-    AGENT_ISSUED_UPDATE(305, "modification has been issued"),
-
-    DEPLOY_WAIT(311, "waiting to change deployment"),
-    DEPLOY_ING(312, "deploying"),
-    DEPLOY_FAILURE(313, "deployment failed"),
-    DEPLOY_SUCCESS(314, "deployed successfully"),
-
-    ISSUE_WAIT(321, "waiting to issue"),
-    ISSUE_ING(322, "issuing"),
-    ISSUE_FAILURE(323, "failed to issue"),
-    ISSUE_SUCCESS(324, "successfully issued"),
     ;
 
     /**
      * The status of the business that can initiate the approval process:
-     * <p/>[BIZ_WAIT_APPLYING] [BIZ_APPROVE_REJECT] [BIZ_CONFIG_FAILURE] [BIZ_CONFIG_SUCCESS]
+     * <p/>[BIZ_WAIT_SUBMIT] [BIZ_APPROVE_REJECTED] [BIZ_CONFIG_FAILED] [BIZ_CONFIG_SUCCESSFUL]
      */
     public static final List<Integer> ALLOW_START_WORKFLOW_STATUS = Arrays.asList(
-            BIZ_WAIT_APPLYING.getCode(), BIZ_APPROVE_REJECT.getCode(), BIZ_CONFIG_FAILURE.getCode(),
-            BIZ_CONFIG_SUCCESS.getCode());
+            BIZ_WAIT_SUBMIT.getCode(), BIZ_APPROVE_REJECTED.getCode(), BIZ_CONFIG_FAILED.getCode(),
+            BIZ_CONFIG_SUCCESSFUL.getCode());
 
     /**
      * The status of the business that can be modified:
-     * <p/>[DRAFT] [BIZ_WAIT_APPLYING] [BIZ_APPROVE_REJECT] [BIZ_CONFIG_FAILURE] [BIZ_CONFIG_SUCCESS]
+     * <p/>[DRAFT] [BIZ_WAIT_SUBMIT] [BIZ_APPROVE_REJECTED] [BIZ_CONFIG_FAILED] [BIZ_CONFIG_SUCCESSFUL]
      * <p/>[BIZ_CONFIG_ING] status cannot be modified
      */
     public static final List<Integer> ALLOW_UPDATE_BIZ_STATUS = Arrays.asList(
-            DRAFT.getCode(), BIZ_WAIT_APPLYING.getCode(), BIZ_APPROVE_REJECT.getCode(),
-            BIZ_CONFIG_FAILURE.getCode(), BIZ_CONFIG_SUCCESS.getCode());
+            DRAFT.getCode(), BIZ_WAIT_SUBMIT.getCode(), BIZ_APPROVE_REJECTED.getCode(),
+            BIZ_CONFIG_FAILED.getCode(), BIZ_CONFIG_SUCCESSFUL.getCode());
 
     /**
-     * The status of the service that can be deleted:
-     * <p/>[DRAFT] [BIZ_WAIT_APPLYING] [BIZ_APPROVE_REJECT] [BIZ_CONFIG_FAILURE] [BIZ_CONFIG_SUCCESS]
-     * <p/>[BIZ_CONFIG_FAILURE] [BIZ_CONFIG_SUCCESS] status cannot be deleted
+     * The status of the business that can be deleted:
+     * <p/>[DRAFT] [BIZ_WAIT_SUBMIT] [BIZ_APPROVE_REJECTED] [BIZ_CONFIG_ING] [BIZ_CONFIG_FAILED] [BIZ_CONFIG_SUCCESSFUL]
+     * <p/>[BIZ_WAIT_APPROVAL] [BIZ_APPROVE_PASSED] status cannot be deleted
      */
     public static final List<Integer> ALLOW_DELETE_BIZ_STATUS = Arrays.asList(
-            DRAFT.getCode(), BIZ_WAIT_APPLYING.getCode(), BIZ_APPROVE_REJECT.getCode(),
-            BIZ_CONFIG_FAILURE.getCode(), BIZ_CONFIG_SUCCESS.getCode());
+            DRAFT.getCode(), BIZ_WAIT_SUBMIT.getCode(), BIZ_APPROVE_REJECTED.getCode(),
+            BIZ_CONFIG_ING.getCode(), BIZ_CONFIG_FAILED.getCode(), BIZ_CONFIG_SUCCESSFUL.getCode());
 
     /**
      * The business can cascade to delete the status of the associated data:
-     * <p/>[DRAFT] [BIZ_WAIT_APPLYING]
      */
     public static final List<Integer> ALLOW_DELETE_BIZ_CASCADE_STATUS = Arrays.asList(
-            DRAFT.getCode(), BIZ_WAIT_APPLYING.getCode());
+            DRAFT.getCode(), BIZ_WAIT_SUBMIT.getCode());
 
     /**
-     * Status of business approval:
-     * <p/>[BIZ_CONFIG_FAILURE] [BIZ_CONFIG_SUCCESS]
+     * Status of business approval
      */
     public static final List<Integer> BIZ_APPROVE_PASS_STATUS = Arrays.asList(
-            BIZ_CONFIG_FAILURE.getCode(), BIZ_CONFIG_SUCCESS.getCode());
+            BIZ_CONFIG_FAILED.getCode(), BIZ_CONFIG_SUCCESSFUL.getCode());
 
     /**
-     * Temporary business status, adding, deleting and modifying operations are not allowed:
-     * <p/>[BIZ_WAIT_APPROVE] [BIZ_CONFIG_ING]
+     * Temporary business statuses in which adding, deleting and modifying operations are not allowed
      */
     public static final List<Integer> BIZ_TEMP_STATUS = Arrays.asList(
-            BIZ_WAIT_APPROVE.getCode(), BIZ_CONFIG_ING.getCode());
+            BIZ_WAIT_APPROVAL.getCode(), BIZ_CONFIG_ING.getCode());
 
     private final Integer code;
     private final String description;
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/datastorage/BaseStorageClusterInfo.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/datastorage/BaseStorageClusterInfo.java
deleted file mode 100644
index 225cc3b..0000000
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/datastorage/BaseStorageClusterInfo.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.manager.common.pojo.datastorage;
-
-import io.swagger.annotations.ApiModelProperty;
-import lombok.Data;
-
-/**
- * Basic cluster information
- */
-@Data
-public class BaseStorageClusterInfo {
-
-    private Integer id;
-
-    @ApiModelProperty("Cluster name")
-    private String name;
-
-}
diff --git a/inlong-manager/manager-dao/pom.xml b/inlong-manager/manager-dao/pom.xml
index 56c46fc..52d774e 100644
--- a/inlong-manager/manager-dao/pom.xml
+++ b/inlong-manager/manager-dao/pom.xml
@@ -129,6 +129,20 @@
                 </exclusion>
             </exclusions>
         </dependency>
+        <dependency>
+            <groupId>org.apache.hive</groupId>
+            <artifactId>hive-exec</artifactId>
+            <exclusions>
+                <exclusion>
+                    <artifactId>hive-llap-tez</artifactId>
+                    <groupId>org.apache.hive</groupId>
+                </exclusion>
+                <exclusion>
+                    <artifactId>hive-vector-code-gen</artifactId>
+                    <groupId>org.apache.hive</groupId>
+                </exclusion>
+            </exclusions>
+        </dependency>
     </dependencies>
 
     <build>
diff --git a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/DataStreamEntityMapper.java b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/DataStreamEntityMapper.java
index e50a3d8..7e74009 100644
--- a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/DataStreamEntityMapper.java
+++ b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/DataStreamEntityMapper.java
@@ -80,4 +80,7 @@ public interface DataStreamEntityMapper {
      */
     int deleteAllByBid(@Param("bid") String bid);
 
+    int updateStatusByIdentifier(@Param("bid") String bid, @Param("dsid") String dsid, @Param("status") Integer status,
+            @Param("modifier") String modifier);
+
 }
\ No newline at end of file
diff --git a/inlong-manager/manager-dao/src/main/resources/mappers/DataStreamEntityMapper.xml b/inlong-manager/manager-dao/src/main/resources/mappers/DataStreamEntityMapper.xml
index 24e7b0b..064f845 100644
--- a/inlong-manager/manager-dao/src/main/resources/mappers/DataStreamEntityMapper.xml
+++ b/inlong-manager/manager-dao/src/main/resources/mappers/DataStreamEntityMapper.xml
@@ -44,7 +44,7 @@
     </resultMap>
 
     <resultMap id="dataStreamFullInfo"
-               type="org.apache.inlong.manager.common.pojo.datastream.DataStreamInfoToHiveConfig">
+            type="org.apache.inlong.manager.common.pojo.datastream.DataStreamInfoToHiveConfig">
         <result column="id" jdbcType="VARCHAR" property="id"/>
         <result column="data_stream_identifier" jdbcType="VARCHAR" property="dataStreamIdentifier"/>
         <result column="name" jdbcType="VARCHAR" property="businessIdentifier"/>
@@ -493,6 +493,17 @@
         and data_stream_identifier = #{dataStreamIdentifier, jdbcType=VARCHAR}
         and is_deleted = 0
     </update>
+    <update id="updateStatusByIdentifier">
+        update data_stream
+        set status = #{status, jdbcType=INTEGER}, modifier = #{modifier, jdbcType=VARCHAR}
+        <where>
+            business_identifier = #{bid, jdbcType=VARCHAR}
+            <if test="dsid != null and dsid != ''">
+                and data_stream_identifier = #{dsid, jdbcType=VARCHAR}
+            </if>
+            and is_deleted = 0
+        </where>
+    </update>
 
     <select id="queryStreamToHiveBaseInfoByBid" resultMap="dataStreamFullInfo">
         SELECT h.id,
diff --git a/inlong-manager/manager-dao/src/main/resources/mappers/StorageHiveEntityMapper.xml b/inlong-manager/manager-dao/src/main/resources/mappers/StorageHiveEntityMapper.xml
index ad9a088..43b1316 100644
--- a/inlong-manager/manager-dao/src/main/resources/mappers/StorageHiveEntityMapper.xml
+++ b/inlong-manager/manager-dao/src/main/resources/mappers/StorageHiveEntityMapper.xml
@@ -86,9 +86,7 @@
         from storage_hive
         <where>
             is_deleted = 0
-            <if test="bid != null and bid != ''">
-                and business_identifier = #{bid, jdbcType=VARCHAR}
-            </if>
+            and business_identifier = #{bid, jdbcType=VARCHAR}
             <if test="dsid != null and dsid != ''">
                 and data_stream_identifier = #{dsid, jdbcType=VARCHAR}
             </if>
@@ -432,7 +430,7 @@
             file_format            = #{fileFormat,jdbcType=VARCHAR},
             field_splitter         = #{fieldSplitter,jdbcType=VARCHAR},
             encoding_type          = #{encodingType,jdbcType=VARCHAR},
-            hdfs_default_fs         = #{hdfsDefaultFs,jdbcType=VARCHAR},
+            hdfs_default_fs        = #{hdfsDefaultFs,jdbcType=VARCHAR},
             warehouse_dir          = #{warehouseDir,jdbcType=VARCHAR},
             usage_interval         = #{usageInterval,jdbcType=VARCHAR},
             storage_period         = #{storagePeriod,jdbcType=INTEGER},
diff --git a/inlong-manager/manager-service/pom.xml b/inlong-manager/manager-service/pom.xml
index c60baf1..ce04fab 100644
--- a/inlong-manager/manager-service/pom.xml
+++ b/inlong-manager/manager-service/pom.xml
@@ -116,6 +116,25 @@
                     <artifactId>slf4j-log4j12</artifactId>
                     <groupId>org.slf4j</groupId>
                 </exclusion>
+                <exclusion>
+                    <artifactId>hive-classification</artifactId>
+                    <groupId>org.apache.hive</groupId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.hive</groupId>
+            <artifactId>hive-exec</artifactId>
+            <exclusions>
+                <exclusion>
+                    <artifactId>hive-llap-tez</artifactId>
+                    <groupId>org.apache.hive</groupId>
+                </exclusion>
+                <exclusion>
+                    <artifactId>hive-vector-code-gen</artifactId>
+                    <groupId>org.apache.hive</groupId>
+                </exclusion>
             </exclusions>
         </dependency>
 
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/BusinessService.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/BusinessService.java
index df7756b..40af7c7 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/BusinessService.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/BusinessService.java
@@ -68,7 +68,7 @@ public interface BusinessService {
      * Modify the status of the specified business
      *
      * @param businessIdentifier Business identifier
-     * @param status Modified state
+     * @param status Modified status
      * @param operator Current operator
      * @return whether succeed
      */
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/DataStreamService.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/DataStreamService.java
index 8d19480..8bcf83a 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/DataStreamService.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/DataStreamService.java
@@ -179,4 +179,16 @@ public interface DataStreamService {
      */
     boolean updateAfterApprove(List<DataStreamApproveInfo> streamApproveList, String operator);
 
+    /**
+     * Update stream status
+     *
+     * @param bid Business identifier
+     * @param dsid Data stream identifier
+     * @param status Modified status
+     * @param operator Current operator
+     * @return whether succeed
+     * @apiNote If dsid is null or empty, update all data streams associated with bid
+     */
+    boolean updateStatus(String bid, String dsid, Integer status, String operator);
+
 }
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/BusinessProcessOperation.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/BusinessProcessOperation.java
index 4fd6f14..090fb25 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/BusinessProcessOperation.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/BusinessProcessOperation.java
@@ -80,7 +80,7 @@ public class BusinessProcessOperation {
         // Modify business status and other information
         entity.setModifier(operator);
         entity.setModifyTime(new Date());
-        entity.setStatus(EntityStatus.BIZ_WAIT_APPROVE.getCode());
+        entity.setStatus(EntityStatus.BIZ_WAIT_APPROVAL.getCode());
         int success = businessMapper.updateByIdentifierSelective(entity);
         Preconditions.checkTrue(success == 1, "failed to update business during assign and start process");
 
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/BusinessServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/BusinessServiceImpl.java
index e59621a..5b5931e 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/BusinessServiceImpl.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/BusinessServiceImpl.java
@@ -94,8 +94,8 @@ public class BusinessServiceImpl implements BusinessService {
         // Only M0 is currently supported
         entity.setSchemaName(BizConstant.SCHEMA_M0_DAY);
 
-        // After saving, the status is set to [BIZ_WAIT_APPLYING]
-        entity.setStatus(EntityStatus.BIZ_WAIT_APPLYING.getCode());
+        // After saving, the status is set to [BIZ_WAIT_SUBMIT]
+        entity.setStatus(EntityStatus.BIZ_WAIT_SUBMIT.getCode());
 
         entity.setCreator(operator);
         entity.setModifier(operator);
@@ -161,8 +161,8 @@ public class BusinessServiceImpl implements BusinessService {
         this.checkBizCanUpdate(entity, businessInfo);
 
         CommonBeanUtils.copyProperties(businessInfo, entity, true);
-        if (EntityStatus.BIZ_CONFIG_FAILURE.getCode().equals(entity.getStatus())) {
-            entity.setStatus(EntityStatus.BIZ_WAIT_APPLYING.getCode());
+        if (EntityStatus.BIZ_CONFIG_FAILED.getCode().equals(entity.getStatus())) {
+            entity.setStatus(EntityStatus.BIZ_WAIT_SUBMIT.getCode());
         }
         entity.setModifier(operator);
         businessMapper.updateByIdentifierSelective(entity);
@@ -191,7 +191,7 @@ public class BusinessServiceImpl implements BusinessService {
             throw new BusinessException(BizErrorCodeEnum.BUSINESS_UPDATE_NOT_ALLOWED);
         }
 
-        // Non-[draft] status, no bid modification allowed
+        // Non-[DRAFT] status, no bid modification allowed
         boolean updateBid = !EntityStatus.DRAFT.getCode().equals(oldStatus)
                 && !Objects.equals(entity.getBusinessIdentifier(), businessInfo.getBusinessIdentifier());
         if (updateBid) {
@@ -200,7 +200,7 @@ public class BusinessServiceImpl implements BusinessService {
         }
 
         // [Configuration successful] Status, bid and middleware type are not allowed to be modified
-        if (EntityStatus.BIZ_CONFIG_SUCCESS.getCode().equals(oldStatus)) {
+        if (EntityStatus.BIZ_CONFIG_SUCCESSFUL.getCode().equals(oldStatus)) {
             if (!Objects.equals(entity.getBusinessIdentifier(), businessInfo.getBusinessIdentifier())) {
                 LOGGER.error("current status was not allowed to update business identifier");
                 throw new BusinessException(BizErrorCodeEnum.BUSINESS_BID_UPDATE_NOT_ALLOWED);
@@ -241,17 +241,17 @@ public class BusinessServiceImpl implements BusinessService {
             throw new BusinessException(BizErrorCodeEnum.BUSINESS_DELETE_NOT_ALLOWED);
         }
 
-        // [DRAFT]  [BIZ_WAIT_APPLYING] status, all associated data can be logically deleted directly
+        // [DRAFT] [BIZ_WAIT_SUBMIT] status, all associated data can be logically deleted directly
         if (EntityStatus.ALLOW_DELETE_BIZ_CASCADE_STATUS.contains(entity.getStatus())) {
             // Logically delete data streams, data sources and data storage information
             streamService.logicDeleteAllByBid(entity.getBusinessIdentifier(), operator);
         } else {
-            // In other states, you need to delete the associated "data stream" data before you can delete it
-            // When deleting a data stream, you also need to check whether there is
-            // an associated "data source" and "data storage" under it
+            // In other states, you need to delete the associated "data stream" first.
+            // When deleting a data stream, you also need to check whether there are
+            // some associated "data source" and "data storage"
             int count = streamService.selectCountByBid(bid);
             if (count >= 1) {
-                LOGGER.error("bid={} have [{}] data stream, deleted failed", bid, count);
+                LOGGER.error("bid={} have [{}] data streams, deleted failed", bid, count);
                 throw new BusinessException(BizErrorCodeEnum.BUSINESS_HAS_DATA_STREAM);
             }
         }
@@ -291,9 +291,9 @@ public class BusinessServiceImpl implements BusinessService {
             countVO.setTotalCount(countVO.getTotalCount() + count);
             if (status == EntityStatus.BIZ_CONFIG_ING.getCode()) {
                 countVO.setWaitAssignCount(countVO.getWaitAssignCount() + count);
-            } else if (status == EntityStatus.BIZ_WAIT_APPROVE.getCode()) {
+            } else if (status == EntityStatus.BIZ_WAIT_APPROVAL.getCode()) {
                 countVO.setWaitApproveCount(countVO.getWaitApproveCount() + count);
-            } else if (status == EntityStatus.BIZ_APPROVE_REJECT.getCode()) {
+            } else if (status == EntityStatus.BIZ_APPROVE_REJECTED.getCode()) {
                 countVO.setRejectCount(countVO.getRejectCount() + count);
             }
         }
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/DataStreamServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/DataStreamServiceImpl.java
index 71b0797..ac83a2c 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/DataStreamServiceImpl.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/DataStreamServiceImpl.java
@@ -621,6 +621,17 @@ public class DataStreamServiceImpl implements DataStreamService {
         return true;
     }
 
+    @Override
+    public boolean updateStatus(String bid, String dsid, Integer status, String operator) {
+        LOGGER.debug("begin to update status by bid={}, dsid={}", bid, dsid);
+
+        // businessMapper.updateStatusByIdentifier(bid, status, operator);
+        streamMapper.updateStatusByIdentifier(bid, dsid, status, operator);
+
+        LOGGER.info("success to update stream after approve");
+        return true;
+    }
+
     /**
      * Update extended information
      * <p/>First physically delete the existing extended information, and then add this batch of extended information
@@ -716,16 +727,16 @@ public class DataStreamServiceImpl implements DataStreamService {
         }
 
         // Fields that are not allowed to be modified when the business [configuration is successful]
-        if (EntityStatus.BIZ_CONFIG_SUCCESS.getCode().equals(bizStatus)) {
+        if (EntityStatus.BIZ_CONFIG_SUCCESSFUL.getCode().equals(bizStatus)) {
             checkUpdatedFields(streamEntity, streamInfo);
         }
 
         // Business [Waiting to submit] [Approval rejected] [Configuration failed], if there is a
         // data source/data storage, the fields that are not allowed to be modified
         List<Integer> statusList = Arrays.asList(
-                EntityStatus.BIZ_WAIT_APPLYING.getCode(),
-                EntityStatus.BIZ_APPROVE_REJECT.getCode(),
-                EntityStatus.BIZ_CONFIG_FAILURE.getCode());
+                EntityStatus.BIZ_WAIT_SUBMIT.getCode(),
+                EntityStatus.BIZ_APPROVE_REJECTED.getCode(),
+                EntityStatus.BIZ_CONFIG_FAILED.getCode());
         if (statusList.contains(bizStatus)) {
             String bid = streamInfo.getBusinessIdentifier();
             String dsid = streamInfo.getDataStreamIdentifier();
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/StorageHiveOperation.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/StorageHiveOperation.java
index d37f394..9bdd6bb 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/StorageHiveOperation.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/StorageHiveOperation.java
@@ -295,7 +295,7 @@ public class StorageHiveOperation extends StorageBaseOperation {
 
         // When the business status is [Configuration successful], modification and deletion are not allowed,
         // only adding is allowed, and the order of existing fields cannot be changed
-        if (EntityStatus.BIZ_CONFIG_SUCCESS.getCode().equals(bizStatus)) {
+        if (EntityStatus.BIZ_CONFIG_SUCCESSFUL.getCode().equals(bizStatus)) {
             List<StorageHiveFieldEntity> existsFieldList = hiveFieldMapper.selectByStorageId(storageId);
             if (existsFieldList.size() > fieldInfoList.size()) {
                 LOGGER.error("current status was not allowed to update hive field");
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/StorageServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/StorageServiceImpl.java
index 18043c7..c168f6d 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/StorageServiceImpl.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/StorageServiceImpl.java
@@ -87,7 +87,7 @@ public class StorageServiceImpl extends StorageBaseOperation implements StorageS
 
         // If the business status is [Configuration Successful], then asynchronously initiate
         // the [Single data stream Resource Creation] workflow
-        if (EntityStatus.BIZ_CONFIG_SUCCESS.getCode().equals(businessEntity.getStatus())) {
+        if (EntityStatus.BIZ_CONFIG_SUCCESSFUL.getCode().equals(businessEntity.getStatus())) {
             super.executorService.execute(new WorkflowStartRunnable(operator, businessEntity, dsid));
         }
 
@@ -204,7 +204,7 @@ public class StorageServiceImpl extends StorageBaseOperation implements StorageS
 
         // The business status is [Configuration successful], then asynchronously initiate
         // the [Single data stream resource creation] workflow
-        if (EntityStatus.BIZ_CONFIG_SUCCESS.getCode().equals(businessEntity.getStatus())) {
+        if (EntityStatus.BIZ_CONFIG_SUCCESSFUL.getCode().equals(businessEntity.getStatus())) {
             super.executorService.execute(new WorkflowStartRunnable(operator, businessEntity, dsid));
         }
         LOGGER.info("success to update storage info");
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdpart/hive/HiveTableOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdpart/hive/HiveTableOperator.java
index c54772e..d2a4b8f 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdpart/hive/HiveTableOperator.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdpart/hive/HiveTableOperator.java
@@ -57,7 +57,6 @@ public class HiveTableOperator {
         }
 
         HiveTableQueryBean tableBean = getTableQueryBean(hiveConfig);
-
         try {
             // create database if not exists
             dataSourceService.createDb(tableBean);
@@ -76,10 +75,12 @@ public class HiveTableOperator {
                     dataSourceService.createColumn(tableBean);
                 }
             }
-        } catch (Exception e) {
+            storageService.updateHiveStatusById(hiveConfig.getId(),
+                    EntityStatus.DATA_STORAGE_CONFIG_SUCCESSFUL.getCode(), "create hive table success");
+        } catch (Throwable e) {
             log.error("create hive table error, ", e);
             storageService.updateHiveStatusById(hiveConfig.getId(),
-                    EntityStatus.DATA_STORAGE_CONFIG_FAILURE.getCode(), e.getMessage());
+                    EntityStatus.DATA_STORAGE_CONFIG_FAILED.getCode(), e.getMessage());
             throw new WorkflowException("create hive table failed, reason: " + e.getMessage());
         }
 
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdpart/sort/PushHiveConfigToSortEventListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdpart/sort/PushHiveConfigToSortEventListener.java
index 99e7024..4cac764 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdpart/sort/PushHiveConfigToSortEventListener.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdpart/sort/PushHiveConfigToSortEventListener.java
@@ -26,7 +26,7 @@ import org.apache.inlong.manager.common.util.JsonUtils;
 import org.apache.inlong.manager.dao.entity.StorageHiveEntity;
 import org.apache.inlong.manager.dao.mapper.StorageHiveEntityMapper;
 import org.apache.inlong.manager.service.core.DataStreamService;
-import org.apache.inlong.manager.service.core.impl.StorageHiveOperation;
+import org.apache.inlong.manager.service.core.StorageService;
 import org.apache.inlong.manager.service.workflow.newbusiness.CreateResourceWorkflowForm;
 import org.apache.inlong.manager.workflow.core.event.ListenerResult;
 import org.apache.inlong.manager.workflow.core.event.task.TaskEvent;
@@ -56,14 +56,11 @@ import lombok.extern.slf4j.Slf4j;
 public class PushHiveConfigToSortEventListener implements TaskEventListener {
 
     @Autowired
+    private StorageService storageService;
+    @Autowired
     private StorageHiveEntityMapper storageHiveMapper;
-
     @Autowired
     private DataStreamService dataStreamService;
-
-    @Autowired
-    private StorageHiveOperation storageHiveOperation;
-
     @Autowired
     private ClusterBean clusterBean;
 
@@ -81,11 +78,14 @@ public class PushHiveConfigToSortEventListener implements TaskEventListener {
         CreateResourceWorkflowForm form = (CreateResourceWorkflowForm) context.getProcessForm();
         BusinessInfo businessInfo = form.getBusinessInfo();
         String bid = businessInfo.getBusinessIdentifier();
+        // if dsid is not null, only push the config that belongs to both the bid and the dsid
+        String dsid = form.getDataStreamIdentifier();
 
-        List<StorageHiveEntity> storageHiveEntities = storageHiveMapper.selectByIdentifier(bid, null);
+        List<StorageHiveEntity> storageHiveEntities = storageHiveMapper.selectByIdentifier(bid, dsid);
         for (StorageHiveEntity hiveEntity : storageHiveEntities) {
-            Integer hiveStorageId = hiveEntity.getId();
-            StorageHiveInfo hiveStorage = (StorageHiveInfo) storageHiveOperation.getHiveStorage(hiveStorageId);
+            Integer storageId = hiveEntity.getId();
+            StorageHiveInfo hiveStorage = (StorageHiveInfo) storageService
+                    .getById(BizConstant.STORAGE_TYPE_HIVE, storageId);
             if (log.isDebugEnabled()) {
                 log.debug("hive storage info: {}", hiveStorage);
             }
@@ -94,17 +94,16 @@ public class PushHiveConfigToSortEventListener implements TaskEventListener {
             if (log.isDebugEnabled()) {
                 log.debug("try to push hive config to sort: {}", JsonUtils.toJson(dataFlowInfo));
             }
-
             try {
                 String zkUrl = clusterBean.getZkUrl();
                 String zkRoot = clusterBean.getZkRoot();
                 // push data flow info to zk
                 String sortClusterName = clusterBean.getAppName();
-                ZkTools.updateDataFlowInfo(dataFlowInfo, sortClusterName, hiveStorageId, zkUrl, zkRoot);
+                ZkTools.updateDataFlowInfo(dataFlowInfo, sortClusterName, storageId, zkUrl, zkRoot);
                 // add storage id to zk
-                ZkTools.addDataFlowToCluster(sortClusterName, hiveStorageId, zkUrl, zkRoot);
+                ZkTools.addDataFlowToCluster(sortClusterName, storageId, zkUrl, zkRoot);
             } catch (Exception e) {
-                log.error("add or update data stream information to zk failed, storageId={} ", hiveStorageId, e);
+                log.error("add or update data stream information to zk failed, storageId={} ", storageId, e);
                 throw new WorkflowListenerException("push hive config to sort failed, reason: " + e.getMessage());
             }
         }
@@ -113,6 +112,21 @@ public class PushHiveConfigToSortEventListener implements TaskEventListener {
     }
 
     private DataFlowInfo getDataFlowInfo(BusinessInfo businessInfo, StorageHiveInfo hiveStorage) {
+        Stream<FieldInfo> hiveFields = hiveStorage.getHiveFieldList().stream().map(field -> {
+            FormatInfo formatInfo = SortFieldFormatUtils.convertFieldFormat(field.getFieldType().toLowerCase());
+            return new FieldInfo(field.getFieldName(), formatInfo);
+        });
+
+        List<FieldInfo> sinkFields = hiveFields.collect(Collectors.toList());
+        FieldInfo partitionFieldInfo = new FieldInfo(hiveStorage.getPrimaryPartition(),
+                new TimestampFormatInfo("MILLIS"));
+        sinkFields.add(partitionFieldInfo);
+
+        String hiveServerUrl = hiveStorage.getJdbcUrl();
+        if (hiveServerUrl != null && !hiveServerUrl.startsWith("jdbc:hive2://")) {
+            hiveServerUrl = "jdbc:hive2://" + hiveServerUrl;
+        }
+
         // dataPath = hdfsUrl + / + warehouseDir + / + dbName + .db/ + tableName
         StringBuilder dataPathBuilder = new StringBuilder();
         String hdfsUrl = hiveStorage.getHdfsDefaultFs();
@@ -138,20 +152,10 @@ public class PushHiveConfigToSortEventListener implements TaskEventListener {
             splitter = hiveStorage.getFieldSplitter().charAt(0);
         }
 
-        Stream<FieldInfo> hiveFields = hiveStorage.getHiveFieldList().stream().map(field -> {
-            FormatInfo formatInfo = SortFieldFormatUtils.convertFieldFormat(field.getFieldType().toLowerCase());
-            return new FieldInfo(field.getFieldName(), formatInfo);
-        });
-
-        List<FieldInfo> sinkFields = hiveFields.collect(Collectors.toList());
-        FieldInfo partitionFieldInfo = new FieldInfo(hiveStorage.getPrimaryPartition(),
-                new TimestampFormatInfo("MILLIS"));
-        sinkFields.add(partitionFieldInfo);
-
         // encapsulate hive sink
         HiveSinkInfo hiveSinkInfo = new HiveSinkInfo(
                 sinkFields.toArray(new FieldInfo[0]),
-                hiveStorage.getJdbcUrl(),
+                hiveServerUrl,
                 hiveStorage.getDbName(),
                 hiveStorage.getTableName(),
                 hiveStorage.getUsername(),
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/newbusiness/CreateResourceWorkflowDefinition.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/newbusiness/CreateResourceWorkflowDefinition.java
index 5085df6..9c81be9 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/newbusiness/CreateResourceWorkflowDefinition.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/newbusiness/CreateResourceWorkflowDefinition.java
@@ -116,21 +116,21 @@ public class CreateResourceWorkflowDefinition implements WorkflowDefinition {
         createTubeTopicTask.addListener(createTubeTopicTaskEventListener);
         process.addTask(createTubeTopicTask);
 
-        ServiceTask createTubeConsumerGroupTask = new ServiceTask();
-        createTubeConsumerGroupTask.setSkipResolver(c -> {
+        ServiceTask createConsumerGroupForSortTask = new ServiceTask();
+        createConsumerGroupForSortTask.setSkipResolver(c -> {
             CreateResourceWorkflowForm form = (CreateResourceWorkflowForm) c.getProcessForm();
             BusinessInfo businessInfo = form.getBusinessInfo();
             if (BizConstant.MIDDLEWARE_TYPE_TUBE.equalsIgnoreCase(businessInfo.getMiddlewareType())) {
                 return false;
             }
-            log.warn("no need to create tube resource for bid={}", form.getBusinessId());
+            log.warn("no need to create tube resource for bid={}, as the middleware type is {}",
+                    form.getBusinessId(), businessInfo.getMiddlewareType());
             return true;
         });
-        createTubeConsumerGroupTask.setName("createTubeSortConsumerGroup");
-        createTubeConsumerGroupTask.setDisplayName("Create Tube Consumer Group");
-        createTubeConsumerGroupTask.addListener(createTubeConsumeGroupTaskEventListener);
-        process.addTask(createTubeConsumerGroupTask);
-
+        createConsumerGroupForSortTask.setName("createConsumerGroupForSort");
+        createConsumerGroupForSortTask.setDisplayName("Create Consumer Group For Sort");
+        createConsumerGroupForSortTask.addListener(createTubeConsumeGroupTaskEventListener);
+        process.addTask(createConsumerGroupForSortTask);
 
         ServiceTask createHiveTablesTask = new ServiceTask();
         createHiveTablesTask.setSkipResolver(c -> {
@@ -161,8 +161,8 @@ public class CreateResourceWorkflowDefinition implements WorkflowDefinition {
         process.addTask(pushSortConfig);
 
         startEvent.addNext(createTubeTopicTask);
-        createTubeTopicTask.addNext(createTubeConsumerGroupTask);
-        createTubeConsumerGroupTask.addNext(createHiveTablesTask);
+        createTubeTopicTask.addNext(createConsumerGroupForSortTask);
+        createConsumerGroupForSortTask.addNext(createHiveTablesTask);
         createHiveTablesTask.addNext(pushSortConfig);
         pushSortConfig.addNext(endEvent);
 
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/newbusiness/NewBusinessApproveForm.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/newbusiness/NewBusinessApproveForm.java
index b3a0efe..f9c68b0 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/newbusiness/NewBusinessApproveForm.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/newbusiness/NewBusinessApproveForm.java
@@ -17,22 +17,18 @@
 
 package org.apache.inlong.manager.service.workflow.newbusiness;
 
-import org.apache.inlong.manager.common.pojo.business.BusinessApproveInfo;
-import org.apache.inlong.manager.common.pojo.datastream.DataStreamApproveInfo;
-import org.apache.inlong.manager.service.workflow.BaseWorkflowTaskFormType;
-import org.apache.inlong.manager.workflow.exception.FormValidateException;
-import org.apache.inlong.manager.common.util.Preconditions;
-
 import io.swagger.annotations.ApiModelProperty;
-
 import java.util.List;
-
 import lombok.Data;
 import lombok.EqualsAndHashCode;
+import org.apache.inlong.manager.common.pojo.business.BusinessApproveInfo;
+import org.apache.inlong.manager.common.pojo.datastream.DataStreamApproveInfo;
+import org.apache.inlong.manager.common.util.Preconditions;
+import org.apache.inlong.manager.service.workflow.BaseWorkflowTaskFormType;
+import org.apache.inlong.manager.workflow.exception.FormValidateException;
 
 /**
  * The system administrator approves and fills in the form
- *
  */
 @Data
 @EqualsAndHashCode(callSuper = false)
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/newbusiness/NewBusinessWorkflowDefinition.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/newbusiness/NewBusinessWorkflowDefinition.java
index 97fcb8c..c403c67 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/newbusiness/NewBusinessWorkflowDefinition.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/newbusiness/NewBusinessWorkflowDefinition.java
@@ -22,10 +22,9 @@ import org.apache.inlong.manager.common.pojo.workflow.WorkflowApproverFilterCont
 import org.apache.inlong.manager.service.core.WorkflowApproverService;
 import org.apache.inlong.manager.service.workflow.ProcessName;
 import org.apache.inlong.manager.service.workflow.WorkflowDefinition;
-import org.apache.inlong.manager.service.workflow.newbusiness.listener.ApproveTaskListener;
-import org.apache.inlong.manager.service.workflow.newbusiness.listener.CancelProcessListener;
-import org.apache.inlong.manager.service.workflow.newbusiness.listener.CompleteProcessListener;
-import org.apache.inlong.manager.service.workflow.newbusiness.listener.RejectProcessListener;
+import org.apache.inlong.manager.service.workflow.newbusiness.listener.ApproveCancelProcessListener;
+import org.apache.inlong.manager.service.workflow.newbusiness.listener.ApprovePassTaskListener;
+import org.apache.inlong.manager.service.workflow.newbusiness.listener.ApproveRejectProcessListener;
 import org.apache.inlong.manager.service.workflow.newbusiness.listener.StartCreateResourceProcessListener;
 import org.apache.inlong.manager.workflow.model.definition.EndEvent;
 import org.apache.inlong.manager.workflow.model.definition.Process;
@@ -41,13 +40,11 @@ import org.springframework.stereotype.Component;
 public class NewBusinessWorkflowDefinition implements WorkflowDefinition {
 
     @Autowired
-    private ApproveTaskListener approveTaskListener;
+    private ApprovePassTaskListener approvePassTaskListener;
     @Autowired
-    private CancelProcessListener cancelProcessListener;
+    private ApproveCancelProcessListener approveCancelProcessListener;
     @Autowired
-    private RejectProcessListener rejectProcessListener;
-    @Autowired
-    private CompleteProcessListener completeProcessListener;
+    private ApproveRejectProcessListener approveRejectProcessListener;
     @Autowired
     private StartCreateResourceProcessListener startCreateResourceProcessListener;
     @Autowired
@@ -64,9 +61,8 @@ public class NewBusinessWorkflowDefinition implements WorkflowDefinition {
         process.setVersion(1);
 
         // Set up the listener
-        process.addListener(cancelProcessListener);
-        process.addListener(rejectProcessListener);
-        process.addListener(completeProcessListener);
+        process.addListener(approveCancelProcessListener);
+        process.addListener(approveRejectProcessListener);
         // Initiate the process of creating business resources,
         // and set the business status to [Configuration Successful]/[Configuration Failed] according to its completion
         process.addListener(startCreateResourceProcessListener);
@@ -85,7 +81,7 @@ public class NewBusinessWorkflowDefinition implements WorkflowDefinition {
         adminUserTask.setDisplayName("System Administrator");
         adminUserTask.setFormClass(NewBusinessApproveForm.class);
         adminUserTask.setApproverAssign(context -> getTaskApprovers(adminUserTask.getName()));
-        adminUserTask.addListener(approveTaskListener);
+        adminUserTask.addListener(approvePassTaskListener);
         process.addTask(adminUserTask);
 
         // Configuration order relationship
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/newbusiness/listener/CancelProcessListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/newbusiness/listener/ApproveCancelProcessListener.java
similarity index 95%
rename from inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/newbusiness/listener/CancelProcessListener.java
rename to inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/newbusiness/listener/ApproveCancelProcessListener.java
index 4199ff2..6ad50b2 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/newbusiness/listener/CancelProcessListener.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/newbusiness/listener/ApproveCancelProcessListener.java
@@ -34,7 +34,7 @@ import org.springframework.stereotype.Component;
  */
 @Slf4j
 @Component
-public class CancelProcessListener implements ProcessEventListener {
+public class ApproveCancelProcessListener implements ProcessEventListener {
 
     @Autowired
     private BusinessService businessService;
@@ -50,7 +50,7 @@ public class CancelProcessListener implements ProcessEventListener {
         // After canceling the approval, the status becomes [Waiting to submit]
         String bid = form.getBusinessId();
         String username = context.getApplicant();
-        businessService.updateStatus(bid, EntityStatus.BIZ_WAIT_APPLYING.getCode(), username);
+        businessService.updateStatus(bid, EntityStatus.BIZ_WAIT_SUBMIT.getCode(), username);
         return ListenerResult.success();
     }
 
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/newbusiness/listener/ApproveTaskListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/newbusiness/listener/ApprovePassTaskListener.java
similarity index 97%
rename from inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/newbusiness/listener/ApproveTaskListener.java
rename to inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/newbusiness/listener/ApprovePassTaskListener.java
index 83f09dc..4bef36a 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/newbusiness/listener/ApproveTaskListener.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/newbusiness/listener/ApprovePassTaskListener.java
@@ -17,6 +17,8 @@
 
 package org.apache.inlong.manager.service.workflow.newbusiness.listener;
 
+import java.util.List;
+import lombok.extern.slf4j.Slf4j;
 import org.apache.inlong.manager.common.pojo.business.BusinessApproveInfo;
 import org.apache.inlong.manager.common.pojo.datastream.DataStreamApproveInfo;
 import org.apache.inlong.manager.service.core.BusinessService;
@@ -27,21 +29,15 @@ import org.apache.inlong.manager.workflow.core.event.task.TaskEvent;
 import org.apache.inlong.manager.workflow.core.event.task.TaskEventListener;
 import org.apache.inlong.manager.workflow.exception.WorkflowListenerException;
 import org.apache.inlong.manager.workflow.model.WorkflowContext;
-
-import java.util.List;
-
-import lombok.extern.slf4j.Slf4j;
-
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Component;
 
 /**
  * New Service Access-System Administrator Approval Task Event Listener
- *
  */
 @Slf4j
 @Component
-public class ApproveTaskListener implements TaskEventListener {
+public class ApprovePassTaskListener implements TaskEventListener {
 
     @Autowired
     private BusinessService businessService;
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/newbusiness/listener/RejectProcessListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/newbusiness/listener/ApproveRejectProcessListener.java
similarity index 95%
rename from inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/newbusiness/listener/RejectProcessListener.java
rename to inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/newbusiness/listener/ApproveRejectProcessListener.java
index 50914d4..baa8fdd 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/newbusiness/listener/RejectProcessListener.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/newbusiness/listener/ApproveRejectProcessListener.java
@@ -34,7 +34,7 @@ import org.springframework.stereotype.Component;
  */
 @Slf4j
 @Component
-public class RejectProcessListener implements ProcessEventListener {
+public class ApproveRejectProcessListener implements ProcessEventListener {
 
     @Autowired
     private BusinessService businessService;
@@ -50,7 +50,7 @@ public class RejectProcessListener implements ProcessEventListener {
         // after reject, update business status to [BIZ_APPROVE_REJECT]
         String bid = form.getBusinessId();
         String username = context.getApplicant();
-        businessService.updateStatus(bid, EntityStatus.BIZ_APPROVE_REJECT.getCode(), username);
+        businessService.updateStatus(bid, EntityStatus.BIZ_APPROVE_REJECTED.getCode(), username);
         return ListenerResult.success();
     }
 
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/newbusiness/listener/CompleteProcessListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/newbusiness/listener/CompleteProcessListener.java
deleted file mode 100644
index a909b30..0000000
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/newbusiness/listener/CompleteProcessListener.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.manager.service.workflow.newbusiness.listener;
-
-import org.apache.inlong.manager.common.enums.EntityStatus;
-import org.apache.inlong.manager.service.core.BusinessService;
-import org.apache.inlong.manager.service.workflow.newbusiness.NewBusinessWorkflowForm;
-import org.apache.inlong.manager.workflow.core.event.ListenerResult;
-import org.apache.inlong.manager.workflow.core.event.process.ProcessEvent;
-import org.apache.inlong.manager.workflow.core.event.process.ProcessEventListener;
-import org.apache.inlong.manager.workflow.exception.WorkflowListenerException;
-import org.apache.inlong.manager.workflow.model.WorkflowContext;
-
-import lombok.extern.slf4j.Slf4j;
-
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.stereotype.Component;
-
-/**
- * Listener after new business approval [process completed]
- *
- */
-@Slf4j
-@Component
-public class CompleteProcessListener implements ProcessEventListener {
-
-    @Autowired
-    private BusinessService businessService;
-
-    @Override
-    public ProcessEvent event() {
-        return ProcessEvent.COMPLETE;
-    }
-
-    /**
-     * After the business approval process is completed, modify the business status to [Configuring]
-     */
-    @Override
-    public ListenerResult listen(WorkflowContext context) throws WorkflowListenerException {
-        NewBusinessWorkflowForm form = (NewBusinessWorkflowForm) context.getProcessForm();
-        String bid = form.getBusinessId();
-        String username = context.getApplicant();
-        businessService.updateStatus(bid, EntityStatus.BIZ_CONFIG_ING.getCode(), username);
-        return ListenerResult.success();
-    }
-
-    @Override
-    public boolean async() {
-        return false;
-    }
-}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/newbusiness/listener/CreateResourceCompleteProcessListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/newbusiness/listener/CreateResourceCompleteProcessListener.java
index 261b8df..0ce2509 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/newbusiness/listener/CreateResourceCompleteProcessListener.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/newbusiness/listener/CreateResourceCompleteProcessListener.java
@@ -20,6 +20,7 @@ package org.apache.inlong.manager.service.workflow.newbusiness.listener;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.inlong.manager.common.enums.EntityStatus;
 import org.apache.inlong.manager.service.core.BusinessService;
+import org.apache.inlong.manager.service.core.DataStreamService;
 import org.apache.inlong.manager.service.workflow.newbusiness.CreateResourceWorkflowForm;
 import org.apache.inlong.manager.workflow.core.event.ListenerResult;
 import org.apache.inlong.manager.workflow.core.event.process.ProcessEvent;
@@ -38,6 +39,8 @@ public class CreateResourceCompleteProcessListener implements ProcessEventListen
 
     @Autowired
     private BusinessService businessService;
+    @Autowired
+    private DataStreamService dataStreamService;
 
     @Override
     public ProcessEvent event() {
@@ -45,16 +48,21 @@ public class CreateResourceCompleteProcessListener implements ProcessEventListen
     }
 
     /**
-     * After the process of creating business resources is completed, modify the business status to
-     * [Configuration Successful] [Configuration Failed]
+     * After the process of creating business resources is completed, modify the status of the business and of all data
+     * streams belonging to this business to [Configuration Successful] [Configuration Failed]
      * <p/>{@link CreateResourceFailedProcessListener#listen}
      */
     @Override
     public ListenerResult listen(WorkflowContext context) throws WorkflowListenerException {
         CreateResourceWorkflowForm form = (CreateResourceWorkflowForm) context.getProcessForm();
+
         String bid = form.getBusinessId();
         String username = context.getApplicant();
-        businessService.updateStatus(bid, EntityStatus.BIZ_CONFIG_SUCCESS.getCode(), username);
+        // update business status
+        businessService.updateStatus(bid, EntityStatus.BIZ_CONFIG_SUCCESSFUL.getCode(), username);
+        // update data stream status
+        dataStreamService.updateStatus(bid, null, EntityStatus.DATA_STREAM_CONFIG_SUCCESSFUL.getCode(), username);
+
         return ListenerResult.success();
     }
 
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/newbusiness/listener/CreateResourceFailedProcessListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/newbusiness/listener/CreateResourceFailedProcessListener.java
index 06f503e..d81f545 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/newbusiness/listener/CreateResourceFailedProcessListener.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/newbusiness/listener/CreateResourceFailedProcessListener.java
@@ -20,6 +20,7 @@ package org.apache.inlong.manager.service.workflow.newbusiness.listener;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.inlong.manager.common.enums.EntityStatus;
 import org.apache.inlong.manager.service.core.BusinessService;
+import org.apache.inlong.manager.service.core.DataStreamService;
 import org.apache.inlong.manager.service.workflow.newbusiness.CreateResourceWorkflowForm;
 import org.apache.inlong.manager.workflow.core.event.ListenerResult;
 import org.apache.inlong.manager.workflow.core.event.process.ProcessEvent;
@@ -38,6 +39,8 @@ public class CreateResourceFailedProcessListener implements ProcessEventListener
 
     @Autowired
     private BusinessService businessService;
+    @Autowired
+    private DataStreamService dataStreamService;
 
     @Override
     public ProcessEvent event() {
@@ -54,7 +57,11 @@ public class CreateResourceFailedProcessListener implements ProcessEventListener
         CreateResourceWorkflowForm form = (CreateResourceWorkflowForm) context.getProcessForm();
         String bid = form.getBusinessId();
         String username = context.getApplicant();
-        businessService.updateStatus(bid, EntityStatus.BIZ_CONFIG_FAILURE.getCode(), username);
+
+        // update business status
+        businessService.updateStatus(bid, EntityStatus.BIZ_CONFIG_FAILED.getCode(), username);
+        // update data stream status
+        dataStreamService.updateStatus(bid, null, EntityStatus.DATA_STREAM_CONFIG_FAILED.getCode(), username);
         return ListenerResult.success();
     }
 
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/newconsumption/NewConsumptionWorkflowDefinition.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/newconsumption/NewConsumptionWorkflowDefinition.java
index c5da39e..fe809ae 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/newconsumption/NewConsumptionWorkflowDefinition.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/newconsumption/NewConsumptionWorkflowDefinition.java
@@ -28,10 +28,10 @@ import org.apache.inlong.manager.service.core.BusinessService;
 import org.apache.inlong.manager.service.core.WorkflowApproverService;
 import org.apache.inlong.manager.service.workflow.ProcessName;
 import org.apache.inlong.manager.service.workflow.WorkflowDefinition;
-import org.apache.inlong.manager.service.workflow.newconsumption.listener.ApproveTaskEventListener;
-import org.apache.inlong.manager.service.workflow.newconsumption.listener.CancelProcessEventListener;
-import org.apache.inlong.manager.service.workflow.newconsumption.listener.CompleteProcessEventListener;
-import org.apache.inlong.manager.service.workflow.newconsumption.listener.RejectProcessEventListener;
+import org.apache.inlong.manager.service.workflow.newconsumption.listener.ConsumptionApproveTaskListener;
+import org.apache.inlong.manager.service.workflow.newconsumption.listener.ConsumptionCancelProcessListener;
+import org.apache.inlong.manager.service.workflow.newconsumption.listener.ConsumptionCompleteProcessListener;
+import org.apache.inlong.manager.service.workflow.newconsumption.listener.ConsumptionRejectProcessListener;
 import org.apache.inlong.manager.workflow.model.WorkflowContext;
 import org.apache.inlong.manager.workflow.model.definition.EndEvent;
 import org.apache.inlong.manager.workflow.model.definition.Process;
@@ -50,16 +50,16 @@ public class NewConsumptionWorkflowDefinition implements WorkflowDefinition {
     public static final String UT_BIZ_OWNER_NAME = "ut_biz_owner";
 
     @Autowired
-    private CompleteProcessEventListener completeProcessEventListener;
+    private ConsumptionCompleteProcessListener consumptionCompleteProcessListener;
 
     @Autowired
-    private ApproveTaskEventListener approveTaskEventListener;
+    private ConsumptionApproveTaskListener consumptionApproveTaskListener;
 
     @Autowired
-    private RejectProcessEventListener rejectProcessEventListener;
+    private ConsumptionRejectProcessListener consumptionRejectProcessListener;
 
     @Autowired
-    private CancelProcessEventListener cancelProcessEventListener;
+    private ConsumptionCancelProcessListener consumptionCancelProcessListener;
 
     @Autowired
     private WorkflowApproverService workflowApproverService;
@@ -103,7 +103,7 @@ public class NewConsumptionWorkflowDefinition implements WorkflowDefinition {
         adminUserTask.setDisplayName("System Administrator");
         adminUserTask.setFormClass(NewConsumptionApproveForm.class);
         adminUserTask.setApproverAssign(this::adminUserTaskApprover);
-        adminUserTask.addListener(approveTaskEventListener);
+        adminUserTask.addListener(consumptionApproveTaskListener);
         process.addTask(adminUserTask);
 
         // Set order relationship
@@ -112,9 +112,9 @@ public class NewConsumptionWorkflowDefinition implements WorkflowDefinition {
         adminUserTask.addNext(endEvent);
 
         // Set up the listener
-        process.addListener(completeProcessEventListener);
-        process.addListener(rejectProcessEventListener);
-        process.addListener(cancelProcessEventListener);
+        process.addListener(consumptionCompleteProcessListener);
+        process.addListener(consumptionRejectProcessListener);
+        process.addListener(consumptionCancelProcessListener);
 
         return process;
     }
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/newconsumption/listener/ApproveTaskEventListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/newconsumption/listener/ConsumptionApproveTaskListener.java
similarity index 97%
rename from inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/newconsumption/listener/ApproveTaskEventListener.java
rename to inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/newconsumption/listener/ConsumptionApproveTaskListener.java
index ab15916..3417151 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/newconsumption/listener/ApproveTaskEventListener.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/newconsumption/listener/ConsumptionApproveTaskListener.java
@@ -37,7 +37,7 @@ import org.springframework.stereotype.Component;
  */
 @Slf4j
 @Component
-public class ApproveTaskEventListener implements TaskEventListener {
+public class ConsumptionApproveTaskListener implements TaskEventListener {
 
     @Autowired
     private ConsumptionService consumptionService;
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/newconsumption/listener/CancelProcessEventListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/newconsumption/listener/ConsumptionCancelProcessListener.java
similarity index 93%
rename from inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/newconsumption/listener/CancelProcessEventListener.java
rename to inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/newconsumption/listener/ConsumptionCancelProcessListener.java
index 0d71d75..e6cd836 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/newconsumption/listener/CancelProcessEventListener.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/newconsumption/listener/ConsumptionCancelProcessListener.java
@@ -40,12 +40,12 @@ import org.springframework.stereotype.Component;
  */
 @Slf4j
 @Component
-public class CancelProcessEventListener implements ProcessEventListener {
+public class ConsumptionCancelProcessListener implements ProcessEventListener {
 
     private ConsumptionEntityMapper consumptionEntityMapper;
 
     @Autowired
-    public CancelProcessEventListener(ConsumptionEntityMapper consumptionEntityMapper) {
+    public ConsumptionCancelProcessListener(ConsumptionEntityMapper consumptionEntityMapper) {
         this.consumptionEntityMapper = consumptionEntityMapper;
     }
 
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/newconsumption/listener/CompleteProcessEventListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/newconsumption/listener/ConsumptionCompleteProcessListener.java
similarity index 98%
rename from inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/newconsumption/listener/CompleteProcessEventListener.java
rename to inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/newconsumption/listener/ConsumptionCompleteProcessListener.java
index 5647ae0..619e5cf 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/newconsumption/listener/CompleteProcessEventListener.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/newconsumption/listener/ConsumptionCompleteProcessListener.java
@@ -52,7 +52,7 @@ import org.springframework.stereotype.Component;
  */
 @Slf4j
 @Component
-public class CompleteProcessEventListener implements ProcessEventListener {
+public class ConsumptionCompleteProcessListener implements ProcessEventListener {
 
     @Autowired
     private QueryService queryService;
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/newconsumption/listener/RejectProcessEventListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/newconsumption/listener/ConsumptionRejectProcessListener.java
similarity index 92%
rename from inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/newconsumption/listener/RejectProcessEventListener.java
rename to inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/newconsumption/listener/ConsumptionRejectProcessListener.java
index 744f160..e02ce91 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/newconsumption/listener/RejectProcessEventListener.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/newconsumption/listener/ConsumptionRejectProcessListener.java
@@ -17,6 +17,7 @@
 
 package org.apache.inlong.manager.service.workflow.newconsumption.listener;
 
+import java.util.Date;
 import org.apache.inlong.manager.common.enums.ConsumptionStatus;
 import org.apache.inlong.manager.dao.entity.ConsumptionEntity;
 import org.apache.inlong.manager.dao.mapper.ConsumptionEntityMapper;
@@ -26,26 +27,19 @@ import org.apache.inlong.manager.workflow.core.event.process.ProcessEvent;
 import org.apache.inlong.manager.workflow.core.event.process.ProcessEventListener;
 import org.apache.inlong.manager.workflow.exception.WorkflowListenerException;
 import org.apache.inlong.manager.workflow.model.WorkflowContext;
-
-import java.util.Date;
-
-import lombok.extern.slf4j.Slf4j;
-
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Component;
 
 /**
  * Added data consumption process rejection event listener
- *
  */
-@Slf4j
 @Component
-public class RejectProcessEventListener implements ProcessEventListener {
+public class ConsumptionRejectProcessListener implements ProcessEventListener {
 
     private ConsumptionEntityMapper consumptionEntityMapper;
 
     @Autowired
-    public RejectProcessEventListener(ConsumptionEntityMapper consumptionEntityMapper) {
+    public ConsumptionRejectProcessListener(ConsumptionEntityMapper consumptionEntityMapper) {
         this.consumptionEntityMapper = consumptionEntityMapper;
     }
 
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/newstream/SingleStreamCompleteProcessListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/newstream/SingleStreamCompleteProcessListener.java
index ab9beed..e31524b 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/newstream/SingleStreamCompleteProcessListener.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/newstream/SingleStreamCompleteProcessListener.java
@@ -20,6 +20,7 @@ package org.apache.inlong.manager.service.workflow.newstream;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.inlong.manager.common.enums.EntityStatus;
 import org.apache.inlong.manager.service.core.BusinessService;
+import org.apache.inlong.manager.service.core.DataStreamService;
 import org.apache.inlong.manager.service.workflow.newbusiness.CreateResourceWorkflowForm;
 import org.apache.inlong.manager.workflow.core.event.ListenerResult;
 import org.apache.inlong.manager.workflow.core.event.process.ProcessEvent;
@@ -38,6 +39,8 @@ public class SingleStreamCompleteProcessListener implements ProcessEventListener
 
     @Autowired
     private BusinessService businessService;
+    @Autowired
+    private DataStreamService dataStreamService;
 
     @Override
     public ProcessEvent event() {
@@ -45,14 +48,20 @@ public class SingleStreamCompleteProcessListener implements ProcessEventListener
     }
 
     /**
-     * The creation process ends normally, and the business status is modified to [BIZ_CONFIG_SUCCESS]
+     * The creation process ended normally: update the status of the business, and of the data stream
+     * named by the form's data stream identifier, to [CONFIG_SUCCESSFUL]
      */
     @Override
     public ListenerResult listen(WorkflowContext context) throws WorkflowListenerException {
         CreateResourceWorkflowForm form = (CreateResourceWorkflowForm) context.getProcessForm();
         String bid = form.getBusinessId();
+        String dsid = form.getDataStreamIdentifier();
         String username = context.getApplicant();
-        businessService.updateStatus(bid, EntityStatus.BIZ_CONFIG_SUCCESS.getCode(), username);
+
+        // update business status
+        businessService.updateStatus(bid, EntityStatus.BIZ_CONFIG_SUCCESSFUL.getCode(), username);
+        // update data stream status
+        dataStreamService.updateStatus(bid, dsid, EntityStatus.DATA_STREAM_CONFIG_SUCCESSFUL.getCode(), username);
         return ListenerResult.success();
     }
 
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/newstream/SingleStreamFailedProcessListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/newstream/SingleStreamFailedProcessListener.java
index aba59ee..7305500 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/newstream/SingleStreamFailedProcessListener.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/newstream/SingleStreamFailedProcessListener.java
@@ -20,6 +20,7 @@ package org.apache.inlong.manager.service.workflow.newstream;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.inlong.manager.common.enums.EntityStatus;
 import org.apache.inlong.manager.service.core.BusinessService;
+import org.apache.inlong.manager.service.core.DataStreamService;
 import org.apache.inlong.manager.service.workflow.newbusiness.CreateResourceWorkflowForm;
 import org.apache.inlong.manager.workflow.core.event.ListenerResult;
 import org.apache.inlong.manager.workflow.core.event.process.ProcessEvent;
@@ -38,6 +39,8 @@ public class SingleStreamFailedProcessListener implements ProcessEventListener {
 
     @Autowired
     private BusinessService businessService;
+    @Autowired
+    private DataStreamService dataStreamService;
 
     @Override
     public ProcessEvent event() {
@@ -45,14 +48,21 @@ public class SingleStreamFailedProcessListener implements ProcessEventListener {
     }
 
     /**
-     * An exception occurred in the creation process, and the business status was changed to [BIZ_CONFIG_FAILURE]
+     * The creation process ended abnormally: update the status of the business, and of the data stream
+     * named by the form's data stream identifier, to [CONFIG_FAILED]
      */
     @Override
     public ListenerResult listen(WorkflowContext context) throws WorkflowListenerException {
         CreateResourceWorkflowForm form = (CreateResourceWorkflowForm) context.getProcessForm();
         String bid = form.getBusinessId();
+        String dsid = form.getDataStreamIdentifier();
         String username = context.getApplicant();
-        businessService.updateStatus(bid, EntityStatus.BIZ_CONFIG_FAILURE.getCode(), username);
+
+        // update business status
+        businessService.updateStatus(bid, EntityStatus.BIZ_CONFIG_FAILED.getCode(), username);
+        // update data stream status
+        dataStreamService.updateStatus(bid, dsid, EntityStatus.DATA_STREAM_CONFIG_FAILED.getCode(), username);
+
         return ListenerResult.success();
     }
 
diff --git a/inlong-manager/manager-workflow-engine/src/main/java/org/apache/inlong/manager/workflow/core/TransactionHelper.java b/inlong-manager/manager-workflow-engine/src/main/java/org/apache/inlong/manager/workflow/core/TransactionHelper.java
index 0ea30d0..1ad90dc 100644
--- a/inlong-manager/manager-workflow-engine/src/main/java/org/apache/inlong/manager/workflow/core/TransactionHelper.java
+++ b/inlong-manager/manager-workflow-engine/src/main/java/org/apache/inlong/manager/workflow/core/TransactionHelper.java
@@ -17,13 +17,10 @@
 
 package org.apache.inlong.manager.workflow.core;
 
-import org.apache.inlong.manager.workflow.exception.WorkflowNoRollbackException;
-import org.apache.inlong.manager.workflow.exception.WorkflowRollbackOnceException;
-
 import java.lang.reflect.UndeclaredThrowableException;
-
 import lombok.extern.slf4j.Slf4j;
-
+import org.apache.inlong.manager.workflow.exception.WorkflowNoRollbackException;
+import org.apache.inlong.manager.workflow.exception.WorkflowRollbackOnceException;
 import org.springframework.transaction.PlatformTransactionManager;
 import org.springframework.transaction.TransactionException;
 import org.springframework.transaction.TransactionStatus;
@@ -33,7 +30,7 @@ import org.springframework.transaction.support.TransactionCallback;
 import org.springframework.util.Assert;
 
 /**
- * Trasaction to Help
+ * Transaction Helper
  */
 @Slf4j
 public class TransactionHelper {
@@ -47,7 +44,7 @@ public class TransactionHelper {
     /**
      * Execute in transaction
      *
-     * @param action              Execution logic
+     * @param action Execution logic
      * @param propagationBehavior Dissemination mechanism
      * @param <T>
      * @return result
diff --git a/inlong-manager/pom.xml b/inlong-manager/pom.xml
index 1c25709..dab6e65 100644
--- a/inlong-manager/pom.xml
+++ b/inlong-manager/pom.xml
@@ -242,6 +242,12 @@
             </dependency>
 
             <dependency>
+                <groupId>org.apache.hive</groupId>
+                <artifactId>hive-exec</artifactId>
+                <version>${hive.version}</version>
+            </dependency>
+
+            <dependency>
                 <groupId>org.apache.hadoop</groupId>
                 <artifactId>hadoop-common</artifactId>
                 <version>2.6.0</version>