You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by go...@apache.org on 2021/12/02 02:19:39 UTC

[incubator-inlong] branch master updated: [INLONG-1817][Feature][InLong-Manager] Workflow supports data stream for Pulsar (#1868)

This is an automated email from the ASF dual-hosted git repository.

gosonzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 093d71b  [INLONG-1817][Feature][InLong-Manager] Workflow supports data stream for Pulsar (#1868)
093d71b is described below

commit 093d71b00e925a9e1ebfbff8d814345a2f9f87c6
Author: healchow <he...@gmail.com>
AuthorDate: Thu Dec 2 10:19:35 2021 +0800

    [INLONG-1817][Feature][InLong-Manager] Workflow supports data stream for Pulsar (#1868)
    
    Co-authored-by: healchow <he...@gmail.com>
---
 inlong-manager/manager-common/pom.xml              |  12 +
 .../manager/common/pojo/business/BusinessInfo.java |  14 +-
 .../inlong/manager/dao/entity/BusinessEntity.java  |   2 +
 .../dao/mapper/ConsumptionEntityMapper.java        |   4 +-
 .../src/main/resources/generatorConfig.xml         |   2 +-
 .../resources/mappers/BusinessEntityMapper.xml     |  74 +++--
 .../test/resources/sql/apache_inlong_manager.sql   |  44 +--
 inlong-manager/manager-service/pom.xml             |  21 ++
 .../core/impl/BusinessProcessOperation.java        |   2 +-
 .../service/core/impl/ConsumptionServiceImpl.java  |   3 +-
 .../service/core/impl/StorageBaseOperation.java    |  14 +-
 ....java => CreateHiveTableForStreamListener.java} |   6 +-
 ...mListener.java => CreateHiveTableListener.java} |   6 +-
 .../mq/CreatePulsarGroupForStreamTaskListener.java | 136 ++++++++
 .../mq/CreatePulsarGroupTaskListener.java          | 128 +++++++
 .../mq/CreatePulsarResourceTaskListener.java       | 131 ++++++++
 .../mq/CreatePulsarTopicForStreamTaskListener.java | 114 +++++++
 ...tener.java => CreateTubeGroupTaskListener.java} |   6 +-
 .../thirdpart/mq/CreateTubeTopicTaskListener.java  |   4 +-
 .../service/thirdpart/mq/PulsarOptService.java     |  47 +++
 .../service/thirdpart/mq/PulsarOptServiceImpl.java | 250 ++++++++++++++
 .../service/thirdpart/mq/util/PulsarUtils.java     |  50 +++
 .../thirdpart/sort/PushHiveConfigTaskListener.java |   6 +-
 .../service/workflow/BaseWorkflowFormType.java     |  10 +-
 .../service/workflow/BaseWorkflowTaskFormType.java |   8 +-
 .../manager/service/workflow/ProcessName.java      |   8 +-
 .../BusinessAdminApproveForm.java}                 |   6 +-
 .../BusinessResourceWorkflowForm.java}             |   6 +-
 .../business/CreateBusinessWorkflowDefinition.java | 208 ++++++++++++
 .../NewBusinessWorkflowDefinition.java             |  22 +-
 .../NewBusinessWorkflowForm.java                   |   2 +-
 .../listener/BusinessCancelProcessListener.java}   |   6 +-
 .../listener/BusinessCompleteProcessListener.java} |  10 +-
 .../listener/BusinessFailedProcessListener.java}   |  10 +-
 .../listener/BusinessPassTaskListener.java}        |   8 +-
 .../listener/BusinessRejectProcessListener.java}   |   6 +-
 .../listener/InitBusinessInfoListener.java         |   6 +-
 .../StartCreateResourceProcessListener.java        |   8 +-
 .../ConsumptionAdminApproveForm.java}              |  10 +-
 .../NewConsumptionProcessDetailHandler.java        |   2 +-
 .../NewConsumptionWorkflowDefinition.java          |  34 +-
 .../NewConsumptionWorkflowForm.java                |   2 +-
 .../listener/ConsumptionCancelProcessListener.java |  14 +-
 .../ConsumptionCompleteProcessListener.java        | 175 ++++++++++
 .../listener/ConsumptionPassTaskListener.java}     |  22 +-
 .../listener/ConsumptionRejectProcessListener.java |   4 +-
 .../CreateResourceWorkflowDefinition.java          | 176 ----------
 .../ConsumptionCompleteProcessListener.java        | 143 --------
 .../CreateStreamWorkflowDefinition.java}           |  76 ++++-
 .../InitBusinessInfoForStreamListener.java         |   6 +-
 .../StreamCompleteProcessListener.java}            |  10 +-
 .../StreamFailedProcessListener.java}              |  10 +-
 .../src/main/resources/application-dev.properties  |   2 +-
 .../src/main/resources/application-prod.properties |   4 +-
 .../src/main/resources/application-test.properties |   4 +-
 inlong-manager/sql/apache_inlong_manager.sql       | 368 +++++++++++----------
 56 files changed, 1744 insertions(+), 718 deletions(-)

diff --git a/inlong-manager/manager-common/pom.xml b/inlong-manager/manager-common/pom.xml
index 3692efe..6bb2b77 100644
--- a/inlong-manager/manager-common/pom.xml
+++ b/inlong-manager/manager-common/pom.xml
@@ -158,10 +158,22 @@
         <dependency>
             <groupId>org.apache.pulsar</groupId>
             <artifactId>pulsar-client</artifactId>
+            <exclusions>
+                <exclusion>
+                    <groupId>javax.validation</groupId>
+                    <artifactId>validation-api</artifactId>
+                </exclusion>
+            </exclusions>
         </dependency>
         <dependency>
             <groupId>org.apache.pulsar</groupId>
             <artifactId>pulsar-client-admin</artifactId>
+            <exclusions>
+                <exclusion>
+                    <groupId>javax.validation</groupId>
+                    <artifactId>validation-api</artifactId>
+                </exclusion>
+            </exclusions>
         </dependency>
 
         <dependency>
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/business/BusinessInfo.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/business/BusinessInfo.java
index b3d8643..6011054 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/business/BusinessInfo.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/business/BusinessInfo.java
@@ -55,6 +55,13 @@ public class BusinessInfo {
     @ApiModelProperty(value = "Middleware type, high throughput: TUBE, high consistency: PULSAR")
     private String middlewareType;
 
+    @ApiModelProperty(value = "Queue model of Pulsar, parallel: multiple partitions, high throughput, out-of-order "
+            + "messages; serial: single partition, low throughput, and orderly messages")
+    private String queueModule = "parallel";
+
+    @ApiModelProperty(value = "The number of partitions of Pulsar Topic, 1-20")
+    private Integer topicPartitionNum = 3;
+
     @ApiModelProperty(value = "MQ resource object, in business",
             notes = "Tube corresponds to Topic, Pulsar corresponds to Namespace")
     private String mqResourceObj;
@@ -68,13 +75,6 @@ public class BusinessInfo {
     @ApiModelProperty(value = "Pulsar service URL")
     private String pulsarServiceUrl;
 
-    @ApiModelProperty(value = "Queue model of Pulsar, parallel: multiple partitions, high throughput, out-of-order "
-            + "messages; serial: single partition, low throughput, and orderly messages")
-    private String queueModule = "parallel";
-
-    @ApiModelProperty(value = "The number of partitions of Pulsar Topic, 1-20")
-    private Integer topicPartitionNum = 3;
-
     @ApiModelProperty(value = "Data type name")
     private String schemaName;
 
diff --git a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/BusinessEntity.java b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/BusinessEntity.java
index fcf17a0..d547550 100644
--- a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/BusinessEntity.java
+++ b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/BusinessEntity.java
@@ -31,6 +31,8 @@ public class BusinessEntity implements Serializable {
     private String cnName;
     private String description;
     private String middlewareType;
+    private String queueModule;
+    private Integer topicPartitionNum;
     private String mqResourceObj;
     private Integer dailyRecords;
     private Integer dailyStorage;
diff --git a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/ConsumptionEntityMapper.java b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/ConsumptionEntityMapper.java
index 95990fa..82ac6fa 100644
--- a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/ConsumptionEntityMapper.java
+++ b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/ConsumptionEntityMapper.java
@@ -18,6 +18,7 @@
 package org.apache.inlong.manager.dao.mapper;
 
 import java.util.List;
+import org.apache.ibatis.annotations.Param;
 import org.apache.inlong.manager.common.pojo.consumption.ConsumptionQuery;
 import org.apache.inlong.manager.dao.entity.ConsumptionEntity;
 import org.apache.inlong.manager.workflow.model.view.CountByKey;
@@ -34,7 +35,8 @@ public interface ConsumptionEntityMapper {
 
     ConsumptionEntity selectByPrimaryKey(Integer id);
 
-    ConsumptionEntity selectConsumptionExists(String groupId, String topic, String consumerGroup);
+    ConsumptionEntity selectConsumptionExists(@Param("groupId") String groupId, @Param("topic") String topic,
+            @Param("consumerGroup") String consumerGroup);
 
     int updateByPrimaryKeySelective(ConsumptionEntity record);
 
diff --git a/inlong-manager/manager-dao/src/main/resources/generatorConfig.xml b/inlong-manager/manager-dao/src/main/resources/generatorConfig.xml
index 55b618b..b46e258 100644
--- a/inlong-manager/manager-dao/src/main/resources/generatorConfig.xml
+++ b/inlong-manager/manager-dao/src/main/resources/generatorConfig.xml
@@ -38,7 +38,7 @@
         <!-- Database connection URL, username, password -->
         <jdbcConnection driverClass="com.mysql.cj.jdbc.Driver"
                 connectionURL="jdbc:mysql://127.0.0.1:3306/apache_inlong_manager?nullCatalogMeansCurrent=true"
-                userId="xxxxxx" password="xxxxxx">
+                userId="root" password="inlong">
         </jdbcConnection>
 
         <javaTypeResolver>
diff --git a/inlong-manager/manager-dao/src/main/resources/mappers/BusinessEntityMapper.xml b/inlong-manager/manager-dao/src/main/resources/mappers/BusinessEntityMapper.xml
index 7af02da..3cd1541 100644
--- a/inlong-manager/manager-dao/src/main/resources/mappers/BusinessEntityMapper.xml
+++ b/inlong-manager/manager-dao/src/main/resources/mappers/BusinessEntityMapper.xml
@@ -27,6 +27,8 @@
         <result column="cn_name" jdbcType="VARCHAR" property="cnName"/>
         <result column="description" jdbcType="VARCHAR" property="description"/>
         <result column="middleware_type" jdbcType="VARCHAR" property="middlewareType"/>
+        <result column="queue_module" jdbcType="VARCHAR" property="queueModule"/>
+        <result column="topic_partition_num" jdbcType="INTEGER" property="topicPartitionNum"/>
         <result column="mq_resource_obj" jdbcType="VARCHAR" property="mqResourceObj"/>
         <result column="daily_records" jdbcType="INTEGER" property="dailyRecords"/>
         <result column="daily_storage" jdbcType="INTEGER" property="dailyStorage"/>
@@ -50,8 +52,8 @@
     </resultMap>
 
     <sql id="Base_Column_List">
-        id, inlong_group_id, name, cn_name, description, middleware_type, mq_resource_obj,
-        daily_records, daily_storage, peak_records, max_length, schema_name, in_charges, followers,
+        id, inlong_group_id, name, cn_name, description, middleware_type, queue_module, topic_partition_num,
+        mq_resource_obj, daily_records, daily_storage, peak_records, max_length, schema_name, in_charges, followers,
         status, is_deleted, creator, modifier, create_time, modify_time, temp_view
     </sql>
     <select id="selectByPrimaryKey" parameterType="java.lang.Integer" resultMap="BaseResultMap">
@@ -115,6 +117,7 @@
             parameterType="org.apache.inlong.manager.dao.entity.BusinessEntity">
         insert into business (id, inlong_group_id, name,
                               cn_name, description, middleware_type,
+                              queue_module, topic_partition_num,
                               mq_resource_obj, daily_records, daily_storage,
                               peak_records, max_length, schema_name,
                               in_charges, followers, status,
@@ -122,6 +125,7 @@
                               create_time, modify_time, temp_view)
         values (#{id,jdbcType=INTEGER}, #{inlongGroupId,jdbcType=VARCHAR}, #{name,jdbcType=VARCHAR},
                 #{cnName,jdbcType=VARCHAR}, #{description,jdbcType=VARCHAR}, #{middlewareType,jdbcType=VARCHAR},
+                #{queueModule,jdbcType=VARCHAR}, #{topicPartitionNum,jdbcType=INTEGER},
                 #{mqResourceObj,jdbcType=VARCHAR}, #{dailyRecords,jdbcType=INTEGER}, #{dailyStorage,jdbcType=INTEGER},
                 #{peakRecords,jdbcType=INTEGER}, #{maxLength,jdbcType=INTEGER}, #{schemaName,jdbcType=VARCHAR},
                 #{inCharges,jdbcType=VARCHAR}, #{followers,jdbcType=VARCHAR}, #{status,jdbcType=INTEGER},
@@ -150,6 +154,12 @@
             <if test="middlewareType != null">
                 middleware_type,
             </if>
+            <if test="queueModule != null">
+                queue_module,
+            </if>
+            <if test="topicPartitionNum != null">
+                topic_partition_num,
+            </if>
             <if test="mqResourceObj != null">
                 mq_resource_obj,
             </if>
@@ -215,6 +225,12 @@
             <if test="middlewareType != null">
                 #{middlewareType,jdbcType=VARCHAR},
             </if>
+            <if test="queueModule != null">
+                #{queueModule,jdbcType=VARCHAR},
+            </if>
+            <if test="topicPartitionNum != null">
+                #{topicPartitionNum,jdbcType=INTEGER},
+            </if>
             <if test="mqResourceObj != null">
                 #{mqResourceObj,jdbcType=VARCHAR},
             </if>
@@ -280,6 +296,12 @@
             <if test="middlewareType != null">
                 middleware_type = #{middlewareType,jdbcType=VARCHAR},
             </if>
+            <if test="queueModule != null">
+                queue_module = #{queueModule,jdbcType=VARCHAR},
+            </if>
+            <if test="topicPartitionNum != null">
+                topic_partition_num = #{topicPartitionNum,jdbcType=INTEGER},
+            </if>
             <if test="mqResourceObj != null">
                 mq_resource_obj = #{mqResourceObj,jdbcType=VARCHAR},
             </if>
@@ -343,6 +365,12 @@
             <if test="middlewareType != null">
                 middleware_type = #{middlewareType,jdbcType=VARCHAR},
             </if>
+            <if test="queueModule != null">
+                queue_module = #{queueModule,jdbcType=VARCHAR},
+            </if>
+            <if test="topicPartitionNum != null">
+                topic_partition_num = #{topicPartitionNum,jdbcType=INTEGER},
+            </if>
             <if test="mqResourceObj != null">
                 mq_resource_obj = #{mqResourceObj,jdbcType=VARCHAR},
             </if>
@@ -389,26 +417,28 @@
 
     <update id="updateByPrimaryKey" parameterType="org.apache.inlong.manager.dao.entity.BusinessEntity">
         update business
-        set inlong_group_id = #{inlongGroupId,jdbcType=VARCHAR},
-            name            = #{name,jdbcType=VARCHAR},
-            cn_name         = #{cnName,jdbcType=VARCHAR},
-            description     = #{description,jdbcType=VARCHAR},
-            middleware_type = #{middlewareType,jdbcType=VARCHAR},
-            mq_resource_obj = #{mqResourceObj,jdbcType=VARCHAR},
-            daily_records   = #{dailyRecords,jdbcType=INTEGER},
-            daily_storage   = #{dailyStorage,jdbcType=INTEGER},
-            peak_records    = #{peakRecords,jdbcType=INTEGER},
-            max_length      = #{maxLength,jdbcType=INTEGER},
-            schema_name     = #{schemaName,jdbcType=VARCHAR},
-            in_charges      = #{inCharges,jdbcType=VARCHAR},
-            followers       = #{followers,jdbcType=VARCHAR},
-            status          = #{status,jdbcType=INTEGER},
-            is_deleted      = #{isDeleted,jdbcType=INTEGER},
-            creator         = #{creator,jdbcType=VARCHAR},
-            modifier        = #{modifier,jdbcType=VARCHAR},
-            create_time     = #{createTime,jdbcType=TIMESTAMP},
-            modify_time     = #{modifyTime,jdbcType=TIMESTAMP},
-            temp_view       = #{tempView,jdbcType=LONGVARCHAR}
+        set inlong_group_id     = #{inlongGroupId,jdbcType=VARCHAR},
+            name                = #{name,jdbcType=VARCHAR},
+            cn_name             = #{cnName,jdbcType=VARCHAR},
+            description         = #{description,jdbcType=VARCHAR},
+            middleware_type     = #{middlewareType,jdbcType=VARCHAR},
+            queue_module        = #{queueModule,jdbcType=VARCHAR},
+            topic_partition_num = #{topicPartitionNum,jdbcType=INTEGER},
+            mq_resource_obj     = #{mqResourceObj,jdbcType=VARCHAR},
+            daily_records       = #{dailyRecords,jdbcType=INTEGER},
+            daily_storage       = #{dailyStorage,jdbcType=INTEGER},
+            peak_records        = #{peakRecords,jdbcType=INTEGER},
+            max_length          = #{maxLength,jdbcType=INTEGER},
+            schema_name         = #{schemaName,jdbcType=VARCHAR},
+            in_charges          = #{inCharges,jdbcType=VARCHAR},
+            followers           = #{followers,jdbcType=VARCHAR},
+            status              = #{status,jdbcType=INTEGER},
+            is_deleted          = #{isDeleted,jdbcType=INTEGER},
+            creator             = #{creator,jdbcType=VARCHAR},
+            modifier            = #{modifier,jdbcType=VARCHAR},
+            create_time         = #{createTime,jdbcType=TIMESTAMP},
+            modify_time         = #{modifyTime,jdbcType=TIMESTAMP},
+            temp_view           = #{tempView,jdbcType=LONGVARCHAR}
         where id = #{id,jdbcType=INTEGER}
     </update>
     <update id="updateStatusByIdentifier">
diff --git a/inlong-manager/manager-dao/src/test/resources/sql/apache_inlong_manager.sql b/inlong-manager/manager-dao/src/test/resources/sql/apache_inlong_manager.sql
index 08b2320..99dbe3e 100644
--- a/inlong-manager/manager-dao/src/test/resources/sql/apache_inlong_manager.sql
+++ b/inlong-manager/manager-dao/src/test/resources/sql/apache_inlong_manager.sql
@@ -71,27 +71,29 @@ CREATE TABLE `agent_sys_conf`
 DROP TABLE IF EXISTS `business`;
 CREATE TABLE `business`
 (
-    `id`              int(11)      NOT NULL AUTO_INCREMENT COMMENT 'Incremental primary key',
-    `inlong_group_id` varchar(128) NOT NULL COMMENT 'Business group id, filled in by the user, undeleted ones cannot be repeated',
-    `name`            varchar(128)          DEFAULT '' COMMENT 'Business name, English, numbers and underscore',
-    `cn_name`         varchar(256)          DEFAULT NULL COMMENT 'Chinese display name',
-    `description`     varchar(256)          DEFAULT '' COMMENT 'Business Introduction',
-    `middleware_type` varchar(10)           DEFAULT 'TUBE' COMMENT 'The middleware type of message queue, high throughput: TUBE, high consistency: PULSAR',
-    `mq_resource_obj` varchar(128) NOT NULL COMMENT 'MQ resource object, for Tube, its Topic, for Pulsar, its Namespace',
-    `daily_records`   int(11)               DEFAULT '10' COMMENT 'Number of access records per day, unit: 10,000 records per day',
-    `daily_storage`   int(11)               DEFAULT '10' COMMENT 'Access size by day, unit: GB per day',
-    `peak_records`    int(11)               DEFAULT '1000' COMMENT 'Access peak per second, unit: records per second',
-    `max_length`      int(11)               DEFAULT '10240' COMMENT 'The maximum length of a single piece of data, unit: Byte',
-    `schema_name`     varchar(128)          DEFAULT NULL COMMENT 'Data type, associated data_schema table',
-    `in_charges`      varchar(512) NOT NULL COMMENT 'Name of responsible person, separated by commas',
-    `followers`       varchar(512)          DEFAULT NULL COMMENT 'List of names of business followers, separated by commas',
-    `status`          int(4)                DEFAULT '21' COMMENT 'Business status',
-    `is_deleted`      tinyint(1)            DEFAULT '0' COMMENT 'Whether to delete, 0: not deleted, 1: deleted',
-    `creator`         varchar(64)  NOT NULL COMMENT 'Creator name',
-    `modifier`        varchar(64)           DEFAULT NULL COMMENT 'Modifier name',
-    `create_time`     timestamp    NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'Create time',
-    `modify_time`     timestamp    NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'Modify time',
-    `temp_view`       text                  DEFAULT NULL COMMENT 'Temporary view, used to save intermediate data that has not been submitted or approved after modification',
+    `id`                  int(11)      NOT NULL AUTO_INCREMENT COMMENT 'Incremental primary key',
+    `inlong_group_id`     varchar(128) NOT NULL COMMENT 'Business group id, filled in by the user, undeleted ones cannot be repeated',
+    `name`                varchar(128)          DEFAULT '' COMMENT 'Business name, English, numbers and underscore',
+    `cn_name`             varchar(256)          DEFAULT NULL COMMENT 'Chinese display name',
+    `description`         varchar(256)          DEFAULT '' COMMENT 'Business Introduction',
+    `middleware_type`     varchar(10)           DEFAULT 'TUBE' COMMENT 'The middleware type of message queue, high throughput: TUBE, high consistency: PULSAR',
+    `queue_module`        VARCHAR(20)  NULL     DEFAULT 'parallel' COMMENT 'Queue model of Pulsar, parallel: multiple partitions, high throughput, out-of-order messages; serial: single partition, low throughput, and orderly messages',
+    `topic_partition_num` INT(4)       NULL     DEFAULT '3' COMMENT 'The number of partitions of Pulsar Topic, 1-20',
+    `mq_resource_obj`     varchar(128) NOT NULL COMMENT 'MQ resource object, for Tube, its Topic, for Pulsar, its Namespace',
+    `daily_records`       int(11)               DEFAULT '10' COMMENT 'Number of access records per day, unit: 10,000 records per day',
+    `daily_storage`       int(11)               DEFAULT '10' COMMENT 'Access size by day, unit: GB per day',
+    `peak_records`        int(11)               DEFAULT '1000' COMMENT 'Access peak per second, unit: records per second',
+    `max_length`          int(11)               DEFAULT '10240' COMMENT 'The maximum length of a single piece of data, unit: Byte',
+    `schema_name`         varchar(128)          DEFAULT NULL COMMENT 'Data type, associated data_schema table',
+    `in_charges`          varchar(512) NOT NULL COMMENT 'Name of responsible person, separated by commas',
+    `followers`           varchar(512)          DEFAULT NULL COMMENT 'List of names of business followers, separated by commas',
+    `status`              int(4)                DEFAULT '21' COMMENT 'Business status',
+    `is_deleted`          tinyint(1)            DEFAULT '0' COMMENT 'Whether to delete, 0: not deleted, 1: deleted',
+    `creator`             varchar(64)  NOT NULL COMMENT 'Creator name',
+    `modifier`            varchar(64)           DEFAULT NULL COMMENT 'Modifier name',
+    `create_time`         timestamp    NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'Create time',
+    `modify_time`         timestamp    NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'Modify time',
+    `temp_view`           json                  DEFAULT NULL COMMENT 'Temporary view, used to save intermediate data that has not been submitted or approved after modification',
     PRIMARY KEY (`id`),
     UNIQUE KEY `unique_business` (`inlong_group_id`, `is_deleted`, `modify_time`)
 );
diff --git a/inlong-manager/manager-service/pom.xml b/inlong-manager/manager-service/pom.xml
index e70b144..041795c 100644
--- a/inlong-manager/manager-service/pom.xml
+++ b/inlong-manager/manager-service/pom.xml
@@ -152,6 +152,27 @@
                 </exclusion>
             </exclusions>
         </dependency>
+
+        <dependency>
+            <groupId>org.apache.pulsar</groupId>
+            <artifactId>pulsar-client</artifactId>
+            <exclusions>
+                <exclusion>
+                    <groupId>javax.validation</groupId>
+                    <artifactId>validation-api</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.pulsar</groupId>
+            <artifactId>pulsar-client-admin</artifactId>
+            <exclusions>
+                <exclusion>
+                    <groupId>javax.validation</groupId>
+                    <artifactId>validation-api</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
     </dependencies>
 
 </project>
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/BusinessProcessOperation.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/BusinessProcessOperation.java
index 3d551ca..081aa5f 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/BusinessProcessOperation.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/BusinessProcessOperation.java
@@ -33,7 +33,7 @@ import org.apache.inlong.manager.service.core.StorageService;
 import org.apache.inlong.manager.service.workflow.ProcessName;
 import org.apache.inlong.manager.service.workflow.WorkflowResult;
 import org.apache.inlong.manager.service.workflow.WorkflowService;
-import org.apache.inlong.manager.service.workflow.newbusiness.NewBusinessWorkflowForm;
+import org.apache.inlong.manager.service.workflow.business.NewBusinessWorkflowForm;
 import org.apache.inlong.manager.common.util.Preconditions;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/ConsumptionServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/ConsumptionServiceImpl.java
index a37a771..316b356 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/ConsumptionServiceImpl.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/ConsumptionServiceImpl.java
@@ -59,7 +59,7 @@ import org.apache.inlong.manager.service.core.DataStreamService;
 import org.apache.inlong.manager.service.workflow.ProcessName;
 import org.apache.inlong.manager.service.workflow.WorkflowResult;
 import org.apache.inlong.manager.service.workflow.WorkflowService;
-import org.apache.inlong.manager.service.workflow.newconsumption.NewConsumptionWorkflowForm;
+import org.apache.inlong.manager.service.workflow.consumption.NewConsumptionWorkflowForm;
 import org.apache.inlong.manager.workflow.model.view.CountByKey;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Service;
@@ -344,6 +344,7 @@ public class ConsumptionServiceImpl implements ConsumptionService {
         entity.setMiddlewareType(middlewareType);
         entity.setTopic(topic);
         entity.setConsumerGroupId(consumerGroup);
+        entity.setConsumerGroupName(consumerGroup);
         entity.setInCharges(bizInfo.getInCharges());
         entity.setFilterEnabled(0);
 
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/StorageBaseOperation.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/StorageBaseOperation.java
index 80b8759..56b7c6b 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/StorageBaseOperation.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/StorageBaseOperation.java
@@ -38,9 +38,9 @@ import org.apache.inlong.manager.dao.mapper.BusinessEntityMapper;
 import org.apache.inlong.manager.dao.mapper.StorageExtEntityMapper;
 import org.apache.inlong.manager.service.workflow.ProcessName;
 import org.apache.inlong.manager.service.workflow.WorkflowService;
-import org.apache.inlong.manager.service.workflow.newbusiness.CreateResourceWorkflowForm;
-import org.apache.inlong.manager.service.workflow.newbusiness.NewBusinessWorkflowForm;
-import org.apache.inlong.manager.service.workflow.newstream.SingleStreamWorkflowDefinition;
+import org.apache.inlong.manager.service.workflow.business.BusinessResourceWorkflowForm;
+import org.apache.inlong.manager.service.workflow.business.NewBusinessWorkflowForm;
+import org.apache.inlong.manager.service.workflow.stream.CreateStreamWorkflowDefinition;
 import org.apache.inlong.manager.common.util.Preconditions;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -150,7 +150,7 @@ public class StorageBaseOperation {
     /**
      * Asynchronously initiate a single data stream related workflow
      *
-     * @see SingleStreamWorkflowDefinition
+     * @see CreateStreamWorkflowDefinition
      */
     class WorkflowStartRunnable implements Runnable {
 
@@ -170,7 +170,7 @@ public class StorageBaseOperation {
             LOGGER.info("begin start data stream workflow, groupId={}, streamId={}", groupId, streamId);
 
             BusinessInfo businessInfo = CommonBeanUtils.copyProperties(businessEntity, BusinessInfo::new);
-            CreateResourceWorkflowForm form = genBizResourceWorkflowForm(businessInfo, streamId);
+            BusinessResourceWorkflowForm form = genBizResourceWorkflowForm(businessInfo, streamId);
 
             workflowService.start(ProcessName.CREATE_DATASTREAM_RESOURCE, operator, form);
             LOGGER.info("success start data stream workflow, groupId={}, streamId={}", groupId, streamId);
@@ -179,8 +179,8 @@ public class StorageBaseOperation {
         /**
          * Generate [Create Business Resource] form
          */
-        private CreateResourceWorkflowForm genBizResourceWorkflowForm(BusinessInfo businessInfo, String streamId) {
-            CreateResourceWorkflowForm form = new CreateResourceWorkflowForm();
+        private BusinessResourceWorkflowForm genBizResourceWorkflowForm(BusinessInfo businessInfo, String streamId) {
+            BusinessResourceWorkflowForm form = new BusinessResourceWorkflowForm();
             form.setBusinessInfo(businessInfo);
             form.setInlongStreamId(streamId);
             return form;
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdpart/hive/CreateHiveTableForOneStreamListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdpart/hive/CreateHiveTableForStreamListener.java
similarity index 90%
rename from inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdpart/hive/CreateHiveTableForOneStreamListener.java
rename to inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdpart/hive/CreateHiveTableForStreamListener.java
index 58ee3ac..9106bca 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdpart/hive/CreateHiveTableForOneStreamListener.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdpart/hive/CreateHiveTableForStreamListener.java
@@ -21,7 +21,7 @@ import java.util.List;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.inlong.manager.common.pojo.datastorage.StorageHiveSortInfo;
 import org.apache.inlong.manager.dao.mapper.StorageHiveEntityMapper;
-import org.apache.inlong.manager.service.workflow.newbusiness.CreateResourceWorkflowForm;
+import org.apache.inlong.manager.service.workflow.business.BusinessResourceWorkflowForm;
 import org.apache.inlong.manager.workflow.core.event.ListenerResult;
 import org.apache.inlong.manager.workflow.core.event.task.TaskEvent;
 import org.apache.inlong.manager.workflow.core.event.task.TaskEventListener;
@@ -34,7 +34,7 @@ import org.springframework.stereotype.Service;
  */
 @Service
 @Slf4j
-public class CreateHiveTableForOneStreamListener implements TaskEventListener {
+public class CreateHiveTableForStreamListener implements TaskEventListener {
 
     @Autowired
     private StorageHiveEntityMapper hiveEntityMapper;
@@ -48,7 +48,7 @@ public class CreateHiveTableForOneStreamListener implements TaskEventListener {
 
     @Override
     public ListenerResult listen(WorkflowContext context) {
-        CreateResourceWorkflowForm form = (CreateResourceWorkflowForm) context.getProcessForm();
+        BusinessResourceWorkflowForm form = (BusinessResourceWorkflowForm) context.getProcessForm();
         String groupId = form.getInlongGroupId();
         String streamId = form.getInlongStreamId();
         log.info("begin create hive table for groupId={}, streamId={}", groupId, streamId);
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdpart/hive/CreateHiveTableForAllStreamListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdpart/hive/CreateHiveTableListener.java
similarity index 90%
rename from inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdpart/hive/CreateHiveTableForAllStreamListener.java
rename to inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdpart/hive/CreateHiveTableListener.java
index 8b9d6eb..665d1ee 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdpart/hive/CreateHiveTableForAllStreamListener.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdpart/hive/CreateHiveTableListener.java
@@ -21,7 +21,7 @@ import java.util.List;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.inlong.manager.common.pojo.datastorage.StorageHiveSortInfo;
 import org.apache.inlong.manager.dao.mapper.StorageHiveEntityMapper;
-import org.apache.inlong.manager.service.workflow.newbusiness.CreateResourceWorkflowForm;
+import org.apache.inlong.manager.service.workflow.business.BusinessResourceWorkflowForm;
 import org.apache.inlong.manager.workflow.core.event.ListenerResult;
 import org.apache.inlong.manager.workflow.core.event.task.TaskEvent;
 import org.apache.inlong.manager.workflow.core.event.task.TaskEventListener;
@@ -34,7 +34,7 @@ import org.springframework.stereotype.Service;
  */
 @Service
 @Slf4j
-public class CreateHiveTableForAllStreamListener implements TaskEventListener {
+public class CreateHiveTableListener implements TaskEventListener {
 
     @Autowired
     private StorageHiveEntityMapper hiveEntityMapper;
@@ -48,7 +48,7 @@ public class CreateHiveTableForAllStreamListener implements TaskEventListener {
 
     @Override
     public ListenerResult listen(WorkflowContext context) {
-        CreateResourceWorkflowForm form = (CreateResourceWorkflowForm) context.getProcessForm();
+        BusinessResourceWorkflowForm form = (BusinessResourceWorkflowForm) context.getProcessForm();
         String groupId = form.getInlongGroupId();
         log.info("begin to create hive table for groupId={}", groupId);
 
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdpart/mq/CreatePulsarGroupForStreamTaskListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdpart/mq/CreatePulsarGroupForStreamTaskListener.java
new file mode 100644
index 0000000..901f1d1
--- /dev/null
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdpart/mq/CreatePulsarGroupForStreamTaskListener.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.service.thirdpart.mq;
+
+import java.util.List;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.inlong.manager.common.beans.ClusterBean;
+import org.apache.inlong.manager.common.exceptions.BusinessException;
+import org.apache.inlong.manager.common.pojo.business.BusinessInfo;
+import org.apache.inlong.manager.common.pojo.pulsar.PulsarTopicBean;
+import org.apache.inlong.manager.dao.entity.DataStreamEntity;
+import org.apache.inlong.manager.dao.mapper.DataStreamEntityMapper;
+import org.apache.inlong.manager.service.core.BusinessService;
+import org.apache.inlong.manager.service.core.ConsumptionService;
+import org.apache.inlong.manager.service.core.StorageService;
+import org.apache.inlong.manager.service.thirdpart.mq.util.PulsarUtils;
+import org.apache.inlong.manager.service.workflow.business.BusinessResourceWorkflowForm;
+import org.apache.inlong.manager.workflow.core.event.ListenerResult;
+import org.apache.inlong.manager.workflow.core.event.task.TaskEvent;
+import org.apache.inlong.manager.workflow.core.event.task.TaskEventListener;
+import org.apache.inlong.manager.workflow.exception.WorkflowListenerException;
+import org.apache.inlong.manager.workflow.model.WorkflowContext;
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+
+/**
+ * Create a subscription group for a single data stream
+ */
+@Slf4j
+@Component
+public class CreatePulsarGroupForStreamTaskListener implements TaskEventListener {
+
+    @Autowired
+    private ClusterBean clusterBean;
+    @Autowired
+    private BusinessService businessService;
+    @Autowired
+    private DataStreamEntityMapper dataStreamMapper;
+    @Autowired
+    private PulsarOptService pulsarOptService;
+    @Autowired
+    private StorageService storageService;
+    @Autowired
+    private ConsumptionService consumptionService;
+
+    @Override
+    public TaskEvent event() {
+        return TaskEvent.COMPLETE;
+    }
+
+    @Override
+    public ListenerResult listen(WorkflowContext context) throws WorkflowListenerException {
+        BusinessResourceWorkflowForm form = (BusinessResourceWorkflowForm) context.getProcessForm();
+        String groupId = form.getInlongGroupId();
+        String streamId = form.getInlongStreamId();
+
+        BusinessInfo bizInfo = businessService.get(groupId);
+        if (bizInfo == null) {
+            log.error("business not found with groupId={}", groupId);
+            throw new WorkflowListenerException("business not found with groupId=" + groupId);
+        }
+
+        DataStreamEntity streamEntity = dataStreamMapper.selectByIdentifier(groupId, streamId);
+        if (streamEntity == null) {
+            log.warn("data stream is empty for group={}, stream={}, skip to create pulsar group", groupId, streamId);
+            return ListenerResult.success();
+        }
+
+        try (PulsarAdmin globalPulsarAdmin = PulsarUtils.getPulsarAdmin(clusterBean.getPulsarAdminUrl())) {
+            // Query data storage info based on groupId and streamId
+            List<String> storageTypeList = storageService.getStorageTypeList(groupId, streamId);
+            if (storageTypeList == null || storageTypeList.size() == 0) {
+                log.warn("storage info is empty for groupId={}, streamId={}, skip to create pulsar group",
+                        groupId, streamId);
+                return ListenerResult.success();
+            }
+
+            PulsarTopicBean topicBean = new PulsarTopicBean();
+            topicBean.setTenant(clusterBean.getDefaultTenant());
+            topicBean.setNamespace(bizInfo.getMqResourceObj());
+            String topic = streamEntity.getMqResourceObj();
+            topicBean.setTopicName(topic);
+            List<String> pulsarClusters = PulsarUtils.getPulsarClusters(globalPulsarAdmin);
+
+            // Create a subscription in the Pulsar cluster (cross-region), you need to ensure that the Topic exists
+            String tenant = clusterBean.getDefaultTenant();
+            String namespace = bizInfo.getMqResourceObj();
+            for (String cluster : pulsarClusters) {
+                String serviceUrl = PulsarUtils.getServiceUrl(globalPulsarAdmin, cluster);
+                try (PulsarAdmin pulsarAdmin = PulsarUtils.getPulsarAdmin(serviceUrl)) {
+                    boolean exist = pulsarOptService.topicIsExists(pulsarAdmin, tenant, namespace, topic);
+                    if (!exist) {
+                        String fullTopic = tenant + "/" + namespace + "/" + topic;
+                        log.error("topic={} not exists in {}", fullTopic, pulsarAdmin.getServiceUrl());
+                        throw new BusinessException("topic=" + fullTopic + " not exists in " + serviceUrl);
+                    }
+
+                    // Consumer naming rules: sortAppName_topicName_consumer_group
+                    String subscription = clusterBean.getAppName() + "_" + topic + "_consumer_group";
+                    pulsarOptService.createSubscription(pulsarAdmin, topicBean, subscription);
+
+                    // Insert the consumption data into the consumption table
+                    consumptionService.saveSortConsumption(bizInfo, topic, subscription);
+                }
+            }
+        } catch (Exception e) {
+            log.error("create pulsar subscription error for groupId={}, streamId={}", groupId, streamId, e);
+            throw new WorkflowListenerException("create pulsar subscription error, reason: " + e.getMessage());
+        }
+
+        log.info("finish to create single pulsar subscription for groupId={}, streamId={}", groupId, streamId);
+        return ListenerResult.success();
+    }
+
+    @Override
+    public boolean async() {
+        return false;
+    }
+
+}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdpart/mq/CreatePulsarGroupTaskListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdpart/mq/CreatePulsarGroupTaskListener.java
new file mode 100644
index 0000000..2fa3fe0
--- /dev/null
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdpart/mq/CreatePulsarGroupTaskListener.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.service.thirdpart.mq;
+
+import java.util.List;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.inlong.manager.common.beans.ClusterBean;
+import org.apache.inlong.manager.common.pojo.business.BusinessInfo;
+import org.apache.inlong.manager.common.pojo.pulsar.PulsarTopicBean;
+import org.apache.inlong.manager.dao.entity.DataStreamEntity;
+import org.apache.inlong.manager.dao.mapper.DataStreamEntityMapper;
+import org.apache.inlong.manager.service.core.BusinessService;
+import org.apache.inlong.manager.service.core.ConsumptionService;
+import org.apache.inlong.manager.service.thirdpart.mq.util.PulsarUtils;
+import org.apache.inlong.manager.service.workflow.business.BusinessResourceWorkflowForm;
+import org.apache.inlong.manager.workflow.core.event.ListenerResult;
+import org.apache.inlong.manager.workflow.core.event.task.TaskEvent;
+import org.apache.inlong.manager.workflow.core.event.task.TaskEventListener;
+import org.apache.inlong.manager.workflow.exception.WorkflowListenerException;
+import org.apache.inlong.manager.workflow.model.WorkflowContext;
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+
+/**
+ * Create default subscription group for Pulsar
+ */
+@Component
+@Slf4j
+public class CreatePulsarGroupTaskListener implements TaskEventListener {
+
+    @Autowired
+    private ClusterBean clusterBean;
+    @Autowired
+    private BusinessService businessService;
+    @Autowired
+    private ConsumptionService consumptionService;
+    @Autowired
+    private DataStreamEntityMapper dataStreamMapper;
+    @Autowired
+    private PulsarOptService pulsarOptService;
+
+    @Override
+    public TaskEvent event() {
+        return TaskEvent.COMPLETE;
+    }
+
+    @Override
+    public ListenerResult listen(WorkflowContext context) throws WorkflowListenerException {
+        BusinessResourceWorkflowForm form = (BusinessResourceWorkflowForm) context.getProcessForm();
+
+        String groupId = form.getInlongGroupId();
+        BusinessInfo bizInfo = businessService.get(groupId);
+        if (bizInfo == null) {
+            log.error("business not found with groupId={}", groupId);
+            throw new WorkflowListenerException("business not found with groupId=" + groupId);
+        }
+
+        // For Pulsar, each Stream corresponds to a Topic
+        List<DataStreamEntity> streamEntities = dataStreamMapper.selectByGroupId(groupId);
+        if (streamEntities == null || streamEntities.isEmpty()) {
+            log.warn("data stream is empty for groupId={}, skip to create pulsar subscription", groupId);
+            return ListenerResult.success();
+        }
+
+        try (PulsarAdmin globalPulsarAdmin = PulsarUtils.getPulsarAdmin(clusterBean.getPulsarAdminUrl())) {
+            String tenant = clusterBean.getDefaultTenant();
+            String namespace = bizInfo.getMqResourceObj();
+
+            for (DataStreamEntity streamEntity : streamEntities) {
+                PulsarTopicBean topicBean = new PulsarTopicBean();
+                topicBean.setTenant(tenant);
+                topicBean.setNamespace(namespace);
+                String topic = streamEntity.getMqResourceObj();
+                topicBean.setTopicName(topic);
+                List<String> pulsarClusters = PulsarUtils.getPulsarClusters(globalPulsarAdmin);
+
+                // Create a subscription in the Pulsar cluster (cross-region), you need to ensure that the Topic exists
+                for (String cluster : pulsarClusters) {
+                    String url = PulsarUtils.getServiceUrl(globalPulsarAdmin, cluster);
+                    try (PulsarAdmin pulsarAdmin = PulsarUtils.getPulsarAdmin(url)) {
+                        boolean exist = pulsarOptService.topicIsExists(pulsarAdmin, tenant, namespace, topic);
+
+                        if (!exist) {
+                            String topicFull = tenant + "/" + namespace + "/" + topic;
+                            log.error("topic={} not exists in {}", topicFull, url);
+                            throw new WorkflowListenerException("topic=" + topicFull + " not exists in " + url);
+                        }
+
+                        // Consumer naming rules: sortAppName_topicName_consumer_group
+                        String subscription = clusterBean.getAppName() + "_" + topic + "_consumer_group";
+                        pulsarOptService.createSubscription(pulsarAdmin, topicBean, subscription);
+
+                        // Insert the consumption data into the consumption table
+                        consumptionService.saveSortConsumption(bizInfo, topic, subscription);
+                    }
+                }
+            }
+        } catch (Exception e) {
+            log.error("create pulsar subscription error for groupId={}", groupId);
+            throw new WorkflowListenerException("create pulsar subscription error: " + e.getMessage());
+        }
+
+        log.info("success to create pulsar subscription for groupId={}", groupId);
+        return ListenerResult.success();
+    }
+
+    @Override
+    public boolean async() {
+        return false;
+    }
+
+}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdpart/mq/CreatePulsarResourceTaskListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdpart/mq/CreatePulsarResourceTaskListener.java
new file mode 100644
index 0000000..2ee9dfa
--- /dev/null
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdpart/mq/CreatePulsarResourceTaskListener.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.service.thirdpart.mq;
+
+import java.util.List;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.inlong.manager.common.beans.ClusterBean;
+import org.apache.inlong.manager.common.pojo.business.BusinessInfo;
+import org.apache.inlong.manager.common.pojo.datastream.DataStreamTopicVO;
+import org.apache.inlong.manager.common.pojo.pulsar.PulsarTopicBean;
+import org.apache.inlong.manager.common.util.Preconditions;
+import org.apache.inlong.manager.dao.entity.BusinessPulsarEntity;
+import org.apache.inlong.manager.dao.mapper.BusinessPulsarEntityMapper;
+import org.apache.inlong.manager.dao.mapper.DataStreamEntityMapper;
+import org.apache.inlong.manager.service.core.BusinessService;
+import org.apache.inlong.manager.service.thirdpart.mq.util.PulsarUtils;
+import org.apache.inlong.manager.service.workflow.business.BusinessResourceWorkflowForm;
+import org.apache.inlong.manager.workflow.core.event.ListenerResult;
+import org.apache.inlong.manager.workflow.core.event.task.TaskEvent;
+import org.apache.inlong.manager.workflow.core.event.task.TaskEventListener;
+import org.apache.inlong.manager.workflow.exception.WorkflowListenerException;
+import org.apache.inlong.manager.workflow.model.WorkflowContext;
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+
+/**
+ * Create Pulsar tenant, namespace and topic
+ */
+@Slf4j
+@Component()
+public class CreatePulsarResourceTaskListener implements TaskEventListener {
+
+    @Autowired
+    PulsarOptService pulsarOptService;
+    @Autowired
+    private ClusterBean clusterBean;
+    @Autowired
+    private BusinessService businessService;
+    @Autowired
+    private BusinessPulsarEntityMapper businessPulsarMapper;
+    @Autowired
+    private DataStreamEntityMapper dataStreamMapper;
+
+    @Override
+    public TaskEvent event() {
+        return TaskEvent.COMPLETE;
+    }
+
+    @Override
+    public ListenerResult listen(WorkflowContext context) throws WorkflowListenerException {
+        BusinessResourceWorkflowForm form = (BusinessResourceWorkflowForm) context.getProcessForm();
+        String groupId = form.getInlongGroupId();
+        log.info("begin to create pulsar resource for groupId={}", groupId);
+
+        BusinessInfo businessInfo = businessService.get(groupId);
+        if (businessInfo == null) {
+            throw new WorkflowListenerException("business or pulsar cluster not found for groupId=" + groupId);
+        }
+
+        try (PulsarAdmin globalPulsarAdmin = PulsarUtils.getPulsarAdmin(clusterBean.getPulsarAdminUrl())) {
+            List<String> pulsarClusters = PulsarUtils.getPulsarClusters(globalPulsarAdmin);
+            for (String cluster : pulsarClusters) {
+                String serviceUrl = PulsarUtils.getServiceUrl(globalPulsarAdmin, cluster);
+                this.createPulsarProcess(businessInfo, serviceUrl);
+            }
+        } catch (Exception e) {
+            log.error("create pulsar resource error for groupId={}", groupId, e);
+            throw new WorkflowListenerException("create pulsar resource error for groupId=" + groupId);
+        }
+
+        log.info("success to create pulsar resource for groupId={}", groupId);
+        return ListenerResult.success();
+    }
+
+    /**
+     * Create Pulsar tenant, namespace and topic
+     */
+    private void createPulsarProcess(BusinessInfo businessInfo, String serviceHttpUrl) throws Exception {
+        String groupId = businessInfo.getInlongGroupId();
+        log.info("begin to create pulsar resource for groupId={} in cluster={}", groupId, serviceHttpUrl);
+
+        String namespace = businessInfo.getMqResourceObj();
+        Preconditions.checkNotNull(namespace, "pulsar namespace cannot be empty for groupId=" + groupId);
+        String queueModule = businessInfo.getQueueModule();
+        Preconditions.checkNotNull(queueModule, "queue module cannot be empty for groupId=" + groupId);
+
+        String tenant = clusterBean.getDefaultTenant();
+        try (PulsarAdmin pulsarAdmin = PulsarUtils.getPulsarAdmin(serviceHttpUrl)) {
+            // create pulsar tenant
+            pulsarOptService.createTenant(pulsarAdmin, tenant);
+
+            // create pulsar namespace
+            BusinessPulsarEntity entity = businessPulsarMapper.selectByGroupId(groupId);
+            pulsarOptService.createNamespace(pulsarAdmin, entity, tenant, namespace);
+
+            // create pulsar topic
+            Integer partitionNum = businessInfo.getTopicPartitionNum();
+            List<DataStreamTopicVO> streamTopicList = dataStreamMapper.selectTopicList(groupId);
+            PulsarTopicBean topicBean = PulsarTopicBean.builder()
+                    .tenant(tenant).namespace(namespace).numPartitions(partitionNum).queueModule(queueModule).build();
+
+            for (DataStreamTopicVO topicVO : streamTopicList) {
+                topicBean.setTopicName(topicVO.getMqResourceObj());
+                pulsarOptService.createTopic(pulsarAdmin, topicBean);
+            }
+        }
+        log.info("finish to create pulsar resource for groupId={}, service http url={}", groupId, serviceHttpUrl);
+    }
+
+    @Override
+    public boolean async() {
+        return false;
+    }
+
+}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdpart/mq/CreatePulsarTopicForStreamTaskListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdpart/mq/CreatePulsarTopicForStreamTaskListener.java
new file mode 100644
index 0000000..1211c86
--- /dev/null
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdpart/mq/CreatePulsarTopicForStreamTaskListener.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.service.thirdpart.mq;
+
+import java.util.List;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.inlong.manager.common.beans.ClusterBean;
+import org.apache.inlong.manager.common.pojo.business.BusinessInfo;
+import org.apache.inlong.manager.common.pojo.pulsar.PulsarTopicBean;
+import org.apache.inlong.manager.dao.entity.DataStreamEntity;
+import org.apache.inlong.manager.dao.mapper.DataStreamEntityMapper;
+import org.apache.inlong.manager.service.core.BusinessService;
+import org.apache.inlong.manager.service.thirdpart.mq.util.PulsarUtils;
+import org.apache.inlong.manager.service.workflow.business.BusinessResourceWorkflowForm;
+import org.apache.inlong.manager.workflow.core.event.ListenerResult;
+import org.apache.inlong.manager.workflow.core.event.task.TaskEvent;
+import org.apache.inlong.manager.workflow.core.event.task.TaskEventListener;
+import org.apache.inlong.manager.workflow.exception.WorkflowListenerException;
+import org.apache.inlong.manager.workflow.model.WorkflowContext;
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+
+/**
+ * Create task listener for Pulsar Topic
+ */
+@Slf4j
+@Component
+public class CreatePulsarTopicForStreamTaskListener implements TaskEventListener {
+
+    @Autowired
+    private ClusterBean clusterBean;
+    @Autowired
+    private PulsarOptService pulsarOptService;
+    @Autowired
+    private BusinessService businessService;
+    @Autowired
+    private DataStreamEntityMapper dataStreamMapper;
+
+    @Override
+    public TaskEvent event() {
+        return TaskEvent.COMPLETE;
+    }
+
+    @Override
+    public ListenerResult listen(WorkflowContext context) throws WorkflowListenerException {
+        BusinessResourceWorkflowForm form = (BusinessResourceWorkflowForm) context.getProcessForm();
+        String groupId = form.getInlongGroupId();
+        String streamId = form.getInlongStreamId();
+
+        BusinessInfo businessInfo = businessService.get(groupId);
+        DataStreamEntity streamEntity = dataStreamMapper.selectByIdentifier(groupId, streamId);
+        if (businessInfo == null || streamEntity == null) {
+            throw new WorkflowListenerException("business or data stream not found with groupId=" + groupId
+                    + ", streamId=" + streamId);
+        }
+
+        log.info("begin to create pulsar topic for groupId={}, streamId={}", groupId, streamId);
+        try (PulsarAdmin globalPulsarAdmin = PulsarUtils.getPulsarAdmin(clusterBean.getPulsarAdminUrl())) {
+            List<String> pulsarClusters = PulsarUtils.getPulsarClusters(globalPulsarAdmin);
+            for (String cluster : pulsarClusters) {
+                String serviceUrl = PulsarUtils.getServiceUrl(globalPulsarAdmin, cluster);
+                String pulsarTopic = streamEntity.getMqResourceObj();
+                this.createTopicOpt(businessInfo, pulsarTopic, serviceUrl);
+            }
+        } catch (Exception e) {
+            log.error("create pulsar topic error for groupId={}, streamId={}", groupId, streamId, e);
+            throw new WorkflowListenerException(
+                    "create pulsar topic error for groupId=" + groupId + ", streamId=" + streamId);
+        }
+
+        log.info("success to create pulsar topic for groupId={}, streamId={}", groupId, streamId);
+        return ListenerResult.success();
+    }
+
+    private void createTopicOpt(BusinessInfo bizInfo, String pulsarTopic, String serviceHttpUrl) throws Exception {
+        Integer partitionNum = bizInfo.getTopicPartitionNum();
+        int partition = 0;
+        if (partitionNum != null && partitionNum > 0) {
+            partition = partitionNum;
+        }
+
+        try (PulsarAdmin pulsarAdmin = PulsarUtils.getPulsarAdmin(serviceHttpUrl)) {
+            PulsarTopicBean topicBean = PulsarTopicBean.builder()
+                    .tenant(clusterBean.getDefaultTenant())
+                    .namespace(bizInfo.getMqResourceObj())
+                    .topicName(pulsarTopic)
+                    .numPartitions(partition)
+                    .queueModule(bizInfo.getQueueModule())
+                    .build();
+            pulsarOptService.createTopic(pulsarAdmin, topicBean);
+        }
+    }
+
+    @Override
+    public boolean async() {
+        return false;
+    }
+}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdpart/mq/CreateTubeConsumerGroupTaskListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdpart/mq/CreateTubeGroupTaskListener.java
similarity index 94%
rename from inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdpart/mq/CreateTubeConsumerGroupTaskListener.java
rename to inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdpart/mq/CreateTubeGroupTaskListener.java
index 580c249..3a83099 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdpart/mq/CreateTubeConsumerGroupTaskListener.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdpart/mq/CreateTubeGroupTaskListener.java
@@ -26,7 +26,7 @@ import org.apache.inlong.manager.common.pojo.tubemq.AddTubeConsumeGroupRequest.G
 import org.apache.inlong.manager.common.pojo.tubemq.QueryTubeTopicRequest;
 import org.apache.inlong.manager.dao.mapper.ClusterInfoMapper;
 import org.apache.inlong.manager.service.core.BusinessService;
-import org.apache.inlong.manager.service.workflow.newbusiness.CreateResourceWorkflowForm;
+import org.apache.inlong.manager.service.workflow.business.BusinessResourceWorkflowForm;
 import org.apache.inlong.manager.workflow.core.event.ListenerResult;
 import org.apache.inlong.manager.workflow.core.event.task.TaskEvent;
 import org.apache.inlong.manager.workflow.core.event.task.TaskEventListener;
@@ -38,7 +38,7 @@ import org.springframework.stereotype.Component;
 
 @Component
 @Slf4j
-public class CreateTubeConsumerGroupTaskListener implements TaskEventListener {
+public class CreateTubeGroupTaskListener implements TaskEventListener {
 
     @Autowired
     BusinessService businessService;
@@ -62,7 +62,7 @@ public class CreateTubeConsumerGroupTaskListener implements TaskEventListener {
 
     @Override
     public ListenerResult listen(WorkflowContext context) throws WorkflowListenerException {
-        CreateResourceWorkflowForm form = (CreateResourceWorkflowForm) context.getProcessForm();
+        BusinessResourceWorkflowForm form = (BusinessResourceWorkflowForm) context.getProcessForm();
         String groupId = form.getInlongGroupId();
 
         log.info("try to create consumer group for groupId {}", groupId);
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdpart/mq/CreateTubeTopicTaskListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdpart/mq/CreateTubeTopicTaskListener.java
index 01d73c6..2bd275c 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdpart/mq/CreateTubeTopicTaskListener.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdpart/mq/CreateTubeTopicTaskListener.java
@@ -22,7 +22,7 @@ import lombok.extern.slf4j.Slf4j;
 import org.apache.inlong.manager.common.pojo.business.BusinessInfo;
 import org.apache.inlong.manager.common.pojo.tubemq.AddTubeMqTopicRequest;
 import org.apache.inlong.manager.service.core.BusinessService;
-import org.apache.inlong.manager.service.workflow.newbusiness.CreateResourceWorkflowForm;
+import org.apache.inlong.manager.service.workflow.business.BusinessResourceWorkflowForm;
 import org.apache.inlong.manager.workflow.core.event.ListenerResult;
 import org.apache.inlong.manager.workflow.core.event.task.TaskEvent;
 import org.apache.inlong.manager.workflow.core.event.task.TaskEventListener;
@@ -50,7 +50,7 @@ public class CreateTubeTopicTaskListener implements TaskEventListener {
 
     @Override
     public ListenerResult listen(WorkflowContext context) throws WorkflowListenerException {
-        CreateResourceWorkflowForm form = (CreateResourceWorkflowForm) context.getProcessForm();
+        BusinessResourceWorkflowForm form = (BusinessResourceWorkflowForm) context.getProcessForm();
 
         log.info("begin create tube topic for groupId={}", form.getInlongGroupId());
         String groupId = form.getInlongGroupId();
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdpart/mq/PulsarOptService.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdpart/mq/PulsarOptService.java
new file mode 100644
index 0000000..899cf99
--- /dev/null
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdpart/mq/PulsarOptService.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.service.thirdpart.mq;
+
+import java.util.List;
+import org.apache.inlong.manager.common.pojo.pulsar.PulsarTopicBean;
+import org.apache.inlong.manager.dao.entity.BusinessPulsarEntity;
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.admin.PulsarAdminException;
+
+/**
+ * Interface of Pulsar operation
+ */
+public interface PulsarOptService {
+
+    void createTenant(PulsarAdmin pulsarAdmin, String tenant) throws PulsarAdminException;
+
+    void createNamespace(PulsarAdmin pulsarAdmin, BusinessPulsarEntity pulsarEntity, String tenant,
+            String namespace) throws PulsarAdminException;
+
+    void createTopic(PulsarAdmin pulsarAdmin, PulsarTopicBean topicBean) throws PulsarAdminException;
+
+    void createSubscription(PulsarAdmin pulsarAdmin, PulsarTopicBean topicBean, String subscription)
+            throws PulsarAdminException;
+
+    void createSubscriptions(PulsarAdmin pulsarAdmin, String subscription, PulsarTopicBean topicBean,
+            List<String> topics) throws PulsarAdminException;
+
+    boolean topicIsExists(PulsarAdmin pulsarAdmin, String tenant, String namespace, String topic)
+            throws PulsarAdminException;
+
+}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdpart/mq/PulsarOptServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdpart/mq/PulsarOptServiceImpl.java
new file mode 100644
index 0000000..488a503
--- /dev/null
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdpart/mq/PulsarOptServiceImpl.java
@@ -0,0 +1,250 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.service.thirdpart.mq;
+
+import com.google.common.collect.Sets;
+import java.util.List;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.inlong.manager.common.conversion.ConversionHandle;
+import org.apache.inlong.manager.common.enums.BizConstant;
+import org.apache.inlong.manager.common.pojo.pulsar.PulsarTopicBean;
+import org.apache.inlong.manager.common.util.Preconditions;
+import org.apache.inlong.manager.dao.entity.BusinessPulsarEntity;
+import org.apache.inlong.manager.service.thirdpart.mq.util.PulsarUtils;
+import org.apache.pulsar.client.admin.Namespaces;
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.common.policies.data.PersistencePolicies;
+import org.apache.pulsar.common.policies.data.RetentionPolicies;
+import org.apache.pulsar.common.policies.data.TenantInfoImpl;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+
+/**
+ * Operation interface of Pulsar
+ */
+@Service
+@Slf4j
+public class PulsarOptServiceImpl implements PulsarOptService {
+
+    @Autowired
+    private ConversionHandle conversionHandle;
+
+    @Override
+    public void createTenant(PulsarAdmin pulsarAdmin, String tenant) throws PulsarAdminException {
+        log.info("begin to create tenant={}", tenant);
+
+        Preconditions.checkNotEmpty(tenant, "Tenant cannot be empty");
+        try {
+            List<String> clusters = PulsarUtils.getPulsarClusters(pulsarAdmin);
+            boolean exists = this.tenantIsExists(pulsarAdmin, tenant);
+            if (exists) {
+                log.warn("pulsar tenant={} already exists, skip to create", tenant);
+                return;
+            }
+            TenantInfoImpl tenantInfo = new TenantInfoImpl();
+            tenantInfo.setAllowedClusters(Sets.newHashSet(clusters));
+            tenantInfo.setAdminRoles(Sets.newHashSet());
+            pulsarAdmin.tenants().createTenant(tenant, tenantInfo);
+            log.info("success to create pulsar tenant={}", tenant);
+        } catch (PulsarAdminException e) {
+            log.error("create pulsar tenant={} failed", tenant, e);
+            throw e;
+        }
+    }
+
+    @Override
+    public void createNamespace(PulsarAdmin pulsarAdmin, BusinessPulsarEntity pulsarEntity,
+            String tenant, String namespace) throws PulsarAdminException {
+        Preconditions.checkNotNull(tenant, "pulsar tenant cannot be empty during create namespace");
+        Preconditions.checkNotNull(namespace, "pulsar namespace cannot be empty during create namespace");
+
+        String namespaceName = tenant + "/" + namespace;
+        log.info("begin to create namespace={}", namespaceName);
+
+        try {
+            // Check whether the namespace exists, and create it if it does not exist
+            boolean isExists = this.namespacesIsExists(pulsarAdmin, tenant, namespace);
+            if (isExists) {
+                log.warn("namespace={} already exists, skip to create", namespaceName);
+                return;
+            }
+
+            List<String> clusters = PulsarUtils.getPulsarClusters(pulsarAdmin);
+            Namespaces namespaces = pulsarAdmin.namespaces();
+            namespaces.createNamespace(namespaceName, Sets.newHashSet(clusters));
+
+            // Configure message TTL
+            Integer ttl = pulsarEntity.getTtl();
+            if (ttl > 0) {
+                namespaces.setNamespaceMessageTTL(namespaceName, conversionHandle.handleConversion(ttl,
+                        pulsarEntity.getTtlUnit().toLowerCase() + "_seconds"));
+            }
+
+            // retentionTimeInMinutes retentionSizeInMB
+            Integer retentionTime = pulsarEntity.getRetentionTime();
+            if (retentionTime > 0) {
+                retentionTime = conversionHandle.handleConversion(retentionTime,
+                        pulsarEntity.getRetentionTimeUnit().toLowerCase() + "_minutes");
+            }
+            Integer retentionSize = pulsarEntity.getRetentionSize();
+            if (retentionSize > 0) {
+                retentionSize = conversionHandle.handleConversion(retentionSize,
+                        pulsarEntity.getRetentionSizeUnit().toLowerCase() + "_mb");
+            }
+
+            // Configure retention policies
+            RetentionPolicies retentionPolicies = new RetentionPolicies(retentionTime, retentionSize);
+            namespaces.setRetention(namespaceName, retentionPolicies);
+
+            // Configure persistence policies
+            PersistencePolicies persistencePolicies = new PersistencePolicies(pulsarEntity.getEnsemble(),
+                    pulsarEntity.getWriteQuorum(), pulsarEntity.getAckQuorum(), 0);
+            namespaces.setPersistence(namespaceName, persistencePolicies);
+
+            log.info("success to create namespace={}", tenant);
+        } catch (PulsarAdminException e) {
+            log.error("create namespace={} error", tenant, e);
+            throw e;
+        }
+    }
+
+    @Override
+    public void createTopic(PulsarAdmin pulsarAdmin, PulsarTopicBean topicBean) throws PulsarAdminException {
+        Preconditions.checkNotNull(topicBean, "pulsar topic info cannot be empty");
+
+        String tenant = topicBean.getTenant();
+        String namespace = topicBean.getNamespace();
+        String topic = topicBean.getTopicName();
+        String topicFullName = tenant + "/" + namespace + "/" + topic;
+
+        // Topic will be returned if it exists, and created if it does not exist
+        if (topicIsExists(pulsarAdmin, tenant, namespace, topic)) {
+            log.warn("pulsar topic={} already exists in {}", topicFullName, pulsarAdmin.getServiceUrl());
+            return;
+        }
+
+        try {
+            String queueModule = topicBean.getQueueModule();
+            // create non-partition topic
+            if (BizConstant.PULSAR_TOPIC_TYPE_SERIAL.equalsIgnoreCase(queueModule)) {
+                pulsarAdmin.topics().createNonPartitionedTopic(topicFullName);
+            } else { // create partition topic
+                List<String> clusters = PulsarUtils.getPulsarClusters(pulsarAdmin);
+                // The number of brokers as the default value of topic partition
+                List<String> brokers = pulsarAdmin.brokers().getActiveBrokers(clusters.get(0));
+                Integer numPartitions = brokers.size();
+                if (topicBean.getNumPartitions() > 0) {
+                    numPartitions = topicBean.getNumPartitions();
+                }
+                pulsarAdmin.topics().createPartitionedTopic(topicFullName, numPartitions);
+            }
+
+            log.info("success to create topic={}", topicFullName);
+        } catch (Exception e) {
+            log.error("create topic={} failed", topicFullName, e);
+            throw e;
+        }
+    }
+
+    @Override
+    public void createSubscription(PulsarAdmin pulsarAdmin, PulsarTopicBean topicBean, String subscription)
+            throws PulsarAdminException {
+        Preconditions.checkNotNull(topicBean, "can not find tenant information to create subscription");
+        Preconditions.checkNotNull(subscription, "subscription cannot be empty during creating subscription");
+
+        String topicName = topicBean.getTenant() + "/" + topicBean.getNamespace() + "/" + topicBean.getTopicName();
+        log.info("begin to create pulsar subscription={} for topic={}", subscription, topicName);
+
+        try {
+            boolean isExists = this.subscriptionIsExists(pulsarAdmin, topicName, subscription);
+            if (!isExists) {
+                pulsarAdmin.topics().createSubscription(topicName, subscription, MessageId.latest);
+                log.info("success to create subscription={}", subscription);
+            } else {
+                log.warn("pulsar subscription={} already exists, skip to create", subscription);
+            }
+        } catch (Exception e) {
+            log.error("create pulsar subscription={} failed", subscription, e);
+            throw e;
+        }
+    }
+
+    @Override
+    public void createSubscriptions(PulsarAdmin pulsarAdmin, String subscription, PulsarTopicBean topicBean,
+            List<String> topicList) throws PulsarAdminException {
+        for (String topic : topicList) {
+            topicBean.setTopicName(topic);
+            this.createSubscription(pulsarAdmin, topicBean, subscription);
+        }
+        log.info("success to create subscription={} for multiple topics={}", subscription, topicList);
+    }
+
+    /**
+     * Check if Pulsar tenant exists
+     */
+    private boolean tenantIsExists(PulsarAdmin pulsarAdmin, String tenant) throws PulsarAdminException {
+        List<String> tenantList = pulsarAdmin.tenants().getTenants();
+        return tenantList.contains(tenant);
+    }
+
+    /**
+     * Check whether the specified Namespace exists under the specified Tenant
+     */
+    private boolean namespacesIsExists(PulsarAdmin pulsarAdmin, String tenant, String namespace)
+            throws PulsarAdminException {
+        List<String> namespaceList = pulsarAdmin.namespaces().getNamespaces(tenant);
+        return namespaceList.contains(tenant + "/" + namespace);
+    }
+
+    /**
+     * Verify whether the specified Topic exists under the specified Tenant/Namespace
+     *
+     * @apiNote cannot compare whether the string contains, otherwise it may be misjudged, such as:
+     *         Topic "ab" does not exist, but if "abc" exists, "ab" will be mistakenly judged to exist
+     */
+    @Override
+    public boolean topicIsExists(PulsarAdmin pulsarAdmin, String tenant, String namespace, String topic)
+            throws PulsarAdminException {
+        if (StringUtils.isBlank(topic)) {
+            return true;
+        }
+
+        // persistent://tenant/namespace/topic
+        List<String> topicList = pulsarAdmin.topics().getPartitionedTopicList(tenant + "/" + namespace);
+        for (String t : topicList) {
+            t = t.substring(t.lastIndexOf("/") + 1); // Cannot contain /
+            if (topic.equals(t)) {
+                return true;
+            }
+        }
+        return false;
+    }
+
+    public boolean subscriptionIsExists(PulsarAdmin pulsarAdmin, String topic, String subscription) {
+        try {
+            List<String> subscriptionList = pulsarAdmin.topics().getSubscriptions(topic);
+            return subscriptionList.contains(subscription);
+        } catch (PulsarAdminException e) {
+            log.error("check if the topic={} is exists error,", topic, e);
+            return false;
+        }
+    }
+}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdpart/mq/util/PulsarUtils.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdpart/mq/util/PulsarUtils.java
new file mode 100644
index 0000000..bf54a4d
--- /dev/null
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdpart/mq/util/PulsarUtils.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.service.thirdpart.mq.util;
+
+import java.util.List;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.client.api.PulsarClientException;
+
+/**
+ * Pulsar connection utils
+ */
+@Slf4j
+public class PulsarUtils {
+
+    private PulsarUtils() {
+    }
+
+    /**
+     * Obtain the PulsarAdmin client according to the service URL, and it must be closed after use
+     */
+    public static PulsarAdmin getPulsarAdmin(String serviceHttpUrl) throws PulsarClientException {
+        return PulsarAdmin.builder().serviceHttpUrl(serviceHttpUrl).build();
+    }
+
+    public static List<String> getPulsarClusters(PulsarAdmin pulsarAdmin) throws PulsarAdminException {
+        return pulsarAdmin.clusters().getClusters();
+    }
+
+    public static String getServiceUrl(PulsarAdmin pulsarAdmin, String pulsarCluster) throws PulsarAdminException {
+        return pulsarAdmin.clusters().getCluster(pulsarCluster).getServiceUrl();
+    }
+
+}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdpart/sort/PushHiveConfigTaskListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdpart/sort/PushHiveConfigTaskListener.java
index 9f037ec..dd49261 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdpart/sort/PushHiveConfigTaskListener.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdpart/sort/PushHiveConfigTaskListener.java
@@ -37,7 +37,7 @@ import org.apache.inlong.manager.dao.mapper.BusinessEntityMapper;
 import org.apache.inlong.manager.dao.mapper.StorageHiveEntityMapper;
 import org.apache.inlong.manager.dao.mapper.StorageHiveFieldEntityMapper;
 import org.apache.inlong.manager.service.core.DataStreamService;
-import org.apache.inlong.manager.service.workflow.newbusiness.CreateResourceWorkflowForm;
+import org.apache.inlong.manager.service.workflow.business.BusinessResourceWorkflowForm;
 import org.apache.inlong.manager.workflow.core.event.ListenerResult;
 import org.apache.inlong.manager.workflow.core.event.task.TaskEvent;
 import org.apache.inlong.manager.workflow.core.event.task.TaskEventListener;
@@ -100,7 +100,7 @@ public class PushHiveConfigTaskListener implements TaskEventListener {
             log.debug("begin push hive config to sort, context={}", context);
         }
 
-        CreateResourceWorkflowForm form = (CreateResourceWorkflowForm) context.getProcessForm();
+        BusinessResourceWorkflowForm form = (BusinessResourceWorkflowForm) context.getProcessForm();
         BusinessInfo businessInfo = form.getBusinessInfo();
         String groupId = businessInfo.getInlongGroupId();
 
@@ -262,7 +262,7 @@ public class PushHiveConfigTaskListener implements TaskEventListener {
             String pulsarTopic = info.getMqResourceObj();
             // Full name of Topic in Pulsar
             String fullTopicName = "persistent://" + tenant + "/" + namespace + "/" + pulsarTopic;
-            String adminUrl = clusterBean.getPulsarServiceUrl();
+            String adminUrl = clusterBean.getPulsarAdminUrl();
             String serviceUrl = clusterBean.getPulsarServiceUrl();
             String consumerGroup = clusterBean.getAppName() + "_" + pulsarTopic + "_consumer_group";
             sourceInfo = new PulsarSourceInfo(adminUrl, serviceUrl, fullTopicName, consumerGroup,
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/BaseWorkflowFormType.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/BaseWorkflowFormType.java
index f9c8f74..d5649b5 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/BaseWorkflowFormType.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/BaseWorkflowFormType.java
@@ -21,9 +21,9 @@ import com.fasterxml.jackson.annotation.JsonSubTypes;
 import com.fasterxml.jackson.annotation.JsonTypeInfo;
 import com.fasterxml.jackson.annotation.JsonTypeInfo.Id;
 import lombok.Data;
-import org.apache.inlong.manager.service.workflow.newbusiness.CreateResourceWorkflowForm;
-import org.apache.inlong.manager.service.workflow.newbusiness.NewBusinessWorkflowForm;
-import org.apache.inlong.manager.service.workflow.newconsumption.NewConsumptionWorkflowForm;
+import org.apache.inlong.manager.service.workflow.business.BusinessResourceWorkflowForm;
+import org.apache.inlong.manager.service.workflow.business.NewBusinessWorkflowForm;
+import org.apache.inlong.manager.service.workflow.consumption.NewConsumptionWorkflowForm;
 import org.apache.inlong.manager.workflow.model.definition.ProcessForm;
 
 /**
@@ -35,8 +35,8 @@ import org.apache.inlong.manager.workflow.model.definition.ProcessForm;
 @JsonSubTypes({
         @JsonSubTypes.Type(value = NewBusinessWorkflowForm.class, name = NewBusinessWorkflowForm.FORM_NAME),
         @JsonSubTypes.Type(value = NewConsumptionWorkflowForm.class, name = NewConsumptionWorkflowForm.FORM_NAME),
-        @JsonSubTypes.Type(value = CreateResourceWorkflowForm.class,
-                name = CreateResourceWorkflowForm.FORM_NAME),
+        @JsonSubTypes.Type(value = BusinessResourceWorkflowForm.class,
+                name = BusinessResourceWorkflowForm.FORM_NAME),
 })
 public abstract class BaseWorkflowFormType implements ProcessForm {
 
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/BaseWorkflowTaskFormType.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/BaseWorkflowTaskFormType.java
index 5803802..b844057 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/BaseWorkflowTaskFormType.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/BaseWorkflowTaskFormType.java
@@ -21,8 +21,8 @@ import com.fasterxml.jackson.annotation.JsonSubTypes;
 import com.fasterxml.jackson.annotation.JsonTypeInfo;
 import com.fasterxml.jackson.annotation.JsonTypeInfo.Id;
 import lombok.Data;
-import org.apache.inlong.manager.service.workflow.newbusiness.NewBusinessApproveForm;
-import org.apache.inlong.manager.service.workflow.newconsumption.NewConsumptionApproveForm;
+import org.apache.inlong.manager.service.workflow.business.BusinessAdminApproveForm;
+import org.apache.inlong.manager.service.workflow.consumption.ConsumptionAdminApproveForm;
 import org.apache.inlong.manager.workflow.model.definition.TaskForm;
 
 /**
@@ -31,8 +31,8 @@ import org.apache.inlong.manager.workflow.model.definition.TaskForm;
 @Data
 @JsonTypeInfo(use = Id.NAME, property = "formName")
 @JsonSubTypes({
-        @JsonSubTypes.Type(value = NewBusinessApproveForm.class, name = NewBusinessApproveForm.FORM_NAME),
-        @JsonSubTypes.Type(value = NewConsumptionApproveForm.class, name = NewConsumptionApproveForm.FORM_NAME),
+        @JsonSubTypes.Type(value = BusinessAdminApproveForm.class, name = BusinessAdminApproveForm.FORM_NAME),
+        @JsonSubTypes.Type(value = ConsumptionAdminApproveForm.class, name = ConsumptionAdminApproveForm.FORM_NAME),
 })
 public abstract class BaseWorkflowTaskFormType implements TaskForm {
 
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/ProcessName.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/ProcessName.java
index 6c45a79..1d695b6 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/ProcessName.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/ProcessName.java
@@ -25,22 +25,22 @@ public enum ProcessName {
     /**
      * New business access application process
      */
-    NEW_BUSINESS_WORKFLOW("New Business Access"),
+    NEW_BUSINESS_WORKFLOW("New-Business-Access"),
 
     /**
      * New data consumption application process
      */
-    NEW_CONSUMPTION_WORKFLOW("New Data Consumption"),
+    NEW_CONSUMPTION_WORKFLOW("New-Data-Consumption"),
 
     /**
      * New business resource creation
      */
-    CREATE_BUSINESS_RESOURCE("New Business Access Resource"),
+    CREATE_BUSINESS_RESOURCE("Business-Access-Resource"),
 
     /**
      * Single data stream resource creation
      */
-    CREATE_DATASTREAM_RESOURCE("Single Data Stream Resource");
+    CREATE_DATASTREAM_RESOURCE("Data-Stream-Resource");
 
     private final String displayName;
 
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/newbusiness/NewBusinessApproveForm.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/business/BusinessAdminApproveForm.java
similarity index 90%
rename from inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/newbusiness/NewBusinessApproveForm.java
rename to inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/business/BusinessAdminApproveForm.java
index f9c68b0..380294e 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/newbusiness/NewBusinessApproveForm.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/business/BusinessAdminApproveForm.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.manager.service.workflow.newbusiness;
+package org.apache.inlong.manager.service.workflow.business;
 
 import io.swagger.annotations.ApiModelProperty;
 import java.util.List;
@@ -32,9 +32,9 @@ import org.apache.inlong.manager.workflow.exception.FormValidateException;
  */
 @Data
 @EqualsAndHashCode(callSuper = false)
-public class NewBusinessApproveForm extends BaseWorkflowTaskFormType {
+public class BusinessAdminApproveForm extends BaseWorkflowTaskFormType {
 
-    public static final String FORM_NAME = "NewBusinessApproveForm";
+    public static final String FORM_NAME = "BusinessAdminApproveForm";
 
     @ApiModelProperty(value = "Access business information", required = true)
     private BusinessApproveInfo businessApproveInfo;
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/newbusiness/CreateResourceWorkflowForm.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/business/BusinessResourceWorkflowForm.java
similarity index 91%
rename from inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/newbusiness/CreateResourceWorkflowForm.java
rename to inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/business/BusinessResourceWorkflowForm.java
index 3452293..ac1d844 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/newbusiness/CreateResourceWorkflowForm.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/business/BusinessResourceWorkflowForm.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.manager.service.workflow.newbusiness;
+package org.apache.inlong.manager.service.workflow.business;
 
 import com.google.common.collect.Maps;
 
@@ -33,9 +33,9 @@ import lombok.EqualsAndHashCode;
  */
 @Data
 @EqualsAndHashCode(callSuper = false)
-public class CreateResourceWorkflowForm extends BaseWorkflowFormType {
+public class BusinessResourceWorkflowForm extends BaseWorkflowFormType {
 
-    public static final String FORM_NAME = "CreateResourceWorkflowForm";
+    public static final String FORM_NAME = "BusinessResourceWorkflowForm";
 
     private BusinessInfo businessInfo;
 
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/business/CreateBusinessWorkflowDefinition.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/business/CreateBusinessWorkflowDefinition.java
new file mode 100644
index 0000000..0f01b7e
--- /dev/null
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/business/CreateBusinessWorkflowDefinition.java
@@ -0,0 +1,208 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.service.workflow.business;
+
+import java.util.List;
+import java.util.stream.Collectors;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.inlong.manager.common.enums.BizConstant;
+import org.apache.inlong.manager.common.pojo.business.BusinessInfo;
+import org.apache.inlong.manager.dao.entity.DataStreamEntity;
+import org.apache.inlong.manager.dao.mapper.DataStreamEntityMapper;
+import org.apache.inlong.manager.service.core.StorageService;
+import org.apache.inlong.manager.service.thirdpart.hive.CreateHiveTableListener;
+import org.apache.inlong.manager.service.thirdpart.mq.CreatePulsarGroupTaskListener;
+import org.apache.inlong.manager.service.thirdpart.mq.CreatePulsarResourceTaskListener;
+import org.apache.inlong.manager.service.thirdpart.mq.CreateTubeGroupTaskListener;
+import org.apache.inlong.manager.service.thirdpart.mq.CreateTubeTopicTaskListener;
+import org.apache.inlong.manager.service.thirdpart.sort.PushHiveConfigTaskListener;
+import org.apache.inlong.manager.service.workflow.ProcessName;
+import org.apache.inlong.manager.service.workflow.WorkflowDefinition;
+import org.apache.inlong.manager.service.workflow.business.listener.BusinessCompleteProcessListener;
+import org.apache.inlong.manager.service.workflow.business.listener.BusinessFailedProcessListener;
+import org.apache.inlong.manager.service.workflow.business.listener.InitBusinessInfoListener;
+import org.apache.inlong.manager.workflow.model.definition.EndEvent;
+import org.apache.inlong.manager.workflow.model.definition.Process;
+import org.apache.inlong.manager.workflow.model.definition.ServiceTask;
+import org.apache.inlong.manager.workflow.model.definition.StartEvent;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+
+/**
+ * Create workflow definitions for business resources
+ */
+@Slf4j
+@Component
+public class CreateBusinessWorkflowDefinition implements WorkflowDefinition {
+
+    @Autowired
+    private InitBusinessInfoListener initBusinessInfoListener;
+    @Autowired
+    private BusinessCompleteProcessListener businessCompleteProcessListener;
+    @Autowired
+    private BusinessFailedProcessListener businessFailedProcessListener;
+    @Autowired
+    private CreateTubeTopicTaskListener createTubeTopicTaskListener;
+    @Autowired
+    private CreateTubeGroupTaskListener createTubeGroupTaskListener;
+    @Autowired
+    private CreatePulsarResourceTaskListener createPulsarResourceTaskListener;
+    @Autowired
+    private CreatePulsarGroupTaskListener createPulsarGroupTaskListener;
+
+    @Autowired
+    private CreateHiveTableListener createHiveTableListener;
+    @Autowired
+    private PushHiveConfigTaskListener pushHiveConfigTaskListener;
+    @Autowired
+    private StorageService storageService;
+    @Autowired
+    private DataStreamEntityMapper streamMapper;
+
+    @Override
+    public Process defineProcess() {
+
+        // Configuration process
+        Process process = new Process();
+        process.addListener(initBusinessInfoListener);
+        process.addListener(businessCompleteProcessListener);
+        process.addListener(businessFailedProcessListener);
+
+        process.setType("Business Resource Creation");
+        process.setName(getProcessName().name());
+        process.setDisplayName(getProcessName().getDisplayName());
+        process.setFormClass(BusinessResourceWorkflowForm.class);
+        process.setVersion(1);
+        process.setHidden(true);
+
+        // Start node
+        StartEvent startEvent = new StartEvent();
+        process.setStartEvent(startEvent);
+
+        // End node
+        EndEvent endEvent = new EndEvent();
+        process.setEndEvent(endEvent);
+
+        ServiceTask createTubeTopicTask = new ServiceTask();
+        createTubeTopicTask.setSkipResolver(c -> {
+            BusinessResourceWorkflowForm form = (BusinessResourceWorkflowForm) c.getProcessForm();
+            BusinessInfo businessInfo = form.getBusinessInfo();
+            if (BizConstant.MIDDLEWARE_TUBE.equalsIgnoreCase(businessInfo.getMiddlewareType())) {
+                return false;
+            }
+            log.warn("not need to create tube resource for groupId={}, as the middleware type is {}",
+                    businessInfo.getMiddlewareType(), form.getInlongGroupId());
+            return true;
+        });
+        createTubeTopicTask.setName("createTubeTopic");
+        createTubeTopicTask.setDisplayName("Business-CreateTubeTopic");
+        createTubeTopicTask.addListener(createTubeTopicTaskListener);
+        process.addTask(createTubeTopicTask);
+
+        ServiceTask createTubeConsumerTask = new ServiceTask();
+        createTubeConsumerTask.setSkipResolver(c -> {
+            BusinessResourceWorkflowForm form = (BusinessResourceWorkflowForm) c.getProcessForm();
+            String middlewareType = form.getBusinessInfo().getMiddlewareType();
+            if (BizConstant.MIDDLEWARE_TUBE.equalsIgnoreCase(middlewareType)) {
+                return false;
+            }
+            log.warn("no need to create tube resource for groupId={}, as the middleware type is {}",
+                    form.getInlongGroupId(), middlewareType);
+            return true;
+        });
+        createTubeConsumerTask.setName("createConsumerGroup");
+        createTubeConsumerTask.setDisplayName("Business-CreateTubeConsumer");
+        createTubeConsumerTask.addListener(createTubeGroupTaskListener);
+        process.addTask(createTubeConsumerTask);
+
+        ServiceTask createPulsarResourceTask = new ServiceTask();
+        createPulsarResourceTask.setSkipResolver(c -> {
+            BusinessResourceWorkflowForm form = (BusinessResourceWorkflowForm) c.getProcessForm();
+            String middlewareType = form.getBusinessInfo().getMiddlewareType();
+            if (BizConstant.MIDDLEWARE_PULSAR.equalsIgnoreCase(middlewareType)) {
+                return false;
+            }
+            log.warn("no need to create pulsar resource for groupId={}, as the middlewareType={}",
+                    form.getInlongGroupId(), middlewareType);
+            return true;
+        });
+        createPulsarResourceTask.setName("createPulsarResource");
+        createPulsarResourceTask.setDisplayName("Business-CreatePulsarResource");
+        createPulsarResourceTask.addListener(createPulsarResourceTaskListener);
+        process.addTask(createPulsarResourceTask);
+
+        ServiceTask createPulsarSubscriptionTask = new ServiceTask();
+        createPulsarSubscriptionTask.setSkipResolver(c -> {
+            BusinessResourceWorkflowForm form = (BusinessResourceWorkflowForm) c.getProcessForm();
+            String middlewareType = form.getBusinessInfo().getMiddlewareType();
+            if (BizConstant.MIDDLEWARE_PULSAR.equalsIgnoreCase(middlewareType)) {
+                return false;
+            }
+            log.warn("no need to create pulsar subscription group for groupId={}, as the middlewareType={}",
+                    form.getInlongGroupId(), middlewareType);
+            return true;
+        });
+        createPulsarSubscriptionTask.setName("createPulsarSubscriptionTask");
+        createPulsarSubscriptionTask.setDisplayName("Business-CreatePulsarSubscription");
+        createPulsarSubscriptionTask.addListener(createPulsarGroupTaskListener);
+        process.addTask(createPulsarSubscriptionTask);
+
+        ServiceTask createHiveTablesTask = new ServiceTask();
+        createHiveTablesTask.setSkipResolver(c -> {
+            BusinessResourceWorkflowForm form = (BusinessResourceWorkflowForm) c.getProcessForm();
+            String groupId = form.getInlongGroupId();
+            List<String> dsForHive = storageService.filterStreamIdByStorageType(groupId, BizConstant.STORAGE_HIVE,
+                    streamMapper.selectByGroupId(groupId)
+                            .stream()
+                            .map(DataStreamEntity::getInlongStreamId)
+                            .collect(Collectors.toList()));
+            if (CollectionUtils.isEmpty(dsForHive)) {
+                log.warn("groupId={} streamId={} does not have storage, skip to create hive table ",
+                        groupId, form.getInlongStreamId());
+                return true;
+            }
+            return false;
+        });
+        createHiveTablesTask.setName("createHiveTableTask");
+        createHiveTablesTask.setDisplayName("Business-CreateHiveTable");
+        createHiveTablesTask.addListener(createHiveTableListener);
+        process.addTask(createHiveTablesTask);
+
+        ServiceTask pushSortConfig = new ServiceTask();
+        pushSortConfig.setName("pushSortConfig");
+        pushSortConfig.setDisplayName("Business-PushSortConfig");
+        pushSortConfig.addListener(pushHiveConfigTaskListener);
+        process.addTask(pushSortConfig);
+
+        startEvent.addNext(createTubeTopicTask);
+        createTubeTopicTask.addNext(createTubeConsumerTask);
+        createTubeConsumerTask.addNext(createPulsarResourceTask);
+        createPulsarResourceTask.addNext(createPulsarSubscriptionTask);
+        createPulsarSubscriptionTask.addNext(createHiveTablesTask);
+        createHiveTablesTask.addNext(pushSortConfig);
+        pushSortConfig.addNext(endEvent);
+
+        return process;
+    }
+
+    @Override
+    public ProcessName getProcessName() {
+        return ProcessName.CREATE_BUSINESS_RESOURCE;
+    }
+}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/newbusiness/NewBusinessWorkflowDefinition.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/business/NewBusinessWorkflowDefinition.java
similarity index 81%
rename from inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/newbusiness/NewBusinessWorkflowDefinition.java
rename to inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/business/NewBusinessWorkflowDefinition.java
index 69372f6..7ad4b17 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/newbusiness/NewBusinessWorkflowDefinition.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/business/NewBusinessWorkflowDefinition.java
@@ -15,17 +15,17 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.manager.service.workflow.newbusiness;
+package org.apache.inlong.manager.service.workflow.business;
 
 import java.util.List;
 import org.apache.inlong.manager.common.pojo.workflow.WorkflowApproverFilterContext;
 import org.apache.inlong.manager.service.core.WorkflowApproverService;
 import org.apache.inlong.manager.service.workflow.ProcessName;
 import org.apache.inlong.manager.service.workflow.WorkflowDefinition;
-import org.apache.inlong.manager.service.workflow.newbusiness.listener.ApproveCancelProcessListener;
-import org.apache.inlong.manager.service.workflow.newbusiness.listener.ApprovePassTaskListener;
-import org.apache.inlong.manager.service.workflow.newbusiness.listener.ApproveRejectProcessListener;
-import org.apache.inlong.manager.service.workflow.newbusiness.listener.StartCreateResourceProcessListener;
+import org.apache.inlong.manager.service.workflow.business.listener.BusinessCancelProcessListener;
+import org.apache.inlong.manager.service.workflow.business.listener.BusinessPassTaskListener;
+import org.apache.inlong.manager.service.workflow.business.listener.BusinessRejectProcessListener;
+import org.apache.inlong.manager.service.workflow.business.listener.StartCreateResourceProcessListener;
 import org.apache.inlong.manager.workflow.model.definition.EndEvent;
 import org.apache.inlong.manager.workflow.model.definition.Process;
 import org.apache.inlong.manager.workflow.model.definition.StartEvent;
@@ -40,11 +40,11 @@ import org.springframework.stereotype.Component;
 public class NewBusinessWorkflowDefinition implements WorkflowDefinition {
 
     @Autowired
-    private ApprovePassTaskListener approvePassTaskListener;
+    private BusinessPassTaskListener businessPassTaskListener;
     @Autowired
-    private ApproveCancelProcessListener approveCancelProcessListener;
+    private BusinessCancelProcessListener businessCancelProcessListener;
     @Autowired
-    private ApproveRejectProcessListener approveRejectProcessListener;
+    private BusinessRejectProcessListener approveRejectProcessListener;
     @Autowired
     private StartCreateResourceProcessListener startCreateResourceProcessListener;
     @Autowired
@@ -61,7 +61,7 @@ public class NewBusinessWorkflowDefinition implements WorkflowDefinition {
         process.setVersion(1);
 
         // Set up the listener
-        process.addListener(approveCancelProcessListener);
+        process.addListener(businessCancelProcessListener);
         process.addListener(approveRejectProcessListener);
         // Initiate the process of creating business resources,
         // and set the business status to [Configuration Successful]/[Configuration Failed] according to its completion
@@ -79,9 +79,9 @@ public class NewBusinessWorkflowDefinition implements WorkflowDefinition {
         UserTask adminUserTask = new UserTask();
         adminUserTask.setName("ut_admin");
         adminUserTask.setDisplayName("System Administrator");
-        adminUserTask.setFormClass(NewBusinessApproveForm.class);
+        adminUserTask.setFormClass(BusinessAdminApproveForm.class);
         adminUserTask.setApproverAssign(context -> getTaskApprovers(adminUserTask.getName()));
-        adminUserTask.addListener(approvePassTaskListener);
+        adminUserTask.addListener(businessPassTaskListener);
         process.addTask(adminUserTask);
 
         // Configuration order relationship
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/newbusiness/NewBusinessWorkflowForm.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/business/NewBusinessWorkflowForm.java
similarity index 97%
rename from inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/newbusiness/NewBusinessWorkflowForm.java
rename to inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/business/NewBusinessWorkflowForm.java
index 5663106..fe475e4 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/newbusiness/NewBusinessWorkflowForm.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/business/NewBusinessWorkflowForm.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.manager.service.workflow.newbusiness;
+package org.apache.inlong.manager.service.workflow.business;
 
 import com.google.common.collect.Maps;
 import io.swagger.annotations.ApiModelProperty;
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/newbusiness/listener/ApproveCancelProcessListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/business/listener/BusinessCancelProcessListener.java
similarity index 90%
rename from inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/newbusiness/listener/ApproveCancelProcessListener.java
rename to inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/business/listener/BusinessCancelProcessListener.java
index b13fe67..21634fd 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/newbusiness/listener/ApproveCancelProcessListener.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/business/listener/BusinessCancelProcessListener.java
@@ -15,12 +15,12 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.manager.service.workflow.newbusiness.listener;
+package org.apache.inlong.manager.service.workflow.business.listener;
 
 import lombok.extern.slf4j.Slf4j;
 import org.apache.inlong.manager.common.enums.EntityStatus;
 import org.apache.inlong.manager.service.core.BusinessService;
-import org.apache.inlong.manager.service.workflow.newbusiness.NewBusinessWorkflowForm;
+import org.apache.inlong.manager.service.workflow.business.NewBusinessWorkflowForm;
 import org.apache.inlong.manager.workflow.core.event.ListenerResult;
 import org.apache.inlong.manager.workflow.core.event.process.ProcessEvent;
 import org.apache.inlong.manager.workflow.core.event.process.ProcessEventListener;
@@ -34,7 +34,7 @@ import org.springframework.stereotype.Component;
  */
 @Slf4j
 @Component
-public class ApproveCancelProcessListener implements ProcessEventListener {
+public class BusinessCancelProcessListener implements ProcessEventListener {
 
     @Autowired
     private BusinessService businessService;
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/newbusiness/listener/CreateResourceCompleteProcessListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/business/listener/BusinessCompleteProcessListener.java
similarity index 86%
rename from inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/newbusiness/listener/CreateResourceCompleteProcessListener.java
rename to inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/business/listener/BusinessCompleteProcessListener.java
index 9435468..ad56542 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/newbusiness/listener/CreateResourceCompleteProcessListener.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/business/listener/BusinessCompleteProcessListener.java
@@ -15,13 +15,13 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.manager.service.workflow.newbusiness.listener;
+package org.apache.inlong.manager.service.workflow.business.listener;
 
 import lombok.extern.slf4j.Slf4j;
 import org.apache.inlong.manager.common.enums.EntityStatus;
 import org.apache.inlong.manager.service.core.BusinessService;
 import org.apache.inlong.manager.service.core.DataStreamService;
-import org.apache.inlong.manager.service.workflow.newbusiness.CreateResourceWorkflowForm;
+import org.apache.inlong.manager.service.workflow.business.BusinessResourceWorkflowForm;
 import org.apache.inlong.manager.workflow.core.event.ListenerResult;
 import org.apache.inlong.manager.workflow.core.event.process.ProcessEvent;
 import org.apache.inlong.manager.workflow.core.event.process.ProcessEventListener;
@@ -35,7 +35,7 @@ import org.springframework.stereotype.Component;
  */
 @Slf4j
 @Component
-public class CreateResourceCompleteProcessListener implements ProcessEventListener {
+public class BusinessCompleteProcessListener implements ProcessEventListener {
 
     @Autowired
     private BusinessService businessService;
@@ -50,11 +50,11 @@ public class CreateResourceCompleteProcessListener implements ProcessEventListen
     /**
      * After the process of creating business resources is completed, modify the status of business and all data stream
      * belong to this business to [Configuration Successful] [Configuration Failed]
-     * <p/>{@link CreateResourceFailedProcessListener#listen}
+     * <p/>{@link BusinessFailedProcessListener#listen}
      */
     @Override
     public ListenerResult listen(WorkflowContext context) throws WorkflowListenerException {
-        CreateResourceWorkflowForm form = (CreateResourceWorkflowForm) context.getProcessForm();
+        BusinessResourceWorkflowForm form = (BusinessResourceWorkflowForm) context.getProcessForm();
 
         String groupId = form.getInlongGroupId();
         String username = context.getApplicant();
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/newbusiness/listener/CreateResourceFailedProcessListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/business/listener/BusinessFailedProcessListener.java
similarity index 86%
rename from inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/newbusiness/listener/CreateResourceFailedProcessListener.java
rename to inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/business/listener/BusinessFailedProcessListener.java
index 5acdbfd..50e2a1d 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/newbusiness/listener/CreateResourceFailedProcessListener.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/business/listener/BusinessFailedProcessListener.java
@@ -15,13 +15,13 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.manager.service.workflow.newbusiness.listener;
+package org.apache.inlong.manager.service.workflow.business.listener;
 
 import lombok.extern.slf4j.Slf4j;
 import org.apache.inlong.manager.common.enums.EntityStatus;
 import org.apache.inlong.manager.service.core.BusinessService;
 import org.apache.inlong.manager.service.core.DataStreamService;
-import org.apache.inlong.manager.service.workflow.newbusiness.CreateResourceWorkflowForm;
+import org.apache.inlong.manager.service.workflow.business.BusinessResourceWorkflowForm;
 import org.apache.inlong.manager.workflow.core.event.ListenerResult;
 import org.apache.inlong.manager.workflow.core.event.process.ProcessEvent;
 import org.apache.inlong.manager.workflow.core.event.process.ProcessEventListener;
@@ -35,7 +35,7 @@ import org.springframework.stereotype.Component;
  */
 @Slf4j
 @Component
-public class CreateResourceFailedProcessListener implements ProcessEventListener {
+public class BusinessFailedProcessListener implements ProcessEventListener {
 
     @Autowired
     private BusinessService businessService;
@@ -50,11 +50,11 @@ public class CreateResourceFailedProcessListener implements ProcessEventListener
     /**
      * The process of creating business resources is abnormal, and the business status is changed to
      * [Configuration failed] [Configuration successful]
-     * <p/>{@link CreateResourceCompleteProcessListener#listen}
+     * <p/>{@link BusinessCompleteProcessListener#listen}
      */
     @Override
     public ListenerResult listen(WorkflowContext context) throws WorkflowListenerException {
-        CreateResourceWorkflowForm form = (CreateResourceWorkflowForm) context.getProcessForm();
+        BusinessResourceWorkflowForm form = (BusinessResourceWorkflowForm) context.getProcessForm();
         String groupId = form.getInlongGroupId();
         String username = context.getApplicant();
 
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/newbusiness/listener/ApprovePassTaskListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/business/listener/BusinessPassTaskListener.java
similarity index 89%
rename from inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/newbusiness/listener/ApprovePassTaskListener.java
rename to inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/business/listener/BusinessPassTaskListener.java
index 4bef36a..a23c574 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/newbusiness/listener/ApprovePassTaskListener.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/business/listener/BusinessPassTaskListener.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.manager.service.workflow.newbusiness.listener;
+package org.apache.inlong.manager.service.workflow.business.listener;
 
 import java.util.List;
 import lombok.extern.slf4j.Slf4j;
@@ -23,7 +23,7 @@ import org.apache.inlong.manager.common.pojo.business.BusinessApproveInfo;
 import org.apache.inlong.manager.common.pojo.datastream.DataStreamApproveInfo;
 import org.apache.inlong.manager.service.core.BusinessService;
 import org.apache.inlong.manager.service.core.DataStreamService;
-import org.apache.inlong.manager.service.workflow.newbusiness.NewBusinessApproveForm;
+import org.apache.inlong.manager.service.workflow.business.BusinessAdminApproveForm;
 import org.apache.inlong.manager.workflow.core.event.ListenerResult;
 import org.apache.inlong.manager.workflow.core.event.task.TaskEvent;
 import org.apache.inlong.manager.workflow.core.event.task.TaskEventListener;
@@ -37,7 +37,7 @@ import org.springframework.stereotype.Component;
  */
 @Slf4j
 @Component
-public class ApprovePassTaskListener implements TaskEventListener {
+public class BusinessPassTaskListener implements TaskEventListener {
 
     @Autowired
     private BusinessService businessService;
@@ -52,7 +52,7 @@ public class ApprovePassTaskListener implements TaskEventListener {
     @Override
     public ListenerResult listen(WorkflowContext context) throws WorkflowListenerException {
         // Save the data format selected at the time of approval and the cluster information of the data stream
-        NewBusinessApproveForm approveForm = (NewBusinessApproveForm) context.getActionContext().getForm();
+        BusinessAdminApproveForm approveForm = (BusinessAdminApproveForm) context.getActionContext().getForm();
 
         // Save the business information after approval
         BusinessApproveInfo approveInfo = approveForm.getBusinessApproveInfo();
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/newbusiness/listener/ApproveRejectProcessListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/business/listener/BusinessRejectProcessListener.java
similarity index 90%
rename from inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/newbusiness/listener/ApproveRejectProcessListener.java
rename to inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/business/listener/BusinessRejectProcessListener.java
index 518efc3..ead0d56 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/newbusiness/listener/ApproveRejectProcessListener.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/business/listener/BusinessRejectProcessListener.java
@@ -15,12 +15,12 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.manager.service.workflow.newbusiness.listener;
+package org.apache.inlong.manager.service.workflow.business.listener;
 
 import lombok.extern.slf4j.Slf4j;
 import org.apache.inlong.manager.common.enums.EntityStatus;
 import org.apache.inlong.manager.service.core.BusinessService;
-import org.apache.inlong.manager.service.workflow.newbusiness.NewBusinessWorkflowForm;
+import org.apache.inlong.manager.service.workflow.business.NewBusinessWorkflowForm;
 import org.apache.inlong.manager.workflow.core.event.ListenerResult;
 import org.apache.inlong.manager.workflow.core.event.process.ProcessEvent;
 import org.apache.inlong.manager.workflow.core.event.process.ProcessEventListener;
@@ -34,7 +34,7 @@ import org.springframework.stereotype.Component;
  */
 @Slf4j
 @Component
-public class ApproveRejectProcessListener implements ProcessEventListener {
+public class BusinessRejectProcessListener implements ProcessEventListener {
 
     @Autowired
     private BusinessService businessService;
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/newbusiness/listener/InitBusinessInfoListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/business/listener/InitBusinessInfoListener.java
similarity index 90%
rename from inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/newbusiness/listener/InitBusinessInfoListener.java
rename to inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/business/listener/InitBusinessInfoListener.java
index c43a07b..1827b43 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/newbusiness/listener/InitBusinessInfoListener.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/business/listener/InitBusinessInfoListener.java
@@ -15,13 +15,13 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.manager.service.workflow.newbusiness.listener;
+package org.apache.inlong.manager.service.workflow.business.listener;
 
 import org.apache.inlong.manager.common.enums.BizErrorCodeEnum;
 import org.apache.inlong.manager.common.exceptions.BusinessException;
 import org.apache.inlong.manager.common.pojo.business.BusinessInfo;
 import org.apache.inlong.manager.service.core.BusinessService;
-import org.apache.inlong.manager.service.workflow.newbusiness.CreateResourceWorkflowForm;
+import org.apache.inlong.manager.service.workflow.business.BusinessResourceWorkflowForm;
 import org.apache.inlong.manager.workflow.core.event.ListenerResult;
 import org.apache.inlong.manager.workflow.core.event.process.ProcessEvent;
 import org.apache.inlong.manager.workflow.core.event.process.ProcessEventListener;
@@ -46,7 +46,7 @@ public class InitBusinessInfoListener implements ProcessEventListener {
 
     @Override
     public ListenerResult listen(WorkflowContext context) throws WorkflowListenerException {
-        CreateResourceWorkflowForm form = (CreateResourceWorkflowForm) context.getProcessForm();
+        BusinessResourceWorkflowForm form = (BusinessResourceWorkflowForm) context.getProcessForm();
         BusinessInfo businessInfo = businessService.get(context.getProcessForm().getInlongGroupId());
         if (businessInfo != null) {
             form.setBusinessInfo(businessInfo);
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/newbusiness/listener/StartCreateResourceProcessListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/business/listener/StartCreateResourceProcessListener.java
similarity index 88%
rename from inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/newbusiness/listener/StartCreateResourceProcessListener.java
rename to inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/business/listener/StartCreateResourceProcessListener.java
index 3e71e3e..03ffbc8 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/newbusiness/listener/StartCreateResourceProcessListener.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/business/listener/StartCreateResourceProcessListener.java
@@ -15,14 +15,14 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.manager.service.workflow.newbusiness.listener;
+package org.apache.inlong.manager.service.workflow.business.listener;
 
 import lombok.extern.slf4j.Slf4j;
 import org.apache.inlong.manager.service.core.BusinessService;
 import org.apache.inlong.manager.service.workflow.ProcessName;
 import org.apache.inlong.manager.service.workflow.WorkflowService;
-import org.apache.inlong.manager.service.workflow.newbusiness.CreateResourceWorkflowForm;
-import org.apache.inlong.manager.service.workflow.newbusiness.NewBusinessWorkflowForm;
+import org.apache.inlong.manager.service.workflow.business.BusinessResourceWorkflowForm;
+import org.apache.inlong.manager.service.workflow.business.NewBusinessWorkflowForm;
 import org.apache.inlong.manager.workflow.core.event.ListenerResult;
 import org.apache.inlong.manager.workflow.core.event.process.ProcessEvent;
 import org.apache.inlong.manager.workflow.core.event.process.ProcessEventListener;
@@ -56,7 +56,7 @@ public class StartCreateResourceProcessListener implements ProcessEventListener
         NewBusinessWorkflowForm workflowForm = (NewBusinessWorkflowForm) context.getProcessForm();
 
         String groupId = workflowForm.getInlongGroupId();
-        CreateResourceWorkflowForm resourceWorkflowForm = new CreateResourceWorkflowForm();
+        BusinessResourceWorkflowForm resourceWorkflowForm = new BusinessResourceWorkflowForm();
         resourceWorkflowForm.setBusinessInfo(businessService.get(groupId));
         String username = context.getApplicant();
         workflowService.start(ProcessName.CREATE_BUSINESS_RESOURCE, username, resourceWorkflowForm);
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/newconsumption/NewConsumptionApproveForm.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/consumption/ConsumptionAdminApproveForm.java
similarity index 84%
rename from inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/newconsumption/NewConsumptionApproveForm.java
rename to inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/consumption/ConsumptionAdminApproveForm.java
index 71be5df..ba7824c 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/newconsumption/NewConsumptionApproveForm.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/consumption/ConsumptionAdminApproveForm.java
@@ -15,23 +15,25 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.manager.service.workflow.newconsumption;
+package org.apache.inlong.manager.service.workflow.consumption;
 
 import io.swagger.annotations.ApiModel;
 import io.swagger.annotations.ApiModelProperty;
 import lombok.Data;
+import lombok.EqualsAndHashCode;
+import org.apache.inlong.manager.common.util.Preconditions;
 import org.apache.inlong.manager.service.workflow.BaseWorkflowTaskFormType;
 import org.apache.inlong.manager.workflow.exception.FormValidateException;
-import org.apache.inlong.manager.common.util.Preconditions;
 
 /**
  * New consumption approve form for admin
  */
 @Data
+@EqualsAndHashCode(callSuper = true)
 @ApiModel("New data consumption-system administrator form")
-public class NewConsumptionApproveForm extends BaseWorkflowTaskFormType {
+public class ConsumptionAdminApproveForm extends BaseWorkflowTaskFormType {
 
-    public static final String FORM_NAME = "NewConsumptionApproveForm";
+    public static final String FORM_NAME = "ConsumptionAdminApproveForm";
 
     @ApiModelProperty("Consumer group ID")
     private String consumerGroupId;
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/newconsumption/NewConsumptionProcessDetailHandler.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/consumption/NewConsumptionProcessDetailHandler.java
similarity index 96%
rename from inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/newconsumption/NewConsumptionProcessDetailHandler.java
rename to inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/consumption/NewConsumptionProcessDetailHandler.java
index f33d5b9..7c108fc 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/newconsumption/NewConsumptionProcessDetailHandler.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/consumption/NewConsumptionProcessDetailHandler.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.manager.service.workflow.newconsumption;
+package org.apache.inlong.manager.service.workflow.consumption;
 
 import org.apache.inlong.manager.workflow.core.ProcessDefinitionService;
 import org.apache.inlong.manager.workflow.model.definition.Process;
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/newconsumption/NewConsumptionWorkflowDefinition.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/consumption/NewConsumptionWorkflowDefinition.java
similarity index 77%
rename from inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/newconsumption/NewConsumptionWorkflowDefinition.java
rename to inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/consumption/NewConsumptionWorkflowDefinition.java
index c5315b6..16d527f 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/newconsumption/NewConsumptionWorkflowDefinition.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/consumption/NewConsumptionWorkflowDefinition.java
@@ -15,23 +15,21 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.manager.service.workflow.newconsumption;
+package org.apache.inlong.manager.service.workflow.consumption;
 
-import com.google.common.base.Splitter;
-import com.google.common.collect.Lists;
+import java.util.Arrays;
+import java.util.Collections;
 import java.util.List;
-import java.util.Optional;
 import org.apache.inlong.manager.common.pojo.business.BusinessInfo;
-import org.apache.inlong.manager.common.pojo.consumption.ConsumptionInfo;
 import org.apache.inlong.manager.common.pojo.workflow.WorkflowApproverFilterContext;
 import org.apache.inlong.manager.service.core.BusinessService;
 import org.apache.inlong.manager.service.core.WorkflowApproverService;
 import org.apache.inlong.manager.service.workflow.ProcessName;
 import org.apache.inlong.manager.service.workflow.WorkflowDefinition;
-import org.apache.inlong.manager.service.workflow.newconsumption.listener.ConsumptionApproveTaskListener;
-import org.apache.inlong.manager.service.workflow.newconsumption.listener.ConsumptionCancelProcessListener;
-import org.apache.inlong.manager.service.workflow.newconsumption.listener.ConsumptionCompleteProcessListener;
-import org.apache.inlong.manager.service.workflow.newconsumption.listener.ConsumptionRejectProcessListener;
+import org.apache.inlong.manager.service.workflow.consumption.listener.ConsumptionCancelProcessListener;
+import org.apache.inlong.manager.service.workflow.consumption.listener.ConsumptionCompleteProcessListener;
+import org.apache.inlong.manager.service.workflow.consumption.listener.ConsumptionPassTaskListener;
+import org.apache.inlong.manager.service.workflow.consumption.listener.ConsumptionRejectProcessListener;
 import org.apache.inlong.manager.workflow.model.WorkflowContext;
 import org.apache.inlong.manager.workflow.model.definition.EndEvent;
 import org.apache.inlong.manager.workflow.model.definition.Process;
@@ -53,7 +51,7 @@ public class NewConsumptionWorkflowDefinition implements WorkflowDefinition {
     private ConsumptionCompleteProcessListener consumptionCompleteProcessListener;
 
     @Autowired
-    private ConsumptionApproveTaskListener consumptionApproveTaskListener;
+    private ConsumptionPassTaskListener consumptionPassTaskListener;
 
     @Autowired
     private ConsumptionRejectProcessListener consumptionRejectProcessListener;
@@ -101,9 +99,9 @@ public class NewConsumptionWorkflowDefinition implements WorkflowDefinition {
         UserTask adminUserTask = new UserTask();
         adminUserTask.setName(UT_ADMINT_NAME);
         adminUserTask.setDisplayName("System Administrator");
-        adminUserTask.setFormClass(NewConsumptionApproveForm.class);
+        adminUserTask.setFormClass(ConsumptionAdminApproveForm.class);
         adminUserTask.setApproverAssign(this::adminUserTaskApprover);
-        adminUserTask.addListener(consumptionApproveTaskListener);
+        adminUserTask.addListener(consumptionPassTaskListener);
         process.addTask(adminUserTask);
 
         // Set order relationship
@@ -120,9 +118,6 @@ public class NewConsumptionWorkflowDefinition implements WorkflowDefinition {
     }
 
     private List<String> adminUserTaskApprover(WorkflowContext context) {
-        NewConsumptionWorkflowForm form = (NewConsumptionWorkflowForm) context.getProcessForm();
-        ConsumptionInfo consumptionInfo = Optional.ofNullable(form.getConsumptionInfo())
-                .orElseGet(ConsumptionInfo::new);
         return workflowApproverService.getApprovers(getProcessName().name(), UT_ADMINT_NAME,
                 new WorkflowApproverFilterContext());
     }
@@ -130,12 +125,11 @@ public class NewConsumptionWorkflowDefinition implements WorkflowDefinition {
     private List<String> bizOwnerUserTaskApprover(WorkflowContext context) {
         NewConsumptionWorkflowForm form = (NewConsumptionWorkflowForm) context.getProcessForm();
         BusinessInfo businessInfo = businessService.get(form.getConsumptionInfo().getInlongGroupId());
+        if (businessInfo == null || businessInfo.getInCharges() == null) {
+            return Collections.emptyList();
+        }
 
-        Iterable<String> inChargesIterator = Splitter.on(",").omitEmptyStrings().trimResults()
-                .split(businessInfo.getInCharges());
-        List<String> inCharges = Lists.newArrayList();
-        inChargesIterator.forEach(inCharges::add);
-        return inCharges;
+        return Arrays.asList(businessInfo.getInCharges().split(","));
     }
 
     @Override
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/newconsumption/NewConsumptionWorkflowForm.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/consumption/NewConsumptionWorkflowForm.java
similarity index 96%
rename from inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/newconsumption/NewConsumptionWorkflowForm.java
rename to inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/consumption/NewConsumptionWorkflowForm.java
index c9f7d18..d20db44 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/newconsumption/NewConsumptionWorkflowForm.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/consumption/NewConsumptionWorkflowForm.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.manager.service.workflow.newconsumption;
+package org.apache.inlong.manager.service.workflow.consumption;
 
 import com.google.common.collect.Maps;
 import io.swagger.annotations.ApiModelProperty;
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/newconsumption/listener/ConsumptionCancelProcessListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/consumption/listener/ConsumptionCancelProcessListener.java
similarity index 91%
rename from inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/newconsumption/listener/ConsumptionCancelProcessListener.java
rename to inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/consumption/listener/ConsumptionCancelProcessListener.java
index e6cd836..ee483da 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/newconsumption/listener/ConsumptionCancelProcessListener.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/consumption/listener/ConsumptionCancelProcessListener.java
@@ -15,34 +15,30 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.manager.service.workflow.newconsumption.listener;
+package org.apache.inlong.manager.service.workflow.consumption.listener;
 
+import java.util.Date;
+import lombok.extern.slf4j.Slf4j;
 import org.apache.inlong.manager.common.enums.ConsumptionStatus;
 import org.apache.inlong.manager.dao.entity.ConsumptionEntity;
 import org.apache.inlong.manager.dao.mapper.ConsumptionEntityMapper;
-import org.apache.inlong.manager.service.workflow.newconsumption.NewConsumptionWorkflowForm;
+import org.apache.inlong.manager.service.workflow.consumption.NewConsumptionWorkflowForm;
 import org.apache.inlong.manager.workflow.core.event.ListenerResult;
 import org.apache.inlong.manager.workflow.core.event.process.ProcessEvent;
 import org.apache.inlong.manager.workflow.core.event.process.ProcessEventListener;
 import org.apache.inlong.manager.workflow.exception.WorkflowListenerException;
 import org.apache.inlong.manager.workflow.model.WorkflowContext;
-
-import java.util.Date;
-
-import lombok.extern.slf4j.Slf4j;
-
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Component;
 
 /**
  * Added data consumption process cancellation event listener
- *
  */
 @Slf4j
 @Component
 public class ConsumptionCancelProcessListener implements ProcessEventListener {
 
-    private ConsumptionEntityMapper consumptionEntityMapper;
+    private final ConsumptionEntityMapper consumptionEntityMapper;
 
     @Autowired
     public ConsumptionCancelProcessListener(ConsumptionEntityMapper consumptionEntityMapper) {
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/consumption/listener/ConsumptionCompleteProcessListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/consumption/listener/ConsumptionCompleteProcessListener.java
new file mode 100644
index 0000000..9f74716
--- /dev/null
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/consumption/listener/ConsumptionCompleteProcessListener.java
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.service.workflow.consumption.listener;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Date;
+import java.util.List;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.inlong.manager.common.beans.ClusterBean;
+import org.apache.inlong.manager.common.enums.BizConstant;
+import org.apache.inlong.manager.common.enums.ConsumptionStatus;
+import org.apache.inlong.manager.common.pojo.pulsar.PulsarTopicBean;
+import org.apache.inlong.manager.common.pojo.tubemq.AddTubeConsumeGroupRequest;
+import org.apache.inlong.manager.common.util.Preconditions;
+import org.apache.inlong.manager.dao.entity.BusinessEntity;
+import org.apache.inlong.manager.dao.entity.ConsumptionEntity;
+import org.apache.inlong.manager.dao.mapper.BusinessEntityMapper;
+import org.apache.inlong.manager.dao.mapper.ConsumptionEntityMapper;
+import org.apache.inlong.manager.service.thirdpart.mq.PulsarOptService;
+import org.apache.inlong.manager.service.thirdpart.mq.TubeMqOptService;
+import org.apache.inlong.manager.service.thirdpart.mq.util.PulsarUtils;
+import org.apache.inlong.manager.service.workflow.consumption.NewConsumptionWorkflowForm;
+import org.apache.inlong.manager.workflow.core.event.ListenerResult;
+import org.apache.inlong.manager.workflow.core.event.process.ProcessEvent;
+import org.apache.inlong.manager.workflow.core.event.process.ProcessEventListener;
+import org.apache.inlong.manager.workflow.exception.WorkflowListenerException;
+import org.apache.inlong.manager.workflow.model.WorkflowContext;
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+
+/**
+ * Added data consumption process complete archive event listener
+ */
+@Slf4j
+@Component
+public class ConsumptionCompleteProcessListener implements ProcessEventListener {
+
+    @Autowired
+    private PulsarOptService pulsarMqOptService;
+    @Autowired
+    private ClusterBean clusterBean;
+    @Autowired
+    private BusinessEntityMapper businessMapper;
+    @Autowired
+    private ConsumptionEntityMapper consumptionMapper;
+    @Autowired
+    private TubeMqOptService tubeMqOptService;
+
+    @Override
+    public ProcessEvent event() {
+        return ProcessEvent.COMPLETE;
+    }
+
+    @Override
+    public ListenerResult listen(WorkflowContext context) throws WorkflowListenerException {
+        NewConsumptionWorkflowForm consumptionForm = (NewConsumptionWorkflowForm) context.getProcessForm();
+
+        // Real-time query of consumption information
+        Integer consumptionId = consumptionForm.getConsumptionInfo().getId();
+        ConsumptionEntity entity = consumptionMapper.selectByPrimaryKey(consumptionId);
+        if (entity == null) {
+            throw new WorkflowListenerException("consumption not exits for id=" + consumptionId);
+        }
+
+        String middlewareType = entity.getMiddlewareType();
+        if (BizConstant.MIDDLEWARE_TUBE.equalsIgnoreCase(middlewareType)) {
+            this.createTubeConsumerGroup(entity);
+            return ListenerResult.success("Create Tube consumer group successful");
+        } else if (BizConstant.MIDDLEWARE_PULSAR.equalsIgnoreCase(middlewareType)) {
+            this.createPulsarTopicMessage(entity);
+        } else {
+            throw new WorkflowListenerException("middleware type [" + middlewareType + "] not supported");
+        }
+
+        this.updateConsumerInfo(consumptionId, entity.getConsumerGroupId());
+        return ListenerResult.success("create Tube /Pulsar consumer group successful");
+    }
+
+    /**
+     * Update consumption after approve
+     */
+    private void updateConsumerInfo(Integer consumptionId, String consumerGroupId) {
+        ConsumptionEntity update = new ConsumptionEntity();
+        update.setId(consumptionId);
+        update.setStatus(ConsumptionStatus.APPROVED.getStatus());
+        update.setConsumerGroupId(consumerGroupId);
+        update.setModifyTime(new Date());
+        consumptionMapper.updateByPrimaryKeySelective(update);
+    }
+
+    /**
+     * Create Pulsar consumption information, including cross-regional cycle creation of consumption groups
+     */
+    private void createPulsarTopicMessage(ConsumptionEntity entity) {
+        String groupId = entity.getInlongGroupId();
+        BusinessEntity businessEntity = businessMapper.selectByIdentifier(groupId);
+        Preconditions.checkNotNull(businessEntity, "business not found for groupId=" + groupId);
+        String mqResourceObj = businessEntity.getMqResourceObj();
+        Preconditions.checkNotNull(mqResourceObj, "mq resource cannot empty for groupId=" + groupId);
+
+        try (PulsarAdmin pulsarAdmin = PulsarUtils.getPulsarAdmin(clusterBean.getPulsarAdminUrl())) {
+            PulsarTopicBean topicMessage = new PulsarTopicBean();
+            String tenant = clusterBean.getDefaultTenant();
+            topicMessage.setTenant(tenant);
+            topicMessage.setNamespace(mqResourceObj);
+
+            // If cross-regional replication is started, each cluster needs to create consumer groups in cycles
+            String consumerGroup = entity.getConsumerGroupId();
+            List<String> clusters = PulsarUtils.getPulsarClusters(pulsarAdmin);
+            List<String> topics = Arrays.asList(entity.getTopic().split(","));
+            this.createPulsarSubscription(pulsarAdmin, consumerGroup, topicMessage, clusters, topics);
+        } catch (Exception e) {
+            log.error("create pulsar topic failed", e);
+            throw new WorkflowListenerException("failed to create pulsar topic for groupId=" + groupId + ", reason: "
+                    + e.getMessage());
+        }
+    }
+
+    private void createPulsarSubscription(PulsarAdmin globalPulsarAdmin, String subscription, PulsarTopicBean topicBean,
+            List<String> clusters, List<String> topics) {
+        try {
+            for (String cluster : clusters) {
+                String serviceUrl = PulsarUtils.getServiceUrl(globalPulsarAdmin, cluster);
+                try (PulsarAdmin pulsarAdmin = PulsarUtils.getPulsarAdmin(serviceUrl)) {
+                    pulsarMqOptService.createSubscriptions(pulsarAdmin, subscription, topicBean, topics);
+                }
+            }
+        } catch (Exception e) {
+            log.error("create pulsar consumer group failed", e);
+            throw new WorkflowListenerException("failed to create pulsar consumer group");
+        }
+    }
+
+    /**
+     * Create tube consumer group
+     */
+    private void createTubeConsumerGroup(ConsumptionEntity consumption) {
+        AddTubeConsumeGroupRequest addTubeConsumeGroupRequest = new AddTubeConsumeGroupRequest();
+        addTubeConsumeGroupRequest.setClusterId(1); // TODO is cluster id needed?
+        addTubeConsumeGroupRequest.setCreateUser(consumption.getCreator());
+        AddTubeConsumeGroupRequest.GroupNameJsonSetBean bean = new AddTubeConsumeGroupRequest.GroupNameJsonSetBean();
+        bean.setTopicName(consumption.getTopic());
+        bean.setGroupName(consumption.getConsumerGroupId());
+        addTubeConsumeGroupRequest.setGroupNameJsonSet(Collections.singletonList(bean));
+
+        try {
+            tubeMqOptService.createNewConsumerGroup(addTubeConsumeGroupRequest);
+        } catch (Exception e) {
+            throw new WorkflowListenerException("failed to create tube consumer group: " + addTubeConsumeGroupRequest);
+        }
+    }
+
+    @Override
+    public boolean async() {
+        return false;
+    }
+
+}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/newconsumption/listener/ConsumptionApproveTaskListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/consumption/listener/ConsumptionPassTaskListener.java
similarity index 77%
rename from inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/newconsumption/listener/ConsumptionApproveTaskListener.java
rename to inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/consumption/listener/ConsumptionPassTaskListener.java
index a9e1cbe..8159a2e 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/newconsumption/listener/ConsumptionApproveTaskListener.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/consumption/listener/ConsumptionPassTaskListener.java
@@ -15,15 +15,16 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.manager.service.workflow.newconsumption.listener;
+package org.apache.inlong.manager.service.workflow.consumption.listener;
 
 import com.alibaba.druid.util.StringUtils;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.inlong.manager.common.enums.BizErrorCodeEnum;
 import org.apache.inlong.manager.common.exceptions.BusinessException;
+import org.apache.inlong.manager.common.pojo.consumption.ConsumptionInfo;
 import org.apache.inlong.manager.service.core.ConsumptionService;
-import org.apache.inlong.manager.service.workflow.newconsumption.NewConsumptionApproveForm;
-import org.apache.inlong.manager.service.workflow.newconsumption.NewConsumptionWorkflowForm;
+import org.apache.inlong.manager.service.workflow.consumption.ConsumptionAdminApproveForm;
+import org.apache.inlong.manager.service.workflow.consumption.NewConsumptionWorkflowForm;
 import org.apache.inlong.manager.workflow.core.event.ListenerResult;
 import org.apache.inlong.manager.workflow.core.event.task.TaskEvent;
 import org.apache.inlong.manager.workflow.core.event.task.TaskEventListener;
@@ -37,7 +38,7 @@ import org.springframework.stereotype.Component;
  */
 @Slf4j
 @Component
-public class ConsumptionApproveTaskListener implements TaskEventListener {
+public class ConsumptionPassTaskListener implements TaskEventListener {
 
     @Autowired
     private ConsumptionService consumptionService;
@@ -50,18 +51,18 @@ public class ConsumptionApproveTaskListener implements TaskEventListener {
     @Override
     public ListenerResult listen(WorkflowContext context) throws WorkflowListenerException {
         NewConsumptionWorkflowForm form = (NewConsumptionWorkflowForm) context.getProcessForm();
-        NewConsumptionApproveForm approveForm = (NewConsumptionApproveForm) context.getActionContext()
+        ConsumptionAdminApproveForm approveForm = (ConsumptionAdminApproveForm) context.getActionContext()
                 .getForm();
-        if (StringUtils.equals(approveForm.getConsumerGroupId(), form.getConsumptionInfo().getConsumerGroupId())) {
+        ConsumptionInfo info = form.getConsumptionInfo();
+        if (StringUtils.equals(approveForm.getConsumerGroupId(), info.getConsumerGroupId())) {
             return ListenerResult.success("The consumer group name has not been modified");
         }
-        boolean exitDuplicate = this.consumptionService
-                .isConsumerGroupIdExists(approveForm.getConsumerGroupId(), form.getConsumptionInfo().getId());
-        if (exitDuplicate) {
+        boolean exist = consumptionService.isConsumerGroupIdExists(approveForm.getConsumerGroupId(), info.getId());
+        if (exist) {
             log.error("consumerGroupId already exist! duplicate :{}", approveForm.getConsumerGroupId());
             throw new BusinessException(BizErrorCodeEnum.CONSUMER_GROUP_NAME_DUPLICATED);
         }
-        return ListenerResult.success("Consumer group name from" + form.getConsumptionInfo().getConsumerGroupId()
+        return ListenerResult.success("Consumer group name from" + info.getConsumerGroupId()
                 + "change to " + approveForm.getConsumerGroupId());
     }
 
@@ -69,4 +70,5 @@ public class ConsumptionApproveTaskListener implements TaskEventListener {
     public boolean async() {
         return false;
     }
+
 }
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/newconsumption/listener/ConsumptionRejectProcessListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/consumption/listener/ConsumptionRejectProcessListener.java
similarity index 94%
rename from inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/newconsumption/listener/ConsumptionRejectProcessListener.java
rename to inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/consumption/listener/ConsumptionRejectProcessListener.java
index e02ce91..0bcbd9b 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/newconsumption/listener/ConsumptionRejectProcessListener.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/consumption/listener/ConsumptionRejectProcessListener.java
@@ -15,13 +15,13 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.manager.service.workflow.newconsumption.listener;
+package org.apache.inlong.manager.service.workflow.consumption.listener;
 
 import java.util.Date;
 import org.apache.inlong.manager.common.enums.ConsumptionStatus;
 import org.apache.inlong.manager.dao.entity.ConsumptionEntity;
 import org.apache.inlong.manager.dao.mapper.ConsumptionEntityMapper;
-import org.apache.inlong.manager.service.workflow.newconsumption.NewConsumptionWorkflowForm;
+import org.apache.inlong.manager.service.workflow.consumption.NewConsumptionWorkflowForm;
 import org.apache.inlong.manager.workflow.core.event.ListenerResult;
 import org.apache.inlong.manager.workflow.core.event.process.ProcessEvent;
 import org.apache.inlong.manager.workflow.core.event.process.ProcessEventListener;
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/newbusiness/CreateResourceWorkflowDefinition.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/newbusiness/CreateResourceWorkflowDefinition.java
deleted file mode 100644
index f3221db..0000000
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/newbusiness/CreateResourceWorkflowDefinition.java
+++ /dev/null
@@ -1,176 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.manager.service.workflow.newbusiness;
-
-import java.util.List;
-import java.util.stream.Collectors;
-import lombok.extern.slf4j.Slf4j;
-import org.apache.commons.collections.CollectionUtils;
-import org.apache.inlong.manager.common.enums.BizConstant;
-import org.apache.inlong.manager.common.pojo.business.BusinessInfo;
-import org.apache.inlong.manager.dao.entity.DataStreamEntity;
-import org.apache.inlong.manager.dao.mapper.DataStreamEntityMapper;
-import org.apache.inlong.manager.service.core.StorageService;
-import org.apache.inlong.manager.service.thirdpart.hive.CreateHiveTableForAllStreamListener;
-import org.apache.inlong.manager.service.thirdpart.mq.CreateTubeConsumerGroupTaskListener;
-import org.apache.inlong.manager.service.thirdpart.mq.CreateTubeTopicTaskListener;
-import org.apache.inlong.manager.service.thirdpart.sort.PushHiveConfigTaskListener;
-import org.apache.inlong.manager.service.workflow.ProcessName;
-import org.apache.inlong.manager.service.workflow.WorkflowDefinition;
-import org.apache.inlong.manager.service.workflow.newbusiness.listener.CreateResourceCompleteProcessListener;
-import org.apache.inlong.manager.service.workflow.newbusiness.listener.CreateResourceFailedProcessListener;
-import org.apache.inlong.manager.service.workflow.newbusiness.listener.InitBusinessInfoListener;
-import org.apache.inlong.manager.workflow.model.definition.EndEvent;
-import org.apache.inlong.manager.workflow.model.definition.Process;
-import org.apache.inlong.manager.workflow.model.definition.ServiceTask;
-import org.apache.inlong.manager.workflow.model.definition.StartEvent;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.stereotype.Component;
-
-/**
- * Create workflow definitions for business resources
- */
-@Slf4j
-@Component
-public class CreateResourceWorkflowDefinition implements WorkflowDefinition {
-
-    @Autowired
-    private InitBusinessInfoListener initBusinessInfoListener;
-
-    @Autowired
-    private CreateResourceCompleteProcessListener createResourceCompleteProcessListener;
-
-    @Autowired
-    private CreateResourceFailedProcessListener createResourceFailedProcessListener;
-
-    @Autowired
-    private CreateTubeTopicTaskListener createTubeTopicTaskListener;
-
-    @Autowired
-    private CreateTubeConsumerGroupTaskListener createTubeConsumerGroupTaskListener;
-
-    @Autowired
-    private CreateHiveTableForAllStreamListener createHiveTableForAllStreamListener;
-
-    @Autowired
-    private PushHiveConfigTaskListener pushHiveConfigTaskListener;
-
-    @Autowired
-    private StorageService storageService;
-
-    @Autowired
-    private DataStreamEntityMapper streamMapper;
-
-    @Override
-    public Process defineProcess() {
-
-        // Configuration process
-        Process process = new Process();
-        process.addListener(initBusinessInfoListener);
-        process.addListener(createResourceCompleteProcessListener);
-        process.addListener(createResourceFailedProcessListener);
-
-        process.setType("Business Resource Creation");
-        process.setName(getProcessName().name());
-        process.setDisplayName(getProcessName().getDisplayName());
-        process.setFormClass(CreateResourceWorkflowForm.class);
-        process.setVersion(1);
-        process.setHidden(true);
-
-        // Start node
-        StartEvent startEvent = new StartEvent();
-        process.setStartEvent(startEvent);
-
-        // End node
-        EndEvent endEvent = new EndEvent();
-        process.setEndEvent(endEvent);
-
-        ServiceTask createTubeTopicTask = new ServiceTask();
-        createTubeTopicTask.setSkipResolver(c -> {
-            CreateResourceWorkflowForm form = (CreateResourceWorkflowForm) c.getProcessForm();
-            BusinessInfo businessInfo = form.getBusinessInfo();
-            if (BizConstant.MIDDLEWARE_TUBE.equalsIgnoreCase(businessInfo.getMiddlewareType())) {
-                return false;
-            }
-            log.warn("not need to create tube resource for groupId={}, as the middleware type is {}",
-                    businessInfo.getMiddlewareType(), form.getInlongGroupId());
-            return true;
-        });
-        createTubeTopicTask.setName("createTubeTopic");
-        createTubeTopicTask.setDisplayName("Create Tube Topic");
-        createTubeTopicTask.addListener(createTubeTopicTaskListener);
-        process.addTask(createTubeTopicTask);
-
-        ServiceTask createConsumerGroupForSortTask = new ServiceTask();
-        createConsumerGroupForSortTask.setSkipResolver(c -> {
-            CreateResourceWorkflowForm form = (CreateResourceWorkflowForm) c.getProcessForm();
-            BusinessInfo businessInfo = form.getBusinessInfo();
-            if (BizConstant.MIDDLEWARE_TUBE.equalsIgnoreCase(businessInfo.getMiddlewareType())) {
-                return false;
-            }
-            log.warn("no need to create tube resource for groupId={}, as the middleware type is {}",
-                    form.getInlongGroupId(), businessInfo.getMiddlewareType());
-            return true;
-        });
-        createConsumerGroupForSortTask.setName("createConsumerGroupForSort");
-        createConsumerGroupForSortTask.setDisplayName("Create Consumer Group For Sort");
-        createConsumerGroupForSortTask.addListener(createTubeConsumerGroupTaskListener);
-        process.addTask(createConsumerGroupForSortTask);
-
-        ServiceTask createHiveTablesTask = new ServiceTask();
-        createHiveTablesTask.setSkipResolver(c -> {
-            CreateResourceWorkflowForm form = (CreateResourceWorkflowForm) c.getProcessForm();
-            List<String> dsForHive = storageService
-                    .filterStreamIdByStorageType(form.getInlongGroupId(), BizConstant.STORAGE_HIVE,
-                            streamMapper
-                                    .selectByGroupId(form.getInlongGroupId())
-                                    .stream()
-                                    .map(DataStreamEntity::getInlongStreamId)
-                                    .collect(Collectors.toList()));
-            if (CollectionUtils.isEmpty(dsForHive)) {
-                log.warn("business {} dataStream {} does not have storage, skip to create hive table ",
-                        form.getInlongGroupId(), form.getInlongStreamId());
-                return true;
-            }
-            return false;
-        });
-        createHiveTablesTask.setName("createHiveTableTask");
-        createHiveTablesTask.setDisplayName("Create Hive Table");
-        createHiveTablesTask.addListener(createHiveTableForAllStreamListener);
-        process.addTask(createHiveTablesTask);
-
-        ServiceTask pushSortConfig = new ServiceTask();
-        pushSortConfig.setName("pushSortConfig");
-        pushSortConfig.setDisplayName("Push Sort Config");
-        pushSortConfig.addListener(pushHiveConfigTaskListener);
-        process.addTask(pushSortConfig);
-
-        startEvent.addNext(createTubeTopicTask);
-        createTubeTopicTask.addNext(createConsumerGroupForSortTask);
-        createConsumerGroupForSortTask.addNext(createHiveTablesTask);
-        createHiveTablesTask.addNext(pushSortConfig);
-        pushSortConfig.addNext(endEvent);
-
-        return process;
-    }
-
-    @Override
-    public ProcessName getProcessName() {
-        return ProcessName.CREATE_BUSINESS_RESOURCE;
-    }
-}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/newconsumption/listener/ConsumptionCompleteProcessListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/newconsumption/listener/ConsumptionCompleteProcessListener.java
deleted file mode 100644
index 4ea6073..0000000
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/newconsumption/listener/ConsumptionCompleteProcessListener.java
+++ /dev/null
@@ -1,143 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.manager.service.workflow.newconsumption.listener;
-
-import java.util.Collections;
-import java.util.Date;
-import lombok.extern.slf4j.Slf4j;
-import org.apache.inlong.manager.common.enums.BizConstant;
-import org.apache.inlong.manager.common.enums.BizErrorCodeEnum;
-import org.apache.inlong.manager.common.enums.ConsumptionStatus;
-import org.apache.inlong.manager.common.exceptions.BusinessException;
-import org.apache.inlong.manager.common.pojo.consumption.ConsumptionInfo;
-import org.apache.inlong.manager.common.pojo.tubemq.AddTubeConsumeGroupRequest;
-import org.apache.inlong.manager.common.util.Preconditions;
-import org.apache.inlong.manager.dao.entity.ConsumptionEntity;
-import org.apache.inlong.manager.dao.mapper.ClusterInfoMapper;
-import org.apache.inlong.manager.dao.mapper.ConsumptionEntityMapper;
-import org.apache.inlong.manager.service.thirdpart.mq.TubeMqOptService;
-import org.apache.inlong.manager.service.workflow.newconsumption.NewConsumptionApproveForm;
-import org.apache.inlong.manager.service.workflow.newconsumption.NewConsumptionWorkflowDefinition;
-import org.apache.inlong.manager.service.workflow.newconsumption.NewConsumptionWorkflowForm;
-import org.apache.inlong.manager.workflow.core.QueryService;
-import org.apache.inlong.manager.workflow.core.event.ListenerResult;
-import org.apache.inlong.manager.workflow.core.event.process.ProcessEvent;
-import org.apache.inlong.manager.workflow.core.event.process.ProcessEventListener;
-import org.apache.inlong.manager.workflow.exception.WorkflowListenerException;
-import org.apache.inlong.manager.workflow.model.WorkflowContext;
-import org.apache.inlong.manager.workflow.model.definition.Process;
-import org.apache.inlong.manager.workflow.model.instance.TaskInstance;
-import org.apache.inlong.manager.workflow.model.view.TaskQuery;
-import org.apache.inlong.manager.workflow.util.WorkflowFormParserUtils;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.stereotype.Component;
-
-/**
- * Added data consumption process complete archive event listener
- */
-@Slf4j
-@Component
-public class ConsumptionCompleteProcessListener implements ProcessEventListener {
-
-    @Autowired
-    private QueryService queryService;
-
-    @Autowired
-    private ConsumptionEntityMapper consumptionEntityMapper;
-
-    @Autowired
-    private ClusterInfoMapper clusterInfoMapper;
-
-    @Autowired
-    private TubeMqOptService tubeMqOptService;
-
-    @Override
-    public ProcessEvent event() {
-        return ProcessEvent.COMPLETE;
-    }
-
-    @Override
-    public ListenerResult listen(WorkflowContext context) throws WorkflowListenerException {
-
-        NewConsumptionWorkflowForm workflowForm = (NewConsumptionWorkflowForm) context.getProcessForm();
-        NewConsumptionApproveForm adminApproveForm = getAdminApproveForm(context);
-
-        workflowForm.getConsumptionInfo().setConsumerGroupId(adminApproveForm.getConsumerGroupId());
-        updateConsumerInfo(workflowForm.getConsumptionInfo().getId(), adminApproveForm.getConsumerGroupId());
-
-        String middlewareType = workflowForm.getConsumptionInfo().getMiddlewareType();
-
-        if (BizConstant.MIDDLEWARE_TUBE.equalsIgnoreCase(middlewareType)) {
-            createTubeConsumerGroup(workflowForm.getConsumptionInfo());
-            return ListenerResult.success("Create Tube Consumer Group");
-        }
-
-        throw new BusinessException(BizErrorCodeEnum.INVALID_PARAMETER,
-                "Middleware type [" + middlewareType + "] not support");
-
-    }
-
-    private NewConsumptionApproveForm getAdminApproveForm(WorkflowContext context) {
-        TaskInstance adminTask = queryService.listTask(TaskQuery.builder()
-                        .processInstId(context.getProcessInstance().getId())
-                        .name(NewConsumptionWorkflowDefinition.UT_ADMINT_NAME)
-                        .build())
-                .stream()
-                .findFirst()
-                .orElseThrow(() -> new BusinessException(BizErrorCodeEnum.WORKFLOW_EXE_FAILED,
-                        "workflow err,not found task " + NewConsumptionWorkflowDefinition.UT_ADMINT_NAME));
-
-        Process process = context.getProcess();
-        NewConsumptionApproveForm form = WorkflowFormParserUtils.parseTaskForm(adminTask, process);
-        Preconditions.checkNotNull(form, "form cannot be null");
-        return form;
-    }
-
-    private void updateConsumerInfo(Integer consumerId, String consumerGroupId) {
-        ConsumptionEntity update = new ConsumptionEntity();
-        update.setId(consumerId);
-        update.setStatus(ConsumptionStatus.APPROVED.getStatus());
-        update.setConsumerGroupId(consumerGroupId);
-        update.setModifyTime(new Date());
-        consumptionEntityMapper.updateByPrimaryKeySelective(update);
-    }
-
-    private void createTubeConsumerGroup(ConsumptionInfo consumptionInfo) {
-        AddTubeConsumeGroupRequest addTubeConsumeGroupRequest = new AddTubeConsumeGroupRequest();
-        addTubeConsumeGroupRequest.setClusterId(1); // TODO is cluster id needed?
-        addTubeConsumeGroupRequest.setCreateUser(consumptionInfo.getCreator());
-        AddTubeConsumeGroupRequest.GroupNameJsonSetBean bean = new AddTubeConsumeGroupRequest.GroupNameJsonSetBean();
-        bean.setTopicName(consumptionInfo.getTopic());
-        bean.setGroupName(consumptionInfo.getConsumerGroupId());
-        addTubeConsumeGroupRequest.setGroupNameJsonSet(Collections.singletonList(bean));
-
-        try {
-            tubeMqOptService.createNewConsumerGroup(addTubeConsumeGroupRequest);
-        } catch (BusinessException e) {
-            throw e;
-        } catch (Exception e) {
-            throw new BusinessException(BizErrorCodeEnum.CONSUMER_GROUP_CREATE_FAILED);
-        }
-    }
-
-    @Override
-    public boolean async() {
-        return false;
-    }
-
-}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/newstream/SingleStreamWorkflowDefinition.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/stream/CreateStreamWorkflowDefinition.java
similarity index 51%
rename from inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/newstream/SingleStreamWorkflowDefinition.java
rename to inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/stream/CreateStreamWorkflowDefinition.java
index 3b16958..ff201ff 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/newstream/SingleStreamWorkflowDefinition.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/stream/CreateStreamWorkflowDefinition.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.manager.service.workflow.newstream;
+package org.apache.inlong.manager.service.workflow.stream;
 
 import java.util.Collections;
 import java.util.List;
@@ -23,12 +23,14 @@ import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.inlong.manager.common.enums.BizConstant;
 import org.apache.inlong.manager.service.core.StorageService;
-import org.apache.inlong.manager.service.thirdpart.hive.CreateHiveTableForOneStreamListener;
+import org.apache.inlong.manager.service.thirdpart.hive.CreateHiveTableForStreamListener;
+import org.apache.inlong.manager.service.thirdpart.mq.CreatePulsarGroupForStreamTaskListener;
+import org.apache.inlong.manager.service.thirdpart.mq.CreatePulsarTopicForStreamTaskListener;
 import org.apache.inlong.manager.service.thirdpart.sort.PushHiveConfigTaskListener;
 import org.apache.inlong.manager.service.workflow.ProcessName;
 import org.apache.inlong.manager.service.workflow.WorkflowDefinition;
-import org.apache.inlong.manager.service.workflow.newbusiness.CreateResourceWorkflowForm;
-import org.apache.inlong.manager.service.workflow.newbusiness.listener.InitBusinessInfoListener;
+import org.apache.inlong.manager.service.workflow.business.BusinessResourceWorkflowForm;
+import org.apache.inlong.manager.service.workflow.business.listener.InitBusinessInfoListener;
 import org.apache.inlong.manager.workflow.model.definition.EndEvent;
 import org.apache.inlong.manager.workflow.model.definition.Process;
 import org.apache.inlong.manager.workflow.model.definition.ServiceTask;
@@ -37,37 +39,41 @@ import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Component;
 
 /**
- * Single data stream access resource creation
+ * Data stream access resource creation
  */
 @Component
 @Slf4j
-public class SingleStreamWorkflowDefinition implements WorkflowDefinition {
+public class CreateStreamWorkflowDefinition implements WorkflowDefinition {
 
     @Autowired
     private StorageService storageService;
     @Autowired
     private InitBusinessInfoListener initBusinessInfoListener;
     @Autowired
-    private SingleStreamFailedProcessListener singleStreamFailedProcessListener;
+    private StreamFailedProcessListener streamFailedProcessListener;
     @Autowired
-    private SingleStreamCompleteProcessListener singleStreamCompleteProcessListener;
+    private StreamCompleteProcessListener streamCompleteProcessListener;
     @Autowired
-    private CreateHiveTableForOneStreamListener createHiveTableForOneStreamListener;
+    private CreateHiveTableForStreamListener createHiveTableListener;
     @Autowired
     private PushHiveConfigTaskListener pushHiveConfigTaskListener;
+    @Autowired
+    private CreatePulsarTopicForStreamTaskListener createPulsarTopicTaskListener;
+    @Autowired
+    private CreatePulsarGroupForStreamTaskListener createPulsarGroupTaskListener;
 
     @Override
     public Process defineProcess() {
         // Configuration process
         Process process = new Process();
         process.addListener(initBusinessInfoListener);
-        process.addListener(singleStreamFailedProcessListener);
-        process.addListener(singleStreamCompleteProcessListener);
+        process.addListener(streamFailedProcessListener);
+        process.addListener(streamCompleteProcessListener);
 
         process.setType("Data stream access resource creation");
         process.setName(getProcessName().name());
         process.setDisplayName(getProcessName().getDisplayName());
-        process.setFormClass(CreateResourceWorkflowForm.class);
+        process.setFormClass(BusinessResourceWorkflowForm.class);
         process.setVersion(1);
         process.setHidden(true);
 
@@ -79,9 +85,41 @@ public class SingleStreamWorkflowDefinition implements WorkflowDefinition {
         EndEvent endEvent = new EndEvent();
         process.setEndEvent(endEvent);
 
+        ServiceTask createPulsarTopicTask = new ServiceTask();
+        createPulsarTopicTask.setSkipResolver(c -> {
+            BusinessResourceWorkflowForm form = (BusinessResourceWorkflowForm) c.getProcessForm();
+            String middlewareType = form.getBusinessInfo().getMiddlewareType();
+            if (BizConstant.MIDDLEWARE_PULSAR.equalsIgnoreCase(middlewareType)) {
+                return false;
+            }
+            log.warn("no need to create pulsar topic for groupId={}, streamId={}, as the middlewareType={}",
+                    form.getInlongGroupId(), form.getInlongStreamId(), middlewareType);
+            return true;
+        });
+        createPulsarTopicTask.setName("createPulsarTopic");
+        createPulsarTopicTask.setDisplayName("Stream-CreatePulsarTopic");
+        createPulsarTopicTask.addListener(createPulsarTopicTaskListener);
+        process.addTask(createPulsarTopicTask);
+
+        ServiceTask createPulsarSubscriptionGroupTask = new ServiceTask();
+        createPulsarSubscriptionGroupTask.setSkipResolver(c -> {
+            BusinessResourceWorkflowForm form = (BusinessResourceWorkflowForm) c.getProcessForm();
+            String middlewareType = form.getBusinessInfo().getMiddlewareType();
+            if (BizConstant.MIDDLEWARE_PULSAR.equalsIgnoreCase(middlewareType)) {
+                return false;
+            }
+            log.warn("no need to create pulsar subscription for groupId={}, streamId={}, as the middlewareType={}",
+                    form.getInlongGroupId(), form.getInlongStreamId(), middlewareType);
+            return true;
+        });
+        createPulsarSubscriptionGroupTask.setName("createPulsarSubscription");
+        createPulsarSubscriptionGroupTask.setDisplayName("Stream-CreatePulsarSubscription");
+        createPulsarSubscriptionGroupTask.addListener(createPulsarGroupTaskListener);
+        process.addTask(createPulsarSubscriptionGroupTask);
+
         ServiceTask createHiveTableTask = new ServiceTask();
         createHiveTableTask.setSkipResolver(c -> {
-            CreateResourceWorkflowForm form = (CreateResourceWorkflowForm) c.getProcessForm();
+            BusinessResourceWorkflowForm form = (BusinessResourceWorkflowForm) c.getProcessForm();
             String groupId = form.getInlongGroupId();
             String streamId = form.getInlongStreamId();
             List<String> dsForHive = storageService.filterStreamIdByStorageType(groupId, BizConstant.STORAGE_HIVE,
@@ -94,18 +132,20 @@ public class SingleStreamWorkflowDefinition implements WorkflowDefinition {
             return false;
         });
 
-        createHiveTableTask.setName("createHiveTableTask");
-        createHiveTableTask.setDisplayName("Create Hive Table");
-        createHiveTableTask.addListener(createHiveTableForOneStreamListener);
+        createHiveTableTask.setName("createHiveTable");
+        createHiveTableTask.setDisplayName("Stream-CreateHiveTable");
+        createHiveTableTask.addListener(createHiveTableListener);
         process.addTask(createHiveTableTask);
 
         ServiceTask pushSortConfig = new ServiceTask();
         pushSortConfig.setName("pushSortConfig");
-        pushSortConfig.setDisplayName("Push Sort Configuration");
+        pushSortConfig.setDisplayName("Stream-PushSortConfig");
         pushSortConfig.addListener(pushHiveConfigTaskListener);
         process.addTask(pushSortConfig);
 
-        startEvent.addNext(createHiveTableTask);
+        startEvent.addNext(createPulsarTopicTask);
+        createPulsarTopicTask.addNext(createPulsarSubscriptionGroupTask);
+        createPulsarSubscriptionGroupTask.addNext(createHiveTableTask);
         createHiveTableTask.addNext(pushSortConfig);
         pushSortConfig.addNext(endEvent);
 
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/newstream/InitBusinessInfoForStreamListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/stream/InitBusinessInfoForStreamListener.java
similarity index 90%
rename from inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/newstream/InitBusinessInfoForStreamListener.java
rename to inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/stream/InitBusinessInfoForStreamListener.java
index b958a7f..1975689 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/newstream/InitBusinessInfoForStreamListener.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/stream/InitBusinessInfoForStreamListener.java
@@ -15,13 +15,13 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.manager.service.workflow.newstream;
+package org.apache.inlong.manager.service.workflow.stream;
 
 import org.apache.inlong.manager.common.enums.BizErrorCodeEnum;
 import org.apache.inlong.manager.common.exceptions.BusinessException;
 import org.apache.inlong.manager.common.pojo.business.BusinessInfo;
 import org.apache.inlong.manager.service.core.BusinessService;
-import org.apache.inlong.manager.service.workflow.newbusiness.CreateResourceWorkflowForm;
+import org.apache.inlong.manager.service.workflow.business.BusinessResourceWorkflowForm;
 import org.apache.inlong.manager.workflow.core.event.ListenerResult;
 import org.apache.inlong.manager.workflow.core.event.process.ProcessEvent;
 import org.apache.inlong.manager.workflow.core.event.process.ProcessEventListener;
@@ -46,7 +46,7 @@ public class InitBusinessInfoForStreamListener implements ProcessEventListener {
 
     @Override
     public ListenerResult listen(WorkflowContext context) throws WorkflowListenerException {
-        CreateResourceWorkflowForm form = (CreateResourceWorkflowForm) context.getProcessForm();
+        BusinessResourceWorkflowForm form = (BusinessResourceWorkflowForm) context.getProcessForm();
         BusinessInfo businessInfo = businessService.get(context.getProcessForm().getInlongGroupId());
         if (businessInfo != null) {
             form.setBusinessInfo(businessInfo);
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/newstream/SingleStreamCompleteProcessListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/stream/StreamCompleteProcessListener.java
similarity index 86%
rename from inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/newstream/SingleStreamCompleteProcessListener.java
rename to inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/stream/StreamCompleteProcessListener.java
index 0e9f89d..08cdff8 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/newstream/SingleStreamCompleteProcessListener.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/stream/StreamCompleteProcessListener.java
@@ -15,13 +15,13 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.manager.service.workflow.newstream;
+package org.apache.inlong.manager.service.workflow.stream;
 
 import lombok.extern.slf4j.Slf4j;
 import org.apache.inlong.manager.common.enums.EntityStatus;
 import org.apache.inlong.manager.service.core.BusinessService;
 import org.apache.inlong.manager.service.core.DataStreamService;
-import org.apache.inlong.manager.service.workflow.newbusiness.CreateResourceWorkflowForm;
+import org.apache.inlong.manager.service.workflow.business.BusinessResourceWorkflowForm;
 import org.apache.inlong.manager.workflow.core.event.ListenerResult;
 import org.apache.inlong.manager.workflow.core.event.process.ProcessEvent;
 import org.apache.inlong.manager.workflow.core.event.process.ProcessEventListener;
@@ -31,11 +31,11 @@ import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Component;
 
 /**
- * Event listener for completed creation of single data stream resource
+ * Event listener for completed creation of data stream resource
  */
 @Slf4j
 @Component
-public class SingleStreamCompleteProcessListener implements ProcessEventListener {
+public class StreamCompleteProcessListener implements ProcessEventListener {
 
     @Autowired
     private BusinessService businessService;
@@ -53,7 +53,7 @@ public class SingleStreamCompleteProcessListener implements ProcessEventListener
      */
     @Override
     public ListenerResult listen(WorkflowContext context) throws WorkflowListenerException {
-        CreateResourceWorkflowForm form = (CreateResourceWorkflowForm) context.getProcessForm();
+        BusinessResourceWorkflowForm form = (BusinessResourceWorkflowForm) context.getProcessForm();
         String groupId = form.getInlongGroupId();
         String streamId = form.getInlongStreamId();
         String username = context.getApplicant();
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/newstream/SingleStreamFailedProcessListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/stream/StreamFailedProcessListener.java
similarity index 86%
rename from inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/newstream/SingleStreamFailedProcessListener.java
rename to inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/stream/StreamFailedProcessListener.java
index 6ea3f14..fe0df90 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/newstream/SingleStreamFailedProcessListener.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/stream/StreamFailedProcessListener.java
@@ -15,13 +15,13 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.manager.service.workflow.newstream;
+package org.apache.inlong.manager.service.workflow.stream;
 
 import lombok.extern.slf4j.Slf4j;
 import org.apache.inlong.manager.common.enums.EntityStatus;
 import org.apache.inlong.manager.service.core.BusinessService;
 import org.apache.inlong.manager.service.core.DataStreamService;
-import org.apache.inlong.manager.service.workflow.newbusiness.CreateResourceWorkflowForm;
+import org.apache.inlong.manager.service.workflow.business.BusinessResourceWorkflowForm;
 import org.apache.inlong.manager.workflow.core.event.ListenerResult;
 import org.apache.inlong.manager.workflow.core.event.process.ProcessEvent;
 import org.apache.inlong.manager.workflow.core.event.process.ProcessEventListener;
@@ -31,11 +31,11 @@ import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Component;
 
 /**
- * Event listener for failed creation of single data stream resource
+ * Event listener for failed creation of data stream resource
  */
 @Slf4j
 @Component
-public class SingleStreamFailedProcessListener implements ProcessEventListener {
+public class StreamFailedProcessListener implements ProcessEventListener {
 
     @Autowired
     private BusinessService businessService;
@@ -53,7 +53,7 @@ public class SingleStreamFailedProcessListener implements ProcessEventListener {
      */
     @Override
     public ListenerResult listen(WorkflowContext context) throws WorkflowListenerException {
-        CreateResourceWorkflowForm form = (CreateResourceWorkflowForm) context.getProcessForm();
+        BusinessResourceWorkflowForm form = (BusinessResourceWorkflowForm) context.getProcessForm();
         String groupId = form.getInlongGroupId();
         String streamId = form.getInlongStreamId();
         String username = context.getApplicant();
diff --git a/inlong-manager/manager-web/src/main/resources/application-dev.properties b/inlong-manager/manager-web/src/main/resources/application-dev.properties
index 4ca899c..7ae7474 100644
--- a/inlong-manager/manager-web/src/main/resources/application-dev.properties
+++ b/inlong-manager/manager-web/src/main/resources/application-dev.properties
@@ -23,7 +23,7 @@ logging.level.org.apache.inlong.manager=debug
 
 spring.datasource.druid.url=jdbc:mysql://127.0.0.1:3306/apache_inlong_manager?useSSL=false&allowPublicKeyRetrieval=true&characterEncoding=UTF-8&nullCatalogMeansCurrent=true&serverTimezone=GMT%2b8
 spring.datasource.druid.username=root
-spring.datasource.druid.password=fighting
+spring.datasource.druid.password=inlong
 
 spring.datasource.druid.driver-class-name=com.mysql.cj.jdbc.Driver
 spring.datasource.druid.validationQuery=SELECT 'x'
diff --git a/inlong-manager/manager-web/src/main/resources/application-prod.properties b/inlong-manager/manager-web/src/main/resources/application-prod.properties
index 056e5a6..af253e5 100644
--- a/inlong-manager/manager-web/src/main/resources/application-prod.properties
+++ b/inlong-manager/manager-web/src/main/resources/application-prod.properties
@@ -21,8 +21,8 @@
 logging.level.root=INFO
 
 spring.datasource.druid.url=jdbc:mysql://127.0.0.1:3306/apache_inlong_manager?useSSL=false&allowPublicKeyRetrieval=true&characterEncoding=UTF-8&nullCatalogMeansCurrent=true&serverTimezone=GMT%2b8
-spring.datasource.druid.username=xxxxxx
-spring.datasource.druid.password=xxxxxx
+spring.datasource.druid.username=root
+spring.datasource.druid.password=inlong
 
 spring.datasource.druid.driver-class-name=com.mysql.cj.jdbc.Driver
 spring.datasource.druid.validationQuery=SELECT 'x'
diff --git a/inlong-manager/manager-web/src/main/resources/application-test.properties b/inlong-manager/manager-web/src/main/resources/application-test.properties
index b4a75c9..7ae7474 100644
--- a/inlong-manager/manager-web/src/main/resources/application-test.properties
+++ b/inlong-manager/manager-web/src/main/resources/application-test.properties
@@ -22,8 +22,8 @@ logging.level.root=INFO
 logging.level.org.apache.inlong.manager=debug
 
 spring.datasource.druid.url=jdbc:mysql://127.0.0.1:3306/apache_inlong_manager?useSSL=false&allowPublicKeyRetrieval=true&characterEncoding=UTF-8&nullCatalogMeansCurrent=true&serverTimezone=GMT%2b8
-spring.datasource.druid.username=xxxxxx
-spring.datasource.druid.password=xxxxxx
+spring.datasource.druid.username=root
+spring.datasource.druid.password=inlong
 
 spring.datasource.druid.driver-class-name=com.mysql.cj.jdbc.Driver
 spring.datasource.druid.validationQuery=SELECT 'x'
diff --git a/inlong-manager/sql/apache_inlong_manager.sql b/inlong-manager/sql/apache_inlong_manager.sql
index a31f22f..6b3c543 100644
--- a/inlong-manager/sql/apache_inlong_manager.sql
+++ b/inlong-manager/sql/apache_inlong_manager.sql
@@ -79,27 +79,29 @@ CREATE TABLE `agent_sys_conf`
 DROP TABLE IF EXISTS `business`;
 CREATE TABLE `business`
 (
-    `id`              int(11)      NOT NULL AUTO_INCREMENT COMMENT 'Incremental primary key',
-    `inlong_group_id` varchar(128) NOT NULL COMMENT 'Business group id, filled in by the user, undeleted ones cannot be repeated',
-    `name`            varchar(128)          DEFAULT '' COMMENT 'Business name, English, numbers and underscore',
-    `cn_name`         varchar(256)          DEFAULT NULL COMMENT 'Chinese display name',
-    `description`     varchar(256)          DEFAULT '' COMMENT 'Business Introduction',
-    `middleware_type` varchar(10)           DEFAULT 'TUBE' COMMENT 'The middleware type of message queue, high throughput: TUBE, high consistency: PULSAR',
-    `mq_resource_obj` varchar(128) NOT NULL COMMENT 'MQ resource object, for Tube, its Topic, for Pulsar, its Namespace',
-    `daily_records`   int(11)               DEFAULT '10' COMMENT 'Number of access records per day, unit: 10,000 records per day',
-    `daily_storage`   int(11)               DEFAULT '10' COMMENT 'Access size by day, unit: GB per day',
-    `peak_records`    int(11)               DEFAULT '1000' COMMENT 'Access peak per second, unit: records per second',
-    `max_length`      int(11)               DEFAULT '10240' COMMENT 'The maximum length of a single piece of data, unit: Byte',
-    `schema_name`     varchar(128)          DEFAULT NULL COMMENT 'Data type, associated data_schema table',
-    `in_charges`      varchar(512) NOT NULL COMMENT 'Name of responsible person, separated by commas',
-    `followers`       varchar(512)          DEFAULT NULL COMMENT 'List of names of business followers, separated by commas',
-    `status`          int(4)                DEFAULT '21' COMMENT 'Business status',
-    `is_deleted`      tinyint(1)            DEFAULT '0' COMMENT 'Whether to delete, 0: not deleted, 1: deleted',
-    `creator`         varchar(64)  NOT NULL COMMENT 'Creator name',
-    `modifier`        varchar(64)           DEFAULT NULL COMMENT 'Modifier name',
-    `create_time`     timestamp    NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'Create time',
-    `modify_time`     timestamp    NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'Modify time',
-    `temp_view`       json                  DEFAULT NULL COMMENT 'Temporary view, used to save intermediate data that has not been submitted or approved after modification',
+    `id`                  int(11)      NOT NULL AUTO_INCREMENT COMMENT 'Incremental primary key',
+    `inlong_group_id`     varchar(128) NOT NULL COMMENT 'Business group id, filled in by the user, undeleted ones cannot be repeated',
+    `name`                varchar(128)      DEFAULT '' COMMENT 'Business name, English, numbers and underscore',
+    `cn_name`             varchar(256)      DEFAULT NULL COMMENT 'Chinese display name',
+    `description`         varchar(256)      DEFAULT '' COMMENT 'Business Introduction',
+    `middleware_type`     varchar(10)       DEFAULT 'TUBE' COMMENT 'The middleware type of message queue, high throughput: TUBE, high consistency: PULSAR',
+    `queue_module`        VARCHAR(20)  NULL DEFAULT 'parallel' COMMENT 'Queue model of Pulsar, parallel: multiple partitions, high throughput, out-of-order messages; serial: single partition, low throughput, and orderly messages',
+    `topic_partition_num` INT(4)       NULL DEFAULT '3' COMMENT 'The number of partitions of Pulsar Topic, 1-20',
+    `mq_resource_obj`     varchar(128) NOT NULL COMMENT 'MQ resource object, for Tube, its Topic, for Pulsar, its Namespace',
+    `daily_records`       int(11)           DEFAULT '10' COMMENT 'Number of access records per day, unit: 10,000 records per day',
+    `daily_storage`       int(11)           DEFAULT '10' COMMENT 'Access size by day, unit: GB per day',
+    `peak_records`        int(11)           DEFAULT '1000' COMMENT 'Access peak per second, unit: records per second',
+    `max_length`          int(11)           DEFAULT '10240' COMMENT 'The maximum length of a single piece of data, unit: Byte',
+    `schema_name`         varchar(128)      DEFAULT NULL COMMENT 'Data type, associated data_schema table',
+    `in_charges`          varchar(512) NOT NULL COMMENT 'Name of responsible person, separated by commas',
+    `followers`           varchar(512)      DEFAULT NULL COMMENT 'List of names of business followers, separated by commas',
+    `status`              int(4)            DEFAULT '21' COMMENT 'Business status',
+    `is_deleted`          tinyint(1)        DEFAULT '0' COMMENT 'Whether to delete, 0: not deleted, 1: deleted',
+    `creator`             varchar(64)  NOT NULL COMMENT 'Creator name',
+    `modifier`            varchar(64)       DEFAULT NULL COMMENT 'Modifier name',
+    `create_time`         timestamp    NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'Create time',
+    `modify_time`         timestamp    NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'Modify time',
+    `temp_view`           json              DEFAULT NULL COMMENT 'Temporary view, used to save intermediate data that has not been submitted or approved after modification',
     PRIMARY KEY (`id`),
     UNIQUE KEY `unique_business` (`inlong_group_id`, `is_deleted`, `modify_time`)
 ) ENGINE = InnoDB
@@ -113,18 +115,18 @@ CREATE TABLE `business_pulsar`
 (
     `id`                  int(11)      NOT NULL AUTO_INCREMENT COMMENT 'Incremental primary key',
     `inlong_group_id`     varchar(128) NOT NULL COMMENT 'Business group id, filled in by the user, undeleted ones cannot be repeated',
-    `ensemble`            int(3)                DEFAULT '3' COMMENT 'The writable nodes number of ledger',
-    `write_quorum`        int(3)                DEFAULT '3' COMMENT 'The copies number of ledger',
-    `ack_quorum`          int(3)                DEFAULT '2' COMMENT 'The number of requested acks',
-    `retention_time`      int(11)               DEFAULT '72' COMMENT 'Message storage time',
-    `retention_time_unit` char(20)              DEFAULT 'hours' COMMENT 'The unit of the message storage time',
-    `ttl`                 int(11)               DEFAULT '24' COMMENT 'Message time-to-live duration',
-    `ttl_unit`            varchar(20)           DEFAULT 'hours' COMMENT 'The unit of time-to-live duration',
-    `retention_size`      int(11)               DEFAULT '-1' COMMENT 'Message size',
-    `retention_size_unit` varchar(20)           DEFAULT 'MB' COMMENT 'The unit of message size',
-    `is_deleted`          tinyint(1)            DEFAULT '0' COMMENT 'Whether to delete, 0: not deleted, 1: deleted',
-    `create_time`         timestamp    NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'Create time',
-    `modify_time`         timestamp    NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'Modify time',
+    `ensemble`            int(3)            DEFAULT '3' COMMENT 'The writable nodes number of ledger',
+    `write_quorum`        int(3)            DEFAULT '3' COMMENT 'The copies number of ledger',
+    `ack_quorum`          int(3)            DEFAULT '2' COMMENT 'The number of requested acks',
+    `retention_time`      int(11)           DEFAULT '72' COMMENT 'Message storage time',
+    `retention_time_unit` char(20)          DEFAULT 'hours' COMMENT 'The unit of the message storage time',
+    `ttl`                 int(11)           DEFAULT '24' COMMENT 'Message time-to-live duration',
+    `ttl_unit`            varchar(20)       DEFAULT 'hours' COMMENT 'The unit of time-to-live duration',
+    `retention_size`      int(11)           DEFAULT '-1' COMMENT 'Message size',
+    `retention_size_unit` varchar(20)       DEFAULT 'MB' COMMENT 'The unit of message size',
+    `is_deleted`          tinyint(1)        DEFAULT '0' COMMENT 'Whether to delete, 0: not deleted, 1: deleted',
+    `create_time`         timestamp    NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'Create time',
+    `modify_time`         timestamp    NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'Modify time',
     PRIMARY KEY (`id`)
 ) ENGINE = InnoDB
   DEFAULT CHARSET = utf8mb4 COMMENT ='Pulsar info table';
@@ -138,9 +140,9 @@ CREATE TABLE `business_ext`
     `id`              int(11)      NOT NULL AUTO_INCREMENT COMMENT 'Incremental primary key',
     `inlong_group_id` varchar(128) NOT NULL COMMENT 'Business group id',
     `key_name`        varchar(64)  NOT NULL COMMENT 'Configuration item name',
-    `key_value`       varchar(256)          DEFAULT NULL COMMENT 'The value of the configuration item',
-    `is_deleted`      tinyint(1)            DEFAULT '0' COMMENT 'Whether to delete, 0: not deleted, 1: deleted',
-    `modify_time`     timestamp    NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'Modify time',
+    `key_value`       varchar(256)      DEFAULT NULL COMMENT 'The value of the configuration item',
+    `is_deleted`      tinyint(1)        DEFAULT '0' COMMENT 'Whether to delete, 0: not deleted, 1: deleted',
+    `modify_time`     timestamp    NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'Modify time',
     PRIMARY KEY (`id`),
     KEY `index_group_id` (`inlong_group_id`)
 ) ENGINE = InnoDB
@@ -158,15 +160,15 @@ CREATE TABLE `cluster_info`
     `ip`          varchar(64)  NOT NULL COMMENT 'Cluster IP address',
     `port`        int(11)      NOT NULL COMMENT 'Cluster port',
     `in_charges`  varchar(512) NOT NULL COMMENT 'Name of responsible person, separated by commas',
-    `url`         varchar(256)          DEFAULT NULL COMMENT 'Cluster URL address',
-    `is_backup`   tinyint(1)            DEFAULT '0' COMMENT 'Whether it is a backup cluster, 0: no, 1: yes',
-    `ext_props`   json                  DEFAULT NULL COMMENT 'extended properties',
-    `status`      int(4)                DEFAULT '1' COMMENT 'cluster status',
-    `is_deleted`  tinyint(1)            DEFAULT '0' COMMENT 'Whether to delete, 0: not deleted, 1: deleted',
+    `url`         varchar(256)      DEFAULT NULL COMMENT 'Cluster URL address',
+    `is_backup`   tinyint(1)        DEFAULT '0' COMMENT 'Whether it is a backup cluster, 0: no, 1: yes',
+    `ext_props`   json              DEFAULT NULL COMMENT 'extended properties',
+    `status`      int(4)            DEFAULT '1' COMMENT 'cluster status',
+    `is_deleted`  tinyint(1)        DEFAULT '0' COMMENT 'Whether to delete, 0: not deleted, 1: deleted',
     `creator`     varchar(64)  NOT NULL COMMENT 'Creator name',
-    `modifier`    varchar(64)           DEFAULT NULL COMMENT 'Modifier name',
-    `create_time` timestamp    NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'Create time',
-    `modify_time` timestamp    NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'Modify time',
+    `modifier`    varchar(64)       DEFAULT NULL COMMENT 'Modifier name',
+    `create_time` timestamp    NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'Create time',
+    `modify_time` timestamp    NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'Modify time',
     PRIMARY KEY (`id`)
 ) ENGINE = InnoDB
   DEFAULT CHARSET = utf8mb4 COMMENT ='Cluster Information Table';
@@ -180,29 +182,29 @@ CREATE TABLE `common_db_server`
     `id`                  int(11)      NOT NULL AUTO_INCREMENT COMMENT 'Incremental primary key',
     `access_type`         varchar(20)  NOT NULL COMMENT 'Collection type, with Agent, DataProxy client, LoadProxy',
     `connection_name`     varchar(128) NOT NULL COMMENT 'The name of the database connection',
-    `db_type`             varchar(128)          DEFAULT 'MySQL' COMMENT 'DB type, such as MySQL, Oracle',
+    `db_type`             varchar(128)      DEFAULT 'MySQL' COMMENT 'DB type, such as MySQL, Oracle',
     `db_server_ip`        varchar(64)  NOT NULL COMMENT 'DB Server IP',
     `port`                int(11)      NOT NULL COMMENT 'Port number',
-    `db_name`             varchar(128)          DEFAULT NULL COMMENT 'Target database name',
+    `db_name`             varchar(128)      DEFAULT NULL COMMENT 'Target database name',
     `username`            varchar(64)  NOT NULL COMMENT 'Username',
     `password`            varchar(64)  NOT NULL COMMENT 'The password corresponding to the above user name',
-    `has_select`          tinyint(1)            DEFAULT '0' COMMENT 'Is there DB permission select, 0: No, 1: Yes',
-    `has_insert`          tinyint(1)            DEFAULT '0' COMMENT 'Is there DB permission to insert, 0: No, 1: Yes',
-    `has_update`          tinyint(1)            DEFAULT '0' COMMENT 'Is there a DB permission update, 0: No, 1: Yes',
-    `has_delete`          tinyint(1)            DEFAULT '0' COMMENT 'Is there a DB permission to delete, 0: No, 1: Yes',
+    `has_select`          tinyint(1)        DEFAULT '0' COMMENT 'Is there DB permission select, 0: No, 1: Yes',
+    `has_insert`          tinyint(1)        DEFAULT '0' COMMENT 'Is there DB permission to insert, 0: No, 1: Yes',
+    `has_update`          tinyint(1)        DEFAULT '0' COMMENT 'Is there a DB permission update, 0: No, 1: Yes',
+    `has_delete`          tinyint(1)        DEFAULT '0' COMMENT 'Is there a DB permission to delete, 0: No, 1: Yes',
     `in_charges`          varchar(512) NOT NULL COMMENT 'DB person in charge, separated by a comma when there are multiple ones',
-    `is_region_id`        tinyint(1)            DEFAULT '0' COMMENT 'Whether it contains a region ID, 0: No, 1: Yes',
-    `db_description`      varchar(256)          DEFAULT NULL COMMENT 'DB description',
-    `backup_db_server_ip` varchar(64)           DEFAULT NULL COMMENT 'Backup DB HOST',
-    `backup_db_port`      int(11)               DEFAULT NULL COMMENT 'Backup DB port',
-    `status`              int(4)                DEFAULT '0' COMMENT 'status',
-    `is_deleted`          tinyint(1)            DEFAULT '0' COMMENT 'Whether to delete, 0: not deleted, 1: deleted',
+    `is_region_id`        tinyint(1)        DEFAULT '0' COMMENT 'Whether it contains a region ID, 0: No, 1: Yes',
+    `db_description`      varchar(256)      DEFAULT NULL COMMENT 'DB description',
+    `backup_db_server_ip` varchar(64)       DEFAULT NULL COMMENT 'Backup DB HOST',
+    `backup_db_port`      int(11)           DEFAULT NULL COMMENT 'Backup DB port',
+    `status`              int(4)            DEFAULT '0' COMMENT 'status',
+    `is_deleted`          tinyint(1)        DEFAULT '0' COMMENT 'Whether to delete, 0: not deleted, 1: deleted',
     `creator`             varchar(64)  NOT NULL COMMENT 'Creator name',
-    `modifier`            varchar(64)           DEFAULT NULL COMMENT 'Modifier name',
-    `create_time`         timestamp    NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'Create time',
-    `modify_time`         timestamp    NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'Modify time',
-    `visible_person`      varchar(1024)         DEFAULT NULL COMMENT 'List of visible persons, separated by commas',
-    `visible_group`       varchar(1024)         DEFAULT NULL COMMENT 'List of visible groups, separated by commas',
+    `modifier`            varchar(64)       DEFAULT NULL COMMENT 'Modifier name',
+    `create_time`         timestamp    NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'Create time',
+    `modify_time`         timestamp    NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'Modify time',
+    `visible_person`      varchar(1024)     DEFAULT NULL COMMENT 'List of visible persons, separated by commas',
+    `visible_group`       varchar(1024)     DEFAULT NULL COMMENT 'List of visible groups, separated by commas',
     PRIMARY KEY (`id`)
 ) ENGINE = InnoDB
   DEFAULT CHARSET = utf8mb4 COMMENT ='Common Database Server Table';
@@ -244,16 +246,16 @@ CREATE TABLE `consumption`
     `consumer_group_id`   varchar(255) NOT NULL COMMENT 'Consumer group ID',
     `in_charges`          varchar(512) NOT NULL COMMENT 'Person in charge of consumption',
     `inlong_group_id`     varchar(255) NOT NULL COMMENT 'Business group id',
-    `middleware_type`     varchar(10)           DEFAULT 'TUBE' COMMENT 'The middleware type of message queue, high throughput: TUBE, high consistency: PULSAR',
+    `middleware_type`     varchar(10)       DEFAULT 'TUBE' COMMENT 'The middleware type of message queue, high throughput: TUBE, high consistency: PULSAR',
     `topic`               varchar(255) NOT NULL COMMENT 'Consumption topic',
-    `filter_enabled`      int(2)                DEFAULT '0' COMMENT 'Whether to filter, default 0, not filter consume',
-    `inlong_stream_id`    varchar(1024)         DEFAULT NULL COMMENT 'Data stream ID for consumption, if filter_enable is 1, it cannot empty',
+    `filter_enabled`      int(2)            DEFAULT '0' COMMENT 'Whether to filter, default 0, not filter consume',
+    `inlong_stream_id`    varchar(1024)     DEFAULT NULL COMMENT 'Data stream ID for consumption, if filter_enable is 1, it cannot empty',
     `status`              int(4)       NOT NULL COMMENT 'Status: draft, pending approval, approval rejected, approval passed',
-    `is_deleted`          tinyint(1)            DEFAULT '0' COMMENT 'Whether to delete, 0: not deleted, 1: deleted',
+    `is_deleted`          tinyint(1)        DEFAULT '0' COMMENT 'Whether to delete, 0: not deleted, 1: deleted',
     `creator`             varchar(64)  NOT NULL COMMENT 'creator',
-    `modifier`            varchar(64)           DEFAULT NULL COMMENT 'modifier',
-    `create_time`         timestamp    NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'Create time',
-    `modify_time`         timestamp    NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'Modify time',
+    `modifier`            varchar(64)       DEFAULT NULL COMMENT 'modifier',
+    `create_time`         timestamp    NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'Create time',
+    `modify_time`         timestamp    NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'Modify time',
     PRIMARY KEY (`id`)
 ) ENGINE = InnoDB
   DEFAULT CHARSET = utf8mb4 COMMENT ='Data consumption configuration table';
@@ -286,20 +288,20 @@ CREATE TABLE `data_proxy_cluster`
 (
     `id`          int(11)      NOT NULL AUTO_INCREMENT COMMENT 'Incremental primary key',
     `name`        varchar(128) NOT NULL COMMENT 'cluster name',
-    `description` varchar(500)          DEFAULT NULL COMMENT 'cluster description',
+    `description` varchar(500)      DEFAULT NULL COMMENT 'cluster description',
     `address`     varchar(128) NOT NULL COMMENT 'cluster address',
-    `port`        varchar(256)          DEFAULT '46801' COMMENT 'Access port number, multiple ports are separated by a comma',
-    `is_backup`   tinyint(1)            DEFAULT '0' COMMENT 'Whether it is a backup cluster, 0: no, 1: yes',
-    `is_inner_ip` tinyint(1)            DEFAULT '0' COMMENT 'Whether it is intranet, 0: no, 1: yes',
-    `net_type`    varchar(20)           DEFAULT NULL COMMENT 'Cluster network type, internal, or public',
-    `in_charges`  varchar(512)          DEFAULT NULL COMMENT 'Name of responsible person, separated by commas',
-    `ext_props`   json                  DEFAULT NULL COMMENT 'Extended properties',
-    `status`      int(4)                DEFAULT '1' COMMENT 'Cluster status',
-    `is_deleted`  tinyint(1)            DEFAULT '0' COMMENT 'Whether to delete, 0: not deleted, 1: deleted',
+    `port`        varchar(256)      DEFAULT '46801' COMMENT 'Access port number, multiple ports are separated by a comma',
+    `is_backup`   tinyint(1)        DEFAULT '0' COMMENT 'Whether it is a backup cluster, 0: no, 1: yes',
+    `is_inner_ip` tinyint(1)        DEFAULT '0' COMMENT 'Whether it is intranet, 0: no, 1: yes',
+    `net_type`    varchar(20)       DEFAULT NULL COMMENT 'Cluster network type, internal, or public',
+    `in_charges`  varchar(512)      DEFAULT NULL COMMENT 'Name of responsible person, separated by commas',
+    `ext_props`   json              DEFAULT NULL COMMENT 'Extended properties',
+    `status`      int(4)            DEFAULT '1' COMMENT 'Cluster status',
+    `is_deleted`  tinyint(1)        DEFAULT '0' COMMENT 'Whether to delete, 0: not deleted, 1: deleted',
     `creator`     varchar(64)  NOT NULL COMMENT 'Creator name',
-    `modifier`    varchar(64)           DEFAULT NULL COMMENT 'Modifier name',
-    `create_time` timestamp    NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'Create time',
-    `modify_time` timestamp    NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'Modify time',
+    `modifier`    varchar(64)       DEFAULT NULL COMMENT 'Modifier name',
+    `create_time` timestamp    NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'Create time',
+    `modify_time` timestamp    NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'Modify time',
     PRIMARY KEY (`id`)
 ) ENGINE = InnoDB
   DEFAULT CHARSET = utf8mb4 COMMENT ='DataProxy cluster table';
@@ -354,28 +356,28 @@ CREATE TABLE `data_stream`
     `inlong_stream_id`       varchar(128) NOT NULL COMMENT 'Data stream id, non-deleted globally unique',
     `inlong_group_id`        varchar(128) NOT NULL COMMENT 'Owning business group id',
     `name`                   varchar(64)  NOT NULL COMMENT 'The name of the data stream page display, can be Chinese',
-    `description`            varchar(256)          DEFAULT '' COMMENT 'Introduction to data stream',
-    `mq_resource_obj`        varchar(128)          DEFAULT NULL COMMENT 'MQ resource object, in the data stream, Tube is data_stream_id, Pulsar is Topic',
-    `data_source_type`       varchar(32)           DEFAULT 'FILE' COMMENT 'Data source type, including: FILE, DB, Auto-Push (DATA_PROXY_SDK, HTTP)',
-    `storage_period`         int(11)               DEFAULT '1' COMMENT 'The storage period of data in MQ, unit: day',
-    `data_type`              varchar(20)           DEFAULT 'TEXT' COMMENT 'Data type, there are: TEXT, KEY-VALUE, PB, BON, TEXT and BON should be treated differently',
-    `data_encoding`          varchar(8)            DEFAULT 'UTF-8' COMMENT 'Data encoding format, including: UTF-8, GBK',
-    `data_separator`         varchar(8)            DEFAULT NULL COMMENT 'The source data field separator, stored as ASCII code',
-    `data_escape_char`       varchar(8)            DEFAULT NULL COMMENT 'Source data field escape character, the default is NULL (NULL), stored as 1 character',
-    `have_predefined_fields` tinyint(1)            DEFAULT '0' COMMENT '(File, DB access) whether there are predefined fields, 0: none, 1: yes (save to data_stream_field)',
-    `daily_records`          int(11)               DEFAULT '10' COMMENT 'Number of access records per day, unit: 10,000 records per day',
-    `daily_storage`          int(11)               DEFAULT '10' COMMENT 'Access size by day, unit: GB per day',
-    `peak_records`           int(11)               DEFAULT '1000' COMMENT 'Access peak per second, unit: records per second',
-    `max_length`             int(11)               DEFAULT '10240' COMMENT 'The maximum length of a single piece of data, unit: Byte',
-    `in_charges`             varchar(512)          DEFAULT NULL COMMENT 'Name of responsible person, separated by commas',
-    `status`                 int(4)                DEFAULT '0' COMMENT 'Data stream status',
-    `previous_status`        int(4)                DEFAULT '0' COMMENT 'Previous status',
-    `is_deleted`             tinyint(1)            DEFAULT '0' COMMENT 'Whether to delete, 0: not deleted, 1: deleted',
-    `creator`                varchar(64)           DEFAULT NULL COMMENT 'Creator name',
-    `modifier`               varchar(64)           DEFAULT NULL COMMENT 'Modifier name',
-    `create_time`            timestamp    NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'Create time',
-    `modify_time`            timestamp    NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'Modify time',
-    `temp_view`              json                  DEFAULT NULL COMMENT 'Temporary view, used to save intermediate data that has not been submitted or approved after modification',
+    `description`            varchar(256)      DEFAULT '' COMMENT 'Introduction to data stream',
+    `mq_resource_obj`        varchar(128)      DEFAULT NULL COMMENT 'MQ resource object, in the data stream, Tube is data_stream_id, Pulsar is Topic',
+    `data_source_type`       varchar(32)       DEFAULT 'FILE' COMMENT 'Data source type, including: FILE, DB, Auto-Push (DATA_PROXY_SDK, HTTP)',
+    `storage_period`         int(11)           DEFAULT '1' COMMENT 'The storage period of data in MQ, unit: day',
+    `data_type`              varchar(20)       DEFAULT 'TEXT' COMMENT 'Data type, there are: TEXT, KEY-VALUE, PB, BON, TEXT and BON should be treated differently',
+    `data_encoding`          varchar(8)        DEFAULT 'UTF-8' COMMENT 'Data encoding format, including: UTF-8, GBK',
+    `data_separator`         varchar(8)        DEFAULT NULL COMMENT 'The source data field separator, stored as ASCII code',
+    `data_escape_char`       varchar(8)        DEFAULT NULL COMMENT 'Source data field escape character, the default is NULL (NULL), stored as 1 character',
+    `have_predefined_fields` tinyint(1)        DEFAULT '0' COMMENT '(File, DB access) whether there are predefined fields, 0: none, 1: yes (save to data_stream_field)',
+    `daily_records`          int(11)           DEFAULT '10' COMMENT 'Number of access records per day, unit: 10,000 records per day',
+    `daily_storage`          int(11)           DEFAULT '10' COMMENT 'Access size by day, unit: GB per day',
+    `peak_records`           int(11)           DEFAULT '1000' COMMENT 'Access peak per second, unit: records per second',
+    `max_length`             int(11)           DEFAULT '10240' COMMENT 'The maximum length of a single piece of data, unit: Byte',
+    `in_charges`             varchar(512)      DEFAULT NULL COMMENT 'Name of responsible person, separated by commas',
+    `status`                 int(4)            DEFAULT '0' COMMENT 'Data stream status',
+    `previous_status`        int(4)            DEFAULT '0' COMMENT 'Previous status',
+    `is_deleted`             tinyint(1)        DEFAULT '0' COMMENT 'Whether to delete, 0: not deleted, 1: deleted',
+    `creator`                varchar(64)       DEFAULT NULL COMMENT 'Creator name',
+    `modifier`               varchar(64)       DEFAULT NULL COMMENT 'Modifier name',
+    `create_time`            timestamp    NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'Create time',
+    `modify_time`            timestamp    NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'Modify time',
+    `temp_view`              json              DEFAULT NULL COMMENT 'Temporary view, used to save intermediate data that has not been submitted or approved after modification',
     PRIMARY KEY (`id`),
     UNIQUE KEY `unique_data_stream` (`inlong_stream_id`, `inlong_group_id`, `is_deleted`, `modify_time`)
 ) ENGINE = InnoDB
@@ -391,9 +393,9 @@ CREATE TABLE `data_stream_ext`
     `inlong_group_id`  varchar(128) NOT NULL COMMENT 'Owning business group id',
     `inlong_stream_id` varchar(128) NOT NULL COMMENT 'Owning data stream id',
     `key_name`         varchar(64)  NOT NULL COMMENT 'Configuration item name',
-    `key_value`        varchar(256)          DEFAULT NULL COMMENT 'The value of the configuration item',
-    `is_deleted`       tinyint(1)            DEFAULT '0' COMMENT 'Whether to delete, 0: not deleted, 1: deleted',
-    `modify_time`      timestamp    NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'Modify time',
+    `key_value`        varchar(256)      DEFAULT NULL COMMENT 'The value of the configuration item',
+    `is_deleted`       tinyint(1)        DEFAULT '0' COMMENT 'Whether to delete, 0: not deleted, 1: deleted',
+    `modify_time`      timestamp    NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'Modify time',
     PRIMARY KEY (`id`),
     KEY `index_stream_id` (`inlong_stream_id`)
 ) ENGINE = InnoDB
@@ -479,13 +481,13 @@ CREATE TABLE `source_db_basic`
     `id`               int(11)      NOT NULL AUTO_INCREMENT COMMENT 'Incremental primary key',
     `inlong_group_id`  varchar(128) NOT NULL COMMENT 'Owning business group id',
     `inlong_stream_id` varchar(256) NOT NULL COMMENT 'Owning data stream id',
-    `sync_type`        tinyint(1)            DEFAULT '0' COMMENT 'Data synchronization type, 0: FULL, full amount, 1: INCREMENTAL, incremental',
-    `is_deleted`       tinyint(1)            DEFAULT '0' COMMENT 'Whether to delete, 0: not deleted, 1: deleted',
+    `sync_type`        tinyint(1)        DEFAULT '0' COMMENT 'Data synchronization type, 0: FULL, full amount, 1: INCREMENTAL, incremental',
+    `is_deleted`       tinyint(1)        DEFAULT '0' COMMENT 'Whether to delete, 0: not deleted, 1: deleted',
     `creator`          varchar(64)  NOT NULL COMMENT 'Creator name',
-    `modifier`         varchar(64)           DEFAULT NULL COMMENT 'Modifier name',
-    `create_time`      timestamp    NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'Create time',
-    `modify_time`      timestamp    NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'Modify time',
-    `temp_view`        json                  DEFAULT NULL COMMENT 'Temporary view, used to save intermediate data that has not been submitted or approved after modification',
+    `modifier`         varchar(64)       DEFAULT NULL COMMENT 'Modifier name',
+    `create_time`      timestamp    NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'Create time',
+    `modify_time`      timestamp    NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'Modify time',
+    `temp_view`        json              DEFAULT NULL COMMENT 'Temporary view, used to save intermediate data that has not been submitted or approved after modification',
     PRIMARY KEY (`id`)
 ) ENGINE = InnoDB
   DEFAULT CHARSET = utf8mb4 COMMENT ='Basic configuration of DB data source';
@@ -500,21 +502,21 @@ CREATE TABLE `source_db_detail`
     `inlong_group_id`  varchar(128) NOT NULL COMMENT 'Owning business group id',
     `inlong_stream_id` varchar(128) NOT NULL COMMENT 'Owning data stream id',
     `access_type`      varchar(20)  NOT NULL COMMENT 'Collection type, with Agent, DataProxy client, LoadProxy',
-    `db_name`          varchar(128)          DEFAULT NULL COMMENT 'database name',
-    `transfer_ip`      varchar(64)           DEFAULT NULL COMMENT 'Transfer IP',
-    `connection_name`  varchar(128)          DEFAULT NULL COMMENT 'The name of the database connection',
-    `table_name`       varchar(128)          DEFAULT NULL COMMENT 'Data table name, required for increment',
+    `db_name`          varchar(128)      DEFAULT NULL COMMENT 'database name',
+    `transfer_ip`      varchar(64)       DEFAULT NULL COMMENT 'Transfer IP',
+    `connection_name`  varchar(128)      DEFAULT NULL COMMENT 'The name of the database connection',
+    `table_name`       varchar(128)      DEFAULT NULL COMMENT 'Data table name, required for increment',
     `table_fields`     longtext COMMENT 'Data table fields, multiple are separated by half-width commas, required for increment',
     `data_sql`         longtext COMMENT 'SQL statement to collect source data, required for full amount',
-    `crontab`          varchar(56)           DEFAULT NULL COMMENT 'Timed scheduling expression, required for full amount',
-    `status`           int(4)                DEFAULT '0' COMMENT 'Data source status',
-    `previous_status`  int(4)                DEFAULT '0' COMMENT 'Previous status',
-    `is_deleted`       tinyint(1)            DEFAULT '0' COMMENT 'Whether to delete, 0: not deleted, 1: deleted',
+    `crontab`          varchar(56)       DEFAULT NULL COMMENT 'Timed scheduling expression, required for full amount',
+    `status`           int(4)            DEFAULT '0' COMMENT 'Data source status',
+    `previous_status`  int(4)            DEFAULT '0' COMMENT 'Previous status',
+    `is_deleted`       tinyint(1)        DEFAULT '0' COMMENT 'Whether to delete, 0: not deleted, 1: deleted',
     `creator`          varchar(64)  NOT NULL COMMENT 'Creator name',
-    `modifier`         varchar(64)           DEFAULT NULL COMMENT 'Modifier name',
-    `create_time`      timestamp    NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'Create time',
-    `modify_time`      timestamp    NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'Modify time',
-    `temp_view`        json                  DEFAULT NULL COMMENT 'Temporary view, used to save un-submitted and unapproved intermediate data after modification',
+    `modifier`         varchar(64)       DEFAULT NULL COMMENT 'Modifier name',
+    `create_time`      timestamp    NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'Create time',
+    `modify_time`      timestamp    NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'Modify time',
+    `temp_view`        json              DEFAULT NULL COMMENT 'Temporary view, used to save un-submitted and unapproved intermediate data after modification',
     PRIMARY KEY (`id`)
 ) ENGINE = InnoDB
   DEFAULT CHARSET = utf8mb4 COMMENT ='DB data source details table';
@@ -528,19 +530,19 @@ CREATE TABLE `source_file_basic`
     `id`                int(11)      NOT NULL AUTO_INCREMENT COMMENT 'ID',
     `inlong_group_id`   varchar(128) NOT NULL COMMENT 'Business group id',
     `inlong_stream_id`  varchar(128) NOT NULL COMMENT 'Data stream id',
-    `is_hybrid_source`  tinyint(1)            DEFAULT '0' COMMENT 'Whether to mix data sources',
-    `is_table_mapping`  tinyint(1)            DEFAULT '0' COMMENT 'Is there a table name mapping',
-    `date_offset`       int(4)                DEFAULT '0' COMMENT 'Time offset\n',
-    `date_offset_unit`  varchar(2)            DEFAULT 'H' COMMENT 'Time offset unit',
-    `file_rolling_type` varchar(2)            DEFAULT 'H' COMMENT 'File rolling type',
-    `upload_max_size`   int(4)                DEFAULT '120' COMMENT 'Upload maximum size',
-    `need_compress`     tinyint(1)            DEFAULT '0' COMMENT 'Whether need compress',
-    `is_deleted`        tinyint(1)            DEFAULT '0' COMMENT 'Delete switch',
+    `is_hybrid_source`  tinyint(1)        DEFAULT '0' COMMENT 'Whether to mix data sources',
+    `is_table_mapping`  tinyint(1)        DEFAULT '0' COMMENT 'Is there a table name mapping',
+    `date_offset`       int(4)            DEFAULT '0' COMMENT 'Time offset\n',
+    `date_offset_unit`  varchar(2)        DEFAULT 'H' COMMENT 'Time offset unit',
+    `file_rolling_type` varchar(2)        DEFAULT 'H' COMMENT 'File rolling type',
+    `upload_max_size`   int(4)            DEFAULT '120' COMMENT 'Upload maximum size',
+    `need_compress`     tinyint(1)        DEFAULT '0' COMMENT 'Whether need compress',
+    `is_deleted`        tinyint(1)        DEFAULT '0' COMMENT 'Delete switch',
     `creator`           varchar(64)  NOT NULL COMMENT 'Creator',
-    `modifier`          varchar(64)           DEFAULT NULL COMMENT 'Modifier',
-    `create_time`       timestamp    NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'Create time',
-    `modify_time`       timestamp    NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'Modify time',
-    `temp_view`         json                  DEFAULT NULL COMMENT 'temp view',
+    `modifier`          varchar(64)       DEFAULT NULL COMMENT 'Modifier',
+    `create_time`       timestamp    NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'Create time',
+    `modify_time`       timestamp    NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'Modify time',
+    `temp_view`         json              DEFAULT NULL COMMENT 'temp view',
     PRIMARY KEY (`id`)
 ) ENGINE = InnoDB
   DEFAULT CHARSET = utf8mb4 COMMENT ='basic configuration of file data source';
@@ -554,23 +556,23 @@ CREATE TABLE `source_file_detail`
     `id`               int(11)      NOT NULL AUTO_INCREMENT COMMENT 'Incremental primary key',
     `inlong_group_id`  varchar(128) NOT NULL COMMENT 'Owning business group id',
     `inlong_stream_id` varchar(128) NOT NULL COMMENT 'Owning data stream id',
-    `access_type`      varchar(20)           DEFAULT 'Agent' COMMENT 'Collection type, there are Agent, DataProxy client, LoadProxy, the file can only be Agent temporarily',
+    `access_type`      varchar(20)       DEFAULT 'Agent' COMMENT 'Collection type, there are Agent, DataProxy client, LoadProxy, the file can only be Agent temporarily',
     `server_name`      varchar(64)  NOT NULL COMMENT 'The name of the data source service. If it is empty, add configuration through the following fields',
     `ip`               varchar(128) NOT NULL COMMENT 'Data source IP address',
     `port`             int(11)      NOT NULL COMMENT 'Data source port number',
-    `is_inner_ip`      tinyint(1)            DEFAULT '0' COMMENT 'Whether it is intranet, 0: no, 1: yes',
-    `issue_type`       varchar(10)           DEFAULT 'SSH' COMMENT 'Issuing method, there are SSH, TCS',
+    `is_inner_ip`      tinyint(1)        DEFAULT '0' COMMENT 'Whether it is intranet, 0: no, 1: yes',
+    `issue_type`       varchar(10)       DEFAULT 'SSH' COMMENT 'Issuing method, there are SSH, TCS',
     `username`         varchar(32)  NOT NULL COMMENT 'User name of the data source IP host',
     `password`         varchar(64)  NOT NULL COMMENT 'The password corresponding to the above user name',
     `file_path`        varchar(256) NOT NULL COMMENT 'File path, supports regular matching',
-    `status`           int(4)                DEFAULT '0' COMMENT 'Data source status',
-    `previous_status`  int(4)                DEFAULT '0' COMMENT 'Previous status',
-    `is_deleted`       tinyint(1)            DEFAULT '0' COMMENT 'Whether to delete, 0: not deleted, 1: deleted',
+    `status`           int(4)            DEFAULT '0' COMMENT 'Data source status',
+    `previous_status`  int(4)            DEFAULT '0' COMMENT 'Previous status',
+    `is_deleted`       tinyint(1)        DEFAULT '0' COMMENT 'Whether to delete, 0: not deleted, 1: deleted',
     `creator`          varchar(64)  NOT NULL COMMENT 'Creator name',
-    `modifier`         varchar(64)           DEFAULT NULL COMMENT 'Modifier name',
-    `create_time`      timestamp    NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'Create time',
-    `modify_time`      timestamp    NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'Modify time',
-    `temp_view`        json                  DEFAULT NULL COMMENT 'Temporary view, used to save un-submitted and unapproved intermediate data after modification',
+    `modifier`         varchar(64)       DEFAULT NULL COMMENT 'Modifier name',
+    `create_time`      timestamp    NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'Create time',
+    `modify_time`      timestamp    NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'Modify time',
+    `temp_view`        json              DEFAULT NULL COMMENT 'Temporary view, used to save un-submitted and unapproved intermediate data after modification',
     PRIMARY KEY (`id`)
 ) ENGINE = InnoDB
   DEFAULT CHARSET = utf8mb4 COMMENT ='Detailed table of file data source';
@@ -602,31 +604,31 @@ CREATE TABLE `storage_hive`
     `id`                          int(11)      NOT NULL AUTO_INCREMENT COMMENT 'Incremental primary key',
     `inlong_group_id`             varchar(128) NOT NULL COMMENT 'Owning business group id',
     `inlong_stream_id`            varchar(128) NOT NULL COMMENT 'Owning data stream id',
-    `jdbc_url`                    varchar(255)          DEFAULT NULL COMMENT 'Hive JDBC connection URL, such as "jdbc:hive2://127.0.0.1:10000"',
-    `username`                    varchar(128)          DEFAULT NULL COMMENT 'Username',
-    `password`                    varchar(255)          DEFAULT NULL COMMENT 'User password',
-    `db_name`                     varchar(128)          DEFAULT NULL COMMENT 'Target database name',
-    `table_name`                  varchar(128)          DEFAULT NULL COMMENT 'Target data table name',
-    `hdfs_default_fs`             varchar(255)          DEFAULT NULL COMMENT 'HDFS defaultFS, such as "hdfs://127.0.0.1:9000"',
-    `warehouse_dir`               varchar(250)          DEFAULT '/user/hive/warehouse' COMMENT 'Hive table storage path on HDFS, such as "/user/hive/warehouse"',
-    `partition_interval`          int(5)                DEFAULT NULL COMMENT 'Partition interval, support: 1(D / H), 10 I, 30 I',
-    `partition_unit`              varchar(10)           DEFAULT 'D' COMMENT 'Partition type, support: D-day, H-hour, I-minute',
-    `primary_partition`           varchar(255)          DEFAULT 'dt' COMMENT 'primary partition field',
-    `secondary_partition`         varchar(256)          DEFAULT NULL COMMENT 'secondary partition field',
-    `partition_creation_strategy` varchar(50)           DEFAULT 'COMPLETED' COMMENT 'Partition creation strategy, support: ARRIVED, COMPLETED',
-    `file_format`                 varchar(15)           DEFAULT 'TextFile' COMMENT 'The stored table format, TextFile, RCFile, SequenceFile, Avro',
-    `data_encoding`               varchar(20)           DEFAULT 'UTF-8' COMMENT 'data encoding type',
-    `data_separator`              varchar(10)           DEFAULT NULL COMMENT 'data field separator',
-    `storage_period`              int(5)                DEFAULT '10' COMMENT 'Data storage period, unit: day',
-    `opt_log`                     varchar(5000)         DEFAULT NULL COMMENT 'Background operation log',
-    `status`                      int(4)                DEFAULT '0' COMMENT 'status',
-    `previous_status`             int(4)                DEFAULT '0' COMMENT 'Previous status',
-    `is_deleted`                  tinyint(1)            DEFAULT '0' COMMENT 'Whether to delete, 0: not deleted, 1: deleted',
-    `creator`                     varchar(64)           DEFAULT NULL COMMENT 'creator name',
-    `modifier`                    varchar(64)           DEFAULT NULL COMMENT 'modifier name',
-    `create_time`                 timestamp    NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'Create time',
-    `modify_time`                 timestamp    NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'Modify time',
-    `temp_view`                   json                  DEFAULT NULL COMMENT 'Temporary view, used to save un-submitted and unapproved intermediate data after modification',
+    `jdbc_url`                    varchar(255)      DEFAULT NULL COMMENT 'Hive JDBC connection URL, such as "jdbc:hive2://127.0.0.1:10000"',
+    `username`                    varchar(128)      DEFAULT NULL COMMENT 'Username',
+    `password`                    varchar(255)      DEFAULT NULL COMMENT 'User password',
+    `db_name`                     varchar(128)      DEFAULT NULL COMMENT 'Target database name',
+    `table_name`                  varchar(128)      DEFAULT NULL COMMENT 'Target data table name',
+    `hdfs_default_fs`             varchar(255)      DEFAULT NULL COMMENT 'HDFS defaultFS, such as "hdfs://127.0.0.1:9000"',
+    `warehouse_dir`               varchar(250)      DEFAULT '/user/hive/warehouse' COMMENT 'Hive table storage path on HDFS, such as "/user/hive/warehouse"',
+    `partition_interval`          int(5)            DEFAULT NULL COMMENT 'Partition interval, support: 1(D / H), 10 I, 30 I',
+    `partition_unit`              varchar(10)       DEFAULT 'D' COMMENT 'Partition type, support: D-day, H-hour, I-minute',
+    `primary_partition`           varchar(255)      DEFAULT 'dt' COMMENT 'primary partition field',
+    `secondary_partition`         varchar(256)      DEFAULT NULL COMMENT 'secondary partition field',
+    `partition_creation_strategy` varchar(50)       DEFAULT 'COMPLETED' COMMENT 'Partition creation strategy, support: ARRIVED, COMPLETED',
+    `file_format`                 varchar(15)       DEFAULT 'TextFile' COMMENT 'The stored table format, TextFile, RCFile, SequenceFile, Avro',
+    `data_encoding`               varchar(20)       DEFAULT 'UTF-8' COMMENT 'data encoding type',
+    `data_separator`              varchar(10)       DEFAULT NULL COMMENT 'data field separator',
+    `storage_period`              int(5)            DEFAULT '10' COMMENT 'Data storage period, unit: day',
+    `opt_log`                     varchar(5000)     DEFAULT NULL COMMENT 'Background operation log',
+    `status`                      int(4)            DEFAULT '0' COMMENT 'status',
+    `previous_status`             int(4)            DEFAULT '0' COMMENT 'Previous status',
+    `is_deleted`                  tinyint(1)        DEFAULT '0' COMMENT 'Whether to delete, 0: not deleted, 1: deleted',
+    `creator`                     varchar(64)       DEFAULT NULL COMMENT 'creator name',
+    `modifier`                    varchar(64)       DEFAULT NULL COMMENT 'modifier name',
+    `create_time`                 timestamp    NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'Create time',
+    `modify_time`                 timestamp    NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'Modify time',
+    `temp_view`                   json              DEFAULT NULL COMMENT 'Temporary view, used to save un-submitted and unapproved intermediate data after modification',
     PRIMARY KEY (`id`)
 ) ENGINE = InnoDB
   DEFAULT CHARSET = utf8mb4 COMMENT ='Data is stored in Hive configuration table';
@@ -893,15 +895,15 @@ CREATE TABLE `cluster_set`
     `set_name`        varchar(128) NOT NULL COMMENT 'ClusterSet name, English, numbers and underscore',
     `cn_name`         varchar(256) COMMENT 'Chinese display name',
     `description`     varchar(256) COMMENT 'ClusterSet Introduction',
-    `middleware_type` varchar(10)           DEFAULT 'TUBE' COMMENT 'The middleware type of message queue, high throughput: TUBE, high consistency: PULSAR',
+    `middleware_type` varchar(10)       DEFAULT 'TUBE' COMMENT 'The middleware type of message queue, high throughput: TUBE, high consistency: PULSAR',
     `in_charges`      varchar(512) COMMENT 'Name of responsible person, separated by commas',
     `followers`       varchar(512) COMMENT 'List of names of business followers, separated by commas',
-    `status`          int(4)                DEFAULT '21' COMMENT 'ClusterSet status',
-    `is_deleted`      tinyint(1)            DEFAULT '0' COMMENT 'Whether to delete, 0: not deleted, 1: deleted',
+    `status`          int(4)            DEFAULT '21' COMMENT 'ClusterSet status',
+    `is_deleted`      tinyint(1)        DEFAULT '0' COMMENT 'Whether to delete, 0: not deleted, 1: deleted',
     `creator`         varchar(64)  NOT NULL COMMENT 'Creator name',
     `modifier`        varchar(64)  NULL COMMENT 'Modifier name',
-    `create_time`     timestamp    NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'Create time',
-    `modify_time`     timestamp    NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'Modify time',
+    `create_time`     timestamp    NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'Create time',
+    `modify_time`     timestamp    NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'Modify time',
     PRIMARY KEY (`id`),
     UNIQUE KEY `unique_cluster_set` (`set_name`)
 ) ENGINE = InnoDB
@@ -946,8 +948,8 @@ CREATE TABLE `cache_cluster_ext`
     `cluster_name` varchar(128) NOT NULL COMMENT 'CacheCluster name, English, numbers and underscore',
     `key_name`     varchar(64)  NOT NULL COMMENT 'Configuration item name',
     `key_value`    varchar(256) NULL COMMENT 'The value of the configuration item',
-    `is_deleted`   tinyint(1)            DEFAULT '0' COMMENT 'Whether to delete, 0: not deleted, 1: deleted',
-    `modify_time`  timestamp    NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'Modify time',
+    `is_deleted`   tinyint(1)        DEFAULT '0' COMMENT 'Whether to delete, 0: not deleted, 1: deleted',
+    `modify_time`  timestamp    NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'Modify time',
     PRIMARY KEY (`id`),
     KEY `index_cache_cluster` (`cluster_name`)
 ) ENGINE = InnoDB
@@ -1025,8 +1027,8 @@ CREATE TABLE `flume_source_ext`
     `set_name`    varchar(128) NOT NULL COMMENT 'ClusterSet name, English, numbers and underscore',
     `key_name`    varchar(64)  NOT NULL COMMENT 'Configuration item name',
     `key_value`   varchar(256) NULL COMMENT 'The value of the configuration item',
-    `is_deleted`  tinyint(1)            DEFAULT '0' COMMENT 'Whether to delete, 0: not deleted, 1: deleted',
-    `modify_time` timestamp    NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'Modify time',
+    `is_deleted`  tinyint(1)        DEFAULT '0' COMMENT 'Whether to delete, 0: not deleted, 1: deleted',
+    `modify_time` timestamp    NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'Modify time',
     PRIMARY KEY (`id`),
     KEY `index_flume_source_ext` (`parent_name`)
 ) ENGINE = InnoDB
@@ -1058,8 +1060,8 @@ CREATE TABLE `flume_channel_ext`
     `set_name`    varchar(128) NOT NULL COMMENT 'ClusterSet name, English, numbers and underscore',
     `key_name`    varchar(64)  NOT NULL COMMENT 'Configuration item name',
     `key_value`   varchar(256) NULL COMMENT 'The value of the configuration item',
-    `is_deleted`  tinyint(1)            DEFAULT '0' COMMENT 'Whether to delete, 0: not deleted, 1: deleted',
-    `modify_time` timestamp    NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'Modify time',
+    `is_deleted`  tinyint(1)        DEFAULT '0' COMMENT 'Whether to delete, 0: not deleted, 1: deleted',
+    `modify_time` timestamp    NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'Modify time',
     PRIMARY KEY (`id`),
     KEY `index_flume_channel_ext` (`parent_name`)
 ) ENGINE = InnoDB
@@ -1092,8 +1094,8 @@ CREATE TABLE `flume_sink_ext`
     `set_name`    varchar(128) NOT NULL COMMENT 'ClusterSet name, English, numbers and underscore',
     `key_name`    varchar(64)  NOT NULL COMMENT 'Configuration item name',
     `key_value`   varchar(256) NULL COMMENT 'The value of the configuration item',
-    `is_deleted`  tinyint(1)            DEFAULT '0' COMMENT 'Whether to delete, 0: not deleted, 1: deleted',
-    `modify_time` timestamp    NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'Modify time',
+    `is_deleted`  tinyint(1)        DEFAULT '0' COMMENT 'Whether to delete, 0: not deleted, 1: deleted',
+    `modify_time` timestamp    NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'Modify time',
     PRIMARY KEY (`id`),
     KEY `index_flume_sink_ext` (`parent_name`)
 ) ENGINE = InnoDB