You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2021/11/28 05:16:06 UTC
[incubator-inlong] branch master updated: [INLONG-1847][Feature][InLong-Manager] Add consumption APIs for Pulsar MQ (#1848)
This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 13d07a3 [INLONG-1847][Feature][InLong-Manager] Add consumption APIs for Pulsar MQ (#1848)
13d07a3 is described below
commit 13d07a30771311548ce9ce35e1aa8eb5878009ad
Author: healchow <he...@gmail.com>
AuthorDate: Sun Nov 28 13:15:58 2021 +0800
[INLONG-1847][Feature][InLong-Manager] Add consumption APIs for Pulsar MQ (#1848)
Co-authored-by: healchow <he...@gmail.com>
---
.../docker-compose/sql/apache_inlong_manager.sql | 20 ++
inlong-manager/doc/sql/apache_inlong_manager.sql | 20 ++
inlong-manager/manager-common/pom.xml | 27 +-
.../inlong/manager/common/beans/ClusterBean.java | 3 +
.../common/conversion/ConversionHandle.java | 52 +++
.../ConversionStatusContext.java} | 22 +-
.../ConversionStrategy.java} | 19 +-
.../DaysToMinute.java} | 22 +-
.../DaysToSeconds.java} | 22 +-
.../GBToMB.java} | 23 +-
.../HoursToMinute.java} | 24 +-
.../HoursToSeconds.java} | 23 +-
.../MBToMB.java} | 22 +-
.../SecondsToSeconds.java} | 22 +-
.../TBToMB.java} | 22 +-
.../inlong/manager/common/enums/BizConstant.java | 12 +
.../manager/common/enums/BizErrorCodeEnum.java | 9 +-
.../manager/common/pojo/business/BusinessInfo.java | 7 +
.../common/pojo/business/BusinessTopicVO.java | 4 +-
.../ConsumptionInfo.java | 5 +-
.../ConsumptionListVo.java | 2 +-
.../pojo/consumption/ConsumptionMqExtBase.java | 57 ++++
.../pojo/consumption/ConsumptionPulsarInfo.java | 51 +++
.../ConsumptionQuery.java | 2 +-
.../ConsumptionSummary.java | 2 +-
.../pojo/dataconsumption/ConsumerProgressInfo.java | 101 ------
.../dataconsumption/ConsumerProgressInfoQuery.java | 73 ----
.../PulsarTopicBean.java} | 21 +-
.../dao/entity/ConsumptionPulsarEntity.java} | 25 +-
.../manager/dao/entity/DataStreamEntity.java | 6 +
.../dao/mapper/ConsumptionEntityMapper.java | 5 +-
...per.java => ConsumptionPulsarEntityMapper.java} | 27 +-
.../manager/dao/mapper/DataStreamEntityMapper.java | 5 +
.../resources/mappers/ConsumptionEntityMapper.xml | 14 +-
.../mappers/ConsumptionPulsarEntityMapper.xml | 180 ++++++++++
.../resources/mappers/DataStreamEntityMapper.xml | 8 +
.../manager/service/core/ConsumptionService.java | 34 +-
.../manager/service/core/DataStreamService.java | 21 ++
.../service/core/impl/BusinessServiceImpl.java | 2 +-
.../service/core/impl/ConsumptionServiceImpl.java | 375 ++++++++++++++++-----
.../service/core/impl/DataStreamServiceImpl.java | 42 +++
.../NewConsumptionWorkflowDefinition.java | 2 +-
.../newconsumption/NewConsumptionWorkflowForm.java | 2 +-
.../listener/ConsumptionApproveTaskListener.java | 2 +-
.../ConsumptionCompleteProcessListener.java | 2 +-
.../web/controller/ConsumptionController.java | 22 +-
.../src/main/resources/application-dev.properties | 6 +-
.../src/main/resources/application-prod.properties | 2 +
.../src/main/resources/application-test.properties | 2 +
.../manager/service/core/BusinessServiceTest.java | 11 +-
.../service/core/ConsumptionServiceTest.java | 78 +++++
.../src/test/resources/application-test.properties | 2 +
.../test/resources/sql/apache_inlong_manager.sql | 19 ++
inlong-manager/pom.xml | 11 +
54 files changed, 1079 insertions(+), 515 deletions(-)
diff --git a/docker/docker-compose/sql/apache_inlong_manager.sql b/docker/docker-compose/sql/apache_inlong_manager.sql
index 6df8bdc..dec268e 100644
--- a/docker/docker-compose/sql/apache_inlong_manager.sql
+++ b/docker/docker-compose/sql/apache_inlong_manager.sql
@@ -259,6 +259,26 @@ CREATE TABLE `consumption`
DEFAULT CHARSET = utf8mb4 COMMENT ='Data consumption configuration table';
-- ----------------------------
+-- Table structure for consumption_pulsar
+-- ----------------------------
+DROP TABLE IF EXISTS `consumption_pulsar`;
+CREATE TABLE `consumption_pulsar`
+(
+ `id` int NOT NULL AUTO_INCREMENT,
+ `consumption_id` int DEFAULT NULL COMMENT 'ID of the consumption information to which it belongs, guaranteed to be uniquely associated with consumption information',
+ `consumer_group_id` varchar(255) NOT NULL COMMENT 'Consumer group ID',
+ `consumer_group_name` varchar(255) NOT NULL COMMENT 'Consumer group name',
+ `inlong_group_id` varchar(255) NOT NULL COMMENT 'Business group ID',
+ `is_rlq` tinyint(1) DEFAULT '0' COMMENT 'Whether to configure the retry letter topic, 0: no configuration, 1: configuration',
+ `retry_letter_topic` varchar(255) DEFAULT NULL COMMENT 'The name of the retry queue topic',
+ `is_dlq` tinyint(1) DEFAULT '0' COMMENT 'Whether to configure dead letter topic, 0: no configuration, 1: means configuration',
+ `dead_letter_topic` varchar(255) DEFAULT NULL COMMENT 'dead letter topic name',
+ `is_deleted` int DEFAULT '0' COMMENT 'Whether to delete',
+ PRIMARY KEY (`id`)
+) ENGINE = InnoDB
+ DEFAULT CHARSET = utf8mb4 COMMENT ='Pulsar consumption table';
+
+-- ----------------------------
-- Table structure for data_proxy_cluster
-- ----------------------------
DROP TABLE IF EXISTS `data_proxy_cluster`;
diff --git a/inlong-manager/doc/sql/apache_inlong_manager.sql b/inlong-manager/doc/sql/apache_inlong_manager.sql
index 6df8bdc..dec268e 100644
--- a/inlong-manager/doc/sql/apache_inlong_manager.sql
+++ b/inlong-manager/doc/sql/apache_inlong_manager.sql
@@ -259,6 +259,26 @@ CREATE TABLE `consumption`
DEFAULT CHARSET = utf8mb4 COMMENT ='Data consumption configuration table';
-- ----------------------------
+-- Table structure for consumption_pulsar
+-- ----------------------------
+DROP TABLE IF EXISTS `consumption_pulsar`;
+CREATE TABLE `consumption_pulsar`
+(
+ `id` int NOT NULL AUTO_INCREMENT,
+ `consumption_id` int DEFAULT NULL COMMENT 'ID of the consumption information to which it belongs, guaranteed to be uniquely associated with consumption information',
+ `consumer_group_id` varchar(255) NOT NULL COMMENT 'Consumer group ID',
+ `consumer_group_name` varchar(255) NOT NULL COMMENT 'Consumer group name',
+ `inlong_group_id` varchar(255) NOT NULL COMMENT 'Business group ID',
+ `is_rlq` tinyint(1) DEFAULT '0' COMMENT 'Whether to configure the retry letter topic, 0: no configuration, 1: configuration',
+ `retry_letter_topic` varchar(255) DEFAULT NULL COMMENT 'The name of the retry queue topic',
+ `is_dlq` tinyint(1) DEFAULT '0' COMMENT 'Whether to configure dead letter topic, 0: no configuration, 1: means configuration',
+ `dead_letter_topic` varchar(255) DEFAULT NULL COMMENT 'dead letter topic name',
+ `is_deleted` int DEFAULT '0' COMMENT 'Whether to delete',
+ PRIMARY KEY (`id`)
+) ENGINE = InnoDB
+ DEFAULT CHARSET = utf8mb4 COMMENT ='Pulsar consumption table';
+
+-- ----------------------------
-- Table structure for data_proxy_cluster
-- ----------------------------
DROP TABLE IF EXISTS `data_proxy_cluster`;
diff --git a/inlong-manager/manager-common/pom.xml b/inlong-manager/manager-common/pom.xml
index dd54e5c..3692efe 100644
--- a/inlong-manager/manager-common/pom.xml
+++ b/inlong-manager/manager-common/pom.xml
@@ -32,6 +32,16 @@
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
+ <artifactId>spring-boot-starter-web</artifactId>
+ <exclusions>
+ <exclusion>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-log4j12</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-validation</artifactId>
</dependency>
<dependency>
@@ -70,7 +80,6 @@
<groupId>io.swagger</groupId>
<artifactId>swagger-annotations</artifactId>
</dependency>
-
<dependency>
<groupId>com.github.xiaoymin</groupId>
<artifactId>knife4j-spring-boot-starter</artifactId>
@@ -118,25 +127,18 @@
</exclusions>
</dependency>
<dependency>
- <groupId>org.springframework</groupId>
- <artifactId>spring-web</artifactId>
- </dependency>
- <dependency>
<groupId>com.github.pagehelper</groupId>
<artifactId>pagehelper-spring-boot-starter</artifactId>
</dependency>
-
<dependency>
<groupId>javax.servlet</groupId>
<artifactId>servlet-api</artifactId>
<scope>provided</scope>
</dependency>
-
<dependency>
<groupId>com.googlecode.json-simple</groupId>
<artifactId>json-simple</artifactId>
</dependency>
-
<dependency>
<groupId>com.google.code.gson</groupId>
<artifactId>gson</artifactId>
@@ -154,6 +156,15 @@
</dependency>
<dependency>
+ <groupId>org.apache.pulsar</groupId>
+ <artifactId>pulsar-client</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.pulsar</groupId>
+ <artifactId>pulsar-client-admin</artifactId>
+ </dependency>
+
+ <dependency>
<groupId>org.apache.inlong</groupId>
<artifactId>inlong-common</artifactId>
<version>${project.version}</version>
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/beans/ClusterBean.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/beans/ClusterBean.java
index 78185bd..bf47633 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/beans/ClusterBean.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/beans/ClusterBean.java
@@ -49,4 +49,7 @@ public class ClusterBean {
@Value("${pulsar.serviceUrl}")
private String pulsarServiceUrl;
+ @Value("${pulsar.defaultTenant}")
+ private String defaultTenant;
+
}
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/conversion/ConversionHandle.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/conversion/ConversionHandle.java
new file mode 100644
index 0000000..c34e5f1
--- /dev/null
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/conversion/ConversionHandle.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.common.conversion;
+
+import java.util.HashMap;
+import java.util.Map;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.stereotype.Component;
+
+@Slf4j
+@Component
+public class ConversionHandle {
+
+ private static Map<String, ConversionStrategy> unitMap;
+
+ private void loadStrategy() {
+ unitMap = new HashMap<>(16);
+ unitMap.put("hours_minutes", new HoursToMinute());
+ unitMap.put("hours_seconds", new HoursToSeconds());
+ unitMap.put("gb_mb", new GBToMB());
+ unitMap.put("tb_mb", new TBToMB());
+ unitMap.put("days_seconds", new DaysToSeconds());
+ unitMap.put("days_minutes", new DaysToMinute());
+ unitMap.put("seconds_seconds", new SecondsToSeconds());
+ unitMap.put("mb_mb", new MBToMB());
+ }
+
+ public Integer handleConversion(Integer value, String type) {
+ if (unitMap == null) {
+ this.loadStrategy();
+ }
+ ConversionStrategy conversionStrategy = unitMap.get(type);
+ ConversionStatusContext conversionStatusContext = new ConversionStatusContext(conversionStrategy);
+ return conversionStatusContext.executeConversion(value);
+ }
+
+}
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/dataconsumption/ConsumptionUpdateInfo.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/conversion/ConversionStatusContext.java
similarity index 66%
copy from inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/dataconsumption/ConsumptionUpdateInfo.java
copy to inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/conversion/ConversionStatusContext.java
index 613973b..f502c5e 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/dataconsumption/ConsumptionUpdateInfo.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/conversion/ConversionStatusContext.java
@@ -15,22 +15,18 @@
* limitations under the License.
*/
-package org.apache.inlong.manager.common.pojo.dataconsumption;
+package org.apache.inlong.manager.common.conversion;
-import io.swagger.annotations.ApiModel;
-import io.swagger.annotations.ApiModelProperty;
-import lombok.Data;
+public class ConversionStatusContext {
-/**
- * Data consumption update info
- */
-@Data
-@ApiModel("Data consumption update info")
-public class ConsumptionUpdateInfo {
+ private final ConversionStrategy conversionStrategy;
- private Integer id;
+ public ConversionStatusContext(ConversionStrategy conversionStrategy) {
+ this.conversionStrategy = conversionStrategy;
+ }
- @ApiModelProperty(value = "consumption in charges")
- private String inCharges;
+ public Integer executeConversion(Integer value) {
+ return conversionStrategy.unitConversion(value);
+ }
}
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/dataconsumption/ConsumptionUpdateInfo.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/conversion/ConversionStrategy.java
similarity index 66%
copy from inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/dataconsumption/ConsumptionUpdateInfo.java
copy to inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/conversion/ConversionStrategy.java
index 613973b..fdd263d 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/dataconsumption/ConsumptionUpdateInfo.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/conversion/ConversionStrategy.java
@@ -15,22 +15,9 @@
* limitations under the License.
*/
-package org.apache.inlong.manager.common.pojo.dataconsumption;
+package org.apache.inlong.manager.common.conversion;
-import io.swagger.annotations.ApiModel;
-import io.swagger.annotations.ApiModelProperty;
-import lombok.Data;
-
-/**
- * Data consumption update info
- */
-@Data
-@ApiModel("Data consumption update info")
-public class ConsumptionUpdateInfo {
-
- private Integer id;
-
- @ApiModelProperty(value = "consumption in charges")
- private String inCharges;
+public interface ConversionStrategy {
+ Integer unitConversion(Integer value);
}
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/dataconsumption/ConsumptionUpdateInfo.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/conversion/DaysToMinute.java
similarity index 66%
copy from inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/dataconsumption/ConsumptionUpdateInfo.java
copy to inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/conversion/DaysToMinute.java
index 613973b..658bb1c 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/dataconsumption/ConsumptionUpdateInfo.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/conversion/DaysToMinute.java
@@ -15,22 +15,12 @@
* limitations under the License.
*/
-package org.apache.inlong.manager.common.pojo.dataconsumption;
+package org.apache.inlong.manager.common.conversion;
-import io.swagger.annotations.ApiModel;
-import io.swagger.annotations.ApiModelProperty;
-import lombok.Data;
-
-/**
- * Data consumption update info
- */
-@Data
-@ApiModel("Data consumption update info")
-public class ConsumptionUpdateInfo {
-
- private Integer id;
-
- @ApiModelProperty(value = "consumption in charges")
- private String inCharges;
+public class DaysToMinute implements ConversionStrategy {
+ @Override
+ public Integer unitConversion(Integer value) {
+ return value * 24 * 60;
+ }
}
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/dataconsumption/ConsumptionUpdateInfo.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/conversion/DaysToSeconds.java
similarity index 66%
copy from inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/dataconsumption/ConsumptionUpdateInfo.java
copy to inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/conversion/DaysToSeconds.java
index 613973b..a234474 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/dataconsumption/ConsumptionUpdateInfo.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/conversion/DaysToSeconds.java
@@ -15,22 +15,12 @@
* limitations under the License.
*/
-package org.apache.inlong.manager.common.pojo.dataconsumption;
+package org.apache.inlong.manager.common.conversion;
-import io.swagger.annotations.ApiModel;
-import io.swagger.annotations.ApiModelProperty;
-import lombok.Data;
-
-/**
- * Data consumption update info
- */
-@Data
-@ApiModel("Data consumption update info")
-public class ConsumptionUpdateInfo {
-
- private Integer id;
-
- @ApiModelProperty(value = "consumption in charges")
- private String inCharges;
+public class DaysToSeconds implements ConversionStrategy {
+ @Override
+ public Integer unitConversion(Integer value) {
+ return value * 24 * 60 * 60;
+ }
}
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/dataconsumption/ConsumptionUpdateInfo.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/conversion/GBToMB.java
similarity index 66%
copy from inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/dataconsumption/ConsumptionUpdateInfo.java
copy to inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/conversion/GBToMB.java
index 613973b..c513e54 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/dataconsumption/ConsumptionUpdateInfo.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/conversion/GBToMB.java
@@ -15,22 +15,15 @@
* limitations under the License.
*/
-package org.apache.inlong.manager.common.pojo.dataconsumption;
+package org.apache.inlong.manager.common.conversion;
-import io.swagger.annotations.ApiModel;
-import io.swagger.annotations.ApiModelProperty;
-import lombok.Data;
+import org.springframework.stereotype.Component;
-/**
- * Data consumption update info
- */
-@Data
-@ApiModel("Data consumption update info")
-public class ConsumptionUpdateInfo {
-
- private Integer id;
-
- @ApiModelProperty(value = "consumption in charges")
- private String inCharges;
+@Component
+public class GBToMB implements ConversionStrategy {
+ @Override
+ public Integer unitConversion(Integer value) {
+ return value << 10;
+ }
}
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/dataconsumption/ConsumptionUpdateInfo.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/conversion/HoursToMinute.java
similarity index 66%
copy from inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/dataconsumption/ConsumptionUpdateInfo.java
copy to inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/conversion/HoursToMinute.java
index 613973b..4d76e4e 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/dataconsumption/ConsumptionUpdateInfo.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/conversion/HoursToMinute.java
@@ -15,22 +15,16 @@
* limitations under the License.
*/
-package org.apache.inlong.manager.common.pojo.dataconsumption;
+package org.apache.inlong.manager.common.conversion;
-import io.swagger.annotations.ApiModel;
-import io.swagger.annotations.ApiModelProperty;
-import lombok.Data;
+import org.springframework.stereotype.Component;
-/**
- * Data consumption update info
- */
-@Data
-@ApiModel("Data consumption update info")
-public class ConsumptionUpdateInfo {
-
- private Integer id;
-
- @ApiModelProperty(value = "consumption in charges")
- private String inCharges;
+@Component
+public class HoursToMinute implements ConversionStrategy {
+ @Override
+ public Integer unitConversion(Integer value) {
+ return value * 60;
+ }
}
+
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/dataconsumption/ConsumptionUpdateInfo.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/conversion/HoursToSeconds.java
similarity index 66%
copy from inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/dataconsumption/ConsumptionUpdateInfo.java
copy to inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/conversion/HoursToSeconds.java
index 613973b..781d069 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/dataconsumption/ConsumptionUpdateInfo.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/conversion/HoursToSeconds.java
@@ -15,22 +15,15 @@
* limitations under the License.
*/
-package org.apache.inlong.manager.common.pojo.dataconsumption;
+package org.apache.inlong.manager.common.conversion;
-import io.swagger.annotations.ApiModel;
-import io.swagger.annotations.ApiModelProperty;
-import lombok.Data;
+import org.springframework.stereotype.Component;
-/**
- * Data consumption update info
- */
-@Data
-@ApiModel("Data consumption update info")
-public class ConsumptionUpdateInfo {
-
- private Integer id;
-
- @ApiModelProperty(value = "consumption in charges")
- private String inCharges;
+@Component
+public class HoursToSeconds implements ConversionStrategy {
+ @Override
+ public Integer unitConversion(Integer value) {
+ return value * 60 * 60;
+ }
}
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/dataconsumption/ConsumptionUpdateInfo.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/conversion/MBToMB.java
similarity index 66%
copy from inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/dataconsumption/ConsumptionUpdateInfo.java
copy to inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/conversion/MBToMB.java
index 613973b..4a5c3f9 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/dataconsumption/ConsumptionUpdateInfo.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/conversion/MBToMB.java
@@ -15,22 +15,12 @@
* limitations under the License.
*/
-package org.apache.inlong.manager.common.pojo.dataconsumption;
+package org.apache.inlong.manager.common.conversion;
-import io.swagger.annotations.ApiModel;
-import io.swagger.annotations.ApiModelProperty;
-import lombok.Data;
-
-/**
- * Data consumption update info
- */
-@Data
-@ApiModel("Data consumption update info")
-public class ConsumptionUpdateInfo {
-
- private Integer id;
-
- @ApiModelProperty(value = "consumption in charges")
- private String inCharges;
+public class MBToMB implements ConversionStrategy {
+ @Override
+ public Integer unitConversion(Integer value) {
+ return value;
+ }
}
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/dataconsumption/ConsumptionUpdateInfo.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/conversion/SecondsToSeconds.java
similarity index 66%
copy from inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/dataconsumption/ConsumptionUpdateInfo.java
copy to inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/conversion/SecondsToSeconds.java
index 613973b..75c95da 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/dataconsumption/ConsumptionUpdateInfo.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/conversion/SecondsToSeconds.java
@@ -15,22 +15,12 @@
* limitations under the License.
*/
-package org.apache.inlong.manager.common.pojo.dataconsumption;
+package org.apache.inlong.manager.common.conversion;
-import io.swagger.annotations.ApiModel;
-import io.swagger.annotations.ApiModelProperty;
-import lombok.Data;
-
-/**
- * Data consumption update info
- */
-@Data
-@ApiModel("Data consumption update info")
-public class ConsumptionUpdateInfo {
-
- private Integer id;
-
- @ApiModelProperty(value = "consumption in charges")
- private String inCharges;
+public class SecondsToSeconds implements ConversionStrategy {
+ @Override
+ public Integer unitConversion(Integer value) {
+ return value;
+ }
}
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/dataconsumption/ConsumptionUpdateInfo.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/conversion/TBToMB.java
similarity index 66%
copy from inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/dataconsumption/ConsumptionUpdateInfo.java
copy to inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/conversion/TBToMB.java
index 613973b..c7f527e 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/dataconsumption/ConsumptionUpdateInfo.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/conversion/TBToMB.java
@@ -15,22 +15,12 @@
* limitations under the License.
*/
-package org.apache.inlong.manager.common.pojo.dataconsumption;
+package org.apache.inlong.manager.common.conversion;
-import io.swagger.annotations.ApiModel;
-import io.swagger.annotations.ApiModelProperty;
-import lombok.Data;
-
-/**
- * Data consumption update info
- */
-@Data
-@ApiModel("Data consumption update info")
-public class ConsumptionUpdateInfo {
-
- private Integer id;
-
- @ApiModelProperty(value = "consumption in charges")
- private String inCharges;
+public class TBToMB implements ConversionStrategy {
+ @Override
+ public Integer unitConversion(Integer value) {
+ return value << 20;
+ }
}
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/BizConstant.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/BizConstant.java
index a5589b0..c2c322d 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/BizConstant.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/BizConstant.java
@@ -40,8 +40,20 @@ public class BizConstant {
public static final String SCHEMA_M0_DAY = "m0_day";
+ public static final String CLUSTER_HIVE_TOPO = "HIVE_TOPO";
+
public static final String GROUP_ID_IS_EMPTY = "business group id is empty";
public static final String STREAM_ID_IS_EMPTY = "data stream id is empty";
+ public static final String PULSAR_TOPIC_TYPE_SERIAL = "SERIAL";
+
+ public static final String PULSAR_TOPIC_TYPE_PARALLEL = "PARALLEL";
+
+ public static final String SYSTEM_USER = "SYSTEM"; // system user
+
+ public static final String PREFIX_DLQ = "dlq"; // prefix of the Topic of the dead letter queue
+
+ public static final String PREFIX_RLQ = "rlq"; // prefix of the Topic of the retry letter queue
+
}
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/BizErrorCodeEnum.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/BizErrorCodeEnum.java
index 32624a4..b7a2290 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/BizErrorCodeEnum.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/BizErrorCodeEnum.java
@@ -80,7 +80,14 @@ public enum BizErrorCodeEnum {
WORKFLOW_EXE_FAILED(4000, "Workflow execution exception"),
CONSUMER_GROUP_NAME_DUPLICATED(2600, "The consumer group already exists in the cluster"),
- CONSUMER_GROUP_CREATE_FAILED(5001, "Failed to create tube consumer group"),
+ CONSUMER_GROUP_CREATE_FAILED(2601, "Failed to create tube consumer group"),
+ TUBE_GROUP_CREATE_FAILED(2602, "Create Tube consumer group failed"),
+ PULSAR_GROUP_CREATE_FAILED(2603, "Create Pulsar consumer group failed"),
+ TUBE_TOPIC_CREATE_FAILED(2604, "CreateTube Topic failed"),
+ PULSAR_TOPIC_CREATE_FAILED(2605, "Create Pulsar Topic failed"),
+ PULSAR_DLQ_RLQ_ERROR(2606, "Wrong config for the RLQ and DLQ: RLQ was enabled, but the DLQ was disabled"),
+ PULSAR_DLQ_DUPLICATED(2607, "DLQ topic already exists under the business"),
+ PULSAR_RLQ_DUPLICATED(2608, "RLQ topic already exists under the business"),
COMMON_FILE_DOWNLOAD_FAIL(6001, "File download failed"),
COMMON_FILE_UPLOAD_FAIL(6002, "File upload failed"),
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/business/BusinessInfo.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/business/BusinessInfo.java
index db87278..eb2906b 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/business/BusinessInfo.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/business/BusinessInfo.java
@@ -67,6 +67,13 @@ public class BusinessInfo {
@ApiModelProperty(value = "Pulsar service URL")
private String pulsarServiceUrl;
+ @ApiModelProperty(value = "Queue model of Pulsar, parallel: multiple partitions, high throughput, out-of-order "
+ + "messages; serial: single partition, low throughput, and orderly messages")
+ private String queueModule = "parallel";
+
+ @ApiModelProperty(value = "The number of partitions of Pulsar Topic, 1-20")
+ private Integer topicPartitionNum = 3;
+
@ApiModelProperty(value = "Data type name")
private String schemaName;
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/business/BusinessTopicVO.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/business/BusinessTopicVO.java
index 94553a9..6ed1d26 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/business/BusinessTopicVO.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/business/BusinessTopicVO.java
@@ -36,8 +36,8 @@ public class BusinessTopicVO {
@ApiModelProperty(value = "Middleware type, high throughput: TUBE, high consistency: PULSAR")
private String middlewareType;
- @ApiModelProperty(value = "Tube topic name")
- private String topicName;
+ @ApiModelProperty(value = "Tube topic name, or Pulsar namespace name")
+ private String mqResourceObj;
@ApiModelProperty(value = "Topic list, Tube corresponds to business group, there is only 1 topic, "
+ "Pulsar corresponds to data stream, there are multiple topics")
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/dataconsumption/ConsumptionInfo.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/consumption/ConsumptionInfo.java
similarity index 95%
rename from inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/dataconsumption/ConsumptionInfo.java
rename to inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/consumption/ConsumptionInfo.java
index 2834050..cebdcfb 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/dataconsumption/ConsumptionInfo.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/consumption/ConsumptionInfo.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.inlong.manager.common.pojo.dataconsumption;
+package org.apache.inlong.manager.common.pojo.consumption;
import com.fasterxml.jackson.annotation.JsonIgnore;
import io.swagger.annotations.ApiModel;
@@ -87,6 +87,9 @@ public class ConsumptionInfo {
private Date modifyTime;
+ @ApiModelProperty(value = "Extended information for MQ")
+ private ConsumptionMqExtBase mqExtInfo;
+
@JsonIgnore
@AssertTrue(message = "when filter enabled, data stream id cannot be null")
public boolean isValidateFilter() {
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/dataconsumption/ConsumptionListVo.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/consumption/ConsumptionListVo.java
similarity index 97%
rename from inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/dataconsumption/ConsumptionListVo.java
rename to inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/consumption/ConsumptionListVo.java
index 9ac259c..02854a7 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/dataconsumption/ConsumptionListVo.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/consumption/ConsumptionListVo.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.inlong.manager.common.pojo.dataconsumption;
+package org.apache.inlong.manager.common.pojo.consumption;
import io.swagger.annotations.ApiModel;
import io.swagger.annotations.ApiModelProperty;
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/consumption/ConsumptionMqExtBase.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/consumption/ConsumptionMqExtBase.java
new file mode 100644
index 0000000..287bcd6
--- /dev/null
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/consumption/ConsumptionMqExtBase.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.common.pojo.consumption;
+
+import com.fasterxml.jackson.annotation.JsonSubTypes;
+import com.fasterxml.jackson.annotation.JsonTypeInfo;
+import com.fasterxml.jackson.annotation.JsonTypeInfo.Id;
+import io.swagger.annotations.ApiModel;
+import io.swagger.annotations.ApiModelProperty;
+import lombok.Data;
+import org.apache.inlong.manager.common.enums.BizConstant;
+
+/**
+ * Extended consumption information of different MQs
+ */
+@Data
+@ApiModel("Extended consumption information of different MQs")
+@JsonTypeInfo(use = Id.NAME, visible = true, property = "middlewareType", defaultImpl = ConsumptionMqExtBase.class)
+@JsonSubTypes({
+ @JsonSubTypes.Type(value = ConsumptionPulsarInfo.class, name = BizConstant.MIDDLEWARE_PULSAR)
+})
+public class ConsumptionMqExtBase {
+
+ @ApiModelProperty(value = "Self-incrementing primary key")
+ private Integer id;
+
+ @ApiModelProperty(value = "Consumer information ID")
+ private Integer consumptionId;
+
+ @ApiModelProperty(value = "Consumer group")
+ private String consumerGroup;
+
+ @ApiModelProperty(value = "Consumption target inlong group id")
+ private String inlongGroupId;
+
+ @ApiModelProperty("Whether to delete, 0: not deleted, 1: deleted")
+ private Integer isDeleted = 0;
+
+ @ApiModelProperty(value = "Data storage middleware type, high throughput: TUBE, high consistency: PULSAR")
+ private String middlewareType;
+
+}
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/consumption/ConsumptionPulsarInfo.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/consumption/ConsumptionPulsarInfo.java
new file mode 100644
index 0000000..350fef4
--- /dev/null
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/consumption/ConsumptionPulsarInfo.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.common.pojo.consumption;
+
+import io.swagger.annotations.ApiModel;
+import io.swagger.annotations.ApiModelProperty;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.ToString;
+import org.apache.inlong.manager.common.enums.BizConstant;
+
+/**
+ * Pulsar consumer information
+ */
+@Data
+@EqualsAndHashCode(callSuper = true)
+@ToString(callSuper = true)
+@ApiModel("Pulsar consumer information")
+public class ConsumptionPulsarInfo extends ConsumptionMqExtBase {
+
+ @ApiModelProperty("The middleware type of MQ")
+ private String middlewareType = BizConstant.MIDDLEWARE_PULSAR;
+
+ @ApiModelProperty("Whether to configure the dead letter queue, 0: do not configure, 1: configure")
+ private Integer isDlq;
+
+ @ApiModelProperty("The name of the dead letter queue Topic")
+ private String deadLetterTopic;
+
+ @ApiModelProperty("Whether to configure the retry letter queue, 0: do not configure, 1: configure")
+ private Integer isRlq;
+
+ @ApiModelProperty("The name of the retry letter queue topic")
+ private String retryLetterTopic;
+
+}
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/dataconsumption/ConsumptionQuery.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/consumption/ConsumptionQuery.java
similarity index 97%
rename from inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/dataconsumption/ConsumptionQuery.java
rename to inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/consumption/ConsumptionQuery.java
index b786125..e3670c3 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/dataconsumption/ConsumptionQuery.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/consumption/ConsumptionQuery.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.inlong.manager.common.pojo.dataconsumption;
+package org.apache.inlong.manager.common.pojo.consumption;
import io.swagger.annotations.ApiModel;
import io.swagger.annotations.ApiModelProperty;
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/dataconsumption/ConsumptionSummary.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/consumption/ConsumptionSummary.java
similarity index 96%
rename from inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/dataconsumption/ConsumptionSummary.java
rename to inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/consumption/ConsumptionSummary.java
index 9be2820..41f4038 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/dataconsumption/ConsumptionSummary.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/consumption/ConsumptionSummary.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.inlong.manager.common.pojo.dataconsumption;
+package org.apache.inlong.manager.common.pojo.consumption;
import io.swagger.annotations.ApiModel;
import io.swagger.annotations.ApiModelProperty;
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/dataconsumption/ConsumerProgressInfo.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/dataconsumption/ConsumerProgressInfo.java
deleted file mode 100644
index 9aaf3b6..0000000
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/dataconsumption/ConsumerProgressInfo.java
+++ /dev/null
@@ -1,101 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.manager.common.pojo.dataconsumption;
-
-import io.swagger.annotations.ApiModel;
-import io.swagger.annotations.ApiModelProperty;
-import java.util.Date;
-import lombok.AllArgsConstructor;
-import lombok.Builder;
-import lombok.Data;
-import lombok.NoArgsConstructor;
-
-/**
- * Data consumption progress
- */
-@Data
-@Builder
-@NoArgsConstructor
-@AllArgsConstructor
-@ApiModel("Data consumption progress")
-public class ConsumerProgressInfo {
-
- @ApiModelProperty("Report time")
- private Date repTime;
-
- @ApiModelProperty("Report date")
- private Long repDate;
-
- @ApiModelProperty("Consumer group")
- private String consumerGroup;
-
- @ApiModelProperty("Consumption TOPIC")
- private String topic;
-
- @ApiModelProperty("Broker IP")
- private String brokerIp;
-
- @ApiModelProperty("Broker Id")
- private Long brokerId;
-
- @ApiModelProperty("Partition ID")
- private Long part;
-
- @ApiModelProperty("Consumption ID")
- private String consumerId;
-
- @ApiModelProperty("Consumer IP")
- private String consumerIp;
-
- @ApiModelProperty("Consumer process ID")
- private String consumerPid;
-
- @ApiModelProperty("Is the heartbeat normal")
- private String isHeartbeatOk;
-
- @ApiModelProperty("heartbeat time")
- private String hartbeat;
-
- @ApiModelProperty("Is the storage normal")
- private String isStoreOk;
-
- @ApiModelProperty("Consumption difference")
- private Double fileConSize;
-
- @ApiModelProperty("Production speed")
- private Double fileSpeed;
-
- @ApiModelProperty("Consumption speed")
- private Double consumeSpeed;
-
- @ApiModelProperty("Consumption difference speed")
- private Double fileConSpeed;
-
- @ApiModelProperty("Status: 0 normal; 1 abnormal; 2 shielded; 3 not yet available")
- private String state;
-
- @ApiModelProperty("Middleware type")
- private String middleware;
-
- private Double tmpOffset;
- private Double minOffset;
- private Double fileOffset;
- private Double consumerOffset;
- private Double saveFileSize;
- private Integer intervalSec;
-}
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/dataconsumption/ConsumerProgressInfoQuery.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/dataconsumption/ConsumerProgressInfoQuery.java
deleted file mode 100644
index 5707233..0000000
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/dataconsumption/ConsumerProgressInfoQuery.java
+++ /dev/null
@@ -1,73 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.manager.common.pojo.dataconsumption;
-
-import io.swagger.annotations.ApiModel;
-import io.swagger.annotations.ApiModelProperty;
-import java.util.Date;
-import javax.validation.constraints.NotBlank;
-import javax.validation.constraints.NotNull;
-import lombok.Data;
-import lombok.EqualsAndHashCode;
-import org.apache.inlong.manager.common.beans.PageRequest;
-import org.springframework.format.annotation.DateTimeFormat;
-
-/**
- * Consumption progress query
- */
-@EqualsAndHashCode(callSuper = false)
-@Data
-@ApiModel("Consumption progress query")
-public class ConsumerProgressInfoQuery extends PageRequest {
-
- @ApiModelProperty(hidden = true, value = "Consumption ID")
- private Integer consumerId;
-
- @ApiModelProperty(value = "Consumer Group ID", required = true)
- @NotBlank(message = "consumerGroupId can't be null")
- private String consumerGroupId;
-
- @ApiModelProperty(value = "Reporting time 5 minutes interval: yyyy-MM-dd HH:mm", required = true)
- @DateTimeFormat(pattern = "yyyy-MM-dd HH:mm")
- @NotNull(message = "repTime can't be null")
- private Date repTime;
-
- @ApiModelProperty(value = "Report date: yyyyMMdd")
- private Long repDate;
-
- @ApiModelProperty(value = "Consumption TOPIC")
- private String topic;
-
- @ApiModelProperty(value = "Broker Ip")
- private String brokerIp;
-
- @ApiModelProperty(value = "Consumer Group IP")
- private String consumerIp;
-
- @ApiModelProperty(value = "Is the heartbeat normal")
- private String isHeartbeatOk;
-
- @ApiModelProperty(value = "Is the storage normal")
- private String isStoreOk;
-
- @ApiModelProperty(value = "state")
- private String state;
-
- @ApiModelProperty(value = "Middleware Type")
- private String middleware;
-}
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/dataconsumption/ConsumptionSaveResponse.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/pulsar/PulsarTopicBean.java
similarity index 69%
rename from inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/dataconsumption/ConsumptionSaveResponse.java
rename to inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/pulsar/PulsarTopicBean.java
index 90e7811..15e8407 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/dataconsumption/ConsumptionSaveResponse.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/pulsar/PulsarTopicBean.java
@@ -15,26 +15,27 @@
* limitations under the License.
*/
-package org.apache.inlong.manager.common.pojo.dataconsumption;
+package org.apache.inlong.manager.common.pojo.pulsar;
-import io.swagger.annotations.ApiModel;
-import io.swagger.annotations.ApiModelProperty;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
-/**
- * Response after data consumption information is successfully saved
- */
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
-@ApiModel("Response after data consumption information is successfully saved")
-public class ConsumptionSaveResponse {
+public class PulsarTopicBean {
+
+ private String tenant;
+
+ private String namespace;
+
+ private String topicName;
+
+ private String queueModule;
- @ApiModelProperty(value = "Data consumption ID")
- private Integer id;
+ private Integer numPartitions = 0;
}
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/dataconsumption/ConsumptionUpdateInfo.java b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/ConsumptionPulsarEntity.java
similarity index 65%
rename from inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/dataconsumption/ConsumptionUpdateInfo.java
rename to inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/ConsumptionPulsarEntity.java
index 613973b..675126b 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/dataconsumption/ConsumptionUpdateInfo.java
+++ b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/ConsumptionPulsarEntity.java
@@ -15,22 +15,23 @@
* limitations under the License.
*/
-package org.apache.inlong.manager.common.pojo.dataconsumption;
+package org.apache.inlong.manager.dao.entity;
-import io.swagger.annotations.ApiModel;
-import io.swagger.annotations.ApiModelProperty;
+import java.io.Serializable;
import lombok.Data;
-/**
- * Data consumption update info
- */
@Data
-@ApiModel("Data consumption update info")
-public class ConsumptionUpdateInfo {
+public class ConsumptionPulsarEntity implements Serializable {
+ private static final long serialVersionUID = 1L;
private Integer id;
+ private Integer consumptionId;
+ private String consumerGroupId;
+ private String inlongGroupId;
+ private Integer isDlq;
+ private String deadLetterTopic;
+ private Integer isRlq;
+ private String retryLetterTopic;
+ private Integer isDeleted;
- @ApiModelProperty(value = "consumption in charges")
- private String inCharges;
-
-}
+}
\ No newline at end of file
diff --git a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/DataStreamEntity.java b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/DataStreamEntity.java
index 5b9d90b..15cd115 100644
--- a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/DataStreamEntity.java
+++ b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/DataStreamEntity.java
@@ -38,6 +38,12 @@ public class DataStreamEntity implements Serializable {
private String fileDelimiter;
private Integer havePredefinedFields;
private String inCharges;
+
+ private Integer dailyRecords;
+ private Integer dailyStorage;
+ private Integer peakRecords;
+ private Integer maxLength;
+
private Integer status;
private Integer previousStatus;
private Integer isDeleted;
diff --git a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/ConsumptionEntityMapper.java b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/ConsumptionEntityMapper.java
index 634548b..95990fa 100644
--- a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/ConsumptionEntityMapper.java
+++ b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/ConsumptionEntityMapper.java
@@ -18,7 +18,7 @@
package org.apache.inlong.manager.dao.mapper;
import java.util.List;
-import org.apache.inlong.manager.common.pojo.dataconsumption.ConsumptionQuery;
+import org.apache.inlong.manager.common.pojo.consumption.ConsumptionQuery;
import org.apache.inlong.manager.dao.entity.ConsumptionEntity;
import org.apache.inlong.manager.workflow.model.view.CountByKey;
import org.springframework.stereotype.Repository;
@@ -34,6 +34,8 @@ public interface ConsumptionEntityMapper {
ConsumptionEntity selectByPrimaryKey(Integer id);
+ ConsumptionEntity selectConsumptionExists(String groupId, String topic, String consumerGroup);
+
int updateByPrimaryKeySelective(ConsumptionEntity record);
int updateByPrimaryKey(ConsumptionEntity record);
@@ -41,4 +43,5 @@ public interface ConsumptionEntityMapper {
List<ConsumptionEntity> listByQuery(ConsumptionQuery consumptionQuery);
List<CountByKey> countByStatus(ConsumptionQuery consumptionQuery);
+
}
diff --git a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/ConsumptionEntityMapper.java b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/ConsumptionPulsarEntityMapper.java
similarity index 55%
copy from inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/ConsumptionEntityMapper.java
copy to inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/ConsumptionPulsarEntityMapper.java
index 634548b..99f65d2 100644
--- a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/ConsumptionEntityMapper.java
+++ b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/ConsumptionPulsarEntityMapper.java
@@ -17,28 +17,27 @@
package org.apache.inlong.manager.dao.mapper;
-import java.util.List;
-import org.apache.inlong.manager.common.pojo.dataconsumption.ConsumptionQuery;
-import org.apache.inlong.manager.dao.entity.ConsumptionEntity;
-import org.apache.inlong.manager.workflow.model.view.CountByKey;
+import org.apache.ibatis.annotations.Param;
+import org.apache.inlong.manager.dao.entity.ConsumptionPulsarEntity;
import org.springframework.stereotype.Repository;
@Repository
-public interface ConsumptionEntityMapper {
+public interface ConsumptionPulsarEntityMapper {
- int deleteByPrimaryKey(Integer id);
+ int deleteByConsumptionId(@Param("consumptionId") Integer consumptionId);
- int insert(ConsumptionEntity record);
+ int insert(ConsumptionPulsarEntity record);
- int insertSelective(ConsumptionEntity record);
+ int insertSelective(ConsumptionPulsarEntity record);
- ConsumptionEntity selectByPrimaryKey(Integer id);
+ ConsumptionPulsarEntity selectByPrimaryKey(@Param("id") Integer id);
- int updateByPrimaryKeySelective(ConsumptionEntity record);
+ ConsumptionPulsarEntity selectByConsumptionId(@Param("consumptionId") Integer consumptionId);
- int updateByPrimaryKey(ConsumptionEntity record);
+ int updateByPrimaryKeySelective(ConsumptionPulsarEntity record);
- List<ConsumptionEntity> listByQuery(ConsumptionQuery consumptionQuery);
+ int updateByConsumptionId(ConsumptionPulsarEntity record);
- List<CountByKey> countByStatus(ConsumptionQuery consumptionQuery);
-}
+ int updateByPrimaryKey(ConsumptionPulsarEntity record);
+
+}
\ No newline at end of file
diff --git a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/DataStreamEntityMapper.java b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/DataStreamEntityMapper.java
index 257b085..c1c988d 100644
--- a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/DataStreamEntityMapper.java
+++ b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/DataStreamEntityMapper.java
@@ -85,4 +85,9 @@ public interface DataStreamEntityMapper {
int updateStatusByIdentifier(@Param("groupId") String groupId, @Param("streamId") String streamId,
@Param("status") Integer status, @Param("modifier") String modifier);
+ /**
+ * Logic delete dlq or rlq topic by bid
+ */
+ void logicDeleteDlqOrRlq(String groupId, String streamId, String operator);
+
}
\ No newline at end of file
diff --git a/inlong-manager/manager-dao/src/main/resources/mappers/ConsumptionEntityMapper.xml b/inlong-manager/manager-dao/src/main/resources/mappers/ConsumptionEntityMapper.xml
index 51f150c..7c77590 100644
--- a/inlong-manager/manager-dao/src/main/resources/mappers/ConsumptionEntityMapper.xml
+++ b/inlong-manager/manager-dao/src/main/resources/mappers/ConsumptionEntityMapper.xml
@@ -50,6 +50,16 @@
from consumption
where id = #{id,jdbcType=INTEGER} and is_deleted = 0
</select>
+ <select id="selectConsumptionExists" resultType="org.apache.inlong.manager.dao.entity.ConsumptionEntity">
+ select
+ <include refid="Base_Column_List"/>
+ from consumption
+ where inlong_group_id = #{groupId, jdbcType=VARCHAR}
+ and topic = #{topic, jdbcType=VARCHAR}
+ and consumer_group_id = #{consumerGroup, jdbcType=VARCHAR}
+ and is_deleted = 0
+ </select>
+
<delete id="deleteByPrimaryKey" parameterType="java.lang.Integer">
update consumption
set is_deleted=id
@@ -237,7 +247,7 @@
</update>
<select id="listByQuery"
- parameterType="org.apache.inlong.manager.common.pojo.dataconsumption.ConsumptionQuery"
+ parameterType="org.apache.inlong.manager.common.pojo.consumption.ConsumptionQuery"
resultMap="BaseResultMap">
select
c.*
@@ -309,7 +319,7 @@
</select>
<select id="countByStatus"
- parameterType="org.apache.inlong.manager.common.pojo.dataconsumption.ConsumptionQuery"
+ parameterType="org.apache.inlong.manager.common.pojo.consumption.ConsumptionQuery"
resultType="org.apache.inlong.manager.workflow.model.view.CountByKey">
select
status as `key`,count(1) as value
diff --git a/inlong-manager/manager-dao/src/main/resources/mappers/ConsumptionPulsarEntityMapper.xml b/inlong-manager/manager-dao/src/main/resources/mappers/ConsumptionPulsarEntityMapper.xml
new file mode 100644
index 0000000..ed44894
--- /dev/null
+++ b/inlong-manager/manager-dao/src/main/resources/mappers/ConsumptionPulsarEntityMapper.xml
@@ -0,0 +1,180 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+-->
+<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
+<mapper namespace="org.apache.inlong.manager.dao.mapper.ConsumptionPulsarEntityMapper">
+ <resultMap id="BaseResultMap" type="org.apache.inlong.manager.dao.entity.ConsumptionPulsarEntity">
+ <id column="id" jdbcType="INTEGER" property="id"/>
+ <result column="consumption_id" jdbcType="VARCHAR" property="consumptionId"/>
+ <result column="consumer_group_id" jdbcType="VARCHAR" property="consumerGroupId"/>
+ <result column="inlong_group_id" jdbcType="VARCHAR" property="inlongGroupId"/>
+ <result column="is_rlq" jdbcType="INTEGER" property="isRlq"/>
+ <result column="retry_letter_topic" jdbcType="VARCHAR" property="retryLetterTopic"/>
+ <result column="is_dlq" jdbcType="INTEGER" property="isDlq"/>
+ <result column="dead_letter_topic" jdbcType="VARCHAR" property="deadLetterTopic"/>
+ <result column="is_deleted" jdbcType="INTEGER" property="isDeleted"/>
+ </resultMap>
+ <sql id="Base_Column_List">
+ id, consumption_id, consumer_group_id, inlong_group_id, is_rlq, retry_letter_topic,
+ is_dlq, dead_letter_topic, is_deleted
+ </sql>
+ <select id="selectByPrimaryKey" parameterType="java.lang.Integer" resultMap="BaseResultMap">
+ select
+ <include refid="Base_Column_List"/>
+ from consumption_pulsar
+ where id = #{id,jdbcType=INTEGER}
+ </select>
+ <select id="selectByConsumptionId" parameterType="java.lang.Integer" resultMap="BaseResultMap">
+ select
+ <include refid="Base_Column_List"/>
+ from consumption_pulsar
+ where consumption_id = #{consumptionId, jdbcType=INTEGER}
+ and is_deleted = 0
+ </select>
+
+ <delete id="deleteByConsumptionId">
+ update consumption_pulsar
+ set is_deleted = 1
+ where consumption_id = #{consumptionId, jdbcType=INTEGER}
+ and is_deleted = 0
+ </delete>
+
+ <insert id="insert" parameterType="org.apache.inlong.manager.dao.entity.ConsumptionPulsarEntity">
+ insert into consumption_pulsar (id, consumption_id, consumer_group_id,
+ inlong_group_id, is_rlq, retry_letter_topic,
+ is_dlq, dead_letter_topic, is_deleted)
+ values (#{id,jdbcType=INTEGER}, #{consumptionId,jdbcType=INTEGER}, #{consumerGroupId,jdbcType=VARCHAR},
+ #{inlongGroupId,jdbcType=VARCHAR}, #{isRlq,jdbcType=INTEGER}, #{retryLetterTopic,jdbcType=VARCHAR},
+ #{isDlq,jdbcType=INTEGER}, #{deadLetterTopic,jdbcType=VARCHAR}, #{isDeleted,jdbcType=INTEGER})
+ </insert>
+ <insert id="insertSelective" parameterType="org.apache.inlong.manager.dao.entity.ConsumptionPulsarEntity">
+ insert into consumption_pulsar
+ <trim prefix="(" suffix=")" suffixOverrides=",">
+ <if test="id != null">
+ id,
+ </if>
+ <if test="consumptionId != null">
+ consumption_id,
+ </if>
+ <if test="consumerGroupId != null">
+ consumer_group_id,
+ </if>
+ <if test="inlongGroupId != null">
+ inlong_group_id,
+ </if>
+ <if test="isRlq != null">
+ is_rlq,
+ </if>
+ <if test="retryLetterTopic != null">
+ retry_letter_topic,
+ </if>
+ <if test="isDlq != null">
+ is_dlq,
+ </if>
+ <if test="deadLetterTopic != null">
+ dead_letter_topic,
+ </if>
+ <if test="isDeleted != null">
+ is_deleted,
+ </if>
+ </trim>
+ <trim prefix="values (" suffix=")" suffixOverrides=",">
+ <if test="id != null">
+ #{id,jdbcType=INTEGER},
+ </if>
+ <if test="consumerGroupId != null">
+ #{consumerGroupId,jdbcType=VARCHAR},
+ </if>
+ <if test="inlongGroupId != null">
+ #{inlongGroupId,jdbcType=VARCHAR},
+ </if>
+ <if test="isRlq != null">
+ #{isRlq,jdbcType=INTEGER},
+ </if>
+ <if test="retryLetterTopic != null">
+ #{retryLetterTopic,jdbcType=VARCHAR},
+ </if>
+ <if test="isDlq != null">
+ #{isDlq,jdbcType=INTEGER},
+ </if>
+ <if test="deadLetterTopic != null">
+ #{deadLetterTopic,jdbcType=VARCHAR},
+ </if>
+ <if test="isDeleted != null">
+ #{isDeleted,jdbcType=INTEGER},
+ </if>
+ </trim>
+ </insert>
+ <update id="updateByPrimaryKeySelective"
+ parameterType="org.apache.inlong.manager.dao.entity.ConsumptionPulsarEntity">
+ update consumption_pulsar
+ <set>
+ <if test="consumptionId != null">
+ consumption_id = #{consumptionId,jdbcType=INTEGER},
+ </if>
+ <if test="consumerGroupId != null">
+ consumer_group_id = #{consumerGroupId,jdbcType=VARCHAR},
+ </if>
+ <if test="inlongGroupId != null">
+ inlong_group_id = #{inlongGroupId,jdbcType=VARCHAR},
+ </if>
+ <if test="isRlq != null">
+ is_rlq = #{isRlq,jdbcType=INTEGER},
+ </if>
+ <if test="retryLetterTopic != null">
+ retry_letter_topic = #{retryLetterTopic,jdbcType=VARCHAR},
+ </if>
+ <if test="isDlq != null">
+ is_dlq = #{isDlq,jdbcType=INTEGER},
+ </if>
+ <if test="deadLetterTopic != null">
+ dead_letter_topic = #{deadLetterTopic,jdbcType=VARCHAR},
+ </if>
+ <if test="isDeleted != null">
+ is_deleted = #{isDeleted,jdbcType=INTEGER},
+ </if>
+ </set>
+ where id = #{id,jdbcType=INTEGER}
+ </update>
+ <update id="updateByConsumptionId">
+ update consumption_pulsar
+ <set>
+ consumer_group_id = #{consumerGroupId,jdbcType=VARCHAR},
+ inlong_group_id = #{inlongGroupId,jdbcType=VARCHAR},
+ is_rlq = #{isRlq,jdbcType=INTEGER},
+ retry_letter_topic = #{retryLetterTopic,jdbcType=VARCHAR},
+ is_dlq = #{isDlq,jdbcType=INTEGER},
+ dead_letter_topic = #{deadLetterTopic,jdbcType=VARCHAR},
+ is_deleted = #{isDeleted,jdbcType=INTEGER},
+ </set>
+ where consumption_id = #{consumptionId,jdbcType=INTEGER}
+ </update>
+ <update id="updateByPrimaryKey" parameterType="org.apache.inlong.manager.dao.entity.ConsumptionPulsarEntity">
+ update consumption_pulsar
+ set consumption_id = #{consumptionId,jdbcType=INTEGER},
+ consumer_group_id = #{consumerGroupId,jdbcType=VARCHAR},
+ inlong_group_id = #{inlongGroupId,jdbcType=VARCHAR},
+ is_rlq = #{isRlq,jdbcType=INTEGER},
+ retry_letter_topic = #{retryLetterTopic,jdbcType=VARCHAR},
+ is_dlq = #{isDlq,jdbcType=INTEGER},
+ dead_letter_topic = #{deadLetterTopic,jdbcType=VARCHAR},
+ is_deleted = #{isDeleted,jdbcType=INTEGER}
+ where id = #{id,jdbcType=INTEGER}
+ </update>
+</mapper>
\ No newline at end of file
diff --git a/inlong-manager/manager-dao/src/main/resources/mappers/DataStreamEntityMapper.xml b/inlong-manager/manager-dao/src/main/resources/mappers/DataStreamEntityMapper.xml
index 759a0ac..16b9d7e 100644
--- a/inlong-manager/manager-dao/src/main/resources/mappers/DataStreamEntityMapper.xml
+++ b/inlong-manager/manager-dao/src/main/resources/mappers/DataStreamEntityMapper.xml
@@ -509,6 +509,14 @@
and is_deleted = 0
</where>
</update>
+ <update id="logicDeleteDlqOrRlq">
+ update data_stream
+ set is_deleted = 1,
+ modifier = #{operator, jdbcType=VARCHAR}
+ where inlong_group_id = #{groupId, jdbcType=VARCHAR}
+ and inlong_stream_id = #{streamId, jdbcType=VARCHAR}
+ and is_deleted = 0
+ </update>
<select id="selectStreamToHiveInfo" resultMap="dataStreamFullInfo">
SELECT h.id,
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/ConsumptionService.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/ConsumptionService.java
index a25042d..64b1af1 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/ConsumptionService.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/ConsumptionService.java
@@ -18,11 +18,11 @@
package org.apache.inlong.manager.service.core;
import com.github.pagehelper.PageInfo;
-import org.apache.inlong.manager.common.pojo.dataconsumption.ConsumptionInfo;
-import org.apache.inlong.manager.common.pojo.dataconsumption.ConsumptionListVo;
-import org.apache.inlong.manager.common.pojo.dataconsumption.ConsumptionQuery;
-import org.apache.inlong.manager.common.pojo.dataconsumption.ConsumptionSummary;
-import org.apache.inlong.manager.common.pojo.dataconsumption.ConsumptionUpdateInfo;
+import org.apache.inlong.manager.common.pojo.business.BusinessInfo;
+import org.apache.inlong.manager.common.pojo.consumption.ConsumptionInfo;
+import org.apache.inlong.manager.common.pojo.consumption.ConsumptionListVo;
+import org.apache.inlong.manager.common.pojo.consumption.ConsumptionQuery;
+import org.apache.inlong.manager.common.pojo.consumption.ConsumptionSummary;
import org.apache.inlong.manager.service.workflow.WorkflowResult;
/**
@@ -52,15 +52,7 @@ public interface ConsumptionService {
* @param id Consumer ID
* @return Details
*/
- ConsumptionInfo getInfo(Integer id);
-
- /**
- * According to data consumption details
- *
- * @param consumerGroupId Consumer group ID
- * @return Consumption info
- */
- ConsumptionInfo getInfo(String consumerGroupId);
+ ConsumptionInfo get(Integer id);
/**
* Determine whether the Consumer group ID already exists
@@ -69,7 +61,7 @@ public interface ConsumptionService {
* @param excludeSelfId Exclude the ID of this record
* @return does it exist
*/
- boolean isConsumerGroupIdExist(String consumerGroupId, Integer excludeSelfId);
+ boolean isConsumerGroupIdExists(String consumerGroupId, Integer excludeSelfId);
/**
* Save basic data consumption information
@@ -83,11 +75,10 @@ public interface ConsumptionService {
/**
* Update the person in charge of data consumption, etc
*
- * @param consumptionUpdateInfo Update information
+ * @param consumptionInfo consumption information
* @param operator Operator
- * @return Updated id
*/
- Integer update(ConsumptionUpdateInfo consumptionUpdateInfo, String operator);
+ Boolean update(ConsumptionInfo consumptionInfo, String operator);
/**
* Delete data consumption
@@ -95,7 +86,7 @@ public interface ConsumptionService {
* @param id Consumer ID
* @param operator Operator
*/
- void delete(Integer id, String operator);
+ Boolean delete(Integer id, String operator);
/**
* Start the application process
@@ -106,4 +97,9 @@ public interface ConsumptionService {
*/
WorkflowResult startProcess(Integer id, String operator);
+ /**
+ * Save the consumer group info for Sort to the database
+ */
+ void saveSortConsumption(BusinessInfo bizInfo, String topic, String consumerGroup);
+
}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/DataStreamService.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/DataStreamService.java
index f23a722..75b463b 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/DataStreamService.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/DataStreamService.java
@@ -55,6 +55,15 @@ public interface DataStreamService {
DataStreamInfo get(String groupId, String streamId);
/**
+ * Query whether the data stream ID exists
+ *
+ * @param groupId business group id
+ * @param streamId data stream id
+ * @return true: exists, false: does not exist
+ */
+ Boolean exist(String groupId, String streamId);
+
+ /**
* Query data stream list based on conditions
*
* @param request Data stream paging query request
@@ -188,4 +197,16 @@ public interface DataStreamService {
*/
boolean updateStatus(String groupId, String streamId, Integer status, String operator);
+ /**
+ * According to the specified DLQ / RLQ name, create the corresponding Pulsar's Topic stream
+ *
+ * @param topicName Pulsar's Topic name, which is the data stream ID
+ */
+ void insertDlqOrRlq(String bid, String topicName, String operator);
+
+ /**
+ * Logic delete dlq or rlq topic by bid
+ */
+ void logicDeleteDlqOrRlq(String bid, String topicName, String operator);
+
}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/BusinessServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/BusinessServiceImpl.java
index 1776843..152125a 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/BusinessServiceImpl.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/BusinessServiceImpl.java
@@ -383,7 +383,7 @@ public class BusinessServiceImpl implements BusinessService {
if (BizConstant.MIDDLEWARE_TUBE.equalsIgnoreCase(middlewareType)) {
// Tube Topic corresponds to business one-to-one
- topicVO.setTopicName(businessInfo.getMqResourceObj());
+ topicVO.setMqResourceObj(businessInfo.getMqResourceObj());
topicVO.setTubeMasterUrl(clusterBean.getTubeMaster());
} else if (BizConstant.MIDDLEWARE_PULSAR.equalsIgnoreCase(middlewareType)) {
// Pulsar's topic corresponds to the data stream one-to-one
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/ConsumptionServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/ConsumptionServiceImpl.java
index 388ea01..a37a771 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/ConsumptionServiceImpl.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/ConsumptionServiceImpl.java
@@ -20,27 +20,42 @@ package org.apache.inlong.manager.service.core.impl;
import com.github.pagehelper.Page;
import com.github.pagehelper.PageHelper;
import com.github.pagehelper.PageInfo;
+import java.util.Arrays;
import java.util.Date;
+import java.util.HashSet;
import java.util.List;
import java.util.Locale;
import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
import java.util.stream.Collectors;
import lombok.extern.slf4j.Slf4j;
import org.apache.inlong.manager.common.beans.ClusterBean;
+import org.apache.inlong.manager.common.enums.BizConstant;
+import org.apache.inlong.manager.common.enums.BizErrorCodeEnum;
import org.apache.inlong.manager.common.enums.ConsumptionStatus;
import org.apache.inlong.manager.common.enums.EntityStatus;
+import org.apache.inlong.manager.common.exceptions.BusinessException;
+import org.apache.inlong.manager.common.pojo.business.BusinessInfo;
import org.apache.inlong.manager.common.pojo.business.BusinessTopicVO;
-import org.apache.inlong.manager.common.pojo.dataconsumption.ConsumptionInfo;
-import org.apache.inlong.manager.common.pojo.dataconsumption.ConsumptionListVo;
-import org.apache.inlong.manager.common.pojo.dataconsumption.ConsumptionQuery;
-import org.apache.inlong.manager.common.pojo.dataconsumption.ConsumptionSummary;
-import org.apache.inlong.manager.common.pojo.dataconsumption.ConsumptionUpdateInfo;
+import org.apache.inlong.manager.common.pojo.consumption.ConsumptionInfo;
+import org.apache.inlong.manager.common.pojo.consumption.ConsumptionListVo;
+import org.apache.inlong.manager.common.pojo.consumption.ConsumptionMqExtBase;
+import org.apache.inlong.manager.common.pojo.consumption.ConsumptionPulsarInfo;
+import org.apache.inlong.manager.common.pojo.consumption.ConsumptionQuery;
+import org.apache.inlong.manager.common.pojo.consumption.ConsumptionSummary;
+import org.apache.inlong.manager.common.pojo.datastream.DataStreamTopicVO;
import org.apache.inlong.manager.common.util.CommonBeanUtils;
import org.apache.inlong.manager.common.util.Preconditions;
+import org.apache.inlong.manager.dao.entity.BusinessEntity;
import org.apache.inlong.manager.dao.entity.ConsumptionEntity;
+import org.apache.inlong.manager.dao.entity.ConsumptionPulsarEntity;
+import org.apache.inlong.manager.dao.mapper.BusinessEntityMapper;
import org.apache.inlong.manager.dao.mapper.ConsumptionEntityMapper;
+import org.apache.inlong.manager.dao.mapper.ConsumptionPulsarEntityMapper;
import org.apache.inlong.manager.service.core.BusinessService;
import org.apache.inlong.manager.service.core.ConsumptionService;
+import org.apache.inlong.manager.service.core.DataStreamService;
import org.apache.inlong.manager.service.workflow.ProcessName;
import org.apache.inlong.manager.service.workflow.WorkflowResult;
import org.apache.inlong.manager.service.workflow.WorkflowService;
@@ -59,13 +74,19 @@ import org.springframework.util.CollectionUtils;
public class ConsumptionServiceImpl implements ConsumptionService {
@Autowired
+ private ClusterBean clusterBean;
+ @Autowired
+ private BusinessEntityMapper businessMapper;
+ @Autowired
private ConsumptionEntityMapper consumptionMapper;
@Autowired
+ private ConsumptionPulsarEntityMapper consumptionPulsarMapper;
+ @Autowired
private WorkflowService workflowService;
@Autowired
private BusinessService businessService;
@Autowired
- private ClusterBean clusterBean;
+ private DataStreamService streamService;
@Override
public ConsumptionSummary getSummary(ConsumptionQuery query) {
@@ -83,7 +104,7 @@ public class ConsumptionServiceImpl implements ConsumptionService {
@Override
public PageInfo<ConsumptionListVo> list(ConsumptionQuery query) {
PageHelper.startPage(query.getPageNum(), query.getPageSize());
- Page<ConsumptionEntity> pageResult = (Page<ConsumptionEntity>) this.consumptionMapper.listByQuery(query);
+ Page<ConsumptionEntity> pageResult = (Page<ConsumptionEntity>) consumptionMapper.listByQuery(query);
PageInfo<ConsumptionListVo> pageInfo = pageResult
.toPageInfo(entity -> CommonBeanUtils.copyProperties(entity, ConsumptionListVo::new));
pageInfo.setTotal(pageResult.getTotal());
@@ -91,92 +112,208 @@ public class ConsumptionServiceImpl implements ConsumptionService {
}
@Override
- public ConsumptionInfo getInfo(Integer id) {
- Preconditions.checkNotNull(id, "Consumption id can't be null");
+ public ConsumptionInfo get(Integer id) {
+ Preconditions.checkNotNull(id, "consumption id cannot be null");
ConsumptionEntity entity = consumptionMapper.selectByPrimaryKey(id);
- Preconditions.checkNotNull(entity, () -> "Consumption not exist with id:" + id);
+ Preconditions.checkNotNull(entity, "consumption not exist with id:" + id);
- ConsumptionInfo consumptionInfo = CommonBeanUtils.copyProperties(entity, ConsumptionInfo::new);
- consumptionInfo.setMasterUrl(clusterBean.getTubeMaster());
+ ConsumptionInfo info = CommonBeanUtils.copyProperties(entity, ConsumptionInfo::new);
- return consumptionInfo;
- }
+ if (BizConstant.MIDDLEWARE_PULSAR.equalsIgnoreCase(info.getMiddlewareType())) {
+ ConsumptionPulsarEntity pulsarEntity = consumptionPulsarMapper.selectByConsumptionId(info.getId());
+ Preconditions.checkNotNull(pulsarEntity, "Pulsar consumption cannot be empty, as the middleware is Pulsar");
+ ConsumptionPulsarInfo pulsarInfo = CommonBeanUtils.copyProperties(pulsarEntity, ConsumptionPulsarInfo::new);
+ info.setMqExtInfo(pulsarInfo);
- @Override
- public ConsumptionInfo getInfo(String consumerGroupId) {
- Preconditions.checkNotEmpty(consumerGroupId, "ConsumerGroupId can't be null");
- ConsumptionQuery consumptionQuery = new ConsumptionQuery();
- consumptionQuery.setConsumerGroupId(consumerGroupId);
- List<ConsumptionEntity> result = consumptionMapper.listByQuery(consumptionQuery);
- if (CollectionUtils.isEmpty(result)) {
- return null;
+ info.setTopic(this.getFullPulsarTopic(info.getInlongGroupId(), info.getTopic()));
}
- ConsumptionEntity entity = result.get(0);
- ConsumptionInfo info = CommonBeanUtils.copyProperties(entity, ConsumptionInfo::new);
- info.setMasterUrl(clusterBean.getTubeMaster());
-
return info;
}
@Override
- public boolean isConsumerGroupIdExist(String groupId, Integer excludeSelfId) {
+ public boolean isConsumerGroupIdExists(String consumerGroup, Integer excludeSelfId) {
ConsumptionQuery consumptionQuery = new ConsumptionQuery();
- consumptionQuery.setConsumerGroupId(groupId);
+ consumptionQuery.setConsumerGroupId(consumerGroup);
List<ConsumptionEntity> result = consumptionMapper.listByQuery(consumptionQuery);
if (excludeSelfId != null) {
- result = result.stream()
- .filter(entity -> !excludeSelfId.equals(entity.getId()))
+ result = result.stream().filter(consumer -> !excludeSelfId.equals(consumer.getId()))
.collect(Collectors.toList());
}
return !CollectionUtils.isEmpty(result);
}
@Override
- @Transactional(rollbackFor = Exception.class)
- public Integer save(ConsumptionInfo consumptionInfo, String operator) {
- checkConsumptionInfo(consumptionInfo);
- ConsumptionEntity consumptionEntity = saveConsumption(consumptionInfo, operator);
+ @Transactional(rollbackFor = Throwable.class)
+ public Integer save(ConsumptionInfo info, String operator) {
+ fullConsumptionInfo(info);
+
+ Date now = new Date();
+ ConsumptionEntity entity = this.saveConsumption(info, operator, now);
+
+ if (BizConstant.MIDDLEWARE_PULSAR.equals(entity.getMiddlewareType())) {
+ savePulsarInfo(info.getMqExtInfo(), entity);
+ }
+
+ return entity.getId();
+ }
+
+ /**
+ * Save Pulsar consumption info
+ */
+ private void savePulsarInfo(ConsumptionMqExtBase mqExtBase, ConsumptionEntity entity) {
+ Preconditions.checkNotNull(mqExtBase, "Pulsar info cannot be empty, as the middleware is Pulsar");
+ ConsumptionPulsarInfo pulsarInfo = (ConsumptionPulsarInfo) mqExtBase;
+
+ // Prerequisite for RLQ to be turned on: DLQ must be turned on
+ boolean dlqEnable = (pulsarInfo.getIsDlq() != null && pulsarInfo.getIsDlq() == 1);
+ boolean rlqEnable = (pulsarInfo.getIsRlq() != null && pulsarInfo.getIsRlq() == 1);
+ if (rlqEnable && !dlqEnable) {
+ throw new BusinessException(BizErrorCodeEnum.PULSAR_DLQ_RLQ_ERROR);
+ }
+
+ // When saving, the DLQ / RLQ under the same groupId cannot be repeated;
+ // when closing, delete the related configuration
+ String groupId = entity.getInlongGroupId();
+ if (dlqEnable) {
+ String dlqTopic = BizConstant.PREFIX_DLQ + "_" + pulsarInfo.getDeadLetterTopic();
+ Boolean exist = streamService.exist(groupId, dlqTopic);
+ if (exist) {
+ throw new BusinessException(BizErrorCodeEnum.PULSAR_DLQ_DUPLICATED);
+ }
+ } else {
+ pulsarInfo.setIsDlq(0);
+ pulsarInfo.setDeadLetterTopic(null);
+ }
+ if (rlqEnable) {
+ String rlqTopic = BizConstant.PREFIX_RLQ + "_" + pulsarInfo.getRetryLetterTopic();
+ Boolean exist = streamService.exist(groupId, rlqTopic);
+ if (exist) {
+ throw new BusinessException(BizErrorCodeEnum.PULSAR_RLQ_DUPLICATED);
+ }
+ } else {
+ pulsarInfo.setIsRlq(0);
+ pulsarInfo.setRetryLetterTopic(null);
+ }
- log.debug("success to save consumption {}", consumptionInfo);
- return consumptionEntity.getId();
+ ConsumptionPulsarEntity pulsar = CommonBeanUtils.copyProperties(pulsarInfo, ConsumptionPulsarEntity::new);
+ Integer consumptionId = entity.getId();
+ pulsar.setConsumptionId(consumptionId);
+ pulsar.setInlongGroupId(groupId);
+ pulsar.setConsumerGroupId(entity.getConsumerGroupId());
+ pulsar.setIsDeleted(0);
+
+ // Pulsar consumer information may already exist, update if it exists, add if it does not exist
+ ConsumptionPulsarEntity exists = consumptionPulsarMapper.selectByConsumptionId(consumptionId);
+ if (exists == null) {
+ consumptionPulsarMapper.insert(pulsar);
+ } else {
+ pulsar.setId(exists.getId());
+ consumptionPulsarMapper.updateByPrimaryKey(pulsar);
+ }
}
@Override
- @Transactional(rollbackFor = Exception.class)
- public Integer update(ConsumptionUpdateInfo info, String operator) {
- Preconditions.checkNotNull(info, "update info can't be null");
- Preconditions.checkNotNull(info.getId(), "consumption id can't be null");
- ConsumptionInfo consumptionInfo = this.getInfo(info.getId());
- Preconditions.checkNotNull(consumptionInfo, "consumption not exist with id " + info.getId());
- Preconditions.checkTrue(consumptionInfo.getInCharges().contains(operator),
- "operator is not the owner for this consumption");
+ @Transactional(rollbackFor = Throwable.class)
+ public Boolean update(ConsumptionInfo info, String operator) {
+ Preconditions.checkNotNull(info, "consumption info cannot be null");
+ Integer consumptionId = info.getId();
+ Preconditions.checkNotNull(consumptionId, "consumption id cannot be null");
+
+ ConsumptionEntity exists = consumptionMapper.selectByPrimaryKey(consumptionId);
+ Preconditions.checkNotNull(exists, "consumption not exist with id " + consumptionId);
+ Preconditions.checkTrue(exists.getInCharges().contains(operator),
+ "operator" + operator + " has no privilege for the consumption");
ConsumptionEntity entity = new ConsumptionEntity();
- entity.setId(info.getId());
- entity.setInCharges(info.getInCharges());
+ Date now = new Date();
+ CommonBeanUtils.copyProperties(info, entity, true);
entity.setModifier(operator);
- entity.setModifyTime(new Date());
+ entity.setModifyTime(now);
+
+ // Modify Pulsar consumption info
+ if (BizConstant.MIDDLEWARE_PULSAR.equalsIgnoreCase(info.getMiddlewareType())) {
+ ConsumptionPulsarEntity pulsarEntity = consumptionPulsarMapper.selectByConsumptionId(consumptionId);
+ Preconditions.checkNotNull(pulsarEntity, "Pulsar consumption cannot be null");
+ pulsarEntity.setConsumerGroupId(info.getConsumerGroupId());
+
+ // Whether DLQ / RLQ is turned on or off
+ ConsumptionPulsarInfo update = (ConsumptionPulsarInfo) info.getMqExtInfo();
+ boolean dlqEnable = (update.getIsDlq() != null && update.getIsDlq() == 1);
+ boolean rlqEnable = (update.getIsRlq() != null && update.getIsRlq() == 1);
+
+ // DLQ is closed, RLQ cannot exist alone and must be closed
+ if (rlqEnable && !dlqEnable) {
+ throw new BusinessException(BizErrorCodeEnum.PULSAR_TOPIC_CREATE_FAILED);
+ }
+
+ // If the consumption has been approved, then close/open DLQ or RLQ, it is necessary to
+ // add/remove data streams in the business
+ if (ConsumptionStatus.APPROVED.getStatus() == exists.getStatus()) {
+ String groupId = info.getInlongGroupId();
+ String dlqNameOld = pulsarEntity.getDeadLetterTopic();
+ String dlqNameNew = update.getDeadLetterTopic();
+ if (!dlqEnable) {
+ pulsarEntity.setIsDlq(0);
+ pulsarEntity.setDeadLetterTopic(null);
+ streamService.logicDeleteDlqOrRlq(groupId, dlqNameOld, operator);
+ } else if (!Objects.equals(dlqNameNew, dlqNameOld)) {
+ pulsarEntity.setIsDlq(1);
+ String topic = BizConstant.PREFIX_DLQ + "_" + dlqNameNew;
+ topic = topic.toLowerCase(Locale.ROOT);
+ pulsarEntity.setDeadLetterTopic(topic);
+ streamService.insertDlqOrRlq(groupId, topic, operator);
+ }
+
+ String rlqNameOld = pulsarEntity.getRetryLetterTopic();
+ String rlqNameNew = update.getRetryLetterTopic();
+ if (!rlqEnable) {
+ pulsarEntity.setIsRlq(0);
+ pulsarEntity.setRetryLetterTopic(null);
+ streamService.logicDeleteDlqOrRlq(groupId, rlqNameOld, operator);
+ } else if (!Objects.equals(rlqNameNew, pulsarEntity.getRetryLetterTopic())) {
+ pulsarEntity.setIsRlq(1);
+ String topic = BizConstant.PREFIX_RLQ + "_" + rlqNameNew;
+ topic = topic.toLowerCase(Locale.ROOT);
+ pulsarEntity.setRetryLetterTopic(topic);
+ streamService.insertDlqOrRlq(groupId, topic, operator);
+ }
+ }
+
+ consumptionPulsarMapper.updateByConsumptionId(pulsarEntity);
+ }
- this.consumptionMapper.updateByPrimaryKeySelective(entity);
+ consumptionMapper.updateByPrimaryKeySelective(entity);
+ return true;
+ }
- log.debug("success to update consumption {}", entity);
- return entity.getId();
+ /**
+ * According to groupId and topic, stitch the full path of Pulsar Topic
+ * "persistent://" + tenant + "/" + namespace + "/" + topic;
+ */
+ private String getFullPulsarTopic(String groupId, String topic) {
+ BusinessEntity businessEntity = businessMapper.selectByIdentifier(groupId);
+ String tenant = clusterBean.getDefaultTenant();
+ String namespace = businessEntity.getMqResourceObj();
+ return "persistent://" + tenant + "/" + namespace + "/" + topic;
}
@Override
- public void delete(Integer id, String operator) {
+ @Transactional(rollbackFor = Throwable.class)
+ public Boolean delete(Integer id, String operator) {
ConsumptionEntity consumptionEntity = consumptionMapper.selectByPrimaryKey(id);
- Preconditions.checkNotNull(consumptionEntity, () -> "consumption not exist with id:" + id);
- int success = consumptionMapper.deleteByPrimaryKey(id);
- Preconditions.checkTrue(success > 0, "delete failed");
+ Preconditions.checkNotNull(consumptionEntity, "consumption not exist with id: " + id);
+ consumptionMapper.deleteByPrimaryKey(id);
+
+ consumptionPulsarMapper.deleteByConsumptionId(id);
+ return true;
}
@Override
public WorkflowResult startProcess(Integer id, String operation) {
- ConsumptionInfo consumptionInfo = this.getInfo(id);
- Preconditions.checkTrue(ConsumptionStatus.ALLOW_START_WORKFLOW_STATUS
- .contains(ConsumptionStatus.fromStatus(consumptionInfo.getStatus())),
+ ConsumptionInfo consumptionInfo = this.get(id);
+ Preconditions.checkTrue(ConsumptionStatus.ALLOW_START_WORKFLOW_STATUS.contains(
+ ConsumptionStatus.fromStatus(consumptionInfo.getStatus())),
"current status not allow start workflow");
ConsumptionEntity updateConsumptionEntity = new ConsumptionEntity();
@@ -190,62 +327,118 @@ public class ConsumptionServiceImpl implements ConsumptionService {
genNewConsumptionWorkflowForm(consumptionInfo));
}
+ @Override
+ public void saveSortConsumption(BusinessInfo bizInfo, String topic, String consumerGroup) {
+ String groupId = bizInfo.getInlongGroupId();
+ ConsumptionEntity exists = consumptionMapper.selectConsumptionExists(groupId, topic, consumerGroup);
+ if (exists != null) {
+ log.warn("consumption with groupId={}, topic={}, consumer group={} already exists, skip to create",
+ groupId, topic, consumerGroup);
+ return;
+ }
+
+ log.debug("begin to save consumption, groupId={}, topic={}, consumer group={}", groupId, topic, consumerGroup);
+ String middlewareType = bizInfo.getMiddlewareType();
+ ConsumptionEntity entity = new ConsumptionEntity();
+ entity.setInlongGroupId(groupId);
+ entity.setMiddlewareType(middlewareType);
+ entity.setTopic(topic);
+ entity.setConsumerGroupId(consumerGroup);
+ entity.setInCharges(bizInfo.getInCharges());
+ entity.setFilterEnabled(0);
+
+ entity.setStatus(ConsumptionStatus.APPROVED.getStatus());
+ entity.setIsDeleted(EntityStatus.UN_DELETED.getCode());
+ entity.setCreator(bizInfo.getCreator());
+ entity.setCreateTime(new Date());
+
+ consumptionMapper.insert(entity);
+
+ if (BizConstant.MIDDLEWARE_PULSAR.equals(middlewareType)) {
+ ConsumptionPulsarEntity pulsarEntity = new ConsumptionPulsarEntity();
+ pulsarEntity.setConsumptionId(entity.getId());
+ pulsarEntity.setConsumerGroupId(consumerGroup);
+ pulsarEntity.setInlongGroupId(groupId);
+ pulsarEntity.setIsDeleted(EntityStatus.UN_DELETED.getCode());
+ consumptionPulsarMapper.insert(pulsarEntity);
+ }
+
+ log.debug("success save consumption, groupId={}, topic={}, consumer group={}", groupId, topic, consumerGroup);
+ }
+
private NewConsumptionWorkflowForm genNewConsumptionWorkflowForm(ConsumptionInfo consumptionInfo) {
NewConsumptionWorkflowForm form = new NewConsumptionWorkflowForm();
+ Integer id = consumptionInfo.getId();
+ if (BizConstant.MIDDLEWARE_PULSAR.equalsIgnoreCase(consumptionInfo.getMiddlewareType())) {
+ ConsumptionPulsarEntity consumptionPulsarEntity = consumptionPulsarMapper.selectByConsumptionId(id);
+ ConsumptionPulsarInfo pulsarInfo = CommonBeanUtils.copyProperties(consumptionPulsarEntity,
+ ConsumptionPulsarInfo::new);
+ consumptionInfo.setMqExtInfo(pulsarInfo);
+ }
form.setConsumptionInfo(consumptionInfo);
return form;
}
- private ConsumptionEntity saveConsumption(ConsumptionInfo consumptionInfo, String operator) {
- Date now = new Date();
- consumptionInfo.setCreateTime(now);
- consumptionInfo.setModifyTime(now);
- consumptionInfo.setModifier(operator);
- consumptionInfo.setCreator(operator);
-
- ConsumptionEntity entity = CommonBeanUtils.copyProperties(consumptionInfo, ConsumptionEntity::new);
+ private ConsumptionEntity saveConsumption(ConsumptionInfo info, String operator, Date now) {
+ ConsumptionEntity entity = CommonBeanUtils.copyProperties(info, ConsumptionEntity::new);
entity.setStatus(ConsumptionStatus.WAITING_ASSIGN.getStatus());
- entity.setIsDeleted(EntityStatus.UN_DELETED.getCode());
+ entity.setIsDeleted(0);
+ entity.setCreator(operator);
+ entity.setModifier(operator);
+ entity.setCreateTime(now);
+ entity.setModifyTime(now);
- int success;
- if (consumptionInfo.getId() != null) {
- success = this.consumptionMapper.updateByPrimaryKey(entity);
+ if (info.getId() != null) {
+ consumptionMapper.updateByPrimaryKey(entity);
} else {
- success = this.consumptionMapper.insert(entity);
+ consumptionMapper.insert(entity);
}
- Preconditions.checkTrue(success > 0, "save failed");
- Preconditions.checkNotNull(entity.getId(), "save failed");
+
+ Preconditions.checkNotNull(entity.getId(), "save consumption failed");
return entity;
}
/**
- * Check whether the consumption information is valid
+ * Fill in consumer information
*/
- private void checkConsumptionInfo(ConsumptionInfo info) {
- Preconditions.checkNotNull(info, "Consumption info can't be null");
- Preconditions.checkNotNull(info.getConsumerGroupName(), "Consumer Group Name can't be null");
+ private void fullConsumptionInfo(ConsumptionInfo info) {
+ Preconditions.checkNotNull(info, "consumption info cannot be null");
+ info.setConsumerGroupId(info.getConsumerGroupName());
- // Undeleted consumer group id must be unique
- String groupId = info.getConsumerGroupName().toLowerCase(Locale.ROOT);
- info.setConsumerGroupId(groupId);
- Preconditions.checkTrue(!isConsumerGroupIdExist(groupId, info.getId()),
- "Consumer Group ID " + groupId + " already exist");
+ Preconditions.checkFalse(isConsumerGroupIdExists(info.getConsumerGroupId(), info.getId()),
+ "consumer group " + info.getConsumerGroupId() + " already exist");
if (info.getId() != null) {
- ConsumptionEntity entity = consumptionMapper.selectByPrimaryKey(info.getId());
- Preconditions.checkNotNull(entity, "Consumption not exist with id:" + info.getId());
+ ConsumptionEntity consumptionEntity = consumptionMapper.selectByPrimaryKey(info.getId());
+ Preconditions.checkNotNull(consumptionEntity, "consumption not exist with id: " + info.getId());
- ConsumptionStatus consumptionStatus = ConsumptionStatus.fromStatus(entity.getStatus());
+ ConsumptionStatus consumptionStatus = ConsumptionStatus.fromStatus(consumptionEntity.getStatus());
Preconditions.checkTrue(ConsumptionStatus.ALLOW_SAVE_UPDATE_STATUS.contains(consumptionStatus),
- "Consumption not allowed to update when its status is" + consumptionStatus.name());
+ "consumption not allow update when status is " + consumptionStatus.name());
}
- BusinessTopicVO topicVO = businessService.getTopic(info.getInlongGroupId());
- Preconditions.checkNotNull(topicVO, "Business not exist :" + info.getInlongGroupId());
- Preconditions.checkTrue(topicVO.getTopicName() != null
- && topicVO.getTopicName().equals(info.getTopic()),
- "Topic [" + info.getTopic() + "] not belong to business " + info.getInlongGroupId());
-
+ // Determine whether the consumed topic belongs to this groupId or the data stream under it
+ Preconditions.checkNotNull(info.getTopic(), "consumption topic cannot be empty");
+
+ String groupId = info.getInlongGroupId();
+ BusinessTopicVO topicVO = businessService.getTopic(groupId);
+ Preconditions.checkNotNull(topicVO, "business not exist: " + groupId);
+
+ // Tube’s topic is the business group level, one business group, one Tube topic
+ if (BizConstant.MIDDLEWARE_TUBE.equalsIgnoreCase(topicVO.getMiddlewareType())) {
+ String bizTopic = topicVO.getMqResourceObj();
+ Preconditions.checkTrue(bizTopic == null || bizTopic.equals(info.getTopic()),
+ "topic [" + info.getTopic() + "] not belong to business " + groupId);
+ } else if (BizConstant.MIDDLEWARE_PULSAR.equalsIgnoreCase(topicVO.getMiddlewareType())) {
+ // Pulsar's topic is the data stream level.
+ // There will be multiple data streams under one business, and there will be multiple topics
+ List<DataStreamTopicVO> dsTopicList = topicVO.getDsTopicList();
+ if (dsTopicList != null && dsTopicList.size() > 0) {
+ Set<String> topicSet = new HashSet<>(Arrays.asList(info.getTopic().split(",")));
+ dsTopicList.forEach(ds -> topicSet.remove(ds.getMqResourceObj()));
+ Preconditions.checkEmpty(topicSet, "topic [" + topicSet + "] not belong to business " + groupId);
+ }
+ }
}
}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/DataStreamServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/DataStreamServiceImpl.java
index 7754a39..32d91af 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/DataStreamServiceImpl.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/DataStreamServiceImpl.java
@@ -149,6 +149,13 @@ public class DataStreamServiceImpl implements DataStreamService {
return streamInfo;
}
+ @Override
+ public Boolean exist(String groupId, String streamId) {
+ Preconditions.checkNotNull(groupId, BizConstant.GROUP_ID_IS_EMPTY);
+ DataStreamEntity streamEntity = streamMapper.selectByIdentifier(groupId, streamId);
+ return streamEntity != null;
+ }
+
/**
* Query and set the extended information and data source fields of the data stream
*
@@ -626,6 +633,41 @@ public class DataStreamServiceImpl implements DataStreamService {
return true;
}
+ @Override
+ public void insertDlqOrRlq(String groupId, String topicName, String operator) {
+ Integer count = streamMapper.selectExistByIdentifier(groupId, topicName);
+ if (count >= 1) {
+ LOGGER.error("DLQ/RLQ topic already exists with name={}", topicName);
+ throw new BusinessException(BizErrorCodeEnum.DATA_STREAM_ID_DUPLICATE, "DLQ/RLQ topic already exists");
+ }
+
+ DataStreamEntity streamEntity = new DataStreamEntity();
+ streamEntity.setInlongGroupId(groupId);
+ streamEntity.setInlongStreamId(topicName);
+ streamEntity.setMqResourceObj(topicName);
+ streamEntity.setDescription("This is DLQ / RLQ topic created by SYSTEM");
+ streamEntity.setDailyRecords(1000);
+ streamEntity.setDailyStorage(1000);
+ streamEntity.setPeakRecords(1000);
+ streamEntity.setMaxLength(1000);
+
+ streamEntity.setStatus(EntityStatus.DATA_STREAM_CONFIG_SUCCESSFUL.getCode());
+ streamEntity.setIsDeleted(EntityStatus.UN_DELETED.getCode());
+ streamEntity.setCreator(operator);
+ streamEntity.setModifier(operator);
+ Date now = new Date();
+ streamEntity.setCreateTime(now);
+ streamEntity.setModifyTime(now);
+
+ streamMapper.insert(streamEntity);
+ }
+
+ @Override
+ public void logicDeleteDlqOrRlq(String groupId, String topicName, String operator) {
+ streamMapper.logicDeleteDlqOrRlq(groupId, topicName, operator);
+ LOGGER.info("success to logic delete dlq or rlq by groupId={}, topicName={}", groupId, topicName);
+ }
+
/**
* Update extended information
* <p/>First physically delete the existing extended information, and then add this batch of extended information
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/newconsumption/NewConsumptionWorkflowDefinition.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/newconsumption/NewConsumptionWorkflowDefinition.java
index 16c1058..c5315b6 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/newconsumption/NewConsumptionWorkflowDefinition.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/newconsumption/NewConsumptionWorkflowDefinition.java
@@ -22,7 +22,7 @@ import com.google.common.collect.Lists;
import java.util.List;
import java.util.Optional;
import org.apache.inlong.manager.common.pojo.business.BusinessInfo;
-import org.apache.inlong.manager.common.pojo.dataconsumption.ConsumptionInfo;
+import org.apache.inlong.manager.common.pojo.consumption.ConsumptionInfo;
import org.apache.inlong.manager.common.pojo.workflow.WorkflowApproverFilterContext;
import org.apache.inlong.manager.service.core.BusinessService;
import org.apache.inlong.manager.service.core.WorkflowApproverService;
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/newconsumption/NewConsumptionWorkflowForm.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/newconsumption/NewConsumptionWorkflowForm.java
index c3c3612..c9f7d18 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/newconsumption/NewConsumptionWorkflowForm.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/newconsumption/NewConsumptionWorkflowForm.java
@@ -21,7 +21,7 @@ import com.google.common.collect.Maps;
import io.swagger.annotations.ApiModelProperty;
import java.util.Map;
import lombok.Data;
-import org.apache.inlong.manager.common.pojo.dataconsumption.ConsumptionInfo;
+import org.apache.inlong.manager.common.pojo.consumption.ConsumptionInfo;
import org.apache.inlong.manager.service.workflow.BaseWorkflowFormType;
import org.apache.inlong.manager.workflow.exception.FormValidateException;
import org.apache.inlong.manager.common.util.Preconditions;
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/newconsumption/listener/ConsumptionApproveTaskListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/newconsumption/listener/ConsumptionApproveTaskListener.java
index 3417151..a9e1cbe 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/newconsumption/listener/ConsumptionApproveTaskListener.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/newconsumption/listener/ConsumptionApproveTaskListener.java
@@ -56,7 +56,7 @@ public class ConsumptionApproveTaskListener implements TaskEventListener {
return ListenerResult.success("The consumer group name has not been modified");
}
boolean exitDuplicate = this.consumptionService
- .isConsumerGroupIdExist(approveForm.getConsumerGroupId(), form.getConsumptionInfo().getId());
+ .isConsumerGroupIdExists(approveForm.getConsumerGroupId(), form.getConsumptionInfo().getId());
if (exitDuplicate) {
log.error("consumerGroupId already exist! duplicate :{}", approveForm.getConsumerGroupId());
throw new BusinessException(BizErrorCodeEnum.CONSUMER_GROUP_NAME_DUPLICATED);
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/newconsumption/listener/ConsumptionCompleteProcessListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/newconsumption/listener/ConsumptionCompleteProcessListener.java
index 605b8c1..389c4e3 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/newconsumption/listener/ConsumptionCompleteProcessListener.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/newconsumption/listener/ConsumptionCompleteProcessListener.java
@@ -24,7 +24,7 @@ import org.apache.inlong.manager.common.enums.BizConstant;
import org.apache.inlong.manager.common.enums.BizErrorCodeEnum;
import org.apache.inlong.manager.common.enums.ConsumptionStatus;
import org.apache.inlong.manager.common.exceptions.BusinessException;
-import org.apache.inlong.manager.common.pojo.dataconsumption.ConsumptionInfo;
+import org.apache.inlong.manager.common.pojo.consumption.ConsumptionInfo;
import org.apache.inlong.manager.common.pojo.tubemq.AddTubeConsumeGroupRequest;
import org.apache.inlong.manager.common.util.Preconditions;
import org.apache.inlong.manager.dao.entity.ConsumptionEntity;
diff --git a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/ConsumptionController.java b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/ConsumptionController.java
index 11cd284..4b2e75f 100644
--- a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/ConsumptionController.java
+++ b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/ConsumptionController.java
@@ -24,11 +24,10 @@ import io.swagger.annotations.ApiOperation;
import io.swagger.annotations.ApiParam;
import org.apache.inlong.manager.common.beans.Response;
import org.apache.inlong.manager.common.enums.OperationType;
-import org.apache.inlong.manager.common.pojo.dataconsumption.ConsumptionInfo;
-import org.apache.inlong.manager.common.pojo.dataconsumption.ConsumptionListVo;
-import org.apache.inlong.manager.common.pojo.dataconsumption.ConsumptionQuery;
-import org.apache.inlong.manager.common.pojo.dataconsumption.ConsumptionSummary;
-import org.apache.inlong.manager.common.pojo.dataconsumption.ConsumptionUpdateInfo;
+import org.apache.inlong.manager.common.pojo.consumption.ConsumptionInfo;
+import org.apache.inlong.manager.common.pojo.consumption.ConsumptionListVo;
+import org.apache.inlong.manager.common.pojo.consumption.ConsumptionQuery;
+import org.apache.inlong.manager.common.pojo.consumption.ConsumptionSummary;
import org.apache.inlong.manager.common.util.LoginUserUtil;
import org.apache.inlong.manager.service.core.ConsumptionService;
import org.apache.inlong.manager.service.core.operationlog.OperationLog;
@@ -72,7 +71,7 @@ public class ConsumptionController {
@ApiOperation(value = "Get consumption details")
public Response<ConsumptionInfo> getDetail(
@ApiParam(value = "Consumption ID", required = true) @PathVariable(name = "id") Integer id) {
- return Response.success(consumptionService.getInfo(id));
+ return Response.success(consumptionService.get(id));
}
@DeleteMapping("/delete/{id}")
@@ -96,12 +95,11 @@ public class ConsumptionController {
@PostMapping("/update/{id}")
@OperationLog(operation = OperationType.UPDATE)
@ApiOperation(value = "Update data consumption")
- public Response<Integer> updateConsumptionInfo(
- @ApiParam(value = "Consumption ID", required = true) @PathVariable(name = "id") Integer id,
- @Validated @RequestBody ConsumptionUpdateInfo consumptionUpdateInfo) {
- consumptionUpdateInfo.setId(id);
- String currentUser = LoginUserUtil.getLoginUserDetail().getUserName();
- return Response.success(consumptionService.update(consumptionUpdateInfo, currentUser));
+ public Response<String> update(@PathVariable(name = "id") Integer id,
+ @Validated @RequestBody ConsumptionInfo consumptionInfo) {
+ consumptionInfo.setId(id);
+ consumptionService.update(consumptionInfo, LoginUserUtil.getLoginUserDetail().getUserName());
+ return Response.success();
}
@PostMapping("/startProcess/{id}")
diff --git a/inlong-manager/manager-web/src/main/resources/application-dev.properties b/inlong-manager/manager-web/src/main/resources/application-dev.properties
index 16a5a30..4ca899c 100644
--- a/inlong-manager/manager-web/src/main/resources/application-dev.properties
+++ b/inlong-manager/manager-web/src/main/resources/application-dev.properties
@@ -22,8 +22,8 @@ logging.level.root=INFO
logging.level.org.apache.inlong.manager=debug
spring.datasource.druid.url=jdbc:mysql://127.0.0.1:3306/apache_inlong_manager?useSSL=false&allowPublicKeyRetrieval=true&characterEncoding=UTF-8&nullCatalogMeansCurrent=true&serverTimezone=GMT%2b8
-spring.datasource.druid.username=xxxxxx
-spring.datasource.druid.password=xxxxxx
+spring.datasource.druid.username=root
+spring.datasource.druid.password=fighting
spring.datasource.druid.driver-class-name=com.mysql.cj.jdbc.Driver
spring.datasource.druid.validationQuery=SELECT 'x'
@@ -64,3 +64,5 @@ sort.appName=inlong_app
pulsar.adminUrl=http://127.0.0.1:8080,127.0.0.2:8080,127.0.0.3:8080
# Pulsar master address
pulsar.serviceUrl=pulsar://127.0.0.1:6650,127.0.0.1:6650,127.0.0.1:6650
+# Default tenant of Pulsar
+pulsar.defaultTenant=public
diff --git a/inlong-manager/manager-web/src/main/resources/application-prod.properties b/inlong-manager/manager-web/src/main/resources/application-prod.properties
index 5da7518..056e5a6 100644
--- a/inlong-manager/manager-web/src/main/resources/application-prod.properties
+++ b/inlong-manager/manager-web/src/main/resources/application-prod.properties
@@ -63,3 +63,5 @@ sort.appName=inlong_app
pulsar.adminUrl=http://127.0.0.1:8080,127.0.0.2:8080,127.0.0.3:8080
# Pulsar master address
pulsar.serviceUrl=pulsar://127.0.0.1:6650,127.0.0.1:6650,127.0.0.1:6650
+# Default tenant of Pulsar
+pulsar.defaultTenant=public
diff --git a/inlong-manager/manager-web/src/main/resources/application-test.properties b/inlong-manager/manager-web/src/main/resources/application-test.properties
index 16a5a30..b4a75c9 100644
--- a/inlong-manager/manager-web/src/main/resources/application-test.properties
+++ b/inlong-manager/manager-web/src/main/resources/application-test.properties
@@ -64,3 +64,5 @@ sort.appName=inlong_app
pulsar.adminUrl=http://127.0.0.1:8080,127.0.0.2:8080,127.0.0.3:8080
# Pulsar master address
pulsar.serviceUrl=pulsar://127.0.0.1:6650,127.0.0.1:6650,127.0.0.1:6650
+# Default tenant of Pulsar
+pulsar.defaultTenant=public
diff --git a/inlong-manager/manager-web/src/test/java/org/apache/inlong/manager/service/core/BusinessServiceTest.java b/inlong-manager/manager-web/src/test/java/org/apache/inlong/manager/service/core/BusinessServiceTest.java
index f085c61..8936052 100644
--- a/inlong-manager/manager-web/src/test/java/org/apache/inlong/manager/service/core/BusinessServiceTest.java
+++ b/inlong-manager/manager-web/src/test/java/org/apache/inlong/manager/service/core/BusinessServiceTest.java
@@ -24,17 +24,18 @@ import org.apache.inlong.manager.web.ServiceBaseTest;
import org.junit.Assert;
import org.junit.Test;
import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.test.context.TestComponent;
/**
* Business Service Test
*/
+@TestComponent
public class BusinessServiceTest extends ServiceBaseTest {
@Autowired
private BusinessService businessService;
- public String saveBusiness(String operator) {
- String groupName = "test_group";
+ public String saveBusiness(String groupName, String operator) {
BusinessInfo businessInfo = new BusinessInfo();
businessInfo.setName(groupName);
businessInfo.setMiddlewareType(BizConstant.MIDDLEWARE_PULSAR);
@@ -53,15 +54,17 @@ public class BusinessServiceTest extends ServiceBaseTest {
@Test
public void testSave() {
+ String groupName = "test_group1";
String operator = "test_user";
- String groupId = this.saveBusiness(operator);
+ String groupId = this.saveBusiness(groupName, operator);
Assert.assertNotNull(groupId);
}
@Test
public void testDelete() {
+ String groupName = "test_group2";
String operator = "test_user";
- String groupId = this.saveBusiness(operator);
+ String groupId = this.saveBusiness(groupName, operator);
boolean result = businessService.delete(groupId, operator);
Assert.assertTrue(result);
}
diff --git a/inlong-manager/manager-web/src/test/java/org/apache/inlong/manager/service/core/ConsumptionServiceTest.java b/inlong-manager/manager-web/src/test/java/org/apache/inlong/manager/service/core/ConsumptionServiceTest.java
new file mode 100644
index 0000000..8b74aef
--- /dev/null
+++ b/inlong-manager/manager-web/src/test/java/org/apache/inlong/manager/service/core/ConsumptionServiceTest.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.service.core;
+
+import org.apache.inlong.manager.common.enums.BizConstant;
+import org.apache.inlong.manager.common.pojo.consumption.ConsumptionInfo;
+import org.apache.inlong.manager.common.pojo.consumption.ConsumptionPulsarInfo;
+import org.apache.inlong.manager.web.ServiceBaseTest;
+import org.junit.Assert;
+import org.junit.Test;
+import org.springframework.beans.factory.annotation.Autowired;
+
+/**
+ * Consumption Service Test
+ */
+public class ConsumptionServiceTest extends ServiceBaseTest {
+
+ @Autowired
+ private ConsumptionService consumptionService;
+ @Autowired
+ private BusinessServiceTest businessServiceTest;
+
+ private Integer saveConsumption(String inlongGroup, String consumerGroup, String operator) {
+ ConsumptionInfo consumptionInfo = new ConsumptionInfo();
+ consumptionInfo.setTopic(inlongGroup);
+ consumptionInfo.setConsumerGroupName(consumerGroup);
+ consumptionInfo.setInlongGroupId("b_" + inlongGroup);
+ consumptionInfo.setMiddlewareType(BizConstant.MIDDLEWARE_PULSAR);
+ consumptionInfo.setCreator(operator);
+
+ ConsumptionPulsarInfo pulsarInfo = new ConsumptionPulsarInfo();
+ pulsarInfo.setMiddlewareType(BizConstant.MIDDLEWARE_PULSAR);
+ pulsarInfo.setIsDlq(1);
+ pulsarInfo.setDeadLetterTopic("test_dlq");
+ pulsarInfo.setIsRlq(0);
+
+ consumptionInfo.setMqExtInfo(pulsarInfo);
+
+ return consumptionService.save(consumptionInfo, operator);
+ }
+
+ @Test
+ public void testSave() {
+ String inlongGroup = "inlong_group1";
+ String consumerGroup = "test_save_consumer_group";
+ String operator = "test_user";
+ businessServiceTest.saveBusiness(inlongGroup, operator);
+ Integer id = this.saveConsumption(inlongGroup, consumerGroup, operator);
+ Assert.assertNotNull(id);
+ }
+
+ @Test
+ public void testDelete() {
+ String inlongGroup = "inlong_group2";
+ String operator = "test_user";
+ String consumerGroup = "test_delete_consumer_group";
+ businessServiceTest.saveBusiness(inlongGroup, operator);
+ Integer id = this.saveConsumption(inlongGroup, consumerGroup, operator);
+ boolean result = consumptionService.delete(id, operator);
+ Assert.assertTrue(result);
+ }
+
+}
diff --git a/inlong-manager/manager-web/src/test/resources/application-test.properties b/inlong-manager/manager-web/src/test/resources/application-test.properties
index 6333bd0..5053a8b 100644
--- a/inlong-manager/manager-web/src/test/resources/application-test.properties
+++ b/inlong-manager/manager-web/src/test/resources/application-test.properties
@@ -67,3 +67,5 @@ sort.appName=inlong_app
pulsar.adminUrl=http://127.0.0.1:8080,127.0.0.2:8080,127.0.0.3:8080
# Pulsar master address
pulsar.serviceUrl=pulsar://127.0.0.1:6650,127.0.0.1:6650,127.0.0.1:6650
+# Default tenant of Pulsar
+pulsar.defaultTenant=public
diff --git a/inlong-manager/manager-web/src/test/resources/sql/apache_inlong_manager.sql b/inlong-manager/manager-web/src/test/resources/sql/apache_inlong_manager.sql
index 9b8e124..d3ae3f1 100644
--- a/inlong-manager/manager-web/src/test/resources/sql/apache_inlong_manager.sql
+++ b/inlong-manager/manager-web/src/test/resources/sql/apache_inlong_manager.sql
@@ -244,6 +244,25 @@ CREATE TABLE `consumption`
);
-- ----------------------------
+-- Table structure for consumption_pulsar
+-- ----------------------------
+DROP TABLE IF EXISTS `consumption_pulsar`;
+CREATE TABLE `consumption_pulsar`
+(
+ `id` int NOT NULL AUTO_INCREMENT,
+ `consumption_id` int DEFAULT NULL COMMENT 'ID of the consumption information to which it belongs, guaranteed to be uniquely associated with consumption information',
+ `consumer_group_id` varchar(255) NOT NULL COMMENT 'Consumer group ID',
+ `consumer_group_name` varchar(255) NOT NULL COMMENT 'Consumer group name',
+ `inlong_group_id` varchar(255) NOT NULL COMMENT 'Business group ID',
+ `is_rlq` tinyint(1) DEFAULT '0' COMMENT 'Whether to configure the retry letter topic, 0: no configuration, 1: configuration',
+ `retry_letter_topic` varchar(255) DEFAULT NULL COMMENT 'The name of the retry queue topic',
+ `is_dlq` tinyint(1) DEFAULT '0' COMMENT 'Whether to configure dead letter topic, 0: no configuration, 1: means configuration',
+ `dead_letter_topic` varchar(255) DEFAULT NULL COMMENT 'dead letter topic name',
+ `is_deleted` int DEFAULT '0' COMMENT 'Whether to delete',
+ PRIMARY KEY (`id`)
+) COMMENT ='Pulsar consumption table';
+
+-- ----------------------------
-- Table structure for data_proxy_cluster
-- ----------------------------
DROP TABLE IF EXISTS `data_proxy_cluster`;
diff --git a/inlong-manager/pom.xml b/inlong-manager/pom.xml
index 686f253..a934118 100644
--- a/inlong-manager/pom.xml
+++ b/inlong-manager/pom.xml
@@ -52,6 +52,7 @@
<druid.version>1.2.5</druid.version>
<shiro.version>1.4.1</shiro.version>
<hive.version>3.1.2</hive.version>
+ <pulsar.version>2.8.1</pulsar.version>
<servlet-api.version>2.5</servlet-api.version>
<servlet.jsp-api.version>2.3.3</servlet.jsp-api.version>
<slf4j.version>1.7.25</slf4j.version>
@@ -195,6 +196,16 @@
<artifactId>pagehelper-spring-boot-starter</artifactId>
<version>${pagehelper.springboot.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.pulsar</groupId>
+ <artifactId>pulsar-client</artifactId>
+ <version>${pulsar.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.pulsar</groupId>
+ <artifactId>pulsar-client-admin</artifactId>
+ <version>${pulsar.version}</version>
+ </dependency>
<dependency>
<groupId>com.alibaba</groupId>