Posted to commits@inlong.apache.org by do...@apache.org on 2021/11/28 09:56:11 UTC

[incubator-inlong] branch master updated: [INLONG-1849][Feature][InLong-Manager] Push Pulsar info for Sort (#1850)

This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 74c4c82  [INLONG-1849][Feature][InLong-Manager] Push Pulsar info for Sort (#1850)
74c4c82 is described below

commit 74c4c822881b858361b770b2c22d88744cee89a8
Author: healchow <he...@gmail.com>
AuthorDate: Sun Nov 28 17:56:05 2021 +0800

    [INLONG-1849][Feature][InLong-Manager] Push Pulsar info for Sort (#1850)
    
    Co-authored-by: healchow <he...@gmail.com>
---
 .../docker-compose/sql/apache_inlong_manager.sql   |  99 +++---
 inlong-manager/doc/sql/apache_inlong_manager.sql   |  99 +++---
 .../inlong/manager/common/enums/BizConstant.java   |   8 +
 .../manager/common/pojo/business/BusinessInfo.java |   3 +-
 .../common/pojo/business/BusinessMqExtBase.java    |   2 +-
 .../common/pojo/business/BusinessPulsarInfo.java   |   6 +-
 .../common/pojo/consumption/ConsumptionInfo.java   |   8 +-
 .../common/pojo/datastorage/StorageHiveInfo.java   |  50 +--
 .../StorageHiveSortInfo.java}                      |  90 ++----
 .../pojo/datastorage/StoragePageRequest.java       |   3 +
 .../common/pojo/datastream/DataStreamInfo.java     |  23 +-
 .../pojo/datastream/DataStreamPageRequest.java     |   3 +
 .../manager/dao/entity/DataStreamEntity.java       |   5 +-
 .../manager/dao/entity/StorageHiveEntity.java      |  18 +-
 .../manager/dao/mapper/DataStreamEntityMapper.java |  20 +-
 .../dao/mapper/StorageHiveEntityMapper.java        |  14 +-
 .../resources/mappers/DataStreamEntityMapper.xml   | 351 ++++++++-------------
 .../resources/mappers/StorageHiveEntityMapper.xml  | 285 ++++++++++-------
 .../test/resources/sql/apache_inlong_manager.sql   | 195 +++++++-----
 .../manager/service/core/DataStreamService.java    |  18 --
 .../service/core/impl/BusinessServiceImpl.java     |  36 ---
 .../service/core/impl/DataStreamServiceImpl.java   |  33 +-
 .../service/core/impl/StorageHiveOperation.java    |   8 +-
 .../service/core/impl/StorageServiceImpl.java      |   2 +-
 .../core/impl/WorkflowApproverServiceImpl.java     |   4 +-
 .../hive/CreateHiveTableForAllStreamListener.java  |  11 +-
 .../hive/CreateHiveTableForOneStreamListener.java  |  16 +-
 .../service/thirdpart/hive/HiveTableOperator.java  |  12 +-
 .../thirdpart/sort/PushHiveConfigTaskListener.java | 271 +++++++++++-----
 .../thirdpart/sort/SortFieldFormatUtils.java       |  26 +-
 .../ConsumptionCompleteProcessListener.java        |   8 +-
 .../test/resources/sql/apache_inlong_manager.sql   | 182 ++++++-----
 .../core/processor/StartEventProcessor.java        |   2 +-
 .../workflow/core/processor/UserTaskProcessor.java |   4 +-
 .../workflow/util/WorkflowFormParserUtils.java     |   6 +-
 35 files changed, 1023 insertions(+), 898 deletions(-)

diff --git a/docker/docker-compose/sql/apache_inlong_manager.sql b/docker/docker-compose/sql/apache_inlong_manager.sql
index dec268e..8cae257 100644
--- a/docker/docker-compose/sql/apache_inlong_manager.sql
+++ b/docker/docker-compose/sql/apache_inlong_manager.sql
@@ -93,7 +93,7 @@ CREATE TABLE `business`
     `schema_name`     varchar(128)          DEFAULT NULL COMMENT 'Data type, associated data_schema table',
     `in_charges`      varchar(512) NOT NULL COMMENT 'Name of responsible person, separated by commas',
     `followers`       varchar(512)          DEFAULT NULL COMMENT 'List of names of business followers, separated by commas',
-    `status`          int(11)               DEFAULT '21' COMMENT 'Business status',
+    `status`          int(4)                DEFAULT '21' COMMENT 'Business status',
     `is_deleted`      tinyint(1)            DEFAULT '0' COMMENT 'Whether to delete, 0: not deleted, 1: deleted',
     `creator`         varchar(64)  NOT NULL COMMENT 'Creator name',
     `modifier`        varchar(64)           DEFAULT NULL COMMENT 'Modifier name',
@@ -161,7 +161,7 @@ CREATE TABLE `cluster_info`
     `url`         varchar(256)          DEFAULT NULL COMMENT 'Cluster URL address',
     `is_backup`   tinyint(1)            DEFAULT '0' COMMENT 'Whether it is a backup cluster, 0: no, 1: yes',
     `ext_props`   json                  DEFAULT NULL COMMENT 'extended properties',
-    `status`      int(11)               DEFAULT '1' COMMENT 'cluster status',
+    `status`      int(4)                DEFAULT '1' COMMENT 'cluster status',
     `is_deleted`  tinyint(1)            DEFAULT '0' COMMENT 'Whether to delete, 0: not deleted, 1: deleted',
     `creator`     varchar(64)  NOT NULL COMMENT 'Creator name',
     `modifier`    varchar(64)           DEFAULT NULL COMMENT 'Modifier name',
@@ -195,7 +195,7 @@ CREATE TABLE `common_db_server`
     `db_description`      varchar(256)          DEFAULT NULL COMMENT 'DB description',
     `backup_db_server_ip` varchar(64)           DEFAULT NULL COMMENT 'Backup DB HOST',
     `backup_db_port`      int(11)               DEFAULT NULL COMMENT 'Backup DB port',
-    `status`              int(11)               DEFAULT '0' COMMENT 'status',
+    `status`              int(4)                DEFAULT '0' COMMENT 'status',
     `is_deleted`          tinyint(1)            DEFAULT '0' COMMENT 'Whether to delete, 0: not deleted, 1: deleted',
     `creator`             varchar(64)  NOT NULL COMMENT 'Creator name',
     `modifier`            varchar(64)           DEFAULT NULL COMMENT 'Modifier name',
@@ -221,7 +221,7 @@ CREATE TABLE `common_file_server`
     `issue_type`     varchar(128)         DEFAULT NULL COMMENT 'Issuance method, such as SSH, TCS, etc.',
     `username`       varchar(64) NOT NULL COMMENT 'User name of the data source IP host',
     `password`       varchar(64) NOT NULL COMMENT 'The password corresponding to the above user name',
-    `status`         int(11)              DEFAULT '0' COMMENT 'status',
+    `status`         int(4)               DEFAULT '0' COMMENT 'status',
     `is_deleted`     tinyint(1)           DEFAULT '0' COMMENT 'Whether to delete, 0: not deleted, 1: deleted',
     `creator`        varchar(64) NOT NULL COMMENT 'Creator name',
     `modifier`       varchar(64)          DEFAULT NULL COMMENT 'Modifier name',
@@ -248,12 +248,12 @@ CREATE TABLE `consumption`
     `topic`               varchar(255) NOT NULL COMMENT 'Consumption topic',
     `filter_enabled`      int(2)                DEFAULT '0' COMMENT 'Whether to filter consumption, default 0: do not filter',
     `inlong_stream_id`    varchar(1024)         DEFAULT NULL COMMENT 'Data stream ID for consumption, if filter_enabled is 1, it cannot be empty',
-    `status`              int(11)      NOT NULL COMMENT 'Status: draft, pending approval, approval rejected, approval passed',
+    `status`              int(4)       NOT NULL COMMENT 'Status: draft, pending approval, approval rejected, approval passed',
+    `is_deleted`          tinyint(1)            DEFAULT '0' COMMENT 'Whether to delete, 0: not deleted, 1: deleted',
     `creator`             varchar(64)  NOT NULL COMMENT 'creator',
     `modifier`            varchar(64)           DEFAULT NULL COMMENT 'modifier',
     `create_time`         timestamp    NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'Create time',
     `modify_time`         timestamp    NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'Modify time',
-    `is_deleted`          int(2)                DEFAULT '0' COMMENT 'Whether to delete, 0: not deleted, 1: deleted',
     PRIMARY KEY (`id`)
 ) ENGINE = InnoDB
   DEFAULT CHARSET = utf8mb4 COMMENT ='Data consumption configuration table';
@@ -264,8 +264,8 @@ CREATE TABLE `consumption`
 DROP TABLE IF EXISTS `consumption_pulsar`;
 CREATE TABLE `consumption_pulsar`
 (
-    `id`                  int          NOT NULL AUTO_INCREMENT,
-    `consumption_id`      int          DEFAULT NULL COMMENT 'ID of the consumption information to which it belongs, guaranteed to be uniquely associated with consumption information',
+    `id`                  int(11)      NOT NULL AUTO_INCREMENT,
+    `consumption_id`      int(11)      DEFAULT NULL COMMENT 'ID of the consumption information to which it belongs, guaranteed to be uniquely associated with consumption information',
     `consumer_group_id`   varchar(255) NOT NULL COMMENT 'Consumer group ID',
     `consumer_group_name` varchar(255) NOT NULL COMMENT 'Consumer group name',
     `inlong_group_id`     varchar(255) NOT NULL COMMENT 'Business group ID',
@@ -273,7 +273,7 @@ CREATE TABLE `consumption_pulsar`
     `retry_letter_topic`  varchar(255) DEFAULT NULL COMMENT 'The name of the retry queue topic',
     `is_dlq`              tinyint(1)   DEFAULT '0' COMMENT 'Whether to configure dead letter topic, 0: no configuration, 1: means configuration',
     `dead_letter_topic`   varchar(255) DEFAULT NULL COMMENT 'dead letter topic name',
-    `is_deleted`          int          DEFAULT '0' COMMENT 'Whether to delete',
+    `is_deleted`          tinyint(1)   DEFAULT '0' COMMENT 'Whether to delete',
     PRIMARY KEY (`id`)
 ) ENGINE = InnoDB
   DEFAULT CHARSET = utf8mb4 COMMENT ='Pulsar consumption table';
@@ -294,7 +294,7 @@ CREATE TABLE `data_proxy_cluster`
     `net_type`    varchar(20)           DEFAULT NULL COMMENT 'Cluster network type, internal, or public',
     `in_charges`  varchar(512)          DEFAULT NULL COMMENT 'Name of responsible person, separated by commas',
     `ext_props`   json                  DEFAULT NULL COMMENT 'Extended properties',
-    `status`      int(11)               DEFAULT '1' COMMENT 'Cluster status',
+    `status`      int(4)                DEFAULT '1' COMMENT 'Cluster status',
     `is_deleted`  tinyint(1)            DEFAULT '0' COMMENT 'Whether to delete, 0: not deleted, 1: deleted',
     `creator`     varchar(64)  NOT NULL COMMENT 'Creator name',
     `modifier`    varchar(64)           DEFAULT NULL COMMENT 'Modifier name',
@@ -360,11 +360,16 @@ CREATE TABLE `data_stream`
     `storage_period`         int(11)           DEFAULT '1' COMMENT 'The storage period of data in MQ, unit: day',
     `data_type`              varchar(20)       DEFAULT 'TEXT' COMMENT 'Data type, there are: TEXT, KEY-VALUE, PB, BON, TEXT and BON should be treated differently',
     `data_encoding`          varchar(8)        DEFAULT 'UTF-8' COMMENT 'Data encoding format, including: UTF-8, GBK',
-    `file_delimiter`         varchar(8)        DEFAULT NULL COMMENT 'The source data field separator, stored as ASCII code',
+    `data_separator`         varchar(8)        DEFAULT NULL COMMENT 'The source data field separator, stored as ASCII code',
+    `data_escape_char`       varchar(8)        DEFAULT NULL COMMENT 'Source data field escape character, the default is empty (NULL), stored as 1 character',
     `have_predefined_fields` tinyint(1)        DEFAULT '0' COMMENT '(File, DB access) whether there are predefined fields, 0: none, 1: yes (save to data_stream_field)',
+    `daily_records`          int(11)           DEFAULT '10' COMMENT 'Number of access records per day, unit: 10,000 records per day',
+    `daily_storage`          int(11)           DEFAULT '10' COMMENT 'Access size by day, unit: GB per day',
+    `peak_records`           int(11)           DEFAULT '1000' COMMENT 'Access peak per second, unit: records per second',
+    `max_length`             int(11)           DEFAULT '10240' COMMENT 'The maximum length of a single piece of data, unit: Byte',
     `in_charges`             varchar(512)      DEFAULT NULL COMMENT 'Name of responsible person, separated by commas',
-    `status`                 int(11)           DEFAULT '0' COMMENT 'Data stream status',
-    `previous_status`        int(11)           DEFAULT '0' COMMENT 'Previous status',
+    `status`                 int(4)            DEFAULT '0' COMMENT 'Data stream status',
+    `previous_status`        int(4)            DEFAULT '0' COMMENT 'Previous status',
     `is_deleted`             tinyint(1)        DEFAULT '0' COMMENT 'Whether to delete, 0: not deleted, 1: deleted',
     `creator`                varchar(64)       DEFAULT NULL COMMENT 'Creator name',
     `modifier`               varchar(64)       DEFAULT NULL COMMENT 'Modifier name',
@@ -438,7 +443,7 @@ CREATE TABLE `operation_log`
     `cost_time`           bigint(20)         DEFAULT NULL COMMENT 'Time consumed',
     `body`                text COMMENT 'Request body',
     `param`               text COMMENT 'parameter',
-    `status`              tinyint(1)         DEFAULT NULL COMMENT 'status',
+    `status`              int(4)             DEFAULT NULL COMMENT 'status',
     `request_time`        timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'request time',
     `err_msg`             text COMMENT 'Error message',
     PRIMARY KEY (`id`)
@@ -502,8 +507,8 @@ CREATE TABLE `source_db_detail`
     `table_fields`     longtext COMMENT 'Data table fields, separated by half-width commas, required for incremental collection',
     `data_sql`         longtext COMMENT 'SQL statement to collect source data, required for full collection',
     `crontab`          varchar(56)           DEFAULT NULL COMMENT 'Timed scheduling expression, required for full collection',
-    `status`           int(11)               DEFAULT '0' COMMENT 'Data source status',
-    `previous_status`  int(11)               DEFAULT '0' COMMENT 'Previous status',
+    `status`           int(4)                DEFAULT '0' COMMENT 'Data source status',
+    `previous_status`  int(4)                DEFAULT '0' COMMENT 'Previous status',
     `is_deleted`       tinyint(1)            DEFAULT '0' COMMENT 'Whether to delete, 0: not deleted, 1: deleted',
     `creator`          varchar(64)  NOT NULL COMMENT 'Creator name',
     `modifier`         varchar(64)           DEFAULT NULL COMMENT 'Modifier name',
@@ -558,8 +563,8 @@ CREATE TABLE `source_file_detail`
     `username`         varchar(32)  NOT NULL COMMENT 'User name of the data source IP host',
     `password`         varchar(64)  NOT NULL COMMENT 'The password corresponding to the above user name',
     `file_path`        varchar(256) NOT NULL COMMENT 'File path, supports regular matching',
-    `status`           int(11)               DEFAULT '0' COMMENT 'Data source status',
-    `previous_status`  int(11)               DEFAULT '0' COMMENT 'Previous status',
+    `status`           int(4)                DEFAULT '0' COMMENT 'Data source status',
+    `previous_status`  int(4)                DEFAULT '0' COMMENT 'Previous status',
     `is_deleted`       tinyint(1)            DEFAULT '0' COMMENT 'Whether to delete, 0: not deleted, 1: deleted',
     `creator`          varchar(64)  NOT NULL COMMENT 'Creator name',
     `modifier`         varchar(64)           DEFAULT NULL COMMENT 'Modifier name',
@@ -592,36 +597,36 @@ CREATE TABLE `storage_ext`
 -- Table structure for storage_hive
 -- ----------------------------
 DROP TABLE IF EXISTS `storage_hive`;
-DROP TABLE IF EXISTS `storage_hive`;
 CREATE TABLE `storage_hive`
 (
-    `id`                  int(11)      NOT NULL AUTO_INCREMENT COMMENT 'Incremental primary key',
-    `inlong_group_id`     varchar(128) NOT NULL COMMENT 'Owning business group id',
-    `inlong_stream_id`    varchar(128) NOT NULL COMMENT 'Owning data stream id',
-    `jdbc_url`            varchar(255) NOT NULL COMMENT 'Hive JDBC connection URL, such as "jdbc:hive2://127.0.0.1:10000"',
-    `username`            varchar(128) NOT NULL COMMENT 'Username',
-    `password`            varchar(255) NOT NULL COMMENT 'User password',
-    `db_name`             varchar(128) NOT NULL COMMENT 'Target database name',
-    `table_name`          varchar(128) NOT NULL COMMENT 'Target data table name',
-    `primary_partition`   varchar(255)          DEFAULT 'dt' COMMENT 'primary partition field',
-    `secondary_partition` varchar(256)          DEFAULT NULL COMMENT 'secondary partition field',
-    `partition_type`      varchar(10)           DEFAULT NULL COMMENT 'The partition type, there are: H-by hour, D-by day, W-by week, M-by month, O-one-time, R-non-periodical',
-    `file_format`         varchar(15)           DEFAULT 'TextFile' COMMENT 'The stored table format, TextFile, RCFile, SequenceFile, Avro',
-    `encoding_type`       varchar(255)          DEFAULT NULL COMMENT 'data encoding',
-    `field_splitter`      varchar(10)           DEFAULT NULL COMMENT 'field separator',
-    `hdfs_default_fs`     varchar(255)          DEFAULT NULL COMMENT 'HDFS defaultFS, such as "hdfs://127.0.0.1:9000"',
-    `warehouse_dir`       varchar(250)          DEFAULT '/user/hive/warehouse' COMMENT 'Hive table storage path on HDFS, such as "/user/hive/warehouse"',
-    `usage_interval`      varchar(10)           DEFAULT NULL COMMENT 'The amount of time that Sort collected data will land on Hive, there are 10M, 15M, 30M, 1H, 1D',
-    `storage_period`      int(5)                DEFAULT '10' COMMENT 'Data storage period, unit: day',
-    `status`              int(11)               DEFAULT '0' COMMENT 'status',
-    `previous_status`     int(11)               DEFAULT '0' COMMENT 'Previous status',
-    `is_deleted`          tinyint(1)            DEFAULT '0' COMMENT 'Whether to delete, 0: not deleted, 1: deleted',
-    `creator`             varchar(64)  NOT NULL COMMENT 'Creator name',
-    `modifier`            varchar(64)           DEFAULT NULL COMMENT 'Modifier name',
-    `create_time`         timestamp    NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'Create time',
-    `modify_time`         timestamp    NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'Modify time',
-    `temp_view`           json                  DEFAULT NULL COMMENT 'Temporary view, used to save un-submitted and unapproved intermediate data after modification',
-    `opt_log`             varchar(5000)         DEFAULT NULL COMMENT 'Background operation log',
+    `id`                          int(11)      NOT NULL AUTO_INCREMENT COMMENT 'Incremental primary key',
+    `inlong_group_id`             varchar(128) NOT NULL COMMENT 'Owning business group id',
+    `inlong_stream_id`            varchar(128) NOT NULL COMMENT 'Owning data stream id',
+    `jdbc_url`                    varchar(255)          DEFAULT NULL COMMENT 'Hive JDBC connection URL, such as "jdbc:hive2://127.0.0.1:10000"',
+    `username`                    varchar(128)          DEFAULT NULL COMMENT 'Username',
+    `password`                    varchar(255)          DEFAULT NULL COMMENT 'User password',
+    `db_name`                     varchar(128)          DEFAULT NULL COMMENT 'Target database name',
+    `table_name`                  varchar(128)          DEFAULT NULL COMMENT 'Target data table name',
+    `hdfs_default_fs`             varchar(255)          DEFAULT NULL COMMENT 'HDFS defaultFS, such as "hdfs://127.0.0.1:9000"',
+    `warehouse_dir`               varchar(250)          DEFAULT '/user/hive/warehouse' COMMENT 'Hive table storage path on HDFS, such as "/user/hive/warehouse"',
+    `partition_interval`          int(5)                DEFAULT NULL COMMENT 'Partition interval, support: 1 (for unit D or H), 10 or 30 (for unit I)',
+    `partition_unit`              varchar(10)           DEFAULT 'D' COMMENT 'Partition unit, support: D-day, H-hour, I-minute',
+    `primary_partition`           varchar(255)          DEFAULT 'dt' COMMENT 'primary partition field',
+    `secondary_partition`         varchar(256)          DEFAULT NULL COMMENT 'secondary partition field',
+    `partition_creation_strategy` varchar(50)           DEFAULT 'COMPLETED' COMMENT 'Partition creation strategy, support: ARRIVED, COMPLETED',
+    `file_format`                 varchar(15)           DEFAULT 'TextFile' COMMENT 'The stored table format, TextFile, RCFile, SequenceFile, Avro',
+    `data_encoding`               varchar(20)           DEFAULT 'UTF-8' COMMENT 'data encoding type',
+    `data_separator`              varchar(10)           DEFAULT NULL COMMENT 'data field separator',
+    `storage_period`              int(5)                DEFAULT '10' COMMENT 'Data storage period, unit: day',
+    `opt_log`                     varchar(5000)         DEFAULT NULL COMMENT 'Background operation log',
+    `status`                      int(4)                DEFAULT '0' COMMENT 'status',
+    `previous_status`             int(4)                DEFAULT '0' COMMENT 'Previous status',
+    `is_deleted`                  tinyint(1)            DEFAULT '0' COMMENT 'Whether to delete, 0: not deleted, 1: deleted',
+    `creator`                     varchar(64)           DEFAULT NULL COMMENT 'creator name',
+    `modifier`                    varchar(64)           DEFAULT NULL COMMENT 'modifier name',
+    `create_time`                 timestamp    NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'create time',
+    `modify_time`                 timestamp    NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'modify time',
+    `temp_view`                   json                  DEFAULT NULL COMMENT 'Temporary view, used to save un-submitted and unapproved intermediate data after modification',
     PRIMARY KEY (`id`)
 ) ENGINE = InnoDB
   DEFAULT CHARSET = utf8mb4 COMMENT ='Data is stored in Hive configuration table';
@@ -891,7 +896,7 @@ CREATE TABLE `cluster_set`
     `middleware_type` varchar(10)           DEFAULT 'TUBE' COMMENT 'The middleware type of message queue, high throughput: TUBE, high consistency: PULSAR',
     `in_charges`      varchar(512) COMMENT 'Name of responsible person, separated by commas',
     `followers`       varchar(512) COMMENT 'List of names of business followers, separated by commas',
-    `status`          int(11)               DEFAULT '21' COMMENT 'ClusterSet status',
+    `status`          int(4)                DEFAULT '21' COMMENT 'ClusterSet status',
     `is_deleted`      tinyint(1)            DEFAULT '0' COMMENT 'Whether to delete, 0: not deleted, 1: deleted',
     `creator`         varchar(64)  NOT NULL COMMENT 'Creator name',
     `modifier`        varchar(64)  NULL COMMENT 'Modifier name',
diff --git a/inlong-manager/doc/sql/apache_inlong_manager.sql b/inlong-manager/doc/sql/apache_inlong_manager.sql
index dec268e..8cae257 100644
--- a/inlong-manager/doc/sql/apache_inlong_manager.sql
+++ b/inlong-manager/doc/sql/apache_inlong_manager.sql
@@ -93,7 +93,7 @@ CREATE TABLE `business`
     `schema_name`     varchar(128)          DEFAULT NULL COMMENT 'Data type, associated data_schema table',
     `in_charges`      varchar(512) NOT NULL COMMENT 'Name of responsible person, separated by commas',
     `followers`       varchar(512)          DEFAULT NULL COMMENT 'List of names of business followers, separated by commas',
-    `status`          int(11)               DEFAULT '21' COMMENT 'Business status',
+    `status`          int(4)                DEFAULT '21' COMMENT 'Business status',
     `is_deleted`      tinyint(1)            DEFAULT '0' COMMENT 'Whether to delete, 0: not deleted, 1: deleted',
     `creator`         varchar(64)  NOT NULL COMMENT 'Creator name',
     `modifier`        varchar(64)           DEFAULT NULL COMMENT 'Modifier name',
@@ -161,7 +161,7 @@ CREATE TABLE `cluster_info`
     `url`         varchar(256)          DEFAULT NULL COMMENT 'Cluster URL address',
     `is_backup`   tinyint(1)            DEFAULT '0' COMMENT 'Whether it is a backup cluster, 0: no, 1: yes',
     `ext_props`   json                  DEFAULT NULL COMMENT 'extended properties',
-    `status`      int(11)               DEFAULT '1' COMMENT 'cluster status',
+    `status`      int(4)                DEFAULT '1' COMMENT 'cluster status',
     `is_deleted`  tinyint(1)            DEFAULT '0' COMMENT 'Whether to delete, 0: not deleted, 1: deleted',
     `creator`     varchar(64)  NOT NULL COMMENT 'Creator name',
     `modifier`    varchar(64)           DEFAULT NULL COMMENT 'Modifier name',
@@ -195,7 +195,7 @@ CREATE TABLE `common_db_server`
     `db_description`      varchar(256)          DEFAULT NULL COMMENT 'DB description',
     `backup_db_server_ip` varchar(64)           DEFAULT NULL COMMENT 'Backup DB HOST',
     `backup_db_port`      int(11)               DEFAULT NULL COMMENT 'Backup DB port',
-    `status`              int(11)               DEFAULT '0' COMMENT 'status',
+    `status`              int(4)                DEFAULT '0' COMMENT 'status',
     `is_deleted`          tinyint(1)            DEFAULT '0' COMMENT 'Whether to delete, 0: not deleted, 1: deleted',
     `creator`             varchar(64)  NOT NULL COMMENT 'Creator name',
     `modifier`            varchar(64)           DEFAULT NULL COMMENT 'Modifier name',
@@ -221,7 +221,7 @@ CREATE TABLE `common_file_server`
     `issue_type`     varchar(128)         DEFAULT NULL COMMENT 'Issuance method, such as SSH, TCS, etc.',
     `username`       varchar(64) NOT NULL COMMENT 'User name of the data source IP host',
     `password`       varchar(64) NOT NULL COMMENT 'The password corresponding to the above user name',
-    `status`         int(11)              DEFAULT '0' COMMENT 'status',
+    `status`         int(4)               DEFAULT '0' COMMENT 'status',
     `is_deleted`     tinyint(1)           DEFAULT '0' COMMENT 'Whether to delete, 0: not deleted, 1: deleted',
     `creator`        varchar(64) NOT NULL COMMENT 'Creator name',
     `modifier`       varchar(64)          DEFAULT NULL COMMENT 'Modifier name',
@@ -248,12 +248,12 @@ CREATE TABLE `consumption`
     `topic`               varchar(255) NOT NULL COMMENT 'Consumption topic',
     `filter_enabled`      int(2)                DEFAULT '0' COMMENT 'Whether to filter consumption, default 0: do not filter',
     `inlong_stream_id`    varchar(1024)         DEFAULT NULL COMMENT 'Data stream ID for consumption, if filter_enabled is 1, it cannot be empty',
-    `status`              int(11)      NOT NULL COMMENT 'Status: draft, pending approval, approval rejected, approval passed',
+    `status`              int(4)       NOT NULL COMMENT 'Status: draft, pending approval, approval rejected, approval passed',
+    `is_deleted`          tinyint(1)            DEFAULT '0' COMMENT 'Whether to delete, 0: not deleted, 1: deleted',
     `creator`             varchar(64)  NOT NULL COMMENT 'creator',
     `modifier`            varchar(64)           DEFAULT NULL COMMENT 'modifier',
     `create_time`         timestamp    NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'Create time',
     `modify_time`         timestamp    NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'Modify time',
-    `is_deleted`          int(2)                DEFAULT '0' COMMENT 'Whether to delete, 0: not deleted, 1: deleted',
     PRIMARY KEY (`id`)
 ) ENGINE = InnoDB
   DEFAULT CHARSET = utf8mb4 COMMENT ='Data consumption configuration table';
@@ -264,8 +264,8 @@ CREATE TABLE `consumption`
 DROP TABLE IF EXISTS `consumption_pulsar`;
 CREATE TABLE `consumption_pulsar`
 (
-    `id`                  int          NOT NULL AUTO_INCREMENT,
-    `consumption_id`      int          DEFAULT NULL COMMENT 'ID of the consumption information to which it belongs, guaranteed to be uniquely associated with consumption information',
+    `id`                  int(11)      NOT NULL AUTO_INCREMENT,
+    `consumption_id`      int(11)      DEFAULT NULL COMMENT 'ID of the consumption information to which it belongs, guaranteed to be uniquely associated with consumption information',
     `consumer_group_id`   varchar(255) NOT NULL COMMENT 'Consumer group ID',
     `consumer_group_name` varchar(255) NOT NULL COMMENT 'Consumer group name',
     `inlong_group_id`     varchar(255) NOT NULL COMMENT 'Business group ID',
@@ -273,7 +273,7 @@ CREATE TABLE `consumption_pulsar`
     `retry_letter_topic`  varchar(255) DEFAULT NULL COMMENT 'The name of the retry queue topic',
     `is_dlq`              tinyint(1)   DEFAULT '0' COMMENT 'Whether to configure dead letter topic, 0: no configuration, 1: means configuration',
     `dead_letter_topic`   varchar(255) DEFAULT NULL COMMENT 'dead letter topic name',
-    `is_deleted`          int          DEFAULT '0' COMMENT 'Whether to delete',
+    `is_deleted`          tinyint(1)   DEFAULT '0' COMMENT 'Whether to delete',
     PRIMARY KEY (`id`)
 ) ENGINE = InnoDB
   DEFAULT CHARSET = utf8mb4 COMMENT ='Pulsar consumption table';
@@ -294,7 +294,7 @@ CREATE TABLE `data_proxy_cluster`
     `net_type`    varchar(20)           DEFAULT NULL COMMENT 'Cluster network type, internal, or public',
     `in_charges`  varchar(512)          DEFAULT NULL COMMENT 'Name of responsible person, separated by commas',
     `ext_props`   json                  DEFAULT NULL COMMENT 'Extended properties',
-    `status`      int(11)               DEFAULT '1' COMMENT 'Cluster status',
+    `status`      int(4)                DEFAULT '1' COMMENT 'Cluster status',
     `is_deleted`  tinyint(1)            DEFAULT '0' COMMENT 'Whether to delete, 0: not deleted, 1: deleted',
     `creator`     varchar(64)  NOT NULL COMMENT 'Creator name',
     `modifier`    varchar(64)           DEFAULT NULL COMMENT 'Modifier name',
@@ -360,11 +360,16 @@ CREATE TABLE `data_stream`
     `storage_period`         int(11)           DEFAULT '1' COMMENT 'The storage period of data in MQ, unit: day',
     `data_type`              varchar(20)       DEFAULT 'TEXT' COMMENT 'Data type, there are: TEXT, KEY-VALUE, PB, BON, TEXT and BON should be treated differently',
     `data_encoding`          varchar(8)        DEFAULT 'UTF-8' COMMENT 'Data encoding format, including: UTF-8, GBK',
-    `file_delimiter`         varchar(8)        DEFAULT NULL COMMENT 'The source data field separator, stored as ASCII code',
+    `data_separator`         varchar(8)        DEFAULT NULL COMMENT 'The source data field separator, stored as ASCII code',
+    `data_escape_char`       varchar(8)        DEFAULT NULL COMMENT 'Source data field escape character, the default is empty (NULL), stored as 1 character',
     `have_predefined_fields` tinyint(1)        DEFAULT '0' COMMENT '(File, DB access) whether there are predefined fields, 0: none, 1: yes (save to data_stream_field)',
+    `daily_records`          int(11)           DEFAULT '10' COMMENT 'Number of access records per day, unit: 10,000 records per day',
+    `daily_storage`          int(11)           DEFAULT '10' COMMENT 'Access size by day, unit: GB per day',
+    `peak_records`           int(11)           DEFAULT '1000' COMMENT 'Access peak per second, unit: records per second',
+    `max_length`             int(11)           DEFAULT '10240' COMMENT 'The maximum length of a single piece of data, unit: Byte',
     `in_charges`             varchar(512)      DEFAULT NULL COMMENT 'Name of responsible person, separated by commas',
-    `status`                 int(11)           DEFAULT '0' COMMENT 'Data stream status',
-    `previous_status`        int(11)           DEFAULT '0' COMMENT 'Previous status',
+    `status`                 int(4)            DEFAULT '0' COMMENT 'Data stream status',
+    `previous_status`        int(4)            DEFAULT '0' COMMENT 'Previous status',
     `is_deleted`             tinyint(1)        DEFAULT '0' COMMENT 'Whether to delete, 0: not deleted, 1: deleted',
     `creator`                varchar(64)       DEFAULT NULL COMMENT 'Creator name',
     `modifier`               varchar(64)       DEFAULT NULL COMMENT 'Modifier name',
@@ -438,7 +443,7 @@ CREATE TABLE `operation_log`
     `cost_time`           bigint(20)         DEFAULT NULL COMMENT 'Time consumed',
     `body`                text COMMENT 'Request body',
     `param`               text COMMENT 'parameter',
-    `status`              tinyint(1)         DEFAULT NULL COMMENT 'status',
+    `status`              int(4)             DEFAULT NULL COMMENT 'status',
     `request_time`        timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'request time',
     `err_msg`             text COMMENT 'Error message',
     PRIMARY KEY (`id`)
@@ -502,8 +507,8 @@ CREATE TABLE `source_db_detail`
     `table_fields`     longtext COMMENT 'Data table fields, separated by half-width commas, required for incremental collection',
     `data_sql`         longtext COMMENT 'SQL statement to collect source data, required for full collection',
     `crontab`          varchar(56)           DEFAULT NULL COMMENT 'Timed scheduling expression, required for full collection',
-    `status`           int(11)               DEFAULT '0' COMMENT 'Data source status',
-    `previous_status`  int(11)               DEFAULT '0' COMMENT 'Previous status',
+    `status`           int(4)                DEFAULT '0' COMMENT 'Data source status',
+    `previous_status`  int(4)                DEFAULT '0' COMMENT 'Previous status',
     `is_deleted`       tinyint(1)            DEFAULT '0' COMMENT 'Whether to delete, 0: not deleted, 1: deleted',
     `creator`          varchar(64)  NOT NULL COMMENT 'Creator name',
     `modifier`         varchar(64)           DEFAULT NULL COMMENT 'Modifier name',
@@ -558,8 +563,8 @@ CREATE TABLE `source_file_detail`
     `username`         varchar(32)  NOT NULL COMMENT 'User name of the data source IP host',
     `password`         varchar(64)  NOT NULL COMMENT 'The password corresponding to the above user name',
     `file_path`        varchar(256) NOT NULL COMMENT 'File path, supports regular matching',
-    `status`           int(11)               DEFAULT '0' COMMENT 'Data source status',
-    `previous_status`  int(11)               DEFAULT '0' COMMENT 'Previous status',
+    `status`           int(4)                DEFAULT '0' COMMENT 'Data source status',
+    `previous_status`  int(4)                DEFAULT '0' COMMENT 'Previous status',
     `is_deleted`       tinyint(1)            DEFAULT '0' COMMENT 'Whether to delete, 0: not deleted, 1: deleted',
     `creator`          varchar(64)  NOT NULL COMMENT 'Creator name',
     `modifier`         varchar(64)           DEFAULT NULL COMMENT 'Modifier name',
@@ -592,36 +597,36 @@ CREATE TABLE `storage_ext`
 -- Table structure for storage_hive
 -- ----------------------------
 DROP TABLE IF EXISTS `storage_hive`;
-DROP TABLE IF EXISTS `storage_hive`;
 CREATE TABLE `storage_hive`
 (
-    `id`                  int(11)      NOT NULL AUTO_INCREMENT COMMENT 'Incremental primary key',
-    `inlong_group_id`     varchar(128) NOT NULL COMMENT 'Owning business group id',
-    `inlong_stream_id`    varchar(128) NOT NULL COMMENT 'Owning data stream id',
-    `jdbc_url`            varchar(255) NOT NULL COMMENT 'Hive JDBC connection URL, such as "jdbc:hive2://127.0.0.1:10000"',
-    `username`            varchar(128) NOT NULL COMMENT 'Username',
-    `password`            varchar(255) NOT NULL COMMENT 'User password',
-    `db_name`             varchar(128) NOT NULL COMMENT 'Target database name',
-    `table_name`          varchar(128) NOT NULL COMMENT 'Target data table name',
-    `primary_partition`   varchar(255)          DEFAULT 'dt' COMMENT 'primary partition field',
-    `secondary_partition` varchar(256)          DEFAULT NULL COMMENT 'secondary partition field',
-    `partition_type`      varchar(10)           DEFAULT NULL COMMENT 'The partition type, there are: H-by hour, D-by day, W-by week, M-by month, O-one-time, R-non-periodical',
-    `file_format`         varchar(15)           DEFAULT 'TextFile' COMMENT 'The stored table format, TextFile, RCFile, SequenceFile, Avro',
-    `encoding_type`       varchar(255)          DEFAULT NULL COMMENT 'data encoding',
-    `field_splitter`      varchar(10)           DEFAULT NULL COMMENT 'field separator',
-    `hdfs_default_fs`     varchar(255)          DEFAULT NULL COMMENT 'HDFS defaultFS, such as "hdfs://127.0.0.1:9000"',
-    `warehouse_dir`       varchar(250)          DEFAULT '/user/hive/warehouse' COMMENT 'Hive table storage path on HDFS, such as "/user/hive/warehouse"',
-    `usage_interval`      varchar(10)           DEFAULT NULL COMMENT 'The amount of time that Sort collected data will land on Hive, there are 10M, 15M, 30M, 1H, 1D',
-    `storage_period`      int(5)                DEFAULT '10' COMMENT 'Data storage period, unit: day',
-    `status`              int(11)               DEFAULT '0' COMMENT 'status',
-    `previous_status`     int(11)               DEFAULT '0' COMMENT 'Previous status',
-    `is_deleted`          tinyint(1)            DEFAULT '0' COMMENT 'Whether to delete, 0: not deleted, 1: deleted',
-    `creator`             varchar(64)  NOT NULL COMMENT 'Creator name',
-    `modifier`            varchar(64)           DEFAULT NULL COMMENT 'Modifier name',
-    `create_time`         timestamp    NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'Create time',
-    `modify_time`         timestamp    NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'Modify time',
-    `temp_view`           json                  DEFAULT NULL COMMENT 'Temporary view, used to save un-submitted and unapproved intermediate data after modification',
-    `opt_log`             varchar(5000)         DEFAULT NULL COMMENT 'Background operation log',
+    `id`                          int(11)      NOT NULL AUTO_INCREMENT COMMENT 'Incremental primary key',
+    `inlong_group_id`             varchar(128) NOT NULL COMMENT 'Owning business group id',
+    `inlong_stream_id`            varchar(128) NOT NULL COMMENT 'Owning data stream id',
+    `jdbc_url`                    varchar(255)          DEFAULT NULL COMMENT 'Hive JDBC connection URL, such as "jdbc:hive2://127.0.0.1:10000"',
+    `username`                    varchar(128)          DEFAULT NULL COMMENT 'Username',
+    `password`                    varchar(255)          DEFAULT NULL COMMENT 'User password',
+    `db_name`                     varchar(128)          DEFAULT NULL COMMENT 'Target database name',
+    `table_name`                  varchar(128)          DEFAULT NULL COMMENT 'Target data table name',
+    `hdfs_default_fs`             varchar(255)          DEFAULT NULL COMMENT 'HDFS defaultFS, such as "hdfs://127.0.0.1:9000"',
+    `warehouse_dir`               varchar(250)          DEFAULT '/user/hive/warehouse' COMMENT 'Hive table storage path on HDFS, such as "/user/hive/warehouse"',
+    `partition_interval`          int(5)                DEFAULT NULL COMMENT 'Partition interval, support: 1 (for unit D or H), 10 or 30 (for unit I)',
+    `partition_unit`              varchar(10)           DEFAULT 'D' COMMENT 'Partition unit, support: D-day, H-hour, I-minute',
+    `primary_partition`           varchar(255)          DEFAULT 'dt' COMMENT 'primary partition field',
+    `secondary_partition`         varchar(256)          DEFAULT NULL COMMENT 'secondary partition field',
+    `partition_creation_strategy` varchar(50)           DEFAULT 'COMPLETED' COMMENT 'Partition creation strategy, support: ARRIVED, COMPLETED',
+    `file_format`                 varchar(15)           DEFAULT 'TextFile' COMMENT 'The stored table format, TextFile, RCFile, SequenceFile, Avro',
+    `data_encoding`               varchar(20)           DEFAULT 'UTF-8' COMMENT 'data encoding type',
+    `data_separator`              varchar(10)           DEFAULT NULL COMMENT 'data field separator',
+    `storage_period`              int(5)                DEFAULT '10' COMMENT 'Data storage period, unit: day',
+    `opt_log`                     varchar(5000)         DEFAULT NULL COMMENT 'Background operation log',
+    `status`                      int(4)                DEFAULT '0' COMMENT 'status',
+    `previous_status`             int(4)                DEFAULT '0' COMMENT 'Previous status',
+    `is_deleted`                  tinyint(1)            DEFAULT '0' COMMENT 'Whether to delete, 0: not deleted, 1: deleted',
+    `creator`                     varchar(64)           DEFAULT NULL COMMENT 'creator name',
+    `modifier`                    varchar(64)           DEFAULT NULL COMMENT 'modifier name',
+    `create_time`                 timestamp    NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'create time',
+    `modify_time`                 timestamp    NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'modify time',
+    `temp_view`                   json                  DEFAULT NULL COMMENT 'Temporary view, used to save un-submitted and unapproved intermediate data after modification',
     PRIMARY KEY (`id`)
 ) ENGINE = InnoDB
   DEFAULT CHARSET = utf8mb4 COMMENT ='Data is stored in Hive configuration table';
@@ -891,7 +896,7 @@ CREATE TABLE `cluster_set`
     `middleware_type` varchar(10)           DEFAULT 'TUBE' COMMENT 'The middleware type of message queue, high throughput: TUBE, high consistency: PULSAR',
     `in_charges`      varchar(512) COMMENT 'Name of responsible person, separated by commas',
     `followers`       varchar(512) COMMENT 'List of names of business followers, separated by commas',
-    `status`          int(11)               DEFAULT '21' COMMENT 'ClusterSet status',
+    `status`          int(4)                DEFAULT '21' COMMENT 'ClusterSet status',
     `is_deleted`      tinyint(1)            DEFAULT '0' COMMENT 'Whether to delete, 0: not deleted, 1: deleted',
     `creator`         varchar(64)  NOT NULL COMMENT 'Creator name',
     `modifier`        varchar(64)  NULL COMMENT 'Modifier name',
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/BizConstant.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/BizConstant.java
index c2c322d..d2eef5e 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/BizConstant.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/BizConstant.java
@@ -34,6 +34,14 @@ public class BizConstant {
 
     public static final String DATA_TYPE_KEY_VALUE = "KEY-VALUE";
 
+    public static final String FILE_FORMAT_TEXT = "TextFile";
+
+    public static final String FILE_FORMAT_ORC = "OrcFile";
+
+    public static final String FILE_FORMAT_SEQUENCE = "SequenceFile";
+
+    public static final String FILE_FORMAT_PARQUET = "Parquet";
+
     public static final String MIDDLEWARE_TUBE = "TUBE";
 
     public static final String MIDDLEWARE_PULSAR = "PULSAR";
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/business/BusinessInfo.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/business/BusinessInfo.java
index eb2906b..b3d8643 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/business/BusinessInfo.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/business/BusinessInfo.java
@@ -55,7 +55,8 @@ public class BusinessInfo {
     @ApiModelProperty(value = "Middleware type, high throughput: TUBE, high consistency: PULSAR")
     private String middlewareType;
 
-    @ApiModelProperty(value = "MQ resource object, in business, Tube corresponds to Topic")
+    @ApiModelProperty(value = "MQ resource object, in business",
+            notes = "Tube corresponds to Topic, Pulsar corresponds to Namespace")
     private String mqResourceObj;
 
     @ApiModelProperty(value = "Tube master URL")
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/business/BusinessMqExtBase.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/business/BusinessMqExtBase.java
index 6f365e6..29fc201 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/business/BusinessMqExtBase.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/business/BusinessMqExtBase.java
@@ -45,7 +45,7 @@ public class BusinessMqExtBase {
     @ApiModelProperty(value = "is deleted? 0: deleted, 1: not deleted")
     private Integer isDeleted = 0;
 
-    @ApiModelProperty(value = "Middleware type of data storage, high throughput: TUBE")
+    @ApiModelProperty(value = "Middleware type of data storage, high throughput: TUBE, high consistency : PULSAR")
     private String middlewareType;
 
 }
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/business/BusinessPulsarInfo.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/business/BusinessPulsarInfo.java
index 8b15b3e..0851505 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/business/BusinessPulsarInfo.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/business/BusinessPulsarInfo.java
@@ -49,18 +49,18 @@ public class BusinessPulsarInfo extends BusinessMqExtBase {
     private Integer retentionTime = 72;
 
     @ApiModelProperty(value = "The unit of the message storage time")
-    private String retentionTimeUnit;
+    private String retentionTimeUnit = "hours";
 
     @ApiModelProperty(value = "Message time-to-live duration")
     private Integer ttl = 24;
 
     @ApiModelProperty(value = "The unit of message's time-to-live duration")
-    private String ttlUnit;
+    private String ttlUnit = "hours";
 
     @ApiModelProperty(value = "Message size")
     private Integer retentionSize = -1;
 
     @ApiModelProperty(value = "The unit of message size")
-    private String retentionSizeUnit;
+    private String retentionSizeUnit = "MB";
 
 }
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/consumption/ConsumptionInfo.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/consumption/ConsumptionInfo.java
index cebdcfb..d20d4c1 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/consumption/ConsumptionInfo.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/consumption/ConsumptionInfo.java
@@ -44,15 +44,15 @@ public class ConsumptionInfo {
     private Integer id;
 
     @ApiModelProperty(value = "consumer group id")
-    @NotBlank(message = "consumerGroupId can't be null")
+    @NotBlank(message = "consumerGroupId cannot be null")
     private String consumerGroupId;
 
     @ApiModelProperty(value = "consumer group name: only support [a-zA-Z0-9_]")
-    @NotBlank(message = "consumerGroupName can't be null")
+    @NotBlank(message = "consumerGroupName cannot be null")
     private String consumerGroupName;
 
     @ApiModelProperty(value = "consumption in charge")
-    @NotNull(message = "inCharges can't be null")
+    @NotNull(message = "inCharges cannot be null")
     private String inCharges;
 
     @ApiModelProperty(value = "consumption target business group id")
@@ -60,7 +60,7 @@ public class ConsumptionInfo {
     private String inlongGroupId;
 
     @ApiModelProperty(value = "Middleware type, high throughput: TUBE, high consistency: PULSAR")
-    @NotBlank(message = "middlewareType can't be null")
+    // @NotBlank(message = "middlewareType cannot be null")
     private String middlewareType;
 
     @ApiModelProperty(value = "consumption target topic")
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/datastorage/StorageHiveInfo.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/datastorage/StorageHiveInfo.java
index 589a262..0c7b3c1 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/datastorage/StorageHiveInfo.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/datastorage/StorageHiveInfo.java
@@ -39,46 +39,52 @@ public class StorageHiveInfo extends BaseStorageInfo {
     @ApiModelProperty("Hive JDBC URL")
     private String jdbcUrl;
 
-    @ApiModelProperty("username for JDBC URL")
+    @ApiModelProperty("Username for JDBC URL")
     private String username;
 
-    @ApiModelProperty("user password")
+    @ApiModelProperty("User password")
     private String password;
 
-    @ApiModelProperty("target database name")
+    @ApiModelProperty("Target database name")
     private String dbName;
 
-    @ApiModelProperty("target table name")
+    @ApiModelProperty("Target table name")
     private String tableName;
 
-    @ApiModelProperty("primary partition field")
+    @ApiModelProperty("HDFS defaultFS")
+    private String hdfsDefaultFs;
+
+    @ApiModelProperty("Warehouse directory")
+    private String warehouseDir;
+
+    @ApiModelProperty("Partition interval, support: 1 H, 1 D, 30 I, 10 I")
+    private Integer partitionInterval;
+
+    @ApiModelProperty("Partition type, support: D-day, H-hour, I-minute")
+    private String partitionUnit;
+
+    @ApiModelProperty("Primary partition field")
     private String primaryPartition;
 
-    @ApiModelProperty("secondary partition field")
+    @ApiModelProperty("Secondary partition field")
     private String secondaryPartition;
 
-    @ApiModelProperty("partition type, like: H-hour, D-day, W-week, M-month, O-once, R-regulation")
-    private String partitionType;
+    @ApiModelProperty("Partition creation strategy, partition start, partition close")
+    private String partitionCreationStrategy;
 
-    @ApiModelProperty("file format, support: TextFile, RCFile, SequenceFile, Avro")
+    @ApiModelProperty("File format, support: TextFile, RCFile, SequenceFile, Avro")
     private String fileFormat;
 
-    @ApiModelProperty("field splitter")
-    private String fieldSplitter;
-
-    @ApiModelProperty("data encoding type")
-    private String encodingType;
-
-    @ApiModelProperty("HDFS defaultFS")
-    private String hdfsDefaultFs;
+    @ApiModelProperty("Data encoding type")
+    private String dataEncoding;
 
-    @ApiModelProperty("warehouse directory")
-    private String warehouseDir;
+    @ApiModelProperty("Data field separator")
+    private String dataSeparator;
 
-    @ApiModelProperty("interval at which Sort collects data to Hive is 10M, 15M, 30M, 1H, 1D")
-    private String usageInterval;
+    @ApiModelProperty("Data storage period in Hive, unit: Day")
+    private Integer storagePeriod;
 
-    @ApiModelProperty("backend operation log")
+    @ApiModelProperty("Backend operation log")
     private String optLog;
 
     @ApiModelProperty("hive table field list")
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/datastream/DataStreamInfoToHiveConfig.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/datastorage/StorageHiveSortInfo.java
similarity index 56%
rename from inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/datastream/DataStreamInfoToHiveConfig.java
rename to inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/datastorage/StorageHiveSortInfo.java
index a35ab0b..bf52872 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/datastream/DataStreamInfoToHiveConfig.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/datastorage/StorageHiveSortInfo.java
@@ -15,91 +15,49 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.manager.common.pojo.datastream;
+package org.apache.inlong.manager.common.pojo.datastorage;
 
-import java.util.List;
 import lombok.Data;
 
 /**
- * Data stream info for Hive config
+ * Hive info for Sort config
  */
 @Data
-public class DataStreamInfoToHiveConfig {
+public class StorageHiveSortInfo {
 
     private Integer id;
-
-    private String jdbcUrl;
-
-    private int status;
-
-    private String inlongStreamId;
-
-    private String description;
-
     private String inlongGroupId;
+    private String inlongStreamId;
 
-    private String defaultSelectors;
-
-    private String partitionType;
-
-    private String partitionFields;
-
-    private String partitionFieldPosition;
-
-    private Integer clusterId;
-
-    private String dataType;
-
-    private String fieldSplitter;
-
-    private String clusterTag;
-
-    private String encodingType;
-
-    private String hiveAddr;
-
-    private String creator;
-
-    private String userName;
-
+    // Hive server info
+    private String jdbcUrl;
+    private String username;
     private String password;
 
-    private String compression;
-
-    private String warehouseDir;
-
-    private boolean setHadoopUgi;
-
-    private String hadoopUgi;
-
-    private Integer storagePeriod;
-
-    private String fsDefaultName;
-
+    // Hive db and table info
     private String dbName;
-
     private String tableName;
+    private String hdfsDefaultFs;
+    private String warehouseDir;
 
+    private Integer partitionInterval;
+    private String partitionUnit;
     private String primaryPartition;
-
     private String secondaryPartition;
+    private String partitionCreationStrategy;
 
     private String fileFormat;
+    private String dataEncoding;
+    private String targetSeparator; // Target separator configured in the storage info
+    private Integer status;
+    private String creator;
 
-    private String p;
-
-    private String mt;
-
-    private String usTaskId;
-
-    private String qualifiedThreshold;
-
-    private Integer hiveType;
-
-    private String usageInterval;
-
-    private String partitionStrategy;
-
-    private List<DataStreamExtInfo> extList;
+    // Data stream info
+    private String mqResourceObj;
+    private String dataSourceType;
+    private String dataType;
+    private String description;
+    private String sourceSeparator; // Source separator configured in the stream info
+    private String dataEscapeChar;
 
 }
\ No newline at end of file
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/datastorage/StoragePageRequest.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/datastorage/StoragePageRequest.java
index f966fa4..083eb83 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/datastorage/StoragePageRequest.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/datastorage/StoragePageRequest.java
@@ -40,6 +40,9 @@ public class StoragePageRequest extends PageRequest {
     @ApiModelProperty(value = "Storage type, such as HIVE", required = true)
     private String storageType;
 
+    @ApiModelProperty(value = "Key word")
+    private String keyWord;
+
     @ApiModelProperty(value = "Status")
     private Integer status;
 
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/datastream/DataStreamInfo.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/datastream/DataStreamInfo.java
index 178eaf2..9e60f4d 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/datastream/DataStreamInfo.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/datastream/DataStreamInfo.java
@@ -42,6 +42,10 @@ public class DataStreamInfo extends DataStreamBaseInfo {
     @ApiModelProperty(value = "Data stream description")
     private String description;
 
+    @ApiModelProperty(value = "MQ resource object, in business",
+            notes = "Tube corresponds to Topic, Pulsar corresponds to Namespace")
+    private String mqResourceObj;
+
     @ApiModelProperty(value = "Data source type, including: FILE, DB, AUTO_PUSH (DATA_PROXY_SDK, HTTP)")
     private String dataSourceType;
 
@@ -54,12 +58,27 @@ public class DataStreamInfo extends DataStreamBaseInfo {
     @ApiModelProperty(value = "Data encoding format: UTF-8, GBK (required when dataSourceType=FILE, AUTO_PUSH)")
     private String dataEncoding;
 
-    @ApiModelProperty(value = "Field delimiter, stored as ASCII code (required when dataSourceType=FILE, AUTO_PUSH)")
-    private String fileDelimiter;
+    @ApiModelProperty(value = "Data separator, stored as ASCII code (required when dataSourceType=FILE, AUTO_PUSH)")
+    private String dataSeparator;
+
+    @ApiModelProperty(value = "Data field escape symbol, stored as ASCII code")
+    private String dataEscapeChar;
 
     @ApiModelProperty(value = "(File and DB access) Whether there are predefined fields, 0: no, 1: yes")
     private Integer havePredefinedFields;
 
+    @ApiModelProperty(value = "Number of access items per day, unit: 10,000 items per day")
+    private Integer dailyRecords;
+
+    @ApiModelProperty(value = "Access size per day, unit: GB per day")
+    private Integer dailyStorage;
+
+    @ApiModelProperty(value = "peak access per second, unit: bars per second")
+    private Integer peakRecords;
+
+    @ApiModelProperty(value = "The maximum length of a single piece of data, unit: Byte")
+    private Integer maxLength;
+
     @ApiModelProperty(value = "Names of responsible persons, separated by commas")
     private String inCharges;
 
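
Since dataSeparator (and dataEscapeChar) are documented above as stored in
ASCII-code form, a consumer has to convert them back to characters before use.
A minimal sketch of that conversion; the utility class is illustrative, not
part of this commit:

    public final class SeparatorUtils {

        private SeparatorUtils() {
        }

        /**
         * Convert a separator persisted as a decimal ASCII code into the
         * character it stands for, e.g. "124" -> "|", "9" -> tab.
         * Falls back to the raw value if it is not a plain number.
         */
        public static String asciiToChar(String asciiCode) {
            try {
                int code = Integer.parseInt(asciiCode.trim());
                return String.valueOf((char) code);
            } catch (NumberFormatException e) {
                return asciiCode;
            }
        }

        public static void main(String[] args) {
            System.out.println(asciiToChar("124")); // prints "|"
        }
    }
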
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/datastream/DataStreamPageRequest.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/datastream/DataStreamPageRequest.java
index 8b08968..cd20094 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/datastream/DataStreamPageRequest.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/datastream/DataStreamPageRequest.java
@@ -52,4 +52,7 @@ public class DataStreamPageRequest extends PageRequest {
     @ApiModelProperty(value = "Current user", hidden = true)
     private String currentUser;
 
+    @ApiModelProperty(value = "Business in charges", hidden = true)
+    private String inCharges;
+
 }
diff --git a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/DataStreamEntity.java b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/DataStreamEntity.java
index 15cd115..c825f9a 100644
--- a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/DataStreamEntity.java
+++ b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/DataStreamEntity.java
@@ -35,14 +35,15 @@ public class DataStreamEntity implements Serializable {
     private Integer storagePeriod;
     private String dataType;
     private String dataEncoding;
-    private String fileDelimiter;
+    private String dataSeparator;
+    private String dataEscapeChar;
     private Integer havePredefinedFields;
-    private String inCharges;
 
     private Integer dailyRecords;
     private Integer dailyStorage;
     private Integer peakRecords;
     private Integer maxLength;
+    private String inCharges;
 
     private Integer status;
     private Integer previousStatus;
diff --git a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/StorageHiveEntity.java b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/StorageHiveEntity.java
index 16f05fd..c6e513e 100644
--- a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/StorageHiveEntity.java
+++ b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/StorageHiveEntity.java
@@ -33,16 +33,21 @@ public class StorageHiveEntity implements Serializable {
     private String password;
     private String dbName;
     private String tableName;
+    private String hdfsDefaultFs;
+    private String warehouseDir;
+
+    private Integer partitionInterval;
+    private String partitionUnit;
     private String primaryPartition;
     private String secondaryPartition;
-    private String partitionType;
+    private String partitionCreationStrategy;
+
     private String fileFormat;
-    private String fieldSplitter;
-    private String encodingType;
-    private String hdfsDefaultFs;
-    private String warehouseDir;
-    private String usageInterval;
+    private String dataEncoding;
+    private String dataSeparator;
     private Integer storagePeriod;
+    private String optLog;
+
     private Integer status;
     private Integer previousStatus;
     private Integer isDeleted;
@@ -50,7 +55,6 @@ public class StorageHiveEntity implements Serializable {
     private String modifier;
     private Date createTime;
     private Date modifyTime;
-    private String optLog;
     private String tempView;
 
 }
\ No newline at end of file
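
The old free-form usageInterval string is replaced above by the structured pair
partitionInterval/partitionUnit. A sketch of how the pair might be rendered for
display; the unit codes "D" and "H" are assumptions for illustration, not values
defined by this commit:

    public class PartitionCycleExample {

        /** Render the interval/unit pair; "D" and "H" are assumed unit codes. */
        public static String describe(Integer interval, String unit) {
            if (interval == null || unit == null) {
                return "unknown partition cycle";
            }
            switch (unit) {
                case "D":
                    return "every " + interval + " day(s)";
                case "H":
                    return "every " + interval + " hour(s)";
                default:
                    return "every " + interval + " " + unit;
            }
        }

        public static void main(String[] args) {
            System.out.println(describe(1, "D")); // every 1 day(s)
        }
    }
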
diff --git a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/DataStreamEntityMapper.java b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/DataStreamEntityMapper.java
index c1c988d..41c97a7 100644
--- a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/DataStreamEntityMapper.java
+++ b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/DataStreamEntityMapper.java
@@ -19,7 +19,6 @@ package org.apache.inlong.manager.dao.mapper;
 
 import java.util.List;
 import org.apache.ibatis.annotations.Param;
-import org.apache.inlong.manager.common.pojo.datastream.DataStreamInfoToHiveConfig;
 import org.apache.inlong.manager.common.pojo.datastream.DataStreamPageRequest;
 import org.apache.inlong.manager.common.pojo.datastream.DataStreamTopicVO;
 import org.apache.inlong.manager.dao.entity.DataStreamEntity;
@@ -36,8 +35,6 @@ public interface DataStreamEntityMapper {
 
     DataStreamEntity selectByPrimaryKey(Integer id);
 
-    int updateByPrimaryKeySelective(DataStreamEntity record);
-
     int updateByPrimaryKey(DataStreamEntity record);
 
     DataStreamEntity selectByIdentifier(@Param("groupId") String groupId, @Param("streamId") String streamId);
@@ -50,25 +47,10 @@ public interface DataStreamEntityMapper {
      * @param request query request
      * @return data stream list
      */
-    List<DataStreamEntity> selectByCondition(DataStreamPageRequest request);
+    List<DataStreamEntity> selectByCondition(@Param("request") DataStreamPageRequest request);
 
     List<DataStreamEntity> selectByGroupId(@Param("groupId") String groupId);
 
-    /**
-     * According to the conditions and business in charges, query all data stream
-     *
-     * @param request paging query conditions
-     * @param inCharges business in charges
-     * @return data stream list
-     */
-    List<DataStreamEntity> selectByConditionAndInCharges(@Param("request") DataStreamPageRequest request,
-            @Param("inCharges") String inCharges);
-
-    List<DataStreamInfoToHiveConfig> selectStreamToHiveInfo(@Param("groupId") String groupId);
-
-    DataStreamInfoToHiveConfig selectStreamToHiveInfoByIdentifier(@Param("groupId") String groupId,
-            @Param("streamId") String streamId);
-
     int selectCountByGroupId(@Param("groupId") String groupId);
 
     List<DataStreamTopicVO> selectTopicList(@Param("groupId") String groupId);
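
The selectByCondition signature above now declares @Param("request"), which is
why the mapper XML (further down in this commit) references properties as
#{request.xxx} and test="request.xxx" instead of the bare names. A minimal
call-site sketch, assuming Lombok-generated setters on DataStreamPageRequest:

    import java.util.List;
    import org.apache.inlong.manager.common.pojo.datastream.DataStreamPageRequest;
    import org.apache.inlong.manager.dao.entity.DataStreamEntity;
    import org.apache.inlong.manager.dao.mapper.DataStreamEntityMapper;

    public class DataStreamQueryExample {

        private final DataStreamEntityMapper streamMapper;

        public DataStreamQueryExample(DataStreamEntityMapper streamMapper) {
            this.streamMapper = streamMapper;
        }

        /** List the streams visible to one user, optionally scoped to a group. */
        public List<DataStreamEntity> listVisibleStreams(String groupId, String user) {
            DataStreamPageRequest request = new DataStreamPageRequest();
            request.setInlongGroupId(groupId); // may be null: query across groups
            request.setCurrentUser(user);      // used by the visibility predicate
            return streamMapper.selectByCondition(request);
        }
    }
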
diff --git a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StorageHiveEntityMapper.java b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StorageHiveEntityMapper.java
index 74a14e1..fea4cd5 100644
--- a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StorageHiveEntityMapper.java
+++ b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StorageHiveEntityMapper.java
@@ -19,6 +19,7 @@ package org.apache.inlong.manager.dao.mapper;
 
 import java.util.List;
 import org.apache.ibatis.annotations.Param;
+import org.apache.inlong.manager.common.pojo.datastorage.StorageHiveSortInfo;
 import org.apache.inlong.manager.common.pojo.datastorage.StoragePageRequest;
 import org.apache.inlong.manager.common.pojo.datastorage.StorageSummaryInfo;
 import org.apache.inlong.manager.dao.entity.StorageHiveEntity;
@@ -45,7 +46,7 @@ public interface StorageHiveEntityMapper {
      * @param request Paging query conditions
      * @return Hive storage entity list
      */
-    List<StorageHiveEntity> selectByCondition(StoragePageRequest request);
+    List<StorageHiveEntity> selectByCondition(@Param("request") StoragePageRequest request);
 
     /**
      * According to the business group id and data stream id, query valid storage information
@@ -80,7 +81,16 @@ public interface StorageHiveEntityMapper {
     /**
      * According to the business group id and data stream id, query Hive storage summary information
      */
-    List<StorageSummaryInfo> selectSummaryByIdentifier(@Param("groupId") String groupId,
+    List<StorageSummaryInfo> selectSummary(@Param("groupId") String groupId, @Param("streamId") String streamId);
+
+    /**
+     * Select Hive configs for Sort under the business group id and stream id
+     *
+     * @param groupId Business group id
+     * @param streamId Data stream id; if null, get all configs under the group id
+     * @return Hive Sort config
+     */
+    List<StorageHiveSortInfo> selectHiveSortInfoByIdentifier(@Param("groupId") String groupId,
             @Param("streamId") String streamId);
 
 }
\ No newline at end of file
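
The new selectHiveSortInfoByIdentifier follows the contract in its javadoc: the
stream id is optional. A small usage sketch (the wrapper class is illustrative):

    import java.util.List;
    import org.apache.inlong.manager.common.pojo.datastorage.StorageHiveSortInfo;
    import org.apache.inlong.manager.dao.mapper.StorageHiveEntityMapper;

    public class HiveSortConfigExample {

        private final StorageHiveEntityMapper hiveMapper;

        public HiveSortConfigExample(StorageHiveEntityMapper hiveMapper) {
            this.hiveMapper = hiveMapper;
        }

        /** Configs for a single stream: pass both ids. */
        public List<StorageHiveSortInfo> forOneStream(String groupId, String streamId) {
            return hiveMapper.selectHiveSortInfoByIdentifier(groupId, streamId);
        }

        /**
         * Configs for every stream in the group: the XML only appends the
         * stream condition when streamId is non-empty, so null returns all.
         */
        public List<StorageHiveSortInfo> forAllStreams(String groupId) {
            return hiveMapper.selectHiveSortInfoByIdentifier(groupId, null);
        }
    }
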
diff --git a/inlong-manager/manager-dao/src/main/resources/mappers/DataStreamEntityMapper.xml b/inlong-manager/manager-dao/src/main/resources/mappers/DataStreamEntityMapper.xml
index 16b9d7e..5e55122 100644
--- a/inlong-manager/manager-dao/src/main/resources/mappers/DataStreamEntityMapper.xml
+++ b/inlong-manager/manager-dao/src/main/resources/mappers/DataStreamEntityMapper.xml
@@ -18,8 +18,7 @@
     under the License.
 -->
 
-<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN"
-        "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
+<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
 <mapper namespace="org.apache.inlong.manager.dao.mapper.DataStreamEntityMapper">
     <resultMap id="BaseResultMap" type="org.apache.inlong.manager.dao.entity.DataStreamEntity">
         <id column="id" jdbcType="INTEGER" property="id"/>
@@ -32,8 +31,13 @@
         <result column="storage_period" jdbcType="INTEGER" property="storagePeriod"/>
         <result column="data_type" jdbcType="VARCHAR" property="dataType"/>
         <result column="data_encoding" jdbcType="VARCHAR" property="dataEncoding"/>
-        <result column="file_delimiter" jdbcType="VARCHAR" property="fileDelimiter"/>
+        <result column="data_separator" jdbcType="VARCHAR" property="dataSeparator"/>
+        <result column="data_escape_char" jdbcType="VARCHAR" property="dataEscapeChar"/>
         <result column="have_predefined_fields" jdbcType="INTEGER" property="havePredefinedFields"/>
+        <result column="daily_records" jdbcType="INTEGER" property="dailyRecords"/>
+        <result column="daily_storage" jdbcType="INTEGER" property="dailyStorage"/>
+        <result column="peak_records" jdbcType="INTEGER" property="peakRecords"/>
+        <result column="max_length" jdbcType="INTEGER" property="maxLength"/>
         <result column="in_charges" jdbcType="VARCHAR" property="inCharges"/>
         <result column="status" jdbcType="INTEGER" property="status"/>
         <result column="previous_status" jdbcType="INTEGER" property="previousStatus"/>
@@ -44,47 +48,11 @@
         <result column="modify_time" jdbcType="TIMESTAMP" property="modifyTime"/>
         <result column="temp_view" jdbcType="LONGVARCHAR" property="tempView"/>
     </resultMap>
-
-    <resultMap id="dataStreamFullInfo"
-            type="org.apache.inlong.manager.common.pojo.datastream.DataStreamInfoToHiveConfig">
-        <result column="id" jdbcType="VARCHAR" property="id"/>
-        <result column="inlong_stream_id" jdbcType="VARCHAR" property="inlongStreamId"/>
-        <result column="name" jdbcType="VARCHAR" property="inlongGroupId"/>
-        <result column="status" jdbcType="VARCHAR" property="status"/>
-        <result column="data_type" jdbcType="VARCHAR" property="dataType"/>
-        <result column="inlong_group_id" jdbcType="VARCHAR" property="inlongGroupId"/>
-        <result column="field_splitter" jdbcType="VARCHAR" property="fieldSplitter"/>
-        <result column="creator" jdbcType="VARCHAR" property="creator"/>
-        <result column="db_name" jdbcType="VARCHAR" property="dbName"/>
-        <result column="table_name" jdbcType="VARCHAR" property="tableName"/>
-        <result column="partition_type" jdbcType="VARCHAR" property="partitionType"/>
-        <result column="partition_field_position" jdbcType="VARCHAR"
-                property="partitionFieldPosition"/>
-        <result column="primary_partition" jdbcType="BIT" property="primaryPartition"/>
-        <result column="secondary_partition" jdbcType="BIT" property="secondaryPartition"/>
-        <result column="file_format" jdbcType="BIT" property="fileFormat"/>
-        <result column="clusterid" jdbcType="BIT" property="clusterId"/>
-        <result column="storage_period" jdbcType="VARCHAR" property="storagePeriod"/>
-        <result column="cluster_tag" jdbcType="VARCHAR" property="clusterTag"/>
-        <result column="username" jdbcType="TIMESTAMP" property="userName"/>
-        <result column="PASSWORD" jdbcType="TIMESTAMP" property="password"/>
-        <result column="warehouse_dir" jdbcType="TIMESTAMP" property="warehouseDir"/>
-        <result column="hdfs_defaultfs" jdbcType="TIMESTAMP" property="fsDefaultName"/>
-        <result column="hdfs_ugi" jdbcType="TIMESTAMP" property="hadoopUgi"/>
-        <result column="encoding_type" jdbcType="VARCHAR" property="encodingType"/>
-        <result column="us_task_id" jdbcType="VARCHAR" property="usTaskId"/>
-        <result column="usage_interval" jdbcType="VARCHAR" property="usageInterval"/>
-    </resultMap>
-
-    <resultMap id="streamSummaryMap" type="java.util.Map">
-        <result column="dataSourceType" property="dataSourceType" jdbcType="VARCHAR"/>
-        <result column="dataStorageType" property="dataStorageType" jdbcType="VARCHAR"/>
-    </resultMap>
-
     <sql id="Base_Column_List">
         id, inlong_stream_id, inlong_group_id, name, description, mq_resource_obj, data_source_type,
-        storage_period, data_type, data_encoding, file_delimiter, have_predefined_fields, in_charges,
-        status, previous_status, is_deleted, creator, modifier, create_time, modify_time, temp_view
+        storage_period, data_type, data_encoding, data_separator, data_escape_char, have_predefined_fields,
+        daily_records, daily_storage, peak_records, max_length, in_charges, status, previous_status,
+        is_deleted, creator, modifier, create_time, modify_time, temp_view
     </sql>
 
     <select id="selectByPrimaryKey" parameterType="java.lang.Integer" resultMap="BaseResultMap">
@@ -117,27 +85,38 @@
     <select id="selectByCondition" resultMap="BaseResultMap"
             parameterType="org.apache.inlong.manager.common.pojo.datastream.DataStreamPageRequest">
         select
-        <include refid="Base_Column_List"/>
-        from data_stream
+            distinct ds.id, ds.inlong_group_id, ds.inlong_stream_id, ds.name,
+            ds.description, ds.mq_resource_obj, ds.data_source_type, ds.storage_period, ds.data_type,
+            ds.data_encoding, ds.data_separator, ds.data_escape_char, ds.have_predefined_fields,
+            ds.daily_records, ds.daily_storage, ds.peak_records, ds.max_length, ds.in_charges,
+            ds.status, ds.creator, ds.modifier, ds.create_time, ds.modify_time
+        from data_stream ds, business biz, wf_approver approver
         <where>
-            is_deleted = 0
-            and (creator = #{currentUser,jdbcType=VARCHAR} or
-            find_in_set(#{currentUser,jdbcType=VARCHAR}, in_charges))
-            <if test="inlongGroupId != null and inlongGroupId != ''">
-                and inlong_group_id = #{inlongGroupId, jdbcType=VARCHAR}
+            ds.is_deleted = 0
+            and approver.process_name = 'NEW_BUSINESS_WORKFLOW' and approver.is_deleted = 0
+            and ds.inlong_group_id = biz.inlong_group_id and biz.is_deleted = 0
+            <if test="request.inlongGroupId != null and request.inlongGroupId != ''">
+                and ds.inlong_group_id = #{request.inlongGroupId, jdbcType=VARCHAR}
+            </if>
+            and (
+            find_in_set(#{request.currentUser, jdbcType=VARCHAR}, biz.in_charges)
+            or find_in_set(#{request.currentUser, jdbcType=VARCHAR}, biz.followers)
+            or find_in_set(#{request.currentUser, jdbcType=VARCHAR}, approver.approvers)
+            )
+            <if test="request.dataSourceType != null and request.dataSourceType != ''">
+                and ds.data_source_type = #{request.dataSourceType, jdbcType=VARCHAR}
             </if>
-            <if test="dataSourceType != null and dataSourceType != ''">
-                and data_source_type = #{dataSourceType, jdbcType=VARCHAR}
-            </if>
-            <if test="keyWord != null and keyWord != ''">
-                and (name like CONCAT('%', #{keyWord}, '%') or description like CONCAT('%',
-                #{keyWord}, '%'))
+            <if test="request.keyWord != null and request.keyWord != ''">
+                and (ds.inlong_stream_id like CONCAT('%', #{request.keyWord}, '%')
+                or ds.name like CONCAT('%', #{request.keyWord}, '%')
+                or ds.description like CONCAT('%', #{request.keyWord}, '%')
+                )
             </if>
-            <if test="status != null and status != ''">
-                and status = #{status, jdbcType=INTEGER}
+            <if test="request.status != null and request.status != ''">
+                and ds.status = #{request.status, jdbcType=INTEGER}
             </if>
         </where>
-        order by modify_time desc
+        order by ds.modify_time desc
     </select>
     <select id="selectByGroupId" resultType="org.apache.inlong.manager.dao.entity.DataStreamEntity">
         select
@@ -146,23 +125,6 @@
         where inlong_group_id = #{groupId, jdbcType=VARCHAR}
         and is_deleted = 0
     </select>
-    <select id="selectByConditionAndInCharges" resultType="org.apache.inlong.manager.dao.entity.DataStreamEntity">
-        select
-        <include refid="Base_Column_List"/>
-        from data_stream
-        <where>
-            inlong_group_id = #{request.inlongGroupId,jdbcType=VARCHAR}
-            and is_deleted = 0
-            and (creator = #{request.currentUser,jdbcType=VARCHAR}
-            or find_in_set(#{request.currentUser,jdbcType=VARCHAR}, #{inCharges,jdbcType=VARCHAR}))
-            <if test="request.keyWord != null and request.keyWord != ''">
-                and (name like CONCAT('%', #{request.keyWord}, '%') or description like CONCAT('%',
-                #{request.keyWord},
-                '%'))
-            </if>
-        </where>
-        order by create_time desc
-    </select>
     <select id="selectCountByGroupId" resultType="java.lang.Integer">
         select count(1)
         from data_stream
@@ -197,16 +159,20 @@
         insert into data_stream (id, inlong_stream_id, inlong_group_id,
                                  name, description, mq_resource_obj,
                                  data_source_type, storage_period, data_type,
-                                 data_encoding, file_delimiter,
-                                 have_predefined_fields,
+                                 data_encoding, data_separator,
+                                 data_escape_char, have_predefined_fields,
+                                 daily_records, daily_storage,
+                                 peak_records, max_length,
                                  in_charges, status, previous_status,
                                  is_deleted, creator, modifier,
                                  create_time, modify_time, temp_view)
         values (#{id,jdbcType=INTEGER}, #{inlongStreamId,jdbcType=VARCHAR}, #{inlongGroupId,jdbcType=VARCHAR},
                 #{name,jdbcType=VARCHAR}, #{description,jdbcType=VARCHAR}, #{mqResourceObj,jdbcType=VARCHAR},
                 #{dataSourceType,jdbcType=VARCHAR}, #{storagePeriod,jdbcType=INTEGER}, #{dataType,jdbcType=VARCHAR},
-                #{dataEncoding,jdbcType=VARCHAR}, #{fileDelimiter,jdbcType=VARCHAR},
-                #{havePredefinedFields,jdbcType=INTEGER},
+                #{dataEncoding,jdbcType=VARCHAR}, #{dataSeparator,jdbcType=VARCHAR},
+                #{dataEscapeChar,jdbcType=VARCHAR}, #{havePredefinedFields,jdbcType=INTEGER},
+                #{dailyRecords,jdbcType=INTEGER}, #{dailyStorage,jdbcType=INTEGER},
+                #{peakRecords,jdbcType=INTEGER}, #{maxLength,jdbcType=INTEGER},
                 #{inCharges,jdbcType=VARCHAR}, #{status,jdbcType=INTEGER}, #{previousStatus,jdbcType=INTEGER},
                 #{isDeleted,jdbcType=INTEGER}, #{creator,jdbcType=VARCHAR}, #{modifier,jdbcType=VARCHAR},
                 #{createTime,jdbcType=TIMESTAMP}, #{modifyTime,jdbcType=TIMESTAMP}, #{tempView,jdbcType=LONGVARCHAR})
@@ -245,12 +211,27 @@
             <if test="dataEncoding != null">
                 data_encoding,
             </if>
-            <if test="fileDelimiter != null">
-                file_delimiter,
+            <if test="dataSeparator != null">
+                data_separator,
+            </if>
+            <if test="dataEscapeChar != null">
+                data_escape_char,
             </if>
             <if test="havePredefinedFields != null">
                 have_predefined_fields,
             </if>
+            <if test="dailyRecords != null">
+                daily_records,
+            </if>
+            <if test="dailyStorage != null">
+                daily_storage,
+            </if>
+            <if test="peakRecords != null">
+                peak_records,
+            </if>
+            <if test="maxLength != null">
+                max_length,
+            </if>
             <if test="inCharges != null">
                 in_charges,
             </if>
@@ -310,12 +291,27 @@
             <if test="dataEncoding != null">
                 #{dataEncoding,jdbcType=VARCHAR},
             </if>
-            <if test="fileDelimiter != null">
-                #{fileDelimiter,jdbcType=VARCHAR},
+            <if test="dataSeparator != null">
+                #{dataSeparator,jdbcType=VARCHAR},
+            </if>
+            <if test="dataEscapeChar != null">
+                #{dataEscapeChar,jdbcType=VARCHAR},
             </if>
             <if test="havePredefinedFields != null">
                 #{havePredefinedFields,jdbcType=INTEGER},
             </if>
+            <if test="dailyRecords != null">
+                #{dailyRecords,jdbcType=INTEGER},
+            </if>
+            <if test="dailyStorage != null">
+                #{dailyStorage,jdbcType=INTEGER},
+            </if>
+            <if test="peakRecords != null">
+                #{peakRecords,jdbcType=INTEGER},
+            </if>
+            <if test="maxLength != null">
+                #{maxLength,jdbcType=INTEGER},
+            </if>
             <if test="inCharges != null">
                 #{inCharges,jdbcType=VARCHAR},
             </if>
@@ -346,72 +342,6 @@
         </trim>
     </insert>
 
-    <update id="updateByPrimaryKeySelective" parameterType="org.apache.inlong.manager.dao.entity.DataStreamEntity">
-        update data_stream
-        <set>
-            <if test="inlongStreamId != null">
-                inlong_stream_id = #{inlongStreamId,jdbcType=VARCHAR},
-            </if>
-            <if test="inlongGroupId != null">
-                inlong_group_id = #{inlongGroupId,jdbcType=VARCHAR},
-            </if>
-            <if test="name != null">
-                name = #{name,jdbcType=VARCHAR},
-            </if>
-            <if test="description != null">
-                description = #{description,jdbcType=VARCHAR},
-            </if>
-            <if test="mqResourceObj != null">
-                mq_resource_obj = #{mqResourceObj,jdbcType=VARCHAR},
-            </if>
-            <if test="dataSourceType != null">
-                data_source_type = #{dataSourceType,jdbcType=VARCHAR},
-            </if>
-            <if test="storagePeriod != null">
-                storage_period = #{storagePeriod,jdbcType=INTEGER},
-            </if>
-            <if test="dataType != null">
-                data_type = #{dataType,jdbcType=VARCHAR},
-            </if>
-            <if test="dataEncoding != null">
-                data_encoding = #{dataEncoding,jdbcType=VARCHAR},
-            </if>
-            <if test="fileDelimiter != null">
-                file_delimiter = #{fileDelimiter,jdbcType=VARCHAR},
-            </if>
-            <if test="havePredefinedFields != null">
-                have_predefined_fields = #{havePredefinedFields,jdbcType=INTEGER},
-            </if>
-            <if test="inCharges != null">
-                in_charges = #{inCharges,jdbcType=VARCHAR},
-            </if>
-            <if test="status != null">
-                status = #{status,jdbcType=INTEGER},
-            </if>
-            <if test="previousStatus != null">
-                previous_status = #{previousStatus,jdbcType=INTEGER},
-            </if>
-            <if test="isDeleted != null">
-                is_deleted = #{isDeleted,jdbcType=INTEGER},
-            </if>
-            <if test="creator != null">
-                creator = #{creator,jdbcType=VARCHAR},
-            </if>
-            <if test="modifier != null">
-                modifier = #{modifier,jdbcType=VARCHAR},
-            </if>
-            <if test="createTime != null">
-                create_time = #{createTime,jdbcType=TIMESTAMP},
-            </if>
-            <if test="modifyTime != null">
-                modify_time = #{modifyTime,jdbcType=TIMESTAMP},
-            </if>
-            <if test="tempView != null">
-                temp_view = #{tempView,jdbcType=LONGVARCHAR},
-            </if>
-        </set>
-        where id = #{id,jdbcType=INTEGER}
-    </update>
     <update id="updateByPrimaryKey" parameterType="org.apache.inlong.manager.dao.entity.DataStreamEntity">
         update data_stream
         set inlong_stream_id       = #{inlongStreamId,jdbcType=VARCHAR},
@@ -423,8 +353,13 @@
             storage_period         = #{storagePeriod,jdbcType=INTEGER},
             data_type              = #{dataType,jdbcType=VARCHAR},
             data_encoding          = #{dataEncoding,jdbcType=VARCHAR},
-            file_delimiter         = #{fileDelimiter,jdbcType=VARCHAR},
+            data_separator         = #{dataSeparator,jdbcType=VARCHAR},
+            data_escape_char       = #{dataEscapeChar,jdbcType=VARCHAR},
             have_predefined_fields = #{havePredefinedFields,jdbcType=INTEGER},
+            daily_records          = #{dailyRecords,jdbcType=INTEGER},
+            daily_storage          = #{dailyStorage,jdbcType=INTEGER},
+            peak_records           = #{peakRecords,jdbcType=INTEGER},
+            max_length             = #{maxLength,jdbcType=INTEGER},
             in_charges             = #{inCharges,jdbcType=VARCHAR},
             status                 = #{status,jdbcType=INTEGER},
             previous_status        = #{previousStatus,jdbcType=INTEGER},
@@ -440,55 +375,76 @@
         update data_stream
         <set>
             <if test="inlongStreamId != null">
-                inlong_stream_id = #{inlongStreamId, jdbcType=VARCHAR},
+                inlong_stream_id = #{inlongStreamId,jdbcType=VARCHAR},
             </if>
             <if test="inlongGroupId != null">
-                inlong_group_id = #{inlongGroupId, jdbcType=VARCHAR},
+                inlong_group_id = #{inlongGroupId,jdbcType=VARCHAR},
             </if>
             <if test="name != null">
-                name = #{name, jdbcType=VARCHAR},
+                name = #{name,jdbcType=VARCHAR},
             </if>
             <if test="description != null">
-                description = #{description, jdbcType=VARCHAR},
+                description = #{description,jdbcType=VARCHAR},
             </if>
             <if test="mqResourceObj != null">
-                mq_resource_obj = #{mqResourceObj, jdbcType=VARCHAR},
+                mq_resource_obj = #{mqResourceObj,jdbcType=VARCHAR},
             </if>
             <if test="dataSourceType != null">
-                data_source_type = #{dataSourceType, jdbcType=VARCHAR},
+                data_source_type = #{dataSourceType,jdbcType=VARCHAR},
             </if>
             <if test="storagePeriod != null">
-                storage_period = #{storagePeriod, jdbcType=INTEGER},
+                storage_period = #{storagePeriod,jdbcType=INTEGER},
             </if>
             <if test="dataType != null">
-                data_type = #{dataType, jdbcType=VARCHAR},
+                data_type = #{dataType,jdbcType=VARCHAR},
             </if>
             <if test="dataEncoding != null">
-                data_encoding = #{dataEncoding, jdbcType=VARCHAR},
+                data_encoding = #{dataEncoding,jdbcType=VARCHAR},
             </if>
-            <if test="fileDelimiter != null">
-                file_delimiter = #{fileDelimiter, jdbcType=VARCHAR},
+            <if test="dataSeparator != null">
+                data_separator = #{dataSeparator,jdbcType=VARCHAR},
+            </if>
+            <if test="dataEscapeChar != null">
+                data_escape_char = #{dataEscapeChar,jdbcType=VARCHAR},
             </if>
             <if test="havePredefinedFields != null">
-                have_predefined_fields = #{havePredefinedFields, jdbcType=INTEGER},
+                have_predefined_fields = #{havePredefinedFields,jdbcType=INTEGER},
+            </if>
+            <if test="dailyRecords!= null">
+                daily_records= #{dailyRecords,jdbcType=INTEGER},
+            </if>
+            <if test="dailyStorage!= null">
+                daily_storage= #{dailyStorage,jdbcType=INTEGER},
+            </if>
+            <if test="peakRecords != null">
+                peak_records= #{peakRecords,jdbcType=INTEGER},
+            </if>
+            <if test="maxLength != null">
+                max_length= #{maxLength,jdbcType=INTEGER}
             </if>
             <if test="inCharges != null">
-                in_charges = #{inCharges, jdbcType=VARCHAR},
+                in_charges = #{inCharges,jdbcType=VARCHAR},
             </if>
             <if test="status != null">
                 status = #{status,jdbcType=INTEGER},
             </if>
             <if test="previousStatus != null">
-                status = #{previousStatus, jdbcType=INTEGER},
+                previous_status = #{previousStatus,jdbcType=INTEGER},
             </if>
             <if test="isDeleted != null">
-                is_deleted = #{isDeleted, jdbcType=INTEGER},
+                is_deleted = #{isDeleted,jdbcType=INTEGER},
             </if>
             <if test="creator != null">
-                creator = #{creator, jdbcType=VARCHAR},
+                creator = #{creator,jdbcType=VARCHAR},
             </if>
             <if test="modifier != null">
-                modifier = #{modifier, jdbcType=VARCHAR},
+                modifier = #{modifier,jdbcType=VARCHAR},
+            </if>
+            <if test="createTime != null">
+                create_time = #{createTime,jdbcType=TIMESTAMP},
+            </if>
+            <if test="modifyTime != null">
+                modify_time = #{modifyTime,jdbcType=TIMESTAMP},
             </if>
             <if test="tempView != null">
                 temp_view = #{tempView,jdbcType=LONGVARCHAR},
@@ -518,65 +474,4 @@
           and is_deleted = 0
     </update>
 
-    <select id="selectStreamToHiveInfo" resultMap="dataStreamFullInfo">
-        SELECT h.id,
-               h.jdbc_url,
-               h.password,
-               h.username,
-               h.status,
-               data_type,
-               usage_interval,
-               description,
-               s.inlong_stream_id,
-               s.inlong_group_id,
-               db_name,
-               field_splitter,
-               h.creator,
-               table_name,
-               partition_type,
-               primary_partition,
-               secondary_partition,
-               file_format,
-               h.storage_period,
-               h.encoding_type
-        FROM data_stream s,
-             storage_hive h
-        WHERE s.is_deleted = 0
-          and h.is_deleted = 0
-          and s.inlong_stream_id = h.inlong_stream_id
-          AND s.inlong_group_id = h.inlong_group_id
-          and s.inlong_group_id = #{groupId, jdbcType=VARCHAR}
-    </select>
-
-    <select id="selectStreamToHiveInfoByIdentifier" resultMap="dataStreamFullInfo">
-        SELECT h.id,
-               h.jdbc_url,
-               h.password,
-               h.username,
-               h.status,
-               data_type,
-               usage_interval,
-               description,
-               s.inlong_stream_id,
-               s.inlong_group_id,
-               db_name,
-               field_splitter,
-               h.creator,
-               table_name,
-               partition_type,
-               primary_partition,
-               secondary_partition,
-               file_format,
-               h.storage_period,
-               h.encoding_type
-        FROM data_stream s,
-             storage_hive h
-        WHERE s.is_deleted = 0
-          and h.is_deleted = 0
-          and s.inlong_stream_id = h.inlong_stream_id
-          AND s.inlong_group_id = h.inlong_group_id
-          and s.inlong_group_id = #{groupId, jdbcType=VARCHAR}
-          and s.inlong_stream_id = #{streamId, jdbcType=VARCHAR}
-    </select>
-
 </mapper>
\ No newline at end of file
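
The rewritten selectByCondition above widens stream visibility from "creator or
stream in_charges" to "business in_charges, business followers, or
NEW_BUSINESS_WORKFLOW approvers". For readers unfamiliar with MySQL's
find_in_set, an equivalent check in plain Java (illustrative only):

    public class VisibilityRuleExample {

        /** Same rule as the three find_in_set clauses combined with OR. */
        public static boolean canSee(String user, String inCharges,
                String followers, String approvers) {
            return containsUser(inCharges, user)
                    || containsUser(followers, user)
                    || containsUser(approvers, user);
        }

        /** find_in_set equivalent for a comma-separated name list. */
        private static boolean containsUser(String csv, String user) {
            if (csv == null || user == null) {
                return false;
            }
            for (String name : csv.split(",")) {
                if (user.equals(name.trim())) {
                    return true;
                }
            }
            return false;
        }
    }
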
diff --git a/inlong-manager/manager-dao/src/main/resources/mappers/StorageHiveEntityMapper.xml b/inlong-manager/manager-dao/src/main/resources/mappers/StorageHiveEntityMapper.xml
index ba958b9..52c3178 100644
--- a/inlong-manager/manager-dao/src/main/resources/mappers/StorageHiveEntityMapper.xml
+++ b/inlong-manager/manager-dao/src/main/resources/mappers/StorageHiveEntityMapper.xml
@@ -29,16 +29,21 @@
         <result column="password" jdbcType="VARCHAR" property="password"/>
         <result column="db_name" jdbcType="VARCHAR" property="dbName"/>
         <result column="table_name" jdbcType="VARCHAR" property="tableName"/>
+        <result column="hdfs_default_fs" jdbcType="VARCHAR" property="hdfsDefaultFs"/>
+        <result column="warehouse_dir" jdbcType="VARCHAR" property="warehouseDir"/>
+
+        <result column="partition_interval" jdbcType="INTEGER" property="partitionInterval"/>
+        <result column="partition_unit" jdbcType="VARCHAR" property="partitionUnit"/>
         <result column="primary_partition" jdbcType="VARCHAR" property="primaryPartition"/>
         <result column="secondary_partition" jdbcType="VARCHAR" property="secondaryPartition"/>
-        <result column="partition_type" jdbcType="VARCHAR" property="partitionType"/>
+        <result column="partition_creation_strategy" jdbcType="VARCHAR" property="partitionCreationStrategy"/>
+
         <result column="file_format" jdbcType="VARCHAR" property="fileFormat"/>
-        <result column="field_splitter" jdbcType="VARCHAR" property="fieldSplitter"/>
-        <result column="encoding_type" jdbcType="VARCHAR" property="encodingType"/>
-        <result column="hdfs_default_fs" jdbcType="VARCHAR" property="hdfsDefaultFs"/>
-        <result column="warehouse_dir" jdbcType="VARCHAR" property="warehouseDir"/>
-        <result column="usage_interval" jdbcType="VARCHAR" property="usageInterval"/>
+        <result column="data_encoding" jdbcType="VARCHAR" property="dataEncoding"/>
+        <result column="data_separator" jdbcType="VARCHAR" property="dataSeparator"/>
         <result column="storage_period" jdbcType="INTEGER" property="storagePeriod"/>
+        <result column="opt_log" jdbcType="VARCHAR" property="optLog"/>
+
         <result column="status" jdbcType="INTEGER" property="status"/>
         <result column="previous_status" jdbcType="INTEGER" property="previousStatus"/>
         <result column="is_deleted" jdbcType="INTEGER" property="isDeleted"/>
@@ -46,15 +51,14 @@
         <result column="modifier" jdbcType="VARCHAR" property="modifier"/>
         <result column="create_time" jdbcType="TIMESTAMP" property="createTime"/>
         <result column="modify_time" jdbcType="TIMESTAMP" property="modifyTime"/>
-        <result column="opt_log" jdbcType="VARCHAR" property="optLog"/>
         <result column="temp_view" jdbcType="LONGVARCHAR" property="tempView"/>
     </resultMap>
 
     <sql id="Base_Column_List">
         id, inlong_group_id, inlong_stream_id, jdbc_url, username, password, db_name, table_name,
-        primary_partition, secondary_partition, partition_type, file_format, field_splitter, encoding_type,
-        hdfs_default_fs, warehouse_dir, usage_interval, storage_period, status, previous_status,
-        is_deleted, creator, modifier, create_time, modify_time, opt_log, temp_view
+        hdfs_default_fs, warehouse_dir, partition_interval, partition_unit, primary_partition, secondary_partition,
+        partition_creation_strategy, file_format, data_encoding, data_separator, storage_period, opt_log,
+        status, previous_status, is_deleted, creator, modifier, create_time, modify_time, temp_view
     </sql>
 
     <select id="selectByPrimaryKey" parameterType="java.lang.Integer" resultMap="BaseResultMap">
@@ -70,14 +74,20 @@
         from storage_hive
         <where>
             is_deleted = 0
-            <if test="inlongGroupId != null and inlongGroupId != ''">
-                and inlong_group_id = #{inlongGroupId, jdbcType=VARCHAR}
+            <if test="request.inlongGroupId != null and request.inlongGroupId != ''">
+                and inlong_group_id = #{request.inlongGroupId, jdbcType=VARCHAR}
             </if>
-            <if test="inlongStreamId != null and inlongStreamId != ''">
-                and inlong_stream_id = #{inlongStreamId, jdbcType=VARCHAR}
+            <if test="request.inlongStreamId != null and request.inlongStreamId != ''">
+                and inlong_stream_id = #{request.inlongStreamId, jdbcType=VARCHAR}
             </if>
-            <if test="status != null and status != ''">
-                and status = #{status, jdbcType=INTEGER}
+            <if test="request.keyWord != null and request.keyWord != ''">
+                and (
+                inlong_group_id like CONCAT('%', #{request.keyWord}, '%')
+                or inlong_stream_id like CONCAT('%', #{request.keyWord}, '%')
+                )
+            </if>
+            <if test="request.status != null and request.status != ''">
+                and status = #{request.status, jdbcType=INTEGER}
             </if>
         </where>
         order by modify_time desc
@@ -121,8 +131,7 @@
             and is_deleted = 0
         </where>
     </select>
-    <select id="selectSummaryByIdentifier"
-            resultType="org.apache.inlong.manager.common.pojo.datastorage.StorageSummaryInfo">
+    <select id="selectSummary" resultType="org.apache.inlong.manager.common.pojo.datastorage.StorageSummaryInfo">
         select s.id,
                s.inlong_group_id,
                s.inlong_stream_id,
@@ -132,6 +141,51 @@
           and s.inlong_group_id = #{groupId, jdbcType=VARCHAR}
           and s.inlong_stream_id = #{streamId, jdbcType=VARCHAR}
     </select>
+    <select id="selectHiveSortInfoByIdentifier"
+            resultType="org.apache.inlong.manager.common.pojo.datastorage.StorageHiveSortInfo">
+        SELECT hive.id,
+               hive.inlong_group_id,
+               hive.inlong_stream_id,
+
+               hive.jdbc_url,
+               hive.username,
+               hive.password,
+               hive.db_name,
+               hive.table_name,
+               hive.hdfs_default_fs,
+               hive.warehouse_dir,
+
+               hive.partition_interval,
+               hive.partition_unit,
+               hive.primary_partition,
+               hive.secondary_partition,
+               hive.partition_creation_strategy,
+
+               hive.file_format,
+               hive.data_encoding,
+               hive.data_separator   as targetSeparator,
+               hive.status,
+               hive.creator,
+
+               stream.mq_resource_obj,
+               stream.data_source_type,
+               stream.data_type,
+               stream.description,
+               stream.data_separator as sourceSeparator,
+               stream.data_escape_char
+        FROM data_stream stream,
+             storage_hive hive
+        <where>
+            stream.is_deleted = 0
+            and hive.is_deleted = 0
+            and stream.inlong_group_id = hive.inlong_group_id
+            and stream.inlong_stream_id = hive.inlong_stream_id
+            and stream.inlong_group_id = #{groupId, jdbcType=VARCHAR}
+            <if test="streamId != null and streamId != ''">
+                and stream.inlong_stream_id = #{streamId, jdbcType=VARCHAR}
+            </if>
+        </where>
+    </select>
 
     <delete id="deleteByPrimaryKey" parameterType="java.lang.Integer">
         delete
@@ -143,23 +197,26 @@
             parameterType="org.apache.inlong.manager.dao.entity.StorageHiveEntity">
         insert into storage_hive (id, inlong_group_id, inlong_stream_id,
                                   jdbc_url, username, password,
-                                  db_name, table_name, primary_partition,
-                                  secondary_partition, partition_type, file_format,
-                                  field_splitter, encoding_type, hdfs_default_fs,
-                                  warehouse_dir, usage_interval, storage_period,
+                                  db_name, table_name, hdfs_default_fs,
+                                  warehouse_dir, partition_interval,
+                                  partition_unit, primary_partition,
+                                  secondary_partition, partition_creation_strategy,
+                                  file_format, data_encoding, data_separator,
+                                  storage_period, opt_log,
                                   status, previous_status, is_deleted,
                                   creator, modifier, create_time,
-                                  modify_time, opt_log, temp_view)
+                                  modify_time, temp_view)
         values (#{id,jdbcType=INTEGER}, #{inlongGroupId,jdbcType=VARCHAR}, #{inlongStreamId,jdbcType=VARCHAR},
                 #{jdbcUrl,jdbcType=VARCHAR}, #{username,jdbcType=VARCHAR}, #{password,jdbcType=VARCHAR},
-                #{dbName,jdbcType=VARCHAR}, #{tableName,jdbcType=VARCHAR}, #{primaryPartition,jdbcType=VARCHAR},
-                #{secondaryPartition,jdbcType=VARCHAR}, #{partitionType,jdbcType=VARCHAR},
-                #{fileFormat,jdbcType=VARCHAR},
-                #{fieldSplitter,jdbcType=VARCHAR}, #{encodingType,jdbcType=VARCHAR}, #{hdfsDefaultFs,jdbcType=VARCHAR},
-                #{warehouseDir,jdbcType=VARCHAR}, #{usageInterval,jdbcType=VARCHAR}, #{storagePeriod,jdbcType=INTEGER},
+                #{dbName,jdbcType=VARCHAR}, #{tableName,jdbcType=VARCHAR}, #{hdfsDefaultFs,jdbcType=VARCHAR},
+                #{warehouseDir,jdbcType=VARCHAR}, #{partitionInterval,jdbcType=INTEGER},
+                #{partitionUnit,jdbcType=VARCHAR}, #{primaryPartition,jdbcType=VARCHAR},
+                #{secondaryPartition,jdbcType=VARCHAR}, #{partitionCreationStrategy,jdbcType=VARCHAR},
+                #{fileFormat,jdbcType=VARCHAR}, #{dataEncoding,jdbcType=VARCHAR}, #{dataSeparator,jdbcType=VARCHAR},
+                #{storagePeriod,jdbcType=INTEGER}, #{optLog,jdbcType=VARCHAR},
                 #{status,jdbcType=INTEGER}, #{previousStatus,jdbcType=INTEGER}, #{isDeleted,jdbcType=INTEGER},
                 #{creator,jdbcType=VARCHAR}, #{modifier,jdbcType=VARCHAR}, #{createTime,jdbcType=TIMESTAMP},
-                #{modifyTime,jdbcType=TIMESTAMP}, #{optLog,jdbcType=VARCHAR}, #{tempView,jdbcType=LONGVARCHAR})
+                #{modifyTime,jdbcType=TIMESTAMP}, #{tempView,jdbcType=LONGVARCHAR})
     </insert>
     <insert id="insertSelective" useGeneratedKeys="true" keyProperty="id"
             parameterType="org.apache.inlong.manager.dao.entity.StorageHiveEntity">
@@ -189,36 +246,42 @@
             <if test="tableName != null">
                 table_name,
             </if>
+            <if test="hdfsDefaultFs != null">
+                hdfs_default_fs,
+            </if>
+            <if test="warehouseDir != null">
+                warehouse_dir,
+            </if>
+            <if test="partitionInterval != null">
+                partition_interval,
+            </if>
+            <if test="partitionUnit != null">
+                partition_unit,
+            </if>
             <if test="primaryPartition != null">
                 primary_partition,
             </if>
             <if test="secondaryPartition != null">
                 secondary_partition,
             </if>
-            <if test="partitionType != null">
-                partition_type,
+            <if test="partitionCreationStrategy != null">
+                partition_creation_strategy,
             </if>
             <if test="fileFormat != null">
                 file_format,
             </if>
-            <if test="fieldSplitter != null">
-                field_splitter,
+            <if test="dataEncoding != null">
+                data_encoding,
             </if>
-            <if test="encodingType != null">
-                encoding_type,
-            </if>
-            <if test="hdfsDefaultFs != null">
-                hdfs_default_fs,
-            </if>
-            <if test="warehouseDir != null">
-                warehouse_dir,
-            </if>
-            <if test="usageInterval != null">
-                usage_interval,
+            <if test="dataSeparator != null">
+                data_separator,
             </if>
             <if test="storagePeriod != null">
                 storage_period,
             </if>
+            <if test="optLog != null">
+                opt_log,
+            </if>
             <if test="status != null">
                 status,
             </if>
@@ -240,9 +303,6 @@
             <if test="modifyTime != null">
                 modify_time,
             </if>
-            <if test="optLog != null">
-                opt_log,
-            </if>
             <if test="tempView != null">
                 temp_view,
             </if>
@@ -272,36 +332,42 @@
             <if test="tableName != null">
                 #{tableName,jdbcType=VARCHAR},
             </if>
+            <if test="hdfsDefaultFs != null">
+                #{hdfsDefaultFs,jdbcType=VARCHAR},
+            </if>
+            <if test="warehouseDir != null">
+                #{warehouseDir,jdbcType=VARCHAR},
+            </if>
+            <if test="partitionInterval != null">
+                #{partitionInterval,jdbcType=INTEGER},
+            </if>
+            <if test="partitionUnit != null">
+                #{partitionUnit,jdbcType=VARCHAR},
+            </if>
             <if test="primaryPartition != null">
                 #{primaryPartition,jdbcType=VARCHAR},
             </if>
             <if test="secondaryPartition != null">
                 #{secondaryPartition,jdbcType=VARCHAR},
             </if>
-            <if test="partitionType != null">
-                #{partitionType,jdbcType=VARCHAR},
+            <if test="partitionCreationStrategy != null">
+                #{partitionCreationStrategy,jdbcType=VARCHAR},
             </if>
             <if test="fileFormat != null">
                 #{fileFormat,jdbcType=VARCHAR},
             </if>
-            <if test="fieldSplitter != null">
-                #{fieldSplitter,jdbcType=VARCHAR},
+            <if test="dataEncoding != null">
+                #{dataEncoding,jdbcType=VARCHAR},
             </if>
-            <if test="encodingType != null">
-                #{encodingType,jdbcType=VARCHAR},
-            </if>
-            <if test="hdfsDefaultFs != null">
-                #{hdfsDefaultFs,jdbcType=VARCHAR},
-            </if>
-            <if test="warehouseDir != null">
-                #{warehouseDir,jdbcType=VARCHAR},
-            </if>
-            <if test="usageInterval != null">
-                #{usageInterval,jdbcType=VARCHAR},
+            <if test="dataSeparator != null">
+                #{dataSeparator,jdbcType=VARCHAR},
             </if>
             <if test="storagePeriod != null">
                 #{storagePeriod,jdbcType=INTEGER},
             </if>
+            <if test="optLog != null">
+                #{optLog,jdbcType=VARCHAR},
+            </if>
             <if test="status != null">
                 #{status,jdbcType=INTEGER},
             </if>
@@ -323,9 +389,6 @@
             <if test="modifyTime != null">
                 #{modifyTime,jdbcType=TIMESTAMP},
             </if>
-            <if test="optLog != null">
-                #{optLog,jdbcType=VARCHAR},
-            </if>
             <if test="tempView != null">
                 #{tempView,jdbcType=LONGVARCHAR},
             </if>
@@ -356,36 +419,42 @@
             <if test="tableName != null">
                 table_name = #{tableName,jdbcType=VARCHAR},
             </if>
+            <if test="hdfsDefaultFs != null">
+                hdfs_default_fs = #{hdfsDefaultFs,jdbcType=VARCHAR},
+            </if>
+            <if test="warehouseDir != null">
+                warehouse_dir = #{warehouseDir,jdbcType=VARCHAR},
+            </if>
+            <if test="partitionInterval != null">
+                partition_interval = #{partitionInterval,jdbcType=INTEGER},
+            </if>
+            <if test="partitionUnit != null">
+                partition_unit = #{partitionUnit,jdbcType=VARCHAR},
+            </if>
             <if test="primaryPartition != null">
                 primary_partition = #{primaryPartition,jdbcType=VARCHAR},
             </if>
             <if test="secondaryPartition != null">
                 secondary_partition = #{secondaryPartition,jdbcType=VARCHAR},
             </if>
-            <if test="partitionType != null">
-                partition_type = #{partitionType,jdbcType=VARCHAR},
+            <if test="partitionCreationStrategy != null">
+                partition_creation_strategy = #{partitionCreationStrategy,jdbcType=VARCHAR},
             </if>
             <if test="fileFormat != null">
                 file_format = #{fileFormat,jdbcType=VARCHAR},
             </if>
-            <if test="fieldSplitter != null">
-                field_splitter = #{fieldSplitter,jdbcType=VARCHAR},
-            </if>
-            <if test="encodingType != null">
-                encoding_type = #{encodingType,jdbcType=VARCHAR},
-            </if>
-            <if test="hdfsDefaultFs != null">
-                hdfs_default_fs = #{hdfsDefaultFs,jdbcType=VARCHAR},
-            </if>
-            <if test="warehouseDir != null">
-                warehouse_dir = #{warehouseDir,jdbcType=VARCHAR},
+            <if test="dataEncoding != null">
+                data_encoding = #{dataEncoding,jdbcType=VARCHAR},
             </if>
-            <if test="usageInterval != null">
-                usage_interval = #{usageInterval,jdbcType=VARCHAR},
+            <if test="dataSeparator != null">
+                data_separator = #{dataSeparator,jdbcType=VARCHAR},
             </if>
             <if test="storagePeriod != null">
                 storage_period = #{storagePeriod,jdbcType=INTEGER},
             </if>
+            <if test="optLog != null">
+                opt_log = #{optLog,jdbcType=VARCHAR},
+            </if>
             <if test="status != null">
                 status = #{status,jdbcType=INTEGER},
             </if>
@@ -407,9 +476,6 @@
             <if test="modifyTime != null">
                 modify_time = #{modifyTime,jdbcType=TIMESTAMP},
             </if>
-            <if test="optLog != null">
-                opt_log = #{optLog,jdbcType=VARCHAR},
-            </if>
             <if test="tempView != null">
                 temp_view = #{tempView,jdbcType=LONGVARCHAR},
             </if>
@@ -418,32 +484,33 @@
     </update>
     <update id="updateByPrimaryKey" parameterType="org.apache.inlong.manager.dao.entity.StorageHiveEntity">
         update storage_hive
-        set inlong_group_id     = #{inlongGroupId,jdbcType=VARCHAR},
-            inlong_stream_id    = #{inlongStreamId,jdbcType=VARCHAR},
-            jdbc_url            = #{jdbcUrl,jdbcType=VARCHAR},
-            username            = #{username,jdbcType=VARCHAR},
-            password            = #{password,jdbcType=VARCHAR},
-            db_name             = #{dbName,jdbcType=VARCHAR},
-            table_name          = #{tableName,jdbcType=VARCHAR},
-            primary_partition   = #{primaryPartition,jdbcType=VARCHAR},
-            secondary_partition = #{secondaryPartition,jdbcType=VARCHAR},
-            partition_type      = #{partitionType,jdbcType=VARCHAR},
-            file_format         = #{fileFormat,jdbcType=VARCHAR},
-            field_splitter      = #{fieldSplitter,jdbcType=VARCHAR},
-            encoding_type       = #{encodingType,jdbcType=VARCHAR},
-            hdfs_default_fs     = #{hdfsDefaultFs,jdbcType=VARCHAR},
-            warehouse_dir       = #{warehouseDir,jdbcType=VARCHAR},
-            usage_interval      = #{usageInterval,jdbcType=VARCHAR},
-            storage_period      = #{storagePeriod,jdbcType=INTEGER},
-            status              = #{status,jdbcType=INTEGER},
-            previous_status     = #{previousStatus,jdbcType=INTEGER},
-            is_deleted          = #{isDeleted,jdbcType=INTEGER},
-            creator             = #{creator,jdbcType=VARCHAR},
-            modifier            = #{modifier,jdbcType=VARCHAR},
-            create_time         = #{createTime,jdbcType=TIMESTAMP},
-            modify_time         = #{modifyTime,jdbcType=TIMESTAMP},
-            opt_log             = #{optLog,jdbcType=VARCHAR},
-            temp_view           = #{tempView,jdbcType=LONGVARCHAR}
+        set inlong_group_id             = #{inlongGroupId,jdbcType=VARCHAR},
+            inlong_stream_id            = #{inlongStreamId,jdbcType=VARCHAR},
+            jdbc_url                    = #{jdbcUrl,jdbcType=VARCHAR},
+            username                    = #{username,jdbcType=VARCHAR},
+            password                    = #{password,jdbcType=VARCHAR},
+            db_name                     = #{dbName,jdbcType=VARCHAR},
+            table_name                  = #{tableName,jdbcType=VARCHAR},
+            hdfs_default_fs             = #{hdfsDefaultFs,jdbcType=VARCHAR},
+            warehouse_dir               = #{warehouseDir,jdbcType=VARCHAR},
+            partition_interval          = #{partitionInterval,jdbcType=INTEGER},
+            partition_unit              = #{partitionUnit,jdbcType=VARCHAR},
+            primary_partition           = #{primaryPartition,jdbcType=VARCHAR},
+            secondary_partition         = #{secondaryPartition,jdbcType=VARCHAR},
+            partition_creation_strategy = #{partitionCreationStrategy,jdbcType=VARCHAR},
+            file_format                 = #{fileFormat,jdbcType=VARCHAR},
+            data_encoding               = #{dataEncoding,jdbcType=VARCHAR},
+            data_separator              = #{dataSeparator,jdbcType=VARCHAR},
+            storage_period              = #{storagePeriod,jdbcType=INTEGER},
+            opt_log                     = #{optLog,jdbcType=VARCHAR},
+            status                      = #{status,jdbcType=INTEGER},
+            previous_status             = #{previousStatus,jdbcType=INTEGER},
+            is_deleted                  = #{isDeleted,jdbcType=INTEGER},
+            creator                     = #{creator,jdbcType=VARCHAR},
+            modifier                    = #{modifier,jdbcType=VARCHAR},
+            create_time                 = #{createTime,jdbcType=TIMESTAMP},
+            modify_time                 = #{modifyTime,jdbcType=TIMESTAMP},
+            temp_view                   = #{tempView,jdbcType=LONGVARCHAR}
         where id = #{id,jdbcType=INTEGER}
     </update>
     <update id="updateStorageStatusById" parameterType="org.apache.inlong.manager.dao.entity.StorageHiveEntity">
diff --git a/inlong-manager/manager-dao/src/test/resources/sql/apache_inlong_manager.sql b/inlong-manager/manager-dao/src/test/resources/sql/apache_inlong_manager.sql
index 9b8e124..08b2320 100644
--- a/inlong-manager/manager-dao/src/test/resources/sql/apache_inlong_manager.sql
+++ b/inlong-manager/manager-dao/src/test/resources/sql/apache_inlong_manager.sql
@@ -85,7 +85,7 @@ CREATE TABLE `business`
     `schema_name`     varchar(128)          DEFAULT NULL COMMENT 'Data type, associated data_schema table',
     `in_charges`      varchar(512) NOT NULL COMMENT 'Name of responsible person, separated by commas',
     `followers`       varchar(512)          DEFAULT NULL COMMENT 'List of names of business followers, separated by commas',
-    `status`          int(11)               DEFAULT '21' COMMENT 'Business status',
+    `status`          int(4)                DEFAULT '21' COMMENT 'Business status',
     `is_deleted`      tinyint(1)            DEFAULT '0' COMMENT 'Whether to delete, 0: not deleted, 1: deleted',
     `creator`         varchar(64)  NOT NULL COMMENT 'Creator name',
     `modifier`        varchar(64)           DEFAULT NULL COMMENT 'Modifier name',
@@ -150,7 +150,7 @@ CREATE TABLE `cluster_info`
     `url`         varchar(256)          DEFAULT NULL COMMENT 'Cluster URL address',
     `is_backup`   tinyint(1)            DEFAULT '0' COMMENT 'Whether it is a backup cluster, 0: no, 1: yes',
     `ext_props`   text                  DEFAULT NULL COMMENT 'extended properties',
-    `status`      int(11)               DEFAULT '1' COMMENT 'cluster status',
+    `status`      int(4)                DEFAULT '1' COMMENT 'cluster status',
     `is_deleted`  tinyint(1)            DEFAULT '0' COMMENT 'Whether to delete, 0: not deleted, 1: deleted',
     `creator`     varchar(64)  NOT NULL COMMENT 'Creator name',
     `modifier`    varchar(64)           DEFAULT NULL COMMENT 'Modifier name',
@@ -183,7 +183,7 @@ CREATE TABLE `common_db_server`
     `db_description`      varchar(256)          DEFAULT NULL COMMENT 'DB description',
     `backup_db_server_ip` varchar(64)           DEFAULT NULL COMMENT 'Backup DB HOST',
     `backup_db_port`      int(11)               DEFAULT NULL COMMENT 'Backup DB port',
-    `status`              int(11)               DEFAULT '0' COMMENT 'status',
+    `status`              int(4)                DEFAULT '0' COMMENT 'status',
     `is_deleted`          tinyint(1)            DEFAULT '0' COMMENT 'Whether to delete, 0: not deleted, 1: deleted',
     `creator`             varchar(64)  NOT NULL COMMENT 'Creator name',
     `modifier`            varchar(64)           DEFAULT NULL COMMENT 'Modifier name',
@@ -208,7 +208,7 @@ CREATE TABLE `common_file_server`
     `issue_type`     varchar(128)         DEFAULT NULL COMMENT 'Issuance method, such as SSH, TCS, etc.',
     `username`       varchar(64) NOT NULL COMMENT 'User name of the data source IP host',
     `password`       varchar(64) NOT NULL COMMENT 'The password corresponding to the above user name',
-    `status`         int(11)              DEFAULT '0' COMMENT 'status',
+    `status`         int(4)               DEFAULT '0' COMMENT 'status',
     `is_deleted`     tinyint(1)           DEFAULT '0' COMMENT 'Whether to delete, 0: not deleted, 1: deleted',
     `creator`        varchar(64) NOT NULL COMMENT 'Creator name',
     `modifier`       varchar(64)          DEFAULT NULL COMMENT 'Modifier name',
@@ -234,16 +234,35 @@ CREATE TABLE `consumption`
     `topic`               varchar(255) NOT NULL COMMENT 'Consumption topic',
     `filter_enabled`      int(2)                DEFAULT '0' COMMENT 'Whether to filter, default 0, not filter consume',
     `inlong_stream_id`    varchar(1024)         DEFAULT NULL COMMENT 'Data stream ID for consumption, if filter_enable is 1, it cannot be empty',
-    `status`              int(11)      NOT NULL COMMENT 'Status: draft, pending approval, approval rejected, approval passed',
+    `status`              int(4)       NOT NULL COMMENT 'Status: draft, pending approval, approval rejected, approval passed',
+    `is_deleted`          tinyint(1)            DEFAULT '0' COMMENT 'Whether to delete, 0: not deleted, 1: deleted',
     `creator`             varchar(64)  NOT NULL COMMENT 'creator',
     `modifier`            varchar(64)           DEFAULT NULL COMMENT 'modifier',
     `create_time`         timestamp    NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'Create time',
     `modify_time`         timestamp    NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'Modify time',
-    `is_deleted`          int(2)                DEFAULT '0' COMMENT 'Whether to delete, 0: not deleted, 1: deleted',
     PRIMARY KEY (`id`)
 );
 
 -- ----------------------------
+-- Table structure for consumption_pulsar
+-- ----------------------------
+DROP TABLE IF EXISTS `consumption_pulsar`;
+CREATE TABLE `consumption_pulsar`
+(
+    `id`                  int(11)      NOT NULL AUTO_INCREMENT,
+    `consumption_id`      int(11)      DEFAULT NULL COMMENT 'ID of the owning consumption record, uniquely associated with one consumption',
+    `consumer_group_id`   varchar(255) NOT NULL COMMENT 'Consumer group ID',
+    `consumer_group_name` varchar(255) NOT NULL COMMENT 'Consumer group name',
+    `inlong_group_id`     varchar(255) NOT NULL COMMENT 'Business group ID',
+    `is_rlq`              tinyint(1)   DEFAULT '0' COMMENT 'Whether to configure the retry letter topic, 0: not configured, 1: configured',
+    `retry_letter_topic`  varchar(255) DEFAULT NULL COMMENT 'The name of the retry letter topic',
+    `is_dlq`              tinyint(1)   DEFAULT '0' COMMENT 'Whether to configure the dead letter topic, 0: not configured, 1: configured',
+    `dead_letter_topic`   varchar(255) DEFAULT NULL COMMENT 'The name of the dead letter topic',
+    `is_deleted`          tinyint(1)   DEFAULT '0' COMMENT 'Whether to delete, 0: not deleted, 1: deleted',
+    PRIMARY KEY (`id`)
+) COMMENT ='Pulsar consumption table';
+
+-- ----------------------------
 -- Table structure for data_proxy_cluster
 -- ----------------------------
 DROP TABLE IF EXISTS `data_proxy_cluster`;
@@ -259,7 +278,7 @@ CREATE TABLE `data_proxy_cluster`
     `net_type`    varchar(20)           DEFAULT NULL COMMENT 'Cluster network type, internal, or public',
     `in_charges`  varchar(512)          DEFAULT NULL COMMENT 'Name of responsible person, separated by commas',
     `ext_props`   text                  DEFAULT NULL COMMENT 'Extended properties',
-    `status`      int(11)               DEFAULT '1' COMMENT 'Cluster status',
+    `status`      int(4)                DEFAULT '1' COMMENT 'Cluster status',
     `is_deleted`  tinyint(1)            DEFAULT '0' COMMENT 'Whether to delete, 0: not deleted, 1: deleted',
     `creator`     varchar(64)  NOT NULL COMMENT 'Creator name',
     `modifier`    varchar(64)           DEFAULT NULL COMMENT 'Modifier name',
@@ -302,7 +321,8 @@ CREATE TABLE `data_source_cmd_config`
     `modify_time`         timestamp   NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'Last update time ',
     `create_time`         timestamp   NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'Create time',
     `result_info`         varchar(64)          DEFAULT NULL,
-    PRIMARY KEY (`id`)
+    PRIMARY KEY (`id`),
+    KEY `index_1` (`task_id`, `bSend`, `specified_data_time`)
 );
 
 -- ----------------------------
@@ -321,18 +341,24 @@ CREATE TABLE `data_stream`
     `storage_period`         int(11)           DEFAULT '1' COMMENT 'The storage period of data in MQ, unit: day',
     `data_type`              varchar(20)       DEFAULT 'TEXT' COMMENT 'Data type, there are: TEXT, KEY-VALUE, PB, BON, TEXT and BON should be treated differently',
     `data_encoding`          varchar(8)        DEFAULT 'UTF-8' COMMENT 'Data encoding format, including: UTF-8, GBK',
-    `file_delimiter`         varchar(8)        DEFAULT NULL COMMENT 'The source data field separator, stored as ASCII code',
+    `data_separator`         varchar(8)        DEFAULT NULL COMMENT 'The source data field separator, stored as ASCII code',
+    `data_escape_char`       varchar(8)        DEFAULT NULL COMMENT 'Source data field escape character, NULL by default, stored as 1 character',
     `have_predefined_fields` tinyint(1)        DEFAULT '0' COMMENT '(File, DB access) whether there are predefined fields, 0: none, 1: yes (save to data_stream_field)',
+    `daily_records`          int(11)           DEFAULT '10' COMMENT 'Number of access records per day, unit: 10,000 records per day',
+    `daily_storage`          int(11)           DEFAULT '10' COMMENT 'Access size by day, unit: GB per day',
+    `peak_records`           int(11)           DEFAULT '1000' COMMENT 'Access peak per second, unit: records per second',
+    `max_length`             int(11)           DEFAULT '10240' COMMENT 'The maximum length of a single piece of data, unit: Byte',
     `in_charges`             varchar(512)      DEFAULT NULL COMMENT 'Name of responsible person, separated by commas',
-    `status`                 int(11)           DEFAULT '0' COMMENT 'Data stream status',
-    `previous_status`        int(11)           DEFAULT '0' COMMENT 'Previous status',
+    `status`                 int(4)            DEFAULT '0' COMMENT 'Data stream status',
+    `previous_status`        int(4)            DEFAULT '0' COMMENT 'Previous status',
     `is_deleted`             tinyint(1)        DEFAULT '0' COMMENT 'Whether to delete, 0: not deleted, 1: deleted',
     `creator`                varchar(64)       DEFAULT NULL COMMENT 'Creator name',
     `modifier`               varchar(64)       DEFAULT NULL COMMENT 'Modifier name',
     `create_time`            timestamp    NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'Create time',
     `modify_time`            timestamp    NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'Modify time',
     `temp_view`              text              DEFAULT NULL COMMENT 'Temporary view, used to save intermediate data that has not been submitted or approved after modification',
-    PRIMARY KEY (`id`)
+    PRIMARY KEY (`id`),
+    UNIQUE KEY `unique_data_stream` (`inlong_stream_id`, `inlong_group_id`, `is_deleted`, `modify_time`)
 );
 
 -- ----------------------------
@@ -348,7 +374,8 @@ CREATE TABLE `data_stream_ext`
     `key_value`        varchar(256)          DEFAULT NULL COMMENT 'The value of the configuration item',
     `is_deleted`       tinyint(1)            DEFAULT '0' COMMENT 'Whether to delete, 0: not deleted, 1: deleted',
     `modify_time`      timestamp    NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'Modify time',
-    PRIMARY KEY (`id`)
+    PRIMARY KEY (`id`),
+    KEY `index_stream_id` (`inlong_stream_id`)
 );
 
 -- ----------------------------
@@ -372,7 +399,8 @@ CREATE TABLE `data_stream_field`
     `bon_field_path`      varchar(256) DEFAULT NULL COMMENT 'BON field path',
     `bon_field_type`      varchar(64)  DEFAULT NULL COMMENT 'BON field type',
     `encrypt_level`       varchar(20)  DEFAULT NULL COMMENT 'Encryption level',
-    PRIMARY KEY (`id`)
+    PRIMARY KEY (`id`),
+    KEY `index_stream_id` (`inlong_stream_id`)
 );
 
 -- ----------------------------
@@ -393,7 +421,7 @@ CREATE TABLE `operation_log`
     `cost_time`           bigint(20)         DEFAULT NULL COMMENT 'Time consumed',
     `body`                text COMMENT 'Request body',
     `param`               text COMMENT 'parameter',
-    `status`              tinyint(1)         DEFAULT NULL COMMENT 'status',
+    `status`              int(4)             DEFAULT NULL COMMENT 'status',
     `request_time`        timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'request time',
     `err_msg`             text COMMENT 'Error message',
     PRIMARY KEY (`id`)
@@ -413,7 +441,9 @@ CREATE TABLE `role`
     `create_by`   varchar(255) NOT NULL,
     `update_by`   varchar(255) NOT NULL,
     `disabled`    tinyint(1)   NOT NULL DEFAULT '0' COMMENT 'Is it disabled?',
-    PRIMARY KEY (`id`)
+    PRIMARY KEY (`id`),
+    UNIQUE KEY `role_role_code_uindex` (`role_code`),
+    UNIQUE KEY `role_role_name_uindex` (`role_name`)
 );
 
 -- ----------------------------
@@ -431,7 +461,7 @@ CREATE TABLE `source_db_basic`
     `modifier`         varchar(64)           DEFAULT NULL COMMENT 'Modifier name',
     `create_time`      timestamp    NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'Create time',
     `modify_time`      timestamp    NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'Modify time',
-    `temp_view`        text                  DEFAULT NULL COMMENT 'Temporary view, used to save intermediate data that has not been submitted or approved after modification',
+    `temp_view`        json                  DEFAULT NULL COMMENT 'Temporary view, used to save intermediate data that has not been submitted or approved after modification',
     PRIMARY KEY (`id`)
 );
 
@@ -452,8 +482,8 @@ CREATE TABLE `source_db_detail`
     `table_fields`     longtext COMMENT 'Data table fields, multiple are separated by half-width commas, required for increment',
     `data_sql`         longtext COMMENT 'SQL statement to collect source data, required for full amount',
     `crontab`          varchar(56)           DEFAULT NULL COMMENT 'Timed scheduling expression, required for full amount',
-    `status`           int(11)               DEFAULT '0' COMMENT 'Data source status',
-    `previous_status`  int(11)               DEFAULT '0' COMMENT 'Previous status',
+    `status`           int(4)                DEFAULT '0' COMMENT 'Data source status',
+    `previous_status`  int(4)                DEFAULT '0' COMMENT 'Previous status',
     `is_deleted`       tinyint(1)            DEFAULT '0' COMMENT 'Whether to delete, 0: not deleted, 1: deleted',
     `creator`          varchar(64)  NOT NULL COMMENT 'Creator name',
     `modifier`         varchar(64)           DEFAULT NULL COMMENT 'Modifier name',
@@ -506,8 +536,8 @@ CREATE TABLE `source_file_detail`
     `username`         varchar(32)  NOT NULL COMMENT 'User name of the data source IP host',
     `password`         varchar(64)  NOT NULL COMMENT 'The password corresponding to the above user name',
     `file_path`        varchar(256) NOT NULL COMMENT 'File path, supports regular matching',
-    `status`           int(11)               DEFAULT '0' COMMENT 'Data source status',
-    `previous_status`  int(11)               DEFAULT '0' COMMENT 'Previous status',
+    `status`           int(4)                DEFAULT '0' COMMENT 'Data source status',
+    `previous_status`  int(4)                DEFAULT '0' COMMENT 'Previous status',
     `is_deleted`       tinyint(1)            DEFAULT '0' COMMENT 'Whether to delete, 0: not deleted, 1: deleted',
     `creator`          varchar(64)  NOT NULL COMMENT 'Creator name',
     `modifier`         varchar(64)           DEFAULT NULL COMMENT 'Modifier name',
@@ -529,44 +559,45 @@ CREATE TABLE `storage_ext`
     `key_name`     varchar(64) NOT NULL COMMENT 'Configuration item name',
     `key_value`    varchar(256)         DEFAULT NULL COMMENT 'The value of the configuration item',
     `is_deleted`   tinyint(1)           DEFAULT '0' COMMENT 'Whether to delete, 0: not deleted, 1: deleted',
-    `modify_time`  timestamp   NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'modify time',
-    PRIMARY KEY (`id`)
+    `modify_time`  timestamp   NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'Modify time',
+    PRIMARY KEY (`id`),
+    KEY `index_storage_id` (`storage_id`)
 );
 
 -- ----------------------------
 -- Table structure for storage_hive
 -- ----------------------------
 DROP TABLE IF EXISTS `storage_hive`;
-DROP TABLE IF EXISTS `storage_hive`;
 CREATE TABLE `storage_hive`
 (
-    `id`                  int(11)      NOT NULL AUTO_INCREMENT COMMENT 'Incremental primary key',
-    `inlong_group_id`     varchar(128) NOT NULL COMMENT 'Owning business group id',
-    `inlong_stream_id`    varchar(128) NOT NULL COMMENT 'Owning data stream id',
-    `jdbc_url`            varchar(255) NOT NULL COMMENT 'Hive JDBC connection URL, such as "jdbc:hive2://127.0.0.1:10000"',
-    `username`            varchar(128) NOT NULL COMMENT 'Username',
-    `password`            varchar(255) NOT NULL COMMENT 'User password',
-    `db_name`             varchar(128) NOT NULL COMMENT 'Target database name',
-    `table_name`          varchar(128) NOT NULL COMMENT 'Target data table name',
-    `primary_partition`   varchar(255)          DEFAULT 'dt' COMMENT 'primary partition field',
-    `secondary_partition` varchar(256)          DEFAULT NULL COMMENT 'secondary partition field',
-    `partition_type`      varchar(10)           DEFAULT NULL COMMENT 'The partition type, there are: H-by hour, D-by day, W-by week, M-by month, O-one-time, R-non-periodical',
-    `file_format`         varchar(15)           DEFAULT 'TextFile' COMMENT 'The stored table format, TextFile, RCFile, SequenceFile, Avro',
-    `encoding_type`       varchar(255)          DEFAULT NULL COMMENT 'data encoding',
-    `field_splitter`      varchar(10)           DEFAULT NULL COMMENT 'field separator',
-    `hdfs_default_fs`     varchar(255)          DEFAULT NULL COMMENT 'HDFS defaultFS, such as "hdfs://127.0.0.1:9000"',
-    `warehouse_dir`       varchar(250)          DEFAULT '/user/hive/warehouse' COMMENT 'Hive table storage path on HDFS, such as "/user/hive/warehouse"',
-    `usage_interval`      varchar(10)           DEFAULT NULL COMMENT 'The amount of time that Sort collected data will land on Hive, there are 10M, 15M, 30M, 1H, 1D',
-    `storage_period`      int(5)                DEFAULT '10' COMMENT 'Data storage period, unit: day',
-    `status`              int(11)               DEFAULT '0' COMMENT 'status',
-    `previous_status`     int(11)               DEFAULT '0' COMMENT 'Previous status',
-    `is_deleted`          tinyint(1)            DEFAULT '0' COMMENT 'Whether to delete, 0: not deleted, 1: deleted',
-    `creator`             varchar(64)  NOT NULL COMMENT 'Creator name',
-    `modifier`            varchar(64)           DEFAULT NULL COMMENT 'Modifier name',
-    `create_time`         timestamp    NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'Create time',
-    `modify_time`         timestamp    NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'Modify time',
-    `temp_view`           text                  DEFAULT NULL COMMENT 'Temporary view, used to save un-submitted and unapproved intermediate data after modification',
-    `opt_log`             varchar(5000)         DEFAULT NULL COMMENT 'Background operation log',
+    `id`                          int(11)      NOT NULL AUTO_INCREMENT COMMENT 'Incremental primary key',
+    `inlong_group_id`             varchar(128) NOT NULL COMMENT 'Owning business group id',
+    `inlong_stream_id`            varchar(128) NOT NULL COMMENT 'Owning data stream id',
+    `jdbc_url`                    varchar(255)          DEFAULT NULL COMMENT 'Hive JDBC connection URL, such as "jdbc:hive2://127.0.0.1:10000"',
+    `username`                    varchar(128)          DEFAULT NULL COMMENT 'Username',
+    `password`                    varchar(255)          DEFAULT NULL COMMENT 'User password',
+    `db_name`                     varchar(128)          DEFAULT NULL COMMENT 'Target database name',
+    `table_name`                  varchar(128)          DEFAULT NULL COMMENT 'Target data table name',
+    `hdfs_default_fs`             varchar(255)          DEFAULT NULL COMMENT 'HDFS defaultFS, such as "hdfs://127.0.0.1:9000"',
+    `warehouse_dir`               varchar(250)          DEFAULT '/user/hive/warehouse' COMMENT 'Hive table storage path on HDFS, such as "/user/hive/warehouse"',
+    `partition_interval`          int(5)                DEFAULT NULL COMMENT 'Partition interval, supports: 1 (for D or H units), 10 or 30 (for the I unit)',
+    `partition_unit`              varchar(10)           DEFAULT 'D' COMMENT 'Partition unit, supports: D-day, H-hour, I-minute',
+    `primary_partition`           varchar(255)          DEFAULT 'dt' COMMENT 'primary partition field',
+    `secondary_partition`         varchar(256)          DEFAULT NULL COMMENT 'secondary partition field',
+    `partition_creation_strategy` varchar(50)           DEFAULT 'COMPLETED' COMMENT 'Partition creation strategy, support: ARRIVED, COMPLETED',
+    `file_format`                 varchar(15)           DEFAULT 'TextFile' COMMENT 'The stored table format, TextFile, RCFile, SequenceFile, Avro',
+    `data_encoding`               varchar(20)           DEFAULT 'UTF-8' COMMENT 'data encoding type',
+    `data_separator`              varchar(10)           DEFAULT NULL COMMENT 'data field separator',
+    `storage_period`              int(5)                DEFAULT '10' COMMENT 'Data storage period, unit: day',
+    `opt_log`                     varchar(5000)         DEFAULT NULL COMMENT 'Background operation log',
+    `status`                      int(4)                DEFAULT '0' COMMENT 'status',
+    `previous_status`             int(4)                DEFAULT '0' COMMENT 'Previous status',
+    `is_deleted`                  tinyint(1)            DEFAULT '0' COMMENT 'Whether to delete, 0: not deleted, 1: deleted',
+    `creator`                     varchar(64)           DEFAULT NULL COMMENT 'creator name',
+    `modifier`                    varchar(64)           DEFAULT NULL COMMENT 'modifier name',
+    `create_time`                 timestamp    NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'create time',
+    `modify_time`                 timestamp    NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'modify time',
+    `temp_view`                   text                  DEFAULT NULL COMMENT 'Temporary view, used to save un-submitted and unapproved intermediate data after modification',
     PRIMARY KEY (`id`)
 );
 
@@ -590,7 +621,8 @@ CREATE TABLE `storage_hive_field`
     `is_exist`          tinyint(1)    DEFAULT '0' COMMENT 'Does it exist, 0: does not exist, 1: exists',
     `rank_num`          smallint(6)   DEFAULT '0' COMMENT 'Field order (front-end display field order)',
     `is_deleted`        tinyint(1)    DEFAULT '0' COMMENT 'Whether to delete, 0: not deleted, 1: deleted',
-    PRIMARY KEY (`id`)
+    PRIMARY KEY (`id`),
+    KEY `index_storage_id` (`storage_id`)
 );
 
 -- ----------------------------
@@ -680,7 +712,8 @@ CREATE TABLE `user`
     `update_time`  datetime              DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'update time',
     `create_by`    varchar(255) NOT NULL COMMENT 'Creator name',
     `update_by`    varchar(255)          DEFAULT NULL COMMENT 'Updater name',
-    PRIMARY KEY (`id`)
+    PRIMARY KEY (`id`),
+    UNIQUE KEY `user_name_uindex` (`name`)
 );
 
 -- create default admin user, username is 'admin', password is 'inlong'
@@ -723,7 +756,8 @@ CREATE TABLE `wf_approver`
     `create_time`       timestamp     NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'Create time',
     `modify_time`       timestamp     NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'update time',
     `is_deleted`        int(11)                DEFAULT '0' COMMENT 'Whether to delete, 0: not deleted, greater than 0: deleted',
-    PRIMARY KEY (`id`)
+    PRIMARY KEY (`id`),
+    KEY `process_name_task_name_index` (`process_name`, `task_name`)
 );
 
 -- create default approver for new consumption and new business
@@ -778,7 +812,7 @@ CREATE TABLE `wf_process_instance`
     `form_data`       mediumtext COMMENT 'form information',
     `start_time`      datetime     NOT NULL COMMENT 'start time',
     `end_time`        datetime              DEFAULT NULL COMMENT 'End time',
-    `ext`             text COMMENT 'Extended information-text',
+    `ext`             text COMMENT 'Extended information-json',
     `hidden`          tinyint(1)   NOT NULL DEFAULT '0' COMMENT 'Is it hidden',
     PRIMARY KEY (`id`)
 );
@@ -804,7 +838,7 @@ CREATE TABLE `wf_task_instance`
     `form_data`            mediumtext COMMENT 'form information submitted by the current task',
     `start_time`           datetime      NOT NULL COMMENT 'start time',
     `end_time`             datetime      DEFAULT NULL COMMENT 'End time',
-    `ext`                  text COMMENT 'Extended information-text',
+    `ext`                  text COMMENT 'Extended information-json',
     PRIMARY KEY (`id`)
 );
 
@@ -821,13 +855,14 @@ CREATE TABLE `cluster_set`
     `middleware_type` varchar(10)           DEFAULT 'TUBE' COMMENT 'The middleware type of message queue, high throughput: TUBE, high consistency: PULSAR',
     `in_charges`      varchar(512) COMMENT 'Name of responsible person, separated by commas',
     `followers`       varchar(512) COMMENT 'List of names of business followers, separated by commas',
-    `status`          int(11)               DEFAULT '21' COMMENT 'ClusterSet status',
+    `status`          int(4)                DEFAULT '21' COMMENT 'ClusterSet status',
     `is_deleted`      tinyint(1)            DEFAULT '0' COMMENT 'Whether to delete, 0: not deleted, 1: deleted',
     `creator`         varchar(64)  NOT NULL COMMENT 'Creator name',
     `modifier`        varchar(64)  NULL COMMENT 'Modifier name',
     `create_time`     timestamp    NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'Create time',
     `modify_time`     timestamp    NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'Modify time',
-    PRIMARY KEY (`id`)
+    PRIMARY KEY (`id`),
+    UNIQUE KEY `unique_cluster_set` (`set_name`)
 );
 
 -- ----------------------------
@@ -839,7 +874,8 @@ CREATE TABLE `cluster_set_inlongid`
     `id`              int(11)      NOT NULL AUTO_INCREMENT COMMENT 'Incremental primary key',
     `set_name`        varchar(128) NOT NULL COMMENT 'ClusterSet name, English, numbers and underscore',
     `inlong_group_id` varchar(128) NOT NULL COMMENT 'Business group id, filled in by the user, undeleted ones cannot be repeated',
-    PRIMARY KEY (`id`)
+    PRIMARY KEY (`id`),
+    UNIQUE KEY `unique_cluster_set_inlongid` (`set_name`, `inlong_group_id`)
 );
 
 -- ----------------------------
@@ -852,7 +888,8 @@ CREATE TABLE `cache_cluster`
     `cluster_name` varchar(128) NOT NULL COMMENT 'CacheCluster name, English, numbers and underscore',
     `set_name`     varchar(128) NOT NULL COMMENT 'ClusterSet name, English, numbers and underscore',
     `zone`         varchar(128) NOT NULL COMMENT 'Zone, sz/sh/tj',
-    PRIMARY KEY (`id`)
+    PRIMARY KEY (`id`),
+    UNIQUE KEY `unique_cache_cluster` (`cluster_name`)
 );
 
 -- ----------------------------
@@ -866,8 +903,9 @@ CREATE TABLE `cache_cluster_ext`
     `key_name`     varchar(64)  NOT NULL COMMENT 'Configuration item name',
     `key_value`    varchar(256) NULL COMMENT 'The value of the configuration item',
     `is_deleted`   tinyint(1)            DEFAULT '0' COMMENT 'Whether to delete, 0: not deleted, 1: deleted',
-    `modify_time`  timestamp    NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'modify time',
-    PRIMARY KEY (`id`)
+    `modify_time`  timestamp    NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'Modify time',
+    PRIMARY KEY (`id`),
+    KEY `index_cache_cluster` (`cluster_name`)
 );
 
 -- ----------------------------
@@ -880,7 +918,8 @@ CREATE TABLE `cache_topic`
     `topic_name`    varchar(128) NOT NULL COMMENT 'Topic name, English, numbers and underscore',
     `set_name`      varchar(128) NOT NULL COMMENT 'ClusterSet name, English, numbers and underscore',
     `partition_num` int(11)      NOT NULL COMMENT 'Partition number',
-    PRIMARY KEY (`id`)
+    PRIMARY KEY (`id`),
+    UNIQUE KEY `unique_cache_topic` (`topic_name`, `set_name`)
 );
 
 -- ----------------------------
@@ -893,7 +932,8 @@ CREATE TABLE `proxy_cluster`
     `cluster_name` varchar(128) NOT NULL COMMENT 'ProxyCluster name, English, numbers and underscore',
     `set_name`     varchar(128) NOT NULL COMMENT 'ClusterSet name, English, numbers and underscore',
     `zone`         varchar(128) NOT NULL COMMENT 'Zone, sz/sh/tj',
-    PRIMARY KEY (`id`)
+    PRIMARY KEY (`id`),
+    UNIQUE KEY `unique_proxy_cluster` (`cluster_name`, `set_name`)
 );
 
 -- ----------------------------
@@ -905,7 +945,8 @@ CREATE TABLE `proxy_cluster_to_cache_cluster`
     `id`                 int(11)      NOT NULL AUTO_INCREMENT COMMENT 'Incremental primary key',
     `proxy_cluster_name` varchar(128) NOT NULL COMMENT 'ProxyCluster name, English, numbers and underscore',
     `cache_cluster_name` varchar(128) NOT NULL COMMENT 'CacheCluster name, English, numbers and underscore',
-    PRIMARY KEY (`id`)
+    PRIMARY KEY (`id`),
+    UNIQUE KEY `unique_proxy_cluster_to_cache_cluster` (`proxy_cluster_name`, `cache_cluster_name`)
 );
 
 -- ----------------------------
@@ -920,7 +961,8 @@ CREATE TABLE `flume_source`
     `type`          varchar(128) NOT NULL COMMENT 'FlumeSource classname',
     `channels`      varchar(128) NOT NULL COMMENT 'The channels of FlumeSource, separated by space',
     `selector_type` varchar(128) NOT NULL COMMENT 'FlumeSource channel selector classname',
-    PRIMARY KEY (`id`)
+    PRIMARY KEY (`id`),
+    UNIQUE KEY `unique_flume_source` (`source_name`, `set_name`)
 );
 
 -- ----------------------------
@@ -935,8 +977,9 @@ CREATE TABLE `flume_source_ext`
     `key_name`    varchar(64)  NOT NULL COMMENT 'Configuration item name',
     `key_value`   varchar(256) NULL COMMENT 'The value of the configuration item',
     `is_deleted`  tinyint(1)            DEFAULT '0' COMMENT 'Whether to delete, 0: not deleted, 1: deleted',
-    `modify_time` timestamp    NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'modify time',
-    PRIMARY KEY (`id`)
+    `modify_time` timestamp    NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'Modify time',
+    PRIMARY KEY (`id`),
+    KEY `index_flume_source_ext` (`parent_name`)
 );
 
 -- ----------------------------
@@ -949,7 +992,8 @@ CREATE TABLE `flume_channel`
     `channel_name` varchar(128) NOT NULL COMMENT 'FlumeChannel name, English, numbers and underscore',
     `set_name`     varchar(128) NOT NULL COMMENT 'ClusterSet name, English, numbers and underscore',
     `type`         varchar(128) NOT NULL COMMENT 'FlumeChannel classname',
-    PRIMARY KEY (`id`)
+    PRIMARY KEY (`id`),
+    UNIQUE KEY `unique_flume_channel` (`channel_name`, `set_name`)
 );
 
 -- ----------------------------
@@ -964,8 +1008,9 @@ CREATE TABLE `flume_channel_ext`
     `key_name`    varchar(64)  NOT NULL COMMENT 'Configuration item name',
     `key_value`   varchar(256) NULL COMMENT 'The value of the configuration item',
     `is_deleted`  tinyint(1)            DEFAULT '0' COMMENT 'Whether to delete, 0: not deleted, 1: deleted',
-    `modify_time` timestamp    NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'modify time',
-    PRIMARY KEY (`id`)
+    `modify_time` timestamp    NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'Modify time',
+    PRIMARY KEY (`id`),
+    KEY `index_flume_channel_ext` (`parent_name`)
 );
 
 -- ----------------------------
@@ -979,7 +1024,8 @@ CREATE TABLE `flume_sink`
     `set_name`  varchar(128) NOT NULL COMMENT 'ClusterSet name, English, numbers and underscore',
     `type`      varchar(128) NOT NULL COMMENT 'FlumeSink classname',
     `channel`   varchar(128) NOT NULL COMMENT 'FlumeSink channel',
-    PRIMARY KEY (`id`)
+    PRIMARY KEY (`id`),
+    UNIQUE KEY `unique_flume_sink` (`sink_name`, `set_name`)
 );
 
 -- ----------------------------
@@ -994,8 +1040,9 @@ CREATE TABLE `flume_sink_ext`
     `key_name`    varchar(64)  NOT NULL COMMENT 'Configuration item name',
     `key_value`   varchar(256) NULL COMMENT 'The value of the configuration item',
     `is_deleted`  tinyint(1)            DEFAULT '0' COMMENT 'Whether to delete, 0: not deleted, 1: deleted',
-    `modify_time` timestamp    NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'modify time',
-    PRIMARY KEY (`id`)
+    `modify_time` timestamp    NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'Modify time',
+    PRIMARY KEY (`id`),
+    KEY `index_flume_sink_ext` (`parent_name`)
 );
 
 SET FOREIGN_KEY_CHECKS = 1;
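The new partition columns in storage_hive pair an interval with a unit; below is a hedged Java sketch of the combinations documented in the column comments (the helper name is hypothetical):

    // Validate a (partition_interval, partition_unit) pair against the
    // combinations listed in the storage_hive DDL comments.
    private static boolean isValidPartition(int interval, String unit) {
        switch (unit) {
            case "D":
            case "H":
                return interval == 1;                    // daily or hourly partitions
            case "I":
                return interval == 10 || interval == 30; // 10- or 30-minute partitions
            default:
                return false;
        }
    }
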
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/DataStreamService.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/DataStreamService.java
index 75b463b..1221742 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/DataStreamService.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/DataStreamService.java
@@ -21,7 +21,6 @@ import com.github.pagehelper.PageInfo;
 import java.util.List;
 import org.apache.inlong.manager.common.pojo.datastream.DataStreamApproveInfo;
 import org.apache.inlong.manager.common.pojo.datastream.DataStreamInfo;
-import org.apache.inlong.manager.common.pojo.datastream.DataStreamInfoToHiveConfig;
 import org.apache.inlong.manager.common.pojo.datastream.DataStreamListVO;
 import org.apache.inlong.manager.common.pojo.datastream.DataStreamPageRequest;
 import org.apache.inlong.manager.common.pojo.datastream.DataStreamSummaryInfo;
@@ -72,23 +71,6 @@ public interface DataStreamService {
     PageInfo<DataStreamListVO> listByCondition(DataStreamPageRequest request);
 
     /**
-     * Query all hive config for business group id
-     *
-     * @param groupId Business group id
-     * @return Hive config list
-     */
-    List<DataStreamInfoToHiveConfig> queryHiveConfigForAllDataStream(String groupId);
-
-    /**
-     * Query hive config for one data stream
-     *
-     * @param groupId Business group id
-     * @param streamId Data stream id
-     * @return Hive config
-     */
-    DataStreamInfoToHiveConfig queryHiveConfigForOneDataStream(String groupId, String streamId);
-
-    /**
      * Modify data stream information
      *
      * @param dataStreamInfo data stream information that needs to be modified
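The two removed query methods are superseded by a direct mapper call; a minimal sketch of the replacement path (a null streamId selects every stream in the group, matching how the listeners below use it):

    // Fetch Hive sort configs for one stream, or for all streams when null.
    List<StorageHiveSortInfo> configs =
            hiveEntityMapper.selectHiveSortInfoByIdentifier(groupId, streamId);
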
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/BusinessServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/BusinessServiceImpl.java
index 152125a..ba64f16 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/BusinessServiceImpl.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/BusinessServiceImpl.java
@@ -416,47 +416,11 @@ public class BusinessServiceImpl implements BusinessService {
         // If you need to change business info after approve, just do in here
         this.updateStatus(groupId, EntityStatus.BIZ_CONFIG_ING.getCode(), operator);
 
-        if (BizConstant.MIDDLEWARE_PULSAR.equalsIgnoreCase(middlewareType)) {
-            BusinessPulsarEntity pulsarEntity = checkAndGetEntity(approveInfo);
-            businessPulsarMapper.updateByIdentifierSelective(pulsarEntity);
-        }
-
         LOGGER.info("success to update business status after approve for groupId={}", groupId);
         return true;
     }
 
     /**
-     * Check whether the Pulsar parameters filled in during approval are valid,
-     * if valid, return to the encapsulated entity
-     */
-    private BusinessPulsarEntity checkAndGetEntity(BusinessApproveInfo approveInfo) {
-        // Pulsar params must meet: ackQuorum <= writeQuorum <= ensemble
-        Integer ackQuorum = approveInfo.getAckQuorum();
-        Integer writeQuorum = approveInfo.getWriteQuorum();
-        Integer ensemble = approveInfo.getEnsemble();
-        Preconditions.checkNotNull(ackQuorum, "Pulsar ackQuorum cannot be empty");
-        Preconditions.checkNotNull(writeQuorum, "Pulsar writeQuorum cannot be empty");
-        Preconditions.checkNotNull(ensemble, "Pulsar ensemble cannot be empty");
-        if (!(ackQuorum <= writeQuorum && writeQuorum <= ensemble)) {
-            throw new BusinessException(BizErrorCodeEnum.BUSINESS_SAVE_FAILED,
-                    "Pulsar params must meet: ackQuorum <= writeQuorum <= ensemble");
-        }
-
-        Preconditions.checkTrue(approveInfo.getTopicPartitionNum() != null
-                        && approveInfo.getTopicPartitionNum() >= 1 && approveInfo.getTopicPartitionNum() <= 20,
-                "topic partition num must meet >= 1 and <= 20");
-
-        Preconditions.checkTrue(approveInfo.getTtl() != null && approveInfo.getTtlUnit() != null,
-                "retention size and unit cannot be empty");
-        Preconditions.checkTrue(approveInfo.getRetentionSize() != null && approveInfo.getRetentionSizeUnit() != null,
-                "retention size and unit cannot be empty");
-        Preconditions.checkTrue(approveInfo.getRetentionTime() != null && approveInfo.getRetentionTimeUnit() != null,
-                "retention size and unit cannot be empty");
-
-        return CommonBeanUtils.copyProperties(approveInfo, BusinessPulsarEntity::new);
-    }
-
-    /**
      * Update extended information
      * <p/>First physically delete the existing extended information, and then add this batch of extended information
      */
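The removed validation encoded the BookKeeper-style quorum invariant; a condensed sketch of the constraint, kept here for reference:

    // Pulsar params must satisfy: ackQuorum <= writeQuorum <= ensemble.
    if (!(ackQuorum <= writeQuorum && writeQuorum <= ensemble)) {
        throw new BusinessException(BizErrorCodeEnum.BUSINESS_SAVE_FAILED,
                "Pulsar params must meet: ackQuorum <= writeQuorum <= ensemble");
    }
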
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/DataStreamServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/DataStreamServiceImpl.java
index 32d91af..8e30a74 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/DataStreamServiceImpl.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/DataStreamServiceImpl.java
@@ -42,7 +42,6 @@ import org.apache.inlong.manager.common.pojo.datastream.DataStreamApproveInfo;
 import org.apache.inlong.manager.common.pojo.datastream.DataStreamExtInfo;
 import org.apache.inlong.manager.common.pojo.datastream.DataStreamFieldInfo;
 import org.apache.inlong.manager.common.pojo.datastream.DataStreamInfo;
-import org.apache.inlong.manager.common.pojo.datastream.DataStreamInfoToHiveConfig;
 import org.apache.inlong.manager.common.pojo.datastream.DataStreamListVO;
 import org.apache.inlong.manager.common.pojo.datastream.DataStreamPageRequest;
 import org.apache.inlong.manager.common.pojo.datastream.DataStreamSummaryInfo;
@@ -94,11 +93,11 @@ public class DataStreamServiceImpl implements DataStreamService {
 
     @Transactional(rollbackFor = Throwable.class)
     @Override
-    public Integer save(DataStreamInfo dataStreamInfo, String operator) {
-        LOGGER.debug("begin to save data stream info={}", dataStreamInfo);
-        Preconditions.checkNotNull(dataStreamInfo, "data stream info is empty");
-        String groupId = dataStreamInfo.getInlongGroupId();
-        String streamId = dataStreamInfo.getInlongStreamId();
+    public Integer save(DataStreamInfo streamInfo, String operator) {
+        LOGGER.debug("begin to save data stream info={}", streamInfo);
+        Preconditions.checkNotNull(streamInfo, "data stream info is empty");
+        String groupId = streamInfo.getInlongGroupId();
+        String streamId = streamInfo.getInlongStreamId();
         Preconditions.checkNotNull(groupId, BizConstant.GROUP_ID_IS_EMPTY);
         Preconditions.checkNotNull(streamId, BizConstant.STREAM_ID_IS_EMPTY);
 
@@ -112,8 +111,9 @@ public class DataStreamServiceImpl implements DataStreamService {
             throw new BusinessException(BizErrorCodeEnum.DATA_STREAM_ID_DUPLICATE);
         }
 
+        streamInfo.setMqResourceObj(streamId);
         // Processing dataStream
-        DataStreamEntity streamEntity = CommonBeanUtils.copyProperties(dataStreamInfo, DataStreamEntity::new);
+        DataStreamEntity streamEntity = CommonBeanUtils.copyProperties(streamInfo, DataStreamEntity::new);
         Date date = new Date();
         streamEntity.setStatus(EntityStatus.DATA_STREAM_NEW.getCode());
         streamEntity.setModifier(operator);
@@ -122,9 +122,9 @@ public class DataStreamServiceImpl implements DataStreamService {
         streamMapper.insertSelective(streamEntity);
 
         // Processing extended information
-        this.saveExt(groupId, streamId, dataStreamInfo.getExtList(), date);
+        this.saveExt(groupId, streamId, streamInfo.getExtList(), date);
         // Process data source fields
-        this.saveField(groupId, streamId, dataStreamInfo.getFieldList());
+        this.saveField(groupId, streamId, streamInfo.getFieldList());
 
         LOGGER.info("success to save data stream info for groupId={}", groupId);
         return streamEntity.getId();
@@ -175,16 +175,6 @@ public class DataStreamServiceImpl implements DataStreamService {
     }
 
     @Override
-    public List<DataStreamInfoToHiveConfig> queryHiveConfigForAllDataStream(String groupId) {
-        return streamMapper.selectStreamToHiveInfo(groupId);
-    }
-
-    @Override
-    public DataStreamInfoToHiveConfig queryHiveConfigForOneDataStream(String groupId, String streamId) {
-        return streamMapper.selectStreamToHiveInfoByIdentifier(groupId, streamId);
-    }
-
-    @Override
     public PageInfo<DataStreamListVO> listByCondition(DataStreamPageRequest request) {
         LOGGER.debug("begin to list data stream page by {}", request);
 
@@ -484,11 +474,12 @@ public class DataStreamServiceImpl implements DataStreamService {
         // The person in charge of the business has the authority of all data streams
         BusinessEntity businessEntity = businessMapper.selectByIdentifier(groupId);
         Preconditions.checkNotNull(businessEntity, "business not found by groupId=" + groupId);
+
         String inCharges = businessEntity.getInCharges();
+        request.setInCharges(inCharges);
 
         PageHelper.startPage(request.getPageNum(), request.getPageSize());
-        Page<DataStreamEntity> page = (Page<DataStreamEntity>) streamMapper.selectByConditionAndInCharges(request,
-                inCharges);
+        Page<DataStreamEntity> page = (Page<DataStreamEntity>) streamMapper.selectByCondition(request);
         List<DataStreamInfo> streamInfoList = CommonBeanUtils.copyListProperties(page, DataStreamInfo::new);
 
         // Convert and encapsulate the paged results
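With the in-charges filter folded into the request object, the paged listing collapses to a single mapper call; a minimal sketch of the reworked path:

    // inCharges now travels inside the request, so one argument suffices.
    request.setInCharges(businessEntity.getInCharges());
    PageHelper.startPage(request.getPageNum(), request.getPageSize());
    Page<DataStreamEntity> page =
            (Page<DataStreamEntity>) streamMapper.selectByCondition(request);
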
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/StorageHiveOperation.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/StorageHiveOperation.java
index 0d5a268..1a8fb24 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/StorageHiveOperation.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/StorageHiveOperation.java
@@ -87,11 +87,11 @@ public class StorageHiveOperation extends StorageBaseOperation {
 
         // Set the encoding type and field splitter
         DataStreamEntity streamEntity = dataStreamMapper.selectByIdentifier(groupId, entity.getInlongStreamId());
-        String encodingType = streamEntity.getDataEncoding() == null
+        String dataEncoding = streamEntity.getDataEncoding() == null
                 ? StandardCharsets.UTF_8.displayName() : streamEntity.getDataEncoding();
-        entity.setEncodingType(encodingType);
-        if (entity.getFieldSplitter() == null) {
-            entity.setFieldSplitter(streamEntity.getFileDelimiter());
+        entity.setDataEncoding(dataEncoding);
+        if (entity.getDataSeparator() == null) {
+            entity.setDataSeparator(streamEntity.getDataSeparator());
         }
 
         entity.setStatus(EntityStatus.DATA_STORAGE_NEW.getCode());
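The renamed fields keep the same inheritance rule: storage-level settings win, and missing ones are inherited from the stream. A sketch of both fallbacks together:

    // Encoding: default to UTF-8 when the stream has no explicit value.
    String dataEncoding = streamEntity.getDataEncoding() == null
            ? StandardCharsets.UTF_8.displayName() : streamEntity.getDataEncoding();
    entity.setDataEncoding(dataEncoding);
    // Separator: inherit the stream's separator when unset on the storage.
    if (entity.getDataSeparator() == null) {
        entity.setDataSeparator(streamEntity.getDataSeparator());
    }
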
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/StorageServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/StorageServiceImpl.java
index 82868cf..3464294 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/StorageServiceImpl.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/StorageServiceImpl.java
@@ -146,7 +146,7 @@ public class StorageServiceImpl extends StorageBaseOperation implements StorageS
 
         // Query HDFS, HIVE, ES storage information and encapsulate it in the result set
         List<StorageSummaryInfo> totalList = new ArrayList<>();
-        List<StorageSummaryInfo> hiveSummaryList = hiveStorageMapper.selectSummaryByIdentifier(groupId, streamId);
+        List<StorageSummaryInfo> hiveSummaryList = hiveStorageMapper.selectSummary(groupId, streamId);
 
         totalList.addAll(hiveSummaryList);
 
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/WorkflowApproverServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/WorkflowApproverServiceImpl.java
index f9d5272..dbaa75e 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/WorkflowApproverServiceImpl.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/WorkflowApproverServiceImpl.java
@@ -125,8 +125,8 @@ public class WorkflowApproverServiceImpl implements WorkflowApproverService {
 
     @Override
     public void update(WorkflowApprover config, String operator) {
-        Preconditions.checkNotNull(config, "config can't be null");
-        Preconditions.checkNotNull(config.getId(), "id can't be null");
+        Preconditions.checkNotNull(config, "config cannot be null");
+        Preconditions.checkNotNull(config.getId(), "id cannot be null");
 
         WorkflowApproverEntity entity = workflowApproverMapper.selectByPrimaryKey(config.getId());
         Preconditions.checkNotNull(entity, "not exist with id:" + config.getId());
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdpart/hive/CreateHiveTableForAllStreamListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdpart/hive/CreateHiveTableForAllStreamListener.java
index 8b6a753..8b9d6eb 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdpart/hive/CreateHiveTableForAllStreamListener.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdpart/hive/CreateHiveTableForAllStreamListener.java
@@ -19,8 +19,8 @@ package org.apache.inlong.manager.service.thirdpart.hive;
 
 import java.util.List;
 import lombok.extern.slf4j.Slf4j;
-import org.apache.inlong.manager.common.pojo.datastream.DataStreamInfoToHiveConfig;
-import org.apache.inlong.manager.service.core.DataStreamService;
+import org.apache.inlong.manager.common.pojo.datastorage.StorageHiveSortInfo;
+import org.apache.inlong.manager.dao.mapper.StorageHiveEntityMapper;
 import org.apache.inlong.manager.service.workflow.newbusiness.CreateResourceWorkflowForm;
 import org.apache.inlong.manager.workflow.core.event.ListenerResult;
 import org.apache.inlong.manager.workflow.core.event.task.TaskEvent;
@@ -37,8 +37,7 @@ import org.springframework.stereotype.Service;
 public class CreateHiveTableForAllStreamListener implements TaskEventListener {
 
     @Autowired
-    private DataStreamService dataStreamService;
-
+    private StorageHiveEntityMapper hiveEntityMapper;
     @Autowired
     private HiveTableOperator hiveTableOperator;
 
@@ -53,12 +52,12 @@ public class CreateHiveTableForAllStreamListener implements TaskEventListener {
         String groupId = form.getInlongGroupId();
         log.info("begin to create hive table for groupId={}", groupId);
 
-        List<DataStreamInfoToHiveConfig> configList = dataStreamService.queryHiveConfigForAllDataStream(groupId);
+        List<StorageHiveSortInfo> configList = hiveEntityMapper.selectHiveSortInfoByIdentifier(groupId, null);
         if (configList == null || configList.size() == 0) {
             return ListenerResult.success();
         }
 
-        for (DataStreamInfoToHiveConfig hiveConfig : configList) {
+        for (StorageHiveSortInfo hiveConfig : configList) {
             hiveTableOperator.createHiveTable(groupId, hiveConfig);
             log.info("finish to create hive table for business {}", groupId);
         }
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdpart/hive/CreateHiveTableForOneStreamListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdpart/hive/CreateHiveTableForOneStreamListener.java
index ec3dcd1..58ee3ac 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdpart/hive/CreateHiveTableForOneStreamListener.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdpart/hive/CreateHiveTableForOneStreamListener.java
@@ -17,9 +17,10 @@
 
 package org.apache.inlong.manager.service.thirdpart.hive;
 
+import java.util.List;
 import lombok.extern.slf4j.Slf4j;
-import org.apache.inlong.manager.common.pojo.datastream.DataStreamInfoToHiveConfig;
-import org.apache.inlong.manager.service.core.DataStreamService;
+import org.apache.inlong.manager.common.pojo.datastorage.StorageHiveSortInfo;
+import org.apache.inlong.manager.dao.mapper.StorageHiveEntityMapper;
 import org.apache.inlong.manager.service.workflow.newbusiness.CreateResourceWorkflowForm;
 import org.apache.inlong.manager.workflow.core.event.ListenerResult;
 import org.apache.inlong.manager.workflow.core.event.task.TaskEvent;
@@ -36,8 +37,7 @@ import org.springframework.stereotype.Service;
 public class CreateHiveTableForOneStreamListener implements TaskEventListener {
 
     @Autowired
-    private DataStreamService dataStreamService;
-
+    private StorageHiveEntityMapper hiveEntityMapper;
     @Autowired
     private HiveTableOperator hiveTableOperator;
 
@@ -53,11 +53,13 @@ public class CreateHiveTableForOneStreamListener implements TaskEventListener {
         String streamId = form.getInlongStreamId();
         log.info("begin create hive table for groupId={}, streamId={}", groupId, streamId);
 
-        DataStreamInfoToHiveConfig hiveConfig = dataStreamService.queryHiveConfigForOneDataStream(groupId, streamId);
-        if (hiveConfig == null) {
+        List<StorageHiveSortInfo> hiveConfig = hiveEntityMapper.selectHiveSortInfoByIdentifier(groupId, streamId);
+        if (hiveConfig == null || hiveConfig.size() == 0) {
             return ListenerResult.success();
         }
-        hiveTableOperator.createHiveTable(groupId, hiveConfig);
+        for (StorageHiveSortInfo info : hiveConfig) {
+            hiveTableOperator.createHiveTable(groupId, info);
+        }
 
         log.info("finish create hive table for business {} - {} ", groupId, streamId);
         return ListenerResult.success();
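After this change both listeners converge on the same loop; a consolidated sketch (streamId is null in the all-stream case, concrete in the one-stream case):

    // Create a Hive table for every matching sort config.
    for (StorageHiveSortInfo info :
            hiveEntityMapper.selectHiveSortInfoByIdentifier(groupId, streamId)) {
        hiveTableOperator.createHiveTable(groupId, info);
    }
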
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdpart/hive/HiveTableOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdpart/hive/HiveTableOperator.java
index eb0d066..aba950f 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdpart/hive/HiveTableOperator.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdpart/hive/HiveTableOperator.java
@@ -23,7 +23,7 @@ import java.util.ArrayList;
 import java.util.List;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.inlong.manager.common.enums.EntityStatus;
-import org.apache.inlong.manager.common.pojo.datastream.DataStreamInfoToHiveConfig;
+import org.apache.inlong.manager.common.pojo.datastorage.StorageHiveSortInfo;
 import org.apache.inlong.manager.common.pojo.query.ColumnInfoBean;
 import org.apache.inlong.manager.common.pojo.query.DatabaseQueryBean;
 import org.apache.inlong.manager.common.pojo.query.hive.HiveColumnQueryBean;
@@ -54,7 +54,7 @@ public class HiveTableOperator {
     /**
      * Create hive table according to the groupId and hive config
      */
-    public void createHiveTable(String groupId, DataStreamInfoToHiveConfig hiveConfig) {
+    public void createHiveTable(String groupId, StorageHiveSortInfo hiveConfig) {
         if (log.isDebugEnabled()) {
             log.debug("begin create hive table for business={}, hiveConfig={}", groupId, hiveConfig);
         }
@@ -90,7 +90,7 @@ public class HiveTableOperator {
         log.info("finish create hive table for business {} ", groupId);
     }
 
-    protected HiveTableQueryBean getTableQueryBean(DataStreamInfoToHiveConfig hiveConfig) {
+    protected HiveTableQueryBean getTableQueryBean(StorageHiveSortInfo hiveConfig) {
         String groupId = hiveConfig.getInlongGroupId();
         String streamId = hiveConfig.getInlongStreamId();
         log.info("begin to get table query bean for groupId={}, streamId={}", groupId, streamId);
@@ -119,11 +119,11 @@ public class HiveTableOperator {
         HiveTableQueryBean queryBean = new HiveTableQueryBean();
         queryBean.setColumns(columnQueryBeans);
         // set terminated symbol
-        if (hiveConfig.getFieldSplitter() != null) {
-            char ch = (char) Integer.parseInt(hiveConfig.getFieldSplitter());
+        if (hiveConfig.getTargetSeparator() != null) {
+            char ch = (char) Integer.parseInt(hiveConfig.getTargetSeparator());
             queryBean.setFieldTerSymbol(String.valueOf(ch));
         }
-        queryBean.setUsername(hiveConfig.getUserName());
+        queryBean.setUsername(hiveConfig.getUsername());
         queryBean.setPassword(hiveConfig.getPassword());
         queryBean.setTableName(hiveConfig.getTableName());
         queryBean.setDbName(hiveConfig.getDbName());
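The separator is persisted as its ASCII code (see the data_separator column comments); a sketch of the decoding step used when building the field terminator:

    // "124" (stored ASCII code) decodes to '|' as the Hive field terminator.
    String stored = hiveConfig.getTargetSeparator();   // e.g. "124"
    char separator = (char) Integer.parseInt(stored);
    queryBean.setFieldTerSymbol(String.valueOf(separator));
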
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdpart/sort/PushHiveConfigTaskListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdpart/sort/PushHiveConfigTaskListener.java
index d27ef01..9f037ec 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdpart/sort/PushHiveConfigTaskListener.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdpart/sort/PushHiveConfigTaskListener.java
@@ -17,20 +17,26 @@
 
 package org.apache.inlong.manager.service.thirdpart.sort;
 
+import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
-import java.util.stream.Collectors;
-import java.util.stream.Stream;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.inlong.manager.common.beans.ClusterBean;
 import org.apache.inlong.manager.common.enums.BizConstant;
+import org.apache.inlong.manager.common.enums.EntityStatus;
 import org.apache.inlong.manager.common.pojo.business.BusinessInfo;
-import org.apache.inlong.manager.common.pojo.datastorage.StorageHiveInfo;
-import org.apache.inlong.manager.common.pojo.datastream.DataStreamInfo;
+import org.apache.inlong.manager.common.pojo.datastorage.StorageHiveSortInfo;
 import org.apache.inlong.manager.common.util.JsonUtils;
-import org.apache.inlong.manager.dao.entity.StorageHiveEntity;
+import org.apache.inlong.manager.common.util.Preconditions;
+import org.apache.inlong.manager.dao.entity.BusinessEntity;
+import org.apache.inlong.manager.dao.entity.StorageHiveFieldEntity;
+import org.apache.inlong.manager.dao.mapper.BusinessEntityMapper;
 import org.apache.inlong.manager.dao.mapper.StorageHiveEntityMapper;
+import org.apache.inlong.manager.dao.mapper.StorageHiveFieldEntityMapper;
 import org.apache.inlong.manager.service.core.DataStreamService;
-import org.apache.inlong.manager.service.core.StorageService;
 import org.apache.inlong.manager.service.workflow.newbusiness.CreateResourceWorkflowForm;
 import org.apache.inlong.manager.workflow.core.event.ListenerResult;
 import org.apache.inlong.manager.workflow.core.event.task.TaskEvent;
@@ -42,8 +48,13 @@ import org.apache.inlong.sort.formats.common.FormatInfo;
 import org.apache.inlong.sort.formats.common.TimestampFormatInfo;
 import org.apache.inlong.sort.protocol.DataFlowInfo;
 import org.apache.inlong.sort.protocol.FieldInfo;
+import org.apache.inlong.sort.protocol.deserialization.DeserializationInfo;
 import org.apache.inlong.sort.protocol.deserialization.TDMsgCsvDeserializationInfo;
 import org.apache.inlong.sort.protocol.sink.HiveSinkInfo;
+import org.apache.inlong.sort.protocol.sink.HiveSinkInfo.HiveFileFormat;
+import org.apache.inlong.sort.protocol.sink.HiveSinkInfo.HiveTimePartitionInfo;
+import org.apache.inlong.sort.protocol.sink.SinkInfo;
+import org.apache.inlong.sort.protocol.source.PulsarSourceInfo;
 import org.apache.inlong.sort.protocol.source.SourceInfo;
 import org.apache.inlong.sort.protocol.source.TubeSourceInfo;
 import org.springframework.beans.factory.annotation.Autowired;
@@ -53,14 +64,30 @@ import org.springframework.stereotype.Component;
 @Component
 public class PushHiveConfigTaskListener implements TaskEventListener {
 
+    private static final Map<String, String> PARTITION_TIME_FORMAT_MAP = new HashMap<>();
+
+    private static final Map<String, TimeUnit> PARTITION_TIME_UNIT_MAP = new HashMap<>();
+
+    static {
+        PARTITION_TIME_FORMAT_MAP.put("D", "yyyyMMdd");
+        PARTITION_TIME_FORMAT_MAP.put("H", "yyyyMMddHH");
+        PARTITION_TIME_FORMAT_MAP.put("I", "yyyyMMddHHmm");
+
+        PARTITION_TIME_UNIT_MAP.put("D", TimeUnit.DAYS);
+        PARTITION_TIME_UNIT_MAP.put("H", TimeUnit.HOURS);
+        PARTITION_TIME_UNIT_MAP.put("I", TimeUnit.MINUTES);
+    }
+
+    @Autowired
+    private ClusterBean clusterBean;
     @Autowired
-    private StorageService storageService;
+    private BusinessEntityMapper businessMapper;
     @Autowired
     private StorageHiveEntityMapper storageHiveMapper;
     @Autowired
-    private DataStreamService dataStreamService;
+    private StorageHiveFieldEntityMapper hiveFieldMapper;
     @Autowired
-    private ClusterBean clusterBean;
+    private DataStreamService dataStreamService;
 
     @Override
     public TaskEvent event() {
@@ -76,19 +103,24 @@ public class PushHiveConfigTaskListener implements TaskEventListener {
         CreateResourceWorkflowForm form = (CreateResourceWorkflowForm) context.getProcessForm();
         BusinessInfo businessInfo = form.getBusinessInfo();
         String groupId = businessInfo.getInlongGroupId();
+
+        BusinessEntity business = businessMapper.selectByIdentifier(groupId);
+        if (business == null || EntityStatus.IS_DELETED.getCode().equals(business.getIsDeleted())) {
+            log.warn("skip to push sort hive config for groupId={}, as biz not exists or has been deleted", groupId);
+            return ListenerResult.success();
+        }
+
         // if streamId is not null, only push the config that belongs to the groupId and the streamId
         String streamId = form.getInlongStreamId();
+        List<StorageHiveSortInfo> hiveInfoList = storageHiveMapper.selectHiveSortInfoByIdentifier(groupId, streamId);
+        for (StorageHiveSortInfo hiveInfo : hiveInfoList) {
+            Integer storageId = hiveInfo.getId();
 
-        List<StorageHiveEntity> storageHiveEntities = storageHiveMapper.selectByIdentifier(groupId, streamId);
-        for (StorageHiveEntity hiveEntity : storageHiveEntities) {
-            Integer storageId = hiveEntity.getId();
-            StorageHiveInfo hiveStorage = (StorageHiveInfo) storageService
-                    .getById(BizConstant.STORAGE_HIVE, storageId);
             if (log.isDebugEnabled()) {
-                log.debug("hive storage info: {}", hiveStorage);
+                log.debug("hive storage info: {}", hiveInfo);
             }
 
-            DataFlowInfo dataFlowInfo = getDataFlowInfo(businessInfo, hiveStorage);
+            DataFlowInfo dataFlowInfo = getDataFlowInfo(business, hiveInfo);
             if (log.isDebugEnabled()) {
                 log.debug("try to push hive config to sort: {}", JsonUtils.toJson(dataFlowInfo));
             }
@@ -109,26 +141,62 @@ public class PushHiveConfigTaskListener implements TaskEventListener {
         return ListenerResult.success();
     }
 
-    private DataFlowInfo getDataFlowInfo(BusinessInfo businessInfo, StorageHiveInfo hiveStorage) {
-        Stream<FieldInfo> hiveFields = hiveStorage.getHiveFieldList().stream().map(field -> {
-            FormatInfo formatInfo = SortFieldFormatUtils.convertFieldFormat(field.getFieldType().toLowerCase());
-            return new FieldInfo(field.getFieldName(), formatInfo);
-        });
+    private DataFlowInfo getDataFlowInfo(BusinessEntity businessEntity, StorageHiveSortInfo hiveInfo) {
+        String groupId = hiveInfo.getInlongGroupId();
+        String streamId = hiveInfo.getInlongStreamId();
+        List<StorageHiveFieldEntity> fieldList = hiveFieldMapper.selectHiveFields(groupId, streamId);
+
+        if (fieldList == null || fieldList.isEmpty()) {
+            throw new WorkflowListenerException("no hive fields for groupId=" + groupId + ", streamId=" + streamId);
+        }
+
+        SourceInfo sourceInfo = getSourceInfo(businessEntity, hiveInfo, fieldList);
+        SinkInfo sinkInfo = getSinkInfo(hiveInfo, fieldList);
+
+        // push information
+        return new DataFlowInfo(hiveInfo.getId(), sourceInfo, sinkInfo);
+    }
+
+    private HiveSinkInfo getSinkInfo(StorageHiveSortInfo hiveInfo, List<StorageHiveFieldEntity> fieldList) {
+        if (hiveInfo.getJdbcUrl() == null) {
+            throw new WorkflowListenerException("hive server url cannot be empty");
+        }
+
+        // Use the field separator configured for Hive; the default file format is TextFile
+        Character separator = (char) Integer.parseInt(hiveInfo.getTargetSeparator());
+        HiveFileFormat fileFormat;
+        String format = hiveInfo.getFileFormat();
 
-        List<FieldInfo> sinkFields = hiveFields.collect(Collectors.toList());
-        FieldInfo partitionFieldInfo = new FieldInfo(hiveStorage.getPrimaryPartition(),
-                new TimestampFormatInfo("MILLIS"));
-        sinkFields.add(partitionFieldInfo);
+        if (BizConstant.FILE_FORMAT_ORC.equalsIgnoreCase(format)) {
+            fileFormat = new HiveSinkInfo.OrcFileFormat(1000);
+        } else if (BizConstant.FILE_FORMAT_SEQUENCE.equalsIgnoreCase(format)) {
+            fileFormat = new HiveSinkInfo.SequenceFileFormat(separator, 100);
+        } else if (BizConstant.FILE_FORMAT_PARQUET.equalsIgnoreCase(format)) {
+            fileFormat = new HiveSinkInfo.ParquetFileFormat();
+        } else {
+            fileFormat = new HiveSinkInfo.TextFileFormat(separator);
+        }
 
-        String hiveServerUrl = hiveStorage.getJdbcUrl();
-        if (hiveServerUrl != null && !hiveServerUrl.startsWith("jdbc:hive2://")) {
-            hiveServerUrl = "jdbc:hive2://" + hiveServerUrl;
+        // The primary partition field must be a HiveTimePartitionInfo in the sink
+        List<HiveSinkInfo.HivePartitionInfo> partitionList = new ArrayList<>();
+        String primary = hiveInfo.getPrimaryPartition();
+        if (StringUtils.isNotEmpty(primary)) {
+            // Hive partitions are by day, hour, or minute
+            String unit = hiveInfo.getPartitionUnit();
+            HiveTimePartitionInfo timePartitionInfo = new HiveTimePartitionInfo(
+                    primary, PARTITION_TIME_FORMAT_MAP.get(unit));
+            partitionList.add(timePartitionInfo);
+        }
+        // The secondary partition field is temporarily encapsulated as HiveFieldPartitionInfo in the sink;
+        // TODO the type should be set according to the type of the field itself.
+        if (StringUtils.isNotEmpty(hiveInfo.getSecondaryPartition())) {
+            partitionList.add(new HiveSinkInfo.HiveFieldPartitionInfo(hiveInfo.getSecondaryPartition()));
         }
 
         // dataPath = hdfsUrl + / + warehouseDir + / + dbName + .db/ + tableName
         StringBuilder dataPathBuilder = new StringBuilder();
-        String hdfsUrl = hiveStorage.getHdfsDefaultFs();
-        String warehouseDir = hiveStorage.getWarehouseDir();
+        String hdfsUrl = hiveInfo.getHdfsDefaultFs();
+        String warehouseDir = hiveInfo.getWarehouseDir();
         if (hdfsUrl.endsWith("/")) {
             dataPathBuilder.append(hdfsUrl, 0, hdfsUrl.length() - 1);
         } else {
@@ -139,57 +207,116 @@ public class PushHiveConfigTaskListener implements TaskEventListener {
         } else {
             dataPathBuilder.append(warehouseDir);
         }
-        String dataPath = dataPathBuilder.append("/")
-                .append(hiveStorage.getDbName())
-                .append(".db/")
-                .append(hiveStorage.getTableName())
-                .toString();
-
-        // Encapsulate the deserialization information in the source
-        HiveSinkInfo.HiveFileFormat fileFormat = new HiveSinkInfo.TextFileFormat(',');
-        if (hiveStorage.getFieldSplitter() != null) {
-            char c = (char) Integer.parseInt(hiveStorage.getFieldSplitter());
-            fileFormat = new HiveSinkInfo.TextFileFormat(c);
+        String dataPath = dataPathBuilder.append("/").append(hiveInfo.getDbName())
+                .append(".db/").append(hiveInfo.getTableName()).toString();
+
+        // Get the sink fields; if the partition field is not among the source fields, add it to the end
+        List<FieldInfo> fieldInfoList = getSinkFields(fieldList, hiveInfo.getPrimaryPartition());
+
+        return new HiveSinkInfo(fieldInfoList.toArray(new FieldInfo[0]), hiveInfo.getJdbcUrl(),
+                hiveInfo.getDbName(), hiveInfo.getTableName(), hiveInfo.getUsername(), hiveInfo.getPassword(),
+                dataPath, partitionList.toArray(new HiveSinkInfo.HivePartitionInfo[0]), fileFormat);
+    }
+
+    /**
+     * Get source info
+     */
+    private SourceInfo getSourceInfo(BusinessEntity businessEntity, StorageHiveSortInfo info,
+            List<StorageHiveFieldEntity> fieldList) {
+        DeserializationInfo deserializationInfo = null;
+        boolean isDbType = BizConstant.DATA_SOURCE_DB.equals(info.getDataSourceType());
+        if (!isDbType) {
+            // For FILE and auto-push sources, the data format is TEXT or KEY-VALUE; temporarily use TDMsgCsv
+            String dataType = info.getDataType();
+            if (BizConstant.DATA_TYPE_TEXT.equalsIgnoreCase(dataType)
+                    || BizConstant.DATA_TYPE_KEY_VALUE.equalsIgnoreCase(dataType)) {
+                // Use the field separator from the data stream
+                char separator = (char) Integer.parseInt(info.getSourceSeparator());
+                // TODO support escape
+                /*Character escape = null;
+                if (info.getDataEscapeChar() != null) {
+                    escape = info.getDataEscapeChar().charAt(0);
+                }*/
+                // Whether to delete the first separator; the default is false for the time being
+                deserializationInfo = new TDMsgCsvDeserializationInfo(info.getInlongStreamId(), separator);
+            }
+        }
+
+        // The number and order of the source fields must be the same as the target fields
+        SourceInfo sourceInfo = null;
+        // Get the source fields (the partition field is not yet appended here; see the TODO in getSourceFields)
+        List<FieldInfo> sourceFields = getSourceFields(fieldList, info.getPrimaryPartition());
+
+        String middleWare = businessEntity.getMiddlewareType();
+        if (BizConstant.MIDDLEWARE_TUBE.equalsIgnoreCase(middleWare)) {
+            String masterAddress = clusterBean.getTubeMaster();
+            Preconditions.checkNotNull(masterAddress, "tube cluster address cannot be empty");
+            String topic = businessEntity.getMqResourceObj();
+            // The consumer group name is: appName_topicName_consumer_group
+            String consumerGroup = clusterBean.getAppName() + "_" + topic + "_consumer_group";
+            sourceInfo = new TubeSourceInfo(topic, masterAddress, consumerGroup,
+                    deserializationInfo, sourceFields.toArray(new FieldInfo[0]));
+        } else if (BizConstant.MIDDLEWARE_PULSAR.equalsIgnoreCase(middleWare)) {
+            String tenant = clusterBean.getDefaultTenant();
+            String namespace = businessEntity.getMqResourceObj();
+            String pulsarTopic = info.getMqResourceObj();
+            // The full topic name in Pulsar
+            String fullTopicName = "persistent://" + tenant + "/" + namespace + "/" + pulsarTopic;
+            String adminUrl = clusterBean.getPulsarServiceUrl();
+            String serviceUrl = clusterBean.getPulsarServiceUrl();
+            String consumerGroup = clusterBean.getAppName() + "_" + pulsarTopic + "_consumer_group";
+            sourceInfo = new PulsarSourceInfo(adminUrl, serviceUrl, fullTopicName, consumerGroup,
+                    deserializationInfo, sourceFields.toArray(new FieldInfo[0]));
         }
 
-        // encapsulate hive sink
-        HiveSinkInfo hiveSinkInfo = new HiveSinkInfo(
-                sinkFields.toArray(new FieldInfo[0]),
-                hiveServerUrl,
-                hiveStorage.getDbName(),
-                hiveStorage.getTableName(),
-                hiveStorage.getUsername(),
-                hiveStorage.getPassword(),
-                dataPath,
-                Stream.of(new HiveSinkInfo.HiveTimePartitionInfo(hiveStorage.getPrimaryPartition(),
-                        "yyyyMMddHH")).toArray(HiveSinkInfo.HivePartitionInfo[]::new),
-                fileFormat
-        );
-
-        // data stream fields
-        DataStreamInfo dataStream = dataStreamService.get(hiveStorage.getInlongGroupId(),
-                hiveStorage.getInlongStreamId());
-        Stream<FieldInfo> streamFields = dataStream.getFieldList().stream().map(field -> {
+        return sourceInfo;
+    }
+
+    /**
+     * Get sink fields
+     */
+    private List<FieldInfo> getSinkFields(List<StorageHiveFieldEntity> fieldList, String partitionField) {
+        boolean duplicate = false;
+        List<FieldInfo> fieldInfoList = new ArrayList<>();
+        for (StorageHiveFieldEntity field : fieldList) {
+            String fieldName = field.getFieldName();
+            if (fieldName.equals(partitionField)) {
+                duplicate = true;
+            }
+
             FormatInfo formatInfo = SortFieldFormatUtils.convertFieldFormat(field.getFieldType().toLowerCase());
-            return new FieldInfo(field.getFieldName(), formatInfo);
-        });
-
-        String topic = businessInfo.getMqResourceObj();
-        String consumeGroupName = "sort_" + businessInfo.getMqResourceObj() + "_consumer_group";
-        TDMsgCsvDeserializationInfo deserializationInfo = null;
-        if (BizConstant.DATA_TYPE_TEXT.equalsIgnoreCase(dataStream.getDataType())) {
-            char c = (char) Integer.parseInt(dataStream.getFileDelimiter());
-            deserializationInfo = new TDMsgCsvDeserializationInfo(hiveStorage.getInlongStreamId(), c);
+            FieldInfo fieldInfo = new FieldInfo(fieldName, formatInfo);
+            fieldInfoList.add(fieldInfo);
         }
-        SourceInfo sourceInfo = new TubeSourceInfo(topic, clusterBean.getTubeMaster(), consumeGroupName,
-                deserializationInfo, streamFields.toArray(FieldInfo[]::new));
 
-        // push information
-        return new DataFlowInfo(hiveStorage.getId(), sourceInfo, hiveSinkInfo);
+        // If the partition field is not among the ordinary fields, append it to the end
+        if (!duplicate && StringUtils.isNotEmpty(partitionField)) {
+            FieldInfo fieldInfo = new FieldInfo(partitionField, new TimestampFormatInfo("MILLIS"));
+            fieldInfoList.add(fieldInfo);
+        }
+        return fieldInfoList;
+    }
+
+    /**
+     * Get source field list
+     * TODO support BuiltInField
+     */
+    private List<FieldInfo> getSourceFields(List<StorageHiveFieldEntity> fieldList, String partitionField) {
+        List<FieldInfo> fieldInfoList = new ArrayList<>();
+        for (StorageHiveFieldEntity field : fieldList) {
+            FormatInfo formatInfo = SortFieldFormatUtils.convertFieldFormat(field.getSourceFieldType().toLowerCase());
+            String fieldName = field.getSourceFieldName();
+
+            FieldInfo fieldInfo = new FieldInfo(fieldName, formatInfo);
+            fieldInfoList.add(fieldInfo);
+        }
+
+        return fieldInfoList;
     }
 
     @Override
     public boolean async() {
         return false;
     }
+
 }
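
For readers skimming the hunk above: the Pulsar branch builds the fully qualified
topic name and the consumer group from cluster and business settings. Below is a
minimal, self-contained sketch of that composition; it is not part of the patch,
and the tenant, namespace, topic, and app name values are hypothetical examples.

    public class PulsarSourceNamingSketch {

        // Pulsar persistent topics follow persistent://<tenant>/<namespace>/<topic>
        static String fullTopicName(String tenant, String namespace, String topic) {
            return "persistent://" + tenant + "/" + namespace + "/" + topic;
        }

        // Consumer group convention used by the listener: <appName>_<topic>_consumer_group
        static String consumerGroup(String appName, String topic) {
            return appName + "_" + topic + "_consumer_group";
        }

        public static void main(String[] args) {
            // Prints: persistent://public/biz_group/stream_topic
            System.out.println(fullTopicName("public", "biz_group", "stream_topic"));
            // Prints: inlong_manager_stream_topic_consumer_group
            System.out.println(consumerGroup("inlong_manager", "stream_topic"));
        }
    }
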
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdpart/sort/SortFieldFormatUtils.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdpart/sort/SortFieldFormatUtils.java
index 1df0d1c..ff62e36 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdpart/sort/SortFieldFormatUtils.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdpart/sort/SortFieldFormatUtils.java
@@ -17,8 +17,11 @@
 
 package org.apache.inlong.manager.service.thirdpart.sort;
 
+import org.apache.inlong.sort.formats.common.ArrayFormatInfo;
 import org.apache.inlong.sort.formats.common.BooleanFormatInfo;
 import org.apache.inlong.sort.formats.common.ByteFormatInfo;
+import org.apache.inlong.sort.formats.common.ByteTypeInfo;
+import org.apache.inlong.sort.formats.common.DateFormatInfo;
 import org.apache.inlong.sort.formats.common.DecimalFormatInfo;
 import org.apache.inlong.sort.formats.common.DoubleFormatInfo;
 import org.apache.inlong.sort.formats.common.FloatFormatInfo;
@@ -27,6 +30,8 @@ import org.apache.inlong.sort.formats.common.IntFormatInfo;
 import org.apache.inlong.sort.formats.common.LongFormatInfo;
 import org.apache.inlong.sort.formats.common.ShortFormatInfo;
 import org.apache.inlong.sort.formats.common.StringFormatInfo;
+import org.apache.inlong.sort.formats.common.TimeFormatInfo;
+import org.apache.inlong.sort.formats.common.TimestampFormatInfo;
 
 /**
  * Sort field formatting tool
@@ -42,21 +47,21 @@ public class SortFieldFormatUtils {
     public static FormatInfo convertFieldFormat(String type) {
         FormatInfo formatInfo;
         switch (type) {
-            case "string":
-                formatInfo = new StringFormatInfo();
-                break;
             case "boolean":
                 formatInfo = new BooleanFormatInfo();
                 break;
+            case "tinyint":
             case "byte":
                 formatInfo = new ByteFormatInfo();
                 break;
+            case "smallint":
             case "short":
                 formatInfo = new ShortFormatInfo();
                 break;
             case "int":
                 formatInfo = new IntFormatInfo();
                 break;
+            case "bigint":
             case "long":
                 formatInfo = new LongFormatInfo();
                 break;
@@ -69,7 +74,20 @@ public class SortFieldFormatUtils {
             case "decimal":
                 formatInfo = new DecimalFormatInfo();
                 break;
-            default:
+            case "date":
+                formatInfo = new DateFormatInfo();
+                break;
+            case "time":
+                formatInfo = new TimeFormatInfo();
+                break;
+            case "timestamp":
+                formatInfo = new TimestampFormatInfo();
+                break;
+            case "binary":
+            case "fixed":
+                formatInfo = new ArrayFormatInfo(ByteTypeInfo::new);
+                break;
+            default: // default is string
                 formatInfo = new StringFormatInfo();
         }
 
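
The extended switch above maps lower-cased Hive type names onto Sort FormatInfo
classes, with string as the fallback. A small usage sketch (not part of the patch;
the class name and sample type strings are illustrative):

    import org.apache.inlong.manager.service.thirdpart.sort.SortFieldFormatUtils;
    import org.apache.inlong.sort.formats.common.FormatInfo;

    public class FieldFormatSketch {
        public static void main(String[] args) {
            // Callers lower-case the type name first, as the listener does
            FormatInfo f1 = SortFieldFormatUtils.convertFieldFormat("bigint");    // LongFormatInfo
            FormatInfo f2 = SortFieldFormatUtils.convertFieldFormat("timestamp"); // TimestampFormatInfo
            FormatInfo f3 = SortFieldFormatUtils.convertFieldFormat("varchar");   // StringFormatInfo (default)
            System.out.println(f1 + ", " + f2 + ", " + f3);
        }
    }
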
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/newconsumption/listener/ConsumptionCompleteProcessListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/newconsumption/listener/ConsumptionCompleteProcessListener.java
index 389c4e3..4ea6073 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/newconsumption/listener/ConsumptionCompleteProcessListener.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/newconsumption/listener/ConsumptionCompleteProcessListener.java
@@ -94,9 +94,9 @@ public class ConsumptionCompleteProcessListener implements ProcessEventListener
 
     private NewConsumptionApproveForm getAdminApproveForm(WorkflowContext context) {
         TaskInstance adminTask = queryService.listTask(TaskQuery.builder()
-                .processInstId(context.getProcessInstance().getId())
-                .name(NewConsumptionWorkflowDefinition.UT_ADMINT_NAME)
-                .build())
+                        .processInstId(context.getProcessInstance().getId())
+                        .name(NewConsumptionWorkflowDefinition.UT_ADMINT_NAME)
+                        .build())
                 .stream()
                 .findFirst()
                 .orElseThrow(() -> new BusinessException(BizErrorCodeEnum.WORKFLOW_EXE_FAILED,
@@ -104,7 +104,7 @@ public class ConsumptionCompleteProcessListener implements ProcessEventListener
 
         Process process = context.getProcess();
         NewConsumptionApproveForm form = WorkflowFormParserUtils.parseTaskForm(adminTask, process);
-        Preconditions.checkNotNull(form, "form can't be null");
+        Preconditions.checkNotNull(form, "form cannot be null");
         return form;
     }
 
diff --git a/inlong-manager/manager-web/src/test/resources/sql/apache_inlong_manager.sql b/inlong-manager/manager-web/src/test/resources/sql/apache_inlong_manager.sql
index d3ae3f1..08b2320 100644
--- a/inlong-manager/manager-web/src/test/resources/sql/apache_inlong_manager.sql
+++ b/inlong-manager/manager-web/src/test/resources/sql/apache_inlong_manager.sql
@@ -85,7 +85,7 @@ CREATE TABLE `business`
     `schema_name`     varchar(128)          DEFAULT NULL COMMENT 'Data type, associated data_schema table',
     `in_charges`      varchar(512) NOT NULL COMMENT 'Name of responsible person, separated by commas',
     `followers`       varchar(512)          DEFAULT NULL COMMENT 'List of names of business followers, separated by commas',
-    `status`          int(11)               DEFAULT '21' COMMENT 'Business status',
+    `status`          int(4)                DEFAULT '21' COMMENT 'Business status',
     `is_deleted`      tinyint(1)            DEFAULT '0' COMMENT 'Whether to delete, 0: not deleted, 1: deleted',
     `creator`         varchar(64)  NOT NULL COMMENT 'Creator name',
     `modifier`        varchar(64)           DEFAULT NULL COMMENT 'Modifier name',
@@ -150,7 +150,7 @@ CREATE TABLE `cluster_info`
     `url`         varchar(256)          DEFAULT NULL COMMENT 'Cluster URL address',
     `is_backup`   tinyint(1)            DEFAULT '0' COMMENT 'Whether it is a backup cluster, 0: no, 1: yes',
     `ext_props`   text                  DEFAULT NULL COMMENT 'extended properties',
-    `status`      int(11)               DEFAULT '1' COMMENT 'cluster status',
+    `status`      int(4)                DEFAULT '1' COMMENT 'cluster status',
     `is_deleted`  tinyint(1)            DEFAULT '0' COMMENT 'Whether to delete, 0: not deleted, 1: deleted',
     `creator`     varchar(64)  NOT NULL COMMENT 'Creator name',
     `modifier`    varchar(64)           DEFAULT NULL COMMENT 'Modifier name',
@@ -183,7 +183,7 @@ CREATE TABLE `common_db_server`
     `db_description`      varchar(256)          DEFAULT NULL COMMENT 'DB description',
     `backup_db_server_ip` varchar(64)           DEFAULT NULL COMMENT 'Backup DB HOST',
     `backup_db_port`      int(11)               DEFAULT NULL COMMENT 'Backup DB port',
-    `status`              int(11)               DEFAULT '0' COMMENT 'status',
+    `status`              int(4)                DEFAULT '0' COMMENT 'status',
     `is_deleted`          tinyint(1)            DEFAULT '0' COMMENT 'Whether to delete, 0: not deleted, 1: deleted',
     `creator`             varchar(64)  NOT NULL COMMENT 'Creator name',
     `modifier`            varchar(64)           DEFAULT NULL COMMENT 'Modifier name',
@@ -208,7 +208,7 @@ CREATE TABLE `common_file_server`
     `issue_type`     varchar(128)         DEFAULT NULL COMMENT 'Issuance method, such as SSH, TCS, etc.',
     `username`       varchar(64) NOT NULL COMMENT 'User name of the data source IP host',
     `password`       varchar(64) NOT NULL COMMENT 'The password corresponding to the above user name',
-    `status`         int(11)              DEFAULT '0' COMMENT 'status',
+    `status`         int(4)               DEFAULT '0' COMMENT 'status',
     `is_deleted`     tinyint(1)           DEFAULT '0' COMMENT 'Whether to delete, 0: not deleted, 1: deleted',
     `creator`        varchar(64) NOT NULL COMMENT 'Creator name',
     `modifier`       varchar(64)          DEFAULT NULL COMMENT 'Modifier name',
@@ -234,12 +234,12 @@ CREATE TABLE `consumption`
     `topic`               varchar(255) NOT NULL COMMENT 'Consumption topic',
     `filter_enabled`      int(2)                DEFAULT '0' COMMENT 'Whether to filter, default 0, not filter consume',
     `inlong_stream_id`    varchar(1024)         DEFAULT NULL COMMENT 'Data stream ID for consumption, if filter_enable is 1, it cannot be empty',
-    `status`              int(11)      NOT NULL COMMENT 'Status: draft, pending approval, approval rejected, approval passed',
+    `status`              int(4)       NOT NULL COMMENT 'Status: draft, pending approval, approval rejected, approval passed',
+    `is_deleted`          tinyint(1)            DEFAULT '0' COMMENT 'Whether to delete, 0: not deleted, 1: deleted',
     `creator`             varchar(64)  NOT NULL COMMENT 'creator',
     `modifier`            varchar(64)           DEFAULT NULL COMMENT 'modifier',
     `create_time`         timestamp    NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'Create time',
     `modify_time`         timestamp    NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'Modify time',
-    `is_deleted`          int(2)                DEFAULT '0' COMMENT 'Whether to delete, 0: not deleted, 1: deleted',
     PRIMARY KEY (`id`)
 );
 
@@ -249,8 +249,8 @@ CREATE TABLE `consumption`
 DROP TABLE IF EXISTS `consumption_pulsar`;
 CREATE TABLE `consumption_pulsar`
 (
-    `id`                  int          NOT NULL AUTO_INCREMENT,
-    `consumption_id`      int          DEFAULT NULL COMMENT 'ID of the consumption information to which it belongs, guaranteed to be uniquely associated with consumption information',
+    `id`                  int(11)      NOT NULL AUTO_INCREMENT,
+    `consumption_id`      int(11)      DEFAULT NULL COMMENT 'ID of the consumption information to which it belongs, guaranteed to be uniquely associated with consumption information',
     `consumer_group_id`   varchar(255) NOT NULL COMMENT 'Consumer group ID',
     `consumer_group_name` varchar(255) NOT NULL COMMENT 'Consumer group name',
     `inlong_group_id`     varchar(255) NOT NULL COMMENT 'Business group ID',
@@ -258,7 +258,7 @@ CREATE TABLE `consumption_pulsar`
     `retry_letter_topic`  varchar(255) DEFAULT NULL COMMENT 'The name of the retry queue topic',
     `is_dlq`              tinyint(1)   DEFAULT '0' COMMENT 'Whether to configure dead letter topic, 0: no configuration, 1: means configuration',
     `dead_letter_topic`   varchar(255) DEFAULT NULL COMMENT 'dead letter topic name',
-    `is_deleted`          int          DEFAULT '0' COMMENT 'Whether to delete',
+    `is_deleted`          tinyint(1)   DEFAULT '0' COMMENT 'Whether to delete',
     PRIMARY KEY (`id`)
 ) COMMENT ='Pulsar consumption table';
 
@@ -278,7 +278,7 @@ CREATE TABLE `data_proxy_cluster`
     `net_type`    varchar(20)           DEFAULT NULL COMMENT 'Cluster network type, internal, or public',
     `in_charges`  varchar(512)          DEFAULT NULL COMMENT 'Name of responsible person, separated by commas',
     `ext_props`   text                  DEFAULT NULL COMMENT 'Extended properties',
-    `status`      int(11)               DEFAULT '1' COMMENT 'Cluster status',
+    `status`      int(4)                DEFAULT '1' COMMENT 'Cluster status',
     `is_deleted`  tinyint(1)            DEFAULT '0' COMMENT 'Whether to delete, 0: not deleted, 1: deleted',
     `creator`     varchar(64)  NOT NULL COMMENT 'Creator name',
     `modifier`    varchar(64)           DEFAULT NULL COMMENT 'Modifier name',
@@ -321,7 +321,8 @@ CREATE TABLE `data_source_cmd_config`
     `modify_time`         timestamp   NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'Last update time ',
     `create_time`         timestamp   NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'Create time',
     `result_info`         varchar(64)          DEFAULT NULL,
-    PRIMARY KEY (`id`)
+    PRIMARY KEY (`id`),
+    KEY `index_1` (`task_id`, `bSend`, `specified_data_time`)
 );
 
 -- ----------------------------
@@ -340,18 +341,24 @@ CREATE TABLE `data_stream`
     `storage_period`         int(11)           DEFAULT '1' COMMENT 'The storage period of data in MQ, unit: day',
     `data_type`              varchar(20)       DEFAULT 'TEXT' COMMENT 'Data type: TEXT, KEY-VALUE, PB, or BON; TEXT and BON should be treated differently',
     `data_encoding`          varchar(8)        DEFAULT 'UTF-8' COMMENT 'Data encoding format, including: UTF-8, GBK',
-    `file_delimiter`         varchar(8)        DEFAULT NULL COMMENT 'The source data field separator, stored as ASCII code',
+    `data_separator`         varchar(8)        DEFAULT NULL COMMENT 'The source data field separator, stored as ASCII code',
+    `data_escape_char`       varchar(8)        DEFAULT NULL COMMENT 'Source data field escape character, default NULL, stored as 1 character',
     `have_predefined_fields` tinyint(1)        DEFAULT '0' COMMENT '(File, DB access) whether there are predefined fields, 0: none, 1: yes (save to data_stream_field)',
+    `daily_records`          int(11)           DEFAULT '10' COMMENT 'Number of access records per day, unit: 10,000 records per day',
+    `daily_storage`          int(11)           DEFAULT '10' COMMENT 'Access size by day, unit: GB per day',
+    `peak_records`           int(11)           DEFAULT '1000' COMMENT 'Access peak per second, unit: records per second',
+    `max_length`             int(11)           DEFAULT '10240' COMMENT 'The maximum length of a single piece of data, unit: Byte',
     `in_charges`             varchar(512)      DEFAULT NULL COMMENT 'Name of responsible person, separated by commas',
-    `status`                 int(11)           DEFAULT '0' COMMENT 'Data stream status',
-    `previous_status`        int(11)           DEFAULT '0' COMMENT 'Previous status',
+    `status`                 int(4)            DEFAULT '0' COMMENT 'Data stream status',
+    `previous_status`        int(4)            DEFAULT '0' COMMENT 'Previous status',
     `is_deleted`             tinyint(1)        DEFAULT '0' COMMENT 'Whether to delete, 0: not deleted, 1: deleted',
     `creator`                varchar(64)       DEFAULT NULL COMMENT 'Creator name',
     `modifier`               varchar(64)       DEFAULT NULL COMMENT 'Modifier name',
     `create_time`            timestamp    NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'Create time',
     `modify_time`            timestamp    NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'Modify time',
     `temp_view`              text              DEFAULT NULL COMMENT 'Temporary view, used to save intermediate data that has not been submitted or approved after modification',
-    PRIMARY KEY (`id`)
+    PRIMARY KEY (`id`),
+    UNIQUE KEY `unique_data_stream` (`inlong_stream_id`, `inlong_group_id`, `is_deleted`, `modify_time`)
 );
 
 -- ----------------------------
@@ -367,7 +374,8 @@ CREATE TABLE `data_stream_ext`
     `key_value`        varchar(256)          DEFAULT NULL COMMENT 'The value of the configuration item',
     `is_deleted`       tinyint(1)            DEFAULT '0' COMMENT 'Whether to delete, 0: not deleted, 1: deleted',
     `modify_time`      timestamp    NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'Modify time',
-    PRIMARY KEY (`id`)
+    PRIMARY KEY (`id`),
+    KEY `index_stream_id` (`inlong_stream_id`)
 );
 
 -- ----------------------------
@@ -391,7 +399,8 @@ CREATE TABLE `data_stream_field`
     `bon_field_path`      varchar(256) DEFAULT NULL COMMENT 'BON field path',
     `bon_field_type`      varchar(64)  DEFAULT NULL COMMENT 'BON field type',
     `encrypt_level`       varchar(20)  DEFAULT NULL COMMENT 'Encryption level',
-    PRIMARY KEY (`id`)
+    PRIMARY KEY (`id`),
+    KEY `index_stream_id` (`inlong_stream_id`)
 );
 
 -- ----------------------------
@@ -412,7 +421,7 @@ CREATE TABLE `operation_log`
     `cost_time`           bigint(20)         DEFAULT NULL COMMENT 'time-consuming',
     `body`                text COMMENT 'Request body',
     `param`               text COMMENT 'parameter',
-    `status`              tinyint(1)         DEFAULT NULL COMMENT 'status',
+    `status`              int(4)             DEFAULT NULL COMMENT 'status',
     `request_time`        timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'request time',
     `err_msg`             text COMMENT 'Error message',
     PRIMARY KEY (`id`)
@@ -432,7 +441,9 @@ CREATE TABLE `role`
     `create_by`   varchar(255) NOT NULL,
     `update_by`   varchar(255) NOT NULL,
     `disabled`    tinyint(1)   NOT NULL DEFAULT '0' COMMENT 'Is it disabled?',
-    PRIMARY KEY (`id`)
+    PRIMARY KEY (`id`),
+    UNIQUE KEY `role_role_code_uindex` (`role_code`),
+    UNIQUE KEY `role_role_name_uindex` (`role_name`)
 );
 
 -- ----------------------------
@@ -450,7 +461,7 @@ CREATE TABLE `source_db_basic`
     `modifier`         varchar(64)           DEFAULT NULL COMMENT 'Modifier name',
     `create_time`      timestamp    NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'Create time',
     `modify_time`      timestamp    NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'Modify time',
-    `temp_view`        text                  DEFAULT NULL COMMENT 'Temporary view, used to save intermediate data that has not been submitted or approved after modification',
+    `temp_view`        json                  DEFAULT NULL COMMENT 'Temporary view, used to save intermediate data that has not been submitted or approved after modification',
     PRIMARY KEY (`id`)
 );
 
@@ -471,8 +482,8 @@ CREATE TABLE `source_db_detail`
     `table_fields`     longtext COMMENT 'Data table fields, separated by commas, required for incremental collection',
     `data_sql`         longtext COMMENT 'SQL statement to collect source data, required for full collection',
     `crontab`          varchar(56)           DEFAULT NULL COMMENT 'Cron scheduling expression, required for full collection',
-    `status`           int(11)               DEFAULT '0' COMMENT 'Data source status',
-    `previous_status`  int(11)               DEFAULT '0' COMMENT 'Previous status',
+    `status`           int(4)                DEFAULT '0' COMMENT 'Data source status',
+    `previous_status`  int(4)                DEFAULT '0' COMMENT 'Previous status',
     `is_deleted`       tinyint(1)            DEFAULT '0' COMMENT 'Whether to delete, 0: not deleted, 1: deleted',
     `creator`          varchar(64)  NOT NULL COMMENT 'Creator name',
     `modifier`         varchar(64)           DEFAULT NULL COMMENT 'Modifier name',
@@ -525,8 +536,8 @@ CREATE TABLE `source_file_detail`
     `username`         varchar(32)  NOT NULL COMMENT 'User name of the data source IP host',
     `password`         varchar(64)  NOT NULL COMMENT 'The password corresponding to the above user name',
     `file_path`        varchar(256) NOT NULL COMMENT 'File path, supports regular matching',
-    `status`           int(11)               DEFAULT '0' COMMENT 'Data source status',
-    `previous_status`  int(11)               DEFAULT '0' COMMENT 'Previous status',
+    `status`           int(4)                DEFAULT '0' COMMENT 'Data source status',
+    `previous_status`  int(4)                DEFAULT '0' COMMENT 'Previous status',
     `is_deleted`       tinyint(1)            DEFAULT '0' COMMENT 'Whether to delete, 0: not deleted, 1: deleted',
     `creator`          varchar(64)  NOT NULL COMMENT 'Creator name',
     `modifier`         varchar(64)           DEFAULT NULL COMMENT 'Modifier name',
@@ -548,44 +559,45 @@ CREATE TABLE `storage_ext`
     `key_name`     varchar(64) NOT NULL COMMENT 'Configuration item name',
     `key_value`    varchar(256)         DEFAULT NULL COMMENT 'The value of the configuration item',
     `is_deleted`   tinyint(1)           DEFAULT '0' COMMENT 'Whether to delete, 0: not deleted, 1: deleted',
-    `modify_time`  timestamp   NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'modify time',
-    PRIMARY KEY (`id`)
+    `modify_time`  timestamp   NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'Modify time',
+    PRIMARY KEY (`id`),
+    KEY `index_storage_id` (`storage_id`)
 );
 
 -- ----------------------------
 -- Table structure for storage_hive
 -- ----------------------------
 DROP TABLE IF EXISTS `storage_hive`;
-DROP TABLE IF EXISTS `storage_hive`;
 CREATE TABLE `storage_hive`
 (
-    `id`                  int(11)      NOT NULL AUTO_INCREMENT COMMENT 'Incremental primary key',
-    `inlong_group_id`     varchar(128) NOT NULL COMMENT 'Owning business group id',
-    `inlong_stream_id`    varchar(128) NOT NULL COMMENT 'Owning data stream id',
-    `jdbc_url`            varchar(255) NOT NULL COMMENT 'Hive JDBC connection URL, such as "jdbc:hive2://127.0.0.1:10000"',
-    `username`            varchar(128) NOT NULL COMMENT 'Username',
-    `password`            varchar(255) NOT NULL COMMENT 'User password',
-    `db_name`             varchar(128) NOT NULL COMMENT 'Target database name',
-    `table_name`          varchar(128) NOT NULL COMMENT 'Target data table name',
-    `primary_partition`   varchar(255)          DEFAULT 'dt' COMMENT 'primary partition field',
-    `secondary_partition` varchar(256)          DEFAULT NULL COMMENT 'secondary partition field',
-    `partition_type`      varchar(10)           DEFAULT NULL COMMENT 'The partition type, there are: H-by hour, D-by day, W-by week, M-by month, O-one-time, R-non-periodical',
-    `file_format`         varchar(15)           DEFAULT 'TextFile' COMMENT 'The stored table format, TextFile, RCFile, SequenceFile, Avro',
-    `encoding_type`       varchar(255)          DEFAULT NULL COMMENT 'data encoding',
-    `field_splitter`      varchar(10)           DEFAULT NULL COMMENT 'field separator',
-    `hdfs_default_fs`     varchar(255)          DEFAULT NULL COMMENT 'HDFS defaultFS, such as "hdfs://127.0.0.1:9000"',
-    `warehouse_dir`       varchar(250)          DEFAULT '/user/hive/warehouse' COMMENT 'Hive table storage path on HDFS, such as "/user/hive/warehouse"',
-    `usage_interval`      varchar(10)           DEFAULT NULL COMMENT 'The amount of time that Sort collected data will land on Hive, there are 10M, 15M, 30M, 1H, 1D',
-    `storage_period`      int(5)                DEFAULT '10' COMMENT 'Data storage period, unit: day',
-    `status`              int(11)               DEFAULT '0' COMMENT 'status',
-    `previous_status`     int(11)               DEFAULT '0' COMMENT 'Previous status',
-    `is_deleted`          tinyint(1)            DEFAULT '0' COMMENT 'Whether to delete, 0: not deleted, 1: deleted',
-    `creator`             varchar(64)  NOT NULL COMMENT 'Creator name',
-    `modifier`            varchar(64)           DEFAULT NULL COMMENT 'Modifier name',
-    `create_time`         timestamp    NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'Create time',
-    `modify_time`         timestamp    NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'Modify time',
-    `temp_view`           text                  DEFAULT NULL COMMENT 'Temporary view, used to save un-submitted and unapproved intermediate data after modification',
-    `opt_log`             varchar(5000)         DEFAULT NULL COMMENT 'Background operation log',
+    `id`                          int(11)      NOT NULL AUTO_INCREMENT COMMENT 'Incremental primary key',
+    `inlong_group_id`             varchar(128) NOT NULL COMMENT 'Owning business group id',
+    `inlong_stream_id`            varchar(128) NOT NULL COMMENT 'Owning data stream id',
+    `jdbc_url`                    varchar(255)          DEFAULT NULL COMMENT 'Hive JDBC connection URL, such as "jdbc:hive2://127.0.0.1:10000"',
+    `username`                    varchar(128)          DEFAULT NULL COMMENT 'Username',
+    `password`                    varchar(255)          DEFAULT NULL COMMENT 'User password',
+    `db_name`                     varchar(128)          DEFAULT NULL COMMENT 'Target database name',
+    `table_name`                  varchar(128)          DEFAULT NULL COMMENT 'Target data table name',
+    `hdfs_default_fs`             varchar(255)          DEFAULT NULL COMMENT 'HDFS defaultFS, such as "hdfs://127.0.0.1:9000"',
+    `warehouse_dir`               varchar(250)          DEFAULT '/user/hive/warehouse' COMMENT 'Hive table storage path on HDFS, such as "/user/hive/warehouse"',
+    `partition_interval`          int(5)                DEFAULT NULL COMMENT 'Partition interval, supports: 1 (for D or H), 10 or 30 (for I)',
+    `partition_unit`              varchar(10)           DEFAULT 'D' COMMENT 'Partition unit, supports: D-day, H-hour, I-minute',
+    `primary_partition`           varchar(255)          DEFAULT 'dt' COMMENT 'primary partition field',
+    `secondary_partition`         varchar(256)          DEFAULT NULL COMMENT 'secondary partition field',
+    `partition_creation_strategy` varchar(50)           DEFAULT 'COMPLETED' COMMENT 'Partition creation strategy, supports: ARRIVED, COMPLETED',
+    `file_format`                 varchar(15)           DEFAULT 'TextFile' COMMENT 'The stored table format, TextFile, RCFile, SequenceFile, Avro',
+    `data_encoding`               varchar(20)           DEFAULT 'UTF-8' COMMENT 'data encoding type',
+    `data_separator`              varchar(10)           DEFAULT NULL COMMENT 'data field separator',
+    `storage_period`              int(5)                DEFAULT '10' COMMENT 'Data storage period, unit: day',
+    `opt_log`                     varchar(5000)         DEFAULT NULL COMMENT 'Background operation log',
+    `status`                      int(4)                DEFAULT '0' COMMENT 'status',
+    `previous_status`             int(4)                DEFAULT '0' COMMENT 'Previous status',
+    `is_deleted`                  tinyint(1)            DEFAULT '0' COMMENT 'Whether to delete, 0: not deleted, 1: deleted',
+    `creator`                     varchar(64)           DEFAULT NULL COMMENT 'creator name',
+    `modifier`                    varchar(64)           DEFAULT NULL COMMENT 'modifier name',
+    `create_time`                 timestamp    NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'create time',
+    `modify_time`                 timestamp    NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'modify time',
+    `temp_view`                   text                  DEFAULT NULL COMMENT 'Temporary view, used to save un-submitted and unapproved intermediate data after modification',
     PRIMARY KEY (`id`)
 );
 
@@ -609,7 +621,8 @@ CREATE TABLE `storage_hive_field`
     `is_exist`          tinyint(1)    DEFAULT '0' COMMENT 'Does it exist, 0: does not exist, 1: exists',
     `rank_num`          smallint(6)   DEFAULT '0' COMMENT 'Field order (front-end display field order)',
     `is_deleted`        tinyint(1)    DEFAULT '0' COMMENT 'Whether to delete, 0: not deleted, 1: deleted',
-    PRIMARY KEY (`id`)
+    PRIMARY KEY (`id`),
+    KEY `index_storage_id` (`storage_id`)
 );
 
 -- ----------------------------
@@ -699,7 +712,8 @@ CREATE TABLE `user`
     `update_time`  datetime              DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'update time',
     `create_by`    varchar(255) NOT NULL COMMENT 'Created by',
     `update_by`    varchar(255)          DEFAULT NULL COMMENT 'Updated by',
-    PRIMARY KEY (`id`)
+    PRIMARY KEY (`id`),
+    UNIQUE KEY `user_name_uindex` (`name`)
 );
 
 -- create default admin user, username is 'admin', password is 'inlong'
@@ -742,7 +756,8 @@ CREATE TABLE `wf_approver`
     `create_time`       timestamp     NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'Create time',
     `modify_time`       timestamp     NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'update time',
     `is_deleted`        int(11)                DEFAULT '0' COMMENT 'Whether to delete, 0 is not deleted, if greater than 0, delete',
-    PRIMARY KEY (`id`)
+    PRIMARY KEY (`id`),
+    KEY `process_name_task_name_index` (`process_name`, `task_name`)
 );
 
 -- create default approver for new consumption and new business
@@ -797,7 +812,7 @@ CREATE TABLE `wf_process_instance`
     `form_data`       mediumtext COMMENT 'form information',
     `start_time`      datetime     NOT NULL COMMENT 'start time',
     `end_time`        datetime              DEFAULT NULL COMMENT 'End time',
-    `ext`             text COMMENT 'Extended information-text',
+    `ext`             text COMMENT 'Extended information-json',
     `hidden`          tinyint(1)   NOT NULL DEFAULT '0' COMMENT 'Is it hidden',
     PRIMARY KEY (`id`)
 );
@@ -823,7 +838,7 @@ CREATE TABLE `wf_task_instance`
     `form_data`            mediumtext COMMENT 'form information submitted by the current task',
     `start_time`           datetime      NOT NULL COMMENT 'start time',
     `end_time`             datetime      DEFAULT NULL COMMENT 'End time',
-    `ext`                  text COMMENT 'Extended information-text',
+    `ext`                  text COMMENT 'Extended information-json',
     PRIMARY KEY (`id`)
 );
 
@@ -840,13 +855,14 @@ CREATE TABLE `cluster_set`
     `middleware_type` varchar(10)           DEFAULT 'TUBE' COMMENT 'Message queue middleware type: TUBE (high throughput) or PULSAR (high consistency)',
     `in_charges`      varchar(512) COMMENT 'Name of responsible person, separated by commas',
     `followers`       varchar(512) COMMENT 'List of names of business followers, separated by commas',
-    `status`          int(11)               DEFAULT '21' COMMENT 'ClusterSet status',
+    `status`          int(4)                DEFAULT '21' COMMENT 'ClusterSet status',
     `is_deleted`      tinyint(1)            DEFAULT '0' COMMENT 'Whether to delete, 0: not deleted, 1: deleted',
     `creator`         varchar(64)  NOT NULL COMMENT 'Creator name',
     `modifier`        varchar(64)  NULL COMMENT 'Modifier name',
     `create_time`     timestamp    NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'Create time',
     `modify_time`     timestamp    NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'Modify time',
-    PRIMARY KEY (`id`)
+    PRIMARY KEY (`id`),
+    UNIQUE KEY `unique_cluster_set` (`set_name`)
 );
 
 -- ----------------------------
@@ -858,7 +874,8 @@ CREATE TABLE `cluster_set_inlongid`
     `id`              int(11)      NOT NULL AUTO_INCREMENT COMMENT 'Incremental primary key',
     `set_name`        varchar(128) NOT NULL COMMENT 'ClusterSet name, English, numbers and underscore',
     `inlong_group_id` varchar(128) NOT NULL COMMENT 'Business group id, filled in by the user; undeleted ids must be unique',
-    PRIMARY KEY (`id`)
+    PRIMARY KEY (`id`),
+    UNIQUE KEY `unique_cluster_set_inlongid` (`set_name`, `inlong_group_id`)
 );
 
 -- ----------------------------
@@ -871,7 +888,8 @@ CREATE TABLE `cache_cluster`
     `cluster_name` varchar(128) NOT NULL COMMENT 'CacheCluster name, English, numbers and underscore',
     `set_name`     varchar(128) NOT NULL COMMENT 'ClusterSet name, English, numbers and underscore',
     `zone`         varchar(128) NOT NULL COMMENT 'Zone, sz/sh/tj',
-    PRIMARY KEY (`id`)
+    PRIMARY KEY (`id`),
+    UNIQUE KEY `unique_cache_cluster` (`cluster_name`)
 );
 
 -- ----------------------------
@@ -885,8 +903,9 @@ CREATE TABLE `cache_cluster_ext`
     `key_name`     varchar(64)  NOT NULL COMMENT 'Configuration item name',
     `key_value`    varchar(256) NULL COMMENT 'The value of the configuration item',
     `is_deleted`   tinyint(1)            DEFAULT '0' COMMENT 'Whether to delete, 0: not deleted, 1: deleted',
-    `modify_time`  timestamp    NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'modify time',
-    PRIMARY KEY (`id`)
+    `modify_time`  timestamp    NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'Modify time',
+    PRIMARY KEY (`id`),
+    KEY `index_cache_cluster` (`cluster_name`)
 );
 
 -- ----------------------------
@@ -899,7 +918,8 @@ CREATE TABLE `cache_topic`
     `topic_name`    varchar(128) NOT NULL COMMENT 'Topic name, English, numbers and underscore',
     `set_name`      varchar(128) NOT NULL COMMENT 'ClusterSet name, English, numbers and underscore',
     `partition_num` int(11)      NOT NULL COMMENT 'Partition number',
-    PRIMARY KEY (`id`)
+    PRIMARY KEY (`id`),
+    UNIQUE KEY `unique_cache_topic` (`topic_name`, `set_name`)
 );
 
 -- ----------------------------
@@ -912,7 +932,8 @@ CREATE TABLE `proxy_cluster`
     `cluster_name` varchar(128) NOT NULL COMMENT 'ProxyCluster name, English, numbers and underscore',
     `set_name`     varchar(128) NOT NULL COMMENT 'ClusterSet name, English, numbers and underscore',
     `zone`         varchar(128) NOT NULL COMMENT 'Zone, sz/sh/tj',
-    PRIMARY KEY (`id`)
+    PRIMARY KEY (`id`),
+    UNIQUE KEY `unique_proxy_cluster` (`cluster_name`, `set_name`)
 );
 
 -- ----------------------------
@@ -924,7 +945,8 @@ CREATE TABLE `proxy_cluster_to_cache_cluster`
     `id`                 int(11)      NOT NULL AUTO_INCREMENT COMMENT 'Incremental primary key',
     `proxy_cluster_name` varchar(128) NOT NULL COMMENT 'ProxyCluster name, English, numbers and underscore',
     `cache_cluster_name` varchar(128) NOT NULL COMMENT 'CacheCluster name, English, numbers and underscore',
-    PRIMARY KEY (`id`)
+    PRIMARY KEY (`id`),
+    UNIQUE KEY `unique_proxy_cluster_to_cache_cluster` (`proxy_cluster_name`, `cache_cluster_name`)
 );
 
 -- ----------------------------
@@ -939,7 +961,8 @@ CREATE TABLE `flume_source`
     `type`          varchar(128) NOT NULL COMMENT 'FlumeSource classname',
     `channels`      varchar(128) NOT NULL COMMENT 'The channels of FlumeSource, separated by space',
     `selector_type` varchar(128) NOT NULL COMMENT 'FlumeSource channel selector classname',
-    PRIMARY KEY (`id`)
+    PRIMARY KEY (`id`),
+    UNIQUE KEY `unique_flume_source` (`source_name`, `set_name`)
 );
 
 -- ----------------------------
@@ -954,8 +977,9 @@ CREATE TABLE `flume_source_ext`
     `key_name`    varchar(64)  NOT NULL COMMENT 'Configuration item name',
     `key_value`   varchar(256) NULL COMMENT 'The value of the configuration item',
     `is_deleted`  tinyint(1)            DEFAULT '0' COMMENT 'Whether to delete, 0: not deleted, 1: deleted',
-    `modify_time` timestamp    NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'modify time',
-    PRIMARY KEY (`id`)
+    `modify_time` timestamp    NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'Modify time',
+    PRIMARY KEY (`id`),
+    KEY `index_flume_source_ext` (`parent_name`)
 );
 
 -- ----------------------------
@@ -968,7 +992,8 @@ CREATE TABLE `flume_channel`
     `channel_name` varchar(128) NOT NULL COMMENT 'FlumeChannel name, English, numbers and underscore',
     `set_name`     varchar(128) NOT NULL COMMENT 'ClusterSet name, English, numbers and underscore',
     `type`         varchar(128) NOT NULL COMMENT 'FlumeChannel classname',
-    PRIMARY KEY (`id`)
+    PRIMARY KEY (`id`),
+    UNIQUE KEY `unique_flume_channel` (`channel_name`, `set_name`)
 );
 
 -- ----------------------------
@@ -983,8 +1008,9 @@ CREATE TABLE `flume_channel_ext`
     `key_name`    varchar(64)  NOT NULL COMMENT 'Configuration item name',
     `key_value`   varchar(256) NULL COMMENT 'The value of the configuration item',
     `is_deleted`  tinyint(1)            DEFAULT '0' COMMENT 'Whether to delete, 0: not deleted, 1: deleted',
-    `modify_time` timestamp    NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'modify time',
-    PRIMARY KEY (`id`)
+    `modify_time` timestamp    NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'Modify time',
+    PRIMARY KEY (`id`),
+    KEY `index_flume_channel_ext` (`parent_name`)
 );
 
 -- ----------------------------
@@ -998,7 +1024,8 @@ CREATE TABLE `flume_sink`
     `set_name`  varchar(128) NOT NULL COMMENT 'ClusterSet name, English, numbers and underscore',
     `type`      varchar(128) NOT NULL COMMENT 'FlumeSink classname',
     `channel`   varchar(128) NOT NULL COMMENT 'FlumeSink channel',
-    PRIMARY KEY (`id`)
+    PRIMARY KEY (`id`),
+    UNIQUE KEY `unique_flume_sink` (`sink_name`, `set_name`)
 );
 
 -- ----------------------------
@@ -1013,8 +1040,9 @@ CREATE TABLE `flume_sink_ext`
     `key_name`    varchar(64)  NOT NULL COMMENT 'Configuration item name',
     `key_value`   varchar(256) NULL COMMENT 'The value of the configuration item',
     `is_deleted`  tinyint(1)            DEFAULT '0' COMMENT 'Whether to delete, 0: not deleted, 1: deleted',
-    `modify_time` timestamp    NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'modify time',
-    PRIMARY KEY (`id`)
+    `modify_time` timestamp    NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'Modify time',
+    PRIMARY KEY (`id`),
+    KEY `index_flume_sink_ext` (`parent_name`)
 );
 
 SET FOREIGN_KEY_CHECKS = 1;
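
The new partition_unit values ('D', 'H', 'I') in storage_hive line up with the
static maps added to PushHiveConfigTaskListener earlier in this commit. A tiny
sketch of that lookup, not part of the patch, with a hypothetical unit value:

    import java.util.HashMap;
    import java.util.Map;
    import java.util.concurrent.TimeUnit;

    public class PartitionUnitSketch {
        public static void main(String[] args) {
            // Same mappings as the listener's PARTITION_TIME_FORMAT_MAP / PARTITION_TIME_UNIT_MAP
            Map<String, String> formats = new HashMap<>();
            formats.put("D", "yyyyMMdd");
            formats.put("H", "yyyyMMddHH");
            formats.put("I", "yyyyMMddHHmm");

            Map<String, TimeUnit> units = new HashMap<>();
            units.put("D", TimeUnit.DAYS);
            units.put("H", TimeUnit.HOURS);
            units.put("I", TimeUnit.MINUTES);

            String unit = "H"; // e.g. a value read from storage_hive.partition_unit
            // Prints: yyyyMMddHH / HOURS
            System.out.println(formats.get(unit) + " / " + units.get(unit));
        }
    }
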
diff --git a/inlong-manager/manager-workflow-engine/src/main/java/org/apache/inlong/manager/workflow/core/processor/StartEventProcessor.java b/inlong-manager/manager-workflow-engine/src/main/java/org/apache/inlong/manager/workflow/core/processor/StartEventProcessor.java
index 85fc1e1..e8bd577 100644
--- a/inlong-manager/manager-workflow-engine/src/main/java/org/apache/inlong/manager/workflow/core/processor/StartEventProcessor.java
+++ b/inlong-manager/manager-workflow-engine/src/main/java/org/apache/inlong/manager/workflow/core/processor/StartEventProcessor.java
@@ -58,7 +58,7 @@ public class StartEventProcessor extends AbstractNextableElementProcessor<StartE
         Process process = context.getProcess();
         ProcessForm form = context.getProcessForm();
         if (process.getFormClass() != null) {
-            Preconditions.checkNotNull(form, "form can't be null");
+            Preconditions.checkNotNull(form, "form cannot be null");
             Preconditions.checkTrue(form.getClass().isAssignableFrom(process.getFormClass()),
                     () -> "form type not match, should be class " + process.getFormClass());
             form.validate();
diff --git a/inlong-manager/manager-workflow-engine/src/main/java/org/apache/inlong/manager/workflow/core/processor/UserTaskProcessor.java b/inlong-manager/manager-workflow-engine/src/main/java/org/apache/inlong/manager/workflow/core/processor/UserTaskProcessor.java
index 765eb54..92798cf 100644
--- a/inlong-manager/manager-workflow-engine/src/main/java/org/apache/inlong/manager/workflow/core/processor/UserTaskProcessor.java
+++ b/inlong-manager/manager-workflow-engine/src/main/java/org/apache/inlong/manager/workflow/core/processor/UserTaskProcessor.java
@@ -69,7 +69,7 @@ public class UserTaskProcessor extends AbstractTaskProcessor<UserTask> {
     @Override
     public void create(UserTask userTask, WorkflowContext context) {
         List<String> approvers = userTask.getApproverAssign().assign(context);
-        Preconditions.checkNotEmpty(approvers, "can't assign approvers for task: " + userTask.getDisplayName()
+        Preconditions.checkNotEmpty(approvers, "cannot assign approvers for task: " + userTask.getDisplayName()
                 + ", as the approvers in table `wf_approver` was empty");
 
         if (!userTask.isNeedAllApprove()) {
@@ -166,7 +166,7 @@ public class UserTaskProcessor extends AbstractTaskProcessor<UserTask> {
 
         UserTask userTask = (UserTask) actionContext.getTask();
         if (needForm(userTask, actionContext.getAction())) {
-            Preconditions.checkNotNull(actionContext.getForm(), "form can't be null");
+            Preconditions.checkNotNull(actionContext.getForm(), "form cannot be null");
             Preconditions.checkTrue(actionContext.getForm().getClass().isAssignableFrom(userTask.getFormClass()),
                     () -> "form type not match, should be class " + userTask.getFormClass());
             actionContext.getForm().validate();
diff --git a/inlong-manager/manager-workflow-engine/src/main/java/org/apache/inlong/manager/workflow/util/WorkflowFormParserUtils.java b/inlong-manager/manager-workflow-engine/src/main/java/org/apache/inlong/manager/workflow/util/WorkflowFormParserUtils.java
index 39c6039..30d2412 100644
--- a/inlong-manager/manager-workflow-engine/src/main/java/org/apache/inlong/manager/workflow/util/WorkflowFormParserUtils.java
+++ b/inlong-manager/manager-workflow-engine/src/main/java/org/apache/inlong/manager/workflow/util/WorkflowFormParserUtils.java
@@ -41,8 +41,8 @@ public class WorkflowFormParserUtils {
      */
     public static <T extends TaskForm> T parseTaskForm(TaskInstance taskInstance, Process process)
             throws FormParseException {
-        Preconditions.checkNotNull(taskInstance, "taskInstance can't be null");
-        Preconditions.checkNotNull(process, "process can't be null");
+        Preconditions.checkNotNull(taskInstance, "taskInstance cannot be null");
+        Preconditions.checkNotNull(process, "process cannot be null");
 
         if (StringUtils.isEmpty(taskInstance.getFormData())) {
             return null;
@@ -67,7 +67,7 @@ public class WorkflowFormParserUtils {
      */
     public static <T extends ProcessForm> T parseProcessForm(String formDate, Process process)
             throws FormParseException {
-        Preconditions.checkNotNull(process, "process can't be null");
+        Preconditions.checkNotNull(process, "process cannot be null");
 
         if (StringUtils.isEmpty(formDate)) {
             return null;