You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2022/01/23 06:12:02 UTC
[incubator-inlong] branch master updated: [INLONG-2274][Manager] Supports configuring whether to create a Hive database or table (#2277)
This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git
The following commit(s) were added to refs/heads/master by this push:
new dbb2f94 [INLONG-2274][Manager] Supports configuring whether to create a Hive database or table (#2277)
dbb2f94 is described below
commit dbb2f9475534eab0b4c4e300948906e2ba7dee9d
Author: healchow <he...@gmail.com>
AuthorDate: Sun Jan 23 14:11:58 2022 +0800
[INLONG-2274][Manager] Supports configuring whether to create a Hive database or table (#2277)
---
.../inlong/manager/common/enums/BizConstant.java | 3 +
...ageListVO.java => BaseStorageListResponse.java} | 4 +-
...aseStorageInfo.java => BaseStorageRequest.java} | 32 +-------
...seStorageInfo.java => BaseStorageResponse.java} | 32 +-------
...torageHiveSortInfo.java => StorageHiveDTO.java} | 5 +-
...iveListVO.java => StorageHiveListResponse.java} | 4 +-
...torageHiveInfo.java => StorageHiveRequest.java} | 19 ++---
...orageHiveInfo.java => StorageHiveResponse.java} | 32 ++++++--
.../{FullPageInfo.java => FullStreamRequest.java} | 10 +--
.../{FullPageInfo.java => FullStreamResponse.java} | 10 +--
.../manager/dao/entity/StorageHiveEntity.java | 4 +-
.../dao/mapper/BusinessExtEntityMapper.java | 2 -
.../dao/mapper/StorageHiveEntityMapper.java | 4 +-
.../resources/mappers/BusinessExtEntityMapper.xml | 27 -------
.../resources/mappers/StorageHiveEntityMapper.xml | 57 +++++++-------
.../manager/service/core/BusinessService.java | 11 +++
.../manager/service/core/DataStreamService.java | 13 ++--
.../manager/service/core/StorageService.java | 19 ++---
.../service/core/impl/BusinessServiceImpl.java | 27 ++-----
.../service/core/impl/DataStreamServiceImpl.java | 58 +++++++-------
.../service/core/impl/SourceFileServiceImpl.java | 2 +-
.../service/core/impl/StorageHiveOperation.java | 48 +++++++-----
.../service/core/impl/StorageServiceImpl.java | 41 +++++-----
.../hive/CreateHiveTableForStreamListener.java | 20 ++---
.../thirdpart/hive/CreateHiveTableListener.java | 20 ++---
.../service/thirdpart/hive/HiveTableOperator.java | 85 ++++++++++++--------
.../thirdpart/sort/PushHiveConfigTaskListener.java | 14 ++--
.../service/core/impl/BusinessServiceImplTest.java | 67 ----------------
.../manager-web/sql/apache_inlong_manager.sql | 47 +++++------
.../web/controller/DataStreamController.java | 9 ++-
.../manager/web/controller/StorageController.java | 13 ++--
.../src/main/resources/application-dev.properties | 8 ++
.../manager/service/core/BusinessServiceTest.java | 70 ++++++++++++++---
.../service/core/ConsumptionServiceTest.java | 2 +-
.../service/core/DataStorageServiceTest.java | 91 ++++++++++++++++++++++
.../service/core/DataStreamServiceTest.java | 73 +++++++++++++++++
.../test/resources/sql/apache_inlong_manager.sql | 1 +
37 files changed, 556 insertions(+), 428 deletions(-)
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/BizConstant.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/BizConstant.java
index 262fb3b..8de8a2a 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/BizConstant.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/BizConstant.java
@@ -66,4 +66,7 @@ public class BizConstant {
public static final String PREFIX_RLQ = "rlq"; // prefix of the Topic of the retry letter queue
+ public static final Integer ENABLE_CREATE_TABLE = 1; // Enable create table
+
+ public static final Integer DISABLE_CREATE_TABLE = 0; // Disable create table
}
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/datastorage/BaseStorageListVO.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/datastorage/BaseStorageListResponse.java
similarity index 95%
rename from inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/datastorage/BaseStorageListVO.java
rename to inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/datastorage/BaseStorageListResponse.java
index 294391f..80d221d 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/datastorage/BaseStorageListVO.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/datastorage/BaseStorageListResponse.java
@@ -23,10 +23,10 @@ import java.util.Date;
import lombok.Data;
/**
- * Paging list of data storage
+ * Response of data storage list
*/
@Data
-public class BaseStorageListVO {
+public class BaseStorageListResponse {
@ApiModelProperty(value = "Primary key")
private Integer id;
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/datastorage/BaseStorageInfo.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/datastorage/BaseStorageRequest.java
similarity index 68%
copy from inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/datastorage/BaseStorageInfo.java
copy to inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/datastorage/BaseStorageRequest.java
index 2832839..ed0b185 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/datastorage/BaseStorageInfo.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/datastorage/BaseStorageRequest.java
@@ -17,27 +17,25 @@
package org.apache.inlong.manager.common.pojo.datastorage;
-import com.fasterxml.jackson.annotation.JsonFormat;
import com.fasterxml.jackson.annotation.JsonSubTypes;
import com.fasterxml.jackson.annotation.JsonSubTypes.Type;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
import com.fasterxml.jackson.annotation.JsonTypeInfo.Id;
import io.swagger.annotations.ApiModel;
import io.swagger.annotations.ApiModelProperty;
-import java.util.Date;
import lombok.Data;
import org.apache.inlong.manager.common.enums.BizConstant;
/**
- * Basic data storage information
+ * Basic request of data storage
*/
@Data
-@ApiModel("Basic data storage information")
+@ApiModel("Basic request of data storage")
@JsonTypeInfo(use = Id.NAME, visible = true, property = "storageType")
@JsonSubTypes({
- @Type(value = StorageHiveInfo.class, name = BizConstant.STORAGE_HIVE)
+ @Type(value = StorageHiveRequest.class, name = BizConstant.STORAGE_HIVE)
})
-public class BaseStorageInfo {
+public class BaseStorageRequest {
private Integer id;
@@ -53,26 +51,4 @@ public class BaseStorageInfo {
@ApiModelProperty("Data storage period, unit: day")
private Integer storagePeriod;
- @ApiModelProperty("Status")
- private Integer status;
-
- @ApiModelProperty("Previous State")
- private Integer previousStatus;
-
- @ApiModelProperty("is deleted? 0: deleted, 1: not deleted")
- private Integer isDeleted = 0;
-
- private String creator;
-
- private String modifier;
-
- @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss")
- private Date createTime;
-
- @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss")
- private Date modifyTime;
-
- @ApiModelProperty("Temporary view, string in JSON format")
- private String tempView;
-
}
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/datastorage/BaseStorageInfo.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/datastorage/BaseStorageResponse.java
similarity index 68%
rename from inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/datastorage/BaseStorageInfo.java
rename to inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/datastorage/BaseStorageResponse.java
index 2832839..649b369 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/datastorage/BaseStorageInfo.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/datastorage/BaseStorageResponse.java
@@ -17,27 +17,25 @@
package org.apache.inlong.manager.common.pojo.datastorage;
-import com.fasterxml.jackson.annotation.JsonFormat;
import com.fasterxml.jackson.annotation.JsonSubTypes;
import com.fasterxml.jackson.annotation.JsonSubTypes.Type;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
import com.fasterxml.jackson.annotation.JsonTypeInfo.Id;
import io.swagger.annotations.ApiModel;
import io.swagger.annotations.ApiModelProperty;
-import java.util.Date;
import lombok.Data;
import org.apache.inlong.manager.common.enums.BizConstant;
/**
- * Basic data storage information
+ * Basic response of data storage
*/
@Data
-@ApiModel("Basic data storage information")
+@ApiModel("Basic response of data storage")
@JsonTypeInfo(use = Id.NAME, visible = true, property = "storageType")
@JsonSubTypes({
- @Type(value = StorageHiveInfo.class, name = BizConstant.STORAGE_HIVE)
+ @Type(value = StorageHiveResponse.class, name = BizConstant.STORAGE_HIVE)
})
-public class BaseStorageInfo {
+public class BaseStorageResponse {
private Integer id;
@@ -53,26 +51,4 @@ public class BaseStorageInfo {
@ApiModelProperty("Data storage period, unit: day")
private Integer storagePeriod;
- @ApiModelProperty("Status")
- private Integer status;
-
- @ApiModelProperty("Previous State")
- private Integer previousStatus;
-
- @ApiModelProperty("is deleted? 0: deleted, 1: not deleted")
- private Integer isDeleted = 0;
-
- private String creator;
-
- private String modifier;
-
- @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss")
- private Date createTime;
-
- @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss")
- private Date modifyTime;
-
- @ApiModelProperty("Temporary view, string in JSON format")
- private String tempView;
-
}
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/datastorage/StorageHiveSortInfo.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/datastorage/StorageHiveDTO.java
similarity index 95%
rename from inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/datastorage/StorageHiveSortInfo.java
rename to inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/datastorage/StorageHiveDTO.java
index bf52872..9906ea0 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/datastorage/StorageHiveSortInfo.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/datastorage/StorageHiveDTO.java
@@ -20,14 +20,15 @@ package org.apache.inlong.manager.common.pojo.datastorage;
import lombok.Data;
/**
- * Hive info for Sort config
+ * Hive info
*/
@Data
-public class StorageHiveSortInfo {
+public class StorageHiveDTO {
private Integer id;
private String inlongGroupId;
private String inlongStreamId;
+ private Integer enableCreateTable;
// Hive server info
private String jdbcUrl;
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/datastorage/StorageHiveListVO.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/datastorage/StorageHiveListResponse.java
similarity index 94%
rename from inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/datastorage/StorageHiveListVO.java
rename to inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/datastorage/StorageHiveListResponse.java
index 9e9d455..78873c6 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/datastorage/StorageHiveListVO.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/datastorage/StorageHiveListResponse.java
@@ -23,12 +23,12 @@ import lombok.Data;
import lombok.EqualsAndHashCode;
/**
- * Response of Hive storage paging list
+ * Response of Hive storage list
*/
@Data
@EqualsAndHashCode(callSuper = true)
@ApiModel("Response of Hive storage paging list")
-public class StorageHiveListVO extends BaseStorageListVO {
+public class StorageHiveListResponse extends BaseStorageListResponse {
@ApiModelProperty("target database name")
private String dbName;
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/datastorage/StorageHiveInfo.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/datastorage/StorageHiveRequest.java
similarity index 86%
copy from inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/datastorage/StorageHiveInfo.java
copy to inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/datastorage/StorageHiveRequest.java
index 0c7b3c1..f562f33 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/datastorage/StorageHiveInfo.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/datastorage/StorageHiveRequest.java
@@ -26,16 +26,19 @@ import lombok.ToString;
import org.apache.inlong.manager.common.enums.BizConstant;
/**
- * Interaction objects for Hive storage
+ * Request of the Hive storage info
*/
@Data
@ToString(callSuper = true)
@EqualsAndHashCode(callSuper = true)
-@ApiModel(value = "Interaction objects for Hive storage")
-public class StorageHiveInfo extends BaseStorageInfo {
+@ApiModel(value = "Request of the Hive storage info")
+public class StorageHiveRequest extends BaseStorageRequest {
private String storageType = BizConstant.STORAGE_HIVE;
+ @ApiModelProperty("Whether to enable create table, 1: enable, 0: disable, default is 1")
+ private Integer enableCreateTable = 1;
+
@ApiModelProperty("Hive JDBC URL")
private String jdbcUrl;
@@ -81,16 +84,10 @@ public class StorageHiveInfo extends BaseStorageInfo {
@ApiModelProperty("Data field separator")
private String dataSeparator;
- @ApiModelProperty("Data storage period in Hive, unit: Day")
- private Integer storagePeriod;
-
- @ApiModelProperty("Backend operation log")
- private String optLog;
-
- @ApiModelProperty("hive table field list")
+ @ApiModelProperty("Hive table field list")
private List<StorageHiveFieldInfo> hiveFieldList;
- @ApiModelProperty("other ext info list")
+ @ApiModelProperty("Other ext info list")
private List<StorageExtInfo> extList;
}
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/datastorage/StorageHiveInfo.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/datastorage/StorageHiveResponse.java
similarity index 79%
rename from inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/datastorage/StorageHiveInfo.java
rename to inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/datastorage/StorageHiveResponse.java
index 0c7b3c1..4a81f63 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/datastorage/StorageHiveInfo.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/datastorage/StorageHiveResponse.java
@@ -17,8 +17,10 @@
package org.apache.inlong.manager.common.pojo.datastorage;
+import com.fasterxml.jackson.annotation.JsonFormat;
import io.swagger.annotations.ApiModel;
import io.swagger.annotations.ApiModelProperty;
+import java.util.Date;
import java.util.List;
import lombok.Data;
import lombok.EqualsAndHashCode;
@@ -26,16 +28,19 @@ import lombok.ToString;
import org.apache.inlong.manager.common.enums.BizConstant;
/**
- * Interaction objects for Hive storage
+ * Response of the Hive storage info
*/
@Data
@ToString(callSuper = true)
@EqualsAndHashCode(callSuper = true)
-@ApiModel(value = "Interaction objects for Hive storage")
-public class StorageHiveInfo extends BaseStorageInfo {
+@ApiModel(value = "Response of the Hive storage info")
+public class StorageHiveResponse extends BaseStorageResponse {
private String storageType = BizConstant.STORAGE_HIVE;
+ @ApiModelProperty("Whether to enable create table")
+ private Integer enableCreateTable;
+
@ApiModelProperty("Hive JDBC URL")
private String jdbcUrl;
@@ -81,12 +86,27 @@ public class StorageHiveInfo extends BaseStorageInfo {
@ApiModelProperty("Data field separator")
private String dataSeparator;
- @ApiModelProperty("Data storage period in Hive, unit: Day")
- private Integer storagePeriod;
-
@ApiModelProperty("Backend operation log")
private String optLog;
+ @ApiModelProperty("Status")
+ private Integer status;
+
+ @ApiModelProperty("Previous State")
+ private Integer previousStatus;
+
+ @ApiModelProperty("Creator")
+ private String creator;
+
+ @ApiModelProperty("modifier")
+ private String modifier;
+
+ @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss")
+ private Date createTime;
+
+ @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss")
+ private Date modifyTime;
+
@ApiModelProperty("hive table field list")
private List<StorageHiveFieldInfo> hiveFieldList;
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/datastream/FullPageInfo.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/datastream/FullStreamRequest.java
similarity index 88%
copy from inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/datastream/FullPageInfo.java
copy to inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/datastream/FullStreamRequest.java
index cf4c4fc..2e31cfb 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/datastream/FullPageInfo.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/datastream/FullStreamRequest.java
@@ -25,14 +25,14 @@ import org.apache.inlong.manager.common.pojo.datasource.SourceDbBasicInfo;
import org.apache.inlong.manager.common.pojo.datasource.SourceDbDetailInfo;
import org.apache.inlong.manager.common.pojo.datasource.SourceFileBasicInfo;
import org.apache.inlong.manager.common.pojo.datasource.SourceFileDetailInfo;
-import org.apache.inlong.manager.common.pojo.datastorage.BaseStorageInfo;
+import org.apache.inlong.manager.common.pojo.datastorage.BaseStorageRequest;
/**
- * All information on the data stream page, including data stream, data source, and data storage
+ * All request info on the data stream page, including data stream, data source, and data storage
*/
@Data
-@ApiModel("All information of the data flow page")
-public class FullPageInfo {
+@ApiModel("All request info on the data stream page")
+public class FullStreamRequest {
@ApiModelProperty("Data stream information")
private DataStreamInfo streamInfo;
@@ -50,6 +50,6 @@ public class FullPageInfo {
private List<SourceDbDetailInfo> dbDetailInfoList;
@ApiModelProperty("Data storage information")
- private List<BaseStorageInfo> storageInfo;
+ private List<BaseStorageRequest> storageInfo;
}
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/datastream/FullPageInfo.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/datastream/FullStreamResponse.java
similarity index 88%
rename from inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/datastream/FullPageInfo.java
rename to inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/datastream/FullStreamResponse.java
index cf4c4fc..26e742d 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/datastream/FullPageInfo.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/datastream/FullStreamResponse.java
@@ -25,14 +25,14 @@ import org.apache.inlong.manager.common.pojo.datasource.SourceDbBasicInfo;
import org.apache.inlong.manager.common.pojo.datasource.SourceDbDetailInfo;
import org.apache.inlong.manager.common.pojo.datasource.SourceFileBasicInfo;
import org.apache.inlong.manager.common.pojo.datasource.SourceFileDetailInfo;
-import org.apache.inlong.manager.common.pojo.datastorage.BaseStorageInfo;
+import org.apache.inlong.manager.common.pojo.datastorage.BaseStorageResponse;
/**
- * All information on the data stream page, including data stream, data source, and data storage
+ * All response info on the data stream page, including data stream, data source, and data storage
*/
@Data
-@ApiModel("All information of the data flow page")
-public class FullPageInfo {
+@ApiModel("All response info on the data stream page")
+public class FullStreamResponse {
@ApiModelProperty("Data stream information")
private DataStreamInfo streamInfo;
@@ -50,6 +50,6 @@ public class FullPageInfo {
private List<SourceDbDetailInfo> dbDetailInfoList;
@ApiModelProperty("Data storage information")
- private List<BaseStorageInfo> storageInfo;
+ private List<BaseStorageResponse> storageInfo;
}
diff --git a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/StorageHiveEntity.java b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/StorageHiveEntity.java
index c6e513e..d3b3542 100644
--- a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/StorageHiveEntity.java
+++ b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/StorageHiveEntity.java
@@ -28,6 +28,9 @@ public class StorageHiveEntity implements Serializable {
private Integer id;
private String inlongGroupId;
private String inlongStreamId;
+
+ private Integer enableCreateTable;
+
private String jdbcUrl;
private String username;
private String password;
@@ -55,6 +58,5 @@ public class StorageHiveEntity implements Serializable {
private String modifier;
private Date createTime;
private Date modifyTime;
- private String tempView;
}
\ No newline at end of file
diff --git a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/BusinessExtEntityMapper.java b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/BusinessExtEntityMapper.java
index f5385f7..fa7b5bf 100644
--- a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/BusinessExtEntityMapper.java
+++ b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/BusinessExtEntityMapper.java
@@ -35,8 +35,6 @@ public interface BusinessExtEntityMapper {
List<BusinessExtEntity> selectByGroupId(String groupId);
- int updateByPrimaryKeySelective(BusinessExtEntity record);
-
int updateByPrimaryKey(BusinessExtEntity record);
BusinessExtEntity selectByGroupIdAndKeyName(String groupId, String keyName);
diff --git a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StorageHiveEntityMapper.java b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StorageHiveEntityMapper.java
index fea4cd5..c30e712 100644
--- a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StorageHiveEntityMapper.java
+++ b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StorageHiveEntityMapper.java
@@ -19,7 +19,7 @@ package org.apache.inlong.manager.dao.mapper;
import java.util.List;
import org.apache.ibatis.annotations.Param;
-import org.apache.inlong.manager.common.pojo.datastorage.StorageHiveSortInfo;
+import org.apache.inlong.manager.common.pojo.datastorage.StorageHiveDTO;
import org.apache.inlong.manager.common.pojo.datastorage.StoragePageRequest;
import org.apache.inlong.manager.common.pojo.datastorage.StorageSummaryInfo;
import org.apache.inlong.manager.dao.entity.StorageHiveEntity;
@@ -90,7 +90,7 @@ public interface StorageHiveEntityMapper {
* @param streamId Data stream id; if it is null, get all configs under the group id
* @return Hive Sort config
*/
- List<StorageHiveSortInfo> selectHiveSortInfoByIdentifier(@Param("groupId") String groupId,
+ List<StorageHiveDTO> selectAllHiveConfig(@Param("groupId") String groupId,
@Param("streamId") String streamId);
}
\ No newline at end of file
diff --git a/inlong-manager/manager-dao/src/main/resources/mappers/BusinessExtEntityMapper.xml b/inlong-manager/manager-dao/src/main/resources/mappers/BusinessExtEntityMapper.xml
index 322ef11..4558507 100644
--- a/inlong-manager/manager-dao/src/main/resources/mappers/BusinessExtEntityMapper.xml
+++ b/inlong-manager/manager-dao/src/main/resources/mappers/BusinessExtEntityMapper.xml
@@ -120,34 +120,7 @@
<foreach collection="extList" separator="," index="index" item="item">
(#{item.id}, #{item.inlongGroupId}, #{item.keyName}, #{item.keyValue}, #{item.isDeleted})
</foreach>
- ON DUPLICATE KEY UPDATE
- id = values(id),
- inlong_group_id = values(inlong_group_id),
- key_name = values(key_name),
- key_value = values(key_value),
- is_deleted = values(is_deleted)
</insert>
- <update id="updateByPrimaryKeySelective" parameterType="org.apache.inlong.manager.dao.entity.BusinessExtEntity">
- update business_ext
- <set>
- <if test="inlongGroupId != null">
- inlong_group_id = #{inlongGroupId,jdbcType=VARCHAR},
- </if>
- <if test="keyName != null">
- key_name = #{keyName,jdbcType=VARCHAR},
- </if>
- <if test="keyValue != null">
- key_value = #{keyValue,jdbcType=VARCHAR},
- </if>
- <if test="isDeleted != null">
- is_deleted = #{isDeleted,jdbcType=INTEGER},
- </if>
- <if test="modifyTime != null">
- modify_time = #{modifyTime,jdbcType=TIMESTAMP},
- </if>
- </set>
- where id = #{id,jdbcType=INTEGER}
- </update>
<update id="updateByPrimaryKey" parameterType="org.apache.inlong.manager.dao.entity.BusinessExtEntity">
update business_ext
set inlong_group_id = #{inlongGroupId,jdbcType=VARCHAR},
diff --git a/inlong-manager/manager-dao/src/main/resources/mappers/StorageHiveEntityMapper.xml b/inlong-manager/manager-dao/src/main/resources/mappers/StorageHiveEntityMapper.xml
index 52c3178..eb40ef9 100644
--- a/inlong-manager/manager-dao/src/main/resources/mappers/StorageHiveEntityMapper.xml
+++ b/inlong-manager/manager-dao/src/main/resources/mappers/StorageHiveEntityMapper.xml
@@ -24,6 +24,7 @@
<id column="id" jdbcType="INTEGER" property="id"/>
<result column="inlong_group_id" jdbcType="VARCHAR" property="inlongGroupId"/>
<result column="inlong_stream_id" jdbcType="VARCHAR" property="inlongStreamId"/>
+ <result column="enable_create_table" jdbcType="TINYINT" property="enableCreateTable"/>
<result column="jdbc_url" jdbcType="VARCHAR" property="jdbcUrl"/>
<result column="username" jdbcType="VARCHAR" property="username"/>
<result column="password" jdbcType="VARCHAR" property="password"/>
@@ -51,14 +52,13 @@
<result column="modifier" jdbcType="VARCHAR" property="modifier"/>
<result column="create_time" jdbcType="TIMESTAMP" property="createTime"/>
<result column="modify_time" jdbcType="TIMESTAMP" property="modifyTime"/>
- <result column="temp_view" jdbcType="LONGVARCHAR" property="tempView"/>
</resultMap>
<sql id="Base_Column_List">
- id, inlong_group_id, inlong_stream_id, jdbc_url, username, password, db_name, table_name,
+ id, inlong_group_id, inlong_stream_id, enable_create_table, jdbc_url, username, password, db_name, table_name,
hdfs_default_fs, warehouse_dir, partition_interval, partition_unit, primary_partition, secondary_partition,
- partition_creation_strategy, file_format, data_encoding, data_separator, storage_period, opt_log,
- status, previous_status, is_deleted, creator, modifier, create_time, modify_time, temp_view
+ partition_creation_strategy, file_format, data_encoding, data_separator, storage_period,
+ opt_log, status, previous_status, is_deleted, creator, modifier, create_time, modify_time
</sql>
<select id="selectByPrimaryKey" parameterType="java.lang.Integer" resultMap="BaseResultMap">
@@ -141,9 +141,8 @@
and s.inlong_group_id = #{groupId, jdbcType=VARCHAR}
and s.inlong_stream_id = #{streamId, jdbcType=VARCHAR}
</select>
- <select id="selectHiveSortInfoByIdentifier"
- resultType="org.apache.inlong.manager.common.pojo.datastorage.StorageHiveSortInfo">
- SELECT hive.id,
+ <select id="selectAllHiveConfig" resultType="org.apache.inlong.manager.common.pojo.datastorage.StorageHiveDTO">
+ select hive.id,
hive.inlong_group_id,
hive.inlong_stream_id,
@@ -173,7 +172,7 @@
stream.description,
stream.data_separator as sourceSeparator,
stream.data_escape_char
- FROM data_stream stream,
+ from data_stream stream,
storage_hive hive
<where>
stream.is_deleted = 0
@@ -195,28 +194,28 @@
<insert id="insert" useGeneratedKeys="true" keyProperty="id"
parameterType="org.apache.inlong.manager.dao.entity.StorageHiveEntity">
- insert into storage_hive (id, inlong_group_id, inlong_stream_id,
+ insert into storage_hive (id, inlong_group_id,
+ inlong_stream_id, enable_create_table,
jdbc_url, username, password,
db_name, table_name, hdfs_default_fs,
warehouse_dir, partition_interval,
partition_unit, primary_partition,
secondary_partition, partition_creation_strategy,
file_format, data_encoding, data_separator,
- storage_period, opt_log,
- status, previous_status, is_deleted,
- creator, modifier, create_time,
- modify_time, temp_view)
- values (#{id,jdbcType=INTEGER}, #{inlongGroupId,jdbcType=VARCHAR}, #{inlongStreamId,jdbcType=VARCHAR},
+ storage_period, opt_log, status,
+ previous_status, is_deleted, creator,
+ modifier, create_time, modify_time)
+ values (#{id,jdbcType=INTEGER}, #{inlongGroupId,jdbcType=VARCHAR},
+ #{inlongStreamId,jdbcType=VARCHAR}, #{enableCreateTable,jdbcType=TINYINT},
#{jdbcUrl,jdbcType=VARCHAR}, #{username,jdbcType=VARCHAR}, #{password,jdbcType=VARCHAR},
#{dbName,jdbcType=VARCHAR}, #{tableName,jdbcType=VARCHAR}, #{hdfsDefaultFs,jdbcType=VARCHAR},
#{warehouseDir,jdbcType=VARCHAR}, #{partitionInterval,jdbcType=INTEGER},
#{partitionUnit,jdbcType=VARCHAR}, #{primaryPartition,jdbcType=VARCHAR},
#{secondaryPartition,jdbcType=VARCHAR}, #{partitionCreationStrategy,jdbcType=VARCHAR},
#{fileFormat,jdbcType=VARCHAR}, #{dataEncoding,jdbcType=VARCHAR}, #{dataSeparator,jdbcType=VARCHAR},
- #{storagePeriod,jdbcType=INTEGER}, #{optLog,jdbcType=VARCHAR},
- #{status,jdbcType=INTEGER}, #{previousStatus,jdbcType=INTEGER}, #{isDeleted,jdbcType=INTEGER},
- #{creator,jdbcType=VARCHAR}, #{modifier,jdbcType=VARCHAR}, #{createTime,jdbcType=TIMESTAMP},
- #{modifyTime,jdbcType=TIMESTAMP}, #{tempView,jdbcType=LONGVARCHAR})
+ #{storagePeriod,jdbcType=INTEGER}, #{optLog,jdbcType=VARCHAR}, #{status,jdbcType=INTEGER},
+ #{previousStatus,jdbcType=INTEGER}, #{isDeleted,jdbcType=INTEGER}, #{creator,jdbcType=VARCHAR},
+ #{modifier,jdbcType=VARCHAR}, #{createTime,jdbcType=TIMESTAMP}, #{modifyTime,jdbcType=TIMESTAMP})
</insert>
<insert id="insertSelective" useGeneratedKeys="true" keyProperty="id"
parameterType="org.apache.inlong.manager.dao.entity.StorageHiveEntity">
@@ -231,6 +230,9 @@
<if test="inlongStreamId != null">
inlong_stream_id,
</if>
+ <if test="enableCreateTable != null">
+ enable_create_table,
+ </if>
<if test="jdbcUrl != null">
jdbc_url,
</if>
@@ -303,9 +305,6 @@
<if test="modifyTime != null">
modify_time,
</if>
- <if test="tempView != null">
- temp_view,
- </if>
</trim>
<trim prefix="values (" suffix=")" suffixOverrides=",">
<if test="id != null">
@@ -317,6 +316,9 @@
<if test="inlongStreamId != null">
#{inlongStreamId,jdbcType=VARCHAR},
</if>
+ <if test="enableCreateTable != null">
+ #{enableCreateTable,jdbcType=TINYINT},
+ </if>
<if test="jdbcUrl != null">
#{jdbcUrl,jdbcType=VARCHAR},
</if>
@@ -389,9 +391,6 @@
<if test="modifyTime != null">
#{modifyTime,jdbcType=TIMESTAMP},
</if>
- <if test="tempView != null">
- #{tempView,jdbcType=LONGVARCHAR},
- </if>
</trim>
</insert>
@@ -404,6 +403,9 @@
<if test="inlongStreamId != null">
inlong_stream_id = #{inlongStreamId,jdbcType=VARCHAR},
</if>
+ <if test="enableCreateTable != null">
+ enable_create_table = #{enableCreateTable,jdbcType=TINYINT},
+ </if>
<if test="jdbcUrl != null">
jdbc_url = #{jdbcUrl,jdbcType=VARCHAR},
</if>
@@ -476,9 +478,6 @@
<if test="modifyTime != null">
modify_time = #{modifyTime,jdbcType=TIMESTAMP},
</if>
- <if test="tempView != null">
- temp_view = #{tempView,jdbcType=LONGVARCHAR},
- </if>
</set>
where id = #{id,jdbcType=INTEGER}
</update>
@@ -486,6 +485,7 @@
update storage_hive
set inlong_group_id = #{inlongGroupId,jdbcType=VARCHAR},
inlong_stream_id = #{inlongStreamId,jdbcType=VARCHAR},
+ enable_create_table = #{enableCreateTable,jdbcType=TINYINT},
jdbc_url = #{jdbcUrl,jdbcType=VARCHAR},
username = #{username,jdbcType=VARCHAR},
password = #{password,jdbcType=VARCHAR},
@@ -509,8 +509,7 @@
creator = #{creator,jdbcType=VARCHAR},
modifier = #{modifier,jdbcType=VARCHAR},
create_time = #{createTime,jdbcType=TIMESTAMP},
- modify_time = #{modifyTime,jdbcType=TIMESTAMP},
- temp_view = #{tempView,jdbcType=LONGVARCHAR}
+ modify_time = #{modifyTime,jdbcType=TIMESTAMP}
where id = #{id,jdbcType=INTEGER}
</update>
<update id="updateStorageStatusById" parameterType="org.apache.inlong.manager.dao.entity.StorageHiveEntity">
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/BusinessService.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/BusinessService.java
index a37e3d7..7f61d58 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/BusinessService.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/BusinessService.java
@@ -18,8 +18,10 @@
package org.apache.inlong.manager.service.core;
import com.github.pagehelper.PageInfo;
+import java.util.List;
import org.apache.inlong.manager.common.pojo.business.BusinessApproveInfo;
import org.apache.inlong.manager.common.pojo.business.BusinessCountVO;
+import org.apache.inlong.manager.common.pojo.business.BusinessExtInfo;
import org.apache.inlong.manager.common.pojo.business.BusinessInfo;
import org.apache.inlong.manager.common.pojo.business.BusinessListVO;
import org.apache.inlong.manager.common.pojo.business.BusinessPageRequest;
@@ -117,4 +119,13 @@ public interface BusinessService {
*/
boolean updateAfterApprove(BusinessApproveInfo approveInfo, String operator);
+ /**
+ * Save or update extended information
+ * <p/>First physically delete the existing extended information, and then add this batch of extended information
+ *
+ * @param groupId Group id
+ * @param infoList Ext info list
+ */
+ void saveOrUpdateExt(String groupId, List<BusinessExtInfo> infoList);
+
}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/DataStreamService.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/DataStreamService.java
index 1221742..8070d0f 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/DataStreamService.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/DataStreamService.java
@@ -25,8 +25,9 @@ import org.apache.inlong.manager.common.pojo.datastream.DataStreamListVO;
import org.apache.inlong.manager.common.pojo.datastream.DataStreamPageRequest;
import org.apache.inlong.manager.common.pojo.datastream.DataStreamSummaryInfo;
import org.apache.inlong.manager.common.pojo.datastream.DataStreamTopicVO;
-import org.apache.inlong.manager.common.pojo.datastream.FullPageInfo;
+import org.apache.inlong.manager.common.pojo.datastream.FullStreamRequest;
import org.apache.inlong.manager.common.pojo.datastream.FullPageUpdateInfo;
+import org.apache.inlong.manager.common.pojo.datastream.FullStreamResponse;
/**
* data stream service layer interface
@@ -109,22 +110,22 @@ public interface DataStreamService {
/**
* Save all information related to the data stream, its data source, and data storage
*
- * @param fullPageInfo All information on the page
+ * @param fullStreamRequest All information on the page
* @param operator Edit person's name
* @return Whether the save was successful
*/
- boolean saveAll(FullPageInfo fullPageInfo, String operator);
+ boolean saveAll(FullStreamRequest fullStreamRequest, String operator);
/**
* Save data streams, their data sources, and all information related to data storage in batches
*
- * @param fullPageInfoList List of data stream page information
+ * @param fullStreamRequestList List of data stream page information
* @param operator Edit person's name
* @return Whether the save was successful
* @apiNote This interface is only used when creating a new business. To ensure data consistency,
* all associated data needs to be physically deleted, and then added
*/
- boolean batchSaveAll(List<FullPageInfo> fullPageInfoList, String operator);
+ boolean batchSaveAll(List<FullStreamRequest> fullStreamRequestList, String operator);
/**
* Paging query all data of the data stream page under the specified groupId
@@ -132,7 +133,7 @@ public interface DataStreamService {
* @param request Query
* @return Paging list of all data on the data stream page
*/
- PageInfo<FullPageInfo> listAllWithGroupId(DataStreamPageRequest request);
+ PageInfo<FullStreamResponse> listAllWithGroupId(DataStreamPageRequest request);
/**
* Modify all data streams (including basic information about data sources)
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/StorageService.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/StorageService.java
index 4ec09bb..6b13df9 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/StorageService.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/StorageService.java
@@ -19,8 +19,9 @@ package org.apache.inlong.manager.service.core;
import com.github.pagehelper.PageInfo;
import java.util.List;
-import org.apache.inlong.manager.common.pojo.datastorage.BaseStorageInfo;
-import org.apache.inlong.manager.common.pojo.datastorage.BaseStorageListVO;
+import org.apache.inlong.manager.common.pojo.datastorage.BaseStorageListResponse;
+import org.apache.inlong.manager.common.pojo.datastorage.BaseStorageRequest;
+import org.apache.inlong.manager.common.pojo.datastorage.BaseStorageResponse;
import org.apache.inlong.manager.common.pojo.datastorage.StorageApproveInfo;
import org.apache.inlong.manager.common.pojo.datastorage.StoragePageRequest;
import org.apache.inlong.manager.common.pojo.datastorage.StorageSummaryInfo;
@@ -37,7 +38,7 @@ public interface StorageService {
* @param operator Edit person's name
* @return Primary key after saving
*/
- Integer save(BaseStorageInfo storageInfo, String operator);
+ Integer save(BaseStorageRequest storageInfo, String operator);
/**
* Query storage information based on id
@@ -46,7 +47,7 @@ public interface StorageService {
* @param storageType Storage type
* @return Store information
*/
- BaseStorageInfo getById(String storageType, Integer id);
+ BaseStorageResponse getById(String storageType, Integer id);
/**
* Query storage information based on business group id and data stream id
@@ -56,7 +57,7 @@ public interface StorageService {
* @return Store information list
* @apiNote Storage types only support temporarily: HIVE
*/
- List<BaseStorageInfo> listByIdentifier(String groupId, String streamId);
+ List<BaseStorageResponse> listByIdentifier(String groupId, String streamId);
/**
* Query stored summary information based on business group id and data stream id, including storage cluster
@@ -75,7 +76,7 @@ public interface StorageService {
* @param streamId Data stream id
* @return Number of stored information
*/
- int getCountByIdentifier(String groupId, String streamId);
+ Integer getCountByIdentifier(String groupId, String streamId);
/**
* Paging query storage information based on conditions
@@ -83,16 +84,16 @@ public interface StorageService {
* @param request Paging request
* @return Store information list
*/
- PageInfo<? extends BaseStorageListVO> listByCondition(StoragePageRequest request);
+ PageInfo<? extends BaseStorageListResponse> listByCondition(StoragePageRequest request);
/**
* Modify data storage information
*
- * @param storageInfo Information that needs to be modified
+ * @param storageRequest Information that needs to be modified
* @param operator Edit person's name
* @return whether succeed
*/
- boolean update(BaseStorageInfo storageInfo, String operator);
+ boolean update(BaseStorageRequest storageRequest, String operator);
/**
* Delete data storage information based on id
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/BusinessServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/BusinessServiceImpl.java
index 074c9df..573d76d 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/BusinessServiceImpl.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/BusinessServiceImpl.java
@@ -104,7 +104,7 @@ public class BusinessServiceImpl implements BusinessService {
entity.setModifier(operator);
entity.setCreateTime(new Date());
businessMapper.insertSelective(entity);
- this.saveExt(groupId, businessInfo.getExtList());
+ this.saveOrUpdateExt(groupId, businessInfo.getExtList());
if (BizConstant.MIDDLEWARE_PULSAR.equals(businessInfo.getMiddlewareType())) {
BusinessPulsarInfo pulsarInfo = (BusinessPulsarInfo) businessInfo.getMqExtInfo();
@@ -223,7 +223,7 @@ public class BusinessServiceImpl implements BusinessService {
businessMapper.updateByIdentifierSelective(entity);
// Save extended information
- this.updateExt(groupId, businessInfo.getExtList());
+ this.saveOrUpdateExt(groupId, businessInfo.getExtList());
// Update the Pulsar info
if (BizConstant.MIDDLEWARE_PULSAR.equals(businessInfo.getMiddlewareType())) {
@@ -420,28 +420,16 @@ public class BusinessServiceImpl implements BusinessService {
return true;
}
- /**
- * Update extended information
- * <p/>First physically delete the existing extended information, and then add this batch of extended information
- */
@Transactional(rollbackFor = Throwable.class)
- void updateExt(String groupId, List<BusinessExtInfo> extInfoList) {
- LOGGER.debug("begin to update business ext, groupId={}, ext={}", groupId, extInfoList);
- try {
- businessExtMapper.deleteAllByGroupId(groupId);
- saveExt(groupId, extInfoList);
- LOGGER.info("success to update business ext");
- } catch (Exception e) {
- LOGGER.error("failed to update business ext: ", e);
- throw new BusinessException(BizErrorCodeEnum.BUSINESS_SAVE_FAILED);
- }
- }
+ @Override
+ public void saveOrUpdateExt(String groupId, List<BusinessExtInfo> infoList) {
+ LOGGER.debug("begin to save or update business ext info, groupId={}, ext={}", groupId, infoList);
+ businessExtMapper.deleteAllByGroupId(groupId);
- @Transactional(rollbackFor = Throwable.class)
- void saveExt(String groupId, List<BusinessExtInfo> infoList) {
if (CollectionUtils.isEmpty(infoList)) {
return;
}
+
List<BusinessExtEntity> entityList = CommonBeanUtils.copyListProperties(infoList, BusinessExtEntity::new);
Date date = new Date();
for (BusinessExtEntity entity : entityList) {
@@ -449,6 +437,7 @@ public class BusinessServiceImpl implements BusinessService {
entity.setModifyTime(date);
}
businessExtMapper.insertAll(entityList);
+ LOGGER.info("success to save or update business ext for groupId={}", groupId);
}
}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/DataStreamServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/DataStreamServiceImpl.java
index e81587d..9713e48 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/DataStreamServiceImpl.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/DataStreamServiceImpl.java
@@ -36,7 +36,8 @@ import org.apache.inlong.manager.common.pojo.datasource.SourceDbBasicInfo;
import org.apache.inlong.manager.common.pojo.datasource.SourceDbDetailInfo;
import org.apache.inlong.manager.common.pojo.datasource.SourceFileBasicInfo;
import org.apache.inlong.manager.common.pojo.datasource.SourceFileDetailInfo;
-import org.apache.inlong.manager.common.pojo.datastorage.BaseStorageInfo;
+import org.apache.inlong.manager.common.pojo.datastorage.BaseStorageRequest;
+import org.apache.inlong.manager.common.pojo.datastorage.BaseStorageResponse;
import org.apache.inlong.manager.common.pojo.datastorage.StorageSummaryInfo;
import org.apache.inlong.manager.common.pojo.datastream.DataStreamApproveInfo;
import org.apache.inlong.manager.common.pojo.datastream.DataStreamExtInfo;
@@ -46,8 +47,9 @@ import org.apache.inlong.manager.common.pojo.datastream.DataStreamListVO;
import org.apache.inlong.manager.common.pojo.datastream.DataStreamPageRequest;
import org.apache.inlong.manager.common.pojo.datastream.DataStreamSummaryInfo;
import org.apache.inlong.manager.common.pojo.datastream.DataStreamTopicVO;
-import org.apache.inlong.manager.common.pojo.datastream.FullPageInfo;
import org.apache.inlong.manager.common.pojo.datastream.FullPageUpdateInfo;
+import org.apache.inlong.manager.common.pojo.datastream.FullStreamRequest;
+import org.apache.inlong.manager.common.pojo.datastream.FullStreamResponse;
import org.apache.inlong.manager.common.util.CommonBeanUtils;
import org.apache.inlong.manager.common.util.Preconditions;
import org.apache.inlong.manager.dao.entity.BusinessEntity;
@@ -330,7 +332,7 @@ public class DataStreamServiceImpl implements DataStreamService {
* According to groupId and streamId, query the number of associated undeleted data storage
*/
private boolean hasDataStorage(String groupId, String streamId) {
- int count = storageService.getCountByIdentifier(groupId, streamId);
+ Integer count = storageService.getCountByIdentifier(groupId, streamId);
return count > 0;
}
@@ -373,12 +375,12 @@ public class DataStreamServiceImpl implements DataStreamService {
@Transactional(rollbackFor = Throwable.class)
@Override
- public boolean saveAll(FullPageInfo fullPageInfo, String operator) {
+ public boolean saveAll(FullStreamRequest fullStreamRequest, String operator) {
if (LOGGER.isDebugEnabled()) {
- LOGGER.debug("begin to save all stream page info: {}", fullPageInfo);
+ LOGGER.debug("begin to save all stream page info: {}", fullStreamRequest);
}
- Preconditions.checkNotNull(fullPageInfo, "fullPageInfo is empty");
- DataStreamInfo streamInfo = fullPageInfo.getStreamInfo();
+ Preconditions.checkNotNull(fullStreamRequest, "fullStreamRequest is empty");
+ DataStreamInfo streamInfo = fullStreamRequest.getStreamInfo();
Preconditions.checkNotNull(streamInfo, "data stream info is empty");
// Check whether it can be added: check by lower-level specific services
@@ -388,28 +390,28 @@ public class DataStreamServiceImpl implements DataStreamService {
this.save(streamInfo, operator);
// 2.1 Save file data source information
- if (fullPageInfo.getFileBasicInfo() != null) {
- sourceFileService.saveBasic(fullPageInfo.getFileBasicInfo(), operator);
+ if (fullStreamRequest.getFileBasicInfo() != null) {
+ sourceFileService.saveBasic(fullStreamRequest.getFileBasicInfo(), operator);
}
- if (CollectionUtils.isNotEmpty(fullPageInfo.getFileDetailInfoList())) {
- for (SourceFileDetailInfo detailInfo : fullPageInfo.getFileDetailInfoList()) {
+ if (CollectionUtils.isNotEmpty(fullStreamRequest.getFileDetailInfoList())) {
+ for (SourceFileDetailInfo detailInfo : fullStreamRequest.getFileDetailInfoList()) {
sourceFileService.saveDetail(detailInfo, operator);
}
}
// 2.2 Save DB data source information
- if (fullPageInfo.getDbBasicInfo() != null) {
- sourceDbService.saveBasic(fullPageInfo.getDbBasicInfo(), operator);
+ if (fullStreamRequest.getDbBasicInfo() != null) {
+ sourceDbService.saveBasic(fullStreamRequest.getDbBasicInfo(), operator);
}
- if (CollectionUtils.isNotEmpty(fullPageInfo.getDbDetailInfoList())) {
- for (SourceDbDetailInfo detailInfo : fullPageInfo.getDbDetailInfoList()) {
+ if (CollectionUtils.isNotEmpty(fullStreamRequest.getDbDetailInfoList())) {
+ for (SourceDbDetailInfo detailInfo : fullStreamRequest.getDbDetailInfoList()) {
sourceDbService.saveDetail(detailInfo, operator);
}
}
// 3. Save data storage information
- if (CollectionUtils.isNotEmpty(fullPageInfo.getStorageInfo())) {
- for (BaseStorageInfo storageInfo : fullPageInfo.getStorageInfo()) {
+ if (CollectionUtils.isNotEmpty(fullStreamRequest.getStorageInfo())) {
+ for (BaseStorageRequest storageInfo : fullStreamRequest.getStorageInfo()) {
storageService.save(storageInfo, operator);
}
}
@@ -420,14 +422,14 @@ public class DataStreamServiceImpl implements DataStreamService {
@Transactional(rollbackFor = Throwable.class)
@Override
- public boolean batchSaveAll(List<FullPageInfo> fullPageInfoList, String operator) {
- if (CollectionUtils.isEmpty(fullPageInfoList)) {
+ public boolean batchSaveAll(List<FullStreamRequest> fullStreamRequestList, String operator) {
+ if (CollectionUtils.isEmpty(fullStreamRequestList)) {
return true;
}
- LOGGER.info("begin to batch save all stream page info, batch size={}", fullPageInfoList.size());
+ LOGGER.info("begin to batch save all stream page info, batch size={}", fullStreamRequestList.size());
// Check if it can be added
- DataStreamInfo firstStream = fullPageInfoList.get(0).getStreamInfo();
+ DataStreamInfo firstStream = fullStreamRequestList.get(0).getStreamInfo();
Preconditions.checkNotNull(firstStream, "data stream info is empty");
String groupId = firstStream.getInlongGroupId();
this.checkBizIsTempStatus(groupId);
@@ -438,7 +440,7 @@ public class DataStreamServiceImpl implements DataStreamService {
// and the ones with is_deleted=0 should be deleted
streamMapper.deleteAllByGroupId(groupId);
- for (FullPageInfo pageInfo : fullPageInfoList) {
+ for (FullStreamRequest pageInfo : fullStreamRequestList) {
// 1.1 Delete the data stream extensions and fields corresponding to groupId and streamId
DataStreamInfo streamInfo = pageInfo.getStreamInfo();
String streamId = streamInfo.getInlongStreamId();
@@ -461,7 +463,7 @@ public class DataStreamServiceImpl implements DataStreamService {
}
@Override
- public PageInfo<FullPageInfo> listAllWithGroupId(DataStreamPageRequest request) {
+ public PageInfo<FullStreamResponse> listAllWithGroupId(DataStreamPageRequest request) {
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("begin to list full data stream page by {}", request);
}
@@ -483,14 +485,14 @@ public class DataStreamServiceImpl implements DataStreamService {
List<DataStreamInfo> streamInfoList = CommonBeanUtils.copyListProperties(page, DataStreamInfo::new);
// Convert and encapsulate the paged results
- List<FullPageInfo> fullPageInfoList = new ArrayList<>(streamInfoList.size());
+ List<FullStreamResponse> responseList = new ArrayList<>(streamInfoList.size());
for (DataStreamInfo streamInfo : streamInfoList) {
// 2.1 Set the extended information and field information of the data stream
String streamId = streamInfo.getInlongStreamId();
setStreamExtAndField(groupId, streamId, streamInfo);
// 2.3 Set the data stream to the result sub-object
- FullPageInfo pageInfo = new FullPageInfo();
+ FullStreamResponse pageInfo = new FullStreamResponse();
pageInfo.setStreamInfo(streamInfo);
// 3. Query the basic and detailed information of the data source
@@ -520,14 +522,14 @@ public class DataStreamServiceImpl implements DataStreamService {
}
// 4. Query various data storage and its extended information, field information
- List<BaseStorageInfo> storageInfoList = storageService.listByIdentifier(groupId, streamId);
+ List<BaseStorageResponse> storageInfoList = storageService.listByIdentifier(groupId, streamId);
pageInfo.setStorageInfo(storageInfoList);
// 5. Add a single result to the paginated list
- fullPageInfoList.add(pageInfo);
+ responseList.add(pageInfo);
}
- PageInfo<FullPageInfo> pageInfo = new PageInfo<>(fullPageInfoList);
+ PageInfo<FullStreamResponse> pageInfo = new PageInfo<>(responseList);
pageInfo.setTotal(pageInfo.getTotal());
LOGGER.info("success to list full data stream info");
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/SourceFileServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/SourceFileServiceImpl.java
index 12ee4bb..cada803 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/SourceFileServiceImpl.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/SourceFileServiceImpl.java
@@ -229,7 +229,7 @@ public class SourceFileServiceImpl implements SourceFileService {
List<SourceFileDetailEntity> entities = fileDetailMapper.selectByIdentifier(groupId, streamId);
if (CollectionUtils.isEmpty(entities)) {
- LOGGER.error("file data source detail not found");
+ LOGGER.warn("file data source detail not found");
// throw new BusinessException(BizErrorCodeEnum.DATA_SOURCE_DETAIL_NOTFOUND);
return Collections.emptyList();
}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/StorageHiveOperation.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/StorageHiveOperation.java
index 1a8fb24..e5cc1bd 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/StorageHiveOperation.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/StorageHiveOperation.java
@@ -29,11 +29,13 @@ import org.apache.inlong.manager.common.enums.BizConstant;
import org.apache.inlong.manager.common.enums.BizErrorCodeEnum;
import org.apache.inlong.manager.common.enums.EntityStatus;
import org.apache.inlong.manager.common.exceptions.BusinessException;
-import org.apache.inlong.manager.common.pojo.datastorage.BaseStorageInfo;
+import org.apache.inlong.manager.common.pojo.datastorage.BaseStorageRequest;
+import org.apache.inlong.manager.common.pojo.datastorage.BaseStorageResponse;
import org.apache.inlong.manager.common.pojo.datastorage.StorageExtInfo;
import org.apache.inlong.manager.common.pojo.datastorage.StorageHiveFieldInfo;
-import org.apache.inlong.manager.common.pojo.datastorage.StorageHiveInfo;
-import org.apache.inlong.manager.common.pojo.datastorage.StorageHiveListVO;
+import org.apache.inlong.manager.common.pojo.datastorage.StorageHiveListResponse;
+import org.apache.inlong.manager.common.pojo.datastorage.StorageHiveRequest;
+import org.apache.inlong.manager.common.pojo.datastorage.StorageHiveResponse;
import org.apache.inlong.manager.common.pojo.datastorage.StoragePageRequest;
import org.apache.inlong.manager.common.util.CommonBeanUtils;
import org.apache.inlong.manager.common.util.Preconditions;
@@ -71,10 +73,10 @@ public class StorageHiveOperation extends StorageBaseOperation {
/**
* Save HIVE storage information
*
- * @param storageInfo Storage i formation
+ * @param storageInfo Storage information
* @return Id after saving
*/
- public int saveHiveStorage(BaseStorageInfo storageInfo, String operator) {
+ public int saveHiveStorage(BaseStorageRequest storageInfo, String operator) {
String groupId = storageInfo.getInlongGroupId();
// Make sure that there is no HIVE storage information under the current groupId and streamId
// (the two are mutually exclusive, only one can exist)
@@ -82,11 +84,14 @@ public class StorageHiveOperation extends StorageBaseOperation {
.selectByIdentifier(groupId, storageInfo.getInlongStreamId());
Preconditions.checkEmpty(storageExist, "HIVE storage already exist under the groupId and streamId");
- StorageHiveInfo hiveInfo = (StorageHiveInfo) storageInfo;
+ StorageHiveRequest hiveInfo = (StorageHiveRequest) storageInfo;
StorageHiveEntity entity = CommonBeanUtils.copyProperties(hiveInfo, StorageHiveEntity::new);
// Set the encoding type and field splitter
DataStreamEntity streamEntity = dataStreamMapper.selectByIdentifier(groupId, entity.getInlongStreamId());
+ if (streamEntity == null) {
+ throw new BusinessException(BizErrorCodeEnum.DATA_STREAM_NOT_FOUND);
+ }
String dataEncoding = streamEntity.getDataEncoding() == null
? StandardCharsets.UTF_8.displayName() : streamEntity.getDataEncoding();
entity.setDataEncoding(dataEncoding);
@@ -97,7 +102,9 @@ public class StorageHiveOperation extends StorageBaseOperation {
entity.setStatus(EntityStatus.DATA_STORAGE_NEW.getCode());
entity.setCreator(operator);
entity.setModifier(operator);
- entity.setCreateTime(new Date());
+ Date now = new Date();
+ entity.setCreateTime(now);
+ entity.setCreateTime(now);
hiveStorageMapper.insertSelective(entity);
int id = entity.getId();
@@ -113,7 +120,7 @@ public class StorageHiveOperation extends StorageBaseOperation {
/**
* According to groupId and streamId, query the HIVE storage information to which it belongs
*/
- public void setHiveStorageInfo(String groupId, String streamId, List<BaseStorageInfo> storageInfoList) {
+ public void setHiveStorageResponse(String groupId, String streamId, List<BaseStorageResponse> requestList) {
List<StorageHiveEntity> hiveEntities = hiveStorageMapper.selectByIdentifier(groupId, streamId);
if (CollectionUtils.isEmpty(hiveEntities)) {
@@ -134,11 +141,11 @@ public class StorageHiveOperation extends StorageBaseOperation {
List<StorageHiveFieldInfo> fieldInfoList = CommonBeanUtils
.copyListProperties(fieldEntityList, StorageHiveFieldInfo::new);
- StorageHiveInfo hiveInfo = CommonBeanUtils.copyProperties(hiveEntity, StorageHiveInfo::new);
+ StorageHiveResponse hiveInfo = CommonBeanUtils.copyProperties(hiveEntity, StorageHiveResponse::new);
hiveInfo.setStorageType(storageType);
hiveInfo.setExtList(CommonBeanUtils.copyListProperties(extEntities, StorageExtInfo::new));
hiveInfo.setHiveFieldList(fieldInfoList);
- storageInfoList.add(hiveInfo);
+ requestList.add(hiveInfo);
}
}
@@ -224,24 +231,24 @@ public class StorageHiveOperation extends StorageBaseOperation {
* @param id Storage ID
* @return Storage information
*/
- public BaseStorageInfo getHiveStorage(Integer id) {
+ public BaseStorageResponse getHiveStorage(Integer id) {
StorageHiveEntity entity = hiveStorageMapper.selectByPrimaryKey(id);
if (entity == null) {
LOGGER.error("hive storage not found by id={}", id);
return null;
}
- StorageHiveInfo hiveInfo = CommonBeanUtils.copyProperties(entity, StorageHiveInfo::new);
+ StorageHiveResponse response = CommonBeanUtils.copyProperties(entity, StorageHiveResponse::new);
String storageType = BizConstant.STORAGE_HIVE;
List<StorageExtEntity> extEntityList = storageExtMapper.selectByStorageTypeAndId(storageType, id);
List<StorageExtInfo> extInfoList = CommonBeanUtils.copyListProperties(extEntityList, StorageExtInfo::new);
- hiveInfo.setExtList(extInfoList);
+ response.setExtList(extInfoList);
List<StorageHiveFieldEntity> entities = hiveFieldMapper.selectByStorageId(id);
List<StorageHiveFieldInfo> infos = CommonBeanUtils.copyListProperties(entities, StorageHiveFieldInfo::new);
- hiveInfo.setHiveFieldList(infos);
+ response.setHiveFieldList(infos);
- return hiveInfo;
+ return response;
}
/**
@@ -250,15 +257,16 @@ public class StorageHiveOperation extends StorageBaseOperation {
* @param request Query conditions
* @return Store the paged results of the list
*/
- public PageInfo<StorageHiveListVO> getHiveStorageList(StoragePageRequest request) {
+ public PageInfo<StorageHiveListResponse> getHiveStorageList(StoragePageRequest request) {
LOGGER.info("begin to list hive storage page by {}", request);
PageHelper.startPage(request.getPageNum(), request.getPageSize());
Page<StorageHiveEntity> entityPage = (Page<StorageHiveEntity>) hiveStorageMapper.selectByCondition(request);
- List<StorageHiveListVO> detailList = CommonBeanUtils.copyListProperties(entityPage, StorageHiveListVO::new);
+ List<StorageHiveListResponse> detailList = CommonBeanUtils.copyListProperties(entityPage,
+ StorageHiveListResponse::new);
// Encapsulate the paging query results into the PageInfo object to obtain related paging information
- PageInfo<StorageHiveListVO> page = new PageInfo<>(detailList);
+ PageInfo<StorageHiveListResponse> page = new PageInfo<>(detailList);
page.setTotal(entityPage.getTotal());
LOGGER.info("success to list hive storage");
@@ -273,8 +281,8 @@ public class StorageHiveOperation extends StorageBaseOperation {
* @param operator Operator
* @return Updated id
*/
- public Integer updateHiveStorage(Integer bizStatus, BaseStorageInfo storageInfo, String operator) {
- StorageHiveInfo hiveInfo = (StorageHiveInfo) storageInfo;
+ public Integer updateHiveStorage(Integer bizStatus, BaseStorageRequest storageInfo, String operator) {
+ StorageHiveRequest hiveInfo = (StorageHiveRequest) storageInfo;
// id exists, update, otherwise add
Integer id = hiveInfo.getId();
if (id != null) {
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/StorageServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/StorageServiceImpl.java
index 3464294..fca8ce2 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/StorageServiceImpl.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/StorageServiceImpl.java
@@ -27,8 +27,9 @@ import org.apache.inlong.manager.common.enums.BizConstant;
import org.apache.inlong.manager.common.enums.BizErrorCodeEnum;
import org.apache.inlong.manager.common.enums.EntityStatus;
import org.apache.inlong.manager.common.exceptions.BusinessException;
-import org.apache.inlong.manager.common.pojo.datastorage.BaseStorageInfo;
-import org.apache.inlong.manager.common.pojo.datastorage.BaseStorageListVO;
+import org.apache.inlong.manager.common.pojo.datastorage.BaseStorageListResponse;
+import org.apache.inlong.manager.common.pojo.datastorage.BaseStorageRequest;
+import org.apache.inlong.manager.common.pojo.datastorage.BaseStorageResponse;
import org.apache.inlong.manager.common.pojo.datastorage.StorageApproveInfo;
import org.apache.inlong.manager.common.pojo.datastorage.StoragePageRequest;
import org.apache.inlong.manager.common.pojo.datastorage.StorageSummaryInfo;
@@ -59,7 +60,7 @@ public class StorageServiceImpl extends StorageBaseOperation implements StorageS
@Transactional(rollbackFor = Throwable.class)
@Override
- public Integer save(BaseStorageInfo storageInfo, String operator) {
+ public Integer save(BaseStorageRequest storageInfo, String operator) {
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("begin to save storage info={}", storageInfo);
}
@@ -96,12 +97,12 @@ public class StorageServiceImpl extends StorageBaseOperation implements StorageS
}
@Override
- public BaseStorageInfo getById(String storageType, Integer id) {
+ public BaseStorageResponse getById(String storageType, Integer id) {
LOGGER.debug("begin to get storage by storageType={}, id={}", storageType, id);
Preconditions.checkNotNull(id, "storage id is null");
Preconditions.checkNotNull(storageType, "storageType is empty");
- BaseStorageInfo storageInfo;
+ BaseStorageResponse storageInfo;
if (BizConstant.STORAGE_HIVE.equals(storageType.toUpperCase(Locale.ROOT))) {
storageInfo = hiveOperation.getHiveStorage(id);
} else {
@@ -114,28 +115,28 @@ public class StorageServiceImpl extends StorageBaseOperation implements StorageS
}
@Override
- public int getCountByIdentifier(String groupId, String streamId) {
+ public Integer getCountByIdentifier(String groupId, String streamId) {
LOGGER.debug("begin to get storage count by groupId={}, streamId={}", groupId, streamId);
Preconditions.checkNotNull(groupId, BizConstant.GROUP_ID_IS_EMPTY);
Preconditions.checkNotNull(streamId, BizConstant.STREAM_ID_IS_EMPTY);
- int count = hiveStorageMapper.selectCountByIdentifier(groupId, streamId);
+ Integer count = hiveStorageMapper.selectCountByIdentifier(groupId, streamId);
LOGGER.info("the storage count={} by groupId={}, streamId={}", count, groupId, streamId);
return count;
}
@Override
- public List<BaseStorageInfo> listByIdentifier(String groupId, String streamId) {
+ public List<BaseStorageResponse> listByIdentifier(String groupId, String streamId) {
LOGGER.debug("begin to list storage by groupId={}, streamId={}", groupId, streamId);
Preconditions.checkNotNull(groupId, BizConstant.GROUP_ID_IS_EMPTY);
// Query HDFS, HIVE, ES storage information and encapsulate it in the result set
- List<BaseStorageInfo> storageInfoList = new ArrayList<>();
- hiveOperation.setHiveStorageInfo(groupId, streamId, storageInfoList);
+ List<BaseStorageResponse> responseList = new ArrayList<>();
+ hiveOperation.setHiveStorageResponse(groupId, streamId, responseList);
LOGGER.info("success to list storage info");
- return storageInfoList;
+ return responseList;
}
@Override
@@ -155,7 +156,7 @@ public class StorageServiceImpl extends StorageBaseOperation implements StorageS
}
@Override
- public PageInfo<? extends BaseStorageListVO> listByCondition(StoragePageRequest request) {
+ public PageInfo<? extends BaseStorageListResponse> listByCondition(StoragePageRequest request) {
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("begin to list storage page by {}", request);
}
@@ -164,7 +165,7 @@ public class StorageServiceImpl extends StorageBaseOperation implements StorageS
String storageType = request.getStorageType();
Preconditions.checkNotNull(storageType, "storageType is empty");
- PageInfo<? extends BaseStorageListVO> page;
+ PageInfo<? extends BaseStorageListResponse> page;
if (BizConstant.STORAGE_HIVE.equals(storageType.toUpperCase(Locale.ROOT))) {
page = hiveOperation.getHiveStorageList(request);
} else {
@@ -178,25 +179,25 @@ public class StorageServiceImpl extends StorageBaseOperation implements StorageS
@Transactional(rollbackFor = Throwable.class)
@Override
- public boolean update(BaseStorageInfo storageInfo, String operator) {
+ public boolean update(BaseStorageRequest storageRequest, String operator) {
if (LOGGER.isDebugEnabled()) {
- LOGGER.debug("begin to update storage info={}", storageInfo);
+ LOGGER.debug("begin to update storage info={}", storageRequest);
}
- Preconditions.checkNotNull(storageInfo, "storage info is empty");
- String groupId = storageInfo.getInlongGroupId();
+ Preconditions.checkNotNull(storageRequest, "storage info is empty");
+ String groupId = storageRequest.getInlongGroupId();
Preconditions.checkNotNull(groupId, BizConstant.GROUP_ID_IS_EMPTY);
- String streamId = storageInfo.getInlongStreamId();
+ String streamId = storageRequest.getInlongStreamId();
Preconditions.checkNotNull(streamId, BizConstant.STREAM_ID_IS_EMPTY);
// Check if it can be modified
BusinessEntity businessEntity = super.checkBizIsTempStatus(groupId);
- String storageType = storageInfo.getStorageType();
+ String storageType = storageRequest.getStorageType();
Preconditions.checkNotNull(storageType, "storageType is empty");
if (BizConstant.STORAGE_HIVE.equals(storageType.toUpperCase(Locale.ROOT))) {
- hiveOperation.updateHiveStorage(businessEntity.getStatus(), storageInfo, operator);
+ hiveOperation.updateHiveStorage(businessEntity.getStatus(), storageRequest, operator);
} else {
LOGGER.error("the storageType={} not support", storageType);
throw new BusinessException(BizErrorCodeEnum.STORAGE_TYPE_NOT_SUPPORTED);
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdpart/hive/CreateHiveTableForStreamListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdpart/hive/CreateHiveTableForStreamListener.java
index 78bb270..1975c20 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdpart/hive/CreateHiveTableForStreamListener.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdpart/hive/CreateHiveTableForStreamListener.java
@@ -19,13 +19,13 @@ package org.apache.inlong.manager.service.thirdpart.hive;
import java.util.List;
import lombok.extern.slf4j.Slf4j;
-import org.apache.inlong.manager.common.pojo.datastorage.StorageHiveSortInfo;
-import org.apache.inlong.manager.dao.mapper.StorageHiveEntityMapper;
-import org.apache.inlong.manager.service.workflow.business.BusinessResourceWorkflowForm;
import org.apache.inlong.manager.common.event.ListenerResult;
import org.apache.inlong.manager.common.event.task.TaskEvent;
import org.apache.inlong.manager.common.event.task.TaskEventListener;
import org.apache.inlong.manager.common.model.WorkflowContext;
+import org.apache.inlong.manager.common.pojo.datastorage.StorageHiveDTO;
+import org.apache.inlong.manager.dao.mapper.StorageHiveEntityMapper;
+import org.apache.inlong.manager.service.workflow.business.BusinessResourceWorkflowForm;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
@@ -53,16 +53,12 @@ public class CreateHiveTableForStreamListener implements TaskEventListener {
String streamId = form.getInlongStreamId();
log.info("begin create hive table for groupId={}, streamId={}", groupId, streamId);
- List<StorageHiveSortInfo> hiveConfig = hiveEntityMapper.selectHiveSortInfoByIdentifier(groupId, streamId);
- if (hiveConfig == null || hiveConfig.size() == 0) {
- return ListenerResult.success();
- }
- for (StorageHiveSortInfo info : hiveConfig) {
- hiveTableOperator.createHiveTable(groupId, info);
- }
+ List<StorageHiveDTO> configList = hiveEntityMapper.selectAllHiveConfig(groupId, streamId);
+ hiveTableOperator.createHiveResource(groupId, configList);
- log.info("finish create hive table for business {} - {} ", groupId, streamId);
- return ListenerResult.success();
+ String result = "success to create hive table for group [" + groupId + "], stream [" + streamId + "]";
+ log.info(result);
+ return ListenerResult.success(result);
}
@Override
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdpart/hive/CreateHiveTableListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdpart/hive/CreateHiveTableListener.java
index 032bef3..45ac9e0 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdpart/hive/CreateHiveTableListener.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdpart/hive/CreateHiveTableListener.java
@@ -19,13 +19,13 @@ package org.apache.inlong.manager.service.thirdpart.hive;
import java.util.List;
import lombok.extern.slf4j.Slf4j;
-import org.apache.inlong.manager.common.pojo.datastorage.StorageHiveSortInfo;
-import org.apache.inlong.manager.dao.mapper.StorageHiveEntityMapper;
-import org.apache.inlong.manager.service.workflow.business.BusinessResourceWorkflowForm;
import org.apache.inlong.manager.common.event.ListenerResult;
import org.apache.inlong.manager.common.event.task.TaskEvent;
import org.apache.inlong.manager.common.event.task.TaskEventListener;
import org.apache.inlong.manager.common.model.WorkflowContext;
+import org.apache.inlong.manager.common.pojo.datastorage.StorageHiveDTO;
+import org.apache.inlong.manager.dao.mapper.StorageHiveEntityMapper;
+import org.apache.inlong.manager.service.workflow.business.BusinessResourceWorkflowForm;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
@@ -52,16 +52,12 @@ public class CreateHiveTableListener implements TaskEventListener {
String groupId = form.getInlongGroupId();
log.info("begin to create hive table for groupId={}", groupId);
- List<StorageHiveSortInfo> configList = hiveEntityMapper.selectHiveSortInfoByIdentifier(groupId, null);
- if (configList == null || configList.size() == 0) {
- return ListenerResult.success();
- }
+ List<StorageHiveDTO> configList = hiveEntityMapper.selectAllHiveConfig(groupId, null);
+ hiveTableOperator.createHiveResource(groupId, configList);
- for (StorageHiveSortInfo hiveConfig : configList) {
- hiveTableOperator.createHiveTable(groupId, hiveConfig);
- log.info("finish to create hive table for business {}", groupId);
- }
- return ListenerResult.success();
+ String result = "success to create hive table for group [" + groupId + "]";
+ log.info(result);
+ return ListenerResult.success(result);
}
@Override
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdpart/hive/HiveTableOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdpart/hive/HiveTableOperator.java
index 60c0acc..d7a6f16 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdpart/hive/HiveTableOperator.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdpart/hive/HiveTableOperator.java
@@ -21,9 +21,11 @@ import static java.util.stream.Collectors.toList;
import java.util.ArrayList;
import java.util.List;
-import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.inlong.manager.common.enums.BizConstant;
import org.apache.inlong.manager.common.enums.EntityStatus;
-import org.apache.inlong.manager.common.pojo.datastorage.StorageHiveSortInfo;
+import org.apache.inlong.manager.common.exceptions.WorkflowException;
+import org.apache.inlong.manager.common.pojo.datastorage.StorageHiveDTO;
import org.apache.inlong.manager.common.pojo.query.ColumnInfoBean;
import org.apache.inlong.manager.common.pojo.query.DatabaseQueryBean;
import org.apache.inlong.manager.common.pojo.query.hive.HiveColumnQueryBean;
@@ -32,34 +34,53 @@ import org.apache.inlong.manager.dao.entity.StorageHiveFieldEntity;
import org.apache.inlong.manager.dao.mapper.StorageHiveFieldEntityMapper;
import org.apache.inlong.manager.service.core.DataSourceService;
import org.apache.inlong.manager.service.core.StorageService;
-import org.apache.inlong.manager.common.exceptions.WorkflowException;
+import org.apache.inlong.manager.service.core.impl.StorageHiveOperation;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
/**
* Create hive table operation
*/
-@Slf4j
@Component
public class HiveTableOperator {
+ private static final Logger LOGGER = LoggerFactory.getLogger(HiveTableOperator.class);
+
@Autowired
private StorageService storageService;
@Autowired
- private DataSourceService<DatabaseQueryBean, HiveTableQueryBean> dataSourceService;
-
- @Autowired
private StorageHiveFieldEntityMapper hiveFieldMapper;
+ @Autowired
+ private DataSourceService<DatabaseQueryBean, HiveTableQueryBean> dataSourceService;
/**
* Create hive table according to the groupId and hive config
*/
- public void createHiveTable(String groupId, StorageHiveSortInfo hiveConfig) {
- if (log.isDebugEnabled()) {
- log.debug("begin create hive table for business={}, hiveConfig={}", groupId, hiveConfig);
+ public void createHiveResource(String groupId, List<StorageHiveDTO> configList) {
+ // nothing to create when no hive storage is configured for this group/stream
+ if (CollectionUtils.isEmpty(configList)) {
+ LOGGER.warn("no hive config, skip to create");
+ return;
+ }
+ for (StorageHiveDTO config : configList) {
+ if (EntityStatus.DATA_STORAGE_CONFIG_SUCCESSFUL.getCode().equals(config.getStatus())) {
+ // status is already "config successful": skip this config
+ LOGGER.warn("hive [{}] already success, skip to create", config.getId());
+ continue;
+ } else if (BizConstant.DISABLE_CREATE_TABLE.equals(config.getEnableCreateTable())) {
+ // table creation is switched off for this storage config
+ LOGGER.warn("create table was disable, skip to create table for hive [{}]", config.getId());
+ continue;
+ }
+ this.createTable(groupId, config);
}
+ }
- HiveTableQueryBean tableBean = getTableQueryBean(hiveConfig);
+ private void createTable(String groupId, StorageHiveDTO config) {
+ if (LOGGER.isDebugEnabled()) {
+ LOGGER.debug("begin create hive table for business={}, config={}", groupId, config);
+ }
+
+ HiveTableQueryBean tableBean = getTableQueryBean(config);
try {
// create database if not exists
dataSourceService.createDb(tableBean);
@@ -78,22 +99,22 @@ public class HiveTableOperator {
dataSourceService.createColumn(tableBean);
}
}
- storageService.updateHiveStatusById(hiveConfig.getId(),
+ storageService.updateHiveStatusById(config.getId(),
EntityStatus.DATA_STORAGE_CONFIG_SUCCESSFUL.getCode(), "create hive table success");
} catch (Throwable e) {
- log.error("create hive table error, ", e);
- storageService.updateHiveStatusById(hiveConfig.getId(),
+ LOGGER.error("create hive table error, ", e);
+ storageService.updateHiveStatusById(config.getId(),
EntityStatus.DATA_STORAGE_CONFIG_FAILED.getCode(), e.getMessage());
throw new WorkflowException("create hive table failed, reason: " + e.getMessage());
}
- log.info("finish create hive table for business {} ", groupId);
+ LOGGER.info("success create hive table for data group [" + groupId + "]");
}
- protected HiveTableQueryBean getTableQueryBean(StorageHiveSortInfo hiveConfig) {
- String groupId = hiveConfig.getInlongGroupId();
- String streamId = hiveConfig.getInlongStreamId();
- log.info("begin to get table query bean for groupId={}, streamId={}", groupId, streamId);
+ protected HiveTableQueryBean getTableQueryBean(StorageHiveDTO config) {
+ String groupId = config.getInlongGroupId();
+ String streamId = config.getInlongStreamId();
+ LOGGER.info("begin to get table query bean for groupId={}, streamId={}", groupId, streamId);
List<StorageHiveFieldEntity> fieldEntities = hiveFieldMapper.selectHiveFields(groupId, streamId);
@@ -107,31 +128,33 @@ public class HiveTableOperator {
}
// set partition field and type
- String partitionField = hiveConfig.getPrimaryPartition();
+ String partitionField = config.getPrimaryPartition();
if (partitionField != null) {
HiveColumnQueryBean partColumn = new HiveColumnQueryBean();
partColumn.setPartition(true);
partColumn.setColumnName(partitionField);
- partColumn.setColumnType("string"); // currently, only supports 'string' type
+ // currently, only supports 'string' type
+ partColumn.setColumnType("string");
columnQueryBeans.add(partColumn);
}
HiveTableQueryBean queryBean = new HiveTableQueryBean();
queryBean.setColumns(columnQueryBeans);
// set terminated symbol
- if (hiveConfig.getTargetSeparator() != null) {
- char ch = (char) Integer.parseInt(hiveConfig.getTargetSeparator());
+ if (config.getTargetSeparator() != null) {
+ char ch = (char) Integer.parseInt(config.getTargetSeparator());
queryBean.setFieldTerSymbol(String.valueOf(ch));
}
- queryBean.setUsername(hiveConfig.getUsername());
- queryBean.setPassword(hiveConfig.getPassword());
- queryBean.setTableName(hiveConfig.getTableName());
- queryBean.setDbName(hiveConfig.getDbName());
- queryBean.setJdbcUrl(hiveConfig.getJdbcUrl());
-
- if (log.isDebugEnabled()) {
- log.debug("success to get table query bean={}", queryBean);
+ queryBean.setUsername(config.getUsername());
+ queryBean.setPassword(config.getPassword());
+ queryBean.setTableName(config.getTableName());
+ queryBean.setDbName(config.getDbName());
+ queryBean.setJdbcUrl(config.getJdbcUrl());
+
+ if (LOGGER.isDebugEnabled()) {
+ LOGGER.debug("success to get table query bean={}", queryBean);
}
return queryBean;
}
+
}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdpart/sort/PushHiveConfigTaskListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdpart/sort/PushHiveConfigTaskListener.java
index 1263bc0..5b6f1a9 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdpart/sort/PushHiveConfigTaskListener.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdpart/sort/PushHiveConfigTaskListener.java
@@ -30,7 +30,7 @@ import org.apache.inlong.manager.common.enums.BizConstant;
import org.apache.inlong.manager.common.enums.EntityStatus;
import org.apache.inlong.manager.common.pojo.business.BusinessExtInfo;
import org.apache.inlong.manager.common.pojo.business.BusinessInfo;
-import org.apache.inlong.manager.common.pojo.datastorage.StorageHiveSortInfo;
+import org.apache.inlong.manager.common.pojo.datastorage.StorageHiveDTO;
import org.apache.inlong.manager.common.settings.BusinessSettings;
import org.apache.inlong.manager.common.util.JsonUtils;
import org.apache.inlong.manager.common.util.Preconditions;
@@ -112,8 +112,8 @@ public class PushHiveConfigTaskListener implements TaskEventListener {
// if streamId not null, just push the config belongs to the groupId and the streamId
String streamId = form.getInlongStreamId();
- List<StorageHiveSortInfo> hiveInfoList = storageHiveMapper.selectHiveSortInfoByIdentifier(groupId, streamId);
- for (StorageHiveSortInfo hiveInfo : hiveInfoList) {
+ List<StorageHiveDTO> hiveInfoList = storageHiveMapper.selectAllHiveConfig(groupId, streamId);
+ for (StorageHiveDTO hiveInfo : hiveInfoList) {
Integer storageId = hiveInfo.getId();
if (log.isDebugEnabled()) {
@@ -141,7 +141,7 @@ public class PushHiveConfigTaskListener implements TaskEventListener {
return ListenerResult.success();
}
- private DataFlowInfo getDataFlowInfo(BusinessInfo businessInfo, StorageHiveSortInfo hiveInfo) {
+ private DataFlowInfo getDataFlowInfo(BusinessInfo businessInfo, StorageHiveDTO hiveInfo) {
String groupId = hiveInfo.getInlongGroupId();
String streamId = hiveInfo.getInlongStreamId();
List<StorageHiveFieldEntity> fieldList = hiveFieldMapper.selectHiveFields(groupId, streamId);
@@ -157,7 +157,7 @@ public class PushHiveConfigTaskListener implements TaskEventListener {
return new DataFlowInfo(hiveInfo.getId(), sourceInfo, sinkInfo);
}
- private HiveSinkInfo getSinkInfo(StorageHiveSortInfo hiveInfo, List<StorageHiveFieldEntity> fieldList) {
+ private HiveSinkInfo getSinkInfo(StorageHiveDTO hiveInfo, List<StorageHiveFieldEntity> fieldList) {
if (hiveInfo.getJdbcUrl() == null) {
throw new WorkflowListenerException("hive server url cannot be empty");
}
@@ -221,7 +221,7 @@ public class PushHiveConfigTaskListener implements TaskEventListener {
/**
* Get source info
*/
- private SourceInfo getSourceInfo(BusinessInfo businessInfo, StorageHiveSortInfo storageInfo,
+ private SourceInfo getSourceInfo(BusinessInfo businessInfo, StorageHiveDTO storageInfo,
List<StorageHiveFieldEntity> fieldList) {
DeserializationInfo deserializationInfo = null;
boolean isDbType = BizConstant.DATA_SOURCE_DB.equals(storageInfo.getDataSourceType());
@@ -306,7 +306,7 @@ public class PushHiveConfigTaskListener implements TaskEventListener {
}
private PulsarSourceInfo createPulsarSourceInfo(BusinessInfo businessInfo,
- StorageHiveSortInfo storageInfo,
+ StorageHiveDTO storageInfo,
DeserializationInfo deserializationInfo,
List<FieldInfo> sourceFields) {
final String tenant = clusterBean.getDefaultTenant();
diff --git a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/impl/BusinessServiceImplTest.java b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/impl/BusinessServiceImplTest.java
deleted file mode 100644
index 371af1c..0000000
--- a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/impl/BusinessServiceImplTest.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.manager.service.core.impl;
-
-import com.google.common.collect.Lists;
-import java.util.ArrayList;
-import java.util.List;
-import org.apache.inlong.manager.common.pojo.business.BusinessExtInfo;
-import org.apache.inlong.manager.dao.entity.BusinessExtEntity;
-import org.apache.inlong.manager.dao.mapper.BusinessExtEntityMapper;
-import org.apache.inlong.manager.service.core.BaseTest;
-import org.junit.Assert;
-import org.junit.Test;
-import org.springframework.beans.factory.annotation.Autowired;
-
-public class BusinessServiceImplTest extends BaseTest {
-
- @Autowired
- BusinessServiceImpl businessService;
-
- @Autowired
- BusinessExtEntityMapper businessExtMapper;
-
- @Test
- public void testSaveAndUpdateExt() {
- final String groupId = "group1";
- //check insert
- BusinessExtInfo businessExtInfo1 = new BusinessExtInfo();
- businessExtInfo1.setId(1);
- businessExtInfo1.setInlongGroupId("group1");
- businessExtInfo1.setKeyName("pulsar_url");
- businessExtInfo1.setKeyValue("http://127.0.0.1:8080");
- BusinessExtInfo businessExtInfo2 = new BusinessExtInfo();
- businessExtInfo2.setId(2);
- businessExtInfo2.setInlongGroupId("group1");
- businessExtInfo2.setKeyName("pulsar_secret");
- businessExtInfo2.setKeyValue("QWEASDZXC");
- ArrayList<BusinessExtInfo> businessExtInfoList = Lists.newArrayList(businessExtInfo1, businessExtInfo2);
- businessService.saveExt(groupId, businessExtInfoList);
- List<BusinessExtEntity> extEntityList = businessExtMapper.selectByGroupId(groupId);
- Assert.assertTrue(extEntityList.size() == 2);
- Assert.assertTrue(extEntityList.get(0).getKeyName().equals("pulsar_url"));
- Assert.assertTrue(extEntityList.get(0).getKeyValue().equals("http://127.0.0.1:8080"));
- //check update
- businessExtInfo1.setKeyValue("http://127.0.0.1:8081");
- businessService.updateExt(groupId,businessExtInfoList);
- extEntityList = businessExtMapper.selectByGroupId(groupId);
- Assert.assertTrue(extEntityList.size() == 2);
- Assert.assertTrue(extEntityList.get(0).getKeyValue().equals("http://127.0.0.1:8081"));
- }
-
-}
diff --git a/inlong-manager/manager-web/sql/apache_inlong_manager.sql b/inlong-manager/manager-web/sql/apache_inlong_manager.sql
index e8e7152..1713247 100644
--- a/inlong-manager/manager-web/sql/apache_inlong_manager.sql
+++ b/inlong-manager/manager-web/sql/apache_inlong_manager.sql
@@ -242,7 +242,7 @@ DROP TABLE IF EXISTS `consumption`;
CREATE TABLE `consumption`
(
`id` int(11) NOT NULL AUTO_INCREMENT COMMENT 'Incremental primary key',
- `consumer_group_name` varchar(255) DEFAULT NULL COMMENT 'consumer group name',
+ `consumer_group_name` varchar(255) DEFAULT NULL COMMENT 'consumer group name',
`consumer_group_id` varchar(255) NOT NULL COMMENT 'Consumer group ID',
`in_charges` varchar(512) NOT NULL COMMENT 'Person in charge of consumption',
`inlong_group_id` varchar(255) NOT NULL COMMENT 'Business group id',
@@ -355,7 +355,7 @@ CREATE TABLE `data_stream`
`id` int(11) NOT NULL AUTO_INCREMENT COMMENT 'Incremental primary key',
`inlong_stream_id` varchar(128) NOT NULL COMMENT 'Data stream id, non-deleted globally unique',
`inlong_group_id` varchar(128) NOT NULL COMMENT 'Owning business group id',
- `name` varchar(64) DEFAULT NULL COMMENT 'The name of the data stream page display, can be Chinese',
+ `name` varchar(64) DEFAULT NULL COMMENT 'The name of the data stream page display, can be Chinese',
`description` varchar(256) DEFAULT '' COMMENT 'Introduction to data stream',
`mq_resource_obj` varchar(128) DEFAULT NULL COMMENT 'MQ resource object, in the data stream, Tube is data_stream_id, Pulsar is Topic',
`data_source_type` varchar(32) DEFAULT 'FILE' COMMENT 'Data source type, including: FILE, DB, Auto-Push (DATA_PROXY_SDK, HTTP)',
@@ -604,6 +604,7 @@ CREATE TABLE `storage_hive`
`id` int(11) NOT NULL AUTO_INCREMENT COMMENT 'Incremental primary key',
`inlong_group_id` varchar(128) NOT NULL COMMENT 'Owning business group id',
`inlong_stream_id` varchar(128) NOT NULL COMMENT 'Owning data stream id',
+ `enable_create_table` tinyint(1) DEFAULT 1 COMMENT 'Whether to enable create table, 1: enable, 0: disable, default is 1',
`jdbc_url` varchar(255) DEFAULT NULL COMMENT 'Hive JDBC connection URL, such as "jdbc:hive2://127.0.0.1:10000"',
`username` varchar(128) DEFAULT NULL COMMENT 'Username',
`password` varchar(255) DEFAULT NULL COMMENT 'User password',
@@ -1106,27 +1107,27 @@ CREATE TABLE `flume_sink_ext`
DROP TABLE IF EXISTS `db_collector_detail_task`;
CREATE TABLE `db_collector_detail_task`
(
- `id` int(11) NOT NULL AUTO_INCREMENT COMMENT 'Incremental primary key',
- `main_id` varchar(128) NOT NULL COMMENT 'main task id',
- `type` int(11) NOT NULL COMMENT 'task type',
- `time_var` varchar(64) NOT NULL COMMENT 'time variable',
- `db_type` int(11) NOT NULL COMMENT 'db type',
- `ip` varchar(64) NOT NULL COMMENT 'db ip',
- `port` int(11) NOT NULL COMMENT 'db port',
- `db_name` varchar(64) NULL COMMENT 'db name',
- `user` varchar(64) NULL COMMENT 'user name',
- `password` varchar(64) NULL COMMENT 'password',
- `sql_statement` varchar(256) NULL COMMENT 'sql statement',
- `offset` int(11) NOT NULL COMMENT 'offset for the data source',
- `total_limit` int(11) NOT NULL COMMENT 'total limit in a task',
- `once_limit` int(11) NOT NULL COMMENT 'limit for one query',
- `time_limit` int(11) NOT NULL COMMENT 'time limit for task',
- `retry_times` int(11) NOT NULL COMMENT 'max retry times if task failes',
- `group_id` varchar(64) NULL COMMENT 'group id',
- `stream_id` varchar(64) NULL COMMENT 'stream id',
- `state` int(11) NOT NULL COMMENT 'task state',
- `create_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'create time',
- `modify_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'modify time',
+ `id` int(11) NOT NULL AUTO_INCREMENT COMMENT 'Incremental primary key',
+ `main_id` varchar(128) NOT NULL COMMENT 'main task id',
+ `type` int(11) NOT NULL COMMENT 'task type',
+ `time_var` varchar(64) NOT NULL COMMENT 'time variable',
+ `db_type` int(11) NOT NULL COMMENT 'db type',
+ `ip` varchar(64) NOT NULL COMMENT 'db ip',
+ `port` int(11) NOT NULL COMMENT 'db port',
+ `db_name` varchar(64) NULL COMMENT 'db name',
+ `user` varchar(64) NULL COMMENT 'user name',
+ `password` varchar(64) NULL COMMENT 'password',
+ `sql_statement` varchar(256) NULL COMMENT 'sql statement',
+ `offset` int(11) NOT NULL COMMENT 'offset for the data source',
+ `total_limit` int(11) NOT NULL COMMENT 'total limit in a task',
+ `once_limit` int(11) NOT NULL COMMENT 'limit for one query',
+ `time_limit` int(11) NOT NULL COMMENT 'time limit for task',
+ `retry_times` int(11) NOT NULL COMMENT 'max retry times if task failes',
+ `group_id` varchar(64) NULL COMMENT 'group id',
+ `stream_id` varchar(64) NULL COMMENT 'stream id',
+ `state` int(11) NOT NULL COMMENT 'task state',
+ `create_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'create time',
+ `modify_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'modify time',
PRIMARY KEY (`id`)
) ENGINE = InnoDB
DEFAULT CHARSET = utf8mb4 COMMENT ='db collector detail task table';
diff --git a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/DataStreamController.java b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/DataStreamController.java
index b63fc7e..6ae93ae 100644
--- a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/DataStreamController.java
+++ b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/DataStreamController.java
@@ -29,8 +29,9 @@ import org.apache.inlong.manager.common.pojo.datastream.DataStreamInfo;
import org.apache.inlong.manager.common.pojo.datastream.DataStreamListVO;
import org.apache.inlong.manager.common.pojo.datastream.DataStreamPageRequest;
import org.apache.inlong.manager.common.pojo.datastream.DataStreamSummaryInfo;
-import org.apache.inlong.manager.common.pojo.datastream.FullPageInfo;
+import org.apache.inlong.manager.common.pojo.datastream.FullStreamRequest;
import org.apache.inlong.manager.common.pojo.datastream.FullPageUpdateInfo;
+import org.apache.inlong.manager.common.pojo.datastream.FullStreamResponse;
import org.apache.inlong.manager.common.util.LoginUserUtil;
import org.apache.inlong.manager.service.core.DataStreamService;
import org.apache.inlong.manager.service.core.operationlog.OperationLog;
@@ -64,14 +65,14 @@ public class DataStreamController {
@RequestMapping(value = "/saveAll", method = RequestMethod.POST)
@OperationLog(operation = OperationType.CREATE)
@ApiOperation(value = "Save data stream page information ,including source and storage")
- public Response<Boolean> saveAll(@RequestBody FullPageInfo pageInfo) {
+ public Response<Boolean> saveAll(@RequestBody FullStreamRequest pageInfo) {
return Response.success(dataStreamService.saveAll(pageInfo, LoginUserUtil.getLoginUserDetail().getUserName()));
}
@RequestMapping(value = "/batchSaveAll", method = RequestMethod.POST)
@OperationLog(operation = OperationType.CREATE)
@ApiOperation(value = "Batch save data stream page information ,including source and storage")
- public Response<Boolean> batchSaveAll(@RequestBody List<FullPageInfo> infoList) {
+ public Response<Boolean> batchSaveAll(@RequestBody List<FullStreamRequest> infoList) {
boolean result = dataStreamService.batchSaveAll(infoList, LoginUserUtil.getLoginUserDetail().getUserName());
return Response.success(result);
}
@@ -95,7 +96,7 @@ public class DataStreamController {
@RequestMapping(value = "/listAll", method = RequestMethod.GET)
@ApiOperation(value = "Paging query all data of the data stream page under the specified groupId")
- public Response<PageInfo<FullPageInfo>> listAllWithGroupId(DataStreamPageRequest request) {
+ public Response<PageInfo<FullStreamResponse>> listAllWithGroupId(DataStreamPageRequest request) {
request.setCurrentUser(LoginUserUtil.getLoginUserDetail().getUserName());
return Response.success(dataStreamService.listAllWithGroupId(request));
}
diff --git a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/StorageController.java b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/StorageController.java
index 5e64559..cba9cf8 100644
--- a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/StorageController.java
+++ b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/StorageController.java
@@ -25,8 +25,9 @@ import io.swagger.annotations.ApiOperation;
import java.util.List;
import org.apache.inlong.manager.common.beans.Response;
import org.apache.inlong.manager.common.enums.OperationType;
-import org.apache.inlong.manager.common.pojo.datastorage.BaseStorageInfo;
-import org.apache.inlong.manager.common.pojo.datastorage.BaseStorageListVO;
+import org.apache.inlong.manager.common.pojo.datastorage.BaseStorageRequest;
+import org.apache.inlong.manager.common.pojo.datastorage.BaseStorageListResponse;
+import org.apache.inlong.manager.common.pojo.datastorage.BaseStorageResponse;
import org.apache.inlong.manager.common.pojo.datastorage.StoragePageRequest;
import org.apache.inlong.manager.common.pojo.query.ColumnInfoBean;
import org.apache.inlong.manager.common.pojo.query.ConnectionInfo;
@@ -60,7 +61,7 @@ public class StorageController {
@RequestMapping(value = "/save", method = RequestMethod.POST)
@OperationLog(operation = OperationType.CREATE)
@ApiOperation(value = "Save storage information")
- public Response<Integer> save(@RequestBody BaseStorageInfo storageInfo) {
+ public Response<Integer> save(@RequestBody BaseStorageRequest storageInfo) {
return Response.success(storageService.save(storageInfo, LoginUserUtil.getLoginUserDetail().getUserName()));
}
@@ -70,20 +71,20 @@ public class StorageController {
@ApiImplicitParam(name = "storageType", dataTypeClass = String.class, required = true),
@ApiImplicitParam(name = "id", dataTypeClass = Integer.class, required = true)
})
- public Response<BaseStorageInfo> get(@RequestParam String storageType, @PathVariable Integer id) {
+ public Response<BaseStorageResponse> get(@RequestParam String storageType, @PathVariable Integer id) {
return Response.success(storageService.getById(storageType, id));
}
@RequestMapping(value = "/list", method = RequestMethod.GET)
@ApiOperation(value = "Query data storage list based on conditions")
- public Response<PageInfo<? extends BaseStorageListVO>> listByCondition(StoragePageRequest request) {
+ public Response<PageInfo<? extends BaseStorageListResponse>> listByCondition(StoragePageRequest request) {
return Response.success(storageService.listByCondition(request));
}
@RequestMapping(value = "/update", method = RequestMethod.POST)
@OperationLog(operation = OperationType.UPDATE)
@ApiOperation(value = "Modify data storage information")
- public Response<Boolean> update(@RequestBody BaseStorageInfo storageInfo) {
+ public Response<Boolean> update(@RequestBody BaseStorageRequest storageInfo) {
return Response.success(storageService.update(storageInfo, LoginUserUtil.getLoginUserDetail().getUserName()));
}
diff --git a/inlong-manager/manager-web/src/main/resources/application-dev.properties b/inlong-manager/manager-web/src/main/resources/application-dev.properties
index 5e56f81..35f8e8d 100644
--- a/inlong-manager/manager-web/src/main/resources/application-dev.properties
+++ b/inlong-manager/manager-web/src/main/resources/application-dev.properties
@@ -16,9 +16,11 @@
# specific language governing permissions and limitations
# under the License.
#
+
# Log level
logging.level.root=INFO
logging.level.org.apache.inlong.manager=debug
+
spring.datasource.druid.url=jdbc:mysql://127.0.0.1:3306/apache_inlong_manager?useSSL=false&allowPublicKeyRetrieval=true&characterEncoding=UTF-8&nullCatalogMeansCurrent=true&serverTimezone=GMT%2b8
spring.datasource.druid.username=root
spring.datasource.druid.password=inlong
@@ -42,26 +44,32 @@ spring.datasource.druid.testOnReturn=false
spring.datasource.druid.filters=stat,wall
# Open the mergeSql function through the connectProperties property, Slow SQL records
spring.datasource.druid.connectionProperties=druid.stat.mergeSql=true;druid.stat.slowSqlMillis=5000
+
# Manager address of Tube cluster, used to create Topic
cluster.tube.manager=http://127.0.0.1:8081
# Master address, used to manage Tube broker
cluster.tube.master=127.0.0.1:8000,127.0.0.1:8010
# Tube cluster ID
cluster.tube.clusterId=1
+
# Push configuration to the path on ZooKeeper
cluster.zk.url=127.0.0.1:2181
cluster.zk.root=inlong_hive
+
# Application name in Sort
sort.appName=inlong_app
+
# Pulsar admin URL
pulsar.adminUrl=http://127.0.0.1:8080,127.0.0.2:8080,127.0.0.3:8080
# Pulsar broker address
pulsar.serviceUrl=pulsar://127.0.0.1:6650,127.0.0.1:6650,127.0.0.1:6650
# Default tenant of Pulsar
pulsar.defaultTenant=public
+
# Audit configuration
# Audit query source that decide what data source to query, currently only supports [MYSQL|ELASTICSEARCH]
audit.query.source=MYSQL
+
# Elasticsearch config
# Elasticsearch host split by coma if more than one host, such as 'host1,host2'
es.index.search.hostname=127.0.0.1
diff --git a/inlong-manager/manager-web/src/test/java/org/apache/inlong/manager/service/core/BusinessServiceTest.java b/inlong-manager/manager-web/src/test/java/org/apache/inlong/manager/service/core/BusinessServiceTest.java
index 8936052..5bf9e44 100644
--- a/inlong-manager/manager-web/src/test/java/org/apache/inlong/manager/service/core/BusinessServiceTest.java
+++ b/inlong-manager/manager-web/src/test/java/org/apache/inlong/manager/service/core/BusinessServiceTest.java
@@ -17,9 +17,15 @@
package org.apache.inlong.manager.service.core;
+import java.util.Arrays;
+import java.util.List;
import org.apache.inlong.manager.common.enums.BizConstant;
+import org.apache.inlong.manager.common.enums.EntityStatus;
+import org.apache.inlong.manager.common.pojo.business.BusinessExtInfo;
import org.apache.inlong.manager.common.pojo.business.BusinessInfo;
import org.apache.inlong.manager.common.pojo.business.BusinessPulsarInfo;
+import org.apache.inlong.manager.dao.entity.BusinessExtEntity;
+import org.apache.inlong.manager.dao.mapper.BusinessExtEntityMapper;
import org.apache.inlong.manager.web.ServiceBaseTest;
import org.junit.Assert;
import org.junit.Test;
@@ -27,19 +33,36 @@ import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.TestComponent;
/**
- * Business Service Test
+ * Business service test
*/
@TestComponent
public class BusinessServiceTest extends ServiceBaseTest {
+ private final String globalGroupId = "b_group1";
+ private final String globalGroupName = "group1";
+ private final String globalOperator = "test_user";
+
+ @Autowired
+ BusinessExtEntityMapper businessExtMapper;
@Autowired
private BusinessService businessService;
public String saveBusiness(String groupName, String operator) {
- BusinessInfo businessInfo = new BusinessInfo();
+ BusinessInfo businessInfo;
+ try {
+ businessInfo = businessService.get(globalGroupId);
+ if (businessInfo != null) {
+ return businessInfo.getInlongGroupId();
+ }
+ } catch (Exception e) {
+ // ignore
+ }
+
+ businessInfo = new BusinessInfo();
businessInfo.setName(groupName);
businessInfo.setMiddlewareType(BizConstant.MIDDLEWARE_PULSAR);
businessInfo.setCreator(operator);
+ businessInfo.setStatus(EntityStatus.BIZ_CONFIG_SUCCESSFUL.getCode());
BusinessPulsarInfo pulsarInfo = new BusinessPulsarInfo();
pulsarInfo.setMiddlewareType(BizConstant.MIDDLEWARE_PULSAR);
@@ -53,20 +76,43 @@ public class BusinessServiceTest extends ServiceBaseTest {
}
@Test
- public void testSave() {
- String groupName = "test_group1";
- String operator = "test_user";
- String groupId = this.saveBusiness(groupName, operator);
+ public void testSaveAndDelete() {
+ String groupId = this.saveBusiness(globalGroupName, globalOperator);
Assert.assertNotNull(groupId);
+
+ boolean result = businessService.delete(groupId, globalOperator);
+ Assert.assertTrue(result);
}
@Test
- public void testDelete() {
- String groupName = "test_group2";
- String operator = "test_user";
- String groupId = this.saveBusiness(groupName, operator);
- boolean result = businessService.delete(groupId, operator);
- Assert.assertTrue(result);
+ public void testSaveAndUpdateExt() {
+ // check insert
+ BusinessExtInfo businessExtInfo1 = new BusinessExtInfo();
+ businessExtInfo1.setId(1);
+ businessExtInfo1.setInlongGroupId(globalGroupId);
+ businessExtInfo1.setKeyName("pulsar_url");
+ businessExtInfo1.setKeyValue("http://127.0.0.1:8080");
+
+ BusinessExtInfo businessExtInfo2 = new BusinessExtInfo();
+ businessExtInfo2.setId(2);
+ businessExtInfo2.setInlongGroupId(globalGroupId);
+ businessExtInfo2.setKeyName("pulsar_secret");
+ businessExtInfo2.setKeyValue("QWEASDZXC");
+
+ List<BusinessExtInfo> businessExtInfoList = Arrays.asList(businessExtInfo1, businessExtInfo2);
+ businessService.saveOrUpdateExt(globalGroupId, businessExtInfoList);
+
+ List<BusinessExtEntity> extEntityList = businessExtMapper.selectByGroupId(globalGroupId);
+ Assert.assertEquals(2, extEntityList.size());
+ Assert.assertEquals("pulsar_url", extEntityList.get(0).getKeyName());
+ Assert.assertEquals("http://127.0.0.1:8080", extEntityList.get(0).getKeyValue());
+
+ // check update
+ businessExtInfo1.setKeyValue("http://127.0.0.1:8081");
+ businessService.saveOrUpdateExt(globalGroupId, businessExtInfoList);
+ extEntityList = businessExtMapper.selectByGroupId(globalGroupId);
+ Assert.assertEquals(2, extEntityList.size());
+ Assert.assertEquals("http://127.0.0.1:8081", extEntityList.get(0).getKeyValue());
}
}
diff --git a/inlong-manager/manager-web/src/test/java/org/apache/inlong/manager/service/core/ConsumptionServiceTest.java b/inlong-manager/manager-web/src/test/java/org/apache/inlong/manager/service/core/ConsumptionServiceTest.java
index 8b74aef..718d138 100644
--- a/inlong-manager/manager-web/src/test/java/org/apache/inlong/manager/service/core/ConsumptionServiceTest.java
+++ b/inlong-manager/manager-web/src/test/java/org/apache/inlong/manager/service/core/ConsumptionServiceTest.java
@@ -26,7 +26,7 @@ import org.junit.Test;
import org.springframework.beans.factory.annotation.Autowired;
/**
- * Consumption Service Test
+ * Consumption service test
*/
public class ConsumptionServiceTest extends ServiceBaseTest {
diff --git a/inlong-manager/manager-web/src/test/java/org/apache/inlong/manager/service/core/DataStorageServiceTest.java b/inlong-manager/manager-web/src/test/java/org/apache/inlong/manager/service/core/DataStorageServiceTest.java
new file mode 100644
index 0000000..406c1f3
--- /dev/null
+++ b/inlong-manager/manager-web/src/test/java/org/apache/inlong/manager/service/core/DataStorageServiceTest.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.service.core;
+
+import org.apache.inlong.manager.common.enums.BizConstant;
+import org.apache.inlong.manager.common.pojo.datastorage.BaseStorageResponse;
+import org.apache.inlong.manager.common.pojo.datastorage.StorageHiveRequest;
+import org.apache.inlong.manager.common.pojo.datastorage.StorageHiveResponse;
+import org.apache.inlong.manager.common.util.CommonBeanUtils;
+import org.apache.inlong.manager.web.ServiceBaseTest;
+import org.junit.Assert;
+import org.junit.Test;
+import org.springframework.beans.factory.annotation.Autowired;
+
+/**
+ * Data storage service test.
+ *
+ * <p>Saves, fetches, updates and deletes a Hive storage through {@link StorageService}.
+ * The data stream the storage belongs to is prepared via {@link DataStreamServiceTest}.
+ */
+public class DataStorageServiceTest extends ServiceBaseTest {
+
+    private final String globalGroupId = "b_group1";
+    private final String globalStreamId = "stream1";
+    private final String globalOperator = "test_user";
+
+    @Autowired
+    private StorageService storageService;
+    @Autowired
+    private BusinessServiceTest businessServiceTest;
+    @Autowired
+    private DataStreamServiceTest streamServiceTest;
+
+    /**
+     * Saves a Hive storage with table creation disabled for the shared group id and stream id.
+     *
+     * @return the value returned by {@link StorageService#save}, used by the tests as the storage id
+     */
+    public Integer saveStorage() {
+        // Make sure the data stream the storage refers to has been saved first
+        streamServiceTest.saveDataStream(globalGroupId, globalStreamId, globalOperator);
+
+        StorageHiveRequest storageInfo = new StorageHiveRequest();
+        storageInfo.setInlongGroupId(globalGroupId);
+        storageInfo.setInlongStreamId(globalStreamId);
+        storageInfo.setStorageType(BizConstant.STORAGE_HIVE);
+        storageInfo.setEnableCreateTable(BizConstant.DISABLE_CREATE_TABLE);
+
+        return storageService.save(storageInfo, globalOperator);
+    }
+
+    /** Saves a storage and verifies that deleting it reports success. */
+    @Test
+    public void testSaveAndDelete() {
+        Integer id = this.saveStorage();
+        Assert.assertNotNull(id);
+
+        boolean result = storageService.delete(BizConstant.STORAGE_HIVE, id, globalOperator);
+        Assert.assertTrue(result);
+    }
+
+    /**
+     * Saves a storage, fetches it by id and checks its group id.
+     * NOTE(review): the name says "list by identifier" but only getById is exercised - confirm.
+     */
+    @Test
+    public void testListByIdentifier() {
+        Integer id = this.saveStorage();
+        try {
+            BaseStorageResponse storage = storageService.getById(BizConstant.STORAGE_HIVE, id);
+            // Fail with a clear assertion instead of a NullPointerException on the next line
+            Assert.assertNotNull(storage);
+            Assert.assertEquals(globalGroupId, storage.getInlongGroupId());
+        } finally {
+            // Delete the saved storage even when an assertion above fails
+            storageService.delete(BizConstant.STORAGE_HIVE, id, globalOperator);
+        }
+    }
+
+    /**
+     * Saves a storage, fetches it by id and sends its copied properties back through update.
+     * NOTE(review): the saved storage is not deleted afterwards - confirm this is intended.
+     */
+    @Test
+    public void testGetAndUpdate() {
+        Integer id = this.saveStorage();
+        BaseStorageResponse storage = storageService.getById(BizConstant.STORAGE_HIVE, id);
+        Assert.assertNotNull(storage);
+        Assert.assertEquals(globalGroupId, storage.getInlongGroupId());
+
+        // Verify the concrete type first so a mismatch is reported as an assertion failure
+        Assert.assertTrue(storage instanceof StorageHiveResponse);
+        StorageHiveResponse hiveResponse = (StorageHiveResponse) storage;
+        hiveResponse.setEnableCreateTable(BizConstant.DISABLE_CREATE_TABLE);
+
+        StorageHiveRequest request = CommonBeanUtils.copyProperties(hiveResponse, StorageHiveRequest::new);
+        boolean result = storageService.update(request, globalOperator);
+        Assert.assertTrue(result);
+    }
+
+}
diff --git a/inlong-manager/manager-web/src/test/java/org/apache/inlong/manager/service/core/DataStreamServiceTest.java b/inlong-manager/manager-web/src/test/java/org/apache/inlong/manager/service/core/DataStreamServiceTest.java
new file mode 100644
index 0000000..a22c974
--- /dev/null
+++ b/inlong-manager/manager-web/src/test/java/org/apache/inlong/manager/service/core/DataStreamServiceTest.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.service.core;
+
+import org.apache.inlong.manager.common.pojo.datastream.DataStreamInfo;
+import org.apache.inlong.manager.web.ServiceBaseTest;
+import org.junit.Assert;
+import org.junit.Test;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.test.context.TestComponent;
+
+/**
+ * Data stream service test.
+ *
+ * <p>Annotated as a test component; other service tests inject it and reuse
+ * {@link #saveDataStream}.
+ */
+@TestComponent
+public class DataStreamServiceTest extends ServiceBaseTest {
+
+    private final String globalGroupId = "b_group1";
+    private final String globalGroupName = "group1";
+    private final String globalStreamId = "stream1";
+    private final String globalOperator = "test_user";
+
+    @Autowired
+    private DataStreamService dataStreamService;
+    @Autowired
+    private BusinessServiceTest businessServiceTest;
+
+    /**
+     * Returns the id of the given data stream, saving the stream first when the lookup yields nothing.
+     *
+     * @param groupId business group id the stream belongs to
+     * @param streamId data stream id
+     * @param operator user name recorded for the save operations
+     * @return id of the stream found by the lookup, or the value returned by the save call
+     */
+    public Integer saveDataStream(String groupId, String streamId, String operator) {
+        try {
+            DataStreamInfo existing = dataStreamService.get(groupId, streamId);
+            if (existing != null) {
+                return existing.getId();
+            }
+        } catch (Exception ignored) {
+            // Deliberately ignored: any lookup failure falls through to saving the stream below.
+            // NOTE(review): presumably get() throws when the stream does not exist - confirm.
+        }
+
+        // The business is always saved under globalGroupName, whatever groupId was passed in
+        businessServiceTest.saveBusiness(globalGroupName, operator);
+
+        DataStreamInfo newStream = new DataStreamInfo();
+        newStream.setInlongGroupId(groupId);
+        newStream.setInlongStreamId(streamId);
+        newStream.setDataEncoding("UTF-8");
+        return dataStreamService.save(newStream, operator);
+    }
+
+    /** Saves the shared data stream and verifies that deleting it reports success. */
+    @Test
+    public void testSaveAndDelete() {
+        Integer streamPk = this.saveDataStream(globalGroupId, globalStreamId, globalOperator);
+        Assert.assertNotNull(streamPk);
+        Assert.assertTrue(dataStreamService.delete(globalGroupId, globalStreamId, globalOperator));
+    }
+
+}
diff --git a/inlong-manager/manager-web/src/test/resources/sql/apache_inlong_manager.sql b/inlong-manager/manager-web/src/test/resources/sql/apache_inlong_manager.sql
index 7bdcb27..1184ddd 100644
--- a/inlong-manager/manager-web/src/test/resources/sql/apache_inlong_manager.sql
+++ b/inlong-manager/manager-web/src/test/resources/sql/apache_inlong_manager.sql
@@ -570,6 +570,7 @@ CREATE TABLE `storage_hive`
`id` int(11) NOT NULL AUTO_INCREMENT COMMENT 'Incremental primary key',
`inlong_group_id` varchar(128) NOT NULL COMMENT 'Owning business group id',
`inlong_stream_id` varchar(128) NOT NULL COMMENT 'Owning data stream id',
+ `enable_create_table` tinyint(1) DEFAULT 1 COMMENT 'Whether to enable create table, 1: enable, 0: disable, default is 1',
`jdbc_url` varchar(255) DEFAULT NULL COMMENT 'Hive JDBC connection URL, such as "jdbc:hive2://127.0.0.1:10000"',
`username` varchar(128) DEFAULT NULL COMMENT 'Username',
`password` varchar(255) DEFAULT NULL COMMENT 'User password',