You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2021/11/28 09:56:11 UTC
[incubator-inlong] branch master updated: [INLONG-1849][Feature][InLong-Manager] Push Pulsar info for Sort (#1850)
This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 74c4c82 [INLONG-1849][Feature][InLong-Manager] Push Pulsar info for Sort (#1850)
74c4c82 is described below
commit 74c4c822881b858361b770b2c22d88744cee89a8
Author: healchow <he...@gmail.com>
AuthorDate: Sun Nov 28 17:56:05 2021 +0800
[INLONG-1849][Feature][InLong-Manager] Push Pulsar info for Sort (#1850)
Co-authored-by: healchow <he...@gmail.com>
---
.../docker-compose/sql/apache_inlong_manager.sql | 99 +++---
inlong-manager/doc/sql/apache_inlong_manager.sql | 99 +++---
.../inlong/manager/common/enums/BizConstant.java | 8 +
.../manager/common/pojo/business/BusinessInfo.java | 3 +-
.../common/pojo/business/BusinessMqExtBase.java | 2 +-
.../common/pojo/business/BusinessPulsarInfo.java | 6 +-
.../common/pojo/consumption/ConsumptionInfo.java | 8 +-
.../common/pojo/datastorage/StorageHiveInfo.java | 50 +--
.../StorageHiveSortInfo.java} | 90 ++----
.../pojo/datastorage/StoragePageRequest.java | 3 +
.../common/pojo/datastream/DataStreamInfo.java | 23 +-
.../pojo/datastream/DataStreamPageRequest.java | 3 +
.../manager/dao/entity/DataStreamEntity.java | 5 +-
.../manager/dao/entity/StorageHiveEntity.java | 18 +-
.../manager/dao/mapper/DataStreamEntityMapper.java | 20 +-
.../dao/mapper/StorageHiveEntityMapper.java | 14 +-
.../resources/mappers/DataStreamEntityMapper.xml | 351 ++++++++-------------
.../resources/mappers/StorageHiveEntityMapper.xml | 285 ++++++++++-------
.../test/resources/sql/apache_inlong_manager.sql | 195 +++++++-----
.../manager/service/core/DataStreamService.java | 18 --
.../service/core/impl/BusinessServiceImpl.java | 36 ---
.../service/core/impl/DataStreamServiceImpl.java | 33 +-
.../service/core/impl/StorageHiveOperation.java | 8 +-
.../service/core/impl/StorageServiceImpl.java | 2 +-
.../core/impl/WorkflowApproverServiceImpl.java | 4 +-
.../hive/CreateHiveTableForAllStreamListener.java | 11 +-
.../hive/CreateHiveTableForOneStreamListener.java | 16 +-
.../service/thirdpart/hive/HiveTableOperator.java | 12 +-
.../thirdpart/sort/PushHiveConfigTaskListener.java | 271 +++++++++++-----
.../thirdpart/sort/SortFieldFormatUtils.java | 26 +-
.../ConsumptionCompleteProcessListener.java | 8 +-
.../test/resources/sql/apache_inlong_manager.sql | 182 ++++++-----
.../core/processor/StartEventProcessor.java | 2 +-
.../workflow/core/processor/UserTaskProcessor.java | 4 +-
.../workflow/util/WorkflowFormParserUtils.java | 6 +-
35 files changed, 1023 insertions(+), 898 deletions(-)
diff --git a/docker/docker-compose/sql/apache_inlong_manager.sql b/docker/docker-compose/sql/apache_inlong_manager.sql
index dec268e..8cae257 100644
--- a/docker/docker-compose/sql/apache_inlong_manager.sql
+++ b/docker/docker-compose/sql/apache_inlong_manager.sql
@@ -93,7 +93,7 @@ CREATE TABLE `business`
`schema_name` varchar(128) DEFAULT NULL COMMENT 'Data type, associated data_schema table',
`in_charges` varchar(512) NOT NULL COMMENT 'Name of responsible person, separated by commas',
`followers` varchar(512) DEFAULT NULL COMMENT 'List of names of business followers, separated by commas',
- `status` int(11) DEFAULT '21' COMMENT 'Business status',
+ `status` int(4) DEFAULT '21' COMMENT 'Business status',
`is_deleted` tinyint(1) DEFAULT '0' COMMENT 'Whether to delete, 0: not deleted, 1: deleted',
`creator` varchar(64) NOT NULL COMMENT 'Creator name',
`modifier` varchar(64) DEFAULT NULL COMMENT 'Modifier name',
@@ -161,7 +161,7 @@ CREATE TABLE `cluster_info`
`url` varchar(256) DEFAULT NULL COMMENT 'Cluster URL address',
`is_backup` tinyint(1) DEFAULT '0' COMMENT 'Whether it is a backup cluster, 0: no, 1: yes',
`ext_props` json DEFAULT NULL COMMENT 'extended properties',
- `status` int(11) DEFAULT '1' COMMENT 'cluster status',
+ `status` int(4) DEFAULT '1' COMMENT 'cluster status',
`is_deleted` tinyint(1) DEFAULT '0' COMMENT 'Whether to delete, 0: not deleted, 1: deleted',
`creator` varchar(64) NOT NULL COMMENT 'Creator name',
`modifier` varchar(64) DEFAULT NULL COMMENT 'Modifier name',
@@ -195,7 +195,7 @@ CREATE TABLE `common_db_server`
`db_description` varchar(256) DEFAULT NULL COMMENT 'DB description',
`backup_db_server_ip` varchar(64) DEFAULT NULL COMMENT 'Backup DB HOST',
`backup_db_port` int(11) DEFAULT NULL COMMENT 'Backup DB port',
- `status` int(11) DEFAULT '0' COMMENT 'status',
+ `status` int(4) DEFAULT '0' COMMENT 'status',
`is_deleted` tinyint(1) DEFAULT '0' COMMENT 'Whether to delete, 0: not deleted, 1: deleted',
`creator` varchar(64) NOT NULL COMMENT 'Creator name',
`modifier` varchar(64) DEFAULT NULL COMMENT 'Modifier name',
@@ -221,7 +221,7 @@ CREATE TABLE `common_file_server`
`issue_type` varchar(128) DEFAULT NULL COMMENT 'Issuance method, such as SSH, TCS, etc.',
`username` varchar(64) NOT NULL COMMENT 'User name of the data source IP host',
`password` varchar(64) NOT NULL COMMENT 'The password corresponding to the above user name',
- `status` int(11) DEFAULT '0' COMMENT 'status',
+ `status` int(4) DEFAULT '0' COMMENT 'status',
`is_deleted` tinyint(1) DEFAULT '0' COMMENT 'Whether to delete, 0: not deleted, 1: deleted',
`creator` varchar(64) NOT NULL COMMENT 'Creator name',
`modifier` varchar(64) DEFAULT NULL COMMENT 'Modifier name',
@@ -248,12 +248,12 @@ CREATE TABLE `consumption`
`topic` varchar(255) NOT NULL COMMENT 'Consumption topic',
`filter_enabled` int(2) DEFAULT '0' COMMENT 'Whether to filter, default 0, not filter consume',
`inlong_stream_id` varchar(1024) DEFAULT NULL COMMENT 'Data stream ID for consumption, if filter_enable is 1, it cannot empty',
- `status` int(11) NOT NULL COMMENT 'Status: draft, pending approval, approval rejected, approval passed',
+ `status` int(4) NOT NULL COMMENT 'Status: draft, pending approval, approval rejected, approval passed',
+ `is_deleted` tinyint(1) DEFAULT '0' COMMENT 'Whether to delete, 0: not deleted, 1: deleted',
`creator` varchar(64) NOT NULL COMMENT 'creator',
`modifier` varchar(64) DEFAULT NULL COMMENT 'modifier',
`create_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'Create time',
`modify_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'Modify time',
- `is_deleted` int(2) DEFAULT '0' COMMENT 'Whether to delete, 0: not deleted, 1: deleted',
PRIMARY KEY (`id`)
) ENGINE = InnoDB
DEFAULT CHARSET = utf8mb4 COMMENT ='Data consumption configuration table';
@@ -264,8 +264,8 @@ CREATE TABLE `consumption`
DROP TABLE IF EXISTS `consumption_pulsar`;
CREATE TABLE `consumption_pulsar`
(
- `id` int NOT NULL AUTO_INCREMENT,
- `consumption_id` int DEFAULT NULL COMMENT 'ID of the consumption information to which it belongs, guaranteed to be uniquely associated with consumption information',
+ `id` int(11) NOT NULL AUTO_INCREMENT,
+ `consumption_id` int(11) DEFAULT NULL COMMENT 'ID of the consumption information to which it belongs, guaranteed to be uniquely associated with consumption information',
`consumer_group_id` varchar(255) NOT NULL COMMENT 'Consumer group ID',
`consumer_group_name` varchar(255) NOT NULL COMMENT 'Consumer group name',
`inlong_group_id` varchar(255) NOT NULL COMMENT 'Business group ID',
@@ -273,7 +273,7 @@ CREATE TABLE `consumption_pulsar`
`retry_letter_topic` varchar(255) DEFAULT NULL COMMENT 'The name of the retry queue topic',
`is_dlq` tinyint(1) DEFAULT '0' COMMENT 'Whether to configure dead letter topic, 0: no configuration, 1: means configuration',
`dead_letter_topic` varchar(255) DEFAULT NULL COMMENT 'dead letter topic name',
- `is_deleted` int DEFAULT '0' COMMENT 'Whether to delete',
+ `is_deleted` tinyint(1) DEFAULT '0' COMMENT 'Whether to delete',
PRIMARY KEY (`id`)
) ENGINE = InnoDB
DEFAULT CHARSET = utf8mb4 COMMENT ='Pulsar consumption table';
@@ -294,7 +294,7 @@ CREATE TABLE `data_proxy_cluster`
`net_type` varchar(20) DEFAULT NULL COMMENT 'Cluster network type, internal, or public',
`in_charges` varchar(512) DEFAULT NULL COMMENT 'Name of responsible person, separated by commas',
`ext_props` json DEFAULT NULL COMMENT 'Extended properties',
- `status` int(11) DEFAULT '1' COMMENT 'Cluster status',
+ `status` int(4) DEFAULT '1' COMMENT 'Cluster status',
`is_deleted` tinyint(1) DEFAULT '0' COMMENT 'Whether to delete, 0: not deleted, 1: deleted',
`creator` varchar(64) NOT NULL COMMENT 'Creator name',
`modifier` varchar(64) DEFAULT NULL COMMENT 'Modifier name',
@@ -360,11 +360,16 @@ CREATE TABLE `data_stream`
`storage_period` int(11) DEFAULT '1' COMMENT 'The storage period of data in MQ, unit: day',
`data_type` varchar(20) DEFAULT 'TEXT' COMMENT 'Data type, there are: TEXT, KEY-VALUE, PB, BON, TEXT and BON should be treated differently',
`data_encoding` varchar(8) DEFAULT 'UTF-8' COMMENT 'Data encoding format, including: UTF-8, GBK',
- `file_delimiter` varchar(8) DEFAULT NULL COMMENT 'The source data field separator, stored as ASCII code',
+ `data_separator` varchar(8) DEFAULT NULL COMMENT 'The source data field separator, stored as ASCII code',
+ `data_escape_char` varchar(8) DEFAULT NULL COMMENT 'Source data field escape character, the default is NULL (NULL), stored as 1 character',
`have_predefined_fields` tinyint(1) DEFAULT '0' COMMENT '(File, DB access) whether there are predefined fields, 0: none, 1: yes (save to data_stream_field)',
+ `daily_records` int(11) DEFAULT '10' COMMENT 'Number of access records per day, unit: 10,000 records per day',
+ `daily_storage` int(11) DEFAULT '10' COMMENT 'Access size by day, unit: GB per day',
+ `peak_records` int(11) DEFAULT '1000' COMMENT 'Access peak per second, unit: records per second',
+ `max_length` int(11) DEFAULT '10240' COMMENT 'The maximum length of a single piece of data, unit: Byte',
`in_charges` varchar(512) DEFAULT NULL COMMENT 'Name of responsible person, separated by commas',
- `status` int(11) DEFAULT '0' COMMENT 'Data stream status',
- `previous_status` int(11) DEFAULT '0' COMMENT 'Previous status',
+ `status` int(4) DEFAULT '0' COMMENT 'Data stream status',
+ `previous_status` int(4) DEFAULT '0' COMMENT 'Previous status',
`is_deleted` tinyint(1) DEFAULT '0' COMMENT 'Whether to delete, 0: not deleted, 1: deleted',
`creator` varchar(64) DEFAULT NULL COMMENT 'Creator name',
`modifier` varchar(64) DEFAULT NULL COMMENT 'Modifier name',
@@ -438,7 +443,7 @@ CREATE TABLE `operation_log`
`cost_time` bigint(20) DEFAULT NULL COMMENT 'time-consuming',
`body` text COMMENT 'Request body',
`param` text COMMENT 'parameter',
- `status` tinyint(1) DEFAULT NULL COMMENT 'status',
+ `status` int(4) DEFAULT NULL COMMENT 'status',
`request_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'request time',
`err_msg` text COMMENT 'Error message',
PRIMARY KEY (`id`)
@@ -502,8 +507,8 @@ CREATE TABLE `source_db_detail`
`table_fields` longtext COMMENT 'Data table fields, multiple are separated by half-width commas, required for increment',
`data_sql` longtext COMMENT 'SQL statement to collect source data, required for full amount',
`crontab` varchar(56) DEFAULT NULL COMMENT 'Timed scheduling expression, required for full amount',
- `status` int(11) DEFAULT '0' COMMENT 'Data source status',
- `previous_status` int(11) DEFAULT '0' COMMENT 'Previous status',
+ `status` int(4) DEFAULT '0' COMMENT 'Data source status',
+ `previous_status` int(4) DEFAULT '0' COMMENT 'Previous status',
`is_deleted` tinyint(1) DEFAULT '0' COMMENT 'Whether to delete, 0: not deleted, 1: deleted',
`creator` varchar(64) NOT NULL COMMENT 'Creator name',
`modifier` varchar(64) DEFAULT NULL COMMENT 'Modifier name',
@@ -558,8 +563,8 @@ CREATE TABLE `source_file_detail`
`username` varchar(32) NOT NULL COMMENT 'User name of the data source IP host',
`password` varchar(64) NOT NULL COMMENT 'The password corresponding to the above user name',
`file_path` varchar(256) NOT NULL COMMENT 'File path, supports regular matching',
- `status` int(11) DEFAULT '0' COMMENT 'Data source status',
- `previous_status` int(11) DEFAULT '0' COMMENT 'Previous status',
+ `status` int(4) DEFAULT '0' COMMENT 'Data source status',
+ `previous_status` int(4) DEFAULT '0' COMMENT 'Previous status',
`is_deleted` tinyint(1) DEFAULT '0' COMMENT 'Whether to delete, 0: not deleted, 1: deleted',
`creator` varchar(64) NOT NULL COMMENT 'Creator name',
`modifier` varchar(64) DEFAULT NULL COMMENT 'Modifier name',
@@ -592,36 +597,36 @@ CREATE TABLE `storage_ext`
-- Table structure for storage_hive
-- ----------------------------
DROP TABLE IF EXISTS `storage_hive`;
-DROP TABLE IF EXISTS `storage_hive`;
CREATE TABLE `storage_hive`
(
- `id` int(11) NOT NULL AUTO_INCREMENT COMMENT 'Incremental primary key',
- `inlong_group_id` varchar(128) NOT NULL COMMENT 'Owning business group id',
- `inlong_stream_id` varchar(128) NOT NULL COMMENT 'Owning data stream id',
- `jdbc_url` varchar(255) NOT NULL COMMENT 'Hive JDBC connection URL, such as "jdbc:hive2://127.0.0.1:10000"',
- `username` varchar(128) NOT NULL COMMENT 'Username',
- `password` varchar(255) NOT NULL COMMENT 'User password',
- `db_name` varchar(128) NOT NULL COMMENT 'Target database name',
- `table_name` varchar(128) NOT NULL COMMENT 'Target data table name',
- `primary_partition` varchar(255) DEFAULT 'dt' COMMENT 'primary partition field',
- `secondary_partition` varchar(256) DEFAULT NULL COMMENT 'secondary partition field',
- `partition_type` varchar(10) DEFAULT NULL COMMENT 'The partition type, there are: H-by hour, D-by day, W-by week, M-by month, O-one-time, R-non-periodical',
- `file_format` varchar(15) DEFAULT 'TextFile' COMMENT 'The stored table format, TextFile, RCFile, SequenceFile, Avro',
- `encoding_type` varchar(255) DEFAULT NULL COMMENT 'data encoding',
- `field_splitter` varchar(10) DEFAULT NULL COMMENT 'field separator',
- `hdfs_default_fs` varchar(255) DEFAULT NULL COMMENT 'HDFS defaultFS, such as "hdfs://127.0.0.1:9000"',
- `warehouse_dir` varchar(250) DEFAULT '/user/hive/warehouse' COMMENT 'Hive table storage path on HDFS, such as "/user/hive/warehouse"',
- `usage_interval` varchar(10) DEFAULT NULL COMMENT 'The amount of time that Sort collected data will land on Hive, there are 10M, 15M, 30M, 1H, 1D',
- `storage_period` int(5) DEFAULT '10' COMMENT 'Data storage period, unit: day',
- `status` int(11) DEFAULT '0' COMMENT 'status',
- `previous_status` int(11) DEFAULT '0' COMMENT 'Previous status',
- `is_deleted` tinyint(1) DEFAULT '0' COMMENT 'Whether to delete, 0: not deleted, 1: deleted',
- `creator` varchar(64) NOT NULL COMMENT 'Creator name',
- `modifier` varchar(64) DEFAULT NULL COMMENT 'Modifier name',
- `create_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'Create time',
- `modify_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'Modify time',
- `temp_view` json DEFAULT NULL COMMENT 'Temporary view, used to save un-submitted and unapproved intermediate data after modification',
- `opt_log` varchar(5000) DEFAULT NULL COMMENT 'Background operation log',
+ `id` int(11) NOT NULL AUTO_INCREMENT COMMENT 'Incremental primary key',
+ `inlong_group_id` varchar(128) NOT NULL COMMENT 'Owning business group id',
+ `inlong_stream_id` varchar(128) NOT NULL COMMENT 'Owning data stream id',
+ `jdbc_url` varchar(255) DEFAULT NULL COMMENT 'Hive JDBC connection URL, such as "jdbc:hive2://127.0.0.1:10000"',
+ `username` varchar(128) DEFAULT NULL COMMENT 'Username',
+ `password` varchar(255) DEFAULT NULL COMMENT 'User password',
+ `db_name` varchar(128) DEFAULT NULL COMMENT 'Target database name',
+ `table_name` varchar(128) DEFAULT NULL COMMENT 'Target data table name',
+ `hdfs_default_fs` varchar(255) DEFAULT NULL COMMENT 'HDFS defaultFS, such as "hdfs://127.0.0.1:9000"',
+ `warehouse_dir` varchar(250) DEFAULT '/user/hive/warehouse' COMMENT 'Hive table storage path on HDFS, such as "/user/hive/warehouse"',
+ `partition_interval` int(5) DEFAULT NULL COMMENT 'Partition interval, support: 1(D / H), 10 I, 30 I',
+ `partition_unit` varchar(10) DEFAULT 'D' COMMENT 'Partition type, support: D-day, H-hour, I-minute',
+ `primary_partition` varchar(255) DEFAULT 'dt' COMMENT 'primary partition field',
+ `secondary_partition` varchar(256) DEFAULT NULL COMMENT 'secondary partition field',
+ `partition_creation_strategy` varchar(50) DEFAULT 'COMPLETED' COMMENT 'Partition creation strategy, support: ARRIVED, COMPLETED',
+ `file_format` varchar(15) DEFAULT 'TextFile' COMMENT 'The stored table format, TextFile, RCFile, SequenceFile, Avro',
+ `data_encoding` varchar(20) DEFAULT 'UTF-8' COMMENT 'data encoding type',
+ `data_separator` varchar(10) DEFAULT NULL COMMENT 'data field separator',
+ `storage_period` int(5) DEFAULT '10' COMMENT 'Data storage period, unit: day',
+ `opt_log` varchar(5000) DEFAULT NULL COMMENT 'Background operation log',
+ `status` int(4) DEFAULT '0' COMMENT 'status',
+ `previous_status` int(4) DEFAULT '0' COMMENT 'Previous status',
+ `is_deleted` tinyint(1) DEFAULT '0' COMMENT 'Whether to delete, 0: not deleted, 1: deleted',
+ `creator` varchar(64) DEFAULT NULL COMMENT 'creator name',
+ `modifier` varchar(64) DEFAULT NULL COMMENT 'modifier name',
+ `create_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'create time',
+ `modify_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'modify time',
+ `temp_view` json DEFAULT NULL COMMENT 'Temporary view, used to save un-submitted and unapproved intermediate data after modification',
PRIMARY KEY (`id`)
) ENGINE = InnoDB
DEFAULT CHARSET = utf8mb4 COMMENT ='Data is stored in Hive configuration table';
@@ -891,7 +896,7 @@ CREATE TABLE `cluster_set`
`middleware_type` varchar(10) DEFAULT 'TUBE' COMMENT 'The middleware type of message queue, high throughput: TUBE, high consistency: PULSAR',
`in_charges` varchar(512) COMMENT 'Name of responsible person, separated by commas',
`followers` varchar(512) COMMENT 'List of names of business followers, separated by commas',
- `status` int(11) DEFAULT '21' COMMENT 'ClusterSet status',
+ `status` int(4) DEFAULT '21' COMMENT 'ClusterSet status',
`is_deleted` tinyint(1) DEFAULT '0' COMMENT 'Whether to delete, 0: not deleted, 1: deleted',
`creator` varchar(64) NOT NULL COMMENT 'Creator name',
`modifier` varchar(64) NULL COMMENT 'Modifier name',
diff --git a/inlong-manager/doc/sql/apache_inlong_manager.sql b/inlong-manager/doc/sql/apache_inlong_manager.sql
index dec268e..8cae257 100644
--- a/inlong-manager/doc/sql/apache_inlong_manager.sql
+++ b/inlong-manager/doc/sql/apache_inlong_manager.sql
@@ -93,7 +93,7 @@ CREATE TABLE `business`
`schema_name` varchar(128) DEFAULT NULL COMMENT 'Data type, associated data_schema table',
`in_charges` varchar(512) NOT NULL COMMENT 'Name of responsible person, separated by commas',
`followers` varchar(512) DEFAULT NULL COMMENT 'List of names of business followers, separated by commas',
- `status` int(11) DEFAULT '21' COMMENT 'Business status',
+ `status` int(4) DEFAULT '21' COMMENT 'Business status',
`is_deleted` tinyint(1) DEFAULT '0' COMMENT 'Whether to delete, 0: not deleted, 1: deleted',
`creator` varchar(64) NOT NULL COMMENT 'Creator name',
`modifier` varchar(64) DEFAULT NULL COMMENT 'Modifier name',
@@ -161,7 +161,7 @@ CREATE TABLE `cluster_info`
`url` varchar(256) DEFAULT NULL COMMENT 'Cluster URL address',
`is_backup` tinyint(1) DEFAULT '0' COMMENT 'Whether it is a backup cluster, 0: no, 1: yes',
`ext_props` json DEFAULT NULL COMMENT 'extended properties',
- `status` int(11) DEFAULT '1' COMMENT 'cluster status',
+ `status` int(4) DEFAULT '1' COMMENT 'cluster status',
`is_deleted` tinyint(1) DEFAULT '0' COMMENT 'Whether to delete, 0: not deleted, 1: deleted',
`creator` varchar(64) NOT NULL COMMENT 'Creator name',
`modifier` varchar(64) DEFAULT NULL COMMENT 'Modifier name',
@@ -195,7 +195,7 @@ CREATE TABLE `common_db_server`
`db_description` varchar(256) DEFAULT NULL COMMENT 'DB description',
`backup_db_server_ip` varchar(64) DEFAULT NULL COMMENT 'Backup DB HOST',
`backup_db_port` int(11) DEFAULT NULL COMMENT 'Backup DB port',
- `status` int(11) DEFAULT '0' COMMENT 'status',
+ `status` int(4) DEFAULT '0' COMMENT 'status',
`is_deleted` tinyint(1) DEFAULT '0' COMMENT 'Whether to delete, 0: not deleted, 1: deleted',
`creator` varchar(64) NOT NULL COMMENT 'Creator name',
`modifier` varchar(64) DEFAULT NULL COMMENT 'Modifier name',
@@ -221,7 +221,7 @@ CREATE TABLE `common_file_server`
`issue_type` varchar(128) DEFAULT NULL COMMENT 'Issuance method, such as SSH, TCS, etc.',
`username` varchar(64) NOT NULL COMMENT 'User name of the data source IP host',
`password` varchar(64) NOT NULL COMMENT 'The password corresponding to the above user name',
- `status` int(11) DEFAULT '0' COMMENT 'status',
+ `status` int(4) DEFAULT '0' COMMENT 'status',
`is_deleted` tinyint(1) DEFAULT '0' COMMENT 'Whether to delete, 0: not deleted, 1: deleted',
`creator` varchar(64) NOT NULL COMMENT 'Creator name',
`modifier` varchar(64) DEFAULT NULL COMMENT 'Modifier name',
@@ -248,12 +248,12 @@ CREATE TABLE `consumption`
`topic` varchar(255) NOT NULL COMMENT 'Consumption topic',
`filter_enabled` int(2) DEFAULT '0' COMMENT 'Whether to filter, default 0, not filter consume',
`inlong_stream_id` varchar(1024) DEFAULT NULL COMMENT 'Data stream ID for consumption, if filter_enable is 1, it cannot empty',
- `status` int(11) NOT NULL COMMENT 'Status: draft, pending approval, approval rejected, approval passed',
+ `status` int(4) NOT NULL COMMENT 'Status: draft, pending approval, approval rejected, approval passed',
+ `is_deleted` tinyint(1) DEFAULT '0' COMMENT 'Whether to delete, 0: not deleted, 1: deleted',
`creator` varchar(64) NOT NULL COMMENT 'creator',
`modifier` varchar(64) DEFAULT NULL COMMENT 'modifier',
`create_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'Create time',
`modify_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'Modify time',
- `is_deleted` int(2) DEFAULT '0' COMMENT 'Whether to delete, 0: not deleted, 1: deleted',
PRIMARY KEY (`id`)
) ENGINE = InnoDB
DEFAULT CHARSET = utf8mb4 COMMENT ='Data consumption configuration table';
@@ -264,8 +264,8 @@ CREATE TABLE `consumption`
DROP TABLE IF EXISTS `consumption_pulsar`;
CREATE TABLE `consumption_pulsar`
(
- `id` int NOT NULL AUTO_INCREMENT,
- `consumption_id` int DEFAULT NULL COMMENT 'ID of the consumption information to which it belongs, guaranteed to be uniquely associated with consumption information',
+ `id` int(11) NOT NULL AUTO_INCREMENT,
+ `consumption_id` int(11) DEFAULT NULL COMMENT 'ID of the consumption information to which it belongs, guaranteed to be uniquely associated with consumption information',
`consumer_group_id` varchar(255) NOT NULL COMMENT 'Consumer group ID',
`consumer_group_name` varchar(255) NOT NULL COMMENT 'Consumer group name',
`inlong_group_id` varchar(255) NOT NULL COMMENT 'Business group ID',
@@ -273,7 +273,7 @@ CREATE TABLE `consumption_pulsar`
`retry_letter_topic` varchar(255) DEFAULT NULL COMMENT 'The name of the retry queue topic',
`is_dlq` tinyint(1) DEFAULT '0' COMMENT 'Whether to configure dead letter topic, 0: no configuration, 1: means configuration',
`dead_letter_topic` varchar(255) DEFAULT NULL COMMENT 'dead letter topic name',
- `is_deleted` int DEFAULT '0' COMMENT 'Whether to delete',
+ `is_deleted` tinyint(1) DEFAULT '0' COMMENT 'Whether to delete',
PRIMARY KEY (`id`)
) ENGINE = InnoDB
DEFAULT CHARSET = utf8mb4 COMMENT ='Pulsar consumption table';
@@ -294,7 +294,7 @@ CREATE TABLE `data_proxy_cluster`
`net_type` varchar(20) DEFAULT NULL COMMENT 'Cluster network type, internal, or public',
`in_charges` varchar(512) DEFAULT NULL COMMENT 'Name of responsible person, separated by commas',
`ext_props` json DEFAULT NULL COMMENT 'Extended properties',
- `status` int(11) DEFAULT '1' COMMENT 'Cluster status',
+ `status` int(4) DEFAULT '1' COMMENT 'Cluster status',
`is_deleted` tinyint(1) DEFAULT '0' COMMENT 'Whether to delete, 0: not deleted, 1: deleted',
`creator` varchar(64) NOT NULL COMMENT 'Creator name',
`modifier` varchar(64) DEFAULT NULL COMMENT 'Modifier name',
@@ -360,11 +360,16 @@ CREATE TABLE `data_stream`
`storage_period` int(11) DEFAULT '1' COMMENT 'The storage period of data in MQ, unit: day',
`data_type` varchar(20) DEFAULT 'TEXT' COMMENT 'Data type, there are: TEXT, KEY-VALUE, PB, BON, TEXT and BON should be treated differently',
`data_encoding` varchar(8) DEFAULT 'UTF-8' COMMENT 'Data encoding format, including: UTF-8, GBK',
- `file_delimiter` varchar(8) DEFAULT NULL COMMENT 'The source data field separator, stored as ASCII code',
+ `data_separator` varchar(8) DEFAULT NULL COMMENT 'The source data field separator, stored as ASCII code',
+ `data_escape_char` varchar(8) DEFAULT NULL COMMENT 'Source data field escape character, the default is NULL (NULL), stored as 1 character',
`have_predefined_fields` tinyint(1) DEFAULT '0' COMMENT '(File, DB access) whether there are predefined fields, 0: none, 1: yes (save to data_stream_field)',
+ `daily_records` int(11) DEFAULT '10' COMMENT 'Number of access records per day, unit: 10,000 records per day',
+ `daily_storage` int(11) DEFAULT '10' COMMENT 'Access size by day, unit: GB per day',
+ `peak_records` int(11) DEFAULT '1000' COMMENT 'Access peak per second, unit: records per second',
+ `max_length` int(11) DEFAULT '10240' COMMENT 'The maximum length of a single piece of data, unit: Byte',
`in_charges` varchar(512) DEFAULT NULL COMMENT 'Name of responsible person, separated by commas',
- `status` int(11) DEFAULT '0' COMMENT 'Data stream status',
- `previous_status` int(11) DEFAULT '0' COMMENT 'Previous status',
+ `status` int(4) DEFAULT '0' COMMENT 'Data stream status',
+ `previous_status` int(4) DEFAULT '0' COMMENT 'Previous status',
`is_deleted` tinyint(1) DEFAULT '0' COMMENT 'Whether to delete, 0: not deleted, 1: deleted',
`creator` varchar(64) DEFAULT NULL COMMENT 'Creator name',
`modifier` varchar(64) DEFAULT NULL COMMENT 'Modifier name',
@@ -438,7 +443,7 @@ CREATE TABLE `operation_log`
`cost_time` bigint(20) DEFAULT NULL COMMENT 'time-consuming',
`body` text COMMENT 'Request body',
`param` text COMMENT 'parameter',
- `status` tinyint(1) DEFAULT NULL COMMENT 'status',
+ `status` int(4) DEFAULT NULL COMMENT 'status',
`request_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'request time',
`err_msg` text COMMENT 'Error message',
PRIMARY KEY (`id`)
@@ -502,8 +507,8 @@ CREATE TABLE `source_db_detail`
`table_fields` longtext COMMENT 'Data table fields, multiple are separated by half-width commas, required for increment',
`data_sql` longtext COMMENT 'SQL statement to collect source data, required for full amount',
`crontab` varchar(56) DEFAULT NULL COMMENT 'Timed scheduling expression, required for full amount',
- `status` int(11) DEFAULT '0' COMMENT 'Data source status',
- `previous_status` int(11) DEFAULT '0' COMMENT 'Previous status',
+ `status` int(4) DEFAULT '0' COMMENT 'Data source status',
+ `previous_status` int(4) DEFAULT '0' COMMENT 'Previous status',
`is_deleted` tinyint(1) DEFAULT '0' COMMENT 'Whether to delete, 0: not deleted, 1: deleted',
`creator` varchar(64) NOT NULL COMMENT 'Creator name',
`modifier` varchar(64) DEFAULT NULL COMMENT 'Modifier name',
@@ -558,8 +563,8 @@ CREATE TABLE `source_file_detail`
`username` varchar(32) NOT NULL COMMENT 'User name of the data source IP host',
`password` varchar(64) NOT NULL COMMENT 'The password corresponding to the above user name',
`file_path` varchar(256) NOT NULL COMMENT 'File path, supports regular matching',
- `status` int(11) DEFAULT '0' COMMENT 'Data source status',
- `previous_status` int(11) DEFAULT '0' COMMENT 'Previous status',
+ `status` int(4) DEFAULT '0' COMMENT 'Data source status',
+ `previous_status` int(4) DEFAULT '0' COMMENT 'Previous status',
`is_deleted` tinyint(1) DEFAULT '0' COMMENT 'Whether to delete, 0: not deleted, 1: deleted',
`creator` varchar(64) NOT NULL COMMENT 'Creator name',
`modifier` varchar(64) DEFAULT NULL COMMENT 'Modifier name',
@@ -592,36 +597,36 @@ CREATE TABLE `storage_ext`
-- Table structure for storage_hive
-- ----------------------------
DROP TABLE IF EXISTS `storage_hive`;
-DROP TABLE IF EXISTS `storage_hive`;
CREATE TABLE `storage_hive`
(
- `id` int(11) NOT NULL AUTO_INCREMENT COMMENT 'Incremental primary key',
- `inlong_group_id` varchar(128) NOT NULL COMMENT 'Owning business group id',
- `inlong_stream_id` varchar(128) NOT NULL COMMENT 'Owning data stream id',
- `jdbc_url` varchar(255) NOT NULL COMMENT 'Hive JDBC connection URL, such as "jdbc:hive2://127.0.0.1:10000"',
- `username` varchar(128) NOT NULL COMMENT 'Username',
- `password` varchar(255) NOT NULL COMMENT 'User password',
- `db_name` varchar(128) NOT NULL COMMENT 'Target database name',
- `table_name` varchar(128) NOT NULL COMMENT 'Target data table name',
- `primary_partition` varchar(255) DEFAULT 'dt' COMMENT 'primary partition field',
- `secondary_partition` varchar(256) DEFAULT NULL COMMENT 'secondary partition field',
- `partition_type` varchar(10) DEFAULT NULL COMMENT 'The partition type, there are: H-by hour, D-by day, W-by week, M-by month, O-one-time, R-non-periodical',
- `file_format` varchar(15) DEFAULT 'TextFile' COMMENT 'The stored table format, TextFile, RCFile, SequenceFile, Avro',
- `encoding_type` varchar(255) DEFAULT NULL COMMENT 'data encoding',
- `field_splitter` varchar(10) DEFAULT NULL COMMENT 'field separator',
- `hdfs_default_fs` varchar(255) DEFAULT NULL COMMENT 'HDFS defaultFS, such as "hdfs://127.0.0.1:9000"',
- `warehouse_dir` varchar(250) DEFAULT '/user/hive/warehouse' COMMENT 'Hive table storage path on HDFS, such as "/user/hive/warehouse"',
- `usage_interval` varchar(10) DEFAULT NULL COMMENT 'The amount of time that Sort collected data will land on Hive, there are 10M, 15M, 30M, 1H, 1D',
- `storage_period` int(5) DEFAULT '10' COMMENT 'Data storage period, unit: day',
- `status` int(11) DEFAULT '0' COMMENT 'status',
- `previous_status` int(11) DEFAULT '0' COMMENT 'Previous status',
- `is_deleted` tinyint(1) DEFAULT '0' COMMENT 'Whether to delete, 0: not deleted, 1: deleted',
- `creator` varchar(64) NOT NULL COMMENT 'Creator name',
- `modifier` varchar(64) DEFAULT NULL COMMENT 'Modifier name',
- `create_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'Create time',
- `modify_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'Modify time',
- `temp_view` json DEFAULT NULL COMMENT 'Temporary view, used to save un-submitted and unapproved intermediate data after modification',
- `opt_log` varchar(5000) DEFAULT NULL COMMENT 'Background operation log',
+ `id` int(11) NOT NULL AUTO_INCREMENT COMMENT 'Incremental primary key',
+ `inlong_group_id` varchar(128) NOT NULL COMMENT 'Owning business group id',
+ `inlong_stream_id` varchar(128) NOT NULL COMMENT 'Owning data stream id',
+ `jdbc_url` varchar(255) DEFAULT NULL COMMENT 'Hive JDBC connection URL, such as "jdbc:hive2://127.0.0.1:10000"',
+ `username` varchar(128) DEFAULT NULL COMMENT 'Username',
+ `password` varchar(255) DEFAULT NULL COMMENT 'User password',
+ `db_name` varchar(128) DEFAULT NULL COMMENT 'Target database name',
+ `table_name` varchar(128) DEFAULT NULL COMMENT 'Target data table name',
+ `hdfs_default_fs` varchar(255) DEFAULT NULL COMMENT 'HDFS defaultFS, such as "hdfs://127.0.0.1:9000"',
+ `warehouse_dir` varchar(250) DEFAULT '/user/hive/warehouse' COMMENT 'Hive table storage path on HDFS, such as "/user/hive/warehouse"',
+ `partition_interval` int(5) DEFAULT NULL COMMENT 'Partition interval, support: 1(D / H), 10 I, 30 I',
+ `partition_unit` varchar(10) DEFAULT 'D' COMMENT 'Partition type, support: D-day, H-hour, I-minute',
+ `primary_partition` varchar(255) DEFAULT 'dt' COMMENT 'primary partition field',
+ `secondary_partition` varchar(256) DEFAULT NULL COMMENT 'secondary partition field',
+ `partition_creation_strategy` varchar(50) DEFAULT 'COMPLETED' COMMENT 'Partition creation strategy, support: ARRIVED, COMPLETED',
+ `file_format` varchar(15) DEFAULT 'TextFile' COMMENT 'The stored table format, TextFile, RCFile, SequenceFile, Avro',
+ `data_encoding` varchar(20) DEFAULT 'UTF-8' COMMENT 'data encoding type',
+ `data_separator` varchar(10) DEFAULT NULL COMMENT 'data field separator',
+ `storage_period` int(5) DEFAULT '10' COMMENT 'Data storage period, unit: day',
+ `opt_log` varchar(5000) DEFAULT NULL COMMENT 'Background operation log',
+ `status` int(4) DEFAULT '0' COMMENT 'status',
+ `previous_status` int(4) DEFAULT '0' COMMENT 'Previous status',
+ `is_deleted` tinyint(1) DEFAULT '0' COMMENT 'Whether to delete, 0: not deleted, 1: deleted',
+ `creator` varchar(64) DEFAULT NULL COMMENT 'creator name',
+ `modifier` varchar(64) DEFAULT NULL COMMENT 'modifier name',
+ `create_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'create time',
+ `modify_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'modify time',
+ `temp_view` json DEFAULT NULL COMMENT 'Temporary view, used to save un-submitted and unapproved intermediate data after modification',
PRIMARY KEY (`id`)
) ENGINE = InnoDB
DEFAULT CHARSET = utf8mb4 COMMENT ='Data is stored in Hive configuration table';
@@ -891,7 +896,7 @@ CREATE TABLE `cluster_set`
`middleware_type` varchar(10) DEFAULT 'TUBE' COMMENT 'The middleware type of message queue, high throughput: TUBE, high consistency: PULSAR',
`in_charges` varchar(512) COMMENT 'Name of responsible person, separated by commas',
`followers` varchar(512) COMMENT 'List of names of business followers, separated by commas',
- `status` int(11) DEFAULT '21' COMMENT 'ClusterSet status',
+ `status` int(4) DEFAULT '21' COMMENT 'ClusterSet status',
`is_deleted` tinyint(1) DEFAULT '0' COMMENT 'Whether to delete, 0: not deleted, 1: deleted',
`creator` varchar(64) NOT NULL COMMENT 'Creator name',
`modifier` varchar(64) NULL COMMENT 'Modifier name',
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/BizConstant.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/BizConstant.java
index c2c322d..d2eef5e 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/BizConstant.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/BizConstant.java
@@ -34,6 +34,14 @@ public class BizConstant {
public static final String DATA_TYPE_KEY_VALUE = "KEY-VALUE";
+ public static final String FILE_FORMAT_TEXT = "TextFile";
+
+ public static final String FILE_FORMAT_ORC = "OrcFile";
+
+ public static final String FILE_FORMAT_SEQUENCE = "SequenceFile";
+
+ public static final String FILE_FORMAT_PARQUET = "Parquet";
+
public static final String MIDDLEWARE_TUBE = "TUBE";
public static final String MIDDLEWARE_PULSAR = "PULSAR";
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/business/BusinessInfo.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/business/BusinessInfo.java
index eb2906b..b3d8643 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/business/BusinessInfo.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/business/BusinessInfo.java
@@ -55,7 +55,8 @@ public class BusinessInfo {
@ApiModelProperty(value = "Middleware type, high throughput: TUBE, high consistency: PULSAR")
private String middlewareType;
- @ApiModelProperty(value = "MQ resource object, in business, Tube corresponds to Topic")
+ @ApiModelProperty(value = "MQ resource object, in business",
+ notes = "Tube corresponds to Topic, Pulsar corresponds to Namespace")
private String mqResourceObj;
@ApiModelProperty(value = "Tube master URL")
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/business/BusinessMqExtBase.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/business/BusinessMqExtBase.java
index 6f365e6..29fc201 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/business/BusinessMqExtBase.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/business/BusinessMqExtBase.java
@@ -45,7 +45,7 @@ public class BusinessMqExtBase {
@ApiModelProperty(value = "is deleted? 0: deleted, 1: not deleted")
private Integer isDeleted = 0;
- @ApiModelProperty(value = "Middleware type of data storage, high throughput: TUBE")
+ @ApiModelProperty(value = "Middleware type of data storage, high throughput: TUBE, high consistency : PULSAR")
private String middlewareType;
}
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/business/BusinessPulsarInfo.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/business/BusinessPulsarInfo.java
index 8b15b3e..0851505 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/business/BusinessPulsarInfo.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/business/BusinessPulsarInfo.java
@@ -49,18 +49,18 @@ public class BusinessPulsarInfo extends BusinessMqExtBase {
private Integer retentionTime = 72;
@ApiModelProperty(value = "The unit of the message storage time")
- private String retentionTimeUnit;
+ private String retentionTimeUnit = "hours";
@ApiModelProperty(value = "Message time-to-live duration")
private Integer ttl = 24;
@ApiModelProperty(value = "The unit of message's time-to-live duration")
- private String ttlUnit;
+ private String ttlUnit = "hours";
@ApiModelProperty(value = "Message size")
private Integer retentionSize = -1;
@ApiModelProperty(value = "The unit of message size")
- private String retentionSizeUnit;
+ private String retentionSizeUnit = "MB";
}
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/consumption/ConsumptionInfo.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/consumption/ConsumptionInfo.java
index cebdcfb..d20d4c1 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/consumption/ConsumptionInfo.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/consumption/ConsumptionInfo.java
@@ -44,15 +44,15 @@ public class ConsumptionInfo {
private Integer id;
@ApiModelProperty(value = "consumer group id")
- @NotBlank(message = "consumerGroupId can't be null")
+ @NotBlank(message = "consumerGroupId cannot be null")
private String consumerGroupId;
@ApiModelProperty(value = "consumer group name: only support [a-zA-Z0-9_]")
- @NotBlank(message = "consumerGroupName can't be null")
+ @NotBlank(message = "consumerGroupName cannot be null")
private String consumerGroupName;
@ApiModelProperty(value = "consumption in charge")
- @NotNull(message = "inCharges can't be null")
+ @NotNull(message = "inCharges cannot be null")
private String inCharges;
@ApiModelProperty(value = "consumption target business group id")
@@ -60,7 +60,7 @@ public class ConsumptionInfo {
private String inlongGroupId;
@ApiModelProperty(value = "Middleware type, high throughput: TUBE, high consistency: PULSAR")
- @NotBlank(message = "middlewareType can't be null")
+ // @NotBlank(message = "middlewareType cannot be null")
private String middlewareType;
@ApiModelProperty(value = "consumption target topic")
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/datastorage/StorageHiveInfo.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/datastorage/StorageHiveInfo.java
index 589a262..0c7b3c1 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/datastorage/StorageHiveInfo.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/datastorage/StorageHiveInfo.java
@@ -39,46 +39,52 @@ public class StorageHiveInfo extends BaseStorageInfo {
@ApiModelProperty("Hive JDBC URL")
private String jdbcUrl;
- @ApiModelProperty("username for JDBC URL")
+ @ApiModelProperty("Username for JDBC URL")
private String username;
- @ApiModelProperty("user password")
+ @ApiModelProperty("User password")
private String password;
- @ApiModelProperty("target database name")
+ @ApiModelProperty("Target database name")
private String dbName;
- @ApiModelProperty("target table name")
+ @ApiModelProperty("Target table name")
private String tableName;
- @ApiModelProperty("primary partition field")
+ @ApiModelProperty("HDFS defaultFS")
+ private String hdfsDefaultFs;
+
+ @ApiModelProperty("Warehouse directory")
+ private String warehouseDir;
+
+ @ApiModelProperty("Partition interval, support: 1 H, 1 D, 30 I, 10 I")
+ private Integer partitionInterval;
+
+ @ApiModelProperty("Partition type, support: D-day, H-hour, I-minute")
+ private String partitionUnit;
+
+ @ApiModelProperty("Primary partition field")
private String primaryPartition;
- @ApiModelProperty("secondary partition field")
+ @ApiModelProperty("Secondary partition field")
private String secondaryPartition;
- @ApiModelProperty("partition type, like: H-hour, D-day, W-week, M-month, O-once, R-regulation")
- private String partitionType;
+ @ApiModelProperty("Partition creation strategy, partition start, partition close")
+ private String partitionCreationStrategy;
- @ApiModelProperty("file format, support: TextFile, RCFile, SequenceFile, Avro")
+ @ApiModelProperty("File format, support: TextFile, RCFile, SequenceFile, Avro")
private String fileFormat;
- @ApiModelProperty("field splitter")
- private String fieldSplitter;
-
- @ApiModelProperty("data encoding type")
- private String encodingType;
-
- @ApiModelProperty("HDFS defaultFS")
- private String hdfsDefaultFs;
+ @ApiModelProperty("Data encoding type")
+ private String dataEncoding;
- @ApiModelProperty("warehouse directory")
- private String warehouseDir;
+ @ApiModelProperty("Data field separator")
+ private String dataSeparator;
- @ApiModelProperty("interval at which Sort collects data to Hive is 10M, 15M, 30M, 1H, 1D")
- private String usageInterval;
+ @ApiModelProperty("Data storage period in Hive, unit: Day")
+ private Integer storagePeriod;
- @ApiModelProperty("backend operation log")
+ @ApiModelProperty("Backend operation log")
private String optLog;
@ApiModelProperty("hive table field list")
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/datastream/DataStreamInfoToHiveConfig.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/datastorage/StorageHiveSortInfo.java
similarity index 56%
rename from inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/datastream/DataStreamInfoToHiveConfig.java
rename to inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/datastorage/StorageHiveSortInfo.java
index a35ab0b..bf52872 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/datastream/DataStreamInfoToHiveConfig.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/datastorage/StorageHiveSortInfo.java
@@ -15,91 +15,49 @@
* limitations under the License.
*/
-package org.apache.inlong.manager.common.pojo.datastream;
+package org.apache.inlong.manager.common.pojo.datastorage;
-import java.util.List;
import lombok.Data;
/**
- * Data stream info for Hive config
+ * Hive info for Sort config
*/
@Data
-public class DataStreamInfoToHiveConfig {
+public class StorageHiveSortInfo {
private Integer id;
-
- private String jdbcUrl;
-
- private int status;
-
- private String inlongStreamId;
-
- private String description;
-
private String inlongGroupId;
+ private String inlongStreamId;
- private String defaultSelectors;
-
- private String partitionType;
-
- private String partitionFields;
-
- private String partitionFieldPosition;
-
- private Integer clusterId;
-
- private String dataType;
-
- private String fieldSplitter;
-
- private String clusterTag;
-
- private String encodingType;
-
- private String hiveAddr;
-
- private String creator;
-
- private String userName;
-
+ // Hive server info
+ private String jdbcUrl;
+ private String username;
private String password;
- private String compression;
-
- private String warehouseDir;
-
- private boolean setHadoopUgi;
-
- private String hadoopUgi;
-
- private Integer storagePeriod;
-
- private String fsDefaultName;
-
+ // Hive db and table info
private String dbName;
-
private String tableName;
+ private String hdfsDefaultFs;
+ private String warehouseDir;
+ private Integer partitionInterval;
+ private String partitionUnit;
private String primaryPartition;
-
private String secondaryPartition;
+ private String partitionCreationStrategy;
private String fileFormat;
+ private String dataEncoding;
+ private String targetSeparator; // Target separator configured in the storage info
+ private Integer status;
+ private String creator;
- private String p;
-
- private String mt;
-
- private String usTaskId;
-
- private String qualifiedThreshold;
-
- private Integer hiveType;
-
- private String usageInterval;
-
- private String partitionStrategy;
-
- private List<DataStreamExtInfo> extList;
+ // Data stream info
+ private String mqResourceObj;
+ private String dataSourceType;
+ private String dataType;
+ private String description;
+ private String sourceSeparator; // Target separator configured in the stream info
+ private String dataEscapeChar;
}
\ No newline at end of file
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/datastorage/StoragePageRequest.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/datastorage/StoragePageRequest.java
index f966fa4..083eb83 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/datastorage/StoragePageRequest.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/datastorage/StoragePageRequest.java
@@ -40,6 +40,9 @@ public class StoragePageRequest extends PageRequest {
@ApiModelProperty(value = "Storage type, such as HIVE", required = true)
private String storageType;
+ @ApiModelProperty(value = "Key word")
+ private String keyWord;
+
@ApiModelProperty(value = "Status")
private Integer status;
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/datastream/DataStreamInfo.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/datastream/DataStreamInfo.java
index 178eaf2..9e60f4d 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/datastream/DataStreamInfo.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/datastream/DataStreamInfo.java
@@ -42,6 +42,10 @@ public class DataStreamInfo extends DataStreamBaseInfo {
@ApiModelProperty(value = "Data stream description")
private String description;
+ @ApiModelProperty(value = "MQ resource object, in business",
+ notes = "Tube corresponds to Topic, Pulsar corresponds to Namespace")
+ private String mqResourceObj;
+
@ApiModelProperty(value = "Data source type, including: FILE, DB, AUTO_PUSH (DATA_PROXY_SDK, HTTP)")
private String dataSourceType;
@@ -54,12 +58,27 @@ public class DataStreamInfo extends DataStreamBaseInfo {
@ApiModelProperty(value = "Data encoding format: UTF-8, GBK (required when dataSourceType=FILE, AUTO_PUSH)")
private String dataEncoding;
- @ApiModelProperty(value = "Field delimiter, stored as ASCII code (required when dataSourceType=FILE, AUTO_PUSH)")
- private String fileDelimiter;
+ @ApiModelProperty(value = "Data separator, stored as ASCII code (required when dataSourceType=FILE, AUTO_PUSH)")
+ private String dataSeparator;
+
+ @ApiModelProperty(value = "Data field escape symbol, stored as ASCII code")
+ private String dataEscapeChar;
@ApiModelProperty(value = "(File and DB access) Whether there are predefined fields, 0: no, 1: yes")
private Integer havePredefinedFields;
+ @ApiModelProperty(value = "Number of access items per day, unit: 10,000 items per day")
+ private Integer dailyRecords;
+
+ @ApiModelProperty(value = "Access size per day, unit: GB per day")
+ private Integer dailyStorage;
+
+ @ApiModelProperty(value = "peak access per second, unit: bars per second")
+ private Integer peakRecords;
+
+ @ApiModelProperty(value = "The maximum length of a single piece of data, unit: Byte")
+ private Integer maxLength;
+
@ApiModelProperty(value = "Names of responsible persons, separated by commas")
private String inCharges;
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/datastream/DataStreamPageRequest.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/datastream/DataStreamPageRequest.java
index 8b08968..cd20094 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/datastream/DataStreamPageRequest.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/datastream/DataStreamPageRequest.java
@@ -52,4 +52,7 @@ public class DataStreamPageRequest extends PageRequest {
@ApiModelProperty(value = "Current user", hidden = true)
private String currentUser;
+ @ApiModelProperty(value = "Business in charges", hidden = true)
+ private String inCharges;
+
}
diff --git a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/DataStreamEntity.java b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/DataStreamEntity.java
index 15cd115..c825f9a 100644
--- a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/DataStreamEntity.java
+++ b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/DataStreamEntity.java
@@ -35,14 +35,15 @@ public class DataStreamEntity implements Serializable {
private Integer storagePeriod;
private String dataType;
private String dataEncoding;
- private String fileDelimiter;
+ private String dataSeparator;
+ private String dataEscapeChar;
private Integer havePredefinedFields;
- private String inCharges;
private Integer dailyRecords;
private Integer dailyStorage;
private Integer peakRecords;
private Integer maxLength;
+ private String inCharges;
private Integer status;
private Integer previousStatus;
diff --git a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/StorageHiveEntity.java b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/StorageHiveEntity.java
index 16f05fd..c6e513e 100644
--- a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/StorageHiveEntity.java
+++ b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/StorageHiveEntity.java
@@ -33,16 +33,21 @@ public class StorageHiveEntity implements Serializable {
private String password;
private String dbName;
private String tableName;
+ private String hdfsDefaultFs;
+ private String warehouseDir;
+
+ private Integer partitionInterval;
+ private String partitionUnit;
private String primaryPartition;
private String secondaryPartition;
- private String partitionType;
+ private String partitionCreationStrategy;
+
private String fileFormat;
- private String fieldSplitter;
- private String encodingType;
- private String hdfsDefaultFs;
- private String warehouseDir;
- private String usageInterval;
+ private String dataEncoding;
+ private String dataSeparator;
private Integer storagePeriod;
+ private String optLog;
+
private Integer status;
private Integer previousStatus;
private Integer isDeleted;
@@ -50,7 +55,6 @@ public class StorageHiveEntity implements Serializable {
private String modifier;
private Date createTime;
private Date modifyTime;
- private String optLog;
private String tempView;
}
\ No newline at end of file
diff --git a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/DataStreamEntityMapper.java b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/DataStreamEntityMapper.java
index c1c988d..41c97a7 100644
--- a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/DataStreamEntityMapper.java
+++ b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/DataStreamEntityMapper.java
@@ -19,7 +19,6 @@ package org.apache.inlong.manager.dao.mapper;
import java.util.List;
import org.apache.ibatis.annotations.Param;
-import org.apache.inlong.manager.common.pojo.datastream.DataStreamInfoToHiveConfig;
import org.apache.inlong.manager.common.pojo.datastream.DataStreamPageRequest;
import org.apache.inlong.manager.common.pojo.datastream.DataStreamTopicVO;
import org.apache.inlong.manager.dao.entity.DataStreamEntity;
@@ -36,8 +35,6 @@ public interface DataStreamEntityMapper {
DataStreamEntity selectByPrimaryKey(Integer id);
- int updateByPrimaryKeySelective(DataStreamEntity record);
-
int updateByPrimaryKey(DataStreamEntity record);
DataStreamEntity selectByIdentifier(@Param("groupId") String groupId, @Param("streamId") String streamId);
@@ -50,25 +47,10 @@ public interface DataStreamEntityMapper {
* @param request query request
* @return data stream list
*/
- List<DataStreamEntity> selectByCondition(DataStreamPageRequest request);
+ List<DataStreamEntity> selectByCondition(@Param("request") DataStreamPageRequest request);
List<DataStreamEntity> selectByGroupId(@Param("groupId") String groupId);
- /**
- * According to the conditions and business in charges, query all data stream
- *
- * @param request paging query conditions
- * @param inCharges business in charges
- * @return data stream list
- */
- List<DataStreamEntity> selectByConditionAndInCharges(@Param("request") DataStreamPageRequest request,
- @Param("inCharges") String inCharges);
-
- List<DataStreamInfoToHiveConfig> selectStreamToHiveInfo(@Param("groupId") String groupId);
-
- DataStreamInfoToHiveConfig selectStreamToHiveInfoByIdentifier(@Param("groupId") String groupId,
- @Param("streamId") String streamId);
-
int selectCountByGroupId(@Param("groupId") String groupId);
List<DataStreamTopicVO> selectTopicList(@Param("groupId") String groupId);
diff --git a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StorageHiveEntityMapper.java b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StorageHiveEntityMapper.java
index 74a14e1..fea4cd5 100644
--- a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StorageHiveEntityMapper.java
+++ b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/StorageHiveEntityMapper.java
@@ -19,6 +19,7 @@ package org.apache.inlong.manager.dao.mapper;
import java.util.List;
import org.apache.ibatis.annotations.Param;
+import org.apache.inlong.manager.common.pojo.datastorage.StorageHiveSortInfo;
import org.apache.inlong.manager.common.pojo.datastorage.StoragePageRequest;
import org.apache.inlong.manager.common.pojo.datastorage.StorageSummaryInfo;
import org.apache.inlong.manager.dao.entity.StorageHiveEntity;
@@ -45,7 +46,7 @@ public interface StorageHiveEntityMapper {
* @param request Paging query conditions
* @return Hive storage entity list
*/
- List<StorageHiveEntity> selectByCondition(StoragePageRequest request);
+ List<StorageHiveEntity> selectByCondition(@Param("request") StoragePageRequest request);
/**
* According to the business group id and data stream id, query valid storage information
@@ -80,7 +81,16 @@ public interface StorageHiveEntityMapper {
/**
* According to the business group id and data stream id, query Hive storage summary information
*/
- List<StorageSummaryInfo> selectSummaryByIdentifier(@Param("groupId") String groupId,
+ List<StorageSummaryInfo> selectSummary(@Param("groupId") String groupId, @Param("streamId") String streamId);
+
+ /**
+ * Select Hive configs for Sort under the business group id and stream id
+ *
+ * @param groupId Business group id
+ * @param streamId Data stream id, if is null, get all configs under the group id
+ * @return Hive Sort config
+ */
+ List<StorageHiveSortInfo> selectHiveSortInfoByIdentifier(@Param("groupId") String groupId,
@Param("streamId") String streamId);
}
\ No newline at end of file
diff --git a/inlong-manager/manager-dao/src/main/resources/mappers/DataStreamEntityMapper.xml b/inlong-manager/manager-dao/src/main/resources/mappers/DataStreamEntityMapper.xml
index 16b9d7e..5e55122 100644
--- a/inlong-manager/manager-dao/src/main/resources/mappers/DataStreamEntityMapper.xml
+++ b/inlong-manager/manager-dao/src/main/resources/mappers/DataStreamEntityMapper.xml
@@ -18,8 +18,7 @@
under the License.
-->
-<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN"
- "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
+<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="org.apache.inlong.manager.dao.mapper.DataStreamEntityMapper">
<resultMap id="BaseResultMap" type="org.apache.inlong.manager.dao.entity.DataStreamEntity">
<id column="id" jdbcType="INTEGER" property="id"/>
@@ -32,8 +31,13 @@
<result column="storage_period" jdbcType="INTEGER" property="storagePeriod"/>
<result column="data_type" jdbcType="VARCHAR" property="dataType"/>
<result column="data_encoding" jdbcType="VARCHAR" property="dataEncoding"/>
- <result column="file_delimiter" jdbcType="VARCHAR" property="fileDelimiter"/>
+ <result column="data_separator" jdbcType="VARCHAR" property="dataSeparator"/>
+ <result column="data_escape_char" jdbcType="VARCHAR" property="dataEscapeChar"/>
<result column="have_predefined_fields" jdbcType="INTEGER" property="havePredefinedFields"/>
+ <result column="daily_records" jdbcType="INTEGER" property="dailyRecords"/>
+ <result column="daily_storage" jdbcType="INTEGER" property="dailyStorage"/>
+ <result column="peak_records" jdbcType="INTEGER" property="peakRecords"/>
+ <result column="max_length" jdbcType="INTEGER" property="maxLength"/>
<result column="in_charges" jdbcType="VARCHAR" property="inCharges"/>
<result column="status" jdbcType="INTEGER" property="status"/>
<result column="previous_status" jdbcType="INTEGER" property="previousStatus"/>
@@ -44,47 +48,11 @@
<result column="modify_time" jdbcType="TIMESTAMP" property="modifyTime"/>
<result column="temp_view" jdbcType="LONGVARCHAR" property="tempView"/>
</resultMap>
-
- <resultMap id="dataStreamFullInfo"
- type="org.apache.inlong.manager.common.pojo.datastream.DataStreamInfoToHiveConfig">
- <result column="id" jdbcType="VARCHAR" property="id"/>
- <result column="inlong_stream_id" jdbcType="VARCHAR" property="inlongStreamId"/>
- <result column="name" jdbcType="VARCHAR" property="inlongGroupId"/>
- <result column="status" jdbcType="VARCHAR" property="status"/>
- <result column="data_type" jdbcType="VARCHAR" property="dataType"/>
- <result column="inlong_group_id" jdbcType="VARCHAR" property="inlongGroupId"/>
- <result column="field_splitter" jdbcType="VARCHAR" property="fieldSplitter"/>
- <result column="creator" jdbcType="VARCHAR" property="creator"/>
- <result column="db_name" jdbcType="VARCHAR" property="dbName"/>
- <result column="table_name" jdbcType="VARCHAR" property="tableName"/>
- <result column="partition_type" jdbcType="VARCHAR" property="partitionType"/>
- <result column="partition_field_position" jdbcType="VARCHAR"
- property="partitionFieldPosition"/>
- <result column="primary_partition" jdbcType="BIT" property="primaryPartition"/>
- <result column="secondary_partition" jdbcType="BIT" property="secondaryPartition"/>
- <result column="file_format" jdbcType="BIT" property="fileFormat"/>
- <result column="clusterid" jdbcType="BIT" property="clusterId"/>
- <result column="storage_period" jdbcType="VARCHAR" property="storagePeriod"/>
- <result column="cluster_tag" jdbcType="VARCHAR" property="clusterTag"/>
- <result column="username" jdbcType="TIMESTAMP" property="userName"/>
- <result column="PASSWORD" jdbcType="TIMESTAMP" property="password"/>
- <result column="warehouse_dir" jdbcType="TIMESTAMP" property="warehouseDir"/>
- <result column="hdfs_defaultfs" jdbcType="TIMESTAMP" property="fsDefaultName"/>
- <result column="hdfs_ugi" jdbcType="TIMESTAMP" property="hadoopUgi"/>
- <result column="encoding_type" jdbcType="VARCHAR" property="encodingType"/>
- <result column="us_task_id" jdbcType="VARCHAR" property="usTaskId"/>
- <result column="usage_interval" jdbcType="VARCHAR" property="usageInterval"/>
- </resultMap>
-
- <resultMap id="streamSummaryMap" type="java.util.Map">
- <result column="dataSourceType" property="dataSourceType" jdbcType="VARCHAR"/>
- <result column="dataStorageType" property="dataStorageType" jdbcType="VARCHAR"/>
- </resultMap>
-
<sql id="Base_Column_List">
id, inlong_stream_id, inlong_group_id, name, description, mq_resource_obj, data_source_type,
- storage_period, data_type, data_encoding, file_delimiter, have_predefined_fields, in_charges,
- status, previous_status, is_deleted, creator, modifier, create_time, modify_time, temp_view
+ storage_period, data_type, data_encoding, data_separator, data_escape_char, have_predefined_fields,
+ daily_records, daily_storage, peak_records, max_length, in_charges, status, previous_status,
+ is_deleted, creator, modifier, create_time, modify_time, temp_view
</sql>
<select id="selectByPrimaryKey" parameterType="java.lang.Integer" resultMap="BaseResultMap">
@@ -117,27 +85,38 @@
<select id="selectByCondition" resultMap="BaseResultMap"
parameterType="org.apache.inlong.manager.common.pojo.datastream.DataStreamPageRequest">
select
- <include refid="Base_Column_List"/>
- from data_stream
+ distinct ds.id, ds.inlong_group_id, ds.inlong_stream_id, ds.name,
+ ds.description, ds.mq_resource_obj, ds.data_source_type, ds.storage_period, ds.data_type,
+ ds.data_encoding, ds.data_separator, ds.data_escape_char, ds.have_predefined_fields,
+ ds.daily_records, ds.daily_storage, ds.peak_records, ds.max_length, ds.in_charges,
+ ds.status, ds.creator, ds.modifier, ds.create_time, ds.modify_time
+ from data_stream ds, business biz, wf_approver approver
<where>
- is_deleted = 0
- and (creator = #{currentUser,jdbcType=VARCHAR} or
- find_in_set(#{currentUser,jdbcType=VARCHAR}, in_charges))
- <if test="inlongGroupId != null and inlongGroupId != ''">
- and inlong_group_id = #{inlongGroupId, jdbcType=VARCHAR}
+ ds.is_deleted = 0
+ and approver.process_name = "NEW_BUSINESS_WORKFLOW" and approver.is_deleted = 0
+ and ds.inlong_group_id = biz.inlong_group_id and biz.is_deleted = 0
+ <if test="request.inlongGroupId != null and request.inlongGroupId != ''">
+ and ds.inlong_group_id = #{request.inlongGroupId, jdbcType=VARCHAR}
+ </if>
+ and (
+ find_in_set(#{request.currentUser, jdbcType=VARCHAR}, biz.in_charges)
+ or find_in_set(#{request.currentUser, jdbcType=VARCHAR}, biz.followers)
+ or find_in_set(#{request.currentUser, jdbcType=VARCHAR}, approver.approvers)
+ )
+ <if test="request.dataSourceType != null and request.dataSourceType != ''">
+ and ds.data_source_type = #{request.dataSourceType, jdbcType=VARCHAR}
</if>
- <if test="dataSourceType != null and dataSourceType != ''">
- and data_source_type = #{dataSourceType, jdbcType=VARCHAR}
- </if>
- <if test="keyWord != null and keyWord != ''">
- and (name like CONCAT('%', #{keyWord}, '%') or description like CONCAT('%',
- #{keyWord}, '%'))
+ <if test="request.keyWord != null and request.keyWord != ''">
+ and (ds.inlong_stream_id like CONCAT('%', #{request.keyWord}, '%')
+ or ds.name like CONCAT('%', #{request.keyWord}, '%')
+ or ds.description like CONCAT('%', #{request.keyWord}, '%')
+ )
</if>
- <if test="status != null and status != ''">
- and status = #{status, jdbcType=INTEGER}
+ <if test="request.status != null and request.status != ''">
+ and ds.status = #{request.status, jdbcType=INTEGER}
</if>
</where>
- order by modify_time desc
+ order by ds.modify_time desc
</select>
<select id="selectByGroupId" resultType="org.apache.inlong.manager.dao.entity.DataStreamEntity">
select
@@ -146,23 +125,6 @@
where inlong_group_id = #{groupId, jdbcType=VARCHAR}
and is_deleted = 0
</select>
- <select id="selectByConditionAndInCharges" resultType="org.apache.inlong.manager.dao.entity.DataStreamEntity">
- select
- <include refid="Base_Column_List"/>
- from data_stream
- <where>
- inlong_group_id = #{request.inlongGroupId,jdbcType=VARCHAR}
- and is_deleted = 0
- and (creator = #{request.currentUser,jdbcType=VARCHAR}
- or find_in_set(#{request.currentUser,jdbcType=VARCHAR}, #{inCharges,jdbcType=VARCHAR}))
- <if test="request.keyWord != null and request.keyWord != ''">
- and (name like CONCAT('%', #{request.keyWord}, '%') or description like CONCAT('%',
- #{request.keyWord},
- '%'))
- </if>
- </where>
- order by create_time desc
- </select>
<select id="selectCountByGroupId" resultType="java.lang.Integer">
select count(1)
from data_stream
@@ -197,16 +159,20 @@
insert into data_stream (id, inlong_stream_id, inlong_group_id,
name, description, mq_resource_obj,
data_source_type, storage_period, data_type,
- data_encoding, file_delimiter,
- have_predefined_fields,
+ data_encoding, data_separator,
+ data_escape_char, have_predefined_fields,
+ daily_records, daily_storage,
+ peak_records, max_length,
in_charges, status, previous_status,
is_deleted, creator, modifier,
create_time, modify_time, temp_view)
values (#{id,jdbcType=INTEGER}, #{inlongStreamId,jdbcType=VARCHAR}, #{inlongGroupId,jdbcType=VARCHAR},
#{name,jdbcType=VARCHAR}, #{description,jdbcType=VARCHAR}, #{mqResourceObj,jdbcType=VARCHAR},
#{dataSourceType,jdbcType=VARCHAR}, #{storagePeriod,jdbcType=INTEGER}, #{dataType,jdbcType=VARCHAR},
- #{dataEncoding,jdbcType=VARCHAR}, #{fileDelimiter,jdbcType=VARCHAR},
- #{havePredefinedFields,jdbcType=INTEGER},
+ #{dataEncoding,jdbcType=VARCHAR}, #{dataSeparator,jdbcType=VARCHAR},
+ #{dataEscapeChar,jdbcType=VARCHAR}, #{havePredefinedFields,jdbcType=INTEGER},
+ #{dailyRecords,jdbcType=INTEGER}, #{dailyStorage,jdbcType=INTEGER},
+ #{peakRecords,jdbcType=INTEGER}, #{maxLength,jdbcType=INTEGER},
#{inCharges,jdbcType=VARCHAR}, #{status,jdbcType=INTEGER}, #{previousStatus,jdbcType=INTEGER},
#{isDeleted,jdbcType=INTEGER}, #{creator,jdbcType=VARCHAR}, #{modifier,jdbcType=VARCHAR},
#{createTime,jdbcType=TIMESTAMP}, #{modifyTime,jdbcType=TIMESTAMP}, #{tempView,jdbcType=LONGVARCHAR})
@@ -245,12 +211,27 @@
<if test="dataEncoding != null">
data_encoding,
</if>
- <if test="fileDelimiter != null">
- file_delimiter,
+ <if test="dataSeparator != null">
+ data_separator,
+ </if>
+ <if test="dataEscapeChar != null">
+ data_escape_char,
</if>
<if test="havePredefinedFields != null">
have_predefined_fields,
</if>
+ <if test="dailyRecords != null">
+ daily_records,
+ </if>
+ <if test="dailyStorage != null">
+ daily_storage,
+ </if>
+ <if test="peakRecords != null">
+ peak_records,
+ </if>
+ <if test="maxLength != null">
+ max_length,
+ </if>
<if test="inCharges != null">
in_charges,
</if>
@@ -310,12 +291,27 @@
<if test="dataEncoding != null">
#{dataEncoding,jdbcType=VARCHAR},
</if>
- <if test="fileDelimiter != null">
- #{fileDelimiter,jdbcType=VARCHAR},
+ <if test="dataSeparator != null">
+ #{dataSeparator,jdbcType=VARCHAR},
+ </if>
+ <if test="dataEscapeChar != null">
+ #{dataEscapeChar,jdbcType=VARCHAR},
</if>
<if test="havePredefinedFields != null">
#{havePredefinedFields,jdbcType=INTEGER},
</if>
+ <if test="dailyRecords != null">
+ #{dailyRecords,jdbcType=INTEGER},
+ </if>
+ <if test="dailyStorage != null">
+ #{dailyStorage,jdbcType=INTEGER},
+ </if>
+ <if test="peakRecords != null">
+ #{peakRecords,jdbcType=INTEGER},
+ </if>
+ <if test="maxLength != null">
+ #{maxLength,jdbcType=INTEGER},
+ </if>
<if test="inCharges != null">
#{inCharges,jdbcType=VARCHAR},
</if>
@@ -346,72 +342,6 @@
</trim>
</insert>
- <update id="updateByPrimaryKeySelective" parameterType="org.apache.inlong.manager.dao.entity.DataStreamEntity">
- update data_stream
- <set>
- <if test="inlongStreamId != null">
- inlong_stream_id = #{inlongStreamId,jdbcType=VARCHAR},
- </if>
- <if test="inlongGroupId != null">
- inlong_group_id = #{inlongGroupId,jdbcType=VARCHAR},
- </if>
- <if test="name != null">
- name = #{name,jdbcType=VARCHAR},
- </if>
- <if test="description != null">
- description = #{description,jdbcType=VARCHAR},
- </if>
- <if test="mqResourceObj != null">
- mq_resource_obj = #{mqResourceObj,jdbcType=VARCHAR},
- </if>
- <if test="dataSourceType != null">
- data_source_type = #{dataSourceType,jdbcType=VARCHAR},
- </if>
- <if test="storagePeriod != null">
- storage_period = #{storagePeriod,jdbcType=INTEGER},
- </if>
- <if test="dataType != null">
- data_type = #{dataType,jdbcType=VARCHAR},
- </if>
- <if test="dataEncoding != null">
- data_encoding = #{dataEncoding,jdbcType=VARCHAR},
- </if>
- <if test="fileDelimiter != null">
- file_delimiter = #{fileDelimiter,jdbcType=VARCHAR},
- </if>
- <if test="havePredefinedFields != null">
- have_predefined_fields = #{havePredefinedFields,jdbcType=INTEGER},
- </if>
- <if test="inCharges != null">
- in_charges = #{inCharges,jdbcType=VARCHAR},
- </if>
- <if test="status != null">
- status = #{status,jdbcType=INTEGER},
- </if>
- <if test="previousStatus != null">
- previous_status = #{previousStatus,jdbcType=INTEGER},
- </if>
- <if test="isDeleted != null">
- is_deleted = #{isDeleted,jdbcType=INTEGER},
- </if>
- <if test="creator != null">
- creator = #{creator,jdbcType=VARCHAR},
- </if>
- <if test="modifier != null">
- modifier = #{modifier,jdbcType=VARCHAR},
- </if>
- <if test="createTime != null">
- create_time = #{createTime,jdbcType=TIMESTAMP},
- </if>
- <if test="modifyTime != null">
- modify_time = #{modifyTime,jdbcType=TIMESTAMP},
- </if>
- <if test="tempView != null">
- temp_view = #{tempView,jdbcType=LONGVARCHAR},
- </if>
- </set>
- where id = #{id,jdbcType=INTEGER}
- </update>
<update id="updateByPrimaryKey" parameterType="org.apache.inlong.manager.dao.entity.DataStreamEntity">
update data_stream
set inlong_stream_id = #{inlongStreamId,jdbcType=VARCHAR},
@@ -423,8 +353,13 @@
storage_period = #{storagePeriod,jdbcType=INTEGER},
data_type = #{dataType,jdbcType=VARCHAR},
data_encoding = #{dataEncoding,jdbcType=VARCHAR},
- file_delimiter = #{fileDelimiter,jdbcType=VARCHAR},
+ data_separator = #{dataSeparator,jdbcType=VARCHAR},
+ data_escape_char = #{dataEscapeChar,jdbcType=VARCHAR},
have_predefined_fields = #{havePredefinedFields,jdbcType=INTEGER},
+ daily_records = #{dailyRecords,jdbcType=INTEGER},
+ daily_storage = #{dailyStorage,jdbcType=INTEGER},
+ peak_records = #{peakRecords,jdbcType=INTEGER},
+ max_length = #{maxLength,jdbcType=INTEGER},
in_charges = #{inCharges,jdbcType=VARCHAR},
status = #{status,jdbcType=INTEGER},
previous_status = #{previousStatus,jdbcType=INTEGER},
@@ -440,55 +375,76 @@
update data_stream
<set>
<if test="inlongStreamId != null">
- inlong_stream_id = #{inlongStreamId, jdbcType=VARCHAR},
+ inlong_stream_id = #{inlongStreamId,jdbcType=VARCHAR},
</if>
<if test="inlongGroupId != null">
- inlong_group_id = #{inlongGroupId, jdbcType=VARCHAR},
+ inlong_group_id = #{inlongGroupId,jdbcType=VARCHAR},
</if>
<if test="name != null">
- name = #{name, jdbcType=VARCHAR},
+ name = #{name,jdbcType=VARCHAR},
</if>
<if test="description != null">
- description = #{description, jdbcType=VARCHAR},
+ description = #{description,jdbcType=VARCHAR},
</if>
<if test="mqResourceObj != null">
- mq_resource_obj = #{mqResourceObj, jdbcType=VARCHAR},
+ mq_resource_obj = #{mqResourceObj,jdbcType=VARCHAR},
</if>
<if test="dataSourceType != null">
- data_source_type = #{dataSourceType, jdbcType=VARCHAR},
+ data_source_type = #{dataSourceType,jdbcType=VARCHAR},
</if>
<if test="storagePeriod != null">
- storage_period = #{storagePeriod, jdbcType=INTEGER},
+ storage_period = #{storagePeriod,jdbcType=INTEGER},
</if>
<if test="dataType != null">
- data_type = #{dataType, jdbcType=VARCHAR},
+ data_type = #{dataType,jdbcType=VARCHAR},
</if>
<if test="dataEncoding != null">
- data_encoding = #{dataEncoding, jdbcType=VARCHAR},
+ data_encoding = #{dataEncoding,jdbcType=VARCHAR},
</if>
- <if test="fileDelimiter != null">
- file_delimiter = #{fileDelimiter, jdbcType=VARCHAR},
+ <if test="dataSeparator != null">
+ data_separator = #{dataSeparator,jdbcType=VARCHAR},
+ </if>
+ <if test="dataEscapeChar != null">
+ data_escape_char = #{dataEscapeChar,jdbcType=VARCHAR},
</if>
<if test="havePredefinedFields != null">
- have_predefined_fields = #{havePredefinedFields, jdbcType=INTEGER},
+ have_predefined_fields = #{havePredefinedFields,jdbcType=INTEGER},
+ </if>
+ <if test="dailyRecords!= null">
+ daily_records= #{dailyRecords,jdbcType=INTEGER},
+ </if>
+ <if test="dailyStorage!= null">
+ daily_storage= #{dailyStorage,jdbcType=INTEGER},
+ </if>
+ <if test="peakRecords != null">
+ peak_records= #{peakRecords,jdbcType=INTEGER},
+ </if>
+ <if test="maxLength != null">
+ max_length= #{maxLength,jdbcType=INTEGER}
</if>
<if test="inCharges != null">
- in_charges = #{inCharges, jdbcType=VARCHAR},
+ in_charges = #{inCharges,jdbcType=VARCHAR},
</if>
<if test="status != null">
status = #{status,jdbcType=INTEGER},
</if>
<if test="previousStatus != null">
- status = #{previousStatus, jdbcType=INTEGER},
+ previous_status = #{previousStatus,jdbcType=INTEGER},
</if>
<if test="isDeleted != null">
- is_deleted = #{isDeleted, jdbcType=INTEGER},
+ is_deleted = #{isDeleted,jdbcType=INTEGER},
</if>
<if test="creator != null">
- creator = #{creator, jdbcType=VARCHAR},
+ creator = #{creator,jdbcType=VARCHAR},
</if>
<if test="modifier != null">
- modifier = #{modifier, jdbcType=VARCHAR},
+ modifier = #{modifier,jdbcType=VARCHAR},
+ </if>
+ <if test="createTime != null">
+ create_time = #{createTime,jdbcType=TIMESTAMP},
+ </if>
+ <if test="modifyTime != null">
+ modify_time = #{modifyTime,jdbcType=TIMESTAMP},
</if>
<if test="tempView != null">
temp_view = #{tempView,jdbcType=LONGVARCHAR},
@@ -518,65 +474,4 @@
and is_deleted = 0
</update>
- <select id="selectStreamToHiveInfo" resultMap="dataStreamFullInfo">
- SELECT h.id,
- h.jdbc_url,
- h.password,
- h.username,
- h.status,
- data_type,
- usage_interval,
- description,
- s.inlong_stream_id,
- s.inlong_group_id,
- db_name,
- field_splitter,
- h.creator,
- table_name,
- partition_type,
- primary_partition,
- secondary_partition,
- file_format,
- h.storage_period,
- h.encoding_type
- FROM data_stream s,
- storage_hive h
- WHERE s.is_deleted = 0
- and h.is_deleted = 0
- and s.inlong_stream_id = h.inlong_stream_id
- AND s.inlong_group_id = h.inlong_group_id
- and s.inlong_group_id = #{groupId, jdbcType=VARCHAR}
- </select>
-
- <select id="selectStreamToHiveInfoByIdentifier" resultMap="dataStreamFullInfo">
- SELECT h.id,
- h.jdbc_url,
- h.password,
- h.username,
- h.status,
- data_type,
- usage_interval,
- description,
- s.inlong_stream_id,
- s.inlong_group_id,
- db_name,
- field_splitter,
- h.creator,
- table_name,
- partition_type,
- primary_partition,
- secondary_partition,
- file_format,
- h.storage_period,
- h.encoding_type
- FROM data_stream s,
- storage_hive h
- WHERE s.is_deleted = 0
- and h.is_deleted = 0
- and s.inlong_stream_id = h.inlong_stream_id
- AND s.inlong_group_id = h.inlong_group_id
- and s.inlong_group_id = #{groupId, jdbcType=VARCHAR}
- and s.inlong_stream_id = #{streamId, jdbcType=VARCHAR}
- </select>
-
</mapper>
\ No newline at end of file
diff --git a/inlong-manager/manager-dao/src/main/resources/mappers/StorageHiveEntityMapper.xml b/inlong-manager/manager-dao/src/main/resources/mappers/StorageHiveEntityMapper.xml
index ba958b9..52c3178 100644
--- a/inlong-manager/manager-dao/src/main/resources/mappers/StorageHiveEntityMapper.xml
+++ b/inlong-manager/manager-dao/src/main/resources/mappers/StorageHiveEntityMapper.xml
@@ -29,16 +29,21 @@
<result column="password" jdbcType="VARCHAR" property="password"/>
<result column="db_name" jdbcType="VARCHAR" property="dbName"/>
<result column="table_name" jdbcType="VARCHAR" property="tableName"/>
+ <result column="hdfs_default_fs" jdbcType="VARCHAR" property="hdfsDefaultFs"/>
+ <result column="warehouse_dir" jdbcType="VARCHAR" property="warehouseDir"/>
+
+ <result column="partition_interval" jdbcType="INTEGER" property="partitionInterval"/>
+ <result column="partition_unit" jdbcType="VARCHAR" property="partitionUnit"/>
<result column="primary_partition" jdbcType="VARCHAR" property="primaryPartition"/>
<result column="secondary_partition" jdbcType="VARCHAR" property="secondaryPartition"/>
- <result column="partition_type" jdbcType="VARCHAR" property="partitionType"/>
+ <result column="partition_creation_strategy" jdbcType="VARCHAR" property="partitionCreationStrategy"/>
+
<result column="file_format" jdbcType="VARCHAR" property="fileFormat"/>
- <result column="field_splitter" jdbcType="VARCHAR" property="fieldSplitter"/>
- <result column="encoding_type" jdbcType="VARCHAR" property="encodingType"/>
- <result column="hdfs_default_fs" jdbcType="VARCHAR" property="hdfsDefaultFs"/>
- <result column="warehouse_dir" jdbcType="VARCHAR" property="warehouseDir"/>
- <result column="usage_interval" jdbcType="VARCHAR" property="usageInterval"/>
+ <result column="data_encoding" jdbcType="VARCHAR" property="dataEncoding"/>
+ <result column="data_separator" jdbcType="VARCHAR" property="dataSeparator"/>
<result column="storage_period" jdbcType="INTEGER" property="storagePeriod"/>
+ <result column="opt_log" jdbcType="VARCHAR" property="optLog"/>
+
<result column="status" jdbcType="INTEGER" property="status"/>
<result column="previous_status" jdbcType="INTEGER" property="previousStatus"/>
<result column="is_deleted" jdbcType="INTEGER" property="isDeleted"/>
@@ -46,15 +51,14 @@
<result column="modifier" jdbcType="VARCHAR" property="modifier"/>
<result column="create_time" jdbcType="TIMESTAMP" property="createTime"/>
<result column="modify_time" jdbcType="TIMESTAMP" property="modifyTime"/>
- <result column="opt_log" jdbcType="VARCHAR" property="optLog"/>
<result column="temp_view" jdbcType="LONGVARCHAR" property="tempView"/>
</resultMap>
<sql id="Base_Column_List">
id, inlong_group_id, inlong_stream_id, jdbc_url, username, password, db_name, table_name,
- primary_partition, secondary_partition, partition_type, file_format, field_splitter, encoding_type,
- hdfs_default_fs, warehouse_dir, usage_interval, storage_period, status, previous_status,
- is_deleted, creator, modifier, create_time, modify_time, opt_log, temp_view
+ hdfs_default_fs, warehouse_dir, partition_interval, partition_unit, primary_partition, secondary_partition,
+ partition_creation_strategy, file_format, data_encoding, data_separator, storage_period, opt_log,
+ status, previous_status, is_deleted, creator, modifier, create_time, modify_time, temp_view
</sql>
<select id="selectByPrimaryKey" parameterType="java.lang.Integer" resultMap="BaseResultMap">
@@ -70,14 +74,20 @@
from storage_hive
<where>
is_deleted = 0
- <if test="inlongGroupId != null and inlongGroupId != ''">
- and inlong_group_id = #{inlongGroupId, jdbcType=VARCHAR}
+ <if test="request.inlongGroupId != null and request.inlongGroupId != ''">
+ and inlong_group_id = #{request.inlongGroupId, jdbcType=VARCHAR}
</if>
- <if test="inlongStreamId != null and inlongStreamId != ''">
- and inlong_stream_id = #{inlongStreamId, jdbcType=VARCHAR}
+ <if test="request.inlongStreamId != null and request.inlongStreamId != ''">
+ and inlong_stream_id = #{request.inlongStreamId, jdbcType=VARCHAR}
</if>
- <if test="status != null and status != ''">
- and status = #{status, jdbcType=INTEGER}
+ <if test="request.keyWord != null and request.keyWord != ''">
+ and (
+ inlong_group_id like CONCAT('%', #{request.keyWord}, '%')
+ or inlong_stream_id like CONCAT('%', #{request.keyWord}, '%')
+ )
+ </if>
+ <if test="request.status != null and request.status != ''">
+ and status = #{request.status, jdbcType=INTEGER}
</if>
order by modify_time desc
</where>
@@ -121,8 +131,7 @@
and is_deleted = 0
</where>
</select>
- <select id="selectSummaryByIdentifier"
- resultType="org.apache.inlong.manager.common.pojo.datastorage.StorageSummaryInfo">
+ <select id="selectSummary" resultType="org.apache.inlong.manager.common.pojo.datastorage.StorageSummaryInfo">
select s.id,
s.inlong_group_id,
s.inlong_stream_id,
@@ -132,6 +141,51 @@
and s.inlong_group_id = #{groupId, jdbcType=VARCHAR}
and s.inlong_stream_id = #{streamId, jdbcType=VARCHAR}
</select>
+ <select id="selectHiveSortInfoByIdentifier"
+ resultType="org.apache.inlong.manager.common.pojo.datastorage.StorageHiveSortInfo">
+ SELECT hive.id,
+ hive.inlong_group_id,
+ hive.inlong_stream_id,
+
+ hive.jdbc_url,
+ hive.username,
+ hive.password,
+ hive.db_name,
+ hive.table_name,
+ hive.hdfs_default_fs,
+ hive.warehouse_dir,
+
+ hive.partition_interval,
+ hive.partition_unit,
+ hive.primary_partition,
+ hive.secondary_partition,
+ hive.partition_creation_strategy,
+
+ hive.file_format,
+ hive.data_encoding,
+ hive.data_separator as targetSeparator,
+ hive.status,
+ hive.creator,
+
+ stream.mq_resource_obj,
+ stream.data_source_type,
+ stream.data_type,
+ stream.description,
+ stream.data_separator as sourceSeparator,
+ stream.data_escape_char
+ FROM data_stream stream,
+ storage_hive hive
+ <where>
+ stream.is_deleted = 0
+ and hive.is_deleted = 0
+ and stream.inlong_group_id = hive.inlong_group_id
+ and stream.inlong_stream_id = hive.inlong_stream_id
+ and stream.inlong_group_id = #{groupId, jdbcType=VARCHAR}
+ <if test="streamId != null and streamId != ''">
+ and stream.inlong_stream_id = #{streamId, jdbcType=VARCHAR}
+ </if>
+ </where>
+ </select>
<delete id="deleteByPrimaryKey" parameterType="java.lang.Integer">
delete
@@ -143,23 +197,26 @@
parameterType="org.apache.inlong.manager.dao.entity.StorageHiveEntity">
insert into storage_hive (id, inlong_group_id, inlong_stream_id,
jdbc_url, username, password,
- db_name, table_name, primary_partition,
- secondary_partition, partition_type, file_format,
- field_splitter, encoding_type, hdfs_default_fs,
- warehouse_dir, usage_interval, storage_period,
+ db_name, table_name, hdfs_default_fs,
+ warehouse_dir, partition_interval,
+ partition_unit, primary_partition,
+ secondary_partition, partition_creation_strategy,
+ file_format, data_encoding, data_separator,
+ storage_period, opt_log,
status, previous_status, is_deleted,
creator, modifier, create_time,
- modify_time, opt_log, temp_view)
+ modify_time, temp_view)
values (#{id,jdbcType=INTEGER}, #{inlongGroupId,jdbcType=VARCHAR}, #{inlongStreamId,jdbcType=VARCHAR},
#{jdbcUrl,jdbcType=VARCHAR}, #{username,jdbcType=VARCHAR}, #{password,jdbcType=VARCHAR},
- #{dbName,jdbcType=VARCHAR}, #{tableName,jdbcType=VARCHAR}, #{primaryPartition,jdbcType=VARCHAR},
- #{secondaryPartition,jdbcType=VARCHAR}, #{partitionType,jdbcType=VARCHAR},
- #{fileFormat,jdbcType=VARCHAR},
- #{fieldSplitter,jdbcType=VARCHAR}, #{encodingType,jdbcType=VARCHAR}, #{hdfsDefaultFs,jdbcType=VARCHAR},
- #{warehouseDir,jdbcType=VARCHAR}, #{usageInterval,jdbcType=VARCHAR}, #{storagePeriod,jdbcType=INTEGER},
+ #{dbName,jdbcType=VARCHAR}, #{tableName,jdbcType=VARCHAR}, #{hdfsDefaultFs,jdbcType=VARCHAR},
+ #{warehouseDir,jdbcType=VARCHAR}, #{partitionInterval,jdbcType=INTEGER},
+ #{partitionUnit,jdbcType=VARCHAR}, #{primaryPartition,jdbcType=VARCHAR},
+ #{secondaryPartition,jdbcType=VARCHAR}, #{partitionCreationStrategy,jdbcType=VARCHAR},
+ #{fileFormat,jdbcType=VARCHAR}, #{dataEncoding,jdbcType=VARCHAR}, #{dataSeparator,jdbcType=VARCHAR},
+ #{storagePeriod,jdbcType=INTEGER}, #{optLog,jdbcType=VARCHAR},
#{status,jdbcType=INTEGER}, #{previousStatus,jdbcType=INTEGER}, #{isDeleted,jdbcType=INTEGER},
#{creator,jdbcType=VARCHAR}, #{modifier,jdbcType=VARCHAR}, #{createTime,jdbcType=TIMESTAMP},
- #{modifyTime,jdbcType=TIMESTAMP}, #{optLog,jdbcType=VARCHAR}, #{tempView,jdbcType=LONGVARCHAR})
+ #{modifyTime,jdbcType=TIMESTAMP}, #{tempView,jdbcType=LONGVARCHAR})
</insert>
<insert id="insertSelective" useGeneratedKeys="true" keyProperty="id"
parameterType="org.apache.inlong.manager.dao.entity.StorageHiveEntity">
@@ -189,36 +246,42 @@
<if test="tableName != null">
table_name,
</if>
+ <if test="hdfsDefaultFs != null">
+ hdfs_default_fs,
+ </if>
+ <if test="warehouseDir != null">
+ warehouse_dir,
+ </if>
+ <if test="partitionInterval != null">
+ partition_interval,
+ </if>
+ <if test="partitionUnit != null">
+ partition_unit,
+ </if>
<if test="primaryPartition != null">
primary_partition,
</if>
<if test="secondaryPartition != null">
secondary_partition,
</if>
- <if test="partitionType != null">
- partition_type,
+ <if test="partitionCreationStrategy != null">
+ partition_creation_strategy,
</if>
<if test="fileFormat != null">
file_format,
</if>
- <if test="fieldSplitter != null">
- field_splitter,
+ <if test="dataEncoding != null">
+ data_encoding,
</if>
- <if test="encodingType != null">
- encoding_type,
- </if>
- <if test="hdfsDefaultFs != null">
- hdfs_default_fs,
- </if>
- <if test="warehouseDir != null">
- warehouse_dir,
- </if>
- <if test="usageInterval != null">
- usage_interval,
+ <if test="dataSeparator != null">
+ data_separator,
</if>
<if test="storagePeriod != null">
storage_period,
</if>
+ <if test="optLog != null">
+ opt_log,
+ </if>
<if test="status != null">
status,
</if>
@@ -240,9 +303,6 @@
<if test="modifyTime != null">
modify_time,
</if>
- <if test="optLog != null">
- opt_log,
- </if>
<if test="tempView != null">
temp_view,
</if>
@@ -272,36 +332,42 @@
<if test="tableName != null">
#{tableName,jdbcType=VARCHAR},
</if>
+ <if test="hdfsDefaultFs != null">
+ #{hdfsDefaultFs,jdbcType=VARCHAR},
+ </if>
+ <if test="warehouseDir != null">
+ #{warehouseDir,jdbcType=VARCHAR},
+ </if>
+ <if test="partitionInterval != null">
+ #{partitionInterval,jdbcType=INTEGER},
+ </if>
+ <if test="partitionUnit != null">
+ #{partitionUnit,jdbcType=VARCHAR},
+ </if>
<if test="primaryPartition != null">
#{primaryPartition,jdbcType=VARCHAR},
</if>
<if test="secondaryPartition != null">
#{secondaryPartition,jdbcType=VARCHAR},
</if>
- <if test="partitionType != null">
- #{partitionType,jdbcType=VARCHAR},
+ <if test="partitionCreationStrategy != null">
+ #{partitionCreationStrategy,jdbcType=VARCHAR},
</if>
<if test="fileFormat != null">
#{fileFormat,jdbcType=VARCHAR},
</if>
- <if test="fieldSplitter != null">
- #{fieldSplitter,jdbcType=VARCHAR},
+ <if test="dataEncoding != null">
+ #{dataEncoding,jdbcType=VARCHAR},
</if>
- <if test="encodingType != null">
- #{encodingType,jdbcType=VARCHAR},
- </if>
- <if test="hdfsDefaultFs != null">
- #{hdfsDefaultFs,jdbcType=VARCHAR},
- </if>
- <if test="warehouseDir != null">
- #{warehouseDir,jdbcType=VARCHAR},
- </if>
- <if test="usageInterval != null">
- #{usageInterval,jdbcType=VARCHAR},
+ <if test="dataSeparator != null">
+ #{dataSeparator,jdbcType=VARCHAR},
</if>
<if test="storagePeriod != null">
#{storagePeriod,jdbcType=INTEGER},
</if>
+ <if test="optLog != null">
+ #{optLog,jdbcType=VARCHAR},
+ </if>
<if test="status != null">
#{status,jdbcType=INTEGER},
</if>
@@ -323,9 +389,6 @@
<if test="modifyTime != null">
#{modifyTime,jdbcType=TIMESTAMP},
</if>
- <if test="optLog != null">
- #{optLog,jdbcType=VARCHAR},
- </if>
<if test="tempView != null">
#{tempView,jdbcType=LONGVARCHAR},
</if>
@@ -356,36 +419,42 @@
<if test="tableName != null">
table_name = #{tableName,jdbcType=VARCHAR},
</if>
+ <if test="hdfsDefaultFs != null">
+ hdfs_default_fs = #{hdfsDefaultFs,jdbcType=VARCHAR},
+ </if>
+ <if test="warehouseDir != null">
+ warehouse_dir = #{warehouseDir,jdbcType=VARCHAR},
+ </if>
+ <if test="partitionInterval != null">
+ partition_interval = #{partitionInterval,jdbcType=INTEGER},
+ </if>
+ <if test="partitionUnit != null">
+ partition_unit = #{partitionUnit,jdbcType=VARCHAR},
+ </if>
<if test="primaryPartition != null">
primary_partition = #{primaryPartition,jdbcType=VARCHAR},
</if>
<if test="secondaryPartition != null">
secondary_partition = #{secondaryPartition,jdbcType=VARCHAR},
</if>
- <if test="partitionType != null">
- partition_type = #{partitionType,jdbcType=VARCHAR},
+ <if test="partitionCreationStrategy != null">
+ partition_creation_strategy = #{partitionCreationStrategy,jdbcType=VARCHAR},
</if>
<if test="fileFormat != null">
file_format = #{fileFormat,jdbcType=VARCHAR},
</if>
- <if test="fieldSplitter != null">
- field_splitter = #{fieldSplitter,jdbcType=VARCHAR},
- </if>
- <if test="encodingType != null">
- encoding_type = #{encodingType,jdbcType=VARCHAR},
- </if>
- <if test="hdfsDefaultFs != null">
- hdfs_default_fs = #{hdfsDefaultFs,jdbcType=VARCHAR},
- </if>
- <if test="warehouseDir != null">
- warehouse_dir = #{warehouseDir,jdbcType=VARCHAR},
+ <if test="dataEncoding != null">
+ data_encoding = #{dataEncoding,jdbcType=VARCHAR},
</if>
- <if test="usageInterval != null">
- usage_interval = #{usageInterval,jdbcType=VARCHAR},
+ <if test="dataSeparator != null">
+ data_separator = #{dataSeparator,jdbcType=VARCHAR},
</if>
<if test="storagePeriod != null">
storage_period = #{storagePeriod,jdbcType=INTEGER},
</if>
+ <if test="optLog != null">
+ opt_log = #{optLog,jdbcType=VARCHAR},
+ </if>
<if test="status != null">
status = #{status,jdbcType=INTEGER},
</if>
@@ -407,9 +476,6 @@
<if test="modifyTime != null">
modify_time = #{modifyTime,jdbcType=TIMESTAMP},
</if>
- <if test="optLog != null">
- opt_log = #{optLog,jdbcType=VARCHAR},
- </if>
<if test="tempView != null">
temp_view = #{tempView,jdbcType=LONGVARCHAR},
</if>
@@ -418,32 +484,33 @@
</update>
<update id="updateByPrimaryKey" parameterType="org.apache.inlong.manager.dao.entity.StorageHiveEntity">
update storage_hive
- set inlong_group_id = #{inlongGroupId,jdbcType=VARCHAR},
- inlong_stream_id = #{inlongStreamId,jdbcType=VARCHAR},
- jdbc_url = #{jdbcUrl,jdbcType=VARCHAR},
- username = #{username,jdbcType=VARCHAR},
- password = #{password,jdbcType=VARCHAR},
- db_name = #{dbName,jdbcType=VARCHAR},
- table_name = #{tableName,jdbcType=VARCHAR},
- primary_partition = #{primaryPartition,jdbcType=VARCHAR},
- secondary_partition = #{secondaryPartition,jdbcType=VARCHAR},
- partition_type = #{partitionType,jdbcType=VARCHAR},
- file_format = #{fileFormat,jdbcType=VARCHAR},
- field_splitter = #{fieldSplitter,jdbcType=VARCHAR},
- encoding_type = #{encodingType,jdbcType=VARCHAR},
- hdfs_default_fs = #{hdfsDefaultFs,jdbcType=VARCHAR},
- warehouse_dir = #{warehouseDir,jdbcType=VARCHAR},
- usage_interval = #{usageInterval,jdbcType=VARCHAR},
- storage_period = #{storagePeriod,jdbcType=INTEGER},
- status = #{status,jdbcType=INTEGER},
- previous_status = #{previousStatus,jdbcType=INTEGER},
- is_deleted = #{isDeleted,jdbcType=INTEGER},
- creator = #{creator,jdbcType=VARCHAR},
- modifier = #{modifier,jdbcType=VARCHAR},
- create_time = #{createTime,jdbcType=TIMESTAMP},
- modify_time = #{modifyTime,jdbcType=TIMESTAMP},
- opt_log = #{optLog,jdbcType=VARCHAR},
- temp_view = #{tempView,jdbcType=LONGVARCHAR}
+ set inlong_group_id = #{inlongGroupId,jdbcType=VARCHAR},
+ inlong_stream_id = #{inlongStreamId,jdbcType=VARCHAR},
+ jdbc_url = #{jdbcUrl,jdbcType=VARCHAR},
+ username = #{username,jdbcType=VARCHAR},
+ password = #{password,jdbcType=VARCHAR},
+ db_name = #{dbName,jdbcType=VARCHAR},
+ table_name = #{tableName,jdbcType=VARCHAR},
+ hdfs_default_fs = #{hdfsDefaultFs,jdbcType=VARCHAR},
+ warehouse_dir = #{warehouseDir,jdbcType=VARCHAR},
+ partition_interval = #{partitionInterval,jdbcType=INTEGER},
+ partition_unit = #{partitionUnit,jdbcType=VARCHAR},
+ primary_partition = #{primaryPartition,jdbcType=VARCHAR},
+ secondary_partition = #{secondaryPartition,jdbcType=VARCHAR},
+ partition_creation_strategy = #{partitionCreationStrategy,jdbcType=VARCHAR},
+ file_format = #{fileFormat,jdbcType=VARCHAR},
+ data_encoding = #{dataEncoding,jdbcType=VARCHAR},
+ data_separator = #{dataSeparator,jdbcType=VARCHAR},
+ storage_period = #{storagePeriod,jdbcType=INTEGER},
+ opt_log = #{optLog,jdbcType=VARCHAR},
+ status = #{status,jdbcType=INTEGER},
+ previous_status = #{previousStatus,jdbcType=INTEGER},
+ is_deleted = #{isDeleted,jdbcType=INTEGER},
+ creator = #{creator,jdbcType=VARCHAR},
+ modifier = #{modifier,jdbcType=VARCHAR},
+ create_time = #{createTime,jdbcType=TIMESTAMP},
+ modify_time = #{modifyTime,jdbcType=TIMESTAMP},
+ temp_view = #{tempView,jdbcType=LONGVARCHAR}
where id = #{id,jdbcType=INTEGER}
</update>
<update id="updateStorageStatusById" parameterType="org.apache.inlong.manager.dao.entity.StorageHiveEntity">
diff --git a/inlong-manager/manager-dao/src/test/resources/sql/apache_inlong_manager.sql b/inlong-manager/manager-dao/src/test/resources/sql/apache_inlong_manager.sql
index 9b8e124..08b2320 100644
--- a/inlong-manager/manager-dao/src/test/resources/sql/apache_inlong_manager.sql
+++ b/inlong-manager/manager-dao/src/test/resources/sql/apache_inlong_manager.sql
@@ -85,7 +85,7 @@ CREATE TABLE `business`
`schema_name` varchar(128) DEFAULT NULL COMMENT 'Data type, associated data_schema table',
`in_charges` varchar(512) NOT NULL COMMENT 'Name of responsible person, separated by commas',
`followers` varchar(512) DEFAULT NULL COMMENT 'List of names of business followers, separated by commas',
- `status` int(11) DEFAULT '21' COMMENT 'Business status',
+ `status` int(4) DEFAULT '21' COMMENT 'Business status',
`is_deleted` tinyint(1) DEFAULT '0' COMMENT 'Whether to delete, 0: not deleted, 1: deleted',
`creator` varchar(64) NOT NULL COMMENT 'Creator name',
`modifier` varchar(64) DEFAULT NULL COMMENT 'Modifier name',
@@ -150,7 +150,7 @@ CREATE TABLE `cluster_info`
`url` varchar(256) DEFAULT NULL COMMENT 'Cluster URL address',
`is_backup` tinyint(1) DEFAULT '0' COMMENT 'Whether it is a backup cluster, 0: no, 1: yes',
`ext_props` text DEFAULT NULL COMMENT 'extended properties',
- `status` int(11) DEFAULT '1' COMMENT 'cluster status',
+ `status` int(4) DEFAULT '1' COMMENT 'cluster status',
`is_deleted` tinyint(1) DEFAULT '0' COMMENT 'Whether to delete, 0: not deleted, 1: deleted',
`creator` varchar(64) NOT NULL COMMENT 'Creator name',
`modifier` varchar(64) DEFAULT NULL COMMENT 'Modifier name',
@@ -183,7 +183,7 @@ CREATE TABLE `common_db_server`
`db_description` varchar(256) DEFAULT NULL COMMENT 'DB description',
`backup_db_server_ip` varchar(64) DEFAULT NULL COMMENT 'Backup DB HOST',
`backup_db_port` int(11) DEFAULT NULL COMMENT 'Backup DB port',
- `status` int(11) DEFAULT '0' COMMENT 'status',
+ `status` int(4) DEFAULT '0' COMMENT 'status',
`is_deleted` tinyint(1) DEFAULT '0' COMMENT 'Whether to delete, 0: not deleted, 1: deleted',
`creator` varchar(64) NOT NULL COMMENT 'Creator name',
`modifier` varchar(64) DEFAULT NULL COMMENT 'Modifier name',
@@ -208,7 +208,7 @@ CREATE TABLE `common_file_server`
`issue_type` varchar(128) DEFAULT NULL COMMENT 'Issuance method, such as SSH, TCS, etc.',
`username` varchar(64) NOT NULL COMMENT 'User name of the data source IP host',
`password` varchar(64) NOT NULL COMMENT 'The password corresponding to the above user name',
- `status` int(11) DEFAULT '0' COMMENT 'status',
+ `status` int(4) DEFAULT '0' COMMENT 'status',
`is_deleted` tinyint(1) DEFAULT '0' COMMENT 'Whether to delete, 0: not deleted, 1: deleted',
`creator` varchar(64) NOT NULL COMMENT 'Creator name',
`modifier` varchar(64) DEFAULT NULL COMMENT 'Modifier name',
@@ -234,16 +234,35 @@ CREATE TABLE `consumption`
`topic` varchar(255) NOT NULL COMMENT 'Consumption topic',
`filter_enabled` int(2) DEFAULT '0' COMMENT 'Whether to filter, default 0, not filter consume',
`inlong_stream_id` varchar(1024) DEFAULT NULL COMMENT 'Data stream ID for consumption, if filter_enable is 1, it cannot empty',
- `status` int(11) NOT NULL COMMENT 'Status: draft, pending approval, approval rejected, approval passed',
+ `status` int(4) NOT NULL COMMENT 'Status: draft, pending approval, approval rejected, approval passed',
+ `is_deleted` tinyint(1) DEFAULT '0' COMMENT 'Whether to delete, 0: not deleted, 1: deleted',
`creator` varchar(64) NOT NULL COMMENT 'creator',
`modifier` varchar(64) DEFAULT NULL COMMENT 'modifier',
`create_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'Create time',
`modify_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'Modify time',
- `is_deleted` int(2) DEFAULT '0' COMMENT 'Whether to delete, 0: not deleted, 1: deleted',
PRIMARY KEY (`id`)
);
-- ----------------------------
+-- Table structure for consumption_pulsar
+-- ----------------------------
+DROP TABLE IF EXISTS `consumption_pulsar`;
+CREATE TABLE `consumption_pulsar`
+(
+ `id` int(11) NOT NULL AUTO_INCREMENT,
+ `consumption_id` int(11) DEFAULT NULL COMMENT 'ID of the consumption information to which it belongs, guaranteed to be uniquely associated with consumption information',
+ `consumer_group_id` varchar(255) NOT NULL COMMENT 'Consumer group ID',
+ `consumer_group_name` varchar(255) NOT NULL COMMENT 'Consumer group name',
+ `inlong_group_id` varchar(255) NOT NULL COMMENT 'Business group ID',
+ `is_rlq` tinyint(1) DEFAULT '0' COMMENT 'Whether to configure the retry letter topic, 0: no configuration, 1: configuration',
+ `retry_letter_topic` varchar(255) DEFAULT NULL COMMENT 'The name of the retry queue topic',
+ `is_dlq` tinyint(1) DEFAULT '0' COMMENT 'Whether to configure dead letter topic, 0: no configuration, 1: means configuration',
+ `dead_letter_topic` varchar(255) DEFAULT NULL COMMENT 'dead letter topic name',
+ `is_deleted` tinyint(1) DEFAULT '0' COMMENT 'Whether to delete',
+ PRIMARY KEY (`id`)
+) COMMENT ='Pulsar consumption table';
+
+-- ----------------------------
-- Table structure for data_proxy_cluster
-- ----------------------------
DROP TABLE IF EXISTS `data_proxy_cluster`;
@@ -259,7 +278,7 @@ CREATE TABLE `data_proxy_cluster`
`net_type` varchar(20) DEFAULT NULL COMMENT 'Cluster network type, internal, or public',
`in_charges` varchar(512) DEFAULT NULL COMMENT 'Name of responsible person, separated by commas',
`ext_props` text DEFAULT NULL COMMENT 'Extended properties',
- `status` int(11) DEFAULT '1' COMMENT 'Cluster status',
+ `status` int(4) DEFAULT '1' COMMENT 'Cluster status',
`is_deleted` tinyint(1) DEFAULT '0' COMMENT 'Whether to delete, 0: not deleted, 1: deleted',
`creator` varchar(64) NOT NULL COMMENT 'Creator name',
`modifier` varchar(64) DEFAULT NULL COMMENT 'Modifier name',
@@ -302,7 +321,8 @@ CREATE TABLE `data_source_cmd_config`
`modify_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'Last update time ',
`create_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'Create time',
`result_info` varchar(64) DEFAULT NULL,
- PRIMARY KEY (`id`)
+ PRIMARY KEY (`id`),
+ KEY `index_1` (`task_id`, `bSend`, `specified_data_time`)
);
-- ----------------------------
@@ -321,18 +341,24 @@ CREATE TABLE `data_stream`
`storage_period` int(11) DEFAULT '1' COMMENT 'The storage period of data in MQ, unit: day',
`data_type` varchar(20) DEFAULT 'TEXT' COMMENT 'Data type, there are: TEXT, KEY-VALUE, PB, BON, TEXT and BON should be treated differently',
`data_encoding` varchar(8) DEFAULT 'UTF-8' COMMENT 'Data encoding format, including: UTF-8, GBK',
- `file_delimiter` varchar(8) DEFAULT NULL COMMENT 'The source data field separator, stored as ASCII code',
+ `data_separator` varchar(8) DEFAULT NULL COMMENT 'The source data field separator, stored as ASCII code',
+ `data_escape_char` varchar(8) DEFAULT NULL COMMENT 'Source data field escape character, the default is NULL (NULL), stored as 1 character',
`have_predefined_fields` tinyint(1) DEFAULT '0' COMMENT '(File, DB access) whether there are predefined fields, 0: none, 1: yes (save to data_stream_field)',
+ `daily_records` int(11) DEFAULT '10' COMMENT 'Number of access records per day, unit: 10,000 records per day',
+ `daily_storage` int(11) DEFAULT '10' COMMENT 'Access size by day, unit: GB per day',
+ `peak_records` int(11) DEFAULT '1000' COMMENT 'Access peak per second, unit: records per second',
+ `max_length` int(11) DEFAULT '10240' COMMENT 'The maximum length of a single piece of data, unit: Byte',
`in_charges` varchar(512) DEFAULT NULL COMMENT 'Name of responsible person, separated by commas',
- `status` int(11) DEFAULT '0' COMMENT 'Data stream status',
- `previous_status` int(11) DEFAULT '0' COMMENT 'Previous status',
+ `status` int(4) DEFAULT '0' COMMENT 'Data stream status',
+ `previous_status` int(4) DEFAULT '0' COMMENT 'Previous status',
`is_deleted` tinyint(1) DEFAULT '0' COMMENT 'Whether to delete, 0: not deleted, 1: deleted',
`creator` varchar(64) DEFAULT NULL COMMENT 'Creator name',
`modifier` varchar(64) DEFAULT NULL COMMENT 'Modifier name',
`create_time` timestamp NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'Create time',
`modify_time` timestamp NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'Modify time',
`temp_view` text DEFAULT NULL COMMENT 'Temporary view, used to save intermediate data that has not been submitted or approved after modification',
- PRIMARY KEY (`id`)
+ PRIMARY KEY (`id`),
+ UNIQUE KEY `unique_data_stream` (`inlong_stream_id`, `inlong_group_id`, `is_deleted`, `modify_time`)
);
-- ----------------------------
@@ -348,7 +374,8 @@ CREATE TABLE `data_stream_ext`
`key_value` varchar(256) DEFAULT NULL COMMENT 'The value of the configuration item',
`is_deleted` tinyint(1) DEFAULT '0' COMMENT 'Whether to delete, 0: not deleted, 1: deleted',
`modify_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'Modify time',
- PRIMARY KEY (`id`)
+ PRIMARY KEY (`id`),
+ KEY `index_stream_id` (`inlong_stream_id`)
);
-- ----------------------------
@@ -372,7 +399,8 @@ CREATE TABLE `data_stream_field`
`bon_field_path` varchar(256) DEFAULT NULL COMMENT 'BON field path',
`bon_field_type` varchar(64) DEFAULT NULL COMMENT 'BON field type',
`encrypt_level` varchar(20) DEFAULT NULL COMMENT 'Encryption level',
- PRIMARY KEY (`id`)
+ PRIMARY KEY (`id`),
+ KEY `index_stream_id` (`inlong_stream_id`)
);
-- ----------------------------
@@ -393,7 +421,7 @@ CREATE TABLE `operation_log`
`cost_time` bigint(20) DEFAULT NULL COMMENT 'time-consuming',
`body` text COMMENT 'Request body',
`param` text COMMENT 'parameter',
- `status` tinyint(1) DEFAULT NULL COMMENT 'status',
+ `status` int(4) DEFAULT NULL COMMENT 'status',
`request_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'request time',
`err_msg` text COMMENT 'Error message',
PRIMARY KEY (`id`)
@@ -413,7 +441,9 @@ CREATE TABLE `role`
`create_by` varchar(255) NOT NULL,
`update_by` varchar(255) NOT NULL,
`disabled` tinyint(1) NOT NULL DEFAULT '0' COMMENT 'Is it disabled?',
- PRIMARY KEY (`id`)
+ PRIMARY KEY (`id`),
+ UNIQUE KEY `role_role_code_uindex` (`role_code`),
+ UNIQUE KEY `role_role_name_uindex` (`role_name`)
);
-- ----------------------------
@@ -431,7 +461,7 @@ CREATE TABLE `source_db_basic`
`modifier` varchar(64) DEFAULT NULL COMMENT 'Modifier name',
`create_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'Create time',
`modify_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'Modify time',
- `temp_view` text DEFAULT NULL COMMENT 'Temporary view, used to save intermediate data that has not been submitted or approved after modification',
+ `temp_view` json DEFAULT NULL COMMENT 'Temporary view, used to save intermediate data that has not been submitted or approved after modification',
PRIMARY KEY (`id`)
);
@@ -452,8 +482,8 @@ CREATE TABLE `source_db_detail`
`table_fields` longtext COMMENT 'Data table fields, multiple are separated by half-width commas, required for increment',
`data_sql` longtext COMMENT 'SQL statement to collect source data, required for full amount',
`crontab` varchar(56) DEFAULT NULL COMMENT 'Timed scheduling expression, required for full amount',
- `status` int(11) DEFAULT '0' COMMENT 'Data source status',
- `previous_status` int(11) DEFAULT '0' COMMENT 'Previous status',
+ `status` int(4) DEFAULT '0' COMMENT 'Data source status',
+ `previous_status` int(4) DEFAULT '0' COMMENT 'Previous status',
`is_deleted` tinyint(1) DEFAULT '0' COMMENT 'Whether to delete, 0: not deleted, 1: deleted',
`creator` varchar(64) NOT NULL COMMENT 'Creator name',
`modifier` varchar(64) DEFAULT NULL COMMENT 'Modifier name',
@@ -506,8 +536,8 @@ CREATE TABLE `source_file_detail`
`username` varchar(32) NOT NULL COMMENT 'User name of the data source IP host',
`password` varchar(64) NOT NULL COMMENT 'The password corresponding to the above user name',
`file_path` varchar(256) NOT NULL COMMENT 'File path, supports regular matching',
- `status` int(11) DEFAULT '0' COMMENT 'Data source status',
- `previous_status` int(11) DEFAULT '0' COMMENT 'Previous status',
+ `status` int(4) DEFAULT '0' COMMENT 'Data source status',
+ `previous_status` int(4) DEFAULT '0' COMMENT 'Previous status',
`is_deleted` tinyint(1) DEFAULT '0' COMMENT 'Whether to delete, 0: not deleted, 1: deleted',
`creator` varchar(64) NOT NULL COMMENT 'Creator name',
`modifier` varchar(64) DEFAULT NULL COMMENT 'Modifier name',
@@ -529,44 +559,45 @@ CREATE TABLE `storage_ext`
`key_name` varchar(64) NOT NULL COMMENT 'Configuration item name',
`key_value` varchar(256) DEFAULT NULL COMMENT 'The value of the configuration item',
`is_deleted` tinyint(1) DEFAULT '0' COMMENT 'Whether to delete, 0: not deleted, 1: deleted',
- `modify_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'modify time',
- PRIMARY KEY (`id`)
+ `modify_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'Modify time',
+ PRIMARY KEY (`id`),
+ KEY `index_storage_id` (`storage_id`)
);
-- ----------------------------
-- Table structure for storage_hive
-- ----------------------------
DROP TABLE IF EXISTS `storage_hive`;
-DROP TABLE IF EXISTS `storage_hive`;
CREATE TABLE `storage_hive`
(
- `id` int(11) NOT NULL AUTO_INCREMENT COMMENT 'Incremental primary key',
- `inlong_group_id` varchar(128) NOT NULL COMMENT 'Owning business group id',
- `inlong_stream_id` varchar(128) NOT NULL COMMENT 'Owning data stream id',
- `jdbc_url` varchar(255) NOT NULL COMMENT 'Hive JDBC connection URL, such as "jdbc:hive2://127.0.0.1:10000"',
- `username` varchar(128) NOT NULL COMMENT 'Username',
- `password` varchar(255) NOT NULL COMMENT 'User password',
- `db_name` varchar(128) NOT NULL COMMENT 'Target database name',
- `table_name` varchar(128) NOT NULL COMMENT 'Target data table name',
- `primary_partition` varchar(255) DEFAULT 'dt' COMMENT 'primary partition field',
- `secondary_partition` varchar(256) DEFAULT NULL COMMENT 'secondary partition field',
- `partition_type` varchar(10) DEFAULT NULL COMMENT 'The partition type, there are: H-by hour, D-by day, W-by week, M-by month, O-one-time, R-non-periodical',
- `file_format` varchar(15) DEFAULT 'TextFile' COMMENT 'The stored table format, TextFile, RCFile, SequenceFile, Avro',
- `encoding_type` varchar(255) DEFAULT NULL COMMENT 'data encoding',
- `field_splitter` varchar(10) DEFAULT NULL COMMENT 'field separator',
- `hdfs_default_fs` varchar(255) DEFAULT NULL COMMENT 'HDFS defaultFS, such as "hdfs://127.0.0.1:9000"',
- `warehouse_dir` varchar(250) DEFAULT '/user/hive/warehouse' COMMENT 'Hive table storage path on HDFS, such as "/user/hive/warehouse"',
- `usage_interval` varchar(10) DEFAULT NULL COMMENT 'The amount of time that Sort collected data will land on Hive, there are 10M, 15M, 30M, 1H, 1D',
- `storage_period` int(5) DEFAULT '10' COMMENT 'Data storage period, unit: day',
- `status` int(11) DEFAULT '0' COMMENT 'status',
- `previous_status` int(11) DEFAULT '0' COMMENT 'Previous status',
- `is_deleted` tinyint(1) DEFAULT '0' COMMENT 'Whether to delete, 0: not deleted, 1: deleted',
- `creator` varchar(64) NOT NULL COMMENT 'Creator name',
- `modifier` varchar(64) DEFAULT NULL COMMENT 'Modifier name',
- `create_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'Create time',
- `modify_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'Modify time',
- `temp_view` text DEFAULT NULL COMMENT 'Temporary view, used to save un-submitted and unapproved intermediate data after modification',
- `opt_log` varchar(5000) DEFAULT NULL COMMENT 'Background operation log',
+ `id` int(11) NOT NULL AUTO_INCREMENT COMMENT 'Incremental primary key',
+ `inlong_group_id` varchar(128) NOT NULL COMMENT 'Owning business group id',
+ `inlong_stream_id` varchar(128) NOT NULL COMMENT 'Owning data stream id',
+ `jdbc_url` varchar(255) DEFAULT NULL COMMENT 'Hive JDBC connection URL, such as "jdbc:hive2://127.0.0.1:10000"',
+ `username` varchar(128) DEFAULT NULL COMMENT 'Username',
+ `password` varchar(255) DEFAULT NULL COMMENT 'User password',
+ `db_name` varchar(128) DEFAULT NULL COMMENT 'Target database name',
+ `table_name` varchar(128) DEFAULT NULL COMMENT 'Target data table name',
+ `hdfs_default_fs` varchar(255) DEFAULT NULL COMMENT 'HDFS defaultFS, such as "hdfs://127.0.0.1:9000"',
+ `warehouse_dir` varchar(250) DEFAULT '/user/hive/warehouse' COMMENT 'Hive table storage path on HDFS, such as "/user/hive/warehouse"',
+ `partition_interval` int(5) DEFAULT NULL COMMENT 'Partition interval, support: 1(D / H), 10 I, 30 I',
+ `partition_unit` varchar(10) DEFAULT 'D' COMMENT 'Partition type, support: D-day, H-hour, I-minute',
+ `primary_partition` varchar(255) DEFAULT 'dt' COMMENT 'primary partition field',
+ `secondary_partition` varchar(256) DEFAULT NULL COMMENT 'secondary partition field',
+ `partition_creation_strategy` varchar(50) DEFAULT 'COMPLETED' COMMENT 'Partition creation strategy, support: ARRIVED, COMPLETED',
+ `file_format` varchar(15) DEFAULT 'TextFile' COMMENT 'The stored table format, TextFile, RCFile, SequenceFile, Avro',
+ `data_encoding` varchar(20) DEFAULT 'UTF-8' COMMENT 'data encoding type',
+ `data_separator` varchar(10) DEFAULT NULL COMMENT 'data field separator',
+ `storage_period` int(5) DEFAULT '10' COMMENT 'Data storage period, unit: day',
+ `opt_log` varchar(5000) DEFAULT NULL COMMENT 'Background operation log',
+ `status` int(4) DEFAULT '0' COMMENT 'status',
+ `previous_status` int(4) DEFAULT '0' COMMENT 'Previous status',
+ `is_deleted` tinyint(1) DEFAULT '0' COMMENT 'Whether to delete, 0: not deleted, 1: deleted',
+ `creator` varchar(64) DEFAULT NULL COMMENT 'creator name',
+ `modifier` varchar(64) DEFAULT NULL COMMENT 'modifier name',
+ `create_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'create time',
+ `modify_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'modify time',
+ `temp_view` text DEFAULT NULL COMMENT 'Temporary view, used to save un-submitted and unapproved intermediate data after modification',
PRIMARY KEY (`id`)
);
@@ -590,7 +621,8 @@ CREATE TABLE `storage_hive_field`
`is_exist` tinyint(1) DEFAULT '0' COMMENT 'Does it exist, 0: does not exist, 1: exists',
`rank_num` smallint(6) DEFAULT '0' COMMENT 'Field order (front-end display field order)',
`is_deleted` tinyint(1) DEFAULT '0' COMMENT 'Whether to delete, 0: not deleted, 1: deleted',
- PRIMARY KEY (`id`)
+ PRIMARY KEY (`id`),
+ KEY `index_storage_id` (`storage_id`)
);
-- ----------------------------
@@ -680,7 +712,8 @@ CREATE TABLE `user`
`update_time` datetime DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'update time',
`create_by` varchar(255) NOT NULL COMMENT 'create by sb.',
`update_by` varchar(255) DEFAULT NULL COMMENT 'update by sb.',
- PRIMARY KEY (`id`)
+ PRIMARY KEY (`id`),
+ UNIQUE KEY `user_name_uindex` (`name`)
);
-- create default admin user, username is 'admin', password is 'inlong'
@@ -723,7 +756,8 @@ CREATE TABLE `wf_approver`
`create_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'Create time',
`modify_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'update time',
`is_deleted` int(11) DEFAULT '0' COMMENT 'Whether to delete, 0 is not deleted, if greater than 0, delete',
- PRIMARY KEY (`id`)
+ PRIMARY KEY (`id`),
+ KEY `process_name_task_name_index` (`process_name`, `task_name`)
);
-- create default approver for new consumption and new business
@@ -778,7 +812,7 @@ CREATE TABLE `wf_process_instance`
`form_data` mediumtext COMMENT 'form information',
`start_time` datetime NOT NULL COMMENT 'start time',
`end_time` datetime DEFAULT NULL COMMENT 'End event',
- `ext` text COMMENT 'Extended information-text',
+ `ext` text COMMENT 'Extended information-json',
`hidden` tinyint(1) NOT NULL DEFAULT '0' COMMENT 'Is it hidden',
PRIMARY KEY (`id`)
);
@@ -804,7 +838,7 @@ CREATE TABLE `wf_task_instance`
`form_data` mediumtext COMMENT 'form information submitted by the current task',
`start_time` datetime NOT NULL COMMENT 'start time',
`end_time` datetime DEFAULT NULL COMMENT 'End time',
- `ext` text COMMENT 'Extended information-text',
+ `ext` text COMMENT 'Extended information-json',
PRIMARY KEY (`id`)
);
@@ -821,13 +855,14 @@ CREATE TABLE `cluster_set`
`middleware_type` varchar(10) DEFAULT 'TUBE' COMMENT 'The middleware type of message queue, high throughput: TUBE, high consistency: PULSAR',
`in_charges` varchar(512) COMMENT 'Name of responsible person, separated by commas',
`followers` varchar(512) COMMENT 'List of names of business followers, separated by commas',
- `status` int(11) DEFAULT '21' COMMENT 'ClusterSet status',
+ `status` int(4) DEFAULT '21' COMMENT 'ClusterSet status',
`is_deleted` tinyint(1) DEFAULT '0' COMMENT 'Whether to delete, 0: not deleted, 1: deleted',
`creator` varchar(64) NOT NULL COMMENT 'Creator name',
`modifier` varchar(64) NULL COMMENT 'Modifier name',
`create_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'Create time',
`modify_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'Modify time',
- PRIMARY KEY (`id`)
+ PRIMARY KEY (`id`),
+ UNIQUE KEY `unique_cluster_set` (`set_name`)
);
-- ----------------------------
@@ -839,7 +874,8 @@ CREATE TABLE `cluster_set_inlongid`
`id` int(11) NOT NULL AUTO_INCREMENT COMMENT 'Incremental primary key',
`set_name` varchar(128) NOT NULL COMMENT 'ClusterSet name, English, numbers and underscore',
`inlong_group_id` varchar(128) NOT NULL COMMENT 'Business group id, filled in by the user, undeleted ones cannot be repeated',
- PRIMARY KEY (`id`)
+ PRIMARY KEY (`id`),
+ UNIQUE KEY `unique_cluster_set_inlongid` (`set_name`, `inlong_group_id`)
);
-- ----------------------------
@@ -852,7 +888,8 @@ CREATE TABLE `cache_cluster`
`cluster_name` varchar(128) NOT NULL COMMENT 'CacheCluster name, English, numbers and underscore',
`set_name` varchar(128) NOT NULL COMMENT 'ClusterSet name, English, numbers and underscore',
`zone` varchar(128) NOT NULL COMMENT 'Zone, sz/sh/tj',
- PRIMARY KEY (`id`)
+ PRIMARY KEY (`id`),
+ UNIQUE KEY `unique_cache_cluster` (`cluster_name`)
);
-- ----------------------------
@@ -866,8 +903,9 @@ CREATE TABLE `cache_cluster_ext`
`key_name` varchar(64) NOT NULL COMMENT 'Configuration item name',
`key_value` varchar(256) NULL COMMENT 'The value of the configuration item',
`is_deleted` tinyint(1) DEFAULT '0' COMMENT 'Whether to delete, 0: not deleted, 1: deleted',
- `modify_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'modify time',
- PRIMARY KEY (`id`)
+ `modify_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'Modify time',
+ PRIMARY KEY (`id`),
+ KEY `index_cache_cluster` (`cluster_name`)
);
-- ----------------------------
@@ -880,7 +918,8 @@ CREATE TABLE `cache_topic`
`topic_name` varchar(128) NOT NULL COMMENT 'Topic name, English, numbers and underscore',
`set_name` varchar(128) NOT NULL COMMENT 'ClusterSet name, English, numbers and underscore',
`partition_num` int(11) NOT NULL COMMENT 'Partition number',
- PRIMARY KEY (`id`)
+ PRIMARY KEY (`id`),
+ UNIQUE KEY `unique_cache_topic` (`topic_name`, `set_name`)
);
-- ----------------------------
@@ -893,7 +932,8 @@ CREATE TABLE `proxy_cluster`
`cluster_name` varchar(128) NOT NULL COMMENT 'ProxyCluster name, English, numbers and underscore',
`set_name` varchar(128) NOT NULL COMMENT 'ClusterSet name, English, numbers and underscore',
`zone` varchar(128) NOT NULL COMMENT 'Zone, sz/sh/tj',
- PRIMARY KEY (`id`)
+ PRIMARY KEY (`id`),
+ UNIQUE KEY `unique_proxy_cluster` (`cluster_name`, `set_name`)
);
-- ----------------------------
@@ -905,7 +945,8 @@ CREATE TABLE `proxy_cluster_to_cache_cluster`
`id` int(11) NOT NULL AUTO_INCREMENT COMMENT 'Incremental primary key',
`proxy_cluster_name` varchar(128) NOT NULL COMMENT 'ProxyCluster name, English, numbers and underscore',
`cache_cluster_name` varchar(128) NOT NULL COMMENT 'CacheCluster name, English, numbers and underscore',
- PRIMARY KEY (`id`)
+ PRIMARY KEY (`id`),
+ UNIQUE KEY `unique_proxy_cluster_to_cache_cluster` (`proxy_cluster_name`, `cache_cluster_name`)
);
-- ----------------------------
@@ -920,7 +961,8 @@ CREATE TABLE `flume_source`
`type` varchar(128) NOT NULL COMMENT 'FlumeSource classname',
`channels` varchar(128) NOT NULL COMMENT 'The channels of FlumeSource, separated by space',
`selector_type` varchar(128) NOT NULL COMMENT 'FlumeSource channel selector classname',
- PRIMARY KEY (`id`)
+ PRIMARY KEY (`id`),
+ UNIQUE KEY `unique_flume_source` (`source_name`, `set_name`)
);
-- ----------------------------
@@ -935,8 +977,9 @@ CREATE TABLE `flume_source_ext`
`key_name` varchar(64) NOT NULL COMMENT 'Configuration item name',
`key_value` varchar(256) NULL COMMENT 'The value of the configuration item',
`is_deleted` tinyint(1) DEFAULT '0' COMMENT 'Whether to delete, 0: not deleted, 1: deleted',
- `modify_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'modify time',
- PRIMARY KEY (`id`)
+ `modify_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'Modify time',
+ PRIMARY KEY (`id`),
+ KEY `index_flume_source_ext` (`parent_name`)
);
-- ----------------------------
@@ -949,7 +992,8 @@ CREATE TABLE `flume_channel`
`channel_name` varchar(128) NOT NULL COMMENT 'FlumeChannel name, English, numbers and underscore',
`set_name` varchar(128) NOT NULL COMMENT 'ClusterSet name, English, numbers and underscore',
`type` varchar(128) NOT NULL COMMENT 'FlumeChannel classname',
- PRIMARY KEY (`id`)
+ PRIMARY KEY (`id`),
+ UNIQUE KEY `unique_flume_channel` (`channel_name`, `set_name`)
);
-- ----------------------------
@@ -964,8 +1008,9 @@ CREATE TABLE `flume_channel_ext`
`key_name` varchar(64) NOT NULL COMMENT 'Configuration item name',
`key_value` varchar(256) NULL COMMENT 'The value of the configuration item',
`is_deleted` tinyint(1) DEFAULT '0' COMMENT 'Whether to delete, 0: not deleted, 1: deleted',
- `modify_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'modify time',
- PRIMARY KEY (`id`)
+ `modify_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'Modify time',
+ PRIMARY KEY (`id`),
+ KEY `index_flume_channel_ext` (`parent_name`)
);
-- ----------------------------
@@ -979,7 +1024,8 @@ CREATE TABLE `flume_sink`
`set_name` varchar(128) NOT NULL COMMENT 'ClusterSet name, English, numbers and underscore',
`type` varchar(128) NOT NULL COMMENT 'FlumeSink classname',
`channel` varchar(128) NOT NULL COMMENT 'FlumeSink channel',
- PRIMARY KEY (`id`)
+ PRIMARY KEY (`id`),
+ UNIQUE KEY `unique_flume_sink` (`sink_name`, `set_name`)
);
-- ----------------------------
@@ -994,8 +1040,9 @@ CREATE TABLE `flume_sink_ext`
`key_name` varchar(64) NOT NULL COMMENT 'Configuration item name',
`key_value` varchar(256) NULL COMMENT 'The value of the configuration item',
`is_deleted` tinyint(1) DEFAULT '0' COMMENT 'Whether to delete, 0: not deleted, 1: deleted',
- `modify_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'modify time',
- PRIMARY KEY (`id`)
+ `modify_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'Modify time',
+ PRIMARY KEY (`id`),
+ KEY `index_flume_sink_ext` (`parent_name`)
);
SET FOREIGN_KEY_CHECKS = 1;
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/DataStreamService.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/DataStreamService.java
index 75b463b..1221742 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/DataStreamService.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/DataStreamService.java
@@ -21,7 +21,6 @@ import com.github.pagehelper.PageInfo;
import java.util.List;
import org.apache.inlong.manager.common.pojo.datastream.DataStreamApproveInfo;
import org.apache.inlong.manager.common.pojo.datastream.DataStreamInfo;
-import org.apache.inlong.manager.common.pojo.datastream.DataStreamInfoToHiveConfig;
import org.apache.inlong.manager.common.pojo.datastream.DataStreamListVO;
import org.apache.inlong.manager.common.pojo.datastream.DataStreamPageRequest;
import org.apache.inlong.manager.common.pojo.datastream.DataStreamSummaryInfo;
@@ -72,23 +71,6 @@ public interface DataStreamService {
PageInfo<DataStreamListVO> listByCondition(DataStreamPageRequest request);
/**
- * Query all hive config for business group id
- *
- * @param groupId Business group id
- * @return Hive config list
- */
- List<DataStreamInfoToHiveConfig> queryHiveConfigForAllDataStream(String groupId);
-
- /**
- * Query hive config for one data stream
- *
- * @param groupId Business group id
- * @param streamId Data stream id
- * @return Hive config
- */
- DataStreamInfoToHiveConfig queryHiveConfigForOneDataStream(String groupId, String streamId);
-
- /**
* Business information that needs to be modified
*
* @param dataStreamInfo data stream information that needs to be modified
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/BusinessServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/BusinessServiceImpl.java
index 152125a..ba64f16 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/BusinessServiceImpl.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/BusinessServiceImpl.java
@@ -416,47 +416,11 @@ public class BusinessServiceImpl implements BusinessService {
// If you need to change business info after approve, just do in here
this.updateStatus(groupId, EntityStatus.BIZ_CONFIG_ING.getCode(), operator);
- if (BizConstant.MIDDLEWARE_PULSAR.equalsIgnoreCase(middlewareType)) {
- BusinessPulsarEntity pulsarEntity = checkAndGetEntity(approveInfo);
- businessPulsarMapper.updateByIdentifierSelective(pulsarEntity);
- }
-
LOGGER.info("success to update business status after approve for groupId={}", groupId);
return true;
}
/**
- * Check whether the Pulsar parameters filled in during approval are valid,
- * if valid, return to the encapsulated entity
- */
- private BusinessPulsarEntity checkAndGetEntity(BusinessApproveInfo approveInfo) {
- // Pulsar params must meet: ackQuorum <= writeQuorum <= ensemble
- Integer ackQuorum = approveInfo.getAckQuorum();
- Integer writeQuorum = approveInfo.getWriteQuorum();
- Integer ensemble = approveInfo.getEnsemble();
- Preconditions.checkNotNull(ackQuorum, "Pulsar ackQuorum cannot be empty");
- Preconditions.checkNotNull(writeQuorum, "Pulsar writeQuorum cannot be empty");
- Preconditions.checkNotNull(ensemble, "Pulsar ensemble cannot be empty");
- if (!(ackQuorum <= writeQuorum && writeQuorum <= ensemble)) {
- throw new BusinessException(BizErrorCodeEnum.BUSINESS_SAVE_FAILED,
- "Pulsar params must meet: ackQuorum <= writeQuorum <= ensemble");
- }
-
- Preconditions.checkTrue(approveInfo.getTopicPartitionNum() != null
- && approveInfo.getTopicPartitionNum() >= 1 && approveInfo.getTopicPartitionNum() <= 20,
- "topic partition num must meet >= 1 and <= 20");
-
- Preconditions.checkTrue(approveInfo.getTtl() != null && approveInfo.getTtlUnit() != null,
- "retention size and unit cannot be empty");
- Preconditions.checkTrue(approveInfo.getRetentionSize() != null && approveInfo.getRetentionSizeUnit() != null,
- "retention size and unit cannot be empty");
- Preconditions.checkTrue(approveInfo.getRetentionTime() != null && approveInfo.getRetentionTimeUnit() != null,
- "retention size and unit cannot be empty");
-
- return CommonBeanUtils.copyProperties(approveInfo, BusinessPulsarEntity::new);
- }
-
- /**
* Update extended information
* <p/>First physically delete the existing extended information, and then add this batch of extended information
*/
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/DataStreamServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/DataStreamServiceImpl.java
index 32d91af..8e30a74 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/DataStreamServiceImpl.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/DataStreamServiceImpl.java
@@ -42,7 +42,6 @@ import org.apache.inlong.manager.common.pojo.datastream.DataStreamApproveInfo;
import org.apache.inlong.manager.common.pojo.datastream.DataStreamExtInfo;
import org.apache.inlong.manager.common.pojo.datastream.DataStreamFieldInfo;
import org.apache.inlong.manager.common.pojo.datastream.DataStreamInfo;
-import org.apache.inlong.manager.common.pojo.datastream.DataStreamInfoToHiveConfig;
import org.apache.inlong.manager.common.pojo.datastream.DataStreamListVO;
import org.apache.inlong.manager.common.pojo.datastream.DataStreamPageRequest;
import org.apache.inlong.manager.common.pojo.datastream.DataStreamSummaryInfo;
@@ -94,11 +93,11 @@ public class DataStreamServiceImpl implements DataStreamService {
@Transactional(rollbackFor = Throwable.class)
@Override
- public Integer save(DataStreamInfo dataStreamInfo, String operator) {
- LOGGER.debug("begin to save data stream info={}", dataStreamInfo);
- Preconditions.checkNotNull(dataStreamInfo, "data stream info is empty");
- String groupId = dataStreamInfo.getInlongGroupId();
- String streamId = dataStreamInfo.getInlongStreamId();
+ public Integer save(DataStreamInfo streamInfo, String operator) {
+ LOGGER.debug("begin to save data stream info={}", streamInfo);
+ Preconditions.checkNotNull(streamInfo, "data stream info is empty");
+ String groupId = streamInfo.getInlongGroupId();
+ String streamId = streamInfo.getInlongStreamId();
Preconditions.checkNotNull(groupId, BizConstant.GROUP_ID_IS_EMPTY);
Preconditions.checkNotNull(streamId, BizConstant.STREAM_ID_IS_EMPTY);
@@ -112,8 +111,9 @@ public class DataStreamServiceImpl implements DataStreamService {
throw new BusinessException(BizErrorCodeEnum.DATA_STREAM_ID_DUPLICATE);
}
+ streamInfo.setMqResourceObj(streamId);
// Processing dataStream
- DataStreamEntity streamEntity = CommonBeanUtils.copyProperties(dataStreamInfo, DataStreamEntity::new);
+ DataStreamEntity streamEntity = CommonBeanUtils.copyProperties(streamInfo, DataStreamEntity::new);
Date date = new Date();
streamEntity.setStatus(EntityStatus.DATA_STREAM_NEW.getCode());
streamEntity.setModifier(operator);
@@ -122,9 +122,9 @@ public class DataStreamServiceImpl implements DataStreamService {
streamMapper.insertSelective(streamEntity);
// Processing extended information
- this.saveExt(groupId, streamId, dataStreamInfo.getExtList(), date);
+ this.saveExt(groupId, streamId, streamInfo.getExtList(), date);
// Process data source fields
- this.saveField(groupId, streamId, dataStreamInfo.getFieldList());
+ this.saveField(groupId, streamId, streamInfo.getFieldList());
LOGGER.info("success to save data stream info for groupId={}", groupId);
return streamEntity.getId();
@@ -175,16 +175,6 @@ public class DataStreamServiceImpl implements DataStreamService {
}
@Override
- public List<DataStreamInfoToHiveConfig> queryHiveConfigForAllDataStream(String groupId) {
- return streamMapper.selectStreamToHiveInfo(groupId);
- }
-
- @Override
- public DataStreamInfoToHiveConfig queryHiveConfigForOneDataStream(String groupId, String streamId) {
- return streamMapper.selectStreamToHiveInfoByIdentifier(groupId, streamId);
- }
-
- @Override
public PageInfo<DataStreamListVO> listByCondition(DataStreamPageRequest request) {
LOGGER.debug("begin to list data stream page by {}", request);
@@ -484,11 +474,12 @@ public class DataStreamServiceImpl implements DataStreamService {
// The person in charge of the business has the authority of all data streams
BusinessEntity businessEntity = businessMapper.selectByIdentifier(groupId);
Preconditions.checkNotNull(businessEntity, "business not found by groupId=" + groupId);
+
String inCharges = businessEntity.getInCharges();
+ request.setInCharges(inCharges);
PageHelper.startPage(request.getPageNum(), request.getPageSize());
- Page<DataStreamEntity> page = (Page<DataStreamEntity>) streamMapper.selectByConditionAndInCharges(request,
- inCharges);
+ Page<DataStreamEntity> page = (Page<DataStreamEntity>) streamMapper.selectByCondition(request);
List<DataStreamInfo> streamInfoList = CommonBeanUtils.copyListProperties(page, DataStreamInfo::new);
// Convert and encapsulate the paged results
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/StorageHiveOperation.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/StorageHiveOperation.java
index 0d5a268..1a8fb24 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/StorageHiveOperation.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/StorageHiveOperation.java
@@ -87,11 +87,11 @@ public class StorageHiveOperation extends StorageBaseOperation {
// Set the encoding type and field splitter
DataStreamEntity streamEntity = dataStreamMapper.selectByIdentifier(groupId, entity.getInlongStreamId());
- String encodingType = streamEntity.getDataEncoding() == null
+ String dataEncoding = streamEntity.getDataEncoding() == null
? StandardCharsets.UTF_8.displayName() : streamEntity.getDataEncoding();
- entity.setEncodingType(encodingType);
- if (entity.getFieldSplitter() == null) {
- entity.setFieldSplitter(streamEntity.getFileDelimiter());
+ entity.setDataEncoding(dataEncoding);
+ if (entity.getDataSeparator() == null) {
+ entity.setDataSeparator(streamEntity.getDataSeparator());
}
entity.setStatus(EntityStatus.DATA_STORAGE_NEW.getCode());
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/StorageServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/StorageServiceImpl.java
index 82868cf..3464294 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/StorageServiceImpl.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/StorageServiceImpl.java
@@ -146,7 +146,7 @@ public class StorageServiceImpl extends StorageBaseOperation implements StorageS
// Query HDFS, HIVE, ES storage information and encapsulate it in the result set
List<StorageSummaryInfo> totalList = new ArrayList<>();
- List<StorageSummaryInfo> hiveSummaryList = hiveStorageMapper.selectSummaryByIdentifier(groupId, streamId);
+ List<StorageSummaryInfo> hiveSummaryList = hiveStorageMapper.selectSummary(groupId, streamId);
totalList.addAll(hiveSummaryList);
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/WorkflowApproverServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/WorkflowApproverServiceImpl.java
index f9d5272..dbaa75e 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/WorkflowApproverServiceImpl.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/WorkflowApproverServiceImpl.java
@@ -125,8 +125,8 @@ public class WorkflowApproverServiceImpl implements WorkflowApproverService {
@Override
public void update(WorkflowApprover config, String operator) {
- Preconditions.checkNotNull(config, "config can't be null");
- Preconditions.checkNotNull(config.getId(), "id can't be null");
+ Preconditions.checkNotNull(config, "config cannot be null");
+ Preconditions.checkNotNull(config.getId(), "id cannot be null");
WorkflowApproverEntity entity = workflowApproverMapper.selectByPrimaryKey(config.getId());
Preconditions.checkNotNull(entity, "not exist with id:" + config.getId());
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdpart/hive/CreateHiveTableForAllStreamListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdpart/hive/CreateHiveTableForAllStreamListener.java
index 8b6a753..8b9d6eb 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdpart/hive/CreateHiveTableForAllStreamListener.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdpart/hive/CreateHiveTableForAllStreamListener.java
@@ -19,8 +19,8 @@ package org.apache.inlong.manager.service.thirdpart.hive;
import java.util.List;
import lombok.extern.slf4j.Slf4j;
-import org.apache.inlong.manager.common.pojo.datastream.DataStreamInfoToHiveConfig;
-import org.apache.inlong.manager.service.core.DataStreamService;
+import org.apache.inlong.manager.common.pojo.datastorage.StorageHiveSortInfo;
+import org.apache.inlong.manager.dao.mapper.StorageHiveEntityMapper;
import org.apache.inlong.manager.service.workflow.newbusiness.CreateResourceWorkflowForm;
import org.apache.inlong.manager.workflow.core.event.ListenerResult;
import org.apache.inlong.manager.workflow.core.event.task.TaskEvent;
@@ -37,8 +37,7 @@ import org.springframework.stereotype.Service;
public class CreateHiveTableForAllStreamListener implements TaskEventListener {
@Autowired
- private DataStreamService dataStreamService;
-
+ private StorageHiveEntityMapper hiveEntityMapper;
@Autowired
private HiveTableOperator hiveTableOperator;
@@ -53,12 +52,12 @@ public class CreateHiveTableForAllStreamListener implements TaskEventListener {
String groupId = form.getInlongGroupId();
log.info("begin to create hive table for groupId={}", groupId);
- List<DataStreamInfoToHiveConfig> configList = dataStreamService.queryHiveConfigForAllDataStream(groupId);
+ List<StorageHiveSortInfo> configList = hiveEntityMapper.selectHiveSortInfoByIdentifier(groupId, null);
if (configList == null || configList.size() == 0) {
return ListenerResult.success();
}
- for (DataStreamInfoToHiveConfig hiveConfig : configList) {
+ for (StorageHiveSortInfo hiveConfig : configList) {
hiveTableOperator.createHiveTable(groupId, hiveConfig);
log.info("finish to create hive table for business {}", groupId);
}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdpart/hive/CreateHiveTableForOneStreamListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdpart/hive/CreateHiveTableForOneStreamListener.java
index ec3dcd1..58ee3ac 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdpart/hive/CreateHiveTableForOneStreamListener.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdpart/hive/CreateHiveTableForOneStreamListener.java
@@ -17,9 +17,10 @@
package org.apache.inlong.manager.service.thirdpart.hive;
+import java.util.List;
import lombok.extern.slf4j.Slf4j;
-import org.apache.inlong.manager.common.pojo.datastream.DataStreamInfoToHiveConfig;
-import org.apache.inlong.manager.service.core.DataStreamService;
+import org.apache.inlong.manager.common.pojo.datastorage.StorageHiveSortInfo;
+import org.apache.inlong.manager.dao.mapper.StorageHiveEntityMapper;
import org.apache.inlong.manager.service.workflow.newbusiness.CreateResourceWorkflowForm;
import org.apache.inlong.manager.workflow.core.event.ListenerResult;
import org.apache.inlong.manager.workflow.core.event.task.TaskEvent;
@@ -36,8 +37,7 @@ import org.springframework.stereotype.Service;
public class CreateHiveTableForOneStreamListener implements TaskEventListener {
@Autowired
- private DataStreamService dataStreamService;
-
+ private StorageHiveEntityMapper hiveEntityMapper;
@Autowired
private HiveTableOperator hiveTableOperator;
@@ -53,11 +53,13 @@ public class CreateHiveTableForOneStreamListener implements TaskEventListener {
String streamId = form.getInlongStreamId();
log.info("begin create hive table for groupId={}, streamId={}", groupId, streamId);
- DataStreamInfoToHiveConfig hiveConfig = dataStreamService.queryHiveConfigForOneDataStream(groupId, streamId);
- if (hiveConfig == null) {
+ List<StorageHiveSortInfo> hiveConfig = hiveEntityMapper.selectHiveSortInfoByIdentifier(groupId, streamId);
+ if (hiveConfig == null || hiveConfig.size() == 0) {
return ListenerResult.success();
}
- hiveTableOperator.createHiveTable(groupId, hiveConfig);
+ for (StorageHiveSortInfo info : hiveConfig) {
+ hiveTableOperator.createHiveTable(groupId, info);
+ }
log.info("finish create hive table for business {} - {} ", groupId, streamId);
return ListenerResult.success();
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdpart/hive/HiveTableOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdpart/hive/HiveTableOperator.java
index eb0d066..aba950f 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdpart/hive/HiveTableOperator.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdpart/hive/HiveTableOperator.java
@@ -23,7 +23,7 @@ import java.util.ArrayList;
import java.util.List;
import lombok.extern.slf4j.Slf4j;
import org.apache.inlong.manager.common.enums.EntityStatus;
-import org.apache.inlong.manager.common.pojo.datastream.DataStreamInfoToHiveConfig;
+import org.apache.inlong.manager.common.pojo.datastorage.StorageHiveSortInfo;
import org.apache.inlong.manager.common.pojo.query.ColumnInfoBean;
import org.apache.inlong.manager.common.pojo.query.DatabaseQueryBean;
import org.apache.inlong.manager.common.pojo.query.hive.HiveColumnQueryBean;
@@ -54,7 +54,7 @@ public class HiveTableOperator {
/**
* Create hive table according to the groupId and hive config
*/
- public void createHiveTable(String groupId, DataStreamInfoToHiveConfig hiveConfig) {
+ public void createHiveTable(String groupId, StorageHiveSortInfo hiveConfig) {
if (log.isDebugEnabled()) {
log.debug("begin create hive table for business={}, hiveConfig={}", groupId, hiveConfig);
}
@@ -90,7 +90,7 @@ public class HiveTableOperator {
log.info("finish create hive table for business {} ", groupId);
}
- protected HiveTableQueryBean getTableQueryBean(DataStreamInfoToHiveConfig hiveConfig) {
+ protected HiveTableQueryBean getTableQueryBean(StorageHiveSortInfo hiveConfig) {
String groupId = hiveConfig.getInlongGroupId();
String streamId = hiveConfig.getInlongStreamId();
log.info("begin to get table query bean for groupId={}, streamId={}", groupId, streamId);
@@ -119,11 +119,11 @@ public class HiveTableOperator {
HiveTableQueryBean queryBean = new HiveTableQueryBean();
queryBean.setColumns(columnQueryBeans);
// set terminated symbol
- if (hiveConfig.getFieldSplitter() != null) {
- char ch = (char) Integer.parseInt(hiveConfig.getFieldSplitter());
+ if (hiveConfig.getTargetSeparator() != null) {
+ char ch = (char) Integer.parseInt(hiveConfig.getTargetSeparator());
queryBean.setFieldTerSymbol(String.valueOf(ch));
}
- queryBean.setUsername(hiveConfig.getUserName());
+ queryBean.setUsername(hiveConfig.getUsername());
queryBean.setPassword(hiveConfig.getPassword());
queryBean.setTableName(hiveConfig.getTableName());
queryBean.setDbName(hiveConfig.getDbName());
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdpart/sort/PushHiveConfigTaskListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdpart/sort/PushHiveConfigTaskListener.java
index d27ef01..9f037ec 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdpart/sort/PushHiveConfigTaskListener.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdpart/sort/PushHiveConfigTaskListener.java
@@ -17,20 +17,26 @@
package org.apache.inlong.manager.service.thirdpart.sort;
+import java.util.ArrayList;
+import java.util.HashMap;
import java.util.List;
-import java.util.stream.Collectors;
-import java.util.stream.Stream;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.StringUtils;
import org.apache.inlong.manager.common.beans.ClusterBean;
import org.apache.inlong.manager.common.enums.BizConstant;
+import org.apache.inlong.manager.common.enums.EntityStatus;
import org.apache.inlong.manager.common.pojo.business.BusinessInfo;
-import org.apache.inlong.manager.common.pojo.datastorage.StorageHiveInfo;
-import org.apache.inlong.manager.common.pojo.datastream.DataStreamInfo;
+import org.apache.inlong.manager.common.pojo.datastorage.StorageHiveSortInfo;
import org.apache.inlong.manager.common.util.JsonUtils;
-import org.apache.inlong.manager.dao.entity.StorageHiveEntity;
+import org.apache.inlong.manager.common.util.Preconditions;
+import org.apache.inlong.manager.dao.entity.BusinessEntity;
+import org.apache.inlong.manager.dao.entity.StorageHiveFieldEntity;
+import org.apache.inlong.manager.dao.mapper.BusinessEntityMapper;
import org.apache.inlong.manager.dao.mapper.StorageHiveEntityMapper;
+import org.apache.inlong.manager.dao.mapper.StorageHiveFieldEntityMapper;
import org.apache.inlong.manager.service.core.DataStreamService;
-import org.apache.inlong.manager.service.core.StorageService;
import org.apache.inlong.manager.service.workflow.newbusiness.CreateResourceWorkflowForm;
import org.apache.inlong.manager.workflow.core.event.ListenerResult;
import org.apache.inlong.manager.workflow.core.event.task.TaskEvent;
@@ -42,8 +48,13 @@ import org.apache.inlong.sort.formats.common.FormatInfo;
import org.apache.inlong.sort.formats.common.TimestampFormatInfo;
import org.apache.inlong.sort.protocol.DataFlowInfo;
import org.apache.inlong.sort.protocol.FieldInfo;
+import org.apache.inlong.sort.protocol.deserialization.DeserializationInfo;
import org.apache.inlong.sort.protocol.deserialization.TDMsgCsvDeserializationInfo;
import org.apache.inlong.sort.protocol.sink.HiveSinkInfo;
+import org.apache.inlong.sort.protocol.sink.HiveSinkInfo.HiveFileFormat;
+import org.apache.inlong.sort.protocol.sink.HiveSinkInfo.HiveTimePartitionInfo;
+import org.apache.inlong.sort.protocol.sink.SinkInfo;
+import org.apache.inlong.sort.protocol.source.PulsarSourceInfo;
import org.apache.inlong.sort.protocol.source.SourceInfo;
import org.apache.inlong.sort.protocol.source.TubeSourceInfo;
import org.springframework.beans.factory.annotation.Autowired;
@@ -53,14 +64,30 @@ import org.springframework.stereotype.Component;
@Component
public class PushHiveConfigTaskListener implements TaskEventListener {
+ private static final Map<String, String> PARTITION_TIME_FORMAT_MAP = new HashMap<>();
+
+ private static final Map<String, TimeUnit> PARTITION_TIME_UNIT_MAP = new HashMap<>();
+
+ static {
+ PARTITION_TIME_FORMAT_MAP.put("D", "yyyyMMdd");
+ PARTITION_TIME_FORMAT_MAP.put("H", "yyyyMMddHH");
+ PARTITION_TIME_FORMAT_MAP.put("I", "yyyyMMddHHmm");
+
+ PARTITION_TIME_UNIT_MAP.put("D", TimeUnit.DAYS);
+ PARTITION_TIME_UNIT_MAP.put("H", TimeUnit.HOURS);
+ PARTITION_TIME_UNIT_MAP.put("I", TimeUnit.MINUTES);
+ }
+
+ @Autowired
+ private ClusterBean clusterBean;
@Autowired
- private StorageService storageService;
+ private BusinessEntityMapper businessMapper;
@Autowired
private StorageHiveEntityMapper storageHiveMapper;
@Autowired
- private DataStreamService dataStreamService;
+ private StorageHiveFieldEntityMapper hiveFieldMapper;
@Autowired
- private ClusterBean clusterBean;
+ private DataStreamService dataStreamService;
@Override
public TaskEvent event() {
@@ -76,19 +103,24 @@ public class PushHiveConfigTaskListener implements TaskEventListener {
CreateResourceWorkflowForm form = (CreateResourceWorkflowForm) context.getProcessForm();
BusinessInfo businessInfo = form.getBusinessInfo();
String groupId = businessInfo.getInlongGroupId();
+
+ BusinessEntity business = businessMapper.selectByIdentifier(groupId);
+ if (business == null || EntityStatus.IS_DELETED.getCode().equals(business.getIsDeleted())) {
+ log.warn("skip to push sort hive config for groupId={}, as biz not exists or has been deleted", groupId);
+ return ListenerResult.success();
+ }
+
// if streamId not null, just push the config belongs to the groupId and the streamId
String streamId = form.getInlongStreamId();
+ List<StorageHiveSortInfo> hiveInfoList = storageHiveMapper.selectHiveSortInfoByIdentifier(groupId, streamId);
+ for (StorageHiveSortInfo hiveInfo : hiveInfoList) {
+ Integer storageId = hiveInfo.getId();
- List<StorageHiveEntity> storageHiveEntities = storageHiveMapper.selectByIdentifier(groupId, streamId);
- for (StorageHiveEntity hiveEntity : storageHiveEntities) {
- Integer storageId = hiveEntity.getId();
- StorageHiveInfo hiveStorage = (StorageHiveInfo) storageService
- .getById(BizConstant.STORAGE_HIVE, storageId);
if (log.isDebugEnabled()) {
- log.debug("hive storage info: {}", hiveStorage);
+ log.debug("hive storage info: {}", hiveInfo);
}
- DataFlowInfo dataFlowInfo = getDataFlowInfo(businessInfo, hiveStorage);
+ DataFlowInfo dataFlowInfo = getDataFlowInfo(business, hiveInfo);
if (log.isDebugEnabled()) {
log.debug("try to push hive config to sort: {}", JsonUtils.toJson(dataFlowInfo));
}
@@ -109,26 +141,62 @@ public class PushHiveConfigTaskListener implements TaskEventListener {
return ListenerResult.success();
}
- private DataFlowInfo getDataFlowInfo(BusinessInfo businessInfo, StorageHiveInfo hiveStorage) {
- Stream<FieldInfo> hiveFields = hiveStorage.getHiveFieldList().stream().map(field -> {
- FormatInfo formatInfo = SortFieldFormatUtils.convertFieldFormat(field.getFieldType().toLowerCase());
- return new FieldInfo(field.getFieldName(), formatInfo);
- });
+ private DataFlowInfo getDataFlowInfo(BusinessEntity businessEntity, StorageHiveSortInfo hiveInfo) {
+ String groupId = hiveInfo.getInlongGroupId();
+ String streamId = hiveInfo.getInlongStreamId();
+ List<StorageHiveFieldEntity> fieldList = hiveFieldMapper.selectHiveFields(groupId, streamId);
+
+ if (fieldList == null || fieldList.size() == 0) {
+ throw new WorkflowListenerException("no hive fields for groupId=" + groupId + ", streamId=" + streamId);
+ }
+
+ SourceInfo sourceInfo = getSourceInfo(businessEntity, hiveInfo, fieldList);
+ SinkInfo sinkInfo = getSinkInfo(hiveInfo, fieldList);
+
+ // push information
+ return new DataFlowInfo(hiveInfo.getId(), sourceInfo, sinkInfo);
+ }
+
+ private HiveSinkInfo getSinkInfo(StorageHiveSortInfo hiveInfo, List<StorageHiveFieldEntity> fieldList) {
+ if (hiveInfo.getJdbcUrl() == null) {
+ throw new WorkflowListenerException("hive server url cannot be empty");
+ }
+
+ // Use the field separator in Hive, the default is TextFile
+ Character separator = (char) Integer.parseInt(hiveInfo.getTargetSeparator());
+ HiveFileFormat fileFormat;
+ String format = hiveInfo.getFileFormat();
- List<FieldInfo> sinkFields = hiveFields.collect(Collectors.toList());
- FieldInfo partitionFieldInfo = new FieldInfo(hiveStorage.getPrimaryPartition(),
- new TimestampFormatInfo("MILLIS"));
- sinkFields.add(partitionFieldInfo);
+ if (BizConstant.FILE_FORMAT_ORC.equalsIgnoreCase(format)) {
+ fileFormat = new HiveSinkInfo.OrcFileFormat(1000);
+ } else if (BizConstant.FILE_FORMAT_SEQUENCE.equalsIgnoreCase(format)) {
+ fileFormat = new HiveSinkInfo.SequenceFileFormat(separator, 100);
+ } else if (BizConstant.FILE_FORMAT_PARQUET.equalsIgnoreCase(format)) {
+ fileFormat = new HiveSinkInfo.ParquetFileFormat();
+ } else {
+ fileFormat = new HiveSinkInfo.TextFileFormat(separator);
+ }
- String hiveServerUrl = hiveStorage.getJdbcUrl();
- if (hiveServerUrl != null && !hiveServerUrl.startsWith("jdbc:hive2://")) {
- hiveServerUrl = "jdbc:hive2://" + hiveServerUrl;
+ // The primary partition field, in Sink must be HiveTimePartitionInfo
+ List<HiveSinkInfo.HivePartitionInfo> partitionList = new ArrayList<>();
+ String primary = hiveInfo.getPrimaryPartition();
+ if (StringUtils.isNotEmpty(primary)) {
+ // Hive partitions are by day, hour, and minute
+ String unit = hiveInfo.getPartitionUnit();
+ HiveTimePartitionInfo timePartitionInfo = new HiveTimePartitionInfo(
+ primary, PARTITION_TIME_FORMAT_MAP.get(unit));
+ partitionList.add(timePartitionInfo);
+ }
+ // For the secondary partition field, the sink is temporarily encapsulated as HiveFieldPartitionInfo,
+ // TODO the type be set according to the type of the field itself.
+ if (StringUtils.isNotEmpty(hiveInfo.getSecondaryPartition())) {
+ partitionList.add(new HiveSinkInfo.HiveFieldPartitionInfo(hiveInfo.getSecondaryPartition()));
}
// dataPath = hdfsUrl + / + warehouseDir + / + dbName + .db/ + tableName
StringBuilder dataPathBuilder = new StringBuilder();
- String hdfsUrl = hiveStorage.getHdfsDefaultFs();
- String warehouseDir = hiveStorage.getWarehouseDir();
+ String hdfsUrl = hiveInfo.getHdfsDefaultFs();
+ String warehouseDir = hiveInfo.getWarehouseDir();
if (hdfsUrl.endsWith("/")) {
dataPathBuilder.append(hdfsUrl, 0, hdfsUrl.length() - 1);
} else {
@@ -139,57 +207,116 @@ public class PushHiveConfigTaskListener implements TaskEventListener {
} else {
dataPathBuilder.append(warehouseDir);
}
- String dataPath = dataPathBuilder.append("/")
- .append(hiveStorage.getDbName())
- .append(".db/")
- .append(hiveStorage.getTableName())
- .toString();
-
- // Encapsulate the deserialization information in the source
- HiveSinkInfo.HiveFileFormat fileFormat = new HiveSinkInfo.TextFileFormat(',');
- if (hiveStorage.getFieldSplitter() != null) {
- char c = (char) Integer.parseInt(hiveStorage.getFieldSplitter());
- fileFormat = new HiveSinkInfo.TextFileFormat(c);
+ String dataPath = dataPathBuilder.append("/").append(hiveInfo.getDbName())
+ .append(".db/").append(hiveInfo.getTableName()).toString();
+
+ // Get the sink field, if there is no partition field in the source field, add the partition field to the end
+ List<FieldInfo> fieldInfoList = getSinkFields(fieldList, hiveInfo.getPrimaryPartition());
+
+ return new HiveSinkInfo(fieldInfoList.toArray(new FieldInfo[0]), hiveInfo.getJdbcUrl(),
+ hiveInfo.getDbName(), hiveInfo.getTableName(), hiveInfo.getUsername(), hiveInfo.getPassword(),
+ dataPath, partitionList.toArray(new HiveSinkInfo.HivePartitionInfo[0]), fileFormat);
+ }
+
+ /**
+ * Get source info
+ */
+ private SourceInfo getSourceInfo(BusinessEntity businessEntity, StorageHiveSortInfo info,
+ List<StorageHiveFieldEntity> fieldList) {
+ DeserializationInfo deserializationInfo = null;
+ boolean isDbType = BizConstant.DATA_SOURCE_DB.equals(info.getDataSourceType());
+ if (!isDbType) {
+ // FILE and auto push source, the data format is TEXT or KEY-VALUE, temporarily use TDMsgCsv
+ String dataType = info.getDataType();
+ if (BizConstant.DATA_TYPE_TEXT.equalsIgnoreCase(dataType)
+ || BizConstant.DATA_TYPE_KEY_VALUE.equalsIgnoreCase(dataType)) {
+ // Use the field separator from the data stream
+ char separator = (char) Integer.parseInt(info.getSourceSeparator());
+ // TODO support escape
+ /*Character escape = null;
+ if (info.getDataEscapeChar() != null) {
+ escape = info.getDataEscapeChar().charAt(0);
+ }*/
+ // Whether to delete the first separator, the default is false for the time being
+ deserializationInfo = new TDMsgCsvDeserializationInfo(info.getInlongStreamId(), separator);
+ }
+ }
+
+ // The number and order of the source fields must be the same as the target fields
+ SourceInfo sourceInfo = null;
+ // Get the source field, if there is no partition field in source, add the partition field to the end
+ List<FieldInfo> sourceFields = getSourceFields(fieldList, info.getPrimaryPartition());
+
+ String middleWare = businessEntity.getMiddlewareType();
+ if (BizConstant.MIDDLEWARE_TUBE.equalsIgnoreCase(middleWare)) {
+ String masterAddress = clusterBean.getTubeMaster();
+ Preconditions.checkNotNull(masterAddress, "tube cluster address cannot be empty");
+ String topic = businessEntity.getMqResourceObj();
+ // The consumer group name is: taskName_topicName_consumer_group
+ String consumerGroup = clusterBean.getAppName() + "_" + topic + "_consumer_group";
+ sourceInfo = new TubeSourceInfo(topic, masterAddress, consumerGroup,
+ deserializationInfo, sourceFields.toArray(new FieldInfo[0]));
+ } else if (BizConstant.MIDDLEWARE_PULSAR.equalsIgnoreCase(middleWare)) {
+ String tenant = clusterBean.getDefaultTenant();
+ String namespace = businessEntity.getMqResourceObj();
+ String pulsarTopic = info.getMqResourceObj();
+ // Full name of Topic in Pulsar
+ String fullTopicName = "persistent://" + tenant + "/" + namespace + "/" + pulsarTopic;
+ String adminUrl = clusterBean.getPulsarServiceUrl();
+ String serviceUrl = clusterBean.getPulsarServiceUrl();
+ String consumerGroup = clusterBean.getAppName() + "_" + pulsarTopic + "_consumer_group";
+ sourceInfo = new PulsarSourceInfo(adminUrl, serviceUrl, fullTopicName, consumerGroup,
+ deserializationInfo, sourceFields.toArray(new FieldInfo[0]));
}
- // encapsulate hive sink
- HiveSinkInfo hiveSinkInfo = new HiveSinkInfo(
- sinkFields.toArray(new FieldInfo[0]),
- hiveServerUrl,
- hiveStorage.getDbName(),
- hiveStorage.getTableName(),
- hiveStorage.getUsername(),
- hiveStorage.getPassword(),
- dataPath,
- Stream.of(new HiveSinkInfo.HiveTimePartitionInfo(hiveStorage.getPrimaryPartition(),
- "yyyyMMddHH")).toArray(HiveSinkInfo.HivePartitionInfo[]::new),
- fileFormat
- );
-
- // data stream fields
- DataStreamInfo dataStream = dataStreamService.get(hiveStorage.getInlongGroupId(),
- hiveStorage.getInlongStreamId());
- Stream<FieldInfo> streamFields = dataStream.getFieldList().stream().map(field -> {
+ return sourceInfo;
+ }
+
+ /**
+ * Get sink fields
+ */
+ private List<FieldInfo> getSinkFields(List<StorageHiveFieldEntity> fieldList, String partitionField) {
+ boolean duplicate = false;
+ List<FieldInfo> fieldInfoList = new ArrayList<>();
+ for (StorageHiveFieldEntity field : fieldList) {
+ String fieldName = field.getFieldName();
+ if (fieldName.equals(partitionField)) {
+ duplicate = true;
+ }
+
FormatInfo formatInfo = SortFieldFormatUtils.convertFieldFormat(field.getFieldType().toLowerCase());
- return new FieldInfo(field.getFieldName(), formatInfo);
- });
-
- String topic = businessInfo.getMqResourceObj();
- String consumeGroupName = "sort_" + businessInfo.getMqResourceObj() + "_consumer_group";
- TDMsgCsvDeserializationInfo deserializationInfo = null;
- if (BizConstant.DATA_TYPE_TEXT.equalsIgnoreCase(dataStream.getDataType())) {
- char c = (char) Integer.parseInt(dataStream.getFileDelimiter());
- deserializationInfo = new TDMsgCsvDeserializationInfo(hiveStorage.getInlongStreamId(), c);
+ FieldInfo fieldInfo = new FieldInfo(fieldName, formatInfo);
+ fieldInfoList.add(fieldInfo);
}
- SourceInfo sourceInfo = new TubeSourceInfo(topic, clusterBean.getTubeMaster(), consumeGroupName,
- deserializationInfo, streamFields.toArray(FieldInfo[]::new));
- // push information
- return new DataFlowInfo(hiveStorage.getId(), sourceInfo, hiveSinkInfo);
+ // There is no partition field in the ordinary field, you need to add the partition field to the end
+ if (!duplicate && StringUtils.isNotEmpty(partitionField)) {
+ FieldInfo fieldInfo = new FieldInfo(partitionField, new TimestampFormatInfo("MILLIS"));
+ fieldInfoList.add(0, fieldInfo);
+ }
+ return fieldInfoList;
+ }
+
+ /**
+ * Get source field list
+ * TODO support BuiltInField
+ */
+ private List<FieldInfo> getSourceFields(List<StorageHiveFieldEntity> fieldList, String partitionField) {
+ List<FieldInfo> fieldInfoList = new ArrayList<>();
+ for (StorageHiveFieldEntity field : fieldList) {
+ FormatInfo formatInfo = SortFieldFormatUtils.convertFieldFormat(field.getSourceFieldType().toLowerCase());
+ String fieldName = field.getSourceFieldName();
+
+ FieldInfo fieldInfo = new FieldInfo(fieldName, formatInfo);
+ fieldInfoList.add(fieldInfo);
+ }
+
+ return fieldInfoList;
}
@Override
public boolean async() {
return false;
}
+
}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdpart/sort/SortFieldFormatUtils.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdpart/sort/SortFieldFormatUtils.java
index 1df0d1c..ff62e36 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdpart/sort/SortFieldFormatUtils.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdpart/sort/SortFieldFormatUtils.java
@@ -17,8 +17,11 @@
package org.apache.inlong.manager.service.thirdpart.sort;
+import org.apache.inlong.sort.formats.common.ArrayFormatInfo;
import org.apache.inlong.sort.formats.common.BooleanFormatInfo;
import org.apache.inlong.sort.formats.common.ByteFormatInfo;
+import org.apache.inlong.sort.formats.common.ByteTypeInfo;
+import org.apache.inlong.sort.formats.common.DateFormatInfo;
import org.apache.inlong.sort.formats.common.DecimalFormatInfo;
import org.apache.inlong.sort.formats.common.DoubleFormatInfo;
import org.apache.inlong.sort.formats.common.FloatFormatInfo;
@@ -27,6 +30,8 @@ import org.apache.inlong.sort.formats.common.IntFormatInfo;
import org.apache.inlong.sort.formats.common.LongFormatInfo;
import org.apache.inlong.sort.formats.common.ShortFormatInfo;
import org.apache.inlong.sort.formats.common.StringFormatInfo;
+import org.apache.inlong.sort.formats.common.TimeFormatInfo;
+import org.apache.inlong.sort.formats.common.TimestampFormatInfo;
/**
* Sort field formatting tool
@@ -42,21 +47,21 @@ public class SortFieldFormatUtils {
public static FormatInfo convertFieldFormat(String type) {
FormatInfo formatInfo;
switch (type) {
- case "string":
- formatInfo = new StringFormatInfo();
- break;
case "boolean":
formatInfo = new BooleanFormatInfo();
break;
+ case "tinyint":
case "byte":
formatInfo = new ByteFormatInfo();
break;
+ case "smallint":
case "short":
formatInfo = new ShortFormatInfo();
break;
case "int":
formatInfo = new IntFormatInfo();
break;
+ case "bigint":
case "long":
formatInfo = new LongFormatInfo();
break;
@@ -69,7 +74,20 @@ public class SortFieldFormatUtils {
case "decimal":
formatInfo = new DecimalFormatInfo();
break;
- default:
+ case "date":
+ formatInfo = new DateFormatInfo();
+ break;
+ case "time":
+ formatInfo = new TimeFormatInfo();
+ break;
+ case "timestamp":
+ formatInfo = new TimestampFormatInfo();
+ break;
+ case "binary":
+ case "fixed":
+ formatInfo = new ArrayFormatInfo(ByteTypeInfo::new);
+ break;
+ default: // default is string
formatInfo = new StringFormatInfo();
}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/newconsumption/listener/ConsumptionCompleteProcessListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/newconsumption/listener/ConsumptionCompleteProcessListener.java
index 389c4e3..4ea6073 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/newconsumption/listener/ConsumptionCompleteProcessListener.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/newconsumption/listener/ConsumptionCompleteProcessListener.java
@@ -94,9 +94,9 @@ public class ConsumptionCompleteProcessListener implements ProcessEventListener
private NewConsumptionApproveForm getAdminApproveForm(WorkflowContext context) {
TaskInstance adminTask = queryService.listTask(TaskQuery.builder()
- .processInstId(context.getProcessInstance().getId())
- .name(NewConsumptionWorkflowDefinition.UT_ADMINT_NAME)
- .build())
+ .processInstId(context.getProcessInstance().getId())
+ .name(NewConsumptionWorkflowDefinition.UT_ADMINT_NAME)
+ .build())
.stream()
.findFirst()
.orElseThrow(() -> new BusinessException(BizErrorCodeEnum.WORKFLOW_EXE_FAILED,
@@ -104,7 +104,7 @@ public class ConsumptionCompleteProcessListener implements ProcessEventListener
Process process = context.getProcess();
NewConsumptionApproveForm form = WorkflowFormParserUtils.parseTaskForm(adminTask, process);
- Preconditions.checkNotNull(form, "form can't be null");
+ Preconditions.checkNotNull(form, "form cannot be null");
return form;
}
diff --git a/inlong-manager/manager-web/src/test/resources/sql/apache_inlong_manager.sql b/inlong-manager/manager-web/src/test/resources/sql/apache_inlong_manager.sql
index d3ae3f1..08b2320 100644
--- a/inlong-manager/manager-web/src/test/resources/sql/apache_inlong_manager.sql
+++ b/inlong-manager/manager-web/src/test/resources/sql/apache_inlong_manager.sql
@@ -85,7 +85,7 @@ CREATE TABLE `business`
`schema_name` varchar(128) DEFAULT NULL COMMENT 'Data type, associated data_schema table',
`in_charges` varchar(512) NOT NULL COMMENT 'Name of responsible person, separated by commas',
`followers` varchar(512) DEFAULT NULL COMMENT 'List of names of business followers, separated by commas',
- `status` int(11) DEFAULT '21' COMMENT 'Business status',
+ `status` int(4) DEFAULT '21' COMMENT 'Business status',
`is_deleted` tinyint(1) DEFAULT '0' COMMENT 'Whether to delete, 0: not deleted, 1: deleted',
`creator` varchar(64) NOT NULL COMMENT 'Creator name',
`modifier` varchar(64) DEFAULT NULL COMMENT 'Modifier name',
@@ -150,7 +150,7 @@ CREATE TABLE `cluster_info`
`url` varchar(256) DEFAULT NULL COMMENT 'Cluster URL address',
`is_backup` tinyint(1) DEFAULT '0' COMMENT 'Whether it is a backup cluster, 0: no, 1: yes',
`ext_props` text DEFAULT NULL COMMENT 'extended properties',
- `status` int(11) DEFAULT '1' COMMENT 'cluster status',
+ `status` int(4) DEFAULT '1' COMMENT 'cluster status',
`is_deleted` tinyint(1) DEFAULT '0' COMMENT 'Whether to delete, 0: not deleted, 1: deleted',
`creator` varchar(64) NOT NULL COMMENT 'Creator name',
`modifier` varchar(64) DEFAULT NULL COMMENT 'Modifier name',
@@ -183,7 +183,7 @@ CREATE TABLE `common_db_server`
`db_description` varchar(256) DEFAULT NULL COMMENT 'DB description',
`backup_db_server_ip` varchar(64) DEFAULT NULL COMMENT 'Backup DB HOST',
`backup_db_port` int(11) DEFAULT NULL COMMENT 'Backup DB port',
- `status` int(11) DEFAULT '0' COMMENT 'status',
+ `status` int(4) DEFAULT '0' COMMENT 'status',
`is_deleted` tinyint(1) DEFAULT '0' COMMENT 'Whether to delete, 0: not deleted, 1: deleted',
`creator` varchar(64) NOT NULL COMMENT 'Creator name',
`modifier` varchar(64) DEFAULT NULL COMMENT 'Modifier name',
@@ -208,7 +208,7 @@ CREATE TABLE `common_file_server`
`issue_type` varchar(128) DEFAULT NULL COMMENT 'Issuance method, such as SSH, TCS, etc.',
`username` varchar(64) NOT NULL COMMENT 'User name of the data source IP host',
`password` varchar(64) NOT NULL COMMENT 'The password corresponding to the above user name',
- `status` int(11) DEFAULT '0' COMMENT 'status',
+ `status` int(4) DEFAULT '0' COMMENT 'status',
`is_deleted` tinyint(1) DEFAULT '0' COMMENT 'Whether to delete, 0: not deleted, 1: deleted',
`creator` varchar(64) NOT NULL COMMENT 'Creator name',
`modifier` varchar(64) DEFAULT NULL COMMENT 'Modifier name',
@@ -234,12 +234,12 @@ CREATE TABLE `consumption`
`topic` varchar(255) NOT NULL COMMENT 'Consumption topic',
`filter_enabled` int(2) DEFAULT '0' COMMENT 'Whether to filter, default 0, not filter consume',
`inlong_stream_id` varchar(1024) DEFAULT NULL COMMENT 'Data stream ID for consumption, if filter_enable is 1, it cannot empty',
- `status` int(11) NOT NULL COMMENT 'Status: draft, pending approval, approval rejected, approval passed',
+ `status` int(4) NOT NULL COMMENT 'Status: draft, pending approval, approval rejected, approval passed',
+ `is_deleted` tinyint(1) DEFAULT '0' COMMENT 'Whether to delete, 0: not deleted, 1: deleted',
`creator` varchar(64) NOT NULL COMMENT 'creator',
`modifier` varchar(64) DEFAULT NULL COMMENT 'modifier',
`create_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'Create time',
`modify_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'Modify time',
- `is_deleted` int(2) DEFAULT '0' COMMENT 'Whether to delete, 0: not deleted, 1: deleted',
PRIMARY KEY (`id`)
);
@@ -249,8 +249,8 @@ CREATE TABLE `consumption`
DROP TABLE IF EXISTS `consumption_pulsar`;
CREATE TABLE `consumption_pulsar`
(
- `id` int NOT NULL AUTO_INCREMENT,
- `consumption_id` int DEFAULT NULL COMMENT 'ID of the consumption information to which it belongs, guaranteed to be uniquely associated with consumption information',
+ `id` int(11) NOT NULL AUTO_INCREMENT,
+ `consumption_id` int(11) DEFAULT NULL COMMENT 'ID of the consumption information to which it belongs, guaranteed to be uniquely associated with consumption information',
`consumer_group_id` varchar(255) NOT NULL COMMENT 'Consumer group ID',
`consumer_group_name` varchar(255) NOT NULL COMMENT 'Consumer group name',
`inlong_group_id` varchar(255) NOT NULL COMMENT 'Business group ID',
@@ -258,7 +258,7 @@ CREATE TABLE `consumption_pulsar`
`retry_letter_topic` varchar(255) DEFAULT NULL COMMENT 'The name of the retry queue topic',
`is_dlq` tinyint(1) DEFAULT '0' COMMENT 'Whether to configure dead letter topic, 0: no configuration, 1: means configuration',
`dead_letter_topic` varchar(255) DEFAULT NULL COMMENT 'dead letter topic name',
- `is_deleted` int DEFAULT '0' COMMENT 'Whether to delete',
+ `is_deleted` tinyint(1) DEFAULT '0' COMMENT 'Whether to delete',
PRIMARY KEY (`id`)
) COMMENT ='Pulsar consumption table';
@@ -278,7 +278,7 @@ CREATE TABLE `data_proxy_cluster`
`net_type` varchar(20) DEFAULT NULL COMMENT 'Cluster network type, internal, or public',
`in_charges` varchar(512) DEFAULT NULL COMMENT 'Name of responsible person, separated by commas',
`ext_props` text DEFAULT NULL COMMENT 'Extended properties',
- `status` int(11) DEFAULT '1' COMMENT 'Cluster status',
+ `status` int(4) DEFAULT '1' COMMENT 'Cluster status',
`is_deleted` tinyint(1) DEFAULT '0' COMMENT 'Whether to delete, 0: not deleted, 1: deleted',
`creator` varchar(64) NOT NULL COMMENT 'Creator name',
`modifier` varchar(64) DEFAULT NULL COMMENT 'Modifier name',
@@ -321,7 +321,8 @@ CREATE TABLE `data_source_cmd_config`
`modify_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'Last update time ',
`create_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'Create time',
`result_info` varchar(64) DEFAULT NULL,
- PRIMARY KEY (`id`)
+ PRIMARY KEY (`id`),
+ KEY `index_1` (`task_id`, `bSend`, `specified_data_time`)
);
-- ----------------------------
@@ -340,18 +341,24 @@ CREATE TABLE `data_stream`
`storage_period` int(11) DEFAULT '1' COMMENT 'The storage period of data in MQ, unit: day',
`data_type` varchar(20) DEFAULT 'TEXT' COMMENT 'Data type, there are: TEXT, KEY-VALUE, PB, BON, TEXT and BON should be treated differently',
`data_encoding` varchar(8) DEFAULT 'UTF-8' COMMENT 'Data encoding format, including: UTF-8, GBK',
- `file_delimiter` varchar(8) DEFAULT NULL COMMENT 'The source data field separator, stored as ASCII code',
+ `data_separator` varchar(8) DEFAULT NULL COMMENT 'The source data field separator, stored as ASCII code',
+ `data_escape_char` varchar(8) DEFAULT NULL COMMENT 'Source data field escape character, the default is NULL (NULL), stored as 1 character',
`have_predefined_fields` tinyint(1) DEFAULT '0' COMMENT '(File, DB access) whether there are predefined fields, 0: none, 1: yes (save to data_stream_field)',
+ `daily_records` int(11) DEFAULT '10' COMMENT 'Number of access records per day, unit: 10,000 records per day',
+ `daily_storage` int(11) DEFAULT '10' COMMENT 'Access size by day, unit: GB per day',
+ `peak_records` int(11) DEFAULT '1000' COMMENT 'Access peak per second, unit: records per second',
+ `max_length` int(11) DEFAULT '10240' COMMENT 'The maximum length of a single piece of data, unit: Byte',
`in_charges` varchar(512) DEFAULT NULL COMMENT 'Name of responsible person, separated by commas',
- `status` int(11) DEFAULT '0' COMMENT 'Data stream status',
- `previous_status` int(11) DEFAULT '0' COMMENT 'Previous status',
+ `status` int(4) DEFAULT '0' COMMENT 'Data stream status',
+ `previous_status` int(4) DEFAULT '0' COMMENT 'Previous status',
`is_deleted` tinyint(1) DEFAULT '0' COMMENT 'Whether to delete, 0: not deleted, 1: deleted',
`creator` varchar(64) DEFAULT NULL COMMENT 'Creator name',
`modifier` varchar(64) DEFAULT NULL COMMENT 'Modifier name',
`create_time` timestamp NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'Create time',
`modify_time` timestamp NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'Modify time',
`temp_view` text DEFAULT NULL COMMENT 'Temporary view, used to save intermediate data that has not been submitted or approved after modification',
- PRIMARY KEY (`id`)
+ PRIMARY KEY (`id`),
+ UNIQUE KEY `unique_data_stream` (`inlong_stream_id`, `inlong_group_id`, `is_deleted`, `modify_time`)
);
-- ----------------------------
@@ -367,7 +374,8 @@ CREATE TABLE `data_stream_ext`
`key_value` varchar(256) DEFAULT NULL COMMENT 'The value of the configuration item',
`is_deleted` tinyint(1) DEFAULT '0' COMMENT 'Whether to delete, 0: not deleted, 1: deleted',
`modify_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'Modify time',
- PRIMARY KEY (`id`)
+ PRIMARY KEY (`id`),
+ KEY `index_stream_id` (`inlong_stream_id`)
);
-- ----------------------------
@@ -391,7 +399,8 @@ CREATE TABLE `data_stream_field`
`bon_field_path` varchar(256) DEFAULT NULL COMMENT 'BON field path',
`bon_field_type` varchar(64) DEFAULT NULL COMMENT 'BON field type',
`encrypt_level` varchar(20) DEFAULT NULL COMMENT 'Encryption level',
- PRIMARY KEY (`id`)
+ PRIMARY KEY (`id`),
+ KEY `index_stream_id` (`inlong_stream_id`)
);
-- ----------------------------
@@ -412,7 +421,7 @@ CREATE TABLE `operation_log`
`cost_time` bigint(20) DEFAULT NULL COMMENT 'time-consuming',
`body` text COMMENT 'Request body',
`param` text COMMENT 'parameter',
- `status` tinyint(1) DEFAULT NULL COMMENT 'status',
+ `status` int(4) DEFAULT NULL COMMENT 'status',
`request_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'request time',
`err_msg` text COMMENT 'Error message',
PRIMARY KEY (`id`)
@@ -432,7 +441,9 @@ CREATE TABLE `role`
`create_by` varchar(255) NOT NULL,
`update_by` varchar(255) NOT NULL,
`disabled` tinyint(1) NOT NULL DEFAULT '0' COMMENT 'Is it disabled?',
- PRIMARY KEY (`id`)
+ PRIMARY KEY (`id`),
+ UNIQUE KEY `role_role_code_uindex` (`role_code`),
+ UNIQUE KEY `role_role_name_uindex` (`role_name`)
);
-- ----------------------------
@@ -450,7 +461,7 @@ CREATE TABLE `source_db_basic`
`modifier` varchar(64) DEFAULT NULL COMMENT 'Modifier name',
`create_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'Create time',
`modify_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'Modify time',
- `temp_view` text DEFAULT NULL COMMENT 'Temporary view, used to save intermediate data that has not been submitted or approved after modification',
+ `temp_view` json DEFAULT NULL COMMENT 'Temporary view, used to save intermediate data that has not been submitted or approved after modification',
PRIMARY KEY (`id`)
);
@@ -471,8 +482,8 @@ CREATE TABLE `source_db_detail`
`table_fields` longtext COMMENT 'Data table fields, multiple are separated by half-width commas, required for increment',
`data_sql` longtext COMMENT 'SQL statement to collect source data, required for full amount',
`crontab` varchar(56) DEFAULT NULL COMMENT 'Timed scheduling expression, required for full amount',
- `status` int(11) DEFAULT '0' COMMENT 'Data source status',
- `previous_status` int(11) DEFAULT '0' COMMENT 'Previous status',
+ `status` int(4) DEFAULT '0' COMMENT 'Data source status',
+ `previous_status` int(4) DEFAULT '0' COMMENT 'Previous status',
`is_deleted` tinyint(1) DEFAULT '0' COMMENT 'Whether to delete, 0: not deleted, 1: deleted',
`creator` varchar(64) NOT NULL COMMENT 'Creator name',
`modifier` varchar(64) DEFAULT NULL COMMENT 'Modifier name',
@@ -525,8 +536,8 @@ CREATE TABLE `source_file_detail`
`username` varchar(32) NOT NULL COMMENT 'User name of the data source IP host',
`password` varchar(64) NOT NULL COMMENT 'The password corresponding to the above user name',
`file_path` varchar(256) NOT NULL COMMENT 'File path, supports regular matching',
- `status` int(11) DEFAULT '0' COMMENT 'Data source status',
- `previous_status` int(11) DEFAULT '0' COMMENT 'Previous status',
+ `status` int(4) DEFAULT '0' COMMENT 'Data source status',
+ `previous_status` int(4) DEFAULT '0' COMMENT 'Previous status',
`is_deleted` tinyint(1) DEFAULT '0' COMMENT 'Whether to delete, 0: not deleted, 1: deleted',
`creator` varchar(64) NOT NULL COMMENT 'Creator name',
`modifier` varchar(64) DEFAULT NULL COMMENT 'Modifier name',
@@ -548,44 +559,45 @@ CREATE TABLE `storage_ext`
`key_name` varchar(64) NOT NULL COMMENT 'Configuration item name',
`key_value` varchar(256) DEFAULT NULL COMMENT 'The value of the configuration item',
`is_deleted` tinyint(1) DEFAULT '0' COMMENT 'Whether to delete, 0: not deleted, 1: deleted',
- `modify_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'modify time',
- PRIMARY KEY (`id`)
+ `modify_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'Modify time',
+ PRIMARY KEY (`id`),
+ KEY `index_storage_id` (`storage_id`)
);
-- ----------------------------
-- Table structure for storage_hive
-- ----------------------------
DROP TABLE IF EXISTS `storage_hive`;
-DROP TABLE IF EXISTS `storage_hive`;
CREATE TABLE `storage_hive`
(
- `id` int(11) NOT NULL AUTO_INCREMENT COMMENT 'Incremental primary key',
- `inlong_group_id` varchar(128) NOT NULL COMMENT 'Owning business group id',
- `inlong_stream_id` varchar(128) NOT NULL COMMENT 'Owning data stream id',
- `jdbc_url` varchar(255) NOT NULL COMMENT 'Hive JDBC connection URL, such as "jdbc:hive2://127.0.0.1:10000"',
- `username` varchar(128) NOT NULL COMMENT 'Username',
- `password` varchar(255) NOT NULL COMMENT 'User password',
- `db_name` varchar(128) NOT NULL COMMENT 'Target database name',
- `table_name` varchar(128) NOT NULL COMMENT 'Target data table name',
- `primary_partition` varchar(255) DEFAULT 'dt' COMMENT 'primary partition field',
- `secondary_partition` varchar(256) DEFAULT NULL COMMENT 'secondary partition field',
- `partition_type` varchar(10) DEFAULT NULL COMMENT 'The partition type, there are: H-by hour, D-by day, W-by week, M-by month, O-one-time, R-non-periodical',
- `file_format` varchar(15) DEFAULT 'TextFile' COMMENT 'The stored table format, TextFile, RCFile, SequenceFile, Avro',
- `encoding_type` varchar(255) DEFAULT NULL COMMENT 'data encoding',
- `field_splitter` varchar(10) DEFAULT NULL COMMENT 'field separator',
- `hdfs_default_fs` varchar(255) DEFAULT NULL COMMENT 'HDFS defaultFS, such as "hdfs://127.0.0.1:9000"',
- `warehouse_dir` varchar(250) DEFAULT '/user/hive/warehouse' COMMENT 'Hive table storage path on HDFS, such as "/user/hive/warehouse"',
- `usage_interval` varchar(10) DEFAULT NULL COMMENT 'The amount of time that Sort collected data will land on Hive, there are 10M, 15M, 30M, 1H, 1D',
- `storage_period` int(5) DEFAULT '10' COMMENT 'Data storage period, unit: day',
- `status` int(11) DEFAULT '0' COMMENT 'status',
- `previous_status` int(11) DEFAULT '0' COMMENT 'Previous status',
- `is_deleted` tinyint(1) DEFAULT '0' COMMENT 'Whether to delete, 0: not deleted, 1: deleted',
- `creator` varchar(64) NOT NULL COMMENT 'Creator name',
- `modifier` varchar(64) DEFAULT NULL COMMENT 'Modifier name',
- `create_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'Create time',
- `modify_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'Modify time',
- `temp_view` text DEFAULT NULL COMMENT 'Temporary view, used to save un-submitted and unapproved intermediate data after modification',
- `opt_log` varchar(5000) DEFAULT NULL COMMENT 'Background operation log',
+ `id` int(11) NOT NULL AUTO_INCREMENT COMMENT 'Incremental primary key',
+ `inlong_group_id` varchar(128) NOT NULL COMMENT 'Owning business group id',
+ `inlong_stream_id` varchar(128) NOT NULL COMMENT 'Owning data stream id',
+ `jdbc_url` varchar(255) DEFAULT NULL COMMENT 'Hive JDBC connection URL, such as "jdbc:hive2://127.0.0.1:10000"',
+ `username` varchar(128) DEFAULT NULL COMMENT 'Username',
+ `password` varchar(255) DEFAULT NULL COMMENT 'User password',
+ `db_name` varchar(128) DEFAULT NULL COMMENT 'Target database name',
+ `table_name` varchar(128) DEFAULT NULL COMMENT 'Target data table name',
+ `hdfs_default_fs` varchar(255) DEFAULT NULL COMMENT 'HDFS defaultFS, such as "hdfs://127.0.0.1:9000"',
+ `warehouse_dir` varchar(250) DEFAULT '/user/hive/warehouse' COMMENT 'Hive table storage path on HDFS, such as "/user/hive/warehouse"',
+ `partition_interval` int(5) DEFAULT NULL COMMENT 'Partition interval, support: 1(D / H), 10 I, 30 I',
+ `partition_unit` varchar(10) DEFAULT 'D' COMMENT 'Partition type, support: D-day, H-hour, I-minute',
+ `primary_partition` varchar(255) DEFAULT 'dt' COMMENT 'primary partition field',
+ `secondary_partition` varchar(256) DEFAULT NULL COMMENT 'secondary partition field',
+ `partition_creation_strategy` varchar(50) DEFAULT 'COMPLETED' COMMENT 'Partition creation strategy, support: ARRIVED, COMPLETED',
+ `file_format` varchar(15) DEFAULT 'TextFile' COMMENT 'The stored table format, TextFile, RCFile, SequenceFile, Avro',
+ `data_encoding` varchar(20) DEFAULT 'UTF-8' COMMENT 'data encoding type',
+ `data_separator` varchar(10) DEFAULT NULL COMMENT 'data field separator',
+ `storage_period` int(5) DEFAULT '10' COMMENT 'Data storage period, unit: day',
+ `opt_log` varchar(5000) DEFAULT NULL COMMENT 'Background operation log',
+ `status` int(4) DEFAULT '0' COMMENT 'status',
+ `previous_status` int(4) DEFAULT '0' COMMENT 'Previous status',
+ `is_deleted` tinyint(1) DEFAULT '0' COMMENT 'Whether to delete, 0: not deleted, 1: deleted',
+ `creator` varchar(64) DEFAULT NULL COMMENT 'creator name',
+ `modifier` varchar(64) DEFAULT NULL COMMENT 'modifier name',
+ `create_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'create time',
+ `modify_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'modify time',
+ `temp_view` text DEFAULT NULL COMMENT 'Temporary view, used to save un-submitted and unapproved intermediate data after modification',
PRIMARY KEY (`id`)
);
@@ -609,7 +621,8 @@ CREATE TABLE `storage_hive_field`
`is_exist` tinyint(1) DEFAULT '0' COMMENT 'Does it exist, 0: does not exist, 1: exists',
`rank_num` smallint(6) DEFAULT '0' COMMENT 'Field order (front-end display field order)',
`is_deleted` tinyint(1) DEFAULT '0' COMMENT 'Whether to delete, 0: not deleted, 1: deleted',
- PRIMARY KEY (`id`)
+ PRIMARY KEY (`id`),
+ KEY `index_storage_id` (`storage_id`)
);
-- ----------------------------
@@ -699,7 +712,8 @@ CREATE TABLE `user`
`update_time` datetime DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'update time',
`create_by` varchar(255) NOT NULL COMMENT 'create by sb.',
`update_by` varchar(255) DEFAULT NULL COMMENT 'update by sb.',
- PRIMARY KEY (`id`)
+ PRIMARY KEY (`id`),
+ UNIQUE KEY `user_name_uindex` (`name`)
);
-- create default admin user, username is 'admin', password is 'inlong'
@@ -742,7 +756,8 @@ CREATE TABLE `wf_approver`
`create_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'Create time',
`modify_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'update time',
`is_deleted` int(11) DEFAULT '0' COMMENT 'Whether to delete, 0 is not deleted, if greater than 0, delete',
- PRIMARY KEY (`id`)
+ PRIMARY KEY (`id`),
+ KEY `process_name_task_name_index` (`process_name`, `task_name`)
);
-- create default approver for new consumption and new business
@@ -797,7 +812,7 @@ CREATE TABLE `wf_process_instance`
`form_data` mediumtext COMMENT 'form information',
`start_time` datetime NOT NULL COMMENT 'start time',
`end_time` datetime DEFAULT NULL COMMENT 'End event',
- `ext` text COMMENT 'Extended information-text',
+ `ext` text COMMENT 'Extended information-json',
`hidden` tinyint(1) NOT NULL DEFAULT '0' COMMENT 'Is it hidden',
PRIMARY KEY (`id`)
);
@@ -823,7 +838,7 @@ CREATE TABLE `wf_task_instance`
`form_data` mediumtext COMMENT 'form information submitted by the current task',
`start_time` datetime NOT NULL COMMENT 'start time',
`end_time` datetime DEFAULT NULL COMMENT 'End time',
- `ext` text COMMENT 'Extended information-text',
+ `ext` text COMMENT 'Extended information-json',
PRIMARY KEY (`id`)
);
@@ -840,13 +855,14 @@ CREATE TABLE `cluster_set`
`middleware_type` varchar(10) DEFAULT 'TUBE' COMMENT 'The middleware type of message queue, high throughput: TUBE, high consistency: PULSAR',
`in_charges` varchar(512) COMMENT 'Name of responsible person, separated by commas',
`followers` varchar(512) COMMENT 'List of names of business followers, separated by commas',
- `status` int(11) DEFAULT '21' COMMENT 'ClusterSet status',
+ `status` int(4) DEFAULT '21' COMMENT 'ClusterSet status',
`is_deleted` tinyint(1) DEFAULT '0' COMMENT 'Whether to delete, 0: not deleted, 1: deleted',
`creator` varchar(64) NOT NULL COMMENT 'Creator name',
`modifier` varchar(64) NULL COMMENT 'Modifier name',
`create_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'Create time',
`modify_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'Modify time',
- PRIMARY KEY (`id`)
+ PRIMARY KEY (`id`),
+ UNIQUE KEY `unique_cluster_set` (`set_name`)
);
-- ----------------------------
@@ -858,7 +874,8 @@ CREATE TABLE `cluster_set_inlongid`
`id` int(11) NOT NULL AUTO_INCREMENT COMMENT 'Incremental primary key',
`set_name` varchar(128) NOT NULL COMMENT 'ClusterSet name, English, numbers and underscore',
`inlong_group_id` varchar(128) NOT NULL COMMENT 'Business group id, filled in by the user, undeleted ones cannot be repeated',
- PRIMARY KEY (`id`)
+ PRIMARY KEY (`id`),
+ UNIQUE KEY `unique_cluster_set_inlongid` (`set_name`, `inlong_group_id`)
);
-- ----------------------------
@@ -871,7 +888,8 @@ CREATE TABLE `cache_cluster`
`cluster_name` varchar(128) NOT NULL COMMENT 'CacheCluster name, English, numbers and underscore',
`set_name` varchar(128) NOT NULL COMMENT 'ClusterSet name, English, numbers and underscore',
`zone` varchar(128) NOT NULL COMMENT 'Zone, sz/sh/tj',
- PRIMARY KEY (`id`)
+ PRIMARY KEY (`id`),
+ UNIQUE KEY `unique_cache_cluster` (`cluster_name`)
);
-- ----------------------------
@@ -885,8 +903,9 @@ CREATE TABLE `cache_cluster_ext`
`key_name` varchar(64) NOT NULL COMMENT 'Configuration item name',
`key_value` varchar(256) NULL COMMENT 'The value of the configuration item',
`is_deleted` tinyint(1) DEFAULT '0' COMMENT 'Whether to delete, 0: not deleted, 1: deleted',
- `modify_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'modify time',
- PRIMARY KEY (`id`)
+ `modify_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'Modify time',
+ PRIMARY KEY (`id`),
+ KEY `index_cache_cluster` (`cluster_name`)
);
-- ----------------------------
@@ -899,7 +918,8 @@ CREATE TABLE `cache_topic`
`topic_name` varchar(128) NOT NULL COMMENT 'Topic name, English, numbers and underscore',
`set_name` varchar(128) NOT NULL COMMENT 'ClusterSet name, English, numbers and underscore',
`partition_num` int(11) NOT NULL COMMENT 'Partition number',
- PRIMARY KEY (`id`)
+ PRIMARY KEY (`id`),
+ UNIQUE KEY `unique_cache_topic` (`topic_name`, `set_name`)
);
-- ----------------------------
@@ -912,7 +932,8 @@ CREATE TABLE `proxy_cluster`
`cluster_name` varchar(128) NOT NULL COMMENT 'ProxyCluster name, English, numbers and underscore',
`set_name` varchar(128) NOT NULL COMMENT 'ClusterSet name, English, numbers and underscore',
`zone` varchar(128) NOT NULL COMMENT 'Zone, sz/sh/tj',
- PRIMARY KEY (`id`)
+ PRIMARY KEY (`id`),
+ UNIQUE KEY `unique_proxy_cluster` (`cluster_name`, `set_name`)
);
-- ----------------------------
@@ -924,7 +945,8 @@ CREATE TABLE `proxy_cluster_to_cache_cluster`
`id` int(11) NOT NULL AUTO_INCREMENT COMMENT 'Incremental primary key',
`proxy_cluster_name` varchar(128) NOT NULL COMMENT 'ProxyCluster name, English, numbers and underscore',
`cache_cluster_name` varchar(128) NOT NULL COMMENT 'CacheCluster name, English, numbers and underscore',
- PRIMARY KEY (`id`)
+ PRIMARY KEY (`id`),
+ UNIQUE KEY `unique_proxy_cluster_to_cache_cluster` (`proxy_cluster_name`, `cache_cluster_name`)
);
-- ----------------------------
@@ -939,7 +961,8 @@ CREATE TABLE `flume_source`
`type` varchar(128) NOT NULL COMMENT 'FlumeSource classname',
`channels` varchar(128) NOT NULL COMMENT 'The channels of FlumeSource, separated by space',
`selector_type` varchar(128) NOT NULL COMMENT 'FlumeSource channel selector classname',
- PRIMARY KEY (`id`)
+ PRIMARY KEY (`id`),
+ UNIQUE KEY `unique_flume_source` (`source_name`, `set_name`)
);
-- ----------------------------
@@ -954,8 +977,9 @@ CREATE TABLE `flume_source_ext`
`key_name` varchar(64) NOT NULL COMMENT 'Configuration item name',
`key_value` varchar(256) NULL COMMENT 'The value of the configuration item',
`is_deleted` tinyint(1) DEFAULT '0' COMMENT 'Whether to delete, 0: not deleted, 1: deleted',
- `modify_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'modify time',
- PRIMARY KEY (`id`)
+ `modify_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'Modify time',
+ PRIMARY KEY (`id`),
+ KEY `index_flume_source_ext` (`parent_name`)
);
-- ----------------------------
@@ -968,7 +992,8 @@ CREATE TABLE `flume_channel`
`channel_name` varchar(128) NOT NULL COMMENT 'FlumeChannel name, English, numbers and underscore',
`set_name` varchar(128) NOT NULL COMMENT 'ClusterSet name, English, numbers and underscore',
`type` varchar(128) NOT NULL COMMENT 'FlumeChannel classname',
- PRIMARY KEY (`id`)
+ PRIMARY KEY (`id`),
+ UNIQUE KEY `unique_flume_channel` (`channel_name`, `set_name`)
);
-- ----------------------------
@@ -983,8 +1008,9 @@ CREATE TABLE `flume_channel_ext`
`key_name` varchar(64) NOT NULL COMMENT 'Configuration item name',
`key_value` varchar(256) NULL COMMENT 'The value of the configuration item',
`is_deleted` tinyint(1) DEFAULT '0' COMMENT 'Whether to delete, 0: not deleted, 1: deleted',
- `modify_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'modify time',
- PRIMARY KEY (`id`)
+ `modify_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'Modify time',
+ PRIMARY KEY (`id`),
+ KEY `index_flume_channel_ext` (`parent_name`)
);
-- ----------------------------
@@ -998,7 +1024,8 @@ CREATE TABLE `flume_sink`
`set_name` varchar(128) NOT NULL COMMENT 'ClusterSet name, English, numbers and underscore',
`type` varchar(128) NOT NULL COMMENT 'FlumeSink classname',
`channel` varchar(128) NOT NULL COMMENT 'FlumeSink channel',
- PRIMARY KEY (`id`)
+ PRIMARY KEY (`id`),
+ UNIQUE KEY `unique_flume_sink` (`sink_name`, `set_name`)
);
-- ----------------------------
@@ -1013,8 +1040,9 @@ CREATE TABLE `flume_sink_ext`
`key_name` varchar(64) NOT NULL COMMENT 'Configuration item name',
`key_value` varchar(256) NULL COMMENT 'The value of the configuration item',
`is_deleted` tinyint(1) DEFAULT '0' COMMENT 'Whether to delete, 0: not deleted, 1: deleted',
- `modify_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'modify time',
- PRIMARY KEY (`id`)
+ `modify_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'Modify time',
+ PRIMARY KEY (`id`),
+ KEY `index_flume_sink_ext` (`parent_name`)
);
SET FOREIGN_KEY_CHECKS = 1;
diff --git a/inlong-manager/manager-workflow-engine/src/main/java/org/apache/inlong/manager/workflow/core/processor/StartEventProcessor.java b/inlong-manager/manager-workflow-engine/src/main/java/org/apache/inlong/manager/workflow/core/processor/StartEventProcessor.java
index 85fc1e1..e8bd577 100644
--- a/inlong-manager/manager-workflow-engine/src/main/java/org/apache/inlong/manager/workflow/core/processor/StartEventProcessor.java
+++ b/inlong-manager/manager-workflow-engine/src/main/java/org/apache/inlong/manager/workflow/core/processor/StartEventProcessor.java
@@ -58,7 +58,7 @@ public class StartEventProcessor extends AbstractNextableElementProcessor<StartE
Process process = context.getProcess();
ProcessForm form = context.getProcessForm();
if (process.getFormClass() != null) {
- Preconditions.checkNotNull(form, "form can't be null");
+ Preconditions.checkNotNull(form, "form cannot be null");
Preconditions.checkTrue(form.getClass().isAssignableFrom(process.getFormClass()),
() -> "form type not match, should be class " + process.getFormClass());
form.validate();
diff --git a/inlong-manager/manager-workflow-engine/src/main/java/org/apache/inlong/manager/workflow/core/processor/UserTaskProcessor.java b/inlong-manager/manager-workflow-engine/src/main/java/org/apache/inlong/manager/workflow/core/processor/UserTaskProcessor.java
index 765eb54..92798cf 100644
--- a/inlong-manager/manager-workflow-engine/src/main/java/org/apache/inlong/manager/workflow/core/processor/UserTaskProcessor.java
+++ b/inlong-manager/manager-workflow-engine/src/main/java/org/apache/inlong/manager/workflow/core/processor/UserTaskProcessor.java
@@ -69,7 +69,7 @@ public class UserTaskProcessor extends AbstractTaskProcessor<UserTask> {
@Override
public void create(UserTask userTask, WorkflowContext context) {
List<String> approvers = userTask.getApproverAssign().assign(context);
- Preconditions.checkNotEmpty(approvers, "can't assign approvers for task: " + userTask.getDisplayName()
+ Preconditions.checkNotEmpty(approvers, "cannot assign approvers for task: " + userTask.getDisplayName()
+ ", as the approvers in table `wf_approver` was empty");
if (!userTask.isNeedAllApprove()) {
@@ -166,7 +166,7 @@ public class UserTaskProcessor extends AbstractTaskProcessor<UserTask> {
UserTask userTask = (UserTask) actionContext.getTask();
if (needForm(userTask, actionContext.getAction())) {
- Preconditions.checkNotNull(actionContext.getForm(), "form can't be null");
+ Preconditions.checkNotNull(actionContext.getForm(), "form cannot be null");
Preconditions.checkTrue(actionContext.getForm().getClass().isAssignableFrom(userTask.getFormClass()),
() -> "form type not match, should be class " + userTask.getFormClass());
actionContext.getForm().validate();
diff --git a/inlong-manager/manager-workflow-engine/src/main/java/org/apache/inlong/manager/workflow/util/WorkflowFormParserUtils.java b/inlong-manager/manager-workflow-engine/src/main/java/org/apache/inlong/manager/workflow/util/WorkflowFormParserUtils.java
index 39c6039..30d2412 100644
--- a/inlong-manager/manager-workflow-engine/src/main/java/org/apache/inlong/manager/workflow/util/WorkflowFormParserUtils.java
+++ b/inlong-manager/manager-workflow-engine/src/main/java/org/apache/inlong/manager/workflow/util/WorkflowFormParserUtils.java
@@ -41,8 +41,8 @@ public class WorkflowFormParserUtils {
*/
public static <T extends TaskForm> T parseTaskForm(TaskInstance taskInstance, Process process)
throws FormParseException {
- Preconditions.checkNotNull(taskInstance, "taskInstance can't be null");
- Preconditions.checkNotNull(process, "process can't be null");
+ Preconditions.checkNotNull(taskInstance, "taskInstance cannot be null");
+ Preconditions.checkNotNull(process, "process cannot be null");
if (StringUtils.isEmpty(taskInstance.getFormData())) {
return null;
@@ -67,7 +67,7 @@ public class WorkflowFormParserUtils {
*/
public static <T extends ProcessForm> T parseProcessForm(String formDate, Process process)
throws FormParseException {
- Preconditions.checkNotNull(process, "process can't be null");
+ Preconditions.checkNotNull(process, "process cannot be null");
if (StringUtils.isEmpty(formDate)) {
return null;