You are viewing a plain text version of this content. The hyperlink to the canonical version was not preserved in this plain-text copy.
Posted to commits@inlong.apache.org by do...@apache.org on 2022/01/23 06:12:02 UTC

[incubator-inlong] branch master updated: [INLONG-2274][Manager] Supports configuring whether to create a Hive database or table (#2277)

This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new dbb2f94  [INLONG-2274][Manager] Supports configuring whether to create a Hive database or table (#2277)
dbb2f94 is described below

commit dbb2f9475534eab0b4c4e300948906e2ba7dee9d
Author: healchow <he...@gmail.com>
AuthorDate: Sun Jan 23 14:11:58 2022 +0800

    [INLONG-2274][Manager] Supports configuring whether to create a Hive database or table (#2277)
---
 .../inlong/manager/common/enums/BizConstant.java   |  3 +
 ...ageListVO.java => BaseStorageListResponse.java} |  4 +-
 ...aseStorageInfo.java => BaseStorageRequest.java} | 32 +-------
 ...seStorageInfo.java => BaseStorageResponse.java} | 32 +-------
 ...torageHiveSortInfo.java => StorageHiveDTO.java} |  5 +-
 ...iveListVO.java => StorageHiveListResponse.java} |  4 +-
 ...torageHiveInfo.java => StorageHiveRequest.java} | 19 ++---
 ...orageHiveInfo.java => StorageHiveResponse.java} | 32 ++++++--
 .../{FullPageInfo.java => FullStreamRequest.java}  | 10 +--
 .../{FullPageInfo.java => FullStreamResponse.java} | 10 +--
 .../manager/dao/entity/StorageHiveEntity.java      |  4 +-
 .../dao/mapper/BusinessExtEntityMapper.java        |  2 -
 .../dao/mapper/StorageHiveEntityMapper.java        |  4 +-
 .../resources/mappers/BusinessExtEntityMapper.xml  | 27 -------
 .../resources/mappers/StorageHiveEntityMapper.xml  | 57 +++++++-------
 .../manager/service/core/BusinessService.java      | 11 +++
 .../manager/service/core/DataStreamService.java    | 13 ++--
 .../manager/service/core/StorageService.java       | 19 ++---
 .../service/core/impl/BusinessServiceImpl.java     | 27 ++-----
 .../service/core/impl/DataStreamServiceImpl.java   | 58 +++++++-------
 .../service/core/impl/SourceFileServiceImpl.java   |  2 +-
 .../service/core/impl/StorageHiveOperation.java    | 48 +++++++-----
 .../service/core/impl/StorageServiceImpl.java      | 41 +++++-----
 .../hive/CreateHiveTableForStreamListener.java     | 20 ++---
 .../thirdpart/hive/CreateHiveTableListener.java    | 20 ++---
 .../service/thirdpart/hive/HiveTableOperator.java  | 85 ++++++++++++--------
 .../thirdpart/sort/PushHiveConfigTaskListener.java | 14 ++--
 .../service/core/impl/BusinessServiceImplTest.java | 67 ----------------
 .../manager-web/sql/apache_inlong_manager.sql      | 47 +++++------
 .../web/controller/DataStreamController.java       |  9 ++-
 .../manager/web/controller/StorageController.java  | 13 ++--
 .../src/main/resources/application-dev.properties  |  8 ++
 .../manager/service/core/BusinessServiceTest.java  | 70 ++++++++++++++---
 .../service/core/ConsumptionServiceTest.java       |  2 +-
 .../service/core/DataStorageServiceTest.java       | 91 ++++++++++++++++++++++
 .../service/core/DataStreamServiceTest.java        | 73 +++++++++++++++++
 .../test/resources/sql/apache_inlong_manager.sql   |  1 +
 37 files changed, 556 insertions(+), 428 deletions(-)

diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/BizConstant.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/BizConstant.java
index 262fb3b..8de8a2a 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/BizConstant.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/BizConstant.java
@@ -66,4 +66,7 @@ public class BizConstant {
 
     public static final String PREFIX_RLQ = "rlq"; // prefix of the Topic of the retry letter queue
 
+    public static final Integer ENABLE_CREATE_TABLE = 1; // Flag value: creating the table is enabled
+
+    public static final Integer DISABLE_CREATE_TABLE = 0; // Flag value: creating the table is disabled
 }
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/datastorage/BaseStorageListVO.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/datastorage/BaseStorageListResponse.java
similarity index 95%
rename from inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/datastorage/BaseStorageListVO.java
rename to inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/datastorage/BaseStorageListResponse.java
index 294391f..80d221d 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/datastorage/BaseStorageListVO.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/datastorage/BaseStorageListResponse.java
@@ -23,10 +23,10 @@ import java.util.Date;
 import lombok.Data;
 
 /**
- * Paging list of data storage
+ * Response of data storage list
  */
 @Data
-public class BaseStorageListVO {
+public class BaseStorageListResponse {
 
     @ApiModelProperty(value = "Primary key")
     private Integer id;
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/datastorage/BaseStorageInfo.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/datastorage/BaseStorageRequest.java
similarity index 68%
copy from inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/datastorage/BaseStorageInfo.java
copy to inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/datastorage/BaseStorageRequest.java
index 2832839..ed0b185 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/datastorage/BaseStorageInfo.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/datastorage/BaseStorageRequest.java
@@ -17,27 +17,25 @@
 
 package org.apache.inlong.manager.common.pojo.datastorage;
 
-import com.fasterxml.jackson.annotation.JsonFormat;
 import com.fasterxml.jackson.annotation.JsonSubTypes;
 import com.fasterxml.jackson.annotation.JsonSubTypes.Type;
 import com.fasterxml.jackson.annotation.JsonTypeInfo;
 import com.fasterxml.jackson.annotation.JsonTypeInfo.Id;
 import io.swagger.annotations.ApiModel;
 import io.swagger.annotations.ApiModelProperty;
-import java.util.Date;
 import lombok.Data;
 import org.apache.inlong.manager.common.enums.BizConstant;
 
 /**
- * Basic data storage information
+ * Basic request of data storage
  */
 @Data
-@ApiModel("Basic data storage information")
+@ApiModel("Basic request of data storage")
 @JsonTypeInfo(use = Id.NAME, visible = true, property = "storageType")
 @JsonSubTypes({
-        @Type(value = StorageHiveInfo.class, name = BizConstant.STORAGE_HIVE)
+        @Type(value = StorageHiveRequest.class, name = BizConstant.STORAGE_HIVE)
 })
-public class BaseStorageInfo {
+public class BaseStorageRequest {
 
     private Integer id;
 
@@ -53,26 +51,4 @@ public class BaseStorageInfo {
     @ApiModelProperty("Data storage period, unit: day")
     private Integer storagePeriod;
 
-    @ApiModelProperty("Status")
-    private Integer status;
-
-    @ApiModelProperty("Previous State")
-    private Integer previousStatus;
-
-    @ApiModelProperty("is deleted? 0: deleted, 1: not deleted")
-    private Integer isDeleted = 0;
-
-    private String creator;
-
-    private String modifier;
-
-    @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss")
-    private Date createTime;
-
-    @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss")
-    private Date modifyTime;
-
-    @ApiModelProperty("Temporary view, string in JSON format")
-    private String tempView;
-
 }
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/datastorage/BaseStorageInfo.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/datastorage/BaseStorageResponse.java
similarity index 68%
rename from inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/datastorage/BaseStorageInfo.java
rename to inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/datastorage/BaseStorageResponse.java
index 2832839..649b369 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/datastorage/BaseStorageInfo.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/datastorage/BaseStorageResponse.java
@@ -17,27 +17,25 @@
 
 package org.apache.inlong.manager.common.pojo.datastorage;
 
-import com.fasterxml.jackson.annotation.JsonFormat;
 import com.fasterxml.jackson.annotation.JsonSubTypes;
 import com.fasterxml.jackson.annotation.JsonSubTypes.Type;
 import com.fasterxml.jackson.annotation.JsonTypeInfo;
 import com.fasterxml.jackson.annotation.JsonTypeInfo.Id;
 import io.swagger.annotations.ApiModel;
 import io.swagger.annotations.ApiModelProperty;
-import java.util.Date;
 import lombok.Data;
 import org.apache.inlong.manager.common.enums.BizConstant;
 
 /**
- * Basic data storage information
+ * Basic response of data storage
  */
 @Data
-@ApiModel("Basic data storage information")
+@ApiModel("Basic response of data storage")
 @JsonTypeInfo(use = Id.NAME, visible = true, property = "storageType")
 @JsonSubTypes({
-        @Type(value = StorageHiveInfo.class, name = BizConstant.STORAGE_HIVE)
+        @Type(value = StorageHiveResponse.class, name = BizConstant.STORAGE_HIVE)
 })
-public class BaseStorageInfo {
+public class BaseStorageResponse {
 
     private Integer id;
 
@@ -53,26 +51,4 @@ public class BaseStorageInfo {
     @ApiModelProperty("Data storage period, unit: day")
     private Integer storagePeriod;
 
-    @ApiModelProperty("Status")
-    private Integer status;
-
-    @ApiModelProperty("Previous State")
-    private Integer previousStatus;
-
-    @ApiModelProperty("is deleted? 0: deleted, 1: not deleted")
-    private Integer isDeleted = 0;
-
-    private String creator;
-
-    private String modifier;
-
-    @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss")
-    private Date createTime;
-
-    @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss")
-    private Date modifyTime;
-
-    @ApiModelProperty("Temporary view, string in JSON format")
-    private String tempView;
-
 }
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/datastorage/StorageHiveSortInfo.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/datastorage/StorageHiveDTO.java
similarity index 95%
rename from inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/datastorage/StorageHiveSortInfo.java
rename to inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/datastorage/StorageHiveDTO.java
index bf52872..9906ea0 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/datastorage/StorageHiveSortInfo.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/datastorage/StorageHiveDTO.java
@@ -20,14 +20,15 @@ package org.apache.inlong.manager.common.pojo.datastorage;
 import lombok.Data;
 
 /**
- * Hive info for Sort config
+ * Hive info
  */
 @Data
-public class StorageHiveSortInfo {
+public class StorageHiveDTO {
 
     private Integer id;
     private String inlongGroupId;
     private String inlongStreamId;
+    private Integer enableCreateTable;
 
     // Hive server info
     private String jdbcUrl;
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/datastorage/StorageHiveListVO.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/datastorage/StorageHiveListResponse.java
similarity index 94%
rename from inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/datastorage/StorageHiveListVO.java
rename to inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/datastorage/StorageHiveListResponse.java
index 9e9d455..78873c6 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/datastorage/StorageHiveListVO.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/datastorage/StorageHiveListResponse.java
@@ -23,12 +23,12 @@ import lombok.Data;
 import lombok.EqualsAndHashCode;
 
 /**
- * Response of Hive storage paging list
+ * Response of Hive storage list
  */
 @Data
 @EqualsAndHashCode(callSuper = true)
 @ApiModel("Response of Hive storage paging list")
-public class StorageHiveListVO extends BaseStorageListVO {
+public class StorageHiveListResponse extends BaseStorageListResponse {
 
     @ApiModelProperty("target database name")
     private String dbName;
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/datastorage/StorageHiveInfo.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/datastorage/StorageHiveRequest.java
similarity index 86%
copy from inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/datastorage/StorageHiveInfo.java
copy to inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/datastorage/StorageHiveRequest.java
index 0c7b3c1..f562f33 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/datastorage/StorageHiveInfo.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/datastorage/StorageHiveRequest.java
@@ -26,16 +26,19 @@ import lombok.ToString;
 import org.apache.inlong.manager.common.enums.BizConstant;
 
 /**
- * Interaction objects for Hive storage
+ * Request of the Hive storage info
  */
 @Data
 @ToString(callSuper = true)
 @EqualsAndHashCode(callSuper = true)
-@ApiModel(value = "Interaction objects for Hive storage")
-public class StorageHiveInfo extends BaseStorageInfo {
+@ApiModel(value = "Request of the Hive storage info")
+public class StorageHiveRequest extends BaseStorageRequest {
 
     private String storageType = BizConstant.STORAGE_HIVE;
 
+    @ApiModelProperty("Whether to enable create table, 1: enable, 0: disable, default is 1")
+    private Integer enableCreateTable = 1;
+
     @ApiModelProperty("Hive JDBC URL")
     private String jdbcUrl;
 
@@ -81,16 +84,10 @@ public class StorageHiveInfo extends BaseStorageInfo {
     @ApiModelProperty("Data field separator")
     private String dataSeparator;
 
-    @ApiModelProperty("Data storage period in Hive, unit: Day")
-    private Integer storagePeriod;
-
-    @ApiModelProperty("Backend operation log")
-    private String optLog;
-
-    @ApiModelProperty("hive table field list")
+    @ApiModelProperty("Hive table field list")
     private List<StorageHiveFieldInfo> hiveFieldList;
 
-    @ApiModelProperty("other ext info list")
+    @ApiModelProperty("Other ext info list")
     private List<StorageExtInfo> extList;
 
 }
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/datastorage/StorageHiveInfo.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/datastorage/StorageHiveResponse.java
similarity index 79%
rename from inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/datastorage/StorageHiveInfo.java
rename to inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/datastorage/StorageHiveResponse.java
index 0c7b3c1..4a81f63 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/datastorage/StorageHiveInfo.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/datastorage/StorageHiveResponse.java
@@ -17,8 +17,10 @@
 
 package org.apache.inlong.manager.common.pojo.datastorage;
 
+import com.fasterxml.jackson.annotation.JsonFormat;
 import io.swagger.annotations.ApiModel;
 import io.swagger.annotations.ApiModelProperty;
+import java.util.Date;
 import java.util.List;
 import lombok.Data;
 import lombok.EqualsAndHashCode;
@@ -26,16 +28,19 @@ import lombok.ToString;
 import org.apache.inlong.manager.common.enums.BizConstant;
 
 /**
- * Interaction objects for Hive storage
+ * Response of the Hive storage info
  */
 @Data
 @ToString(callSuper = true)
 @EqualsAndHashCode(callSuper = true)
-@ApiModel(value = "Interaction objects for Hive storage")
-public class StorageHiveInfo extends BaseStorageInfo {
+@ApiModel(value = "Response of the Hive storage info")
+public class StorageHiveResponse extends BaseStorageResponse {
 
     private String storageType = BizConstant.STORAGE_HIVE;
 
+    @ApiModelProperty("Whether to enable create table")
+    private Integer enableCreateTable;
+
     @ApiModelProperty("Hive JDBC URL")
     private String jdbcUrl;
 
@@ -81,12 +86,27 @@ public class StorageHiveInfo extends BaseStorageInfo {
     @ApiModelProperty("Data field separator")
     private String dataSeparator;
 
-    @ApiModelProperty("Data storage period in Hive, unit: Day")
-    private Integer storagePeriod;
-
     @ApiModelProperty("Backend operation log")
     private String optLog;
 
+    @ApiModelProperty("Status")
+    private Integer status;
+
+    @ApiModelProperty("Previous State")
+    private Integer previousStatus;
+
+    @ApiModelProperty("Creator")
+    private String creator;
+
+    @ApiModelProperty("modifier")
+    private String modifier;
+
+    @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss")
+    private Date createTime;
+
+    @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss")
+    private Date modifyTime;
+
     @ApiModelProperty("hive table field list")
     private List<StorageHiveFieldInfo> hiveFieldList;
 
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/datastream/FullPageInfo.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/datastream/FullStreamRequest.java
similarity index 88%
copy from inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/datastream/FullPageInfo.java
copy to inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/datastream/FullStreamRequest.java
index cf4c4fc..2e31cfb 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/datastream/FullPageInfo.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/datastream/FullStreamRequest.java
@@ -25,14 +25,14 @@ import org.apache.inlong.manager.common.pojo.datasource.SourceDbBasicInfo;
 import org.apache.inlong.manager.common.pojo.datasource.SourceDbDetailInfo;
 import org.apache.inlong.manager.common.pojo.datasource.SourceFileBasicInfo;
 import org.apache.inlong.manager.common.pojo.datasource.SourceFileDetailInfo;
-import org.apache.inlong.manager.common.pojo.datastorage.BaseStorageInfo;
+import org.apache.inlong.manager.common.pojo.datastorage.BaseStorageRequest;
 
 /**
- * All information on the data stream page, including data stream, data source, and data storage
+ * All request info on the data stream page, including data stream, data source, and data storage
  */
 @Data
-@ApiModel("All information of the data flow page")
-public class FullPageInfo {
+@ApiModel("All request info on the data stream page")
+public class FullStreamRequest {
 
     @ApiModelProperty("Data stream information")
     private DataStreamInfo streamInfo;
@@ -50,6 +50,6 @@ public class FullPageInfo {
     private List<SourceDbDetailInfo> dbDetailInfoList;
 
     @ApiModelProperty("Data storage information")
-    private List<BaseStorageInfo> storageInfo;
+    private List<BaseStorageRequest> storageInfo;
 
 }
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/datastream/FullPageInfo.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/datastream/FullStreamResponse.java
similarity index 88%
rename from inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/datastream/FullPageInfo.java
rename to inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/datastream/FullStreamResponse.java
index cf4c4fc..26e742d 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/datastream/FullPageInfo.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/datastream/FullStreamResponse.java
@@ -25,14 +25,14 @@ import org.apache.inlong.manager.common.pojo.datasource.SourceDbBasicInfo;
 import org.apache.inlong.manager.common.pojo.datasource.SourceDbDetailInfo;
 import org.apache.inlong.manager.common.pojo.datasource.SourceFileBasicInfo;
 import org.apache.inlong.manager.common.pojo.datasource.SourceFileDetailInfo;
-import org.apache.inlong.manager.common.pojo.datastorage.BaseStorageInfo;
+import org.apache.inlong.manager.common.pojo.datastorage.BaseStorageResponse;
 
 /**
- * All information on the data stream page, including data stream, data source, and data storage
+ * All response info on the data stream page, including data stream, data source, and data storage
  */
 @Data
-@ApiModel("All information of the data flow page")
-public class FullPageInfo {
+@ApiModel("All response info on the data stream page")
+public class FullStreamResponse {
 
     @ApiModelProperty("Data stream information")
     private DataStreamInfo streamInfo;
@@ -50,6 +50,6 @@ public class FullPageInfo {
     private List<SourceDbDetailInfo> dbDetailInfoList;
 
     @ApiModelProperty("Data storage information")
-    private List<BaseStorageInfo> storageInfo;
+    private List<BaseStorageResponse> storageInfo;
 
 }
diff --git a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/StorageHiveEntity.java b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/StorageHiveEntity.java
index c6e513e..d3b3542 100644
--- a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/StorageHiveEntity.java
+++ b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/StorageHiveEntity.java
@@ -28,6 +28,9 @@ public class StorageHiveEntity implements Serializable {
     private Integer id;
     private String inlongGroupId;
     private String inlongStreamId;
+
+    private Integer enableCreateTable;
+
     private String jdbcUrl;
     private String username;
     private String password;
@@ -55,6 +58,5 @@ public class StorageHiveEntity implements Serializable {
     private String modifier;
     private Date createTime;
     private Date modifyTime;
-    private String tempView;
 
 }
\ No newline at end of file
diff --git a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/BusinessExtEntityMapper.java b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/BusinessExtEntityMapper.java
index f5385f7..fa7b5bf 100644
--- a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/BusinessExtEntityMapper.java
+++ b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/BusinessExtEntityMapper.java
@@ -35,8 +35,6 @@ public interface BusinessExtEntityMapper {
 
     List<BusinessExtEntity> selectByGroupId(String groupId);
 
-    int updateByPrimaryKeySelective(BusinessExtEntity record);
-
     int updateByPrimaryKey(BusinessExtEntity record);
 
     BusinessExtEntity selectByGroupIdAndKeyName(String groupId, String keyName);
diff --git a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StorageHiveEntityMapper.java b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StorageHiveEntityMapper.java
index fea4cd5..c30e712 100644
--- a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StorageHiveEntityMapper.java
+++ b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StorageHiveEntityMapper.java
@@ -19,7 +19,7 @@ package org.apache.inlong.manager.dao.mapper;
 
 import java.util.List;
 import org.apache.ibatis.annotations.Param;
-import org.apache.inlong.manager.common.pojo.datastorage.StorageHiveSortInfo;
+import org.apache.inlong.manager.common.pojo.datastorage.StorageHiveDTO;
 import org.apache.inlong.manager.common.pojo.datastorage.StoragePageRequest;
 import org.apache.inlong.manager.common.pojo.datastorage.StorageSummaryInfo;
 import org.apache.inlong.manager.dao.entity.StorageHiveEntity;
@@ -90,7 +90,7 @@ public interface StorageHiveEntityMapper {
      * @param streamId Data stream id, if is null, get all configs under the group id
      * @return Hive Sort config
      */
-    List<StorageHiveSortInfo> selectHiveSortInfoByIdentifier(@Param("groupId") String groupId,
+    List<StorageHiveDTO> selectAllHiveConfig(@Param("groupId") String groupId,
             @Param("streamId") String streamId);
 
 }
\ No newline at end of file
diff --git a/inlong-manager/manager-dao/src/main/resources/mappers/BusinessExtEntityMapper.xml b/inlong-manager/manager-dao/src/main/resources/mappers/BusinessExtEntityMapper.xml
index 322ef11..4558507 100644
--- a/inlong-manager/manager-dao/src/main/resources/mappers/BusinessExtEntityMapper.xml
+++ b/inlong-manager/manager-dao/src/main/resources/mappers/BusinessExtEntityMapper.xml
@@ -120,34 +120,7 @@
         <foreach collection="extList" separator="," index="index" item="item">
             (#{item.id}, #{item.inlongGroupId}, #{item.keyName}, #{item.keyValue}, #{item.isDeleted})
         </foreach>
-        ON DUPLICATE KEY UPDATE
-        id = values(id),
-        inlong_group_id = values(inlong_group_id),
-        key_name = values(key_name),
-        key_value = values(key_value),
-        is_deleted = values(is_deleted)
     </insert>
-    <update id="updateByPrimaryKeySelective" parameterType="org.apache.inlong.manager.dao.entity.BusinessExtEntity">
-        update business_ext
-        <set>
-            <if test="inlongGroupId != null">
-                inlong_group_id = #{inlongGroupId,jdbcType=VARCHAR},
-            </if>
-            <if test="keyName != null">
-                key_name = #{keyName,jdbcType=VARCHAR},
-            </if>
-            <if test="keyValue != null">
-                key_value = #{keyValue,jdbcType=VARCHAR},
-            </if>
-            <if test="isDeleted != null">
-                is_deleted = #{isDeleted,jdbcType=INTEGER},
-            </if>
-            <if test="modifyTime != null">
-                modify_time = #{modifyTime,jdbcType=TIMESTAMP},
-            </if>
-        </set>
-        where id = #{id,jdbcType=INTEGER}
-    </update>
     <update id="updateByPrimaryKey" parameterType="org.apache.inlong.manager.dao.entity.BusinessExtEntity">
         update business_ext
         set inlong_group_id = #{inlongGroupId,jdbcType=VARCHAR},
diff --git a/inlong-manager/manager-dao/src/main/resources/mappers/StorageHiveEntityMapper.xml b/inlong-manager/manager-dao/src/main/resources/mappers/StorageHiveEntityMapper.xml
index 52c3178..eb40ef9 100644
--- a/inlong-manager/manager-dao/src/main/resources/mappers/StorageHiveEntityMapper.xml
+++ b/inlong-manager/manager-dao/src/main/resources/mappers/StorageHiveEntityMapper.xml
@@ -24,6 +24,7 @@
         <id column="id" jdbcType="INTEGER" property="id"/>
         <result column="inlong_group_id" jdbcType="VARCHAR" property="inlongGroupId"/>
         <result column="inlong_stream_id" jdbcType="VARCHAR" property="inlongStreamId"/>
+        <result column="enable_create_table" jdbcType="TINYINT" property="enableCreateTable"/>
         <result column="jdbc_url" jdbcType="VARCHAR" property="jdbcUrl"/>
         <result column="username" jdbcType="VARCHAR" property="username"/>
         <result column="password" jdbcType="VARCHAR" property="password"/>
@@ -51,14 +52,13 @@
         <result column="modifier" jdbcType="VARCHAR" property="modifier"/>
         <result column="create_time" jdbcType="TIMESTAMP" property="createTime"/>
         <result column="modify_time" jdbcType="TIMESTAMP" property="modifyTime"/>
-        <result column="temp_view" jdbcType="LONGVARCHAR" property="tempView"/>
     </resultMap>
 
     <sql id="Base_Column_List">
-        id, inlong_group_id, inlong_stream_id, jdbc_url, username, password, db_name, table_name,
+        id, inlong_group_id, inlong_stream_id, enable_create_table, jdbc_url, username, password, db_name, table_name,
         hdfs_default_fs, warehouse_dir, partition_interval, partition_unit, primary_partition, secondary_partition,
-        partition_creation_strategy, file_format, data_encoding, data_separator, storage_period, opt_log,
-        status, previous_status, is_deleted, creator, modifier, create_time, modify_time, temp_view
+        partition_creation_strategy, file_format, data_encoding, data_separator, storage_period,
+        opt_log, status, previous_status, is_deleted, creator, modifier, create_time, modify_time
     </sql>
 
     <select id="selectByPrimaryKey" parameterType="java.lang.Integer" resultMap="BaseResultMap">
@@ -141,9 +141,8 @@
           and s.inlong_group_id = #{groupId, jdbcType=VARCHAR}
           and s.inlong_stream_id = #{streamId, jdbcType=VARCHAR}
     </select>
-    <select id="selectHiveSortInfoByIdentifier"
-            resultType="org.apache.inlong.manager.common.pojo.datastorage.StorageHiveSortInfo">
-        SELECT hive.id,
+    <select id="selectAllHiveConfig" resultType="org.apache.inlong.manager.common.pojo.datastorage.StorageHiveDTO">
+        select hive.id,
                hive.inlong_group_id,
                hive.inlong_stream_id,
 
@@ -173,7 +172,7 @@
                stream.description,
                stream.data_separator as sourceSeparator,
                stream.data_escape_char
-        FROM data_stream stream,
+        from data_stream stream,
              storage_hive hive
         <where>
             stream.is_deleted = 0
@@ -195,28 +194,28 @@
 
     <insert id="insert" useGeneratedKeys="true" keyProperty="id"
             parameterType="org.apache.inlong.manager.dao.entity.StorageHiveEntity">
-        insert into storage_hive (id, inlong_group_id, inlong_stream_id,
+        insert into storage_hive (id, inlong_group_id,
+                                  inlong_stream_id, enable_create_table,
                                   jdbc_url, username, password,
                                   db_name, table_name, hdfs_default_fs,
                                   warehouse_dir, partition_interval,
                                   partition_unit, primary_partition,
                                   secondary_partition, partition_creation_strategy,
                                   file_format, data_encoding, data_separator,
-                                  storage_period, opt_log,
-                                  status, previous_status, is_deleted,
-                                  creator, modifier, create_time,
-                                  modify_time, temp_view)
-        values (#{id,jdbcType=INTEGER}, #{inlongGroupId,jdbcType=VARCHAR}, #{inlongStreamId,jdbcType=VARCHAR},
+                                  storage_period, opt_log, status,
+                                  previous_status, is_deleted, creator,
+                                  modifier, create_time, modify_time)
+        values (#{id,jdbcType=INTEGER}, #{inlongGroupId,jdbcType=VARCHAR},
+                #{inlongStreamId,jdbcType=VARCHAR}, #{enableCreateTable,jdbcType=TINYINT},
                 #{jdbcUrl,jdbcType=VARCHAR}, #{username,jdbcType=VARCHAR}, #{password,jdbcType=VARCHAR},
                 #{dbName,jdbcType=VARCHAR}, #{tableName,jdbcType=VARCHAR}, #{hdfsDefaultFs,jdbcType=VARCHAR},
                 #{warehouseDir,jdbcType=VARCHAR}, #{partitionInterval,jdbcType=INTEGER},
                 #{partitionUnit,jdbcType=VARCHAR}, #{primaryPartition,jdbcType=VARCHAR},
                 #{secondaryPartition,jdbcType=VARCHAR}, #{partitionCreationStrategy,jdbcType=VARCHAR},
                 #{fileFormat,jdbcType=VARCHAR}, #{dataEncoding,jdbcType=VARCHAR}, #{dataSeparator,jdbcType=VARCHAR},
-                #{storagePeriod,jdbcType=INTEGER}, #{optLog,jdbcType=VARCHAR},
-                #{status,jdbcType=INTEGER}, #{previousStatus,jdbcType=INTEGER}, #{isDeleted,jdbcType=INTEGER},
-                #{creator,jdbcType=VARCHAR}, #{modifier,jdbcType=VARCHAR}, #{createTime,jdbcType=TIMESTAMP},
-                #{modifyTime,jdbcType=TIMESTAMP}, #{tempView,jdbcType=LONGVARCHAR})
+                #{storagePeriod,jdbcType=INTEGER}, #{optLog,jdbcType=VARCHAR}, #{status,jdbcType=INTEGER},
+                #{previousStatus,jdbcType=INTEGER}, #{isDeleted,jdbcType=INTEGER}, #{creator,jdbcType=VARCHAR},
+                #{modifier,jdbcType=VARCHAR}, #{createTime,jdbcType=TIMESTAMP}, #{modifyTime,jdbcType=TIMESTAMP})
     </insert>
     <insert id="insertSelective" useGeneratedKeys="true" keyProperty="id"
             parameterType="org.apache.inlong.manager.dao.entity.StorageHiveEntity">
@@ -231,6 +230,9 @@
             <if test="inlongStreamId != null">
                 inlong_stream_id,
             </if>
+            <if test="enableCreateTable != null">
+                enable_create_table,
+            </if>
             <if test="jdbcUrl != null">
                 jdbc_url,
             </if>
@@ -303,9 +305,6 @@
             <if test="modifyTime != null">
                 modify_time,
             </if>
-            <if test="tempView != null">
-                temp_view,
-            </if>
         </trim>
         <trim prefix="values (" suffix=")" suffixOverrides=",">
             <if test="id != null">
@@ -317,6 +316,9 @@
             <if test="inlongStreamId != null">
                 #{inlongStreamId,jdbcType=VARCHAR},
             </if>
+            <if test="enableCreateTable != null">
+                #{enableCreateTable,jdbcType=TINYINT},
+            </if>
             <if test="jdbcUrl != null">
                 #{jdbcUrl,jdbcType=VARCHAR},
             </if>
@@ -389,9 +391,6 @@
             <if test="modifyTime != null">
                 #{modifyTime,jdbcType=TIMESTAMP},
             </if>
-            <if test="tempView != null">
-                #{tempView,jdbcType=LONGVARCHAR},
-            </if>
         </trim>
     </insert>
 
@@ -404,6 +403,9 @@
             <if test="inlongStreamId != null">
                 inlong_stream_id = #{inlongStreamId,jdbcType=VARCHAR},
             </if>
+            <if test="enableCreateTable != null">
+                enable_create_table = #{enableCreateTable,jdbcType=TINYINT},
+            </if>
             <if test="jdbcUrl != null">
                 jdbc_url = #{jdbcUrl,jdbcType=VARCHAR},
             </if>
@@ -476,9 +478,6 @@
             <if test="modifyTime != null">
                 modify_time = #{modifyTime,jdbcType=TIMESTAMP},
             </if>
-            <if test="tempView != null">
-                temp_view = #{tempView,jdbcType=LONGVARCHAR},
-            </if>
         </set>
         where id = #{id,jdbcType=INTEGER}
     </update>
@@ -486,6 +485,7 @@
         update storage_hive
         set inlong_group_id             = #{inlongGroupId,jdbcType=VARCHAR},
             inlong_stream_id            = #{inlongStreamId,jdbcType=VARCHAR},
+            enable_create_table         = #{enableCreateTable,jdbcType=TINYINT},
             jdbc_url                    = #{jdbcUrl,jdbcType=VARCHAR},
             username                    = #{username,jdbcType=VARCHAR},
             password                    = #{password,jdbcType=VARCHAR},
@@ -509,8 +509,7 @@
             creator                     = #{creator,jdbcType=VARCHAR},
             modifier                    = #{modifier,jdbcType=VARCHAR},
             create_time                 = #{createTime,jdbcType=TIMESTAMP},
-            modify_time                 = #{modifyTime,jdbcType=TIMESTAMP},
-            temp_view                   = #{tempView,jdbcType=LONGVARCHAR}
+            modify_time                 = #{modifyTime,jdbcType=TIMESTAMP}
         where id = #{id,jdbcType=INTEGER}
     </update>
     <update id="updateStorageStatusById" parameterType="org.apache.inlong.manager.dao.entity.StorageHiveEntity">
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/BusinessService.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/BusinessService.java
index a37e3d7..7f61d58 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/BusinessService.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/BusinessService.java
@@ -18,8 +18,10 @@
 package org.apache.inlong.manager.service.core;
 
 import com.github.pagehelper.PageInfo;
+import java.util.List;
 import org.apache.inlong.manager.common.pojo.business.BusinessApproveInfo;
 import org.apache.inlong.manager.common.pojo.business.BusinessCountVO;
+import org.apache.inlong.manager.common.pojo.business.BusinessExtInfo;
 import org.apache.inlong.manager.common.pojo.business.BusinessInfo;
 import org.apache.inlong.manager.common.pojo.business.BusinessListVO;
 import org.apache.inlong.manager.common.pojo.business.BusinessPageRequest;
@@ -117,4 +119,13 @@ public interface BusinessService {
      */
     boolean updateAfterApprove(BusinessApproveInfo approveInfo, String operator);
 
+    /**
+     * Save or update extended information
+     * <p>First physically delete the existing extended information, and then add this batch of extended information
+     *
+     * @param groupId Group id
+     * @param infoList Ext info list
+     */
+    void saveOrUpdateExt(String groupId, List<BusinessExtInfo> infoList);
+
 }
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/DataStreamService.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/DataStreamService.java
index 1221742..8070d0f 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/DataStreamService.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/DataStreamService.java
@@ -25,8 +25,9 @@ import org.apache.inlong.manager.common.pojo.datastream.DataStreamListVO;
 import org.apache.inlong.manager.common.pojo.datastream.DataStreamPageRequest;
 import org.apache.inlong.manager.common.pojo.datastream.DataStreamSummaryInfo;
 import org.apache.inlong.manager.common.pojo.datastream.DataStreamTopicVO;
-import org.apache.inlong.manager.common.pojo.datastream.FullPageInfo;
+import org.apache.inlong.manager.common.pojo.datastream.FullStreamRequest;
 import org.apache.inlong.manager.common.pojo.datastream.FullPageUpdateInfo;
+import org.apache.inlong.manager.common.pojo.datastream.FullStreamResponse;
 
 /**
  * data stream service layer interface
@@ -109,22 +110,22 @@ public interface DataStreamService {
     /**
      * Save all information related to the data stream, its data source, and data storage
      *
-     * @param fullPageInfo All information on the page
+     * @param fullStreamRequest All information on the page
      * @param operator Edit person's name
      * @return Whether the save was successful
      */
-    boolean saveAll(FullPageInfo fullPageInfo, String operator);
+    boolean saveAll(FullStreamRequest fullStreamRequest, String operator);
 
     /**
      * Save data streams, their data sources, and all information related to data storage in batches
      *
-     * @param fullPageInfoList List of data stream page information
+     * @param fullStreamRequestList List of data stream page information
      * @param operator Edit person's name
      * @return Whether the save was successful
      * @apiNote This interface is only used when creating a new business. To ensure data consistency,
      *         all associated data needs to be physically deleted, and then added
      */
-    boolean batchSaveAll(List<FullPageInfo> fullPageInfoList, String operator);
+    boolean batchSaveAll(List<FullStreamRequest> fullStreamRequestList, String operator);
 
     /**
      * Paging query all data of the data stream page under the specified groupId
@@ -132,7 +133,7 @@ public interface DataStreamService {
      * @param request Query
      * @return Paging list of all data on the data stream page
      */
-    PageInfo<FullPageInfo> listAllWithGroupId(DataStreamPageRequest request);
+    PageInfo<FullStreamResponse> listAllWithGroupId(DataStreamPageRequest request);
 
     /**
      * Modify all data streams (including basic information about data sources)
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/StorageService.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/StorageService.java
index 4ec09bb..6b13df9 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/StorageService.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/StorageService.java
@@ -19,8 +19,9 @@ package org.apache.inlong.manager.service.core;
 
 import com.github.pagehelper.PageInfo;
 import java.util.List;
-import org.apache.inlong.manager.common.pojo.datastorage.BaseStorageInfo;
-import org.apache.inlong.manager.common.pojo.datastorage.BaseStorageListVO;
+import org.apache.inlong.manager.common.pojo.datastorage.BaseStorageListResponse;
+import org.apache.inlong.manager.common.pojo.datastorage.BaseStorageRequest;
+import org.apache.inlong.manager.common.pojo.datastorage.BaseStorageResponse;
 import org.apache.inlong.manager.common.pojo.datastorage.StorageApproveInfo;
 import org.apache.inlong.manager.common.pojo.datastorage.StoragePageRequest;
 import org.apache.inlong.manager.common.pojo.datastorage.StorageSummaryInfo;
@@ -37,7 +38,7 @@ public interface StorageService {
      * @param operator Edit person's name
      * @return Primary key after saving
      */
-    Integer save(BaseStorageInfo storageInfo, String operator);
+    Integer save(BaseStorageRequest storageInfo, String operator);
 
     /**
      * Query storage information based on id
@@ -46,7 +47,7 @@ public interface StorageService {
      * @param storageType Storage type
      * @return Store information
      */
-    BaseStorageInfo getById(String storageType, Integer id);
+    BaseStorageResponse getById(String storageType, Integer id);
 
     /**
      * Query storage information based on business group id and data stream id
@@ -56,7 +57,7 @@ public interface StorageService {
      * @return Store information list
      * @apiNote Storage types only support temporarily: HIVE
      */
-    List<BaseStorageInfo> listByIdentifier(String groupId, String streamId);
+    List<BaseStorageResponse> listByIdentifier(String groupId, String streamId);
 
     /**
      * Query stored summary information based on business group id and data stream id, including storage cluster
@@ -75,7 +76,7 @@ public interface StorageService {
      * @param streamId Data stream id
      * @return Number of stored information
      */
-    int getCountByIdentifier(String groupId, String streamId);
+    Integer getCountByIdentifier(String groupId, String streamId);
 
     /**
      * Paging query storage information based on conditions
@@ -83,16 +84,16 @@ public interface StorageService {
      * @param request Paging request
      * @return Store information list
      */
-    PageInfo<? extends BaseStorageListVO> listByCondition(StoragePageRequest request);
+    PageInfo<? extends BaseStorageListResponse> listByCondition(StoragePageRequest request);
 
     /**
      * Modify data storage information
      *
-     * @param storageInfo Information that needs to be modified
+     * @param storageRequest Information that needs to be modified
      * @param operator Edit person's name
      * @return whether succeed
      */
-    boolean update(BaseStorageInfo storageInfo, String operator);
+    boolean update(BaseStorageRequest storageRequest, String operator);
 
     /**
      * Delete data storage information based on id
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/BusinessServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/BusinessServiceImpl.java
index 074c9df..573d76d 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/BusinessServiceImpl.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/BusinessServiceImpl.java
@@ -104,7 +104,7 @@ public class BusinessServiceImpl implements BusinessService {
         entity.setModifier(operator);
         entity.setCreateTime(new Date());
         businessMapper.insertSelective(entity);
-        this.saveExt(groupId, businessInfo.getExtList());
+        this.saveOrUpdateExt(groupId, businessInfo.getExtList());
 
         if (BizConstant.MIDDLEWARE_PULSAR.equals(businessInfo.getMiddlewareType())) {
             BusinessPulsarInfo pulsarInfo = (BusinessPulsarInfo) businessInfo.getMqExtInfo();
@@ -223,7 +223,7 @@ public class BusinessServiceImpl implements BusinessService {
         businessMapper.updateByIdentifierSelective(entity);
 
         // Save extended information
-        this.updateExt(groupId, businessInfo.getExtList());
+        this.saveOrUpdateExt(groupId, businessInfo.getExtList());
 
         // Update the Pulsar info
         if (BizConstant.MIDDLEWARE_PULSAR.equals(businessInfo.getMiddlewareType())) {
@@ -420,28 +420,16 @@ public class BusinessServiceImpl implements BusinessService {
         return true;
     }
 
-    /**
-     * Update extended information
-     * <p/>First physically delete the existing extended information, and then add this batch of extended information
-     */
     @Transactional(rollbackFor = Throwable.class)
-    void updateExt(String groupId, List<BusinessExtInfo> extInfoList) {
-        LOGGER.debug("begin to update business ext, groupId={}, ext={}", groupId, extInfoList);
-        try {
-            businessExtMapper.deleteAllByGroupId(groupId);
-            saveExt(groupId, extInfoList);
-            LOGGER.info("success to update business ext");
-        } catch (Exception e) {
-            LOGGER.error("failed to update business ext: ", e);
-            throw new BusinessException(BizErrorCodeEnum.BUSINESS_SAVE_FAILED);
-        }
-    }
+    @Override
+    public void saveOrUpdateExt(String groupId, List<BusinessExtInfo> infoList) {
+        LOGGER.debug("begin to save or update business ext info, groupId={}, ext={}", groupId, infoList);
+        businessExtMapper.deleteAllByGroupId(groupId);
 
-    @Transactional(rollbackFor = Throwable.class)
-    void saveExt(String groupId, List<BusinessExtInfo> infoList) {
         if (CollectionUtils.isEmpty(infoList)) {
             return;
         }
+
         List<BusinessExtEntity> entityList = CommonBeanUtils.copyListProperties(infoList, BusinessExtEntity::new);
         Date date = new Date();
         for (BusinessExtEntity entity : entityList) {
@@ -449,6 +437,7 @@ public class BusinessServiceImpl implements BusinessService {
             entity.setModifyTime(date);
         }
         businessExtMapper.insertAll(entityList);
+        LOGGER.info("success to save or update business ext for groupId={}", groupId);
     }
 
 }
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/DataStreamServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/DataStreamServiceImpl.java
index e81587d..9713e48 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/DataStreamServiceImpl.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/DataStreamServiceImpl.java
@@ -36,7 +36,8 @@ import org.apache.inlong.manager.common.pojo.datasource.SourceDbBasicInfo;
 import org.apache.inlong.manager.common.pojo.datasource.SourceDbDetailInfo;
 import org.apache.inlong.manager.common.pojo.datasource.SourceFileBasicInfo;
 import org.apache.inlong.manager.common.pojo.datasource.SourceFileDetailInfo;
-import org.apache.inlong.manager.common.pojo.datastorage.BaseStorageInfo;
+import org.apache.inlong.manager.common.pojo.datastorage.BaseStorageRequest;
+import org.apache.inlong.manager.common.pojo.datastorage.BaseStorageResponse;
 import org.apache.inlong.manager.common.pojo.datastorage.StorageSummaryInfo;
 import org.apache.inlong.manager.common.pojo.datastream.DataStreamApproveInfo;
 import org.apache.inlong.manager.common.pojo.datastream.DataStreamExtInfo;
@@ -46,8 +47,9 @@ import org.apache.inlong.manager.common.pojo.datastream.DataStreamListVO;
 import org.apache.inlong.manager.common.pojo.datastream.DataStreamPageRequest;
 import org.apache.inlong.manager.common.pojo.datastream.DataStreamSummaryInfo;
 import org.apache.inlong.manager.common.pojo.datastream.DataStreamTopicVO;
-import org.apache.inlong.manager.common.pojo.datastream.FullPageInfo;
 import org.apache.inlong.manager.common.pojo.datastream.FullPageUpdateInfo;
+import org.apache.inlong.manager.common.pojo.datastream.FullStreamRequest;
+import org.apache.inlong.manager.common.pojo.datastream.FullStreamResponse;
 import org.apache.inlong.manager.common.util.CommonBeanUtils;
 import org.apache.inlong.manager.common.util.Preconditions;
 import org.apache.inlong.manager.dao.entity.BusinessEntity;
@@ -330,7 +332,7 @@ public class DataStreamServiceImpl implements DataStreamService {
      * According to groupId and streamId, query the number of associated undeleted data storage
      */
     private boolean hasDataStorage(String groupId, String streamId) {
-        int count = storageService.getCountByIdentifier(groupId, streamId);
+        Integer count = storageService.getCountByIdentifier(groupId, streamId);
         return count > 0;
     }
 
@@ -373,12 +375,12 @@ public class DataStreamServiceImpl implements DataStreamService {
 
     @Transactional(rollbackFor = Throwable.class)
     @Override
-    public boolean saveAll(FullPageInfo fullPageInfo, String operator) {
+    public boolean saveAll(FullStreamRequest fullStreamRequest, String operator) {
         if (LOGGER.isDebugEnabled()) {
-            LOGGER.debug("begin to save all stream page info: {}", fullPageInfo);
+            LOGGER.debug("begin to save all stream page info: {}", fullStreamRequest);
         }
-        Preconditions.checkNotNull(fullPageInfo, "fullPageInfo is empty");
-        DataStreamInfo streamInfo = fullPageInfo.getStreamInfo();
+        Preconditions.checkNotNull(fullStreamRequest, "fullStreamRequest is empty");
+        DataStreamInfo streamInfo = fullStreamRequest.getStreamInfo();
         Preconditions.checkNotNull(streamInfo, "data stream info is empty");
 
         // Check whether it can be added: check by lower-level specific services
@@ -388,28 +390,28 @@ public class DataStreamServiceImpl implements DataStreamService {
         this.save(streamInfo, operator);
 
         // 2.1 Save file data source information
-        if (fullPageInfo.getFileBasicInfo() != null) {
-            sourceFileService.saveBasic(fullPageInfo.getFileBasicInfo(), operator);
+        if (fullStreamRequest.getFileBasicInfo() != null) {
+            sourceFileService.saveBasic(fullStreamRequest.getFileBasicInfo(), operator);
         }
-        if (CollectionUtils.isNotEmpty(fullPageInfo.getFileDetailInfoList())) {
-            for (SourceFileDetailInfo detailInfo : fullPageInfo.getFileDetailInfoList()) {
+        if (CollectionUtils.isNotEmpty(fullStreamRequest.getFileDetailInfoList())) {
+            for (SourceFileDetailInfo detailInfo : fullStreamRequest.getFileDetailInfoList()) {
                 sourceFileService.saveDetail(detailInfo, operator);
             }
         }
 
         // 2.2 Save DB data source information
-        if (fullPageInfo.getDbBasicInfo() != null) {
-            sourceDbService.saveBasic(fullPageInfo.getDbBasicInfo(), operator);
+        if (fullStreamRequest.getDbBasicInfo() != null) {
+            sourceDbService.saveBasic(fullStreamRequest.getDbBasicInfo(), operator);
         }
-        if (CollectionUtils.isNotEmpty(fullPageInfo.getDbDetailInfoList())) {
-            for (SourceDbDetailInfo detailInfo : fullPageInfo.getDbDetailInfoList()) {
+        if (CollectionUtils.isNotEmpty(fullStreamRequest.getDbDetailInfoList())) {
+            for (SourceDbDetailInfo detailInfo : fullStreamRequest.getDbDetailInfoList()) {
                 sourceDbService.saveDetail(detailInfo, operator);
             }
         }
 
         // 3. Save data storage information
-        if (CollectionUtils.isNotEmpty(fullPageInfo.getStorageInfo())) {
-            for (BaseStorageInfo storageInfo : fullPageInfo.getStorageInfo()) {
+        if (CollectionUtils.isNotEmpty(fullStreamRequest.getStorageInfo())) {
+            for (BaseStorageRequest storageInfo : fullStreamRequest.getStorageInfo()) {
                 storageService.save(storageInfo, operator);
             }
         }
@@ -420,14 +422,14 @@ public class DataStreamServiceImpl implements DataStreamService {
 
     @Transactional(rollbackFor = Throwable.class)
     @Override
-    public boolean batchSaveAll(List<FullPageInfo> fullPageInfoList, String operator) {
-        if (CollectionUtils.isEmpty(fullPageInfoList)) {
+    public boolean batchSaveAll(List<FullStreamRequest> fullStreamRequestList, String operator) {
+        if (CollectionUtils.isEmpty(fullStreamRequestList)) {
             return true;
         }
-        LOGGER.info("begin to batch save all stream page info, batch size={}", fullPageInfoList.size());
+        LOGGER.info("begin to batch save all stream page info, batch size={}", fullStreamRequestList.size());
 
         // Check if it can be added
-        DataStreamInfo firstStream = fullPageInfoList.get(0).getStreamInfo();
+        DataStreamInfo firstStream = fullStreamRequestList.get(0).getStreamInfo();
         Preconditions.checkNotNull(firstStream, "data stream info is empty");
         String groupId = firstStream.getInlongGroupId();
         this.checkBizIsTempStatus(groupId);
@@ -438,7 +440,7 @@ public class DataStreamServiceImpl implements DataStreamService {
         // and the ones with is_deleted=0 should be deleted
         streamMapper.deleteAllByGroupId(groupId);
 
-        for (FullPageInfo pageInfo : fullPageInfoList) {
+        for (FullStreamRequest pageInfo : fullStreamRequestList) {
             // 1.1 Delete the data stream extensions and fields corresponding to groupId and streamId
             DataStreamInfo streamInfo = pageInfo.getStreamInfo();
             String streamId = streamInfo.getInlongStreamId();
@@ -461,7 +463,7 @@ public class DataStreamServiceImpl implements DataStreamService {
     }
 
     @Override
-    public PageInfo<FullPageInfo> listAllWithGroupId(DataStreamPageRequest request) {
+    public PageInfo<FullStreamResponse> listAllWithGroupId(DataStreamPageRequest request) {
         if (LOGGER.isDebugEnabled()) {
             LOGGER.debug("begin to list full data stream page by {}", request);
         }
@@ -483,14 +485,14 @@ public class DataStreamServiceImpl implements DataStreamService {
         List<DataStreamInfo> streamInfoList = CommonBeanUtils.copyListProperties(page, DataStreamInfo::new);
 
         // Convert and encapsulate the paged results
-        List<FullPageInfo> fullPageInfoList = new ArrayList<>(streamInfoList.size());
+        List<FullStreamResponse> responseList = new ArrayList<>(streamInfoList.size());
         for (DataStreamInfo streamInfo : streamInfoList) {
             // 2.1 Set the extended information and field information of the data stream
             String streamId = streamInfo.getInlongStreamId();
             setStreamExtAndField(groupId, streamId, streamInfo);
 
             // 2.3 Set the data stream to the result sub-object
-            FullPageInfo pageInfo = new FullPageInfo();
+            FullStreamResponse pageInfo = new FullStreamResponse();
             pageInfo.setStreamInfo(streamInfo);
 
             // 3. Query the basic and detailed information of the data source
@@ -520,14 +522,14 @@ public class DataStreamServiceImpl implements DataStreamService {
             }
 
             // 4. Query various data storage and its extended information, field information
-            List<BaseStorageInfo> storageInfoList = storageService.listByIdentifier(groupId, streamId);
+            List<BaseStorageResponse> storageInfoList = storageService.listByIdentifier(groupId, streamId);
             pageInfo.setStorageInfo(storageInfoList);
 
             // 5. Add a single result to the paginated list
-            fullPageInfoList.add(pageInfo);
+            responseList.add(pageInfo);
         }
 
-        PageInfo<FullPageInfo> pageInfo = new PageInfo<>(fullPageInfoList);
+        PageInfo<FullStreamResponse> pageInfo = new PageInfo<>(responseList);
         pageInfo.setTotal(pageInfo.getTotal());
 
         LOGGER.info("success to list full data stream info");
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/SourceFileServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/SourceFileServiceImpl.java
index 12ee4bb..cada803 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/SourceFileServiceImpl.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/SourceFileServiceImpl.java
@@ -229,7 +229,7 @@ public class SourceFileServiceImpl implements SourceFileService {
 
         List<SourceFileDetailEntity> entities = fileDetailMapper.selectByIdentifier(groupId, streamId);
         if (CollectionUtils.isEmpty(entities)) {
-            LOGGER.error("file data source detail not found");
+            LOGGER.warn("file data source detail not found");
             // throw new BusinessException(BizErrorCodeEnum.DATA_SOURCE_DETAIL_NOTFOUND);
             return Collections.emptyList();
         }
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/StorageHiveOperation.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/StorageHiveOperation.java
index 1a8fb24..e5cc1bd 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/StorageHiveOperation.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/StorageHiveOperation.java
@@ -29,11 +29,13 @@ import org.apache.inlong.manager.common.enums.BizConstant;
 import org.apache.inlong.manager.common.enums.BizErrorCodeEnum;
 import org.apache.inlong.manager.common.enums.EntityStatus;
 import org.apache.inlong.manager.common.exceptions.BusinessException;
-import org.apache.inlong.manager.common.pojo.datastorage.BaseStorageInfo;
+import org.apache.inlong.manager.common.pojo.datastorage.BaseStorageRequest;
+import org.apache.inlong.manager.common.pojo.datastorage.BaseStorageResponse;
 import org.apache.inlong.manager.common.pojo.datastorage.StorageExtInfo;
 import org.apache.inlong.manager.common.pojo.datastorage.StorageHiveFieldInfo;
-import org.apache.inlong.manager.common.pojo.datastorage.StorageHiveInfo;
-import org.apache.inlong.manager.common.pojo.datastorage.StorageHiveListVO;
+import org.apache.inlong.manager.common.pojo.datastorage.StorageHiveListResponse;
+import org.apache.inlong.manager.common.pojo.datastorage.StorageHiveRequest;
+import org.apache.inlong.manager.common.pojo.datastorage.StorageHiveResponse;
 import org.apache.inlong.manager.common.pojo.datastorage.StoragePageRequest;
 import org.apache.inlong.manager.common.util.CommonBeanUtils;
 import org.apache.inlong.manager.common.util.Preconditions;
@@ -71,10 +73,10 @@ public class StorageHiveOperation extends StorageBaseOperation {
     /**
      * Save HIVE storage information
      *
-     * @param storageInfo Storage i formation
+     * @param storageInfo Storage information
      * @return Id after saving
      */
-    public int saveHiveStorage(BaseStorageInfo storageInfo, String operator) {
+    public int saveHiveStorage(BaseStorageRequest storageInfo, String operator) {
         String groupId = storageInfo.getInlongGroupId();
         // Make sure that there is no HIVE storage information under the current groupId and streamId
         // (the two are mutually exclusive, only one can exist)
@@ -82,11 +84,14 @@ public class StorageHiveOperation extends StorageBaseOperation {
                 .selectByIdentifier(groupId, storageInfo.getInlongStreamId());
         Preconditions.checkEmpty(storageExist, "HIVE storage already exist under the groupId and streamId");
 
-        StorageHiveInfo hiveInfo = (StorageHiveInfo) storageInfo;
+        StorageHiveRequest hiveInfo = (StorageHiveRequest) storageInfo;
         StorageHiveEntity entity = CommonBeanUtils.copyProperties(hiveInfo, StorageHiveEntity::new);
 
         // Set the encoding type and field splitter
         DataStreamEntity streamEntity = dataStreamMapper.selectByIdentifier(groupId, entity.getInlongStreamId());
+        if (streamEntity == null) {
+            throw new BusinessException(BizErrorCodeEnum.DATA_STREAM_NOT_FOUND);
+        }
         String dataEncoding = streamEntity.getDataEncoding() == null
                 ? StandardCharsets.UTF_8.displayName() : streamEntity.getDataEncoding();
         entity.setDataEncoding(dataEncoding);
@@ -97,7 +102,9 @@ public class StorageHiveOperation extends StorageBaseOperation {
         entity.setStatus(EntityStatus.DATA_STORAGE_NEW.getCode());
         entity.setCreator(operator);
         entity.setModifier(operator);
-        entity.setCreateTime(new Date());
+        Date now = new Date();
+        entity.setCreateTime(now);
+        entity.setModifyTime(now);
         hiveStorageMapper.insertSelective(entity);
 
         int id = entity.getId();
@@ -113,7 +120,7 @@ public class StorageHiveOperation extends StorageBaseOperation {
     /**
      * According to groupId and streamId, query the HIVE storage information to which it belongs
      */
-    public void setHiveStorageInfo(String groupId, String streamId, List<BaseStorageInfo> storageInfoList) {
+    public void setHiveStorageResponse(String groupId, String streamId, List<BaseStorageResponse> requestList) {
         List<StorageHiveEntity> hiveEntities = hiveStorageMapper.selectByIdentifier(groupId, streamId);
 
         if (CollectionUtils.isEmpty(hiveEntities)) {
@@ -134,11 +141,11 @@ public class StorageHiveOperation extends StorageBaseOperation {
             List<StorageHiveFieldInfo> fieldInfoList = CommonBeanUtils
                     .copyListProperties(fieldEntityList, StorageHiveFieldInfo::new);
 
-            StorageHiveInfo hiveInfo = CommonBeanUtils.copyProperties(hiveEntity, StorageHiveInfo::new);
+            StorageHiveResponse hiveInfo = CommonBeanUtils.copyProperties(hiveEntity, StorageHiveResponse::new);
             hiveInfo.setStorageType(storageType);
             hiveInfo.setExtList(CommonBeanUtils.copyListProperties(extEntities, StorageExtInfo::new));
             hiveInfo.setHiveFieldList(fieldInfoList);
-            storageInfoList.add(hiveInfo);
+            requestList.add(hiveInfo);
         }
     }
 
@@ -224,24 +231,24 @@ public class StorageHiveOperation extends StorageBaseOperation {
      * @param id Storage ID
      * @return Storage information
      */
-    public BaseStorageInfo getHiveStorage(Integer id) {
+    public BaseStorageResponse getHiveStorage(Integer id) {
         StorageHiveEntity entity = hiveStorageMapper.selectByPrimaryKey(id);
         if (entity == null) {
             LOGGER.error("hive storage not found by id={}", id);
             return null;
         }
 
-        StorageHiveInfo hiveInfo = CommonBeanUtils.copyProperties(entity, StorageHiveInfo::new);
+        StorageHiveResponse response = CommonBeanUtils.copyProperties(entity, StorageHiveResponse::new);
         String storageType = BizConstant.STORAGE_HIVE;
         List<StorageExtEntity> extEntityList = storageExtMapper.selectByStorageTypeAndId(storageType, id);
         List<StorageExtInfo> extInfoList = CommonBeanUtils.copyListProperties(extEntityList, StorageExtInfo::new);
-        hiveInfo.setExtList(extInfoList);
+        response.setExtList(extInfoList);
 
         List<StorageHiveFieldEntity> entities = hiveFieldMapper.selectByStorageId(id);
         List<StorageHiveFieldInfo> infos = CommonBeanUtils.copyListProperties(entities, StorageHiveFieldInfo::new);
-        hiveInfo.setHiveFieldList(infos);
+        response.setHiveFieldList(infos);
 
-        return hiveInfo;
+        return response;
     }
 
     /**
@@ -250,15 +257,16 @@ public class StorageHiveOperation extends StorageBaseOperation {
      * @param request Query conditions
      * @return Store the paged results of the list
      */
-    public PageInfo<StorageHiveListVO> getHiveStorageList(StoragePageRequest request) {
+    public PageInfo<StorageHiveListResponse> getHiveStorageList(StoragePageRequest request) {
         LOGGER.info("begin to list hive storage page by {}", request);
 
         PageHelper.startPage(request.getPageNum(), request.getPageSize());
         Page<StorageHiveEntity> entityPage = (Page<StorageHiveEntity>) hiveStorageMapper.selectByCondition(request);
-        List<StorageHiveListVO> detailList = CommonBeanUtils.copyListProperties(entityPage, StorageHiveListVO::new);
+        List<StorageHiveListResponse> detailList = CommonBeanUtils.copyListProperties(entityPage,
+                StorageHiveListResponse::new);
 
         // Encapsulate the paging query results into the PageInfo object to obtain related paging information
-        PageInfo<StorageHiveListVO> page = new PageInfo<>(detailList);
+        PageInfo<StorageHiveListResponse> page = new PageInfo<>(detailList);
         page.setTotal(entityPage.getTotal());
 
         LOGGER.info("success to list hive storage");
@@ -273,8 +281,8 @@ public class StorageHiveOperation extends StorageBaseOperation {
      * @param operator Operator
      * @return Updated id
      */
-    public Integer updateHiveStorage(Integer bizStatus, BaseStorageInfo storageInfo, String operator) {
-        StorageHiveInfo hiveInfo = (StorageHiveInfo) storageInfo;
+    public Integer updateHiveStorage(Integer bizStatus, BaseStorageRequest storageInfo, String operator) {
+        StorageHiveRequest hiveInfo = (StorageHiveRequest) storageInfo;
         // id exists, update, otherwise add
         Integer id = hiveInfo.getId();
         if (id != null) {
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/StorageServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/StorageServiceImpl.java
index 3464294..fca8ce2 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/StorageServiceImpl.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/StorageServiceImpl.java
@@ -27,8 +27,9 @@ import org.apache.inlong.manager.common.enums.BizConstant;
 import org.apache.inlong.manager.common.enums.BizErrorCodeEnum;
 import org.apache.inlong.manager.common.enums.EntityStatus;
 import org.apache.inlong.manager.common.exceptions.BusinessException;
-import org.apache.inlong.manager.common.pojo.datastorage.BaseStorageInfo;
-import org.apache.inlong.manager.common.pojo.datastorage.BaseStorageListVO;
+import org.apache.inlong.manager.common.pojo.datastorage.BaseStorageListResponse;
+import org.apache.inlong.manager.common.pojo.datastorage.BaseStorageRequest;
+import org.apache.inlong.manager.common.pojo.datastorage.BaseStorageResponse;
 import org.apache.inlong.manager.common.pojo.datastorage.StorageApproveInfo;
 import org.apache.inlong.manager.common.pojo.datastorage.StoragePageRequest;
 import org.apache.inlong.manager.common.pojo.datastorage.StorageSummaryInfo;
@@ -59,7 +60,7 @@ public class StorageServiceImpl extends StorageBaseOperation implements StorageS
 
     @Transactional(rollbackFor = Throwable.class)
     @Override
-    public Integer save(BaseStorageInfo storageInfo, String operator) {
+    public Integer save(BaseStorageRequest storageInfo, String operator) {
         if (LOGGER.isDebugEnabled()) {
             LOGGER.debug("begin to save storage info={}", storageInfo);
         }
@@ -96,12 +97,12 @@ public class StorageServiceImpl extends StorageBaseOperation implements StorageS
     }
 
     @Override
-    public BaseStorageInfo getById(String storageType, Integer id) {
+    public BaseStorageResponse getById(String storageType, Integer id) {
         LOGGER.debug("begin to get storage by storageType={}, id={}", storageType, id);
         Preconditions.checkNotNull(id, "storage id is null");
         Preconditions.checkNotNull(storageType, "storageType is empty");
 
-        BaseStorageInfo storageInfo;
+        BaseStorageResponse storageInfo;
         if (BizConstant.STORAGE_HIVE.equals(storageType.toUpperCase(Locale.ROOT))) {
             storageInfo = hiveOperation.getHiveStorage(id);
         } else {
@@ -114,28 +115,28 @@ public class StorageServiceImpl extends StorageBaseOperation implements StorageS
     }
 
     @Override
-    public int getCountByIdentifier(String groupId, String streamId) {
+    public Integer getCountByIdentifier(String groupId, String streamId) {
         LOGGER.debug("begin to get storage count by groupId={}, streamId={}", groupId, streamId);
         Preconditions.checkNotNull(groupId, BizConstant.GROUP_ID_IS_EMPTY);
         Preconditions.checkNotNull(streamId, BizConstant.STREAM_ID_IS_EMPTY);
 
-        int count = hiveStorageMapper.selectCountByIdentifier(groupId, streamId);
+        Integer count = hiveStorageMapper.selectCountByIdentifier(groupId, streamId);
 
         LOGGER.info("the storage count={} by groupId={}, streamId={}", count, groupId, streamId);
         return count;
     }
 
     @Override
-    public List<BaseStorageInfo> listByIdentifier(String groupId, String streamId) {
+    public List<BaseStorageResponse> listByIdentifier(String groupId, String streamId) {
         LOGGER.debug("begin to list storage by groupId={}, streamId={}", groupId, streamId);
         Preconditions.checkNotNull(groupId, BizConstant.GROUP_ID_IS_EMPTY);
 
         // Query HDFS, HIVE, ES storage information and encapsulate it in the result set
-        List<BaseStorageInfo> storageInfoList = new ArrayList<>();
-        hiveOperation.setHiveStorageInfo(groupId, streamId, storageInfoList);
+        List<BaseStorageResponse> responseList = new ArrayList<>();
+        hiveOperation.setHiveStorageResponse(groupId, streamId, responseList);
 
         LOGGER.info("success to list storage info");
-        return storageInfoList;
+        return responseList;
     }
 
     @Override
@@ -155,7 +156,7 @@ public class StorageServiceImpl extends StorageBaseOperation implements StorageS
     }
 
     @Override
-    public PageInfo<? extends BaseStorageListVO> listByCondition(StoragePageRequest request) {
+    public PageInfo<? extends BaseStorageListResponse> listByCondition(StoragePageRequest request) {
         if (LOGGER.isDebugEnabled()) {
             LOGGER.debug("begin to list storage page by {}", request);
         }
@@ -164,7 +165,7 @@ public class StorageServiceImpl extends StorageBaseOperation implements StorageS
         String storageType = request.getStorageType();
         Preconditions.checkNotNull(storageType, "storageType is empty");
 
-        PageInfo<? extends BaseStorageListVO> page;
+        PageInfo<? extends BaseStorageListResponse> page;
         if (BizConstant.STORAGE_HIVE.equals(storageType.toUpperCase(Locale.ROOT))) {
             page = hiveOperation.getHiveStorageList(request);
         } else {
@@ -178,25 +179,25 @@ public class StorageServiceImpl extends StorageBaseOperation implements StorageS
 
     @Transactional(rollbackFor = Throwable.class)
     @Override
-    public boolean update(BaseStorageInfo storageInfo, String operator) {
+    public boolean update(BaseStorageRequest storageRequest, String operator) {
         if (LOGGER.isDebugEnabled()) {
-            LOGGER.debug("begin to update storage info={}", storageInfo);
+            LOGGER.debug("begin to update storage info={}", storageRequest);
         }
 
-        Preconditions.checkNotNull(storageInfo, "storage info is empty");
-        String groupId = storageInfo.getInlongGroupId();
+        Preconditions.checkNotNull(storageRequest, "storage info is empty");
+        String groupId = storageRequest.getInlongGroupId();
         Preconditions.checkNotNull(groupId, BizConstant.GROUP_ID_IS_EMPTY);
-        String streamId = storageInfo.getInlongStreamId();
+        String streamId = storageRequest.getInlongStreamId();
         Preconditions.checkNotNull(streamId, BizConstant.STREAM_ID_IS_EMPTY);
 
         // Check if it can be modified
         BusinessEntity businessEntity = super.checkBizIsTempStatus(groupId);
 
-        String storageType = storageInfo.getStorageType();
+        String storageType = storageRequest.getStorageType();
         Preconditions.checkNotNull(storageType, "storageType is empty");
 
         if (BizConstant.STORAGE_HIVE.equals(storageType.toUpperCase(Locale.ROOT))) {
-            hiveOperation.updateHiveStorage(businessEntity.getStatus(), storageInfo, operator);
+            hiveOperation.updateHiveStorage(businessEntity.getStatus(), storageRequest, operator);
         } else {
             LOGGER.error("the storageType={} not support", storageType);
             throw new BusinessException(BizErrorCodeEnum.STORAGE_TYPE_NOT_SUPPORTED);
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdpart/hive/CreateHiveTableForStreamListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdpart/hive/CreateHiveTableForStreamListener.java
index 78bb270..1975c20 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdpart/hive/CreateHiveTableForStreamListener.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdpart/hive/CreateHiveTableForStreamListener.java
@@ -19,13 +19,13 @@ package org.apache.inlong.manager.service.thirdpart.hive;
 
 import java.util.List;
 import lombok.extern.slf4j.Slf4j;
-import org.apache.inlong.manager.common.pojo.datastorage.StorageHiveSortInfo;
-import org.apache.inlong.manager.dao.mapper.StorageHiveEntityMapper;
-import org.apache.inlong.manager.service.workflow.business.BusinessResourceWorkflowForm;
 import org.apache.inlong.manager.common.event.ListenerResult;
 import org.apache.inlong.manager.common.event.task.TaskEvent;
 import org.apache.inlong.manager.common.event.task.TaskEventListener;
 import org.apache.inlong.manager.common.model.WorkflowContext;
+import org.apache.inlong.manager.common.pojo.datastorage.StorageHiveDTO;
+import org.apache.inlong.manager.dao.mapper.StorageHiveEntityMapper;
+import org.apache.inlong.manager.service.workflow.business.BusinessResourceWorkflowForm;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Service;
 
@@ -53,16 +53,12 @@ public class CreateHiveTableForStreamListener implements TaskEventListener {
         String streamId = form.getInlongStreamId();
         log.info("begin create hive table for groupId={}, streamId={}", groupId, streamId);
 
-        List<StorageHiveSortInfo> hiveConfig = hiveEntityMapper.selectHiveSortInfoByIdentifier(groupId, streamId);
-        if (hiveConfig == null || hiveConfig.size() == 0) {
-            return ListenerResult.success();
-        }
-        for (StorageHiveSortInfo info : hiveConfig) {
-            hiveTableOperator.createHiveTable(groupId, info);
-        }
+        List<StorageHiveDTO> configList = hiveEntityMapper.selectAllHiveConfig(groupId, streamId);
+        hiveTableOperator.createHiveResource(groupId, configList);
 
-        log.info("finish create hive table for business {} - {} ", groupId, streamId);
-        return ListenerResult.success();
+        String result = "success to create hive table for group [" + groupId + "], stream [" + streamId + "]";
+        log.info(result);
+        return ListenerResult.success(result);
     }
 
     @Override
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdpart/hive/CreateHiveTableListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdpart/hive/CreateHiveTableListener.java
index 032bef3..45ac9e0 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdpart/hive/CreateHiveTableListener.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdpart/hive/CreateHiveTableListener.java
@@ -19,13 +19,13 @@ package org.apache.inlong.manager.service.thirdpart.hive;
 
 import java.util.List;
 import lombok.extern.slf4j.Slf4j;
-import org.apache.inlong.manager.common.pojo.datastorage.StorageHiveSortInfo;
-import org.apache.inlong.manager.dao.mapper.StorageHiveEntityMapper;
-import org.apache.inlong.manager.service.workflow.business.BusinessResourceWorkflowForm;
 import org.apache.inlong.manager.common.event.ListenerResult;
 import org.apache.inlong.manager.common.event.task.TaskEvent;
 import org.apache.inlong.manager.common.event.task.TaskEventListener;
 import org.apache.inlong.manager.common.model.WorkflowContext;
+import org.apache.inlong.manager.common.pojo.datastorage.StorageHiveDTO;
+import org.apache.inlong.manager.dao.mapper.StorageHiveEntityMapper;
+import org.apache.inlong.manager.service.workflow.business.BusinessResourceWorkflowForm;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Service;
 
@@ -52,16 +52,12 @@ public class CreateHiveTableListener implements TaskEventListener {
         String groupId = form.getInlongGroupId();
         log.info("begin to create hive table for groupId={}", groupId);
 
-        List<StorageHiveSortInfo> configList = hiveEntityMapper.selectHiveSortInfoByIdentifier(groupId, null);
-        if (configList == null || configList.size() == 0) {
-            return ListenerResult.success();
-        }
+        List<StorageHiveDTO> configList = hiveEntityMapper.selectAllHiveConfig(groupId, null);
+        hiveTableOperator.createHiveResource(groupId, configList);
 
-        for (StorageHiveSortInfo hiveConfig : configList) {
-            hiveTableOperator.createHiveTable(groupId, hiveConfig);
-            log.info("finish to create hive table for business {}", groupId);
-        }
-        return ListenerResult.success();
+        String result = "success to create hive table for group [" + groupId + "]";
+        log.info(result);
+        return ListenerResult.success(result);
     }
 
     @Override
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdpart/hive/HiveTableOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdpart/hive/HiveTableOperator.java
index 60c0acc..d7a6f16 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdpart/hive/HiveTableOperator.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdpart/hive/HiveTableOperator.java
@@ -21,9 +21,11 @@ import static java.util.stream.Collectors.toList;
 
 import java.util.ArrayList;
 import java.util.List;
-import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.inlong.manager.common.enums.BizConstant;
 import org.apache.inlong.manager.common.enums.EntityStatus;
-import org.apache.inlong.manager.common.pojo.datastorage.StorageHiveSortInfo;
+import org.apache.inlong.manager.common.exceptions.WorkflowException;
+import org.apache.inlong.manager.common.pojo.datastorage.StorageHiveDTO;
 import org.apache.inlong.manager.common.pojo.query.ColumnInfoBean;
 import org.apache.inlong.manager.common.pojo.query.DatabaseQueryBean;
 import org.apache.inlong.manager.common.pojo.query.hive.HiveColumnQueryBean;
@@ -32,34 +34,53 @@ import org.apache.inlong.manager.dao.entity.StorageHiveFieldEntity;
 import org.apache.inlong.manager.dao.mapper.StorageHiveFieldEntityMapper;
 import org.apache.inlong.manager.service.core.DataSourceService;
 import org.apache.inlong.manager.service.core.StorageService;
-import org.apache.inlong.manager.common.exceptions.WorkflowException;
+import org.apache.inlong.manager.service.core.impl.StorageHiveOperation;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Component;
 
 /**
  * Create hive table operation
  */
-@Slf4j
 @Component
 public class HiveTableOperator {
 
+    private static final Logger LOGGER = LoggerFactory.getLogger(HiveTableOperator.class);
+
     @Autowired
     private StorageService storageService;
     @Autowired
-    private DataSourceService<DatabaseQueryBean, HiveTableQueryBean> dataSourceService;
-
-    @Autowired
     private StorageHiveFieldEntityMapper hiveFieldMapper;
+    @Autowired
+    private DataSourceService<DatabaseQueryBean, HiveTableQueryBean> dataSourceService;
 
     /**
      * Create hive table according to the groupId and hive config
      */
-    public void createHiveTable(String groupId, StorageHiveSortInfo hiveConfig) {
-        if (log.isDebugEnabled()) {
-            log.debug("begin create hive table for business={}, hiveConfig={}", groupId, hiveConfig);
+    public void createHiveResource(String groupId, List<StorageHiveDTO> configList) {
+        if (CollectionUtils.isEmpty(configList)) {
+            LOGGER.warn("no hive config, skip to create");
+            return;
+        }
+        for (StorageHiveDTO config : configList) {
+            if (EntityStatus.DATA_STORAGE_CONFIG_SUCCESSFUL.getCode().equals(config.getStatus())) {
+                LOGGER.warn("hive [{}] already success, skip to create", config.getId());
+                continue;
+            } else if (BizConstant.DISABLE_CREATE_TABLE.equals(config.getEnableCreateTable())) {
+                LOGGER.warn("create table was disable, skip to create table for hive [{}]", config.getId());
+                continue;
+            }
+            this.createTable(groupId, config);
         }
+    }
 
-        HiveTableQueryBean tableBean = getTableQueryBean(hiveConfig);
+    private void createTable(String groupId, StorageHiveDTO config) {
+        if (LOGGER.isDebugEnabled()) {
+            LOGGER.debug("begin create hive table for business={}, config={}", groupId, config);
+        }
+
+        HiveTableQueryBean tableBean = getTableQueryBean(config);
         try {
             // create database if not exists
             dataSourceService.createDb(tableBean);
@@ -78,22 +99,22 @@ public class HiveTableOperator {
                     dataSourceService.createColumn(tableBean);
                 }
             }
-            storageService.updateHiveStatusById(hiveConfig.getId(),
+            storageService.updateHiveStatusById(config.getId(),
                     EntityStatus.DATA_STORAGE_CONFIG_SUCCESSFUL.getCode(), "create hive table success");
         } catch (Throwable e) {
-            log.error("create hive table error, ", e);
-            storageService.updateHiveStatusById(hiveConfig.getId(),
+            LOGGER.error("create hive table error, ", e);
+            storageService.updateHiveStatusById(config.getId(),
                     EntityStatus.DATA_STORAGE_CONFIG_FAILED.getCode(), e.getMessage());
             throw new WorkflowException("create hive table failed, reason: " + e.getMessage());
         }
 
-        log.info("finish create hive table for business {} ", groupId);
+        LOGGER.info("success create hive table for data group [" + groupId + "]");
     }
 
-    protected HiveTableQueryBean getTableQueryBean(StorageHiveSortInfo hiveConfig) {
-        String groupId = hiveConfig.getInlongGroupId();
-        String streamId = hiveConfig.getInlongStreamId();
-        log.info("begin to get table query bean for groupId={}, streamId={}", groupId, streamId);
+    protected HiveTableQueryBean getTableQueryBean(StorageHiveDTO config) {
+        String groupId = config.getInlongGroupId();
+        String streamId = config.getInlongStreamId();
+        LOGGER.info("begin to get table query bean for groupId={}, streamId={}", groupId, streamId);
 
         List<StorageHiveFieldEntity> fieldEntities = hiveFieldMapper.selectHiveFields(groupId, streamId);
 
@@ -107,31 +128,33 @@ public class HiveTableOperator {
         }
 
         // set partition field and type
-        String partitionField = hiveConfig.getPrimaryPartition();
+        String partitionField = config.getPrimaryPartition();
         if (partitionField != null) {
             HiveColumnQueryBean partColumn = new HiveColumnQueryBean();
             partColumn.setPartition(true);
             partColumn.setColumnName(partitionField);
-            partColumn.setColumnType("string"); // currently, only supports 'string' type
+            // currently, only supports 'string' type
+            partColumn.setColumnType("string");
             columnQueryBeans.add(partColumn);
         }
 
         HiveTableQueryBean queryBean = new HiveTableQueryBean();
         queryBean.setColumns(columnQueryBeans);
         // set terminated symbol
-        if (hiveConfig.getTargetSeparator() != null) {
-            char ch = (char) Integer.parseInt(hiveConfig.getTargetSeparator());
+        if (config.getTargetSeparator() != null) {
+            char ch = (char) Integer.parseInt(config.getTargetSeparator());
             queryBean.setFieldTerSymbol(String.valueOf(ch));
         }
-        queryBean.setUsername(hiveConfig.getUsername());
-        queryBean.setPassword(hiveConfig.getPassword());
-        queryBean.setTableName(hiveConfig.getTableName());
-        queryBean.setDbName(hiveConfig.getDbName());
-        queryBean.setJdbcUrl(hiveConfig.getJdbcUrl());
-
-        if (log.isDebugEnabled()) {
-            log.debug("success to get table query bean={}", queryBean);
+        queryBean.setUsername(config.getUsername());
+        queryBean.setPassword(config.getPassword());
+        queryBean.setTableName(config.getTableName());
+        queryBean.setDbName(config.getDbName());
+        queryBean.setJdbcUrl(config.getJdbcUrl());
+
+        if (LOGGER.isDebugEnabled()) {
+            LOGGER.debug("success to get table query bean={}", queryBean);
         }
         return queryBean;
     }
+
 }
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdpart/sort/PushHiveConfigTaskListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdpart/sort/PushHiveConfigTaskListener.java
index 1263bc0..5b6f1a9 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdpart/sort/PushHiveConfigTaskListener.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdpart/sort/PushHiveConfigTaskListener.java
@@ -30,7 +30,7 @@ import org.apache.inlong.manager.common.enums.BizConstant;
 import org.apache.inlong.manager.common.enums.EntityStatus;
 import org.apache.inlong.manager.common.pojo.business.BusinessExtInfo;
 import org.apache.inlong.manager.common.pojo.business.BusinessInfo;
-import org.apache.inlong.manager.common.pojo.datastorage.StorageHiveSortInfo;
+import org.apache.inlong.manager.common.pojo.datastorage.StorageHiveDTO;
 import org.apache.inlong.manager.common.settings.BusinessSettings;
 import org.apache.inlong.manager.common.util.JsonUtils;
 import org.apache.inlong.manager.common.util.Preconditions;
@@ -112,8 +112,8 @@ public class PushHiveConfigTaskListener implements TaskEventListener {
 
         // if streamId not null, just push the config belongs to the groupId and the streamId
         String streamId = form.getInlongStreamId();
-        List<StorageHiveSortInfo> hiveInfoList = storageHiveMapper.selectHiveSortInfoByIdentifier(groupId, streamId);
-        for (StorageHiveSortInfo hiveInfo : hiveInfoList) {
+        List<StorageHiveDTO> hiveInfoList = storageHiveMapper.selectAllHiveConfig(groupId, streamId);
+        for (StorageHiveDTO hiveInfo : hiveInfoList) {
             Integer storageId = hiveInfo.getId();
 
             if (log.isDebugEnabled()) {
@@ -141,7 +141,7 @@ public class PushHiveConfigTaskListener implements TaskEventListener {
         return ListenerResult.success();
     }
 
-    private DataFlowInfo getDataFlowInfo(BusinessInfo businessInfo, StorageHiveSortInfo hiveInfo) {
+    private DataFlowInfo getDataFlowInfo(BusinessInfo businessInfo, StorageHiveDTO hiveInfo) {
         String groupId = hiveInfo.getInlongGroupId();
         String streamId = hiveInfo.getInlongStreamId();
         List<StorageHiveFieldEntity> fieldList = hiveFieldMapper.selectHiveFields(groupId, streamId);
@@ -157,7 +157,7 @@ public class PushHiveConfigTaskListener implements TaskEventListener {
         return new DataFlowInfo(hiveInfo.getId(), sourceInfo, sinkInfo);
     }
 
-    private HiveSinkInfo getSinkInfo(StorageHiveSortInfo hiveInfo, List<StorageHiveFieldEntity> fieldList) {
+    private HiveSinkInfo getSinkInfo(StorageHiveDTO hiveInfo, List<StorageHiveFieldEntity> fieldList) {
         if (hiveInfo.getJdbcUrl() == null) {
             throw new WorkflowListenerException("hive server url cannot be empty");
         }
@@ -221,7 +221,7 @@ public class PushHiveConfigTaskListener implements TaskEventListener {
     /**
      * Get source info
      */
-    private SourceInfo getSourceInfo(BusinessInfo businessInfo, StorageHiveSortInfo storageInfo,
+    private SourceInfo getSourceInfo(BusinessInfo businessInfo, StorageHiveDTO storageInfo,
             List<StorageHiveFieldEntity> fieldList) {
         DeserializationInfo deserializationInfo = null;
         boolean isDbType = BizConstant.DATA_SOURCE_DB.equals(storageInfo.getDataSourceType());
@@ -306,7 +306,7 @@ public class PushHiveConfigTaskListener implements TaskEventListener {
     }
 
     private PulsarSourceInfo createPulsarSourceInfo(BusinessInfo businessInfo,
-            StorageHiveSortInfo storageInfo,
+            StorageHiveDTO storageInfo,
             DeserializationInfo deserializationInfo,
             List<FieldInfo> sourceFields) {
         final String tenant = clusterBean.getDefaultTenant();
diff --git a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/impl/BusinessServiceImplTest.java b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/impl/BusinessServiceImplTest.java
deleted file mode 100644
index 371af1c..0000000
--- a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/impl/BusinessServiceImplTest.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.manager.service.core.impl;
-
-import com.google.common.collect.Lists;
-import java.util.ArrayList;
-import java.util.List;
-import org.apache.inlong.manager.common.pojo.business.BusinessExtInfo;
-import org.apache.inlong.manager.dao.entity.BusinessExtEntity;
-import org.apache.inlong.manager.dao.mapper.BusinessExtEntityMapper;
-import org.apache.inlong.manager.service.core.BaseTest;
-import org.junit.Assert;
-import org.junit.Test;
-import org.springframework.beans.factory.annotation.Autowired;
-
-public class BusinessServiceImplTest extends BaseTest {
-
-    @Autowired
-    BusinessServiceImpl businessService;
-
-    @Autowired
-    BusinessExtEntityMapper businessExtMapper;
-
-    @Test
-    public void testSaveAndUpdateExt() {
-        final String groupId = "group1";
-        //check insert
-        BusinessExtInfo businessExtInfo1 = new BusinessExtInfo();
-        businessExtInfo1.setId(1);
-        businessExtInfo1.setInlongGroupId("group1");
-        businessExtInfo1.setKeyName("pulsar_url");
-        businessExtInfo1.setKeyValue("http://127.0.0.1:8080");
-        BusinessExtInfo businessExtInfo2 = new BusinessExtInfo();
-        businessExtInfo2.setId(2);
-        businessExtInfo2.setInlongGroupId("group1");
-        businessExtInfo2.setKeyName("pulsar_secret");
-        businessExtInfo2.setKeyValue("QWEASDZXC");
-        ArrayList<BusinessExtInfo> businessExtInfoList = Lists.newArrayList(businessExtInfo1, businessExtInfo2);
-        businessService.saveExt(groupId, businessExtInfoList);
-        List<BusinessExtEntity> extEntityList = businessExtMapper.selectByGroupId(groupId);
-        Assert.assertTrue(extEntityList.size() == 2);
-        Assert.assertTrue(extEntityList.get(0).getKeyName().equals("pulsar_url"));
-        Assert.assertTrue(extEntityList.get(0).getKeyValue().equals("http://127.0.0.1:8080"));
-        //check update
-        businessExtInfo1.setKeyValue("http://127.0.0.1:8081");
-        businessService.updateExt(groupId,businessExtInfoList);
-        extEntityList = businessExtMapper.selectByGroupId(groupId);
-        Assert.assertTrue(extEntityList.size() == 2);
-        Assert.assertTrue(extEntityList.get(0).getKeyValue().equals("http://127.0.0.1:8081"));
-    }
-
-}
diff --git a/inlong-manager/manager-web/sql/apache_inlong_manager.sql b/inlong-manager/manager-web/sql/apache_inlong_manager.sql
index e8e7152..1713247 100644
--- a/inlong-manager/manager-web/sql/apache_inlong_manager.sql
+++ b/inlong-manager/manager-web/sql/apache_inlong_manager.sql
@@ -242,7 +242,7 @@ DROP TABLE IF EXISTS `consumption`;
 CREATE TABLE `consumption`
 (
     `id`                  int(11)      NOT NULL AUTO_INCREMENT COMMENT 'Incremental primary key',
-    `consumer_group_name` varchar(255) DEFAULT NULL COMMENT 'consumer group name',
+    `consumer_group_name` varchar(255)      DEFAULT NULL COMMENT 'consumer group name',
     `consumer_group_id`   varchar(255) NOT NULL COMMENT 'Consumer group ID',
     `in_charges`          varchar(512) NOT NULL COMMENT 'Person in charge of consumption',
     `inlong_group_id`     varchar(255) NOT NULL COMMENT 'Business group id',
@@ -355,7 +355,7 @@ CREATE TABLE `data_stream`
     `id`                     int(11)      NOT NULL AUTO_INCREMENT COMMENT 'Incremental primary key',
     `inlong_stream_id`       varchar(128) NOT NULL COMMENT 'Data stream id, non-deleted globally unique',
     `inlong_group_id`        varchar(128) NOT NULL COMMENT 'Owning business group id',
-    `name`                   varchar(64)  DEFAULT NULL COMMENT 'The name of the data stream page display, can be Chinese',
+    `name`                   varchar(64)       DEFAULT NULL COMMENT 'The name of the data stream page display, can be Chinese',
     `description`            varchar(256)      DEFAULT '' COMMENT 'Introduction to data stream',
     `mq_resource_obj`        varchar(128)      DEFAULT NULL COMMENT 'MQ resource object, in the data stream, Tube is data_stream_id, Pulsar is Topic',
     `data_source_type`       varchar(32)       DEFAULT 'FILE' COMMENT 'Data source type, including: FILE, DB, Auto-Push (DATA_PROXY_SDK, HTTP)',
@@ -604,6 +604,7 @@ CREATE TABLE `storage_hive`
     `id`                          int(11)      NOT NULL AUTO_INCREMENT COMMENT 'Incremental primary key',
     `inlong_group_id`             varchar(128) NOT NULL COMMENT 'Owning business group id',
     `inlong_stream_id`            varchar(128) NOT NULL COMMENT 'Owning data stream id',
+    `enable_create_table`         tinyint(1)        DEFAULT 1 COMMENT 'Whether to enable create table, 1: enable, 0: disable, default is 1',
     `jdbc_url`                    varchar(255)      DEFAULT NULL COMMENT 'Hive JDBC connection URL, such as "jdbc:hive2://127.0.0.1:10000"',
     `username`                    varchar(128)      DEFAULT NULL COMMENT 'Username',
     `password`                    varchar(255)      DEFAULT NULL COMMENT 'User password',
@@ -1106,27 +1107,27 @@ CREATE TABLE `flume_sink_ext`
 DROP TABLE IF EXISTS `db_collector_detail_task`;
 CREATE TABLE `db_collector_detail_task`
 (
-    `id`                    int(11)      NOT NULL AUTO_INCREMENT COMMENT 'Incremental primary key',
-    `main_id`               varchar(128) NOT NULL COMMENT 'main task id',
-    `type`                  int(11)      NOT NULL COMMENT 'task type',
-    `time_var`              varchar(64)  NOT NULL COMMENT 'time variable',
-    `db_type`               int(11)      NOT NULL COMMENT 'db type',
-    `ip`                    varchar(64)  NOT NULL COMMENT 'db ip',
-    `port`                  int(11)      NOT NULL COMMENT 'db port',
-    `db_name`               varchar(64)  NULL COMMENT 'db name',
-    `user`                  varchar(64)  NULL COMMENT 'user name',
-    `password`              varchar(64)  NULL COMMENT 'password',
-    `sql_statement`         varchar(256) NULL COMMENT 'sql statement',
-    `offset`                int(11)      NOT NULL COMMENT 'offset for the data source',
-    `total_limit`           int(11)      NOT NULL COMMENT 'total limit in a task',
-    `once_limit`            int(11)      NOT NULL COMMENT 'limit for one query',
-    `time_limit`            int(11)      NOT NULL COMMENT 'time limit for task',
-    `retry_times`           int(11)      NOT NULL COMMENT 'max retry times if task failes',
-    `group_id`              varchar(64)  NULL COMMENT 'group id',
-    `stream_id`             varchar(64)  NULL COMMENT 'stream id',
-    `state`                 int(11)      NOT NULL COMMENT 'task state',
-    `create_time`           timestamp    NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'create time',
-    `modify_time`           timestamp    NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'modify time',
+    `id`            int(11)      NOT NULL AUTO_INCREMENT COMMENT 'Incremental primary key',
+    `main_id`       varchar(128) NOT NULL COMMENT 'main task id',
+    `type`          int(11)      NOT NULL COMMENT 'task type',
+    `time_var`      varchar(64)  NOT NULL COMMENT 'time variable',
+    `db_type`       int(11)      NOT NULL COMMENT 'db type',
+    `ip`            varchar(64)  NOT NULL COMMENT 'db ip',
+    `port`          int(11)      NOT NULL COMMENT 'db port',
+    `db_name`       varchar(64)  NULL COMMENT 'db name',
+    `user`          varchar(64)  NULL COMMENT 'user name',
+    `password`      varchar(64)  NULL COMMENT 'password',
+    `sql_statement` varchar(256) NULL COMMENT 'sql statement',
+    `offset`        int(11)      NOT NULL COMMENT 'offset for the data source',
+    `total_limit`   int(11)      NOT NULL COMMENT 'total limit in a task',
+    `once_limit`    int(11)      NOT NULL COMMENT 'limit for one query',
+    `time_limit`    int(11)      NOT NULL COMMENT 'time limit for task',
+    `retry_times`   int(11)      NOT NULL COMMENT 'max retry times if task failes',
+    `group_id`      varchar(64)  NULL COMMENT 'group id',
+    `stream_id`     varchar(64)  NULL COMMENT 'stream id',
+    `state`         int(11)      NOT NULL COMMENT 'task state',
+    `create_time`   timestamp    NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'create time',
+    `modify_time`   timestamp    NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'modify time',
     PRIMARY KEY (`id`)
 ) ENGINE = InnoDB
   DEFAULT CHARSET = utf8mb4 COMMENT ='db collector detail task table';
diff --git a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/DataStreamController.java b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/DataStreamController.java
index b63fc7e..6ae93ae 100644
--- a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/DataStreamController.java
+++ b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/DataStreamController.java
@@ -29,8 +29,9 @@ import org.apache.inlong.manager.common.pojo.datastream.DataStreamInfo;
 import org.apache.inlong.manager.common.pojo.datastream.DataStreamListVO;
 import org.apache.inlong.manager.common.pojo.datastream.DataStreamPageRequest;
 import org.apache.inlong.manager.common.pojo.datastream.DataStreamSummaryInfo;
-import org.apache.inlong.manager.common.pojo.datastream.FullPageInfo;
+import org.apache.inlong.manager.common.pojo.datastream.FullStreamRequest;
 import org.apache.inlong.manager.common.pojo.datastream.FullPageUpdateInfo;
+import org.apache.inlong.manager.common.pojo.datastream.FullStreamResponse;
 import org.apache.inlong.manager.common.util.LoginUserUtil;
 import org.apache.inlong.manager.service.core.DataStreamService;
 import org.apache.inlong.manager.service.core.operationlog.OperationLog;
@@ -64,14 +65,14 @@ public class DataStreamController {
     @RequestMapping(value = "/saveAll", method = RequestMethod.POST)
     @OperationLog(operation = OperationType.CREATE)
     @ApiOperation(value = "Save data stream page information ,including source and storage")
-    public Response<Boolean> saveAll(@RequestBody FullPageInfo pageInfo) {
+    public Response<Boolean> saveAll(@RequestBody FullStreamRequest pageInfo) {
         return Response.success(dataStreamService.saveAll(pageInfo, LoginUserUtil.getLoginUserDetail().getUserName()));
     }
 
     @RequestMapping(value = "/batchSaveAll", method = RequestMethod.POST)
     @OperationLog(operation = OperationType.CREATE)
     @ApiOperation(value = "Batch save data stream page information ,including source and storage")
-    public Response<Boolean> batchSaveAll(@RequestBody List<FullPageInfo> infoList) {
+    public Response<Boolean> batchSaveAll(@RequestBody List<FullStreamRequest> infoList) {
         boolean result = dataStreamService.batchSaveAll(infoList, LoginUserUtil.getLoginUserDetail().getUserName());
         return Response.success(result);
     }
@@ -95,7 +96,7 @@ public class DataStreamController {
 
     @RequestMapping(value = "/listAll", method = RequestMethod.GET)
     @ApiOperation(value = "Paging query all data of the data stream page under the specified groupId")
-    public Response<PageInfo<FullPageInfo>> listAllWithGroupId(DataStreamPageRequest request) {
+    public Response<PageInfo<FullStreamResponse>> listAllWithGroupId(DataStreamPageRequest request) {
         request.setCurrentUser(LoginUserUtil.getLoginUserDetail().getUserName());
         return Response.success(dataStreamService.listAllWithGroupId(request));
     }
diff --git a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/StorageController.java b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/StorageController.java
index 5e64559..cba9cf8 100644
--- a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/StorageController.java
+++ b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/StorageController.java
@@ -25,8 +25,9 @@ import io.swagger.annotations.ApiOperation;
 import java.util.List;
 import org.apache.inlong.manager.common.beans.Response;
 import org.apache.inlong.manager.common.enums.OperationType;
-import org.apache.inlong.manager.common.pojo.datastorage.BaseStorageInfo;
-import org.apache.inlong.manager.common.pojo.datastorage.BaseStorageListVO;
+import org.apache.inlong.manager.common.pojo.datastorage.BaseStorageRequest;
+import org.apache.inlong.manager.common.pojo.datastorage.BaseStorageListResponse;
+import org.apache.inlong.manager.common.pojo.datastorage.BaseStorageResponse;
 import org.apache.inlong.manager.common.pojo.datastorage.StoragePageRequest;
 import org.apache.inlong.manager.common.pojo.query.ColumnInfoBean;
 import org.apache.inlong.manager.common.pojo.query.ConnectionInfo;
@@ -60,7 +61,7 @@ public class StorageController {
     @RequestMapping(value = "/save", method = RequestMethod.POST)
     @OperationLog(operation = OperationType.CREATE)
     @ApiOperation(value = "Save storage information")
-    public Response<Integer> save(@RequestBody BaseStorageInfo storageInfo) {
+    public Response<Integer> save(@RequestBody BaseStorageRequest storageInfo) {
         return Response.success(storageService.save(storageInfo, LoginUserUtil.getLoginUserDetail().getUserName()));
     }
 
@@ -70,20 +71,20 @@ public class StorageController {
             @ApiImplicitParam(name = "storageType", dataTypeClass = String.class, required = true),
             @ApiImplicitParam(name = "id", dataTypeClass = Integer.class, required = true)
     })
-    public Response<BaseStorageInfo> get(@RequestParam String storageType, @PathVariable Integer id) {
+    public Response<BaseStorageResponse> get(@RequestParam String storageType, @PathVariable Integer id) {
         return Response.success(storageService.getById(storageType, id));
     }
 
     @RequestMapping(value = "/list", method = RequestMethod.GET)
     @ApiOperation(value = "Query data storage list based on conditions")
-    public Response<PageInfo<? extends BaseStorageListVO>> listByCondition(StoragePageRequest request) {
+    public Response<PageInfo<? extends BaseStorageListResponse>> listByCondition(StoragePageRequest request) {
         return Response.success(storageService.listByCondition(request));
     }
 
     @RequestMapping(value = "/update", method = RequestMethod.POST)
     @OperationLog(operation = OperationType.UPDATE)
     @ApiOperation(value = "Modify data storage information")
-    public Response<Boolean> update(@RequestBody BaseStorageInfo storageInfo) {
+    public Response<Boolean> update(@RequestBody BaseStorageRequest storageInfo) {
         return Response.success(storageService.update(storageInfo, LoginUserUtil.getLoginUserDetail().getUserName()));
     }
 
diff --git a/inlong-manager/manager-web/src/main/resources/application-dev.properties b/inlong-manager/manager-web/src/main/resources/application-dev.properties
index 5e56f81..35f8e8d 100644
--- a/inlong-manager/manager-web/src/main/resources/application-dev.properties
+++ b/inlong-manager/manager-web/src/main/resources/application-dev.properties
@@ -16,9 +16,11 @@
 # specific language governing permissions and limitations
 # under the License.
 #
+
 # Log level
 logging.level.root=INFO
 logging.level.org.apache.inlong.manager=debug
+
 spring.datasource.druid.url=jdbc:mysql://127.0.0.1:3306/apache_inlong_manager?useSSL=false&allowPublicKeyRetrieval=true&characterEncoding=UTF-8&nullCatalogMeansCurrent=true&serverTimezone=GMT%2b8
 spring.datasource.druid.username=root
 spring.datasource.druid.password=inlong
@@ -42,26 +44,32 @@ spring.datasource.druid.testOnReturn=false
 spring.datasource.druid.filters=stat,wall
 # Open the mergeSql function through the connectProperties property, Slow SQL records
 spring.datasource.druid.connectionProperties=druid.stat.mergeSql=true;druid.stat.slowSqlMillis=5000
+
 # Manager address of Tube cluster, used to create Topic
 cluster.tube.manager=http://127.0.0.1:8081
 # Master address, used to manage Tube broker
 cluster.tube.master=127.0.0.1:8000,127.0.0.1:8010
 # Tube cluster ID
 cluster.tube.clusterId=1
+
 # Push configuration to the path on ZooKeeper
 cluster.zk.url=127.0.0.1:2181
 cluster.zk.root=inlong_hive
+
 # Application name in Sort
 sort.appName=inlong_app
+
 # Pulsar admin URL
 pulsar.adminUrl=http://127.0.0.1:8080,127.0.0.2:8080,127.0.0.3:8080
 # Pulsar broker address
 pulsar.serviceUrl=pulsar://127.0.0.1:6650,127.0.0.1:6650,127.0.0.1:6650
 # Default tenant of Pulsar
 pulsar.defaultTenant=public
+
 # Audit configuration
 # Audit query source that decide what data source to query, currently only supports [MYSQL|ELASTICSEARCH]
 audit.query.source=MYSQL
+
 # Elasticsearch config
 # Elasticsearch host split by coma if more than one host, such as 'host1,host2'
 es.index.search.hostname=127.0.0.1
diff --git a/inlong-manager/manager-web/src/test/java/org/apache/inlong/manager/service/core/BusinessServiceTest.java b/inlong-manager/manager-web/src/test/java/org/apache/inlong/manager/service/core/BusinessServiceTest.java
index 8936052..5bf9e44 100644
--- a/inlong-manager/manager-web/src/test/java/org/apache/inlong/manager/service/core/BusinessServiceTest.java
+++ b/inlong-manager/manager-web/src/test/java/org/apache/inlong/manager/service/core/BusinessServiceTest.java
@@ -17,9 +17,15 @@
 
 package org.apache.inlong.manager.service.core;
 
+import java.util.Arrays;
+import java.util.List;
 import org.apache.inlong.manager.common.enums.BizConstant;
+import org.apache.inlong.manager.common.enums.EntityStatus;
+import org.apache.inlong.manager.common.pojo.business.BusinessExtInfo;
 import org.apache.inlong.manager.common.pojo.business.BusinessInfo;
 import org.apache.inlong.manager.common.pojo.business.BusinessPulsarInfo;
+import org.apache.inlong.manager.dao.entity.BusinessExtEntity;
+import org.apache.inlong.manager.dao.mapper.BusinessExtEntityMapper;
 import org.apache.inlong.manager.web.ServiceBaseTest;
 import org.junit.Assert;
 import org.junit.Test;
@@ -27,19 +33,36 @@ import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.boot.test.context.TestComponent;
 
 /**
- * Business Service Test
+ * Business service test
  */
 @TestComponent
 public class BusinessServiceTest extends ServiceBaseTest {
 
+    private final String globalGroupId = "b_group1";
+    private final String globalGroupName = "group1";
+    private final String globalOperator = "test_user";
+
+    @Autowired
+    BusinessExtEntityMapper businessExtMapper;
     @Autowired
     private BusinessService businessService;
 
     public String saveBusiness(String groupName, String operator) {
-        BusinessInfo businessInfo = new BusinessInfo();
+        BusinessInfo businessInfo;
+        try {
+            businessInfo = businessService.get(globalGroupId);
+            if (businessInfo != null) {
+                return businessInfo.getInlongGroupId();
+            }
+        } catch (Exception e) {
+            // ignore
+        }
+
+        businessInfo = new BusinessInfo();
         businessInfo.setName(groupName);
         businessInfo.setMiddlewareType(BizConstant.MIDDLEWARE_PULSAR);
         businessInfo.setCreator(operator);
+        businessInfo.setStatus(EntityStatus.BIZ_CONFIG_SUCCESSFUL.getCode());
 
         BusinessPulsarInfo pulsarInfo = new BusinessPulsarInfo();
         pulsarInfo.setMiddlewareType(BizConstant.MIDDLEWARE_PULSAR);
@@ -53,20 +76,43 @@ public class BusinessServiceTest extends ServiceBaseTest {
     }
 
     @Test
-    public void testSave() {
-        String groupName = "test_group1";
-        String operator = "test_user";
-        String groupId = this.saveBusiness(groupName, operator);
+    public void testSaveAndDelete() {
+        String groupId = this.saveBusiness(globalGroupName, globalOperator);
         Assert.assertNotNull(groupId);
+
+        boolean result = businessService.delete(groupId, globalOperator);
+        Assert.assertTrue(result);
     }
 
     @Test
-    public void testDelete() {
-        String groupName = "test_group2";
-        String operator = "test_user";
-        String groupId = this.saveBusiness(groupName, operator);
-        boolean result = businessService.delete(groupId, operator);
-        Assert.assertTrue(result);
+    public void testSaveAndUpdateExt() {
+        // check insert
+        BusinessExtInfo businessExtInfo1 = new BusinessExtInfo();
+        businessExtInfo1.setId(1);
+        businessExtInfo1.setInlongGroupId(globalGroupId);
+        businessExtInfo1.setKeyName("pulsar_url");
+        businessExtInfo1.setKeyValue("http://127.0.0.1:8080");
+
+        BusinessExtInfo businessExtInfo2 = new BusinessExtInfo();
+        businessExtInfo2.setId(2);
+        businessExtInfo2.setInlongGroupId(globalGroupId);
+        businessExtInfo2.setKeyName("pulsar_secret");
+        businessExtInfo2.setKeyValue("QWEASDZXC");
+
+        List<BusinessExtInfo> businessExtInfoList = Arrays.asList(businessExtInfo1, businessExtInfo2);
+        businessService.saveOrUpdateExt(globalGroupId, businessExtInfoList);
+
+        List<BusinessExtEntity> extEntityList = businessExtMapper.selectByGroupId(globalGroupId);
+        Assert.assertEquals(2, extEntityList.size());
+        Assert.assertEquals("pulsar_url", extEntityList.get(0).getKeyName());
+        Assert.assertEquals("http://127.0.0.1:8080", extEntityList.get(0).getKeyValue());
+
+        // check update
+        businessExtInfo1.setKeyValue("http://127.0.0.1:8081");
+        businessService.saveOrUpdateExt(globalGroupId, businessExtInfoList);
+        extEntityList = businessExtMapper.selectByGroupId(globalGroupId);
+        Assert.assertEquals(2, extEntityList.size());
+        Assert.assertEquals("http://127.0.0.1:8081", extEntityList.get(0).getKeyValue());
     }
 
 }
diff --git a/inlong-manager/manager-web/src/test/java/org/apache/inlong/manager/service/core/ConsumptionServiceTest.java b/inlong-manager/manager-web/src/test/java/org/apache/inlong/manager/service/core/ConsumptionServiceTest.java
index 8b74aef..718d138 100644
--- a/inlong-manager/manager-web/src/test/java/org/apache/inlong/manager/service/core/ConsumptionServiceTest.java
+++ b/inlong-manager/manager-web/src/test/java/org/apache/inlong/manager/service/core/ConsumptionServiceTest.java
@@ -26,7 +26,7 @@ import org.junit.Test;
 import org.springframework.beans.factory.annotation.Autowired;
 
 /**
- * Consumption Service Test
+ * Consumption service test
  */
 public class ConsumptionServiceTest extends ServiceBaseTest {
 
diff --git a/inlong-manager/manager-web/src/test/java/org/apache/inlong/manager/service/core/DataStorageServiceTest.java b/inlong-manager/manager-web/src/test/java/org/apache/inlong/manager/service/core/DataStorageServiceTest.java
new file mode 100644
index 0000000..406c1f3
--- /dev/null
+++ b/inlong-manager/manager-web/src/test/java/org/apache/inlong/manager/service/core/DataStorageServiceTest.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.service.core;
+
+import org.apache.inlong.manager.common.enums.BizConstant;
+import org.apache.inlong.manager.common.pojo.datastorage.BaseStorageResponse;
+import org.apache.inlong.manager.common.pojo.datastorage.StorageHiveRequest;
+import org.apache.inlong.manager.common.pojo.datastorage.StorageHiveResponse;
+import org.apache.inlong.manager.common.util.CommonBeanUtils;
+import org.apache.inlong.manager.web.ServiceBaseTest;
+import org.junit.Assert;
+import org.junit.Test;
+import org.springframework.beans.factory.annotation.Autowired;
+
+/**
+ * Data storage service test: saves, reads, updates and deletes a Hive storage via StorageService
+ */
+public class DataStorageServiceTest extends ServiceBaseTest {
+
+    private final String globalGroupId = "b_group1";
+    private final String globalStreamId = "stream1";
+    private final String globalOperator = "test_user";
+
+    @Autowired
+    private StorageService storageService;
+    @Autowired
+    private BusinessServiceTest businessServiceTest; // NOTE(review): not referenced in this class
+    @Autowired
+    private DataStreamServiceTest streamServiceTest;
+
+    public Integer saveStorage() { // saves a Hive storage for the fixture stream, returns the id from save()
+        streamServiceTest.saveDataStream(globalGroupId, globalStreamId, globalOperator); // stream is saved first
+
+        StorageHiveRequest storageInfo = new StorageHiveRequest();
+        storageInfo.setInlongGroupId(globalGroupId);
+        storageInfo.setInlongStreamId(globalStreamId);
+        storageInfo.setStorageType(BizConstant.STORAGE_HIVE);
+        storageInfo.setEnableCreateTable(BizConstant.DISABLE_CREATE_TABLE); // presumably skips Hive table creation
+
+        return storageService.save(storageInfo, globalOperator);
+    }
+
+    @Test
+    public void testSaveAndDelete() { // save must return an id and delete must report success
+        Integer id = this.saveStorage();
+        Assert.assertNotNull(id);
+
+        boolean result = storageService.delete(BizConstant.STORAGE_HIVE, id, globalOperator);
+        Assert.assertTrue(result);
+    }
+
+    @Test
+    public void testListByIdentifier() { // NOTE(review): despite the name, only getById is exercised
+        Integer id = this.saveStorage();
+
+        BaseStorageResponse storage = storageService.getById(BizConstant.STORAGE_HIVE, id);
+        Assert.assertEquals(globalGroupId, storage.getInlongGroupId());
+
+        storageService.delete(BizConstant.STORAGE_HIVE, id, globalOperator); // clean up the storage saved above
+    }
+
+    @Test
+    public void testGetAndUpdate() { // reads the saved storage back, then sends it through update()
+        Integer id = this.saveStorage();
+        BaseStorageResponse storage = storageService.getById(BizConstant.STORAGE_HIVE, id);
+        Assert.assertEquals(globalGroupId, storage.getInlongGroupId());
+
+        StorageHiveResponse hiveResponse = (StorageHiveResponse) storage;
+        hiveResponse.setEnableCreateTable(BizConstant.DISABLE_CREATE_TABLE); // NOTE(review): already set by saveStorage()
+
+        StorageHiveRequest request = CommonBeanUtils.copyProperties(hiveResponse, StorageHiveRequest::new);
+        boolean result = storageService.update(request, globalOperator);
+        Assert.assertTrue(result); // NOTE(review): storage is not deleted afterwards, unlike the other tests
+    }
+
+}
diff --git a/inlong-manager/manager-web/src/test/java/org/apache/inlong/manager/service/core/DataStreamServiceTest.java b/inlong-manager/manager-web/src/test/java/org/apache/inlong/manager/service/core/DataStreamServiceTest.java
new file mode 100644
index 0000000..a22c974
--- /dev/null
+++ b/inlong-manager/manager-web/src/test/java/org/apache/inlong/manager/service/core/DataStreamServiceTest.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.service.core;
+
+import org.apache.inlong.manager.common.pojo.datastream.DataStreamInfo;
+import org.apache.inlong.manager.web.ServiceBaseTest;
+import org.junit.Assert;
+import org.junit.Test;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.test.context.TestComponent;
+
+/**
+ * Data stream service test: saves and deletes a data stream via DataStreamService
+ */
+@TestComponent
+public class DataStreamServiceTest extends ServiceBaseTest {
+
+    private final String globalGroupId = "b_group1";
+    private final String globalGroupName = "group1";
+    private final String globalStreamId = "stream1";
+    private final String globalOperator = "test_user";
+
+    @Autowired
+    private DataStreamService dataStreamService;
+    @Autowired
+    private BusinessServiceTest businessServiceTest;
+
+    public Integer saveDataStream(String groupId, String streamId, String operator) { // returns the stream id
+        DataStreamInfo streamInfo;
+        try {
+            streamInfo = dataStreamService.get(groupId, streamId);
+            if (streamInfo != null) {
+                return streamInfo.getId(); // reuse the existing stream instead of saving it again
+            }
+        } catch (Exception e) {
+            // ignore: presumably thrown when the stream does not exist yet, so fall through and save it
+        }
+
+        businessServiceTest.saveBusiness(globalGroupName, operator); // saves the business group first (fixed name)
+
+        streamInfo = new DataStreamInfo();
+        streamInfo.setInlongGroupId(groupId);
+        streamInfo.setInlongStreamId(streamId);
+        streamInfo.setDataEncoding("UTF-8");
+
+        return dataStreamService.save(streamInfo, operator);
+    }
+
+    @Test
+    public void testSaveAndDelete() { // save must return an id and delete must report success
+        Integer id = this.saveDataStream(globalGroupId, globalStreamId, globalOperator);
+        Assert.assertNotNull(id);
+
+        boolean result = dataStreamService.delete(globalGroupId, globalStreamId, globalOperator);
+        Assert.assertTrue(result);
+    }
+
+}
diff --git a/inlong-manager/manager-web/src/test/resources/sql/apache_inlong_manager.sql b/inlong-manager/manager-web/src/test/resources/sql/apache_inlong_manager.sql
index 7bdcb27..1184ddd 100644
--- a/inlong-manager/manager-web/src/test/resources/sql/apache_inlong_manager.sql
+++ b/inlong-manager/manager-web/src/test/resources/sql/apache_inlong_manager.sql
@@ -570,6 +570,7 @@ CREATE TABLE `storage_hive`
     `id`                          int(11)      NOT NULL AUTO_INCREMENT COMMENT 'Incremental primary key',
     `inlong_group_id`             varchar(128) NOT NULL COMMENT 'Owning business group id',
     `inlong_stream_id`            varchar(128) NOT NULL COMMENT 'Owning data stream id',
+    `enable_create_table`         tinyint(1)            DEFAULT 1 COMMENT 'Whether to enable create table, 1: enable, 0: disable, default is 1',
     `jdbc_url`                    varchar(255)          DEFAULT NULL COMMENT 'Hive JDBC connection URL, such as "jdbc:hive2://127.0.0.1:10000"',
     `username`                    varchar(128)          DEFAULT NULL COMMENT 'Username',
     `password`                    varchar(255)          DEFAULT NULL COMMENT 'User password',