You are viewing a plain text version of this content. The canonical link was not preserved in this plain-text copy.
Posted to commits@inlong.apache.org by go...@apache.org on 2021/12/02 02:19:39 UTC
[incubator-inlong] branch master updated: [INLONG-1817][Feature][InLong-Manager] Workflow supports data stream for Pulsar (#1868)
This is an automated email from the ASF dual-hosted git repository.
gosonzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 093d71b [INLONG-1817][Feature][InLong-Manager] Workflow supports data stream for Pulsar (#1868)
093d71b is described below
commit 093d71b00e925a9e1ebfbff8d814345a2f9f87c6
Author: healchow <he...@gmail.com>
AuthorDate: Thu Dec 2 10:19:35 2021 +0800
[INLONG-1817][Feature][InLong-Manager] Workflow supports data stream for Pulsar (#1868)
Co-authored-by: healchow <he...@gmail.com>
---
inlong-manager/manager-common/pom.xml | 12 +
.../manager/common/pojo/business/BusinessInfo.java | 14 +-
.../inlong/manager/dao/entity/BusinessEntity.java | 2 +
.../dao/mapper/ConsumptionEntityMapper.java | 4 +-
.../src/main/resources/generatorConfig.xml | 2 +-
.../resources/mappers/BusinessEntityMapper.xml | 74 +++--
.../test/resources/sql/apache_inlong_manager.sql | 44 +--
inlong-manager/manager-service/pom.xml | 21 ++
.../core/impl/BusinessProcessOperation.java | 2 +-
.../service/core/impl/ConsumptionServiceImpl.java | 3 +-
.../service/core/impl/StorageBaseOperation.java | 14 +-
....java => CreateHiveTableForStreamListener.java} | 6 +-
...mListener.java => CreateHiveTableListener.java} | 6 +-
.../mq/CreatePulsarGroupForStreamTaskListener.java | 136 ++++++++
.../mq/CreatePulsarGroupTaskListener.java | 128 +++++++
.../mq/CreatePulsarResourceTaskListener.java | 131 ++++++++
.../mq/CreatePulsarTopicForStreamTaskListener.java | 114 +++++++
...tener.java => CreateTubeGroupTaskListener.java} | 6 +-
.../thirdpart/mq/CreateTubeTopicTaskListener.java | 4 +-
.../service/thirdpart/mq/PulsarOptService.java | 47 +++
.../service/thirdpart/mq/PulsarOptServiceImpl.java | 250 ++++++++++++++
.../service/thirdpart/mq/util/PulsarUtils.java | 50 +++
.../thirdpart/sort/PushHiveConfigTaskListener.java | 6 +-
.../service/workflow/BaseWorkflowFormType.java | 10 +-
.../service/workflow/BaseWorkflowTaskFormType.java | 8 +-
.../manager/service/workflow/ProcessName.java | 8 +-
.../BusinessAdminApproveForm.java} | 6 +-
.../BusinessResourceWorkflowForm.java} | 6 +-
.../business/CreateBusinessWorkflowDefinition.java | 208 ++++++++++++
.../NewBusinessWorkflowDefinition.java | 22 +-
.../NewBusinessWorkflowForm.java | 2 +-
.../listener/BusinessCancelProcessListener.java} | 6 +-
.../listener/BusinessCompleteProcessListener.java} | 10 +-
.../listener/BusinessFailedProcessListener.java} | 10 +-
.../listener/BusinessPassTaskListener.java} | 8 +-
.../listener/BusinessRejectProcessListener.java} | 6 +-
.../listener/InitBusinessInfoListener.java | 6 +-
.../StartCreateResourceProcessListener.java | 8 +-
.../ConsumptionAdminApproveForm.java} | 10 +-
.../NewConsumptionProcessDetailHandler.java | 2 +-
.../NewConsumptionWorkflowDefinition.java | 34 +-
.../NewConsumptionWorkflowForm.java | 2 +-
.../listener/ConsumptionCancelProcessListener.java | 14 +-
.../ConsumptionCompleteProcessListener.java | 175 ++++++++++
.../listener/ConsumptionPassTaskListener.java} | 22 +-
.../listener/ConsumptionRejectProcessListener.java | 4 +-
.../CreateResourceWorkflowDefinition.java | 176 ----------
.../ConsumptionCompleteProcessListener.java | 143 --------
.../CreateStreamWorkflowDefinition.java} | 76 ++++-
.../InitBusinessInfoForStreamListener.java | 6 +-
.../StreamCompleteProcessListener.java} | 10 +-
.../StreamFailedProcessListener.java} | 10 +-
.../src/main/resources/application-dev.properties | 2 +-
.../src/main/resources/application-prod.properties | 4 +-
.../src/main/resources/application-test.properties | 4 +-
inlong-manager/sql/apache_inlong_manager.sql | 368 +++++++++++----------
56 files changed, 1744 insertions(+), 718 deletions(-)
diff --git a/inlong-manager/manager-common/pom.xml b/inlong-manager/manager-common/pom.xml
index 3692efe..6bb2b77 100644
--- a/inlong-manager/manager-common/pom.xml
+++ b/inlong-manager/manager-common/pom.xml
@@ -158,10 +158,22 @@
<dependency>
<groupId>org.apache.pulsar</groupId>
<artifactId>pulsar-client</artifactId>
+ <exclusions>
+ <exclusion>
+ <groupId>javax.validation</groupId>
+ <artifactId>validation-api</artifactId>
+ </exclusion>
+ </exclusions>
</dependency>
<dependency>
<groupId>org.apache.pulsar</groupId>
<artifactId>pulsar-client-admin</artifactId>
+ <exclusions>
+ <exclusion>
+ <groupId>javax.validation</groupId>
+ <artifactId>validation-api</artifactId>
+ </exclusion>
+ </exclusions>
</dependency>
<dependency>
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/business/BusinessInfo.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/business/BusinessInfo.java
index b3d8643..6011054 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/business/BusinessInfo.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/business/BusinessInfo.java
@@ -55,6 +55,13 @@ public class BusinessInfo {
@ApiModelProperty(value = "Middleware type, high throughput: TUBE, high consistency: PULSAR")
private String middlewareType;
+ @ApiModelProperty(value = "Queue model of Pulsar, parallel: multiple partitions, high throughput, out-of-order "
+ + "messages; serial: single partition, low throughput, and orderly messages")
+ private String queueModule = "parallel";
+
+ @ApiModelProperty(value = "The number of partitions of Pulsar Topic, 1-20")
+ private Integer topicPartitionNum = 3;
+
@ApiModelProperty(value = "MQ resource object, in business",
notes = "Tube corresponds to Topic, Pulsar corresponds to Namespace")
private String mqResourceObj;
@@ -68,13 +75,6 @@ public class BusinessInfo {
@ApiModelProperty(value = "Pulsar service URL")
private String pulsarServiceUrl;
- @ApiModelProperty(value = "Queue model of Pulsar, parallel: multiple partitions, high throughput, out-of-order "
- + "messages; serial: single partition, low throughput, and orderly messages")
- private String queueModule = "parallel";
-
- @ApiModelProperty(value = "The number of partitions of Pulsar Topic, 1-20")
- private Integer topicPartitionNum = 3;
-
@ApiModelProperty(value = "Data type name")
private String schemaName;
diff --git a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/BusinessEntity.java b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/BusinessEntity.java
index fcf17a0..d547550 100644
--- a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/BusinessEntity.java
+++ b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/BusinessEntity.java
@@ -31,6 +31,8 @@ public class BusinessEntity implements Serializable {
private String cnName;
private String description;
private String middlewareType;
+ private String queueModule;
+ private Integer topicPartitionNum;
private String mqResourceObj;
private Integer dailyRecords;
private Integer dailyStorage;
diff --git a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/ConsumptionEntityMapper.java b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/ConsumptionEntityMapper.java
index 95990fa..82ac6fa 100644
--- a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/ConsumptionEntityMapper.java
+++ b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/ConsumptionEntityMapper.java
@@ -18,6 +18,7 @@
package org.apache.inlong.manager.dao.mapper;
import java.util.List;
+import org.apache.ibatis.annotations.Param;
import org.apache.inlong.manager.common.pojo.consumption.ConsumptionQuery;
import org.apache.inlong.manager.dao.entity.ConsumptionEntity;
import org.apache.inlong.manager.workflow.model.view.CountByKey;
@@ -34,7 +35,8 @@ public interface ConsumptionEntityMapper {
ConsumptionEntity selectByPrimaryKey(Integer id);
- ConsumptionEntity selectConsumptionExists(String groupId, String topic, String consumerGroup);
+ ConsumptionEntity selectConsumptionExists(@Param("groupId") String groupId, @Param("topic") String topic,
+ @Param("consumerGroup") String consumerGroup);
int updateByPrimaryKeySelective(ConsumptionEntity record);
diff --git a/inlong-manager/manager-dao/src/main/resources/generatorConfig.xml b/inlong-manager/manager-dao/src/main/resources/generatorConfig.xml
index 55b618b..b46e258 100644
--- a/inlong-manager/manager-dao/src/main/resources/generatorConfig.xml
+++ b/inlong-manager/manager-dao/src/main/resources/generatorConfig.xml
@@ -38,7 +38,7 @@
<!-- Database connection URL, username, password -->
<jdbcConnection driverClass="com.mysql.cj.jdbc.Driver"
connectionURL="jdbc:mysql://127.0.0.1:3306/apache_inlong_manager?nullCatalogMeansCurrent=true"
- userId="xxxxxx" password="xxxxxx">
+ userId="root" password="inlong">
</jdbcConnection>
<javaTypeResolver>
diff --git a/inlong-manager/manager-dao/src/main/resources/mappers/BusinessEntityMapper.xml b/inlong-manager/manager-dao/src/main/resources/mappers/BusinessEntityMapper.xml
index 7af02da..3cd1541 100644
--- a/inlong-manager/manager-dao/src/main/resources/mappers/BusinessEntityMapper.xml
+++ b/inlong-manager/manager-dao/src/main/resources/mappers/BusinessEntityMapper.xml
@@ -27,6 +27,8 @@
<result column="cn_name" jdbcType="VARCHAR" property="cnName"/>
<result column="description" jdbcType="VARCHAR" property="description"/>
<result column="middleware_type" jdbcType="VARCHAR" property="middlewareType"/>
+ <result column="queue_module" jdbcType="VARCHAR" property="queueModule"/>
+ <result column="topic_partition_num" jdbcType="INTEGER" property="topicPartitionNum"/>
<result column="mq_resource_obj" jdbcType="VARCHAR" property="mqResourceObj"/>
<result column="daily_records" jdbcType="INTEGER" property="dailyRecords"/>
<result column="daily_storage" jdbcType="INTEGER" property="dailyStorage"/>
@@ -50,8 +52,8 @@
</resultMap>
<sql id="Base_Column_List">
- id, inlong_group_id, name, cn_name, description, middleware_type, mq_resource_obj,
- daily_records, daily_storage, peak_records, max_length, schema_name, in_charges, followers,
+ id, inlong_group_id, name, cn_name, description, middleware_type, queue_module, topic_partition_num,
+ mq_resource_obj, daily_records, daily_storage, peak_records, max_length, schema_name, in_charges, followers,
status, is_deleted, creator, modifier, create_time, modify_time, temp_view
</sql>
<select id="selectByPrimaryKey" parameterType="java.lang.Integer" resultMap="BaseResultMap">
@@ -115,6 +117,7 @@
parameterType="org.apache.inlong.manager.dao.entity.BusinessEntity">
insert into business (id, inlong_group_id, name,
cn_name, description, middleware_type,
+ queue_module, topic_partition_num,
mq_resource_obj, daily_records, daily_storage,
peak_records, max_length, schema_name,
in_charges, followers, status,
@@ -122,6 +125,7 @@
create_time, modify_time, temp_view)
values (#{id,jdbcType=INTEGER}, #{inlongGroupId,jdbcType=VARCHAR}, #{name,jdbcType=VARCHAR},
#{cnName,jdbcType=VARCHAR}, #{description,jdbcType=VARCHAR}, #{middlewareType,jdbcType=VARCHAR},
+ #{queueModule,jdbcType=VARCHAR}, #{topicPartitionNum,jdbcType=INTEGER},
#{mqResourceObj,jdbcType=VARCHAR}, #{dailyRecords,jdbcType=INTEGER}, #{dailyStorage,jdbcType=INTEGER},
#{peakRecords,jdbcType=INTEGER}, #{maxLength,jdbcType=INTEGER}, #{schemaName,jdbcType=VARCHAR},
#{inCharges,jdbcType=VARCHAR}, #{followers,jdbcType=VARCHAR}, #{status,jdbcType=INTEGER},
@@ -150,6 +154,12 @@
<if test="middlewareType != null">
middleware_type,
</if>
+ <if test="queueModule != null">
+ queue_module,
+ </if>
+ <if test="topicPartitionNum != null">
+ topic_partition_num,
+ </if>
<if test="mqResourceObj != null">
mq_resource_obj,
</if>
@@ -215,6 +225,12 @@
<if test="middlewareType != null">
#{middlewareType,jdbcType=VARCHAR},
</if>
+ <if test="queueModule != null">
+ #{queueModule,jdbcType=VARCHAR},
+ </if>
+ <if test="topicPartitionNum != null">
+ #{topicPartitionNum,jdbcType=INTEGER},
+ </if>
<if test="mqResourceObj != null">
#{mqResourceObj,jdbcType=VARCHAR},
</if>
@@ -280,6 +296,12 @@
<if test="middlewareType != null">
middleware_type = #{middlewareType,jdbcType=VARCHAR},
</if>
+ <if test="queueModule != null">
+ queue_module = #{queueModule,jdbcType=VARCHAR},
+ </if>
+ <if test="topicPartitionNum != null">
+ topic_partition_num = #{topicPartitionNum,jdbcType=INTEGER},
+ </if>
<if test="mqResourceObj != null">
mq_resource_obj = #{mqResourceObj,jdbcType=VARCHAR},
</if>
@@ -343,6 +365,12 @@
<if test="middlewareType != null">
middleware_type = #{middlewareType,jdbcType=VARCHAR},
</if>
+ <if test="queueModule != null">
+ queue_module = #{queueModule,jdbcType=VARCHAR},
+ </if>
+ <if test="topicPartitionNum != null">
+ topic_partition_num = #{topicPartitionNum,jdbcType=INTEGER},
+ </if>
<if test="mqResourceObj != null">
mq_resource_obj = #{mqResourceObj,jdbcType=VARCHAR},
</if>
@@ -389,26 +417,28 @@
<update id="updateByPrimaryKey" parameterType="org.apache.inlong.manager.dao.entity.BusinessEntity">
update business
- set inlong_group_id = #{inlongGroupId,jdbcType=VARCHAR},
- name = #{name,jdbcType=VARCHAR},
- cn_name = #{cnName,jdbcType=VARCHAR},
- description = #{description,jdbcType=VARCHAR},
- middleware_type = #{middlewareType,jdbcType=VARCHAR},
- mq_resource_obj = #{mqResourceObj,jdbcType=VARCHAR},
- daily_records = #{dailyRecords,jdbcType=INTEGER},
- daily_storage = #{dailyStorage,jdbcType=INTEGER},
- peak_records = #{peakRecords,jdbcType=INTEGER},
- max_length = #{maxLength,jdbcType=INTEGER},
- schema_name = #{schemaName,jdbcType=VARCHAR},
- in_charges = #{inCharges,jdbcType=VARCHAR},
- followers = #{followers,jdbcType=VARCHAR},
- status = #{status,jdbcType=INTEGER},
- is_deleted = #{isDeleted,jdbcType=INTEGER},
- creator = #{creator,jdbcType=VARCHAR},
- modifier = #{modifier,jdbcType=VARCHAR},
- create_time = #{createTime,jdbcType=TIMESTAMP},
- modify_time = #{modifyTime,jdbcType=TIMESTAMP},
- temp_view = #{tempView,jdbcType=LONGVARCHAR}
+ set inlong_group_id = #{inlongGroupId,jdbcType=VARCHAR},
+ name = #{name,jdbcType=VARCHAR},
+ cn_name = #{cnName,jdbcType=VARCHAR},
+ description = #{description,jdbcType=VARCHAR},
+ middleware_type = #{middlewareType,jdbcType=VARCHAR},
+ queue_module = #{queueModule,jdbcType=VARCHAR},
+ topic_partition_num = #{topicPartitionNum,jdbcType=INTEGER},
+ mq_resource_obj = #{mqResourceObj,jdbcType=VARCHAR},
+ daily_records = #{dailyRecords,jdbcType=INTEGER},
+ daily_storage = #{dailyStorage,jdbcType=INTEGER},
+ peak_records = #{peakRecords,jdbcType=INTEGER},
+ max_length = #{maxLength,jdbcType=INTEGER},
+ schema_name = #{schemaName,jdbcType=VARCHAR},
+ in_charges = #{inCharges,jdbcType=VARCHAR},
+ followers = #{followers,jdbcType=VARCHAR},
+ status = #{status,jdbcType=INTEGER},
+ is_deleted = #{isDeleted,jdbcType=INTEGER},
+ creator = #{creator,jdbcType=VARCHAR},
+ modifier = #{modifier,jdbcType=VARCHAR},
+ create_time = #{createTime,jdbcType=TIMESTAMP},
+ modify_time = #{modifyTime,jdbcType=TIMESTAMP},
+ temp_view = #{tempView,jdbcType=LONGVARCHAR}
where id = #{id,jdbcType=INTEGER}
</update>
<update id="updateStatusByIdentifier">
diff --git a/inlong-manager/manager-dao/src/test/resources/sql/apache_inlong_manager.sql b/inlong-manager/manager-dao/src/test/resources/sql/apache_inlong_manager.sql
index 08b2320..99dbe3e 100644
--- a/inlong-manager/manager-dao/src/test/resources/sql/apache_inlong_manager.sql
+++ b/inlong-manager/manager-dao/src/test/resources/sql/apache_inlong_manager.sql
@@ -71,27 +71,29 @@ CREATE TABLE `agent_sys_conf`
DROP TABLE IF EXISTS `business`;
CREATE TABLE `business`
(
- `id` int(11) NOT NULL AUTO_INCREMENT COMMENT 'Incremental primary key',
- `inlong_group_id` varchar(128) NOT NULL COMMENT 'Business group id, filled in by the user, undeleted ones cannot be repeated',
- `name` varchar(128) DEFAULT '' COMMENT 'Business name, English, numbers and underscore',
- `cn_name` varchar(256) DEFAULT NULL COMMENT 'Chinese display name',
- `description` varchar(256) DEFAULT '' COMMENT 'Business Introduction',
- `middleware_type` varchar(10) DEFAULT 'TUBE' COMMENT 'The middleware type of message queue, high throughput: TUBE, high consistency: PULSAR',
- `mq_resource_obj` varchar(128) NOT NULL COMMENT 'MQ resource object, for Tube, its Topic, for Pulsar, its Namespace',
- `daily_records` int(11) DEFAULT '10' COMMENT 'Number of access records per day, unit: 10,000 records per day',
- `daily_storage` int(11) DEFAULT '10' COMMENT 'Access size by day, unit: GB per day',
- `peak_records` int(11) DEFAULT '1000' COMMENT 'Access peak per second, unit: records per second',
- `max_length` int(11) DEFAULT '10240' COMMENT 'The maximum length of a single piece of data, unit: Byte',
- `schema_name` varchar(128) DEFAULT NULL COMMENT 'Data type, associated data_schema table',
- `in_charges` varchar(512) NOT NULL COMMENT 'Name of responsible person, separated by commas',
- `followers` varchar(512) DEFAULT NULL COMMENT 'List of names of business followers, separated by commas',
- `status` int(4) DEFAULT '21' COMMENT 'Business status',
- `is_deleted` tinyint(1) DEFAULT '0' COMMENT 'Whether to delete, 0: not deleted, 1: deleted',
- `creator` varchar(64) NOT NULL COMMENT 'Creator name',
- `modifier` varchar(64) DEFAULT NULL COMMENT 'Modifier name',
- `create_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'Create time',
- `modify_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'Modify time',
- `temp_view` text DEFAULT NULL COMMENT 'Temporary view, used to save intermediate data that has not been submitted or approved after modification',
+ `id` int(11) NOT NULL AUTO_INCREMENT COMMENT 'Incremental primary key',
+ `inlong_group_id` varchar(128) NOT NULL COMMENT 'Business group id, filled in by the user, undeleted ones cannot be repeated',
+ `name` varchar(128) DEFAULT '' COMMENT 'Business name, English, numbers and underscore',
+ `cn_name` varchar(256) DEFAULT NULL COMMENT 'Chinese display name',
+ `description` varchar(256) DEFAULT '' COMMENT 'Business Introduction',
+ `middleware_type` varchar(10) DEFAULT 'TUBE' COMMENT 'The middleware type of message queue, high throughput: TUBE, high consistency: PULSAR',
+ `queue_module` VARCHAR(20) NULL DEFAULT 'parallel' COMMENT 'Queue model of Pulsar, parallel: multiple partitions, high throughput, out-of-order messages; serial: single partition, low throughput, and orderly messages',
+ `topic_partition_num` INT(4) NULL DEFAULT '3' COMMENT 'The number of partitions of Pulsar Topic, 1-20',
+ `mq_resource_obj` varchar(128) NOT NULL COMMENT 'MQ resource object, for Tube, its Topic, for Pulsar, its Namespace',
+ `daily_records` int(11) DEFAULT '10' COMMENT 'Number of access records per day, unit: 10,000 records per day',
+ `daily_storage` int(11) DEFAULT '10' COMMENT 'Access size by day, unit: GB per day',
+ `peak_records` int(11) DEFAULT '1000' COMMENT 'Access peak per second, unit: records per second',
+ `max_length` int(11) DEFAULT '10240' COMMENT 'The maximum length of a single piece of data, unit: Byte',
+ `schema_name` varchar(128) DEFAULT NULL COMMENT 'Data type, associated data_schema table',
+ `in_charges` varchar(512) NOT NULL COMMENT 'Name of responsible person, separated by commas',
+ `followers` varchar(512) DEFAULT NULL COMMENT 'List of names of business followers, separated by commas',
+ `status` int(4) DEFAULT '21' COMMENT 'Business status',
+ `is_deleted` tinyint(1) DEFAULT '0' COMMENT 'Whether to delete, 0: not deleted, 1: deleted',
+ `creator` varchar(64) NOT NULL COMMENT 'Creator name',
+ `modifier` varchar(64) DEFAULT NULL COMMENT 'Modifier name',
+ `create_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'Create time',
+ `modify_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'Modify time',
+ `temp_view` json DEFAULT NULL COMMENT 'Temporary view, used to save intermediate data that has not been submitted or approved after modification',
PRIMARY KEY (`id`),
UNIQUE KEY `unique_business` (`inlong_group_id`, `is_deleted`, `modify_time`)
);
diff --git a/inlong-manager/manager-service/pom.xml b/inlong-manager/manager-service/pom.xml
index e70b144..041795c 100644
--- a/inlong-manager/manager-service/pom.xml
+++ b/inlong-manager/manager-service/pom.xml
@@ -152,6 +152,27 @@
</exclusion>
</exclusions>
</dependency>
+
+ <dependency>
+ <groupId>org.apache.pulsar</groupId>
+ <artifactId>pulsar-client</artifactId>
+ <exclusions>
+ <exclusion>
+ <groupId>javax.validation</groupId>
+ <artifactId>validation-api</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.pulsar</groupId>
+ <artifactId>pulsar-client-admin</artifactId>
+ <exclusions>
+ <exclusion>
+ <groupId>javax.validation</groupId>
+ <artifactId>validation-api</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
</dependencies>
</project>
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/BusinessProcessOperation.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/BusinessProcessOperation.java
index 3d551ca..081aa5f 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/BusinessProcessOperation.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/BusinessProcessOperation.java
@@ -33,7 +33,7 @@ import org.apache.inlong.manager.service.core.StorageService;
import org.apache.inlong.manager.service.workflow.ProcessName;
import org.apache.inlong.manager.service.workflow.WorkflowResult;
import org.apache.inlong.manager.service.workflow.WorkflowService;
-import org.apache.inlong.manager.service.workflow.newbusiness.NewBusinessWorkflowForm;
+import org.apache.inlong.manager.service.workflow.business.NewBusinessWorkflowForm;
import org.apache.inlong.manager.common.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/ConsumptionServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/ConsumptionServiceImpl.java
index a37a771..316b356 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/ConsumptionServiceImpl.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/ConsumptionServiceImpl.java
@@ -59,7 +59,7 @@ import org.apache.inlong.manager.service.core.DataStreamService;
import org.apache.inlong.manager.service.workflow.ProcessName;
import org.apache.inlong.manager.service.workflow.WorkflowResult;
import org.apache.inlong.manager.service.workflow.WorkflowService;
-import org.apache.inlong.manager.service.workflow.newconsumption.NewConsumptionWorkflowForm;
+import org.apache.inlong.manager.service.workflow.consumption.NewConsumptionWorkflowForm;
import org.apache.inlong.manager.workflow.model.view.CountByKey;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
@@ -344,6 +344,7 @@ public class ConsumptionServiceImpl implements ConsumptionService {
entity.setMiddlewareType(middlewareType);
entity.setTopic(topic);
entity.setConsumerGroupId(consumerGroup);
+ entity.setConsumerGroupName(consumerGroup);
entity.setInCharges(bizInfo.getInCharges());
entity.setFilterEnabled(0);
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/StorageBaseOperation.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/StorageBaseOperation.java
index 80b8759..56b7c6b 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/StorageBaseOperation.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/StorageBaseOperation.java
@@ -38,9 +38,9 @@ import org.apache.inlong.manager.dao.mapper.BusinessEntityMapper;
import org.apache.inlong.manager.dao.mapper.StorageExtEntityMapper;
import org.apache.inlong.manager.service.workflow.ProcessName;
import org.apache.inlong.manager.service.workflow.WorkflowService;
-import org.apache.inlong.manager.service.workflow.newbusiness.CreateResourceWorkflowForm;
-import org.apache.inlong.manager.service.workflow.newbusiness.NewBusinessWorkflowForm;
-import org.apache.inlong.manager.service.workflow.newstream.SingleStreamWorkflowDefinition;
+import org.apache.inlong.manager.service.workflow.business.BusinessResourceWorkflowForm;
+import org.apache.inlong.manager.service.workflow.business.NewBusinessWorkflowForm;
+import org.apache.inlong.manager.service.workflow.stream.CreateStreamWorkflowDefinition;
import org.apache.inlong.manager.common.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -150,7 +150,7 @@ public class StorageBaseOperation {
/**
* Asynchronously initiate a single data stream related workflow
*
- * @see SingleStreamWorkflowDefinition
+ * @see CreateStreamWorkflowDefinition
*/
class WorkflowStartRunnable implements Runnable {
@@ -170,7 +170,7 @@ public class StorageBaseOperation {
LOGGER.info("begin start data stream workflow, groupId={}, streamId={}", groupId, streamId);
BusinessInfo businessInfo = CommonBeanUtils.copyProperties(businessEntity, BusinessInfo::new);
- CreateResourceWorkflowForm form = genBizResourceWorkflowForm(businessInfo, streamId);
+ BusinessResourceWorkflowForm form = genBizResourceWorkflowForm(businessInfo, streamId);
workflowService.start(ProcessName.CREATE_DATASTREAM_RESOURCE, operator, form);
LOGGER.info("success start data stream workflow, groupId={}, streamId={}", groupId, streamId);
@@ -179,8 +179,8 @@ public class StorageBaseOperation {
/**
* Generate [Create Business Resource] form
*/
- private CreateResourceWorkflowForm genBizResourceWorkflowForm(BusinessInfo businessInfo, String streamId) {
- CreateResourceWorkflowForm form = new CreateResourceWorkflowForm();
+ private BusinessResourceWorkflowForm genBizResourceWorkflowForm(BusinessInfo businessInfo, String streamId) {
+ BusinessResourceWorkflowForm form = new BusinessResourceWorkflowForm();
form.setBusinessInfo(businessInfo);
form.setInlongStreamId(streamId);
return form;
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdpart/hive/CreateHiveTableForOneStreamListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdpart/hive/CreateHiveTableForStreamListener.java
similarity index 90%
rename from inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdpart/hive/CreateHiveTableForOneStreamListener.java
rename to inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdpart/hive/CreateHiveTableForStreamListener.java
index 58ee3ac..9106bca 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdpart/hive/CreateHiveTableForOneStreamListener.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdpart/hive/CreateHiveTableForStreamListener.java
@@ -21,7 +21,7 @@ import java.util.List;
import lombok.extern.slf4j.Slf4j;
import org.apache.inlong.manager.common.pojo.datastorage.StorageHiveSortInfo;
import org.apache.inlong.manager.dao.mapper.StorageHiveEntityMapper;
-import org.apache.inlong.manager.service.workflow.newbusiness.CreateResourceWorkflowForm;
+import org.apache.inlong.manager.service.workflow.business.BusinessResourceWorkflowForm;
import org.apache.inlong.manager.workflow.core.event.ListenerResult;
import org.apache.inlong.manager.workflow.core.event.task.TaskEvent;
import org.apache.inlong.manager.workflow.core.event.task.TaskEventListener;
@@ -34,7 +34,7 @@ import org.springframework.stereotype.Service;
*/
@Service
@Slf4j
-public class CreateHiveTableForOneStreamListener implements TaskEventListener {
+public class CreateHiveTableForStreamListener implements TaskEventListener {
@Autowired
private StorageHiveEntityMapper hiveEntityMapper;
@@ -48,7 +48,7 @@ public class CreateHiveTableForOneStreamListener implements TaskEventListener {
@Override
public ListenerResult listen(WorkflowContext context) {
- CreateResourceWorkflowForm form = (CreateResourceWorkflowForm) context.getProcessForm();
+ BusinessResourceWorkflowForm form = (BusinessResourceWorkflowForm) context.getProcessForm();
String groupId = form.getInlongGroupId();
String streamId = form.getInlongStreamId();
log.info("begin create hive table for groupId={}, streamId={}", groupId, streamId);
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdpart/hive/CreateHiveTableForAllStreamListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdpart/hive/CreateHiveTableListener.java
similarity index 90%
rename from inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdpart/hive/CreateHiveTableForAllStreamListener.java
rename to inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdpart/hive/CreateHiveTableListener.java
index 8b9d6eb..665d1ee 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdpart/hive/CreateHiveTableForAllStreamListener.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdpart/hive/CreateHiveTableListener.java
@@ -21,7 +21,7 @@ import java.util.List;
import lombok.extern.slf4j.Slf4j;
import org.apache.inlong.manager.common.pojo.datastorage.StorageHiveSortInfo;
import org.apache.inlong.manager.dao.mapper.StorageHiveEntityMapper;
-import org.apache.inlong.manager.service.workflow.newbusiness.CreateResourceWorkflowForm;
+import org.apache.inlong.manager.service.workflow.business.BusinessResourceWorkflowForm;
import org.apache.inlong.manager.workflow.core.event.ListenerResult;
import org.apache.inlong.manager.workflow.core.event.task.TaskEvent;
import org.apache.inlong.manager.workflow.core.event.task.TaskEventListener;
@@ -34,7 +34,7 @@ import org.springframework.stereotype.Service;
*/
@Service
@Slf4j
-public class CreateHiveTableForAllStreamListener implements TaskEventListener {
+public class CreateHiveTableListener implements TaskEventListener {
@Autowired
private StorageHiveEntityMapper hiveEntityMapper;
@@ -48,7 +48,7 @@ public class CreateHiveTableForAllStreamListener implements TaskEventListener {
@Override
public ListenerResult listen(WorkflowContext context) {
- CreateResourceWorkflowForm form = (CreateResourceWorkflowForm) context.getProcessForm();
+ BusinessResourceWorkflowForm form = (BusinessResourceWorkflowForm) context.getProcessForm();
String groupId = form.getInlongGroupId();
log.info("begin to create hive table for groupId={}", groupId);
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdpart/mq/CreatePulsarGroupForStreamTaskListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdpart/mq/CreatePulsarGroupForStreamTaskListener.java
new file mode 100644
index 0000000..901f1d1
--- /dev/null
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdpart/mq/CreatePulsarGroupForStreamTaskListener.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.service.thirdpart.mq;
+
+import java.util.List;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.inlong.manager.common.beans.ClusterBean;
+import org.apache.inlong.manager.common.exceptions.BusinessException;
+import org.apache.inlong.manager.common.pojo.business.BusinessInfo;
+import org.apache.inlong.manager.common.pojo.pulsar.PulsarTopicBean;
+import org.apache.inlong.manager.dao.entity.DataStreamEntity;
+import org.apache.inlong.manager.dao.mapper.DataStreamEntityMapper;
+import org.apache.inlong.manager.service.core.BusinessService;
+import org.apache.inlong.manager.service.core.ConsumptionService;
+import org.apache.inlong.manager.service.core.StorageService;
+import org.apache.inlong.manager.service.thirdpart.mq.util.PulsarUtils;
+import org.apache.inlong.manager.service.workflow.business.BusinessResourceWorkflowForm;
+import org.apache.inlong.manager.workflow.core.event.ListenerResult;
+import org.apache.inlong.manager.workflow.core.event.task.TaskEvent;
+import org.apache.inlong.manager.workflow.core.event.task.TaskEventListener;
+import org.apache.inlong.manager.workflow.exception.WorkflowListenerException;
+import org.apache.inlong.manager.workflow.model.WorkflowContext;
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+
+/**
+ * Workflow task listener that creates the Pulsar subscription (consumer group) for a single data stream.
+ *
+ * <p>For each cluster returned by the global Pulsar admin it checks that the stream's topic exists,
+ * creates the subscription on it, and saves the matching consumption record.
+ */
+@Slf4j
+@Component
+public class CreatePulsarGroupForStreamTaskListener implements TaskEventListener {
+
+    @Autowired
+    private ClusterBean clusterBean;
+    @Autowired
+    private BusinessService businessService;
+    @Autowired
+    private DataStreamEntityMapper dataStreamMapper;
+    @Autowired
+    private PulsarOptService pulsarOptService;
+    @Autowired
+    private StorageService storageService;
+    @Autowired
+    private ConsumptionService consumptionService;
+
+    /**
+     * @return {@link TaskEvent#COMPLETE}, the task event this listener is registered for
+     */
+    @Override
+    public TaskEvent event() {
+        return TaskEvent.COMPLETE;
+    }
+
+    /**
+     * Creates the subscription for the stream identified by the workflow form.
+     *
+     * @param context workflow context whose process form is a {@link BusinessResourceWorkflowForm}
+     * @return a success result, also when the stream or its storage info is missing and the work is skipped
+     * @throws WorkflowListenerException if the business is not found, or any step against Pulsar fails
+     */
+    @Override
+    public ListenerResult listen(WorkflowContext context) throws WorkflowListenerException {
+        BusinessResourceWorkflowForm resourceForm = (BusinessResourceWorkflowForm) context.getProcessForm();
+        final String groupId = resourceForm.getInlongGroupId();
+        final String streamId = resourceForm.getInlongStreamId();
+
+        final BusinessInfo business = businessService.get(groupId);
+        if (business == null) {
+            log.error("business not found with groupId={}", groupId);
+            throw new WorkflowListenerException("business not found with groupId=" + groupId);
+        }
+
+        final DataStreamEntity stream = dataStreamMapper.selectByIdentifier(groupId, streamId);
+        if (stream == null) {
+            log.warn("data stream is empty for group={}, stream={}, skip to create pulsar group", groupId, streamId);
+            return ListenerResult.success();
+        }
+
+        try (PulsarAdmin globalAdmin = PulsarUtils.getPulsarAdmin(clusterBean.getPulsarAdminUrl())) {
+            // Skip when no storage is configured for this groupId and streamId
+            List<String> storageTypes = storageService.getStorageTypeList(groupId, streamId);
+            if (storageTypes == null || storageTypes.isEmpty()) {
+                log.warn("storage info is empty for groupId={}, streamId={}, skip to create pulsar group",
+                        groupId, streamId);
+                return ListenerResult.success();
+            }
+
+            final String tenant = clusterBean.getDefaultTenant();
+            final String namespace = business.getMqResourceObj();
+            final String topic = stream.getMqResourceObj();
+
+            PulsarTopicBean topicBean = new PulsarTopicBean();
+            topicBean.setTenant(tenant);
+            topicBean.setNamespace(namespace);
+            topicBean.setTopicName(topic);
+
+            // The subscription is created in every cluster (cross-region); the topic must already exist there
+            for (String cluster : PulsarUtils.getPulsarClusters(globalAdmin)) {
+                String serviceUrl = PulsarUtils.getServiceUrl(globalAdmin, cluster);
+                try (PulsarAdmin clusterAdmin = PulsarUtils.getPulsarAdmin(serviceUrl)) {
+                    this.assertTopicExists(clusterAdmin, serviceUrl, tenant, namespace, topic);
+
+                    // Consumer naming rules: sortAppName_topicName_consumer_group
+                    String subscription = clusterBean.getAppName() + "_" + topic + "_consumer_group";
+                    pulsarOptService.createSubscription(clusterAdmin, topicBean, subscription);
+
+                    // Record the consumption in the consumption table
+                    consumptionService.saveSortConsumption(business, topic, subscription);
+                }
+            }
+        } catch (Exception e) {
+            log.error("create pulsar subscription error for groupId={}, streamId={}", groupId, streamId, e);
+            throw new WorkflowListenerException("create pulsar subscription error, reason: " + e.getMessage());
+        }
+
+        log.info("finish to create single pulsar subscription for groupId={}, streamId={}", groupId, streamId);
+        return ListenerResult.success();
+    }
+
+    /**
+     * Fails with a {@link BusinessException} when the topic is not present in the given cluster.
+     */
+    private void assertTopicExists(PulsarAdmin clusterAdmin, String serviceUrl, String tenant, String namespace,
+            String topic) throws Exception {
+        if (pulsarOptService.topicIsExists(clusterAdmin, tenant, namespace, topic)) {
+            return;
+        }
+        String fullTopic = tenant + "/" + namespace + "/" + topic;
+        log.error("topic={} not exists in {}", fullTopic, clusterAdmin.getServiceUrl());
+        throw new BusinessException("topic=" + fullTopic + " not exists in " + serviceUrl);
+    }
+
+    /**
+     * @return {@code false}; this listener does not request asynchronous execution
+     */
+    @Override
+    public boolean async() {
+        return false;
+    }
+
+}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdpart/mq/CreatePulsarGroupTaskListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdpart/mq/CreatePulsarGroupTaskListener.java
new file mode 100644
index 0000000..2fa3fe0
--- /dev/null
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdpart/mq/CreatePulsarGroupTaskListener.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.service.thirdpart.mq;
+
+import java.util.List;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.inlong.manager.common.beans.ClusterBean;
+import org.apache.inlong.manager.common.pojo.business.BusinessInfo;
+import org.apache.inlong.manager.common.pojo.pulsar.PulsarTopicBean;
+import org.apache.inlong.manager.dao.entity.DataStreamEntity;
+import org.apache.inlong.manager.dao.mapper.DataStreamEntityMapper;
+import org.apache.inlong.manager.service.core.BusinessService;
+import org.apache.inlong.manager.service.core.ConsumptionService;
+import org.apache.inlong.manager.service.thirdpart.mq.util.PulsarUtils;
+import org.apache.inlong.manager.service.workflow.business.BusinessResourceWorkflowForm;
+import org.apache.inlong.manager.workflow.core.event.ListenerResult;
+import org.apache.inlong.manager.workflow.core.event.task.TaskEvent;
+import org.apache.inlong.manager.workflow.core.event.task.TaskEventListener;
+import org.apache.inlong.manager.workflow.exception.WorkflowListenerException;
+import org.apache.inlong.manager.workflow.model.WorkflowContext;
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+
+/**
+ * Workflow task listener that creates the default Pulsar subscription (consumer group)
+ * for every data stream of a business group.
+ *
+ * <p>Each data stream maps to one Pulsar topic. For each topic and each cluster returned by the
+ * global Pulsar admin, the listener checks that the topic exists, creates the subscription and
+ * saves the matching consumption record.
+ */
+@Component
+@Slf4j
+public class CreatePulsarGroupTaskListener implements TaskEventListener {
+
+    @Autowired
+    private ClusterBean clusterBean;
+    @Autowired
+    private BusinessService businessService;
+    @Autowired
+    private ConsumptionService consumptionService;
+    @Autowired
+    private DataStreamEntityMapper dataStreamMapper;
+    @Autowired
+    private PulsarOptService pulsarOptService;
+
+    /**
+     * @return {@link TaskEvent#COMPLETE}, the task event this listener is registered for
+     */
+    @Override
+    public TaskEvent event() {
+        return TaskEvent.COMPLETE;
+    }
+
+    /**
+     * Creates the subscriptions for all data streams of the group named in the workflow form.
+     *
+     * @param context workflow context whose process form is a {@link BusinessResourceWorkflowForm}
+     * @return a success result, also when the group has no data streams and the work is skipped
+     * @throws WorkflowListenerException if the business is not found, a topic is missing in a cluster,
+     *         or any call against Pulsar fails
+     */
+    @Override
+    public ListenerResult listen(WorkflowContext context) throws WorkflowListenerException {
+        BusinessResourceWorkflowForm form = (BusinessResourceWorkflowForm) context.getProcessForm();
+
+        String groupId = form.getInlongGroupId();
+        BusinessInfo bizInfo = businessService.get(groupId);
+        if (bizInfo == null) {
+            log.error("business not found with groupId={}", groupId);
+            throw new WorkflowListenerException("business not found with groupId=" + groupId);
+        }
+
+        // For Pulsar, each Stream corresponds to a Topic
+        List<DataStreamEntity> streamEntities = dataStreamMapper.selectByGroupId(groupId);
+        if (streamEntities == null || streamEntities.isEmpty()) {
+            log.warn("data stream is empty for groupId={}, skip to create pulsar subscription", groupId);
+            return ListenerResult.success();
+        }
+
+        try (PulsarAdmin globalPulsarAdmin = PulsarUtils.getPulsarAdmin(clusterBean.getPulsarAdminUrl())) {
+            String tenant = clusterBean.getDefaultTenant();
+            String namespace = bizInfo.getMqResourceObj();
+
+            // The cluster list does not depend on the stream: query it once, not once per stream
+            List<String> pulsarClusters = PulsarUtils.getPulsarClusters(globalPulsarAdmin);
+
+            for (DataStreamEntity streamEntity : streamEntities) {
+                PulsarTopicBean topicBean = new PulsarTopicBean();
+                topicBean.setTenant(tenant);
+                topicBean.setNamespace(namespace);
+                String topic = streamEntity.getMqResourceObj();
+                topicBean.setTopicName(topic);
+
+                // Create a subscription in the Pulsar cluster (cross-region), you need to ensure that the Topic exists
+                for (String cluster : pulsarClusters) {
+                    String url = PulsarUtils.getServiceUrl(globalPulsarAdmin, cluster);
+                    try (PulsarAdmin pulsarAdmin = PulsarUtils.getPulsarAdmin(url)) {
+                        boolean exist = pulsarOptService.topicIsExists(pulsarAdmin, tenant, namespace, topic);
+
+                        if (!exist) {
+                            String topicFull = tenant + "/" + namespace + "/" + topic;
+                            log.error("topic={} not exists in {}", topicFull, url);
+                            throw new WorkflowListenerException("topic=" + topicFull + " not exists in " + url);
+                        }
+
+                        // Consumer naming rules: sortAppName_topicName_consumer_group
+                        String subscription = clusterBean.getAppName() + "_" + topic + "_consumer_group";
+                        pulsarOptService.createSubscription(pulsarAdmin, topicBean, subscription);
+
+                        // Insert the consumption data into the consumption table
+                        consumptionService.saveSortConsumption(bizInfo, topic, subscription);
+                    }
+                }
+            }
+        } catch (Exception e) {
+            // Pass the exception as the last argument so the stack trace is logged, not only the group id
+            log.error("create pulsar subscription error for groupId={}", groupId, e);
+            throw new WorkflowListenerException("create pulsar subscription error: " + e.getMessage());
+        }
+
+        log.info("success to create pulsar subscription for groupId={}", groupId);
+        return ListenerResult.success();
+    }
+
+    /**
+     * @return {@code false}; this listener does not request asynchronous execution
+     */
+    @Override
+    public boolean async() {
+        return false;
+    }
+
+}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdpart/mq/CreatePulsarResourceTaskListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdpart/mq/CreatePulsarResourceTaskListener.java
new file mode 100644
index 0000000..2ee9dfa
--- /dev/null
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdpart/mq/CreatePulsarResourceTaskListener.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.service.thirdpart.mq;
+
+import java.util.List;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.inlong.manager.common.beans.ClusterBean;
+import org.apache.inlong.manager.common.pojo.business.BusinessInfo;
+import org.apache.inlong.manager.common.pojo.datastream.DataStreamTopicVO;
+import org.apache.inlong.manager.common.pojo.pulsar.PulsarTopicBean;
+import org.apache.inlong.manager.common.util.Preconditions;
+import org.apache.inlong.manager.dao.entity.BusinessPulsarEntity;
+import org.apache.inlong.manager.dao.mapper.BusinessPulsarEntityMapper;
+import org.apache.inlong.manager.dao.mapper.DataStreamEntityMapper;
+import org.apache.inlong.manager.service.core.BusinessService;
+import org.apache.inlong.manager.service.thirdpart.mq.util.PulsarUtils;
+import org.apache.inlong.manager.service.workflow.business.BusinessResourceWorkflowForm;
+import org.apache.inlong.manager.workflow.core.event.ListenerResult;
+import org.apache.inlong.manager.workflow.core.event.task.TaskEvent;
+import org.apache.inlong.manager.workflow.core.event.task.TaskEventListener;
+import org.apache.inlong.manager.workflow.exception.WorkflowListenerException;
+import org.apache.inlong.manager.workflow.model.WorkflowContext;
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+
+/**
+ * Workflow task listener that creates the Pulsar tenant, namespace and topics of a business group
+ * in every cluster returned by the global Pulsar admin.
+ */
+@Slf4j
+@Component
+public class CreatePulsarResourceTaskListener implements TaskEventListener {
+
+    @Autowired
+    PulsarOptService pulsarOptService;
+    @Autowired
+    private ClusterBean clusterBean;
+    @Autowired
+    private BusinessService businessService;
+    @Autowired
+    private BusinessPulsarEntityMapper businessPulsarMapper;
+    @Autowired
+    private DataStreamEntityMapper dataStreamMapper;
+
+    /**
+     * @return {@link TaskEvent#COMPLETE}, the task event this listener is registered for
+     */
+    @Override
+    public TaskEvent event() {
+        return TaskEvent.COMPLETE;
+    }
+
+    /**
+     * Creates the Pulsar resources for the group named in the workflow form, cluster by cluster.
+     *
+     * @param context workflow context whose process form is a {@link BusinessResourceWorkflowForm}
+     * @return a success result once every cluster has been processed
+     * @throws WorkflowListenerException if the business is not found or resource creation fails in any cluster
+     */
+    @Override
+    public ListenerResult listen(WorkflowContext context) throws WorkflowListenerException {
+        BusinessResourceWorkflowForm form = (BusinessResourceWorkflowForm) context.getProcessForm();
+        String groupId = form.getInlongGroupId();
+        log.info("begin to create pulsar resource for groupId={}", groupId);
+
+        BusinessInfo businessInfo = businessService.get(groupId);
+        if (businessInfo == null) {
+            throw new WorkflowListenerException("business or pulsar cluster not found for groupId=" + groupId);
+        }
+
+        try (PulsarAdmin globalPulsarAdmin = PulsarUtils.getPulsarAdmin(clusterBean.getPulsarAdminUrl())) {
+            List<String> pulsarClusters = PulsarUtils.getPulsarClusters(globalPulsarAdmin);
+            for (String cluster : pulsarClusters) {
+                String serviceUrl = PulsarUtils.getServiceUrl(globalPulsarAdmin, cluster);
+                this.createPulsarProcess(businessInfo, serviceUrl);
+            }
+        } catch (Exception e) {
+            log.error("create pulsar resource error for groupId={}", groupId, e);
+            throw new WorkflowListenerException("create pulsar resource error for groupId=" + groupId);
+        }
+
+        log.info("success to create pulsar resource for groupId={}", groupId);
+        return ListenerResult.success();
+    }
+
+    /**
+     * Create Pulsar tenant, namespace and one topic per data stream in a single cluster.
+     *
+     * @param businessInfo business whose namespace, queue module and partition number are used
+     * @param serviceHttpUrl admin service URL of the target cluster
+     * @throws Exception if the namespace or queue module is missing, or any Pulsar admin call fails
+     */
+    private void createPulsarProcess(BusinessInfo businessInfo, String serviceHttpUrl) throws Exception {
+        String groupId = businessInfo.getInlongGroupId();
+        log.info("begin to create pulsar resource for groupId={} in cluster={}", groupId, serviceHttpUrl);
+
+        String namespace = businessInfo.getMqResourceObj();
+        Preconditions.checkNotNull(namespace, "pulsar namespace cannot be empty for groupId=" + groupId);
+        String queueModule = businessInfo.getQueueModule();
+        Preconditions.checkNotNull(queueModule, "queue module cannot be empty for groupId=" + groupId);
+
+        // The partition number is a nullable Integer: normalize a missing or non-positive value to 0
+        // so it is never unboxed from null.
+        // NOTE(review): 0 is presumably handled by createTopic as "no partitions" - confirm in the implementation
+        Integer configuredPartitions = businessInfo.getTopicPartitionNum();
+        int partitionNum = (configuredPartitions != null && configuredPartitions > 0) ? configuredPartitions : 0;
+
+        String tenant = clusterBean.getDefaultTenant();
+        try (PulsarAdmin pulsarAdmin = PulsarUtils.getPulsarAdmin(serviceHttpUrl)) {
+            // create pulsar tenant
+            pulsarOptService.createTenant(pulsarAdmin, tenant);
+
+            // create pulsar namespace
+            BusinessPulsarEntity entity = businessPulsarMapper.selectByGroupId(groupId);
+            pulsarOptService.createNamespace(pulsarAdmin, entity, tenant, namespace);
+
+            // create pulsar topic: one bean is reused, only the topic name changes per stream
+            List<DataStreamTopicVO> streamTopicList = dataStreamMapper.selectTopicList(groupId);
+            PulsarTopicBean topicBean = PulsarTopicBean.builder()
+                    .tenant(tenant).namespace(namespace).numPartitions(partitionNum).queueModule(queueModule).build();
+
+            for (DataStreamTopicVO topicVO : streamTopicList) {
+                topicBean.setTopicName(topicVO.getMqResourceObj());
+                pulsarOptService.createTopic(pulsarAdmin, topicBean);
+            }
+        }
+        log.info("finish to create pulsar resource for groupId={}, service http url={}", groupId, serviceHttpUrl);
+    }
+
+    /**
+     * @return {@code false}; this listener does not request asynchronous execution
+     */
+    @Override
+    public boolean async() {
+        return false;
+    }
+
+}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdpart/mq/CreatePulsarTopicForStreamTaskListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdpart/mq/CreatePulsarTopicForStreamTaskListener.java
new file mode 100644
index 0000000..1211c86
--- /dev/null
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdpart/mq/CreatePulsarTopicForStreamTaskListener.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.service.thirdpart.mq;
+
+import java.util.List;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.inlong.manager.common.beans.ClusterBean;
+import org.apache.inlong.manager.common.pojo.business.BusinessInfo;
+import org.apache.inlong.manager.common.pojo.pulsar.PulsarTopicBean;
+import org.apache.inlong.manager.dao.entity.DataStreamEntity;
+import org.apache.inlong.manager.dao.mapper.DataStreamEntityMapper;
+import org.apache.inlong.manager.service.core.BusinessService;
+import org.apache.inlong.manager.service.thirdpart.mq.util.PulsarUtils;
+import org.apache.inlong.manager.service.workflow.business.BusinessResourceWorkflowForm;
+import org.apache.inlong.manager.workflow.core.event.ListenerResult;
+import org.apache.inlong.manager.workflow.core.event.task.TaskEvent;
+import org.apache.inlong.manager.workflow.core.event.task.TaskEventListener;
+import org.apache.inlong.manager.workflow.exception.WorkflowListenerException;
+import org.apache.inlong.manager.workflow.model.WorkflowContext;
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+
+/**
+ * Workflow task listener that creates the Pulsar topic of a single data stream
+ * in every cluster returned by the global Pulsar admin.
+ */
+@Slf4j
+@Component
+public class CreatePulsarTopicForStreamTaskListener implements TaskEventListener {
+
+    @Autowired
+    private ClusterBean clusterBean;
+    @Autowired
+    private PulsarOptService pulsarOptService;
+    @Autowired
+    private BusinessService businessService;
+    @Autowired
+    private DataStreamEntityMapper dataStreamMapper;
+
+    /**
+     * @return {@link TaskEvent#COMPLETE}, the task event this listener is registered for
+     */
+    @Override
+    public TaskEvent event() {
+        return TaskEvent.COMPLETE;
+    }
+
+    /**
+     * Creates the topic for the stream identified by the workflow form.
+     *
+     * @param context workflow context whose process form is a {@link BusinessResourceWorkflowForm}
+     * @return a success result once the topic has been created in every cluster
+     * @throws WorkflowListenerException if the business or stream is not found, or topic creation fails
+     */
+    @Override
+    public ListenerResult listen(WorkflowContext context) throws WorkflowListenerException {
+        BusinessResourceWorkflowForm resourceForm = (BusinessResourceWorkflowForm) context.getProcessForm();
+        final String groupId = resourceForm.getInlongGroupId();
+        final String streamId = resourceForm.getInlongStreamId();
+
+        final BusinessInfo bizInfo = businessService.get(groupId);
+        final DataStreamEntity stream = dataStreamMapper.selectByIdentifier(groupId, streamId);
+        if (bizInfo == null || stream == null) {
+            throw new WorkflowListenerException("business or data stream not found with groupId=" + groupId
+                    + ", streamId=" + streamId);
+        }
+
+        log.info("begin to create pulsar topic for groupId={}, streamId={}", groupId, streamId);
+        try (PulsarAdmin globalAdmin = PulsarUtils.getPulsarAdmin(clusterBean.getPulsarAdminUrl())) {
+            for (String cluster : PulsarUtils.getPulsarClusters(globalAdmin)) {
+                String clusterUrl = PulsarUtils.getServiceUrl(globalAdmin, cluster);
+                this.createTopicOpt(bizInfo, stream.getMqResourceObj(), clusterUrl);
+            }
+        } catch (Exception e) {
+            log.error("create pulsar topic error for groupId={}, streamId={}", groupId, streamId, e);
+            throw new WorkflowListenerException(
+                    "create pulsar topic error for groupId=" + groupId + ", streamId=" + streamId);
+        }
+
+        log.info("success to create pulsar topic for groupId={}, streamId={}", groupId, streamId);
+        return ListenerResult.success();
+    }
+
+    /**
+     * Creates one topic in the cluster behind the given admin service URL.
+     * A missing or non-positive partition number of the business is passed on as 0.
+     */
+    private void createTopicOpt(BusinessInfo bizInfo, String pulsarTopic, String serviceHttpUrl) throws Exception {
+        Integer configured = bizInfo.getTopicPartitionNum();
+        int partitions = (configured != null && configured > 0) ? configured : 0;
+
+        try (PulsarAdmin clusterAdmin = PulsarUtils.getPulsarAdmin(serviceHttpUrl)) {
+            PulsarTopicBean bean = PulsarTopicBean.builder()
+                    .tenant(clusterBean.getDefaultTenant())
+                    .namespace(bizInfo.getMqResourceObj())
+                    .topicName(pulsarTopic)
+                    .numPartitions(partitions)
+                    .queueModule(bizInfo.getQueueModule())
+                    .build();
+            pulsarOptService.createTopic(clusterAdmin, bean);
+        }
+    }
+
+    /**
+     * @return {@code false}; this listener does not request asynchronous execution
+     */
+    @Override
+    public boolean async() {
+        return false;
+    }
+}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdpart/mq/CreateTubeConsumerGroupTaskListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdpart/mq/CreateTubeGroupTaskListener.java
similarity index 94%
rename from inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdpart/mq/CreateTubeConsumerGroupTaskListener.java
rename to inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdpart/mq/CreateTubeGroupTaskListener.java
index 580c249..3a83099 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdpart/mq/CreateTubeConsumerGroupTaskListener.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdpart/mq/CreateTubeGroupTaskListener.java
@@ -26,7 +26,7 @@ import org.apache.inlong.manager.common.pojo.tubemq.AddTubeConsumeGroupRequest.G
import org.apache.inlong.manager.common.pojo.tubemq.QueryTubeTopicRequest;
import org.apache.inlong.manager.dao.mapper.ClusterInfoMapper;
import org.apache.inlong.manager.service.core.BusinessService;
-import org.apache.inlong.manager.service.workflow.newbusiness.CreateResourceWorkflowForm;
+import org.apache.inlong.manager.service.workflow.business.BusinessResourceWorkflowForm;
import org.apache.inlong.manager.workflow.core.event.ListenerResult;
import org.apache.inlong.manager.workflow.core.event.task.TaskEvent;
import org.apache.inlong.manager.workflow.core.event.task.TaskEventListener;
@@ -38,7 +38,7 @@ import org.springframework.stereotype.Component;
@Component
@Slf4j
-public class CreateTubeConsumerGroupTaskListener implements TaskEventListener {
+public class CreateTubeGroupTaskListener implements TaskEventListener {
@Autowired
BusinessService businessService;
@@ -62,7 +62,7 @@ public class CreateTubeConsumerGroupTaskListener implements TaskEventListener {
@Override
public ListenerResult listen(WorkflowContext context) throws WorkflowListenerException {
- CreateResourceWorkflowForm form = (CreateResourceWorkflowForm) context.getProcessForm();
+ BusinessResourceWorkflowForm form = (BusinessResourceWorkflowForm) context.getProcessForm();
String groupId = form.getInlongGroupId();
log.info("try to create consumer group for groupId {}", groupId);
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdpart/mq/CreateTubeTopicTaskListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdpart/mq/CreateTubeTopicTaskListener.java
index 01d73c6..2bd275c 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdpart/mq/CreateTubeTopicTaskListener.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdpart/mq/CreateTubeTopicTaskListener.java
@@ -22,7 +22,7 @@ import lombok.extern.slf4j.Slf4j;
import org.apache.inlong.manager.common.pojo.business.BusinessInfo;
import org.apache.inlong.manager.common.pojo.tubemq.AddTubeMqTopicRequest;
import org.apache.inlong.manager.service.core.BusinessService;
-import org.apache.inlong.manager.service.workflow.newbusiness.CreateResourceWorkflowForm;
+import org.apache.inlong.manager.service.workflow.business.BusinessResourceWorkflowForm;
import org.apache.inlong.manager.workflow.core.event.ListenerResult;
import org.apache.inlong.manager.workflow.core.event.task.TaskEvent;
import org.apache.inlong.manager.workflow.core.event.task.TaskEventListener;
@@ -50,7 +50,7 @@ public class CreateTubeTopicTaskListener implements TaskEventListener {
@Override
public ListenerResult listen(WorkflowContext context) throws WorkflowListenerException {
- CreateResourceWorkflowForm form = (CreateResourceWorkflowForm) context.getProcessForm();
+ BusinessResourceWorkflowForm form = (BusinessResourceWorkflowForm) context.getProcessForm();
log.info("begin create tube topic for groupId={}", form.getInlongGroupId());
String groupId = form.getInlongGroupId();
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdpart/mq/PulsarOptService.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdpart/mq/PulsarOptService.java
new file mode 100644
index 0000000..899cf99
--- /dev/null
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdpart/mq/PulsarOptService.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.service.thirdpart.mq;
+
+import java.util.List;
+import org.apache.inlong.manager.common.pojo.pulsar.PulsarTopicBean;
+import org.apache.inlong.manager.dao.entity.BusinessPulsarEntity;
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.admin.PulsarAdminException;
+
+/**
+ * Operations against a Pulsar cluster used by the workflow task listeners.
+ * Every method works through the {@link PulsarAdmin} client passed in by the caller;
+ * the visible callers open and close that client themselves.
+ */
+public interface PulsarOptService {
+
+ /**
+ * Create the given tenant.
+ *
+ * @param pulsarAdmin admin client of the target cluster
+ * @param tenant tenant name
+ * @throws PulsarAdminException if the admin call fails
+ */
+ void createTenant(PulsarAdmin pulsarAdmin, String tenant) throws PulsarAdminException;
+
+ /**
+ * Create the given namespace under the tenant.
+ *
+ * @param pulsarAdmin admin client of the target cluster
+ * @param pulsarEntity Pulsar settings of the business, as loaded by the caller
+ * (NOTE(review): presumably supplies the namespace policies - confirm in the implementation)
+ * @param tenant tenant name
+ * @param namespace namespace name
+ * @throws PulsarAdminException if the admin call fails
+ */
+ void createNamespace(PulsarAdmin pulsarAdmin, BusinessPulsarEntity pulsarEntity, String tenant,
+ String namespace) throws PulsarAdminException;
+
+ /**
+ * Create the topic described by the bean; callers fill in tenant, namespace, topic name,
+ * partition number and queue module before calling.
+ *
+ * @param pulsarAdmin admin client of the target cluster
+ * @param topicBean description of the topic to create
+ * @throws PulsarAdminException if the admin call fails
+ */
+ void createTopic(PulsarAdmin pulsarAdmin, PulsarTopicBean topicBean) throws PulsarAdminException;
+
+ /**
+ * Create a subscription on the topic described by the bean.
+ * The visible callers check {@link #topicIsExists} first.
+ *
+ * @param pulsarAdmin admin client of the target cluster
+ * @param topicBean tenant, namespace and topic name of the topic to subscribe to
+ * @param subscription subscription name
+ * @throws PulsarAdminException if the admin call fails
+ */
+ void createSubscription(PulsarAdmin pulsarAdmin, PulsarTopicBean topicBean, String subscription)
+ throws PulsarAdminException;
+
+ /**
+ * Create a subscription for a list of topics.
+ * NOTE(review): presumably creates the same subscription on each topic in {@code topics},
+ * taking tenant and namespace from the bean - confirm in the implementation.
+ *
+ * @param pulsarAdmin admin client of the target cluster
+ * @param subscription subscription name
+ * @param topicBean topic description shared by the topics
+ * @param topics topic names
+ * @throws PulsarAdminException if the admin call fails
+ */
+ void createSubscriptions(PulsarAdmin pulsarAdmin, String subscription, PulsarTopicBean topicBean,
+ List<String> topics) throws PulsarAdminException;
+
+ /**
+ * Check whether a topic exists.
+ *
+ * @param pulsarAdmin admin client of the target cluster
+ * @param tenant tenant name
+ * @param namespace namespace name
+ * @param topic topic name
+ * @return true if the topic exists
+ * @throws PulsarAdminException if the admin call fails
+ */
+ boolean topicIsExists(PulsarAdmin pulsarAdmin, String tenant, String namespace, String topic)
+ throws PulsarAdminException;
+
+}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdpart/mq/PulsarOptServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdpart/mq/PulsarOptServiceImpl.java
new file mode 100644
index 0000000..488a503
--- /dev/null
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdpart/mq/PulsarOptServiceImpl.java
@@ -0,0 +1,250 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.service.thirdpart.mq;
+
+import com.google.common.collect.Sets;
+import java.util.List;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.inlong.manager.common.conversion.ConversionHandle;
+import org.apache.inlong.manager.common.enums.BizConstant;
+import org.apache.inlong.manager.common.pojo.pulsar.PulsarTopicBean;
+import org.apache.inlong.manager.common.util.Preconditions;
+import org.apache.inlong.manager.dao.entity.BusinessPulsarEntity;
+import org.apache.inlong.manager.service.thirdpart.mq.util.PulsarUtils;
+import org.apache.pulsar.client.admin.Namespaces;
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.common.policies.data.PersistencePolicies;
+import org.apache.pulsar.common.policies.data.RetentionPolicies;
+import org.apache.pulsar.common.policies.data.TenantInfoImpl;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+
+/**
+ * Default implementation of the Pulsar operation interface, backed by the Pulsar admin client
+ */
+@Service
+@Slf4j
+public class PulsarOptServiceImpl implements PulsarOptService {
+
+ @Autowired
+ private ConversionHandle conversionHandle;
+
+    /**
+     * Create the given Pulsar tenant and allow it on every cluster reported by the admin client.
+     * An already existing tenant is left untouched.
+     *
+     * @param pulsarAdmin admin client used to issue the requests
+     * @param tenant name of the tenant to create, must not be empty
+     * @throws PulsarAdminException if an admin request fails; the error is logged and rethrown
+     */
+    @Override
+    public void createTenant(PulsarAdmin pulsarAdmin, String tenant) throws PulsarAdminException {
+        log.info("begin to create tenant={}", tenant);
+
+        Preconditions.checkNotEmpty(tenant, "Tenant cannot be empty");
+        try {
+            if (this.tenantIsExists(pulsarAdmin, tenant)) {
+                log.warn("pulsar tenant={} already exists, skip to create", tenant);
+                return;
+            }
+
+            // Query the clusters only when the tenant really has to be created,
+            // so that an existing tenant does not cost an extra admin request
+            List<String> clusters = PulsarUtils.getPulsarClusters(pulsarAdmin);
+            TenantInfoImpl tenantInfo = new TenantInfoImpl();
+            tenantInfo.setAllowedClusters(Sets.newHashSet(clusters));
+            tenantInfo.setAdminRoles(Sets.newHashSet());
+            pulsarAdmin.tenants().createTenant(tenant, tenantInfo);
+            log.info("success to create pulsar tenant={}", tenant);
+        } catch (PulsarAdminException e) {
+            log.error("create pulsar tenant={} failed", tenant, e);
+            throw e;
+        }
+    }
+
+ @Override
+ public void createNamespace(PulsarAdmin pulsarAdmin, BusinessPulsarEntity pulsarEntity,
+ String tenant, String namespace) throws PulsarAdminException {
+ Preconditions.checkNotNull(tenant, "pulsar tenant cannot be empty during create namespace");
+ Preconditions.checkNotNull(namespace, "pulsar namespace cannot be empty during create namespace");
+
+ String namespaceName = tenant + "/" + namespace;
+ log.info("begin to create namespace={}", namespaceName);
+
+ try {
+ // Check whether the namespace exists, and create it if it does not exist
+ boolean isExists = this.namespacesIsExists(pulsarAdmin, tenant, namespace);
+ if (isExists) {
+ log.warn("namespace={} already exists, skip to create", namespaceName);
+ return;
+ }
+
+ List<String> clusters = PulsarUtils.getPulsarClusters(pulsarAdmin);
+ Namespaces namespaces = pulsarAdmin.namespaces();
+ namespaces.createNamespace(namespaceName, Sets.newHashSet(clusters));
+
+ // Configure message TTL
+ Integer ttl = pulsarEntity.getTtl();
+ if (ttl > 0) {
+ namespaces.setNamespaceMessageTTL(namespaceName, conversionHandle.handleConversion(ttl,
+ pulsarEntity.getTtlUnit().toLowerCase() + "_seconds"));
+ }
+
+ // retentionTimeInMinutes retentionSizeInMB
+ Integer retentionTime = pulsarEntity.getRetentionTime();
+ if (retentionTime > 0) {
+ retentionTime = conversionHandle.handleConversion(retentionTime,
+ pulsarEntity.getRetentionTimeUnit().toLowerCase() + "_minutes");
+ }
+ Integer retentionSize = pulsarEntity.getRetentionSize();
+ if (retentionSize > 0) {
+ retentionSize = conversionHandle.handleConversion(retentionSize,
+ pulsarEntity.getRetentionSizeUnit().toLowerCase() + "_mb");
+ }
+
+ // Configure retention policies
+ RetentionPolicies retentionPolicies = new RetentionPolicies(retentionTime, retentionSize);
+ namespaces.setRetention(namespaceName, retentionPolicies);
+
+ // Configure persistence policies
+ PersistencePolicies persistencePolicies = new PersistencePolicies(pulsarEntity.getEnsemble(),
+ pulsarEntity.getWriteQuorum(), pulsarEntity.getAckQuorum(), 0);
+ namespaces.setPersistence(namespaceName, persistencePolicies);
+
+ log.info("success to create namespace={}", tenant);
+ } catch (PulsarAdminException e) {
+ log.error("create namespace={} error", tenant, e);
+ throw e;
+ }
+ }
+
+ @Override
+ public void createTopic(PulsarAdmin pulsarAdmin, PulsarTopicBean topicBean) throws PulsarAdminException {
+ Preconditions.checkNotNull(topicBean, "pulsar topic info cannot be empty");
+
+ String tenant = topicBean.getTenant();
+ String namespace = topicBean.getNamespace();
+ String topic = topicBean.getTopicName();
+ String topicFullName = tenant + "/" + namespace + "/" + topic;
+
+ // Topic will be returned if it exists, and created if it does not exist
+ if (topicIsExists(pulsarAdmin, tenant, namespace, topic)) {
+ log.warn("pulsar topic={} already exists in {}", topicFullName, pulsarAdmin.getServiceUrl());
+ return;
+ }
+
+ try {
+ String queueModule = topicBean.getQueueModule();
+ // create non-partition topic
+ if (BizConstant.PULSAR_TOPIC_TYPE_SERIAL.equalsIgnoreCase(queueModule)) {
+ pulsarAdmin.topics().createNonPartitionedTopic(topicFullName);
+ } else { // create partition topic
+ List<String> clusters = PulsarUtils.getPulsarClusters(pulsarAdmin);
+ // The number of brokers as the default value of topic partition
+ List<String> brokers = pulsarAdmin.brokers().getActiveBrokers(clusters.get(0));
+ Integer numPartitions = brokers.size();
+ if (topicBean.getNumPartitions() > 0) {
+ numPartitions = topicBean.getNumPartitions();
+ }
+ pulsarAdmin.topics().createPartitionedTopic(topicFullName, numPartitions);
+ }
+
+ log.info("success to create topic={}", topicFullName);
+ } catch (Exception e) {
+ log.error("create topic={} failed", topicFullName, e);
+ throw e;
+ }
+ }
+
+    /**
+     * Create the subscription on the topic described by the bean, starting at the latest message,
+     * unless the subscription is already listed on that topic.
+     *
+     * @param pulsarAdmin admin client used to issue the requests
+     * @param topicBean tenant, namespace and topic name of the topic to subscribe to
+     * @param subscription name of the subscription, must not be null
+     * @throws PulsarAdminException if the creation fails; the error is logged and rethrown
+     */
+    @Override
+    public void createSubscription(PulsarAdmin pulsarAdmin, PulsarTopicBean topicBean, String subscription)
+            throws PulsarAdminException {
+        Preconditions.checkNotNull(topicBean, "can not find tenant information to create subscription");
+        Preconditions.checkNotNull(subscription, "subscription cannot be empty during creating subscription");
+
+        String tenantPart = topicBean.getTenant();
+        String namespacePart = topicBean.getNamespace();
+        String topicPart = topicBean.getTopicName();
+        String fullTopic = tenantPart + "/" + namespacePart + "/" + topicPart;
+        log.info("begin to create pulsar subscription={} for topic={}", subscription, fullTopic);
+
+        try {
+            if (this.subscriptionIsExists(pulsarAdmin, fullTopic, subscription)) {
+                log.warn("pulsar subscription={} already exists, skip to create", subscription);
+            } else {
+                pulsarAdmin.topics().createSubscription(fullTopic, subscription, MessageId.latest);
+                log.info("success to create subscription={}", subscription);
+            }
+        } catch (Exception e) {
+            log.error("create pulsar subscription={} failed", subscription, e);
+            throw e;
+        }
+    }
+
+    /**
+     * Create the same subscription on every topic of the list, one after another.
+     * The topic name of the given bean is overwritten for each entry and is left at the
+     * last processed topic when the method returns.
+     *
+     * @param pulsarAdmin admin client used to issue the requests
+     * @param subscription name of the subscription
+     * @param topicBean supplies tenant and namespace for all topics
+     * @param topicList short names of the topics to subscribe to
+     * @throws PulsarAdminException if creating any subscription fails; later topics are not processed
+     */
+    @Override
+    public void createSubscriptions(PulsarAdmin pulsarAdmin, String subscription, PulsarTopicBean topicBean,
+            List<String> topicList) throws PulsarAdminException {
+        for (String currentTopic : topicList) {
+            // Reuse the bean as carrier of tenant/namespace, only the topic name changes
+            topicBean.setTopicName(currentTopic);
+            createSubscription(pulsarAdmin, topicBean, subscription);
+        }
+        log.info("success to create subscription={} for multiple topics={}", subscription, topicList);
+    }
+
+    /**
+     * Tell whether the tenant is contained in the tenant list returned by the admin client
+     */
+    private boolean tenantIsExists(PulsarAdmin pulsarAdmin, String tenant) throws PulsarAdminException {
+        return pulsarAdmin.tenants().getTenants().contains(tenant);
+    }
+
+    /**
+     * Tell whether {@code tenant/namespace} is contained in the namespace list of the tenant
+     */
+    private boolean namespacesIsExists(PulsarAdmin pulsarAdmin, String tenant, String namespace)
+            throws PulsarAdminException {
+        // The admin client is expected to list namespaces in the "tenant/namespace" form
+        String qualifiedName = tenant + "/" + namespace;
+        return pulsarAdmin.namespaces().getNamespaces(tenant).contains(qualifiedName);
+    }
+
+    /**
+     * Verify whether the specified Topic exists under the specified Tenant/Namespace,
+     * either as a partitioned or as a non-partitioned topic.
+     * A blank topic name is reported as existing, so that callers skip the creation.
+     *
+     * @param pulsarAdmin admin client used to issue the requests
+     * @param tenant tenant that owns the namespace
+     * @param namespace short namespace name, without the tenant prefix
+     * @param topic short topic name, without tenant and namespace
+     * @return true if the topic name is blank or a topic with exactly this short name is listed
+     * @throws PulsarAdminException if listing the topics fails
+     * @apiNote cannot compare whether the string contains, otherwise it may be misjudged, such as:
+     *         Topic "ab" does not exist, but if "abc" exists, "ab" will be mistakenly judged to exist
+     */
+    @Override
+    public boolean topicIsExists(PulsarAdmin pulsarAdmin, String tenant, String namespace, String topic)
+            throws PulsarAdminException {
+        if (StringUtils.isBlank(topic)) {
+            return true;
+        }
+
+        String namespaceName = tenant + "/" + namespace;
+
+        // Partitioned topics are listed by their logical name: persistent://tenant/namespace/topic
+        if (containsTopicName(pulsarAdmin.topics().getPartitionedTopicList(namespaceName), topic)) {
+            return true;
+        }
+
+        // The partitioned list does not contain non-partitioned topics, so check the plain topic
+        // list as well; partitions show up there as "topic-partition-N" and cannot match exactly
+        return containsTopicName(pulsarAdmin.topics().getList(namespaceName), topic);
+    }
+
+    /**
+     * Tell whether one of the fully qualified topic names ends with exactly the given short name
+     */
+    private static boolean containsTopicName(List<String> fullTopicNames, String topic) {
+        for (String fullName : fullTopicNames) {
+            // Keep only the part behind the last "/", which cannot contain a "/" itself
+            String shortName = fullName.substring(fullName.lastIndexOf("/") + 1);
+            if (topic.equals(shortName)) {
+                return true;
+            }
+        }
+        return false;
+    }
+
+    /**
+     * Tell whether the subscription is listed on the given topic.
+     * Best effort: when the subscription list cannot be fetched, the error is logged and
+     * {@code false} is returned instead of propagating the exception.
+     *
+     * @param pulsarAdmin admin client used to issue the request
+     * @param topic full topic name in the form tenant/namespace/topic
+     * @param subscription name of the subscription to look for
+     * @return true if the subscription is listed, false if it is not or the lookup failed
+     */
+    public boolean subscriptionIsExists(PulsarAdmin pulsarAdmin, String topic, String subscription) {
+        try {
+            List<String> subscriptionList = pulsarAdmin.topics().getSubscriptions(topic);
+            return subscriptionList.contains(subscription);
+        } catch (PulsarAdminException e) {
+            // Name what was actually checked: the subscription, not the topic
+            log.error("check if the subscription={} exists for topic={} error,", subscription, topic, e);
+            return false;
+        }
+    }
+}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdpart/mq/util/PulsarUtils.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdpart/mq/util/PulsarUtils.java
new file mode 100644
index 0000000..bf54a4d
--- /dev/null
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdpart/mq/util/PulsarUtils.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.service.thirdpart.mq.util;
+
+import java.util.List;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.client.api.PulsarClientException;
+
+/**
+ * Pulsar connection utils
+ */
+@Slf4j
+public class PulsarUtils {
+
+ private PulsarUtils() {
+ }
+
+ /**
+ * Obtain the PulsarAdmin client according to the service URL, and it must be closed after use
+ */
+ public static PulsarAdmin getPulsarAdmin(String serviceHttpUrl) throws PulsarClientException {
+ return PulsarAdmin.builder().serviceHttpUrl(serviceHttpUrl).build();
+ }
+
+ public static List<String> getPulsarClusters(PulsarAdmin pulsarAdmin) throws PulsarAdminException {
+ return pulsarAdmin.clusters().getClusters();
+ }
+
+ public static String getServiceUrl(PulsarAdmin pulsarAdmin, String pulsarCluster) throws PulsarAdminException {
+ return pulsarAdmin.clusters().getCluster(pulsarCluster).getServiceUrl();
+ }
+
+}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdpart/sort/PushHiveConfigTaskListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdpart/sort/PushHiveConfigTaskListener.java
index 9f037ec..dd49261 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdpart/sort/PushHiveConfigTaskListener.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdpart/sort/PushHiveConfigTaskListener.java
@@ -37,7 +37,7 @@ import org.apache.inlong.manager.dao.mapper.BusinessEntityMapper;
import org.apache.inlong.manager.dao.mapper.StorageHiveEntityMapper;
import org.apache.inlong.manager.dao.mapper.StorageHiveFieldEntityMapper;
import org.apache.inlong.manager.service.core.DataStreamService;
-import org.apache.inlong.manager.service.workflow.newbusiness.CreateResourceWorkflowForm;
+import org.apache.inlong.manager.service.workflow.business.BusinessResourceWorkflowForm;
import org.apache.inlong.manager.workflow.core.event.ListenerResult;
import org.apache.inlong.manager.workflow.core.event.task.TaskEvent;
import org.apache.inlong.manager.workflow.core.event.task.TaskEventListener;
@@ -100,7 +100,7 @@ public class PushHiveConfigTaskListener implements TaskEventListener {
log.debug("begin push hive config to sort, context={}", context);
}
- CreateResourceWorkflowForm form = (CreateResourceWorkflowForm) context.getProcessForm();
+ BusinessResourceWorkflowForm form = (BusinessResourceWorkflowForm) context.getProcessForm();
BusinessInfo businessInfo = form.getBusinessInfo();
String groupId = businessInfo.getInlongGroupId();
@@ -262,7 +262,7 @@ public class PushHiveConfigTaskListener implements TaskEventListener {
String pulsarTopic = info.getMqResourceObj();
// Full name of Topic in Pulsar
String fullTopicName = "persistent://" + tenant + "/" + namespace + "/" + pulsarTopic;
- String adminUrl = clusterBean.getPulsarServiceUrl();
+ String adminUrl = clusterBean.getPulsarAdminUrl();
String serviceUrl = clusterBean.getPulsarServiceUrl();
String consumerGroup = clusterBean.getAppName() + "_" + pulsarTopic + "_consumer_group";
sourceInfo = new PulsarSourceInfo(adminUrl, serviceUrl, fullTopicName, consumerGroup,
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/BaseWorkflowFormType.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/BaseWorkflowFormType.java
index f9c8f74..d5649b5 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/BaseWorkflowFormType.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/BaseWorkflowFormType.java
@@ -21,9 +21,9 @@ import com.fasterxml.jackson.annotation.JsonSubTypes;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
import com.fasterxml.jackson.annotation.JsonTypeInfo.Id;
import lombok.Data;
-import org.apache.inlong.manager.service.workflow.newbusiness.CreateResourceWorkflowForm;
-import org.apache.inlong.manager.service.workflow.newbusiness.NewBusinessWorkflowForm;
-import org.apache.inlong.manager.service.workflow.newconsumption.NewConsumptionWorkflowForm;
+import org.apache.inlong.manager.service.workflow.business.BusinessResourceWorkflowForm;
+import org.apache.inlong.manager.service.workflow.business.NewBusinessWorkflowForm;
+import org.apache.inlong.manager.service.workflow.consumption.NewConsumptionWorkflowForm;
import org.apache.inlong.manager.workflow.model.definition.ProcessForm;
/**
@@ -35,8 +35,8 @@ import org.apache.inlong.manager.workflow.model.definition.ProcessForm;
@JsonSubTypes({
@JsonSubTypes.Type(value = NewBusinessWorkflowForm.class, name = NewBusinessWorkflowForm.FORM_NAME),
@JsonSubTypes.Type(value = NewConsumptionWorkflowForm.class, name = NewConsumptionWorkflowForm.FORM_NAME),
- @JsonSubTypes.Type(value = CreateResourceWorkflowForm.class,
- name = CreateResourceWorkflowForm.FORM_NAME),
+ @JsonSubTypes.Type(value = BusinessResourceWorkflowForm.class,
+ name = BusinessResourceWorkflowForm.FORM_NAME),
})
public abstract class BaseWorkflowFormType implements ProcessForm {
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/BaseWorkflowTaskFormType.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/BaseWorkflowTaskFormType.java
index 5803802..b844057 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/BaseWorkflowTaskFormType.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/BaseWorkflowTaskFormType.java
@@ -21,8 +21,8 @@ import com.fasterxml.jackson.annotation.JsonSubTypes;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
import com.fasterxml.jackson.annotation.JsonTypeInfo.Id;
import lombok.Data;
-import org.apache.inlong.manager.service.workflow.newbusiness.NewBusinessApproveForm;
-import org.apache.inlong.manager.service.workflow.newconsumption.NewConsumptionApproveForm;
+import org.apache.inlong.manager.service.workflow.business.BusinessAdminApproveForm;
+import org.apache.inlong.manager.service.workflow.consumption.ConsumptionAdminApproveForm;
import org.apache.inlong.manager.workflow.model.definition.TaskForm;
/**
@@ -31,8 +31,8 @@ import org.apache.inlong.manager.workflow.model.definition.TaskForm;
@Data
@JsonTypeInfo(use = Id.NAME, property = "formName")
@JsonSubTypes({
- @JsonSubTypes.Type(value = NewBusinessApproveForm.class, name = NewBusinessApproveForm.FORM_NAME),
- @JsonSubTypes.Type(value = NewConsumptionApproveForm.class, name = NewConsumptionApproveForm.FORM_NAME),
+ @JsonSubTypes.Type(value = BusinessAdminApproveForm.class, name = BusinessAdminApproveForm.FORM_NAME),
+ @JsonSubTypes.Type(value = ConsumptionAdminApproveForm.class, name = ConsumptionAdminApproveForm.FORM_NAME),
})
public abstract class BaseWorkflowTaskFormType implements TaskForm {
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/ProcessName.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/ProcessName.java
index 6c45a79..1d695b6 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/ProcessName.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/ProcessName.java
@@ -25,22 +25,22 @@ public enum ProcessName {
/**
* New business access application process
*/
- NEW_BUSINESS_WORKFLOW("New Business Access"),
+ NEW_BUSINESS_WORKFLOW("New-Business-Access"),
/**
* New data consumption application process
*/
- NEW_CONSUMPTION_WORKFLOW("New Data Consumption"),
+ NEW_CONSUMPTION_WORKFLOW("New-Data-Consumption"),
/**
* New business resource creation
*/
- CREATE_BUSINESS_RESOURCE("New Business Access Resource"),
+ CREATE_BUSINESS_RESOURCE("Business-Access-Resource"),
/**
* Single data stream resource creation
*/
- CREATE_DATASTREAM_RESOURCE("Single Data Stream Resource");
+ CREATE_DATASTREAM_RESOURCE("Data-Stream-Resource");
private final String displayName;
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/newbusiness/NewBusinessApproveForm.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/business/BusinessAdminApproveForm.java
similarity index 90%
rename from inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/newbusiness/NewBusinessApproveForm.java
rename to inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/business/BusinessAdminApproveForm.java
index f9c68b0..380294e 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/newbusiness/NewBusinessApproveForm.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/business/BusinessAdminApproveForm.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.inlong.manager.service.workflow.newbusiness;
+package org.apache.inlong.manager.service.workflow.business;
import io.swagger.annotations.ApiModelProperty;
import java.util.List;
@@ -32,9 +32,9 @@ import org.apache.inlong.manager.workflow.exception.FormValidateException;
*/
@Data
@EqualsAndHashCode(callSuper = false)
-public class NewBusinessApproveForm extends BaseWorkflowTaskFormType {
+public class BusinessAdminApproveForm extends BaseWorkflowTaskFormType {
- public static final String FORM_NAME = "NewBusinessApproveForm";
+ public static final String FORM_NAME = "BusinessAdminApproveForm";
@ApiModelProperty(value = "Access business information", required = true)
private BusinessApproveInfo businessApproveInfo;
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/newbusiness/CreateResourceWorkflowForm.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/business/BusinessResourceWorkflowForm.java
similarity index 91%
rename from inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/newbusiness/CreateResourceWorkflowForm.java
rename to inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/business/BusinessResourceWorkflowForm.java
index 3452293..ac1d844 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/newbusiness/CreateResourceWorkflowForm.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/business/BusinessResourceWorkflowForm.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.inlong.manager.service.workflow.newbusiness;
+package org.apache.inlong.manager.service.workflow.business;
import com.google.common.collect.Maps;
@@ -33,9 +33,9 @@ import lombok.EqualsAndHashCode;
*/
@Data
@EqualsAndHashCode(callSuper = false)
-public class CreateResourceWorkflowForm extends BaseWorkflowFormType {
+public class BusinessResourceWorkflowForm extends BaseWorkflowFormType {
- public static final String FORM_NAME = "CreateResourceWorkflowForm";
+ public static final String FORM_NAME = "BusinessResourceWorkflowForm";
private BusinessInfo businessInfo;
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/business/CreateBusinessWorkflowDefinition.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/business/CreateBusinessWorkflowDefinition.java
new file mode 100644
index 0000000..0f01b7e
--- /dev/null
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/business/CreateBusinessWorkflowDefinition.java
@@ -0,0 +1,208 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.service.workflow.business;
+
+import java.util.List;
+import java.util.stream.Collectors;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.inlong.manager.common.enums.BizConstant;
+import org.apache.inlong.manager.common.pojo.business.BusinessInfo;
+import org.apache.inlong.manager.dao.entity.DataStreamEntity;
+import org.apache.inlong.manager.dao.mapper.DataStreamEntityMapper;
+import org.apache.inlong.manager.service.core.StorageService;
+import org.apache.inlong.manager.service.thirdpart.hive.CreateHiveTableListener;
+import org.apache.inlong.manager.service.thirdpart.mq.CreatePulsarGroupTaskListener;
+import org.apache.inlong.manager.service.thirdpart.mq.CreatePulsarResourceTaskListener;
+import org.apache.inlong.manager.service.thirdpart.mq.CreateTubeGroupTaskListener;
+import org.apache.inlong.manager.service.thirdpart.mq.CreateTubeTopicTaskListener;
+import org.apache.inlong.manager.service.thirdpart.sort.PushHiveConfigTaskListener;
+import org.apache.inlong.manager.service.workflow.ProcessName;
+import org.apache.inlong.manager.service.workflow.WorkflowDefinition;
+import org.apache.inlong.manager.service.workflow.business.listener.BusinessCompleteProcessListener;
+import org.apache.inlong.manager.service.workflow.business.listener.BusinessFailedProcessListener;
+import org.apache.inlong.manager.service.workflow.business.listener.InitBusinessInfoListener;
+import org.apache.inlong.manager.workflow.model.definition.EndEvent;
+import org.apache.inlong.manager.workflow.model.definition.Process;
+import org.apache.inlong.manager.workflow.model.definition.ServiceTask;
+import org.apache.inlong.manager.workflow.model.definition.StartEvent;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+
+/**
+ * Create workflow definitions for business resources
+ */
+@Slf4j
+@Component
+public class CreateBusinessWorkflowDefinition implements WorkflowDefinition {
+
+ @Autowired
+ private InitBusinessInfoListener initBusinessInfoListener;
+ @Autowired
+ private BusinessCompleteProcessListener businessCompleteProcessListener;
+ @Autowired
+ private BusinessFailedProcessListener businessFailedProcessListener;
+ @Autowired
+ private CreateTubeTopicTaskListener createTubeTopicTaskListener;
+ @Autowired
+ private CreateTubeGroupTaskListener createTubeGroupTaskListener;
+ @Autowired
+ private CreatePulsarResourceTaskListener createPulsarResourceTaskListener;
+ @Autowired
+ private CreatePulsarGroupTaskListener createPulsarGroupTaskListener;
+
+ @Autowired
+ private CreateHiveTableListener createHiveTableListener;
+ @Autowired
+ private PushHiveConfigTaskListener pushHiveConfigTaskListener;
+ @Autowired
+ private StorageService storageService;
+ @Autowired
+ private DataStreamEntityMapper streamMapper;
+
+    /**
+     * Build the process that creates the resources of a business.
+     * The tasks run in a fixed chain: Tube topic, Tube consumer group, Pulsar resource,
+     * Pulsar subscription, Hive tables, push of the sort config. The Tube and Pulsar tasks are
+     * skipped when the business uses the other middleware type, the Hive task is skipped when
+     * no data stream of the group has a Hive storage.
+     *
+     * @return the configured process definition
+     */
+    @Override
+    public Process defineProcess() {
+
+        // Configuration process
+        Process process = new Process();
+        process.addListener(initBusinessInfoListener);
+        process.addListener(businessCompleteProcessListener);
+        process.addListener(businessFailedProcessListener);
+
+        process.setType("Business Resource Creation");
+        process.setName(getProcessName().name());
+        process.setDisplayName(getProcessName().getDisplayName());
+        process.setFormClass(BusinessResourceWorkflowForm.class);
+        process.setVersion(1);
+        process.setHidden(true);
+
+        // Start node
+        StartEvent startEvent = new StartEvent();
+        process.setStartEvent(startEvent);
+
+        // End node
+        EndEvent endEvent = new EndEvent();
+        process.setEndEvent(endEvent);
+
+        // Tube topic: only for businesses whose middleware type is Tube
+        ServiceTask createTubeTopicTask = new ServiceTask();
+        createTubeTopicTask.setSkipResolver(c -> {
+            BusinessResourceWorkflowForm form = (BusinessResourceWorkflowForm) c.getProcessForm();
+            BusinessInfo businessInfo = form.getBusinessInfo();
+            if (BizConstant.MIDDLEWARE_TUBE.equalsIgnoreCase(businessInfo.getMiddlewareType())) {
+                return false;
+            }
+            // Arguments follow the placeholders: group id first, middleware type second
+            log.warn("not need to create tube resource for groupId={}, as the middleware type is {}",
+                    form.getInlongGroupId(), businessInfo.getMiddlewareType());
+            return true;
+        });
+        createTubeTopicTask.setName("createTubeTopic");
+        createTubeTopicTask.setDisplayName("Business-CreateTubeTopic");
+        createTubeTopicTask.addListener(createTubeTopicTaskListener);
+        process.addTask(createTubeTopicTask);
+
+        // Tube consumer group: only for businesses whose middleware type is Tube
+        ServiceTask createTubeConsumerTask = new ServiceTask();
+        createTubeConsumerTask.setSkipResolver(c -> {
+            BusinessResourceWorkflowForm form = (BusinessResourceWorkflowForm) c.getProcessForm();
+            String middlewareType = form.getBusinessInfo().getMiddlewareType();
+            if (BizConstant.MIDDLEWARE_TUBE.equalsIgnoreCase(middlewareType)) {
+                return false;
+            }
+            log.warn("no need to create tube resource for groupId={}, as the middleware type is {}",
+                    form.getInlongGroupId(), middlewareType);
+            return true;
+        });
+        createTubeConsumerTask.setName("createConsumerGroup");
+        createTubeConsumerTask.setDisplayName("Business-CreateTubeConsumer");
+        createTubeConsumerTask.addListener(createTubeGroupTaskListener);
+        process.addTask(createTubeConsumerTask);
+
+        // Pulsar resource: only for businesses whose middleware type is Pulsar
+        ServiceTask createPulsarResourceTask = new ServiceTask();
+        createPulsarResourceTask.setSkipResolver(c -> {
+            BusinessResourceWorkflowForm form = (BusinessResourceWorkflowForm) c.getProcessForm();
+            String middlewareType = form.getBusinessInfo().getMiddlewareType();
+            if (BizConstant.MIDDLEWARE_PULSAR.equalsIgnoreCase(middlewareType)) {
+                return false;
+            }
+            log.warn("no need to create pulsar resource for groupId={}, as the middlewareType={}",
+                    form.getInlongGroupId(), middlewareType);
+            return true;
+        });
+        createPulsarResourceTask.setName("createPulsarResource");
+        createPulsarResourceTask.setDisplayName("Business-CreatePulsarResource");
+        createPulsarResourceTask.addListener(createPulsarResourceTaskListener);
+        process.addTask(createPulsarResourceTask);
+
+        // Pulsar subscription: only for businesses whose middleware type is Pulsar
+        ServiceTask createPulsarSubscriptionTask = new ServiceTask();
+        createPulsarSubscriptionTask.setSkipResolver(c -> {
+            BusinessResourceWorkflowForm form = (BusinessResourceWorkflowForm) c.getProcessForm();
+            String middlewareType = form.getBusinessInfo().getMiddlewareType();
+            if (BizConstant.MIDDLEWARE_PULSAR.equalsIgnoreCase(middlewareType)) {
+                return false;
+            }
+            log.warn("no need to create pulsar subscription group for groupId={}, as the middlewareType={}",
+                    form.getInlongGroupId(), middlewareType);
+            return true;
+        });
+        createPulsarSubscriptionTask.setName("createPulsarSubscriptionTask");
+        createPulsarSubscriptionTask.setDisplayName("Business-CreatePulsarSubscription");
+        createPulsarSubscriptionTask.addListener(createPulsarGroupTaskListener);
+        process.addTask(createPulsarSubscriptionTask);
+
+        // Hive tables: skipped when none of the group's data streams has a Hive storage
+        ServiceTask createHiveTablesTask = new ServiceTask();
+        createHiveTablesTask.setSkipResolver(c -> {
+            BusinessResourceWorkflowForm form = (BusinessResourceWorkflowForm) c.getProcessForm();
+            String groupId = form.getInlongGroupId();
+            List<String> dsForHive = storageService.filterStreamIdByStorageType(groupId, BizConstant.STORAGE_HIVE,
+                    streamMapper.selectByGroupId(groupId)
+                            .stream()
+                            .map(DataStreamEntity::getInlongStreamId)
+                            .collect(Collectors.toList()));
+            if (CollectionUtils.isEmpty(dsForHive)) {
+                log.warn("groupId={} streamId={} does not have storage, skip to create hive table ",
+                        groupId, form.getInlongStreamId());
+                return true;
+            }
+            return false;
+        });
+        createHiveTablesTask.setName("createHiveTableTask");
+        createHiveTablesTask.setDisplayName("Business-CreateHiveTable");
+        createHiveTablesTask.addListener(createHiveTableListener);
+        process.addTask(createHiveTablesTask);
+
+        // Push of the sort config: has no skip resolver
+        ServiceTask pushSortConfig = new ServiceTask();
+        pushSortConfig.setName("pushSortConfig");
+        pushSortConfig.setDisplayName("Business-PushSortConfig");
+        pushSortConfig.addListener(pushHiveConfigTaskListener);
+        process.addTask(pushSortConfig);
+
+        // Wire the tasks into one linear chain from the start node to the end node
+        startEvent.addNext(createTubeTopicTask);
+        createTubeTopicTask.addNext(createTubeConsumerTask);
+        createTubeConsumerTask.addNext(createPulsarResourceTask);
+        createPulsarResourceTask.addNext(createPulsarSubscriptionTask);
+        createPulsarSubscriptionTask.addNext(createHiveTablesTask);
+        createHiveTablesTask.addNext(pushSortConfig);
+        pushSortConfig.addNext(endEvent);
+
+        return process;
+    }
+
+ /**
+ * Identify this definition as the process that creates business resources;
+ * defineProcess() takes the process name and display name from this value.
+ */
+ @Override
+ public ProcessName getProcessName() {
+ return ProcessName.CREATE_BUSINESS_RESOURCE;
+ }
+}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/newbusiness/NewBusinessWorkflowDefinition.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/business/NewBusinessWorkflowDefinition.java
similarity index 81%
rename from inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/newbusiness/NewBusinessWorkflowDefinition.java
rename to inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/business/NewBusinessWorkflowDefinition.java
index 69372f6..7ad4b17 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/newbusiness/NewBusinessWorkflowDefinition.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/business/NewBusinessWorkflowDefinition.java
@@ -15,17 +15,17 @@
* limitations under the License.
*/
-package org.apache.inlong.manager.service.workflow.newbusiness;
+package org.apache.inlong.manager.service.workflow.business;
import java.util.List;
import org.apache.inlong.manager.common.pojo.workflow.WorkflowApproverFilterContext;
import org.apache.inlong.manager.service.core.WorkflowApproverService;
import org.apache.inlong.manager.service.workflow.ProcessName;
import org.apache.inlong.manager.service.workflow.WorkflowDefinition;
-import org.apache.inlong.manager.service.workflow.newbusiness.listener.ApproveCancelProcessListener;
-import org.apache.inlong.manager.service.workflow.newbusiness.listener.ApprovePassTaskListener;
-import org.apache.inlong.manager.service.workflow.newbusiness.listener.ApproveRejectProcessListener;
-import org.apache.inlong.manager.service.workflow.newbusiness.listener.StartCreateResourceProcessListener;
+import org.apache.inlong.manager.service.workflow.business.listener.BusinessCancelProcessListener;
+import org.apache.inlong.manager.service.workflow.business.listener.BusinessPassTaskListener;
+import org.apache.inlong.manager.service.workflow.business.listener.BusinessRejectProcessListener;
+import org.apache.inlong.manager.service.workflow.business.listener.StartCreateResourceProcessListener;
import org.apache.inlong.manager.workflow.model.definition.EndEvent;
import org.apache.inlong.manager.workflow.model.definition.Process;
import org.apache.inlong.manager.workflow.model.definition.StartEvent;
@@ -40,11 +40,11 @@ import org.springframework.stereotype.Component;
public class NewBusinessWorkflowDefinition implements WorkflowDefinition {
@Autowired
- private ApprovePassTaskListener approvePassTaskListener;
+ private BusinessPassTaskListener businessPassTaskListener;
@Autowired
- private ApproveCancelProcessListener approveCancelProcessListener;
+ private BusinessCancelProcessListener businessCancelProcessListener;
@Autowired
- private ApproveRejectProcessListener approveRejectProcessListener;
+ private BusinessRejectProcessListener approveRejectProcessListener;
@Autowired
private StartCreateResourceProcessListener startCreateResourceProcessListener;
@Autowired
@@ -61,7 +61,7 @@ public class NewBusinessWorkflowDefinition implements WorkflowDefinition {
process.setVersion(1);
// Set up the listener
- process.addListener(approveCancelProcessListener);
+ process.addListener(businessCancelProcessListener);
process.addListener(approveRejectProcessListener);
// Initiate the process of creating business resources,
// and set the business status to [Configuration Successful]/[Configuration Failed] according to its completion
@@ -79,9 +79,9 @@ public class NewBusinessWorkflowDefinition implements WorkflowDefinition {
UserTask adminUserTask = new UserTask();
adminUserTask.setName("ut_admin");
adminUserTask.setDisplayName("System Administrator");
- adminUserTask.setFormClass(NewBusinessApproveForm.class);
+ adminUserTask.setFormClass(BusinessAdminApproveForm.class);
adminUserTask.setApproverAssign(context -> getTaskApprovers(adminUserTask.getName()));
- adminUserTask.addListener(approvePassTaskListener);
+ adminUserTask.addListener(businessPassTaskListener);
process.addTask(adminUserTask);
// Configuration order relationship
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/newbusiness/NewBusinessWorkflowForm.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/business/NewBusinessWorkflowForm.java
similarity index 97%
rename from inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/newbusiness/NewBusinessWorkflowForm.java
rename to inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/business/NewBusinessWorkflowForm.java
index 5663106..fe475e4 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/newbusiness/NewBusinessWorkflowForm.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/business/NewBusinessWorkflowForm.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.inlong.manager.service.workflow.newbusiness;
+package org.apache.inlong.manager.service.workflow.business;
import com.google.common.collect.Maps;
import io.swagger.annotations.ApiModelProperty;
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/newbusiness/listener/ApproveCancelProcessListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/business/listener/BusinessCancelProcessListener.java
similarity index 90%
rename from inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/newbusiness/listener/ApproveCancelProcessListener.java
rename to inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/business/listener/BusinessCancelProcessListener.java
index b13fe67..21634fd 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/newbusiness/listener/ApproveCancelProcessListener.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/business/listener/BusinessCancelProcessListener.java
@@ -15,12 +15,12 @@
* limitations under the License.
*/
-package org.apache.inlong.manager.service.workflow.newbusiness.listener;
+package org.apache.inlong.manager.service.workflow.business.listener;
import lombok.extern.slf4j.Slf4j;
import org.apache.inlong.manager.common.enums.EntityStatus;
import org.apache.inlong.manager.service.core.BusinessService;
-import org.apache.inlong.manager.service.workflow.newbusiness.NewBusinessWorkflowForm;
+import org.apache.inlong.manager.service.workflow.business.NewBusinessWorkflowForm;
import org.apache.inlong.manager.workflow.core.event.ListenerResult;
import org.apache.inlong.manager.workflow.core.event.process.ProcessEvent;
import org.apache.inlong.manager.workflow.core.event.process.ProcessEventListener;
@@ -34,7 +34,7 @@ import org.springframework.stereotype.Component;
*/
@Slf4j
@Component
-public class ApproveCancelProcessListener implements ProcessEventListener {
+public class BusinessCancelProcessListener implements ProcessEventListener {
@Autowired
private BusinessService businessService;
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/newbusiness/listener/CreateResourceCompleteProcessListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/business/listener/BusinessCompleteProcessListener.java
similarity index 86%
rename from inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/newbusiness/listener/CreateResourceCompleteProcessListener.java
rename to inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/business/listener/BusinessCompleteProcessListener.java
index 9435468..ad56542 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/newbusiness/listener/CreateResourceCompleteProcessListener.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/business/listener/BusinessCompleteProcessListener.java
@@ -15,13 +15,13 @@
* limitations under the License.
*/
-package org.apache.inlong.manager.service.workflow.newbusiness.listener;
+package org.apache.inlong.manager.service.workflow.business.listener;
import lombok.extern.slf4j.Slf4j;
import org.apache.inlong.manager.common.enums.EntityStatus;
import org.apache.inlong.manager.service.core.BusinessService;
import org.apache.inlong.manager.service.core.DataStreamService;
-import org.apache.inlong.manager.service.workflow.newbusiness.CreateResourceWorkflowForm;
+import org.apache.inlong.manager.service.workflow.business.BusinessResourceWorkflowForm;
import org.apache.inlong.manager.workflow.core.event.ListenerResult;
import org.apache.inlong.manager.workflow.core.event.process.ProcessEvent;
import org.apache.inlong.manager.workflow.core.event.process.ProcessEventListener;
@@ -35,7 +35,7 @@ import org.springframework.stereotype.Component;
*/
@Slf4j
@Component
-public class CreateResourceCompleteProcessListener implements ProcessEventListener {
+public class BusinessCompleteProcessListener implements ProcessEventListener {
@Autowired
private BusinessService businessService;
@@ -50,11 +50,11 @@ public class CreateResourceCompleteProcessListener implements ProcessEventListen
/**
* After the process of creating business resources is completed, modify the status of business and all data stream
* belong to this business to [Configuration Successful] [Configuration Failed]
- * <p/>{@link CreateResourceFailedProcessListener#listen}
+ * <p/>{@link BusinessFailedProcessListener#listen}
*/
@Override
public ListenerResult listen(WorkflowContext context) throws WorkflowListenerException {
- CreateResourceWorkflowForm form = (CreateResourceWorkflowForm) context.getProcessForm();
+ BusinessResourceWorkflowForm form = (BusinessResourceWorkflowForm) context.getProcessForm();
String groupId = form.getInlongGroupId();
String username = context.getApplicant();
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/newbusiness/listener/CreateResourceFailedProcessListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/business/listener/BusinessFailedProcessListener.java
similarity index 86%
rename from inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/newbusiness/listener/CreateResourceFailedProcessListener.java
rename to inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/business/listener/BusinessFailedProcessListener.java
index 5acdbfd..50e2a1d 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/newbusiness/listener/CreateResourceFailedProcessListener.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/business/listener/BusinessFailedProcessListener.java
@@ -15,13 +15,13 @@
* limitations under the License.
*/
-package org.apache.inlong.manager.service.workflow.newbusiness.listener;
+package org.apache.inlong.manager.service.workflow.business.listener;
import lombok.extern.slf4j.Slf4j;
import org.apache.inlong.manager.common.enums.EntityStatus;
import org.apache.inlong.manager.service.core.BusinessService;
import org.apache.inlong.manager.service.core.DataStreamService;
-import org.apache.inlong.manager.service.workflow.newbusiness.CreateResourceWorkflowForm;
+import org.apache.inlong.manager.service.workflow.business.BusinessResourceWorkflowForm;
import org.apache.inlong.manager.workflow.core.event.ListenerResult;
import org.apache.inlong.manager.workflow.core.event.process.ProcessEvent;
import org.apache.inlong.manager.workflow.core.event.process.ProcessEventListener;
@@ -35,7 +35,7 @@ import org.springframework.stereotype.Component;
*/
@Slf4j
@Component
-public class CreateResourceFailedProcessListener implements ProcessEventListener {
+public class BusinessFailedProcessListener implements ProcessEventListener {
@Autowired
private BusinessService businessService;
@@ -50,11 +50,11 @@ public class CreateResourceFailedProcessListener implements ProcessEventListener
/**
* The process of creating business resources is abnormal, and the business status is changed to
* [Configuration failed] [Configuration successful]
- * <p/>{@link CreateResourceCompleteProcessListener#listen}
+ * <p/>{@link BusinessCompleteProcessListener#listen}
*/
@Override
public ListenerResult listen(WorkflowContext context) throws WorkflowListenerException {
- CreateResourceWorkflowForm form = (CreateResourceWorkflowForm) context.getProcessForm();
+ BusinessResourceWorkflowForm form = (BusinessResourceWorkflowForm) context.getProcessForm();
String groupId = form.getInlongGroupId();
String username = context.getApplicant();
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/newbusiness/listener/ApprovePassTaskListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/business/listener/BusinessPassTaskListener.java
similarity index 89%
rename from inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/newbusiness/listener/ApprovePassTaskListener.java
rename to inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/business/listener/BusinessPassTaskListener.java
index 4bef36a..a23c574 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/newbusiness/listener/ApprovePassTaskListener.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/business/listener/BusinessPassTaskListener.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.inlong.manager.service.workflow.newbusiness.listener;
+package org.apache.inlong.manager.service.workflow.business.listener;
import java.util.List;
import lombok.extern.slf4j.Slf4j;
@@ -23,7 +23,7 @@ import org.apache.inlong.manager.common.pojo.business.BusinessApproveInfo;
import org.apache.inlong.manager.common.pojo.datastream.DataStreamApproveInfo;
import org.apache.inlong.manager.service.core.BusinessService;
import org.apache.inlong.manager.service.core.DataStreamService;
-import org.apache.inlong.manager.service.workflow.newbusiness.NewBusinessApproveForm;
+import org.apache.inlong.manager.service.workflow.business.BusinessAdminApproveForm;
import org.apache.inlong.manager.workflow.core.event.ListenerResult;
import org.apache.inlong.manager.workflow.core.event.task.TaskEvent;
import org.apache.inlong.manager.workflow.core.event.task.TaskEventListener;
@@ -37,7 +37,7 @@ import org.springframework.stereotype.Component;
*/
@Slf4j
@Component
-public class ApprovePassTaskListener implements TaskEventListener {
+public class BusinessPassTaskListener implements TaskEventListener {
@Autowired
private BusinessService businessService;
@@ -52,7 +52,7 @@ public class ApprovePassTaskListener implements TaskEventListener {
@Override
public ListenerResult listen(WorkflowContext context) throws WorkflowListenerException {
// Save the data format selected at the time of approval and the cluster information of the data stream
- NewBusinessApproveForm approveForm = (NewBusinessApproveForm) context.getActionContext().getForm();
+ BusinessAdminApproveForm approveForm = (BusinessAdminApproveForm) context.getActionContext().getForm();
// Save the business information after approval
BusinessApproveInfo approveInfo = approveForm.getBusinessApproveInfo();
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/newbusiness/listener/ApproveRejectProcessListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/business/listener/BusinessRejectProcessListener.java
similarity index 90%
rename from inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/newbusiness/listener/ApproveRejectProcessListener.java
rename to inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/business/listener/BusinessRejectProcessListener.java
index 518efc3..ead0d56 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/newbusiness/listener/ApproveRejectProcessListener.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/business/listener/BusinessRejectProcessListener.java
@@ -15,12 +15,12 @@
* limitations under the License.
*/
-package org.apache.inlong.manager.service.workflow.newbusiness.listener;
+package org.apache.inlong.manager.service.workflow.business.listener;
import lombok.extern.slf4j.Slf4j;
import org.apache.inlong.manager.common.enums.EntityStatus;
import org.apache.inlong.manager.service.core.BusinessService;
-import org.apache.inlong.manager.service.workflow.newbusiness.NewBusinessWorkflowForm;
+import org.apache.inlong.manager.service.workflow.business.NewBusinessWorkflowForm;
import org.apache.inlong.manager.workflow.core.event.ListenerResult;
import org.apache.inlong.manager.workflow.core.event.process.ProcessEvent;
import org.apache.inlong.manager.workflow.core.event.process.ProcessEventListener;
@@ -34,7 +34,7 @@ import org.springframework.stereotype.Component;
*/
@Slf4j
@Component
-public class ApproveRejectProcessListener implements ProcessEventListener {
+public class BusinessRejectProcessListener implements ProcessEventListener {
@Autowired
private BusinessService businessService;
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/newbusiness/listener/InitBusinessInfoListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/business/listener/InitBusinessInfoListener.java
similarity index 90%
rename from inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/newbusiness/listener/InitBusinessInfoListener.java
rename to inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/business/listener/InitBusinessInfoListener.java
index c43a07b..1827b43 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/newbusiness/listener/InitBusinessInfoListener.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/business/listener/InitBusinessInfoListener.java
@@ -15,13 +15,13 @@
* limitations under the License.
*/
-package org.apache.inlong.manager.service.workflow.newbusiness.listener;
+package org.apache.inlong.manager.service.workflow.business.listener;
import org.apache.inlong.manager.common.enums.BizErrorCodeEnum;
import org.apache.inlong.manager.common.exceptions.BusinessException;
import org.apache.inlong.manager.common.pojo.business.BusinessInfo;
import org.apache.inlong.manager.service.core.BusinessService;
-import org.apache.inlong.manager.service.workflow.newbusiness.CreateResourceWorkflowForm;
+import org.apache.inlong.manager.service.workflow.business.BusinessResourceWorkflowForm;
import org.apache.inlong.manager.workflow.core.event.ListenerResult;
import org.apache.inlong.manager.workflow.core.event.process.ProcessEvent;
import org.apache.inlong.manager.workflow.core.event.process.ProcessEventListener;
@@ -46,7 +46,7 @@ public class InitBusinessInfoListener implements ProcessEventListener {
@Override
public ListenerResult listen(WorkflowContext context) throws WorkflowListenerException {
- CreateResourceWorkflowForm form = (CreateResourceWorkflowForm) context.getProcessForm();
+ BusinessResourceWorkflowForm form = (BusinessResourceWorkflowForm) context.getProcessForm();
BusinessInfo businessInfo = businessService.get(context.getProcessForm().getInlongGroupId());
if (businessInfo != null) {
form.setBusinessInfo(businessInfo);
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/newbusiness/listener/StartCreateResourceProcessListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/business/listener/StartCreateResourceProcessListener.java
similarity index 88%
rename from inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/newbusiness/listener/StartCreateResourceProcessListener.java
rename to inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/business/listener/StartCreateResourceProcessListener.java
index 3e71e3e..03ffbc8 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/newbusiness/listener/StartCreateResourceProcessListener.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/business/listener/StartCreateResourceProcessListener.java
@@ -15,14 +15,14 @@
* limitations under the License.
*/
-package org.apache.inlong.manager.service.workflow.newbusiness.listener;
+package org.apache.inlong.manager.service.workflow.business.listener;
import lombok.extern.slf4j.Slf4j;
import org.apache.inlong.manager.service.core.BusinessService;
import org.apache.inlong.manager.service.workflow.ProcessName;
import org.apache.inlong.manager.service.workflow.WorkflowService;
-import org.apache.inlong.manager.service.workflow.newbusiness.CreateResourceWorkflowForm;
-import org.apache.inlong.manager.service.workflow.newbusiness.NewBusinessWorkflowForm;
+import org.apache.inlong.manager.service.workflow.business.BusinessResourceWorkflowForm;
+import org.apache.inlong.manager.service.workflow.business.NewBusinessWorkflowForm;
import org.apache.inlong.manager.workflow.core.event.ListenerResult;
import org.apache.inlong.manager.workflow.core.event.process.ProcessEvent;
import org.apache.inlong.manager.workflow.core.event.process.ProcessEventListener;
@@ -56,7 +56,7 @@ public class StartCreateResourceProcessListener implements ProcessEventListener
NewBusinessWorkflowForm workflowForm = (NewBusinessWorkflowForm) context.getProcessForm();
String groupId = workflowForm.getInlongGroupId();
- CreateResourceWorkflowForm resourceWorkflowForm = new CreateResourceWorkflowForm();
+ BusinessResourceWorkflowForm resourceWorkflowForm = new BusinessResourceWorkflowForm();
resourceWorkflowForm.setBusinessInfo(businessService.get(groupId));
String username = context.getApplicant();
workflowService.start(ProcessName.CREATE_BUSINESS_RESOURCE, username, resourceWorkflowForm);
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/newconsumption/NewConsumptionApproveForm.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/consumption/ConsumptionAdminApproveForm.java
similarity index 84%
rename from inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/newconsumption/NewConsumptionApproveForm.java
rename to inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/consumption/ConsumptionAdminApproveForm.java
index 71be5df..ba7824c 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/newconsumption/NewConsumptionApproveForm.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/consumption/ConsumptionAdminApproveForm.java
@@ -15,23 +15,25 @@
* limitations under the License.
*/
-package org.apache.inlong.manager.service.workflow.newconsumption;
+package org.apache.inlong.manager.service.workflow.consumption;
import io.swagger.annotations.ApiModel;
import io.swagger.annotations.ApiModelProperty;
import lombok.Data;
+import lombok.EqualsAndHashCode;
+import org.apache.inlong.manager.common.util.Preconditions;
import org.apache.inlong.manager.service.workflow.BaseWorkflowTaskFormType;
import org.apache.inlong.manager.workflow.exception.FormValidateException;
-import org.apache.inlong.manager.common.util.Preconditions;
/**
* New consumption approve form for admin
*/
@Data
+@EqualsAndHashCode(callSuper = true)
@ApiModel("New data consumption-system administrator form")
-public class NewConsumptionApproveForm extends BaseWorkflowTaskFormType {
+public class ConsumptionAdminApproveForm extends BaseWorkflowTaskFormType {
- public static final String FORM_NAME = "NewConsumptionApproveForm";
+ public static final String FORM_NAME = "ConsumptionAdminApproveForm";
@ApiModelProperty("Consumer group ID")
private String consumerGroupId;
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/newconsumption/NewConsumptionProcessDetailHandler.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/consumption/NewConsumptionProcessDetailHandler.java
similarity index 96%
rename from inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/newconsumption/NewConsumptionProcessDetailHandler.java
rename to inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/consumption/NewConsumptionProcessDetailHandler.java
index f33d5b9..7c108fc 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/newconsumption/NewConsumptionProcessDetailHandler.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/consumption/NewConsumptionProcessDetailHandler.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.inlong.manager.service.workflow.newconsumption;
+package org.apache.inlong.manager.service.workflow.consumption;
import org.apache.inlong.manager.workflow.core.ProcessDefinitionService;
import org.apache.inlong.manager.workflow.model.definition.Process;
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/newconsumption/NewConsumptionWorkflowDefinition.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/consumption/NewConsumptionWorkflowDefinition.java
similarity index 77%
rename from inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/newconsumption/NewConsumptionWorkflowDefinition.java
rename to inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/consumption/NewConsumptionWorkflowDefinition.java
index c5315b6..16d527f 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/newconsumption/NewConsumptionWorkflowDefinition.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/consumption/NewConsumptionWorkflowDefinition.java
@@ -15,23 +15,21 @@
* limitations under the License.
*/
-package org.apache.inlong.manager.service.workflow.newconsumption;
+package org.apache.inlong.manager.service.workflow.consumption;
-import com.google.common.base.Splitter;
-import com.google.common.collect.Lists;
+import java.util.Arrays;
+import java.util.Collections;
import java.util.List;
-import java.util.Optional;
import org.apache.inlong.manager.common.pojo.business.BusinessInfo;
-import org.apache.inlong.manager.common.pojo.consumption.ConsumptionInfo;
import org.apache.inlong.manager.common.pojo.workflow.WorkflowApproverFilterContext;
import org.apache.inlong.manager.service.core.BusinessService;
import org.apache.inlong.manager.service.core.WorkflowApproverService;
import org.apache.inlong.manager.service.workflow.ProcessName;
import org.apache.inlong.manager.service.workflow.WorkflowDefinition;
-import org.apache.inlong.manager.service.workflow.newconsumption.listener.ConsumptionApproveTaskListener;
-import org.apache.inlong.manager.service.workflow.newconsumption.listener.ConsumptionCancelProcessListener;
-import org.apache.inlong.manager.service.workflow.newconsumption.listener.ConsumptionCompleteProcessListener;
-import org.apache.inlong.manager.service.workflow.newconsumption.listener.ConsumptionRejectProcessListener;
+import org.apache.inlong.manager.service.workflow.consumption.listener.ConsumptionCancelProcessListener;
+import org.apache.inlong.manager.service.workflow.consumption.listener.ConsumptionCompleteProcessListener;
+import org.apache.inlong.manager.service.workflow.consumption.listener.ConsumptionPassTaskListener;
+import org.apache.inlong.manager.service.workflow.consumption.listener.ConsumptionRejectProcessListener;
import org.apache.inlong.manager.workflow.model.WorkflowContext;
import org.apache.inlong.manager.workflow.model.definition.EndEvent;
import org.apache.inlong.manager.workflow.model.definition.Process;
@@ -53,7 +51,7 @@ public class NewConsumptionWorkflowDefinition implements WorkflowDefinition {
private ConsumptionCompleteProcessListener consumptionCompleteProcessListener;
@Autowired
- private ConsumptionApproveTaskListener consumptionApproveTaskListener;
+ private ConsumptionPassTaskListener consumptionPassTaskListener;
@Autowired
private ConsumptionRejectProcessListener consumptionRejectProcessListener;
@@ -101,9 +99,9 @@ public class NewConsumptionWorkflowDefinition implements WorkflowDefinition {
UserTask adminUserTask = new UserTask();
adminUserTask.setName(UT_ADMINT_NAME);
adminUserTask.setDisplayName("System Administrator");
- adminUserTask.setFormClass(NewConsumptionApproveForm.class);
+ adminUserTask.setFormClass(ConsumptionAdminApproveForm.class);
adminUserTask.setApproverAssign(this::adminUserTaskApprover);
- adminUserTask.addListener(consumptionApproveTaskListener);
+ adminUserTask.addListener(consumptionPassTaskListener);
process.addTask(adminUserTask);
// Set order relationship
@@ -120,9 +118,6 @@ public class NewConsumptionWorkflowDefinition implements WorkflowDefinition {
}
private List<String> adminUserTaskApprover(WorkflowContext context) {
- NewConsumptionWorkflowForm form = (NewConsumptionWorkflowForm) context.getProcessForm();
- ConsumptionInfo consumptionInfo = Optional.ofNullable(form.getConsumptionInfo())
- .orElseGet(ConsumptionInfo::new);
return workflowApproverService.getApprovers(getProcessName().name(), UT_ADMINT_NAME,
new WorkflowApproverFilterContext());
}
@@ -130,12 +125,11 @@ public class NewConsumptionWorkflowDefinition implements WorkflowDefinition {
private List<String> bizOwnerUserTaskApprover(WorkflowContext context) {
NewConsumptionWorkflowForm form = (NewConsumptionWorkflowForm) context.getProcessForm();
BusinessInfo businessInfo = businessService.get(form.getConsumptionInfo().getInlongGroupId());
+ if (businessInfo == null || businessInfo.getInCharges() == null) {
+ return Collections.emptyList();
+ }
- Iterable<String> inChargesIterator = Splitter.on(",").omitEmptyStrings().trimResults()
- .split(businessInfo.getInCharges());
- List<String> inCharges = Lists.newArrayList();
- inChargesIterator.forEach(inCharges::add);
- return inCharges;
+ // Keep the trimming and empty-entry filtering that the removed Splitter call provided,
+ // so values such as "alice, bob" or "alice,,bob" still yield clean approver names
+ return Arrays.asList(Arrays.stream(businessInfo.getInCharges().split(","))
+ .map(String::trim)
+ .filter(inCharge -> !inCharge.isEmpty())
+ .toArray(String[]::new));
}
@Override
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/newconsumption/NewConsumptionWorkflowForm.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/consumption/NewConsumptionWorkflowForm.java
similarity index 96%
rename from inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/newconsumption/NewConsumptionWorkflowForm.java
rename to inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/consumption/NewConsumptionWorkflowForm.java
index c9f7d18..d20db44 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/newconsumption/NewConsumptionWorkflowForm.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/consumption/NewConsumptionWorkflowForm.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.inlong.manager.service.workflow.newconsumption;
+package org.apache.inlong.manager.service.workflow.consumption;
import com.google.common.collect.Maps;
import io.swagger.annotations.ApiModelProperty;
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/newconsumption/listener/ConsumptionCancelProcessListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/consumption/listener/ConsumptionCancelProcessListener.java
similarity index 91%
rename from inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/newconsumption/listener/ConsumptionCancelProcessListener.java
rename to inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/consumption/listener/ConsumptionCancelProcessListener.java
index e6cd836..ee483da 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/newconsumption/listener/ConsumptionCancelProcessListener.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/consumption/listener/ConsumptionCancelProcessListener.java
@@ -15,34 +15,30 @@
* limitations under the License.
*/
-package org.apache.inlong.manager.service.workflow.newconsumption.listener;
+package org.apache.inlong.manager.service.workflow.consumption.listener;
+import java.util.Date;
+import lombok.extern.slf4j.Slf4j;
import org.apache.inlong.manager.common.enums.ConsumptionStatus;
import org.apache.inlong.manager.dao.entity.ConsumptionEntity;
import org.apache.inlong.manager.dao.mapper.ConsumptionEntityMapper;
-import org.apache.inlong.manager.service.workflow.newconsumption.NewConsumptionWorkflowForm;
+import org.apache.inlong.manager.service.workflow.consumption.NewConsumptionWorkflowForm;
import org.apache.inlong.manager.workflow.core.event.ListenerResult;
import org.apache.inlong.manager.workflow.core.event.process.ProcessEvent;
import org.apache.inlong.manager.workflow.core.event.process.ProcessEventListener;
import org.apache.inlong.manager.workflow.exception.WorkflowListenerException;
import org.apache.inlong.manager.workflow.model.WorkflowContext;
-
-import java.util.Date;
-
-import lombok.extern.slf4j.Slf4j;
-
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
/**
* Added data consumption process cancellation event listener
- *
*/
@Slf4j
@Component
public class ConsumptionCancelProcessListener implements ProcessEventListener {
- private ConsumptionEntityMapper consumptionEntityMapper;
+ private final ConsumptionEntityMapper consumptionEntityMapper;
@Autowired
public ConsumptionCancelProcessListener(ConsumptionEntityMapper consumptionEntityMapper) {
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/consumption/listener/ConsumptionCompleteProcessListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/consumption/listener/ConsumptionCompleteProcessListener.java
new file mode 100644
index 0000000..9f74716
--- /dev/null
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/consumption/listener/ConsumptionCompleteProcessListener.java
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.service.workflow.consumption.listener;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Date;
+import java.util.List;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.inlong.manager.common.beans.ClusterBean;
+import org.apache.inlong.manager.common.enums.BizConstant;
+import org.apache.inlong.manager.common.enums.ConsumptionStatus;
+import org.apache.inlong.manager.common.pojo.pulsar.PulsarTopicBean;
+import org.apache.inlong.manager.common.pojo.tubemq.AddTubeConsumeGroupRequest;
+import org.apache.inlong.manager.common.util.Preconditions;
+import org.apache.inlong.manager.dao.entity.BusinessEntity;
+import org.apache.inlong.manager.dao.entity.ConsumptionEntity;
+import org.apache.inlong.manager.dao.mapper.BusinessEntityMapper;
+import org.apache.inlong.manager.dao.mapper.ConsumptionEntityMapper;
+import org.apache.inlong.manager.service.thirdpart.mq.PulsarOptService;
+import org.apache.inlong.manager.service.thirdpart.mq.TubeMqOptService;
+import org.apache.inlong.manager.service.thirdpart.mq.util.PulsarUtils;
+import org.apache.inlong.manager.service.workflow.consumption.NewConsumptionWorkflowForm;
+import org.apache.inlong.manager.workflow.core.event.ListenerResult;
+import org.apache.inlong.manager.workflow.core.event.process.ProcessEvent;
+import org.apache.inlong.manager.workflow.core.event.process.ProcessEventListener;
+import org.apache.inlong.manager.workflow.exception.WorkflowListenerException;
+import org.apache.inlong.manager.workflow.model.WorkflowContext;
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+
+/**
+ * Added data consumption process complete archive event listener
+ */
+@Slf4j
+@Component
+public class ConsumptionCompleteProcessListener implements ProcessEventListener {
+
+ @Autowired
+ private PulsarOptService pulsarMqOptService;
+ @Autowired
+ private ClusterBean clusterBean;
+ @Autowired
+ private BusinessEntityMapper businessMapper;
+ @Autowired
+ private ConsumptionEntityMapper consumptionMapper;
+ @Autowired
+ private TubeMqOptService tubeMqOptService;
+
+ @Override
+ public ProcessEvent event() {
+ return ProcessEvent.COMPLETE;
+ }
+
+ @Override
+ public ListenerResult listen(WorkflowContext context) throws WorkflowListenerException {
+ NewConsumptionWorkflowForm consumptionForm = (NewConsumptionWorkflowForm) context.getProcessForm();
+
+ // Real-time query of consumption information
+ Integer consumptionId = consumptionForm.getConsumptionInfo().getId();
+ ConsumptionEntity entity = consumptionMapper.selectByPrimaryKey(consumptionId);
+ if (entity == null) {
+ throw new WorkflowListenerException("consumption not exits for id=" + consumptionId);
+ }
+
+ String middlewareType = entity.getMiddlewareType();
+ if (BizConstant.MIDDLEWARE_TUBE.equalsIgnoreCase(middlewareType)) {
+ this.createTubeConsumerGroup(entity);
+ return ListenerResult.success("Create Tube consumer group successful");
+ } else if (BizConstant.MIDDLEWARE_PULSAR.equalsIgnoreCase(middlewareType)) {
+ this.createPulsarTopicMessage(entity);
+ } else {
+ throw new WorkflowListenerException("middleware type [" + middlewareType + "] not supported");
+ }
+
+ this.updateConsumerInfo(consumptionId, entity.getConsumerGroupId());
+ return ListenerResult.success("create Tube /Pulsar consumer group successful");
+ }
+
+ /**
+ * Update consumption after approve
+ */
+ private void updateConsumerInfo(Integer consumptionId, String consumerGroupId) {
+ ConsumptionEntity update = new ConsumptionEntity();
+ update.setId(consumptionId);
+ update.setStatus(ConsumptionStatus.APPROVED.getStatus());
+ update.setConsumerGroupId(consumerGroupId);
+ update.setModifyTime(new Date());
+ consumptionMapper.updateByPrimaryKeySelective(update);
+ }
+
+ /**
+ * Create Pulsar consumption information, including cross-regional cycle creation of consumption groups
+ */
+ private void createPulsarTopicMessage(ConsumptionEntity entity) {
+ String groupId = entity.getInlongGroupId();
+ BusinessEntity businessEntity = businessMapper.selectByIdentifier(groupId);
+ Preconditions.checkNotNull(businessEntity, "business not found for groupId=" + groupId);
+ String mqResourceObj = businessEntity.getMqResourceObj();
+ Preconditions.checkNotNull(mqResourceObj, "mq resource cannot empty for groupId=" + groupId);
+
+ try (PulsarAdmin pulsarAdmin = PulsarUtils.getPulsarAdmin(clusterBean.getPulsarAdminUrl())) {
+ PulsarTopicBean topicMessage = new PulsarTopicBean();
+ String tenant = clusterBean.getDefaultTenant();
+ topicMessage.setTenant(tenant);
+ topicMessage.setNamespace(mqResourceObj);
+
+ // If cross-regional replication is started, each cluster needs to create consumer groups in cycles
+ String consumerGroup = entity.getConsumerGroupId();
+ List<String> clusters = PulsarUtils.getPulsarClusters(pulsarAdmin);
+ List<String> topics = Arrays.asList(entity.getTopic().split(","));
+ this.createPulsarSubscription(pulsarAdmin, consumerGroup, topicMessage, clusters, topics);
+ } catch (Exception e) {
+ log.error("create pulsar topic failed", e);
+ throw new WorkflowListenerException("failed to create pulsar topic for groupId=" + groupId + ", reason: "
+ + e.getMessage());
+ }
+ }
+
+ private void createPulsarSubscription(PulsarAdmin globalPulsarAdmin, String subscription, PulsarTopicBean topicBean,
+ List<String> clusters, List<String> topics) {
+ try {
+ for (String cluster : clusters) {
+ String serviceUrl = PulsarUtils.getServiceUrl(globalPulsarAdmin, cluster);
+ try (PulsarAdmin pulsarAdmin = PulsarUtils.getPulsarAdmin(serviceUrl)) {
+ pulsarMqOptService.createSubscriptions(pulsarAdmin, subscription, topicBean, topics);
+ }
+ }
+ } catch (Exception e) {
+ log.error("create pulsar consumer group failed", e);
+ throw new WorkflowListenerException("failed to create pulsar consumer group");
+ }
+ }
+
+ /**
+ * Create tube consumer group
+ */
+ private void createTubeConsumerGroup(ConsumptionEntity consumption) {
+ AddTubeConsumeGroupRequest addTubeConsumeGroupRequest = new AddTubeConsumeGroupRequest();
+ addTubeConsumeGroupRequest.setClusterId(1); // TODO is cluster id needed?
+ addTubeConsumeGroupRequest.setCreateUser(consumption.getCreator());
+ AddTubeConsumeGroupRequest.GroupNameJsonSetBean bean = new AddTubeConsumeGroupRequest.GroupNameJsonSetBean();
+ bean.setTopicName(consumption.getTopic());
+ bean.setGroupName(consumption.getConsumerGroupId());
+ addTubeConsumeGroupRequest.setGroupNameJsonSet(Collections.singletonList(bean));
+
+ try {
+ tubeMqOptService.createNewConsumerGroup(addTubeConsumeGroupRequest);
+ } catch (Exception e) {
+ throw new WorkflowListenerException("failed to create tube consumer group: " + addTubeConsumeGroupRequest);
+ }
+ }
+
+ @Override
+ public boolean async() {
+ return false;
+ }
+
+}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/newconsumption/listener/ConsumptionApproveTaskListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/consumption/listener/ConsumptionPassTaskListener.java
similarity index 77%
rename from inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/newconsumption/listener/ConsumptionApproveTaskListener.java
rename to inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/consumption/listener/ConsumptionPassTaskListener.java
index a9e1cbe..8159a2e 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/newconsumption/listener/ConsumptionApproveTaskListener.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/consumption/listener/ConsumptionPassTaskListener.java
@@ -15,15 +15,16 @@
* limitations under the License.
*/
-package org.apache.inlong.manager.service.workflow.newconsumption.listener;
+package org.apache.inlong.manager.service.workflow.consumption.listener;
import com.alibaba.druid.util.StringUtils;
import lombok.extern.slf4j.Slf4j;
import org.apache.inlong.manager.common.enums.BizErrorCodeEnum;
import org.apache.inlong.manager.common.exceptions.BusinessException;
+import org.apache.inlong.manager.common.pojo.consumption.ConsumptionInfo;
import org.apache.inlong.manager.service.core.ConsumptionService;
-import org.apache.inlong.manager.service.workflow.newconsumption.NewConsumptionApproveForm;
-import org.apache.inlong.manager.service.workflow.newconsumption.NewConsumptionWorkflowForm;
+import org.apache.inlong.manager.service.workflow.consumption.ConsumptionAdminApproveForm;
+import org.apache.inlong.manager.service.workflow.consumption.NewConsumptionWorkflowForm;
import org.apache.inlong.manager.workflow.core.event.ListenerResult;
import org.apache.inlong.manager.workflow.core.event.task.TaskEvent;
import org.apache.inlong.manager.workflow.core.event.task.TaskEventListener;
@@ -37,7 +38,7 @@ import org.springframework.stereotype.Component;
*/
@Slf4j
@Component
-public class ConsumptionApproveTaskListener implements TaskEventListener {
+public class ConsumptionPassTaskListener implements TaskEventListener {
@Autowired
private ConsumptionService consumptionService;
@@ -50,18 +51,18 @@ public class ConsumptionApproveTaskListener implements TaskEventListener {
@Override
public ListenerResult listen(WorkflowContext context) throws WorkflowListenerException {
NewConsumptionWorkflowForm form = (NewConsumptionWorkflowForm) context.getProcessForm();
- NewConsumptionApproveForm approveForm = (NewConsumptionApproveForm) context.getActionContext()
+ ConsumptionAdminApproveForm approveForm = (ConsumptionAdminApproveForm) context.getActionContext()
.getForm();
- if (StringUtils.equals(approveForm.getConsumerGroupId(), form.getConsumptionInfo().getConsumerGroupId())) {
+ ConsumptionInfo info = form.getConsumptionInfo();
+ if (StringUtils.equals(approveForm.getConsumerGroupId(), info.getConsumerGroupId())) {
return ListenerResult.success("The consumer group name has not been modified");
}
- boolean exitDuplicate = this.consumptionService
- .isConsumerGroupIdExists(approveForm.getConsumerGroupId(), form.getConsumptionInfo().getId());
- if (exitDuplicate) {
+ boolean exist = consumptionService.isConsumerGroupIdExists(approveForm.getConsumerGroupId(), info.getId());
+ if (exist) {
log.error("consumerGroupId already exist! duplicate :{}", approveForm.getConsumerGroupId());
throw new BusinessException(BizErrorCodeEnum.CONSUMER_GROUP_NAME_DUPLICATED);
}
- return ListenerResult.success("Consumer group name from" + form.getConsumptionInfo().getConsumerGroupId()
+ return ListenerResult.success("Consumer group name from" + info.getConsumerGroupId()
+ "change to " + approveForm.getConsumerGroupId());
}
@@ -69,4 +70,5 @@ public class ConsumptionApproveTaskListener implements TaskEventListener {
public boolean async() {
return false;
}
+
}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/newconsumption/listener/ConsumptionRejectProcessListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/consumption/listener/ConsumptionRejectProcessListener.java
similarity index 94%
rename from inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/newconsumption/listener/ConsumptionRejectProcessListener.java
rename to inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/consumption/listener/ConsumptionRejectProcessListener.java
index e02ce91..0bcbd9b 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/newconsumption/listener/ConsumptionRejectProcessListener.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/consumption/listener/ConsumptionRejectProcessListener.java
@@ -15,13 +15,13 @@
* limitations under the License.
*/
-package org.apache.inlong.manager.service.workflow.newconsumption.listener;
+package org.apache.inlong.manager.service.workflow.consumption.listener;
import java.util.Date;
import org.apache.inlong.manager.common.enums.ConsumptionStatus;
import org.apache.inlong.manager.dao.entity.ConsumptionEntity;
import org.apache.inlong.manager.dao.mapper.ConsumptionEntityMapper;
-import org.apache.inlong.manager.service.workflow.newconsumption.NewConsumptionWorkflowForm;
+import org.apache.inlong.manager.service.workflow.consumption.NewConsumptionWorkflowForm;
import org.apache.inlong.manager.workflow.core.event.ListenerResult;
import org.apache.inlong.manager.workflow.core.event.process.ProcessEvent;
import org.apache.inlong.manager.workflow.core.event.process.ProcessEventListener;
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/newbusiness/CreateResourceWorkflowDefinition.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/newbusiness/CreateResourceWorkflowDefinition.java
deleted file mode 100644
index f3221db..0000000
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/newbusiness/CreateResourceWorkflowDefinition.java
+++ /dev/null
@@ -1,176 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.manager.service.workflow.newbusiness;
-
-import java.util.List;
-import java.util.stream.Collectors;
-import lombok.extern.slf4j.Slf4j;
-import org.apache.commons.collections.CollectionUtils;
-import org.apache.inlong.manager.common.enums.BizConstant;
-import org.apache.inlong.manager.common.pojo.business.BusinessInfo;
-import org.apache.inlong.manager.dao.entity.DataStreamEntity;
-import org.apache.inlong.manager.dao.mapper.DataStreamEntityMapper;
-import org.apache.inlong.manager.service.core.StorageService;
-import org.apache.inlong.manager.service.thirdpart.hive.CreateHiveTableForAllStreamListener;
-import org.apache.inlong.manager.service.thirdpart.mq.CreateTubeConsumerGroupTaskListener;
-import org.apache.inlong.manager.service.thirdpart.mq.CreateTubeTopicTaskListener;
-import org.apache.inlong.manager.service.thirdpart.sort.PushHiveConfigTaskListener;
-import org.apache.inlong.manager.service.workflow.ProcessName;
-import org.apache.inlong.manager.service.workflow.WorkflowDefinition;
-import org.apache.inlong.manager.service.workflow.newbusiness.listener.CreateResourceCompleteProcessListener;
-import org.apache.inlong.manager.service.workflow.newbusiness.listener.CreateResourceFailedProcessListener;
-import org.apache.inlong.manager.service.workflow.newbusiness.listener.InitBusinessInfoListener;
-import org.apache.inlong.manager.workflow.model.definition.EndEvent;
-import org.apache.inlong.manager.workflow.model.definition.Process;
-import org.apache.inlong.manager.workflow.model.definition.ServiceTask;
-import org.apache.inlong.manager.workflow.model.definition.StartEvent;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.stereotype.Component;
-
-/**
- * Create workflow definitions for business resources
- */
-@Slf4j
-@Component
-public class CreateResourceWorkflowDefinition implements WorkflowDefinition {
-
- @Autowired
- private InitBusinessInfoListener initBusinessInfoListener;
-
- @Autowired
- private CreateResourceCompleteProcessListener createResourceCompleteProcessListener;
-
- @Autowired
- private CreateResourceFailedProcessListener createResourceFailedProcessListener;
-
- @Autowired
- private CreateTubeTopicTaskListener createTubeTopicTaskListener;
-
- @Autowired
- private CreateTubeConsumerGroupTaskListener createTubeConsumerGroupTaskListener;
-
- @Autowired
- private CreateHiveTableForAllStreamListener createHiveTableForAllStreamListener;
-
- @Autowired
- private PushHiveConfigTaskListener pushHiveConfigTaskListener;
-
- @Autowired
- private StorageService storageService;
-
- @Autowired
- private DataStreamEntityMapper streamMapper;
-
- @Override
- public Process defineProcess() {
-
- // Configuration process
- Process process = new Process();
- process.addListener(initBusinessInfoListener);
- process.addListener(createResourceCompleteProcessListener);
- process.addListener(createResourceFailedProcessListener);
-
- process.setType("Business Resource Creation");
- process.setName(getProcessName().name());
- process.setDisplayName(getProcessName().getDisplayName());
- process.setFormClass(CreateResourceWorkflowForm.class);
- process.setVersion(1);
- process.setHidden(true);
-
- // Start node
- StartEvent startEvent = new StartEvent();
- process.setStartEvent(startEvent);
-
- // End node
- EndEvent endEvent = new EndEvent();
- process.setEndEvent(endEvent);
-
- ServiceTask createTubeTopicTask = new ServiceTask();
- createTubeTopicTask.setSkipResolver(c -> {
- CreateResourceWorkflowForm form = (CreateResourceWorkflowForm) c.getProcessForm();
- BusinessInfo businessInfo = form.getBusinessInfo();
- if (BizConstant.MIDDLEWARE_TUBE.equalsIgnoreCase(businessInfo.getMiddlewareType())) {
- return false;
- }
- log.warn("not need to create tube resource for groupId={}, as the middleware type is {}",
- businessInfo.getMiddlewareType(), form.getInlongGroupId());
- return true;
- });
- createTubeTopicTask.setName("createTubeTopic");
- createTubeTopicTask.setDisplayName("Create Tube Topic");
- createTubeTopicTask.addListener(createTubeTopicTaskListener);
- process.addTask(createTubeTopicTask);
-
- ServiceTask createConsumerGroupForSortTask = new ServiceTask();
- createConsumerGroupForSortTask.setSkipResolver(c -> {
- CreateResourceWorkflowForm form = (CreateResourceWorkflowForm) c.getProcessForm();
- BusinessInfo businessInfo = form.getBusinessInfo();
- if (BizConstant.MIDDLEWARE_TUBE.equalsIgnoreCase(businessInfo.getMiddlewareType())) {
- return false;
- }
- log.warn("no need to create tube resource for groupId={}, as the middleware type is {}",
- form.getInlongGroupId(), businessInfo.getMiddlewareType());
- return true;
- });
- createConsumerGroupForSortTask.setName("createConsumerGroupForSort");
- createConsumerGroupForSortTask.setDisplayName("Create Consumer Group For Sort");
- createConsumerGroupForSortTask.addListener(createTubeConsumerGroupTaskListener);
- process.addTask(createConsumerGroupForSortTask);
-
- ServiceTask createHiveTablesTask = new ServiceTask();
- createHiveTablesTask.setSkipResolver(c -> {
- CreateResourceWorkflowForm form = (CreateResourceWorkflowForm) c.getProcessForm();
- List<String> dsForHive = storageService
- .filterStreamIdByStorageType(form.getInlongGroupId(), BizConstant.STORAGE_HIVE,
- streamMapper
- .selectByGroupId(form.getInlongGroupId())
- .stream()
- .map(DataStreamEntity::getInlongStreamId)
- .collect(Collectors.toList()));
- if (CollectionUtils.isEmpty(dsForHive)) {
- log.warn("business {} dataStream {} does not have storage, skip to create hive table ",
- form.getInlongGroupId(), form.getInlongStreamId());
- return true;
- }
- return false;
- });
- createHiveTablesTask.setName("createHiveTableTask");
- createHiveTablesTask.setDisplayName("Create Hive Table");
- createHiveTablesTask.addListener(createHiveTableForAllStreamListener);
- process.addTask(createHiveTablesTask);
-
- ServiceTask pushSortConfig = new ServiceTask();
- pushSortConfig.setName("pushSortConfig");
- pushSortConfig.setDisplayName("Push Sort Config");
- pushSortConfig.addListener(pushHiveConfigTaskListener);
- process.addTask(pushSortConfig);
-
- startEvent.addNext(createTubeTopicTask);
- createTubeTopicTask.addNext(createConsumerGroupForSortTask);
- createConsumerGroupForSortTask.addNext(createHiveTablesTask);
- createHiveTablesTask.addNext(pushSortConfig);
- pushSortConfig.addNext(endEvent);
-
- return process;
- }
-
- @Override
- public ProcessName getProcessName() {
- return ProcessName.CREATE_BUSINESS_RESOURCE;
- }
-}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/newconsumption/listener/ConsumptionCompleteProcessListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/newconsumption/listener/ConsumptionCompleteProcessListener.java
deleted file mode 100644
index 4ea6073..0000000
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/newconsumption/listener/ConsumptionCompleteProcessListener.java
+++ /dev/null
@@ -1,143 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.manager.service.workflow.newconsumption.listener;
-
-import java.util.Collections;
-import java.util.Date;
-import lombok.extern.slf4j.Slf4j;
-import org.apache.inlong.manager.common.enums.BizConstant;
-import org.apache.inlong.manager.common.enums.BizErrorCodeEnum;
-import org.apache.inlong.manager.common.enums.ConsumptionStatus;
-import org.apache.inlong.manager.common.exceptions.BusinessException;
-import org.apache.inlong.manager.common.pojo.consumption.ConsumptionInfo;
-import org.apache.inlong.manager.common.pojo.tubemq.AddTubeConsumeGroupRequest;
-import org.apache.inlong.manager.common.util.Preconditions;
-import org.apache.inlong.manager.dao.entity.ConsumptionEntity;
-import org.apache.inlong.manager.dao.mapper.ClusterInfoMapper;
-import org.apache.inlong.manager.dao.mapper.ConsumptionEntityMapper;
-import org.apache.inlong.manager.service.thirdpart.mq.TubeMqOptService;
-import org.apache.inlong.manager.service.workflow.newconsumption.NewConsumptionApproveForm;
-import org.apache.inlong.manager.service.workflow.newconsumption.NewConsumptionWorkflowDefinition;
-import org.apache.inlong.manager.service.workflow.newconsumption.NewConsumptionWorkflowForm;
-import org.apache.inlong.manager.workflow.core.QueryService;
-import org.apache.inlong.manager.workflow.core.event.ListenerResult;
-import org.apache.inlong.manager.workflow.core.event.process.ProcessEvent;
-import org.apache.inlong.manager.workflow.core.event.process.ProcessEventListener;
-import org.apache.inlong.manager.workflow.exception.WorkflowListenerException;
-import org.apache.inlong.manager.workflow.model.WorkflowContext;
-import org.apache.inlong.manager.workflow.model.definition.Process;
-import org.apache.inlong.manager.workflow.model.instance.TaskInstance;
-import org.apache.inlong.manager.workflow.model.view.TaskQuery;
-import org.apache.inlong.manager.workflow.util.WorkflowFormParserUtils;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.stereotype.Component;
-
-/**
- * Added data consumption process complete archive event listener
- */
-@Slf4j
-@Component
-public class ConsumptionCompleteProcessListener implements ProcessEventListener {
-
- @Autowired
- private QueryService queryService;
-
- @Autowired
- private ConsumptionEntityMapper consumptionEntityMapper;
-
- @Autowired
- private ClusterInfoMapper clusterInfoMapper;
-
- @Autowired
- private TubeMqOptService tubeMqOptService;
-
- @Override
- public ProcessEvent event() {
- return ProcessEvent.COMPLETE;
- }
-
- @Override
- public ListenerResult listen(WorkflowContext context) throws WorkflowListenerException {
-
- NewConsumptionWorkflowForm workflowForm = (NewConsumptionWorkflowForm) context.getProcessForm();
- NewConsumptionApproveForm adminApproveForm = getAdminApproveForm(context);
-
- workflowForm.getConsumptionInfo().setConsumerGroupId(adminApproveForm.getConsumerGroupId());
- updateConsumerInfo(workflowForm.getConsumptionInfo().getId(), adminApproveForm.getConsumerGroupId());
-
- String middlewareType = workflowForm.getConsumptionInfo().getMiddlewareType();
-
- if (BizConstant.MIDDLEWARE_TUBE.equalsIgnoreCase(middlewareType)) {
- createTubeConsumerGroup(workflowForm.getConsumptionInfo());
- return ListenerResult.success("Create Tube Consumer Group");
- }
-
- throw new BusinessException(BizErrorCodeEnum.INVALID_PARAMETER,
- "Middleware type [" + middlewareType + "] not support");
-
- }
-
- private NewConsumptionApproveForm getAdminApproveForm(WorkflowContext context) {
- TaskInstance adminTask = queryService.listTask(TaskQuery.builder()
- .processInstId(context.getProcessInstance().getId())
- .name(NewConsumptionWorkflowDefinition.UT_ADMINT_NAME)
- .build())
- .stream()
- .findFirst()
- .orElseThrow(() -> new BusinessException(BizErrorCodeEnum.WORKFLOW_EXE_FAILED,
- "workflow err,not found task " + NewConsumptionWorkflowDefinition.UT_ADMINT_NAME));
-
- Process process = context.getProcess();
- NewConsumptionApproveForm form = WorkflowFormParserUtils.parseTaskForm(adminTask, process);
- Preconditions.checkNotNull(form, "form cannot be null");
- return form;
- }
-
- private void updateConsumerInfo(Integer consumerId, String consumerGroupId) {
- ConsumptionEntity update = new ConsumptionEntity();
- update.setId(consumerId);
- update.setStatus(ConsumptionStatus.APPROVED.getStatus());
- update.setConsumerGroupId(consumerGroupId);
- update.setModifyTime(new Date());
- consumptionEntityMapper.updateByPrimaryKeySelective(update);
- }
-
- private void createTubeConsumerGroup(ConsumptionInfo consumptionInfo) {
- AddTubeConsumeGroupRequest addTubeConsumeGroupRequest = new AddTubeConsumeGroupRequest();
- addTubeConsumeGroupRequest.setClusterId(1); // TODO is cluster id needed?
- addTubeConsumeGroupRequest.setCreateUser(consumptionInfo.getCreator());
- AddTubeConsumeGroupRequest.GroupNameJsonSetBean bean = new AddTubeConsumeGroupRequest.GroupNameJsonSetBean();
- bean.setTopicName(consumptionInfo.getTopic());
- bean.setGroupName(consumptionInfo.getConsumerGroupId());
- addTubeConsumeGroupRequest.setGroupNameJsonSet(Collections.singletonList(bean));
-
- try {
- tubeMqOptService.createNewConsumerGroup(addTubeConsumeGroupRequest);
- } catch (BusinessException e) {
- throw e;
- } catch (Exception e) {
- throw new BusinessException(BizErrorCodeEnum.CONSUMER_GROUP_CREATE_FAILED);
- }
- }
-
- @Override
- public boolean async() {
- return false;
- }
-
-}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/newstream/SingleStreamWorkflowDefinition.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/stream/CreateStreamWorkflowDefinition.java
similarity index 51%
rename from inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/newstream/SingleStreamWorkflowDefinition.java
rename to inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/stream/CreateStreamWorkflowDefinition.java
index 3b16958..ff201ff 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/newstream/SingleStreamWorkflowDefinition.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/stream/CreateStreamWorkflowDefinition.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.inlong.manager.service.workflow.newstream;
+package org.apache.inlong.manager.service.workflow.stream;
import java.util.Collections;
import java.util.List;
@@ -23,12 +23,14 @@ import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections.CollectionUtils;
import org.apache.inlong.manager.common.enums.BizConstant;
import org.apache.inlong.manager.service.core.StorageService;
-import org.apache.inlong.manager.service.thirdpart.hive.CreateHiveTableForOneStreamListener;
+import org.apache.inlong.manager.service.thirdpart.hive.CreateHiveTableForStreamListener;
+import org.apache.inlong.manager.service.thirdpart.mq.CreatePulsarGroupForStreamTaskListener;
+import org.apache.inlong.manager.service.thirdpart.mq.CreatePulsarTopicForStreamTaskListener;
import org.apache.inlong.manager.service.thirdpart.sort.PushHiveConfigTaskListener;
import org.apache.inlong.manager.service.workflow.ProcessName;
import org.apache.inlong.manager.service.workflow.WorkflowDefinition;
-import org.apache.inlong.manager.service.workflow.newbusiness.CreateResourceWorkflowForm;
-import org.apache.inlong.manager.service.workflow.newbusiness.listener.InitBusinessInfoListener;
+import org.apache.inlong.manager.service.workflow.business.BusinessResourceWorkflowForm;
+import org.apache.inlong.manager.service.workflow.business.listener.InitBusinessInfoListener;
import org.apache.inlong.manager.workflow.model.definition.EndEvent;
import org.apache.inlong.manager.workflow.model.definition.Process;
import org.apache.inlong.manager.workflow.model.definition.ServiceTask;
@@ -37,37 +39,41 @@ import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
/**
- * Single data stream access resource creation
+ * Data stream access resource creation
*/
@Component
@Slf4j
-public class SingleStreamWorkflowDefinition implements WorkflowDefinition {
+public class CreateStreamWorkflowDefinition implements WorkflowDefinition {
@Autowired
private StorageService storageService;
@Autowired
private InitBusinessInfoListener initBusinessInfoListener;
@Autowired
- private SingleStreamFailedProcessListener singleStreamFailedProcessListener;
+ private StreamFailedProcessListener streamFailedProcessListener;
@Autowired
- private SingleStreamCompleteProcessListener singleStreamCompleteProcessListener;
+ private StreamCompleteProcessListener streamCompleteProcessListener;
@Autowired
- private CreateHiveTableForOneStreamListener createHiveTableForOneStreamListener;
+ private CreateHiveTableForStreamListener createHiveTableListener;
@Autowired
private PushHiveConfigTaskListener pushHiveConfigTaskListener;
+ @Autowired
+ private CreatePulsarTopicForStreamTaskListener createPulsarTopicTaskListener;
+ @Autowired
+ private CreatePulsarGroupForStreamTaskListener createPulsarGroupTaskListener;
@Override
public Process defineProcess() {
// Configuration process
Process process = new Process();
process.addListener(initBusinessInfoListener);
- process.addListener(singleStreamFailedProcessListener);
- process.addListener(singleStreamCompleteProcessListener);
+ process.addListener(streamFailedProcessListener);
+ process.addListener(streamCompleteProcessListener);
process.setType("Data stream access resource creation");
process.setName(getProcessName().name());
process.setDisplayName(getProcessName().getDisplayName());
- process.setFormClass(CreateResourceWorkflowForm.class);
+ process.setFormClass(BusinessResourceWorkflowForm.class);
process.setVersion(1);
process.setHidden(true);
@@ -79,9 +85,41 @@ public class SingleStreamWorkflowDefinition implements WorkflowDefinition {
EndEvent endEvent = new EndEvent();
process.setEndEvent(endEvent);
+ ServiceTask createPulsarTopicTask = new ServiceTask();
+ createPulsarTopicTask.setSkipResolver(c -> {
+ BusinessResourceWorkflowForm form = (BusinessResourceWorkflowForm) c.getProcessForm();
+ String middlewareType = form.getBusinessInfo().getMiddlewareType();
+ if (BizConstant.MIDDLEWARE_PULSAR.equalsIgnoreCase(middlewareType)) {
+ return false;
+ }
+ log.warn("no need to create pulsar topic for groupId={}, streamId={}, as the middlewareType={}",
+ form.getInlongGroupId(), form.getInlongStreamId(), middlewareType);
+ return true;
+ });
+ createPulsarTopicTask.setName("createPulsarTopic");
+ createPulsarTopicTask.setDisplayName("Stream-CreatePulsarTopic");
+ createPulsarTopicTask.addListener(createPulsarTopicTaskListener);
+ process.addTask(createPulsarTopicTask);
+
+ ServiceTask createPulsarSubscriptionGroupTask = new ServiceTask();
+ createPulsarSubscriptionGroupTask.setSkipResolver(c -> {
+ BusinessResourceWorkflowForm form = (BusinessResourceWorkflowForm) c.getProcessForm();
+ String middlewareType = form.getBusinessInfo().getMiddlewareType();
+ if (BizConstant.MIDDLEWARE_PULSAR.equalsIgnoreCase(middlewareType)) {
+ return false;
+ }
+ log.warn("no need to create pulsar subscription for groupId={}, streamId={}, as the middlewareType={}",
+ form.getInlongGroupId(), form.getInlongStreamId(), middlewareType);
+ return true;
+ });
+ createPulsarSubscriptionGroupTask.setName("createPulsarSubscription");
+ createPulsarSubscriptionGroupTask.setDisplayName("Stream-CreatePulsarSubscription");
+ createPulsarSubscriptionGroupTask.addListener(createPulsarGroupTaskListener);
+ process.addTask(createPulsarSubscriptionGroupTask);
+
ServiceTask createHiveTableTask = new ServiceTask();
createHiveTableTask.setSkipResolver(c -> {
- CreateResourceWorkflowForm form = (CreateResourceWorkflowForm) c.getProcessForm();
+ BusinessResourceWorkflowForm form = (BusinessResourceWorkflowForm) c.getProcessForm();
String groupId = form.getInlongGroupId();
String streamId = form.getInlongStreamId();
List<String> dsForHive = storageService.filterStreamIdByStorageType(groupId, BizConstant.STORAGE_HIVE,
@@ -94,18 +132,20 @@ public class SingleStreamWorkflowDefinition implements WorkflowDefinition {
return false;
});
- createHiveTableTask.setName("createHiveTableTask");
- createHiveTableTask.setDisplayName("Create Hive Table");
- createHiveTableTask.addListener(createHiveTableForOneStreamListener);
+ createHiveTableTask.setName("createHiveTable");
+ createHiveTableTask.setDisplayName("Stream-CreateHiveTable");
+ createHiveTableTask.addListener(createHiveTableListener);
process.addTask(createHiveTableTask);
ServiceTask pushSortConfig = new ServiceTask();
pushSortConfig.setName("pushSortConfig");
- pushSortConfig.setDisplayName("Push Sort Configuration");
+ pushSortConfig.setDisplayName("Stream-PushSortConfig");
pushSortConfig.addListener(pushHiveConfigTaskListener);
process.addTask(pushSortConfig);
- startEvent.addNext(createHiveTableTask);
+ startEvent.addNext(createPulsarTopicTask);
+ createPulsarTopicTask.addNext(createPulsarSubscriptionGroupTask);
+ createPulsarSubscriptionGroupTask.addNext(createHiveTableTask);
createHiveTableTask.addNext(pushSortConfig);
pushSortConfig.addNext(endEvent);
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/newstream/InitBusinessInfoForStreamListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/stream/InitBusinessInfoForStreamListener.java
similarity index 90%
rename from inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/newstream/InitBusinessInfoForStreamListener.java
rename to inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/stream/InitBusinessInfoForStreamListener.java
index b958a7f..1975689 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/newstream/InitBusinessInfoForStreamListener.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/stream/InitBusinessInfoForStreamListener.java
@@ -15,13 +15,13 @@
* limitations under the License.
*/
-package org.apache.inlong.manager.service.workflow.newstream;
+package org.apache.inlong.manager.service.workflow.stream;
import org.apache.inlong.manager.common.enums.BizErrorCodeEnum;
import org.apache.inlong.manager.common.exceptions.BusinessException;
import org.apache.inlong.manager.common.pojo.business.BusinessInfo;
import org.apache.inlong.manager.service.core.BusinessService;
-import org.apache.inlong.manager.service.workflow.newbusiness.CreateResourceWorkflowForm;
+import org.apache.inlong.manager.service.workflow.business.BusinessResourceWorkflowForm;
import org.apache.inlong.manager.workflow.core.event.ListenerResult;
import org.apache.inlong.manager.workflow.core.event.process.ProcessEvent;
import org.apache.inlong.manager.workflow.core.event.process.ProcessEventListener;
@@ -46,7 +46,7 @@ public class InitBusinessInfoForStreamListener implements ProcessEventListener {
@Override
public ListenerResult listen(WorkflowContext context) throws WorkflowListenerException {
- CreateResourceWorkflowForm form = (CreateResourceWorkflowForm) context.getProcessForm();
+ BusinessResourceWorkflowForm form = (BusinessResourceWorkflowForm) context.getProcessForm();
BusinessInfo businessInfo = businessService.get(context.getProcessForm().getInlongGroupId());
if (businessInfo != null) {
form.setBusinessInfo(businessInfo);
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/newstream/SingleStreamCompleteProcessListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/stream/StreamCompleteProcessListener.java
similarity index 86%
rename from inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/newstream/SingleStreamCompleteProcessListener.java
rename to inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/stream/StreamCompleteProcessListener.java
index 0e9f89d..08cdff8 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/newstream/SingleStreamCompleteProcessListener.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/stream/StreamCompleteProcessListener.java
@@ -15,13 +15,13 @@
* limitations under the License.
*/
-package org.apache.inlong.manager.service.workflow.newstream;
+package org.apache.inlong.manager.service.workflow.stream;
import lombok.extern.slf4j.Slf4j;
import org.apache.inlong.manager.common.enums.EntityStatus;
import org.apache.inlong.manager.service.core.BusinessService;
import org.apache.inlong.manager.service.core.DataStreamService;
-import org.apache.inlong.manager.service.workflow.newbusiness.CreateResourceWorkflowForm;
+import org.apache.inlong.manager.service.workflow.business.BusinessResourceWorkflowForm;
import org.apache.inlong.manager.workflow.core.event.ListenerResult;
import org.apache.inlong.manager.workflow.core.event.process.ProcessEvent;
import org.apache.inlong.manager.workflow.core.event.process.ProcessEventListener;
@@ -31,11 +31,11 @@ import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
/**
- * Event listener for completed creation of single data stream resource
+ * Event listener for completed creation of data stream resource
*/
@Slf4j
@Component
-public class SingleStreamCompleteProcessListener implements ProcessEventListener {
+public class StreamCompleteProcessListener implements ProcessEventListener {
@Autowired
private BusinessService businessService;
@@ -53,7 +53,7 @@ public class SingleStreamCompleteProcessListener implements ProcessEventListener
*/
@Override
public ListenerResult listen(WorkflowContext context) throws WorkflowListenerException {
- CreateResourceWorkflowForm form = (CreateResourceWorkflowForm) context.getProcessForm();
+ BusinessResourceWorkflowForm form = (BusinessResourceWorkflowForm) context.getProcessForm();
String groupId = form.getInlongGroupId();
String streamId = form.getInlongStreamId();
String username = context.getApplicant();
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/newstream/SingleStreamFailedProcessListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/stream/StreamFailedProcessListener.java
similarity index 86%
rename from inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/newstream/SingleStreamFailedProcessListener.java
rename to inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/stream/StreamFailedProcessListener.java
index 6ea3f14..fe0df90 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/newstream/SingleStreamFailedProcessListener.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/stream/StreamFailedProcessListener.java
@@ -15,13 +15,13 @@
* limitations under the License.
*/
-package org.apache.inlong.manager.service.workflow.newstream;
+package org.apache.inlong.manager.service.workflow.stream;
import lombok.extern.slf4j.Slf4j;
import org.apache.inlong.manager.common.enums.EntityStatus;
import org.apache.inlong.manager.service.core.BusinessService;
import org.apache.inlong.manager.service.core.DataStreamService;
-import org.apache.inlong.manager.service.workflow.newbusiness.CreateResourceWorkflowForm;
+import org.apache.inlong.manager.service.workflow.business.BusinessResourceWorkflowForm;
import org.apache.inlong.manager.workflow.core.event.ListenerResult;
import org.apache.inlong.manager.workflow.core.event.process.ProcessEvent;
import org.apache.inlong.manager.workflow.core.event.process.ProcessEventListener;
@@ -31,11 +31,11 @@ import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
/**
- * Event listener for failed creation of single data stream resource
+ * Event listener for failed creation of data stream resource
*/
@Slf4j
@Component
-public class SingleStreamFailedProcessListener implements ProcessEventListener {
+public class StreamFailedProcessListener implements ProcessEventListener {
@Autowired
private BusinessService businessService;
@@ -53,7 +53,7 @@ public class SingleStreamFailedProcessListener implements ProcessEventListener {
*/
@Override
public ListenerResult listen(WorkflowContext context) throws WorkflowListenerException {
- CreateResourceWorkflowForm form = (CreateResourceWorkflowForm) context.getProcessForm();
+ BusinessResourceWorkflowForm form = (BusinessResourceWorkflowForm) context.getProcessForm();
String groupId = form.getInlongGroupId();
String streamId = form.getInlongStreamId();
String username = context.getApplicant();
diff --git a/inlong-manager/manager-web/src/main/resources/application-dev.properties b/inlong-manager/manager-web/src/main/resources/application-dev.properties
index 4ca899c..7ae7474 100644
--- a/inlong-manager/manager-web/src/main/resources/application-dev.properties
+++ b/inlong-manager/manager-web/src/main/resources/application-dev.properties
@@ -23,7 +23,7 @@ logging.level.org.apache.inlong.manager=debug
spring.datasource.druid.url=jdbc:mysql://127.0.0.1:3306/apache_inlong_manager?useSSL=false&allowPublicKeyRetrieval=true&characterEncoding=UTF-8&nullCatalogMeansCurrent=true&serverTimezone=GMT%2b8
spring.datasource.druid.username=root
-spring.datasource.druid.password=fighting
+spring.datasource.druid.password=inlong
spring.datasource.druid.driver-class-name=com.mysql.cj.jdbc.Driver
spring.datasource.druid.validationQuery=SELECT 'x'
diff --git a/inlong-manager/manager-web/src/main/resources/application-prod.properties b/inlong-manager/manager-web/src/main/resources/application-prod.properties
index 056e5a6..af253e5 100644
--- a/inlong-manager/manager-web/src/main/resources/application-prod.properties
+++ b/inlong-manager/manager-web/src/main/resources/application-prod.properties
@@ -21,8 +21,8 @@
logging.level.root=INFO
spring.datasource.druid.url=jdbc:mysql://127.0.0.1:3306/apache_inlong_manager?useSSL=false&allowPublicKeyRetrieval=true&characterEncoding=UTF-8&nullCatalogMeansCurrent=true&serverTimezone=GMT%2b8
-spring.datasource.druid.username=xxxxxx
-spring.datasource.druid.password=xxxxxx
+spring.datasource.druid.username=root
+spring.datasource.druid.password=inlong
spring.datasource.druid.driver-class-name=com.mysql.cj.jdbc.Driver
spring.datasource.druid.validationQuery=SELECT 'x'
diff --git a/inlong-manager/manager-web/src/main/resources/application-test.properties b/inlong-manager/manager-web/src/main/resources/application-test.properties
index b4a75c9..7ae7474 100644
--- a/inlong-manager/manager-web/src/main/resources/application-test.properties
+++ b/inlong-manager/manager-web/src/main/resources/application-test.properties
@@ -22,8 +22,8 @@ logging.level.root=INFO
logging.level.org.apache.inlong.manager=debug
spring.datasource.druid.url=jdbc:mysql://127.0.0.1:3306/apache_inlong_manager?useSSL=false&allowPublicKeyRetrieval=true&characterEncoding=UTF-8&nullCatalogMeansCurrent=true&serverTimezone=GMT%2b8
-spring.datasource.druid.username=xxxxxx
-spring.datasource.druid.password=xxxxxx
+spring.datasource.druid.username=root
+spring.datasource.druid.password=inlong
spring.datasource.druid.driver-class-name=com.mysql.cj.jdbc.Driver
spring.datasource.druid.validationQuery=SELECT 'x'
diff --git a/inlong-manager/sql/apache_inlong_manager.sql b/inlong-manager/sql/apache_inlong_manager.sql
index a31f22f..6b3c543 100644
--- a/inlong-manager/sql/apache_inlong_manager.sql
+++ b/inlong-manager/sql/apache_inlong_manager.sql
@@ -79,27 +79,29 @@ CREATE TABLE `agent_sys_conf`
DROP TABLE IF EXISTS `business`;
CREATE TABLE `business`
(
- `id` int(11) NOT NULL AUTO_INCREMENT COMMENT 'Incremental primary key',
- `inlong_group_id` varchar(128) NOT NULL COMMENT 'Business group id, filled in by the user, undeleted ones cannot be repeated',
- `name` varchar(128) DEFAULT '' COMMENT 'Business name, English, numbers and underscore',
- `cn_name` varchar(256) DEFAULT NULL COMMENT 'Chinese display name',
- `description` varchar(256) DEFAULT '' COMMENT 'Business Introduction',
- `middleware_type` varchar(10) DEFAULT 'TUBE' COMMENT 'The middleware type of message queue, high throughput: TUBE, high consistency: PULSAR',
- `mq_resource_obj` varchar(128) NOT NULL COMMENT 'MQ resource object, for Tube, its Topic, for Pulsar, its Namespace',
- `daily_records` int(11) DEFAULT '10' COMMENT 'Number of access records per day, unit: 10,000 records per day',
- `daily_storage` int(11) DEFAULT '10' COMMENT 'Access size by day, unit: GB per day',
- `peak_records` int(11) DEFAULT '1000' COMMENT 'Access peak per second, unit: records per second',
- `max_length` int(11) DEFAULT '10240' COMMENT 'The maximum length of a single piece of data, unit: Byte',
- `schema_name` varchar(128) DEFAULT NULL COMMENT 'Data type, associated data_schema table',
- `in_charges` varchar(512) NOT NULL COMMENT 'Name of responsible person, separated by commas',
- `followers` varchar(512) DEFAULT NULL COMMENT 'List of names of business followers, separated by commas',
- `status` int(4) DEFAULT '21' COMMENT 'Business status',
- `is_deleted` tinyint(1) DEFAULT '0' COMMENT 'Whether to delete, 0: not deleted, 1: deleted',
- `creator` varchar(64) NOT NULL COMMENT 'Creator name',
- `modifier` varchar(64) DEFAULT NULL COMMENT 'Modifier name',
- `create_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'Create time',
- `modify_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'Modify time',
- `temp_view` json DEFAULT NULL COMMENT 'Temporary view, used to save intermediate data that has not been submitted or approved after modification',
+ `id` int(11) NOT NULL AUTO_INCREMENT COMMENT 'Incremental primary key',
+ `inlong_group_id` varchar(128) NOT NULL COMMENT 'Business group id, filled in by the user, undeleted ones cannot be repeated',
+ `name` varchar(128) DEFAULT '' COMMENT 'Business name, English, numbers and underscore',
+ `cn_name` varchar(256) DEFAULT NULL COMMENT 'Chinese display name',
+ `description` varchar(256) DEFAULT '' COMMENT 'Business Introduction',
+ `middleware_type` varchar(10) DEFAULT 'TUBE' COMMENT 'The middleware type of message queue, high throughput: TUBE, high consistency: PULSAR',
+ `queue_module` VARCHAR(20) NULL DEFAULT 'parallel' COMMENT 'Queue model of Pulsar, parallel: multiple partitions, high throughput, out-of-order messages; serial: single partition, low throughput, and orderly messages',
+ `topic_partition_num` INT(4) NULL DEFAULT '3' COMMENT 'The number of partitions of Pulsar Topic, 1-20',
+ `mq_resource_obj` varchar(128) NOT NULL COMMENT 'MQ resource object, for Tube, its Topic, for Pulsar, its Namespace',
+ `daily_records` int(11) DEFAULT '10' COMMENT 'Number of access records per day, unit: 10,000 records per day',
+ `daily_storage` int(11) DEFAULT '10' COMMENT 'Access size by day, unit: GB per day',
+ `peak_records` int(11) DEFAULT '1000' COMMENT 'Access peak per second, unit: records per second',
+ `max_length` int(11) DEFAULT '10240' COMMENT 'The maximum length of a single piece of data, unit: Byte',
+ `schema_name` varchar(128) DEFAULT NULL COMMENT 'Data type, associated data_schema table',
+ `in_charges` varchar(512) NOT NULL COMMENT 'Name of responsible person, separated by commas',
+ `followers` varchar(512) DEFAULT NULL COMMENT 'List of names of business followers, separated by commas',
+ `status` int(4) DEFAULT '21' COMMENT 'Business status',
+ `is_deleted` tinyint(1) DEFAULT '0' COMMENT 'Whether to delete, 0: not deleted, 1: deleted',
+ `creator` varchar(64) NOT NULL COMMENT 'Creator name',
+ `modifier` varchar(64) DEFAULT NULL COMMENT 'Modifier name',
+ `create_time` timestamp NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'Create time',
+ `modify_time` timestamp NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'Modify time',
+ `temp_view` json DEFAULT NULL COMMENT 'Temporary view, used to save intermediate data that has not been submitted or approved after modification',
PRIMARY KEY (`id`),
UNIQUE KEY `unique_business` (`inlong_group_id`, `is_deleted`, `modify_time`)
) ENGINE = InnoDB
@@ -113,18 +115,18 @@ CREATE TABLE `business_pulsar`
(
`id` int(11) NOT NULL AUTO_INCREMENT COMMENT 'Incremental primary key',
`inlong_group_id` varchar(128) NOT NULL COMMENT 'Business group id, filled in by the user, undeleted ones cannot be repeated',
- `ensemble` int(3) DEFAULT '3' COMMENT 'The writable nodes number of ledger',
- `write_quorum` int(3) DEFAULT '3' COMMENT 'The copies number of ledger',
- `ack_quorum` int(3) DEFAULT '2' COMMENT 'The number of requested acks',
- `retention_time` int(11) DEFAULT '72' COMMENT 'Message storage time',
- `retention_time_unit` char(20) DEFAULT 'hours' COMMENT 'The unit of the message storage time',
- `ttl` int(11) DEFAULT '24' COMMENT 'Message time-to-live duration',
- `ttl_unit` varchar(20) DEFAULT 'hours' COMMENT 'The unit of time-to-live duration',
- `retention_size` int(11) DEFAULT '-1' COMMENT 'Message size',
- `retention_size_unit` varchar(20) DEFAULT 'MB' COMMENT 'The unit of message size',
- `is_deleted` tinyint(1) DEFAULT '0' COMMENT 'Whether to delete, 0: not deleted, 1: deleted',
- `create_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'Create time',
- `modify_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'Modify time',
+ `ensemble` int(3) DEFAULT '3' COMMENT 'The writable nodes number of ledger',
+ `write_quorum` int(3) DEFAULT '3' COMMENT 'The copies number of ledger',
+ `ack_quorum` int(3) DEFAULT '2' COMMENT 'The number of requested acks',
+ `retention_time` int(11) DEFAULT '72' COMMENT 'Message storage time',
+ `retention_time_unit` char(20) DEFAULT 'hours' COMMENT 'The unit of the message storage time',
+ `ttl` int(11) DEFAULT '24' COMMENT 'Message time-to-live duration',
+ `ttl_unit` varchar(20) DEFAULT 'hours' COMMENT 'The unit of time-to-live duration',
+ `retention_size` int(11) DEFAULT '-1' COMMENT 'Message size',
+ `retention_size_unit` varchar(20) DEFAULT 'MB' COMMENT 'The unit of message size',
+ `is_deleted` tinyint(1) DEFAULT '0' COMMENT 'Whether to delete, 0: not deleted, 1: deleted',
+ `create_time` timestamp NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'Create time',
+ `modify_time` timestamp NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'Modify time',
PRIMARY KEY (`id`)
) ENGINE = InnoDB
DEFAULT CHARSET = utf8mb4 COMMENT ='Pulsar info table';
@@ -138,9 +140,9 @@ CREATE TABLE `business_ext`
`id` int(11) NOT NULL AUTO_INCREMENT COMMENT 'Incremental primary key',
`inlong_group_id` varchar(128) NOT NULL COMMENT 'Business group id',
`key_name` varchar(64) NOT NULL COMMENT 'Configuration item name',
- `key_value` varchar(256) DEFAULT NULL COMMENT 'The value of the configuration item',
- `is_deleted` tinyint(1) DEFAULT '0' COMMENT 'Whether to delete, 0: not deleted, 1: deleted',
- `modify_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'Modify time',
+ `key_value` varchar(256) DEFAULT NULL COMMENT 'The value of the configuration item',
+ `is_deleted` tinyint(1) DEFAULT '0' COMMENT 'Whether to delete, 0: not deleted, 1: deleted',
+ `modify_time` timestamp NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'Modify time',
PRIMARY KEY (`id`),
KEY `index_group_id` (`inlong_group_id`)
) ENGINE = InnoDB
@@ -158,15 +160,15 @@ CREATE TABLE `cluster_info`
`ip` varchar(64) NOT NULL COMMENT 'Cluster IP address',
`port` int(11) NOT NULL COMMENT 'Cluster port',
`in_charges` varchar(512) NOT NULL COMMENT 'Name of responsible person, separated by commas',
- `url` varchar(256) DEFAULT NULL COMMENT 'Cluster URL address',
- `is_backup` tinyint(1) DEFAULT '0' COMMENT 'Whether it is a backup cluster, 0: no, 1: yes',
- `ext_props` json DEFAULT NULL COMMENT 'extended properties',
- `status` int(4) DEFAULT '1' COMMENT 'cluster status',
- `is_deleted` tinyint(1) DEFAULT '0' COMMENT 'Whether to delete, 0: not deleted, 1: deleted',
+ `url` varchar(256) DEFAULT NULL COMMENT 'Cluster URL address',
+ `is_backup` tinyint(1) DEFAULT '0' COMMENT 'Whether it is a backup cluster, 0: no, 1: yes',
+ `ext_props` json DEFAULT NULL COMMENT 'extended properties',
+ `status` int(4) DEFAULT '1' COMMENT 'cluster status',
+ `is_deleted` tinyint(1) DEFAULT '0' COMMENT 'Whether to delete, 0: not deleted, 1: deleted',
`creator` varchar(64) NOT NULL COMMENT 'Creator name',
- `modifier` varchar(64) DEFAULT NULL COMMENT 'Modifier name',
- `create_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'Create time',
- `modify_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'Modify time',
+ `modifier` varchar(64) DEFAULT NULL COMMENT 'Modifier name',
+ `create_time` timestamp NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'Create time',
+ `modify_time` timestamp NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'Modify time',
PRIMARY KEY (`id`)
) ENGINE = InnoDB
DEFAULT CHARSET = utf8mb4 COMMENT ='Cluster Information Table';
@@ -180,29 +182,29 @@ CREATE TABLE `common_db_server`
`id` int(11) NOT NULL AUTO_INCREMENT COMMENT 'Incremental primary key',
`access_type` varchar(20) NOT NULL COMMENT 'Collection type, with Agent, DataProxy client, LoadProxy',
`connection_name` varchar(128) NOT NULL COMMENT 'The name of the database connection',
- `db_type` varchar(128) DEFAULT 'MySQL' COMMENT 'DB type, such as MySQL, Oracle',
+ `db_type` varchar(128) DEFAULT 'MySQL' COMMENT 'DB type, such as MySQL, Oracle',
`db_server_ip` varchar(64) NOT NULL COMMENT 'DB Server IP',
`port` int(11) NOT NULL COMMENT 'Port number',
- `db_name` varchar(128) DEFAULT NULL COMMENT 'Target database name',
+ `db_name` varchar(128) DEFAULT NULL COMMENT 'Target database name',
`username` varchar(64) NOT NULL COMMENT 'Username',
`password` varchar(64) NOT NULL COMMENT 'The password corresponding to the above user name',
- `has_select` tinyint(1) DEFAULT '0' COMMENT 'Is there DB permission select, 0: No, 1: Yes',
- `has_insert` tinyint(1) DEFAULT '0' COMMENT 'Is there DB permission to insert, 0: No, 1: Yes',
- `has_update` tinyint(1) DEFAULT '0' COMMENT 'Is there a DB permission update, 0: No, 1: Yes',
- `has_delete` tinyint(1) DEFAULT '0' COMMENT 'Is there a DB permission to delete, 0: No, 1: Yes',
+ `has_select` tinyint(1) DEFAULT '0' COMMENT 'Is there DB permission select, 0: No, 1: Yes',
+ `has_insert` tinyint(1) DEFAULT '0' COMMENT 'Is there DB permission to insert, 0: No, 1: Yes',
+ `has_update` tinyint(1) DEFAULT '0' COMMENT 'Is there a DB permission update, 0: No, 1: Yes',
+ `has_delete` tinyint(1) DEFAULT '0' COMMENT 'Is there a DB permission to delete, 0: No, 1: Yes',
`in_charges` varchar(512) NOT NULL COMMENT 'DB person in charge, separated by a comma when there are multiple ones',
- `is_region_id` tinyint(1) DEFAULT '0' COMMENT 'Whether it contains a region ID, 0: No, 1: Yes',
- `db_description` varchar(256) DEFAULT NULL COMMENT 'DB description',
- `backup_db_server_ip` varchar(64) DEFAULT NULL COMMENT 'Backup DB HOST',
- `backup_db_port` int(11) DEFAULT NULL COMMENT 'Backup DB port',
- `status` int(4) DEFAULT '0' COMMENT 'status',
- `is_deleted` tinyint(1) DEFAULT '0' COMMENT 'Whether to delete, 0: not deleted, 1: deleted',
+ `is_region_id` tinyint(1) DEFAULT '0' COMMENT 'Whether it contains a region ID, 0: No, 1: Yes',
+ `db_description` varchar(256) DEFAULT NULL COMMENT 'DB description',
+ `backup_db_server_ip` varchar(64) DEFAULT NULL COMMENT 'Backup DB HOST',
+ `backup_db_port` int(11) DEFAULT NULL COMMENT 'Backup DB port',
+ `status` int(4) DEFAULT '0' COMMENT 'status',
+ `is_deleted` tinyint(1) DEFAULT '0' COMMENT 'Whether to delete, 0: not deleted, 1: deleted',
`creator` varchar(64) NOT NULL COMMENT 'Creator name',
- `modifier` varchar(64) DEFAULT NULL COMMENT 'Modifier name',
- `create_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'Create time',
- `modify_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'Modify time',
- `visible_person` varchar(1024) DEFAULT NULL COMMENT 'List of visible persons, separated by commas',
- `visible_group` varchar(1024) DEFAULT NULL COMMENT 'List of visible groups, separated by commas',
+ `modifier` varchar(64) DEFAULT NULL COMMENT 'Modifier name',
+ `create_time` timestamp NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'Create time',
+ `modify_time` timestamp NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'Modify time',
+ `visible_person` varchar(1024) DEFAULT NULL COMMENT 'List of visible persons, separated by commas',
+ `visible_group` varchar(1024) DEFAULT NULL COMMENT 'List of visible groups, separated by commas',
PRIMARY KEY (`id`)
) ENGINE = InnoDB
DEFAULT CHARSET = utf8mb4 COMMENT ='Common Database Server Table';
@@ -244,16 +246,16 @@ CREATE TABLE `consumption`
`consumer_group_id` varchar(255) NOT NULL COMMENT 'Consumer group ID',
`in_charges` varchar(512) NOT NULL COMMENT 'Person in charge of consumption',
`inlong_group_id` varchar(255) NOT NULL COMMENT 'Business group id',
- `middleware_type` varchar(10) DEFAULT 'TUBE' COMMENT 'The middleware type of message queue, high throughput: TUBE, high consistency: PULSAR',
+ `middleware_type` varchar(10) DEFAULT 'TUBE' COMMENT 'The middleware type of message queue, high throughput: TUBE, high consistency: PULSAR',
`topic` varchar(255) NOT NULL COMMENT 'Consumption topic',
- `filter_enabled` int(2) DEFAULT '0' COMMENT 'Whether to filter, default 0, not filter consume',
- `inlong_stream_id` varchar(1024) DEFAULT NULL COMMENT 'Data stream ID for consumption, if filter_enable is 1, it cannot empty',
+ `filter_enabled` int(2) DEFAULT '0' COMMENT 'Whether to filter, default 0, not filter consume',
+ `inlong_stream_id` varchar(1024) DEFAULT NULL COMMENT 'Data stream ID for consumption, if filter_enable is 1, it cannot empty',
`status` int(4) NOT NULL COMMENT 'Status: draft, pending approval, approval rejected, approval passed',
- `is_deleted` tinyint(1) DEFAULT '0' COMMENT 'Whether to delete, 0: not deleted, 1: deleted',
+ `is_deleted` tinyint(1) DEFAULT '0' COMMENT 'Whether to delete, 0: not deleted, 1: deleted',
`creator` varchar(64) NOT NULL COMMENT 'creator',
- `modifier` varchar(64) DEFAULT NULL COMMENT 'modifier',
- `create_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'Create time',
- `modify_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'Modify time',
+ `modifier` varchar(64) DEFAULT NULL COMMENT 'modifier',
+ `create_time` timestamp NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'Create time',
+ `modify_time` timestamp NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'Modify time',
PRIMARY KEY (`id`)
) ENGINE = InnoDB
DEFAULT CHARSET = utf8mb4 COMMENT ='Data consumption configuration table';
@@ -286,20 +288,20 @@ CREATE TABLE `data_proxy_cluster`
(
`id` int(11) NOT NULL AUTO_INCREMENT COMMENT 'Incremental primary key',
`name` varchar(128) NOT NULL COMMENT 'cluster name',
- `description` varchar(500) DEFAULT NULL COMMENT 'cluster description',
+ `description` varchar(500) DEFAULT NULL COMMENT 'cluster description',
`address` varchar(128) NOT NULL COMMENT 'cluster address',
- `port` varchar(256) DEFAULT '46801' COMMENT 'Access port number, multiple ports are separated by a comma',
- `is_backup` tinyint(1) DEFAULT '0' COMMENT 'Whether it is a backup cluster, 0: no, 1: yes',
- `is_inner_ip` tinyint(1) DEFAULT '0' COMMENT 'Whether it is intranet, 0: no, 1: yes',
- `net_type` varchar(20) DEFAULT NULL COMMENT 'Cluster network type, internal, or public',
- `in_charges` varchar(512) DEFAULT NULL COMMENT 'Name of responsible person, separated by commas',
- `ext_props` json DEFAULT NULL COMMENT 'Extended properties',
- `status` int(4) DEFAULT '1' COMMENT 'Cluster status',
- `is_deleted` tinyint(1) DEFAULT '0' COMMENT 'Whether to delete, 0: not deleted, 1: deleted',
+ `port` varchar(256) DEFAULT '46801' COMMENT 'Access port number, multiple ports are separated by a comma',
+ `is_backup` tinyint(1) DEFAULT '0' COMMENT 'Whether it is a backup cluster, 0: no, 1: yes',
+ `is_inner_ip` tinyint(1) DEFAULT '0' COMMENT 'Whether it is intranet, 0: no, 1: yes',
+ `net_type` varchar(20) DEFAULT NULL COMMENT 'Cluster network type, internal, or public',
+ `in_charges` varchar(512) DEFAULT NULL COMMENT 'Name of responsible person, separated by commas',
+ `ext_props` json DEFAULT NULL COMMENT 'Extended properties',
+ `status` int(4) DEFAULT '1' COMMENT 'Cluster status',
+ `is_deleted` tinyint(1) DEFAULT '0' COMMENT 'Whether to delete, 0: not deleted, 1: deleted',
`creator` varchar(64) NOT NULL COMMENT 'Creator name',
- `modifier` varchar(64) DEFAULT NULL COMMENT 'Modifier name',
- `create_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'Create time',
- `modify_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'Modify time',
+ `modifier` varchar(64) DEFAULT NULL COMMENT 'Modifier name',
+ `create_time` timestamp NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'Create time',
+ `modify_time` timestamp NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'Modify time',
PRIMARY KEY (`id`)
) ENGINE = InnoDB
DEFAULT CHARSET = utf8mb4 COMMENT ='DataProxy cluster table';
@@ -354,28 +356,28 @@ CREATE TABLE `data_stream`
`inlong_stream_id` varchar(128) NOT NULL COMMENT 'Data stream id, non-deleted globally unique',
`inlong_group_id` varchar(128) NOT NULL COMMENT 'Owning business group id',
`name` varchar(64) NOT NULL COMMENT 'The name of the data stream page display, can be Chinese',
- `description` varchar(256) DEFAULT '' COMMENT 'Introduction to data stream',
- `mq_resource_obj` varchar(128) DEFAULT NULL COMMENT 'MQ resource object, in the data stream, Tube is data_stream_id, Pulsar is Topic',
- `data_source_type` varchar(32) DEFAULT 'FILE' COMMENT 'Data source type, including: FILE, DB, Auto-Push (DATA_PROXY_SDK, HTTP)',
- `storage_period` int(11) DEFAULT '1' COMMENT 'The storage period of data in MQ, unit: day',
- `data_type` varchar(20) DEFAULT 'TEXT' COMMENT 'Data type, there are: TEXT, KEY-VALUE, PB, BON, TEXT and BON should be treated differently',
- `data_encoding` varchar(8) DEFAULT 'UTF-8' COMMENT 'Data encoding format, including: UTF-8, GBK',
- `data_separator` varchar(8) DEFAULT NULL COMMENT 'The source data field separator, stored as ASCII code',
- `data_escape_char` varchar(8) DEFAULT NULL COMMENT 'Source data field escape character, the default is NULL (NULL), stored as 1 character',
- `have_predefined_fields` tinyint(1) DEFAULT '0' COMMENT '(File, DB access) whether there are predefined fields, 0: none, 1: yes (save to data_stream_field)',
- `daily_records` int(11) DEFAULT '10' COMMENT 'Number of access records per day, unit: 10,000 records per day',
- `daily_storage` int(11) DEFAULT '10' COMMENT 'Access size by day, unit: GB per day',
- `peak_records` int(11) DEFAULT '1000' COMMENT 'Access peak per second, unit: records per second',
- `max_length` int(11) DEFAULT '10240' COMMENT 'The maximum length of a single piece of data, unit: Byte',
- `in_charges` varchar(512) DEFAULT NULL COMMENT 'Name of responsible person, separated by commas',
- `status` int(4) DEFAULT '0' COMMENT 'Data stream status',
- `previous_status` int(4) DEFAULT '0' COMMENT 'Previous status',
- `is_deleted` tinyint(1) DEFAULT '0' COMMENT 'Whether to delete, 0: not deleted, 1: deleted',
- `creator` varchar(64) DEFAULT NULL COMMENT 'Creator name',
- `modifier` varchar(64) DEFAULT NULL COMMENT 'Modifier name',
- `create_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'Create time',
- `modify_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'Modify time',
- `temp_view` json DEFAULT NULL COMMENT 'Temporary view, used to save intermediate data that has not been submitted or approved after modification',
+ `description` varchar(256) DEFAULT '' COMMENT 'Introduction to data stream',
+ `mq_resource_obj` varchar(128) DEFAULT NULL COMMENT 'MQ resource object, in the data stream, Tube is data_stream_id, Pulsar is Topic',
+ `data_source_type` varchar(32) DEFAULT 'FILE' COMMENT 'Data source type, including: FILE, DB, Auto-Push (DATA_PROXY_SDK, HTTP)',
+ `storage_period` int(11) DEFAULT '1' COMMENT 'The storage period of data in MQ, unit: day',
+ `data_type` varchar(20) DEFAULT 'TEXT' COMMENT 'Data type, there are: TEXT, KEY-VALUE, PB, BON, TEXT and BON should be treated differently',
+ `data_encoding` varchar(8) DEFAULT 'UTF-8' COMMENT 'Data encoding format, including: UTF-8, GBK',
+ `data_separator` varchar(8) DEFAULT NULL COMMENT 'The source data field separator, stored as ASCII code',
+ `data_escape_char` varchar(8) DEFAULT NULL COMMENT 'Source data field escape character, the default is NULL (NULL), stored as 1 character',
+ `have_predefined_fields` tinyint(1) DEFAULT '0' COMMENT '(File, DB access) whether there are predefined fields, 0: none, 1: yes (save to data_stream_field)',
+ `daily_records` int(11) DEFAULT '10' COMMENT 'Number of access records per day, unit: 10,000 records per day',
+ `daily_storage` int(11) DEFAULT '10' COMMENT 'Access size by day, unit: GB per day',
+ `peak_records` int(11) DEFAULT '1000' COMMENT 'Access peak per second, unit: records per second',
+ `max_length` int(11) DEFAULT '10240' COMMENT 'The maximum length of a single piece of data, unit: Byte',
+ `in_charges` varchar(512) DEFAULT NULL COMMENT 'Name of responsible person, separated by commas',
+ `status` int(4) DEFAULT '0' COMMENT 'Data stream status',
+ `previous_status` int(4) DEFAULT '0' COMMENT 'Previous status',
+ `is_deleted` tinyint(1) DEFAULT '0' COMMENT 'Whether to delete, 0: not deleted, 1: deleted',
+ `creator` varchar(64) DEFAULT NULL COMMENT 'Creator name',
+ `modifier` varchar(64) DEFAULT NULL COMMENT 'Modifier name',
+ `create_time` timestamp NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'Create time',
+ `modify_time` timestamp NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'Modify time',
+ `temp_view` json DEFAULT NULL COMMENT 'Temporary view, used to save intermediate data that has not been submitted or approved after modification',
PRIMARY KEY (`id`),
UNIQUE KEY `unique_data_stream` (`inlong_stream_id`, `inlong_group_id`, `is_deleted`, `modify_time`)
) ENGINE = InnoDB
@@ -391,9 +393,9 @@ CREATE TABLE `data_stream_ext`
`inlong_group_id` varchar(128) NOT NULL COMMENT 'Owning business group id',
`inlong_stream_id` varchar(128) NOT NULL COMMENT 'Owning data stream id',
`key_name` varchar(64) NOT NULL COMMENT 'Configuration item name',
- `key_value` varchar(256) DEFAULT NULL COMMENT 'The value of the configuration item',
- `is_deleted` tinyint(1) DEFAULT '0' COMMENT 'Whether to delete, 0: not deleted, 1: deleted',
- `modify_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'Modify time',
+ `key_value` varchar(256) DEFAULT NULL COMMENT 'The value of the configuration item',
+ `is_deleted` tinyint(1) DEFAULT '0' COMMENT 'Whether to delete, 0: not deleted, 1: deleted',
+ `modify_time` timestamp NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'Modify time',
PRIMARY KEY (`id`),
KEY `index_stream_id` (`inlong_stream_id`)
) ENGINE = InnoDB
@@ -479,13 +481,13 @@ CREATE TABLE `source_db_basic`
`id` int(11) NOT NULL AUTO_INCREMENT COMMENT 'Incremental primary key',
`inlong_group_id` varchar(128) NOT NULL COMMENT 'Owning business group id',
`inlong_stream_id` varchar(256) NOT NULL COMMENT 'Owning data stream id',
- `sync_type` tinyint(1) DEFAULT '0' COMMENT 'Data synchronization type, 0: FULL, full amount, 1: INCREMENTAL, incremental',
- `is_deleted` tinyint(1) DEFAULT '0' COMMENT 'Whether to delete, 0: not deleted, 1: deleted',
+ `sync_type` tinyint(1) DEFAULT '0' COMMENT 'Data synchronization type, 0: FULL, full amount, 1: INCREMENTAL, incremental',
+ `is_deleted` tinyint(1) DEFAULT '0' COMMENT 'Whether to delete, 0: not deleted, 1: deleted',
`creator` varchar(64) NOT NULL COMMENT 'Creator name',
- `modifier` varchar(64) DEFAULT NULL COMMENT 'Modifier name',
- `create_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'Create time',
- `modify_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'Modify time',
- `temp_view` json DEFAULT NULL COMMENT 'Temporary view, used to save intermediate data that has not been submitted or approved after modification',
+ `modifier` varchar(64) DEFAULT NULL COMMENT 'Modifier name',
+ `create_time` timestamp NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'Create time',
+ `modify_time` timestamp NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'Modify time',
+ `temp_view` json DEFAULT NULL COMMENT 'Temporary view, used to save intermediate data that has not been submitted or approved after modification',
PRIMARY KEY (`id`)
) ENGINE = InnoDB
DEFAULT CHARSET = utf8mb4 COMMENT ='Basic configuration of DB data source';
@@ -500,21 +502,21 @@ CREATE TABLE `source_db_detail`
`inlong_group_id` varchar(128) NOT NULL COMMENT 'Owning business group id',
`inlong_stream_id` varchar(128) NOT NULL COMMENT 'Owning data stream id',
`access_type` varchar(20) NOT NULL COMMENT 'Collection type, with Agent, DataProxy client, LoadProxy',
- `db_name` varchar(128) DEFAULT NULL COMMENT 'database name',
- `transfer_ip` varchar(64) DEFAULT NULL COMMENT 'Transfer IP',
- `connection_name` varchar(128) DEFAULT NULL COMMENT 'The name of the database connection',
- `table_name` varchar(128) DEFAULT NULL COMMENT 'Data table name, required for increment',
+ `db_name` varchar(128) DEFAULT NULL COMMENT 'database name',
+ `transfer_ip` varchar(64) DEFAULT NULL COMMENT 'Transfer IP',
+ `connection_name` varchar(128) DEFAULT NULL COMMENT 'The name of the database connection',
+ `table_name` varchar(128) DEFAULT NULL COMMENT 'Data table name, required for increment',
`table_fields` longtext COMMENT 'Data table fields, multiple are separated by half-width commas, required for increment',
`data_sql` longtext COMMENT 'SQL statement to collect source data, required for full amount',
- `crontab` varchar(56) DEFAULT NULL COMMENT 'Timed scheduling expression, required for full amount',
- `status` int(4) DEFAULT '0' COMMENT 'Data source status',
- `previous_status` int(4) DEFAULT '0' COMMENT 'Previous status',
- `is_deleted` tinyint(1) DEFAULT '0' COMMENT 'Whether to delete, 0: not deleted, 1: deleted',
+ `crontab` varchar(56) DEFAULT NULL COMMENT 'Timed scheduling expression, required for full amount',
+ `status` int(4) DEFAULT '0' COMMENT 'Data source status',
+ `previous_status` int(4) DEFAULT '0' COMMENT 'Previous status',
+ `is_deleted` tinyint(1) DEFAULT '0' COMMENT 'Whether to delete, 0: not deleted, 1: deleted',
`creator` varchar(64) NOT NULL COMMENT 'Creator name',
- `modifier` varchar(64) DEFAULT NULL COMMENT 'Modifier name',
- `create_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'Create time',
- `modify_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'Modify time',
- `temp_view` json DEFAULT NULL COMMENT 'Temporary view, used to save un-submitted and unapproved intermediate data after modification',
+ `modifier` varchar(64) DEFAULT NULL COMMENT 'Modifier name',
+ `create_time` timestamp NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'Create time',
+ `modify_time` timestamp NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'Modify time',
+ `temp_view` json DEFAULT NULL COMMENT 'Temporary view, used to save un-submitted and unapproved intermediate data after modification',
PRIMARY KEY (`id`)
) ENGINE = InnoDB
DEFAULT CHARSET = utf8mb4 COMMENT ='DB data source details table';
@@ -528,19 +530,19 @@ CREATE TABLE `source_file_basic`
`id` int(11) NOT NULL AUTO_INCREMENT COMMENT 'ID',
`inlong_group_id` varchar(128) NOT NULL COMMENT 'Business group id',
`inlong_stream_id` varchar(128) NOT NULL COMMENT 'Data stream id',
- `is_hybrid_source` tinyint(1) DEFAULT '0' COMMENT 'Whether to mix data sources',
- `is_table_mapping` tinyint(1) DEFAULT '0' COMMENT 'Is there a table name mapping',
- `date_offset` int(4) DEFAULT '0' COMMENT 'Time offset\n',
- `date_offset_unit` varchar(2) DEFAULT 'H' COMMENT 'Time offset unit',
- `file_rolling_type` varchar(2) DEFAULT 'H' COMMENT 'File rolling type',
- `upload_max_size` int(4) DEFAULT '120' COMMENT 'Upload maximum size',
- `need_compress` tinyint(1) DEFAULT '0' COMMENT 'Whether need compress',
- `is_deleted` tinyint(1) DEFAULT '0' COMMENT 'Delete switch',
+ `is_hybrid_source` tinyint(1) DEFAULT '0' COMMENT 'Whether to mix data sources',
+ `is_table_mapping` tinyint(1) DEFAULT '0' COMMENT 'Is there a table name mapping',
+ `date_offset` int(4) DEFAULT '0' COMMENT 'Time offset\n',
+ `date_offset_unit` varchar(2) DEFAULT 'H' COMMENT 'Time offset unit',
+ `file_rolling_type` varchar(2) DEFAULT 'H' COMMENT 'File rolling type',
+ `upload_max_size` int(4) DEFAULT '120' COMMENT 'Upload maximum size',
+ `need_compress` tinyint(1) DEFAULT '0' COMMENT 'Whether need compress',
+ `is_deleted` tinyint(1) DEFAULT '0' COMMENT 'Delete switch',
`creator` varchar(64) NOT NULL COMMENT 'Creator',
- `modifier` varchar(64) DEFAULT NULL COMMENT 'Modifier',
- `create_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'Create time',
- `modify_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'Modify time',
- `temp_view` json DEFAULT NULL COMMENT 'temp view',
+ `modifier` varchar(64) DEFAULT NULL COMMENT 'Modifier',
+ `create_time` timestamp NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'Create time',
+ `modify_time` timestamp NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'Modify time',
+ `temp_view` json DEFAULT NULL COMMENT 'temp view',
PRIMARY KEY (`id`)
) ENGINE = InnoDB
DEFAULT CHARSET = utf8mb4 COMMENT ='basic configuration of file data source';
@@ -554,23 +556,23 @@ CREATE TABLE `source_file_detail`
`id` int(11) NOT NULL AUTO_INCREMENT COMMENT 'Incremental primary key',
`inlong_group_id` varchar(128) NOT NULL COMMENT 'Owning business group id',
`inlong_stream_id` varchar(128) NOT NULL COMMENT 'Owning data stream id',
- `access_type` varchar(20) DEFAULT 'Agent' COMMENT 'Collection type, there are Agent, DataProxy client, LoadProxy, the file can only be Agent temporarily',
+ `access_type` varchar(20) DEFAULT 'Agent' COMMENT 'Collection type, there are Agent, DataProxy client, LoadProxy, the file can only be Agent temporarily',
`server_name` varchar(64) NOT NULL COMMENT 'The name of the data source service. If it is empty, add configuration through the following fields',
`ip` varchar(128) NOT NULL COMMENT 'Data source IP address',
`port` int(11) NOT NULL COMMENT 'Data source port number',
- `is_inner_ip` tinyint(1) DEFAULT '0' COMMENT 'Whether it is intranet, 0: no, 1: yes',
- `issue_type` varchar(10) DEFAULT 'SSH' COMMENT 'Issuing method, there are SSH, TCS',
+ `is_inner_ip` tinyint(1) DEFAULT '0' COMMENT 'Whether it is intranet, 0: no, 1: yes',
+ `issue_type` varchar(10) DEFAULT 'SSH' COMMENT 'Issuing method, there are SSH, TCS',
`username` varchar(32) NOT NULL COMMENT 'User name of the data source IP host',
`password` varchar(64) NOT NULL COMMENT 'The password corresponding to the above user name',
`file_path` varchar(256) NOT NULL COMMENT 'File path, supports regular matching',
- `status` int(4) DEFAULT '0' COMMENT 'Data source status',
- `previous_status` int(4) DEFAULT '0' COMMENT 'Previous status',
- `is_deleted` tinyint(1) DEFAULT '0' COMMENT 'Whether to delete, 0: not deleted, 1: deleted',
+ `status` int(4) DEFAULT '0' COMMENT 'Data source status',
+ `previous_status` int(4) DEFAULT '0' COMMENT 'Previous status',
+ `is_deleted` tinyint(1) DEFAULT '0' COMMENT 'Whether to delete, 0: not deleted, 1: deleted',
`creator` varchar(64) NOT NULL COMMENT 'Creator name',
- `modifier` varchar(64) DEFAULT NULL COMMENT 'Modifier name',
- `create_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'Create time',
- `modify_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'Modify time',
- `temp_view` json DEFAULT NULL COMMENT 'Temporary view, used to save un-submitted and unapproved intermediate data after modification',
+ `modifier` varchar(64) DEFAULT NULL COMMENT 'Modifier name',
+ `create_time` timestamp NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'Create time',
+ `modify_time` timestamp NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'Modify time',
+ `temp_view` json DEFAULT NULL COMMENT 'Temporary view, used to save un-submitted and unapproved intermediate data after modification',
PRIMARY KEY (`id`)
) ENGINE = InnoDB
DEFAULT CHARSET = utf8mb4 COMMENT ='Detailed table of file data source';
@@ -602,31 +604,31 @@ CREATE TABLE `storage_hive`
`id` int(11) NOT NULL AUTO_INCREMENT COMMENT 'Incremental primary key',
`inlong_group_id` varchar(128) NOT NULL COMMENT 'Owning business group id',
`inlong_stream_id` varchar(128) NOT NULL COMMENT 'Owning data stream id',
- `jdbc_url` varchar(255) DEFAULT NULL COMMENT 'Hive JDBC connection URL, such as "jdbc:hive2://127.0.0.1:10000"',
- `username` varchar(128) DEFAULT NULL COMMENT 'Username',
- `password` varchar(255) DEFAULT NULL COMMENT 'User password',
- `db_name` varchar(128) DEFAULT NULL COMMENT 'Target database name',
- `table_name` varchar(128) DEFAULT NULL COMMENT 'Target data table name',
- `hdfs_default_fs` varchar(255) DEFAULT NULL COMMENT 'HDFS defaultFS, such as "hdfs://127.0.0.1:9000"',
- `warehouse_dir` varchar(250) DEFAULT '/user/hive/warehouse' COMMENT 'Hive table storage path on HDFS, such as "/user/hive/warehouse"',
- `partition_interval` int(5) DEFAULT NULL COMMENT 'Partition interval, support: 1(D / H), 10 I, 30 I',
- `partition_unit` varchar(10) DEFAULT 'D' COMMENT 'Partition type, support: D-day, H-hour, I-minute',
- `primary_partition` varchar(255) DEFAULT 'dt' COMMENT 'primary partition field',
- `secondary_partition` varchar(256) DEFAULT NULL COMMENT 'secondary partition field',
- `partition_creation_strategy` varchar(50) DEFAULT 'COMPLETED' COMMENT 'Partition creation strategy, support: ARRIVED, COMPLETED',
- `file_format` varchar(15) DEFAULT 'TextFile' COMMENT 'The stored table format, TextFile, RCFile, SequenceFile, Avro',
- `data_encoding` varchar(20) DEFAULT 'UTF-8' COMMENT 'data encoding type',
- `data_separator` varchar(10) DEFAULT NULL COMMENT 'data field separator',
- `storage_period` int(5) DEFAULT '10' COMMENT 'Data storage period, unit: day',
- `opt_log` varchar(5000) DEFAULT NULL COMMENT 'Background operation log',
- `status` int(4) DEFAULT '0' COMMENT 'status',
- `previous_status` int(4) DEFAULT '0' COMMENT 'Previous status',
- `is_deleted` tinyint(1) DEFAULT '0' COMMENT 'Whether to delete, 0: not deleted, 1: deleted',
- `creator` varchar(64) DEFAULT NULL COMMENT 'creator name',
- `modifier` varchar(64) DEFAULT NULL COMMENT 'modifier name',
- `create_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'Create time',
- `modify_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'Modify time',
- `temp_view` json DEFAULT NULL COMMENT 'Temporary view, used to save un-submitted and unapproved intermediate data after modification',
+ `jdbc_url` varchar(255) DEFAULT NULL COMMENT 'Hive JDBC connection URL, such as "jdbc:hive2://127.0.0.1:10000"',
+ `username` varchar(128) DEFAULT NULL COMMENT 'Username',
+ `password` varchar(255) DEFAULT NULL COMMENT 'User password',
+ `db_name` varchar(128) DEFAULT NULL COMMENT 'Target database name',
+ `table_name` varchar(128) DEFAULT NULL COMMENT 'Target data table name',
+ `hdfs_default_fs` varchar(255) DEFAULT NULL COMMENT 'HDFS defaultFS, such as "hdfs://127.0.0.1:9000"',
+ `warehouse_dir` varchar(250) DEFAULT '/user/hive/warehouse' COMMENT 'Hive table storage path on HDFS, such as "/user/hive/warehouse"',
+ `partition_interval` int(5) DEFAULT NULL COMMENT 'Partition interval, support: 1(D / H), 10 I, 30 I',
+ `partition_unit` varchar(10) DEFAULT 'D' COMMENT 'Partition type, support: D-day, H-hour, I-minute',
+ `primary_partition` varchar(255) DEFAULT 'dt' COMMENT 'primary partition field',
+ `secondary_partition` varchar(256) DEFAULT NULL COMMENT 'secondary partition field',
+ `partition_creation_strategy` varchar(50) DEFAULT 'COMPLETED' COMMENT 'Partition creation strategy, support: ARRIVED, COMPLETED',
+ `file_format` varchar(15) DEFAULT 'TextFile' COMMENT 'The stored table format, TextFile, RCFile, SequenceFile, Avro',
+ `data_encoding` varchar(20) DEFAULT 'UTF-8' COMMENT 'data encoding type',
+ `data_separator` varchar(10) DEFAULT NULL COMMENT 'data field separator',
+ `storage_period` int(5) DEFAULT '10' COMMENT 'Data storage period, unit: day',
+ `opt_log` varchar(5000) DEFAULT NULL COMMENT 'Background operation log',
+ `status` int(4) DEFAULT '0' COMMENT 'status',
+ `previous_status` int(4) DEFAULT '0' COMMENT 'Previous status',
+ `is_deleted` tinyint(1) DEFAULT '0' COMMENT 'Whether to delete, 0: not deleted, 1: deleted',
+ `creator` varchar(64) DEFAULT NULL COMMENT 'creator name',
+ `modifier` varchar(64) DEFAULT NULL COMMENT 'modifier name',
+ `create_time` timestamp NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'Create time',
+ `modify_time` timestamp NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'Modify time',
+ `temp_view` json DEFAULT NULL COMMENT 'Temporary view, used to save un-submitted and unapproved intermediate data after modification',
PRIMARY KEY (`id`)
) ENGINE = InnoDB
DEFAULT CHARSET = utf8mb4 COMMENT ='Data is stored in Hive configuration table';
@@ -893,15 +895,15 @@ CREATE TABLE `cluster_set`
`set_name` varchar(128) NOT NULL COMMENT 'ClusterSet name, English, numbers and underscore',
`cn_name` varchar(256) COMMENT 'Chinese display name',
`description` varchar(256) COMMENT 'ClusterSet Introduction',
- `middleware_type` varchar(10) DEFAULT 'TUBE' COMMENT 'The middleware type of message queue, high throughput: TUBE, high consistency: PULSAR',
+ `middleware_type` varchar(10) DEFAULT 'TUBE' COMMENT 'The middleware type of message queue, high throughput: TUBE, high consistency: PULSAR',
`in_charges` varchar(512) COMMENT 'Name of responsible person, separated by commas',
`followers` varchar(512) COMMENT 'List of names of business followers, separated by commas',
- `status` int(4) DEFAULT '21' COMMENT 'ClusterSet status',
- `is_deleted` tinyint(1) DEFAULT '0' COMMENT 'Whether to delete, 0: not deleted, 1: deleted',
+ `status` int(4) DEFAULT '21' COMMENT 'ClusterSet status',
+ `is_deleted` tinyint(1) DEFAULT '0' COMMENT 'Whether to delete, 0: not deleted, 1: deleted',
`creator` varchar(64) NOT NULL COMMENT 'Creator name',
`modifier` varchar(64) NULL COMMENT 'Modifier name',
- `create_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'Create time',
- `modify_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'Modify time',
+ `create_time` timestamp NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'Create time',
+ `modify_time` timestamp NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'Modify time',
PRIMARY KEY (`id`),
UNIQUE KEY `unique_cluster_set` (`set_name`)
) ENGINE = InnoDB
@@ -946,8 +948,8 @@ CREATE TABLE `cache_cluster_ext`
`cluster_name` varchar(128) NOT NULL COMMENT 'CacheCluster name, English, numbers and underscore',
`key_name` varchar(64) NOT NULL COMMENT 'Configuration item name',
`key_value` varchar(256) NULL COMMENT 'The value of the configuration item',
- `is_deleted` tinyint(1) DEFAULT '0' COMMENT 'Whether to delete, 0: not deleted, 1: deleted',
- `modify_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'Modify time',
+ `is_deleted` tinyint(1) DEFAULT '0' COMMENT 'Whether to delete, 0: not deleted, 1: deleted',
+ `modify_time` timestamp NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'Modify time',
PRIMARY KEY (`id`),
KEY `index_cache_cluster` (`cluster_name`)
) ENGINE = InnoDB
@@ -1025,8 +1027,8 @@ CREATE TABLE `flume_source_ext`
`set_name` varchar(128) NOT NULL COMMENT 'ClusterSet name, English, numbers and underscore',
`key_name` varchar(64) NOT NULL COMMENT 'Configuration item name',
`key_value` varchar(256) NULL COMMENT 'The value of the configuration item',
- `is_deleted` tinyint(1) DEFAULT '0' COMMENT 'Whether to delete, 0: not deleted, 1: deleted',
- `modify_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'Modify time',
+ `is_deleted` tinyint(1) DEFAULT '0' COMMENT 'Whether to delete, 0: not deleted, 1: deleted',
+ `modify_time` timestamp NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'Modify time',
PRIMARY KEY (`id`),
KEY `index_flume_source_ext` (`parent_name`)
) ENGINE = InnoDB
@@ -1058,8 +1060,8 @@ CREATE TABLE `flume_channel_ext`
`set_name` varchar(128) NOT NULL COMMENT 'ClusterSet name, English, numbers and underscore',
`key_name` varchar(64) NOT NULL COMMENT 'Configuration item name',
`key_value` varchar(256) NULL COMMENT 'The value of the configuration item',
- `is_deleted` tinyint(1) DEFAULT '0' COMMENT 'Whether to delete, 0: not deleted, 1: deleted',
- `modify_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'Modify time',
+ `is_deleted` tinyint(1) DEFAULT '0' COMMENT 'Whether to delete, 0: not deleted, 1: deleted',
+ `modify_time` timestamp NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'Modify time',
PRIMARY KEY (`id`),
KEY `index_flume_channel_ext` (`parent_name`)
) ENGINE = InnoDB
@@ -1092,8 +1094,8 @@ CREATE TABLE `flume_sink_ext`
`set_name` varchar(128) NOT NULL COMMENT 'ClusterSet name, English, numbers and underscore',
`key_name` varchar(64) NOT NULL COMMENT 'Configuration item name',
`key_value` varchar(256) NULL COMMENT 'The value of the configuration item',
- `is_deleted` tinyint(1) DEFAULT '0' COMMENT 'Whether to delete, 0: not deleted, 1: deleted',
- `modify_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'Modify time',
+ `is_deleted` tinyint(1) DEFAULT '0' COMMENT 'Whether to delete, 0: not deleted, 1: deleted',
+ `modify_time` timestamp NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'Modify time',
PRIMARY KEY (`id`),
KEY `index_flume_sink_ext` (`parent_name`)
) ENGINE = InnoDB