You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2021/11/28 05:16:06 UTC

[incubator-inlong] branch master updated: [INLONG-1847][Feature][InLong-Manager] Add consumption APIs for Pulsar MQ (#1848)

This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 13d07a3  [INLONG-1847][Feature][InLong-Manager] Add consumption APIs for Pulsar MQ (#1848)
13d07a3 is described below

commit 13d07a30771311548ce9ce35e1aa8eb5878009ad
Author: healchow <he...@gmail.com>
AuthorDate: Sun Nov 28 13:15:58 2021 +0800

    [INLONG-1847][Feature][InLong-Manager] Add consumption APIs for Pulsar MQ (#1848)
    
    Co-authored-by: healchow <he...@gmail.com>
---
 .../docker-compose/sql/apache_inlong_manager.sql   |  20 ++
 inlong-manager/doc/sql/apache_inlong_manager.sql   |  20 ++
 inlong-manager/manager-common/pom.xml              |  27 +-
 .../inlong/manager/common/beans/ClusterBean.java   |   3 +
 .../common/conversion/ConversionHandle.java        |  52 +++
 .../ConversionStatusContext.java}                  |  22 +-
 .../ConversionStrategy.java}                       |  19 +-
 .../DaysToMinute.java}                             |  22 +-
 .../DaysToSeconds.java}                            |  22 +-
 .../GBToMB.java}                                   |  23 +-
 .../HoursToMinute.java}                            |  24 +-
 .../HoursToSeconds.java}                           |  23 +-
 .../MBToMB.java}                                   |  22 +-
 .../SecondsToSeconds.java}                         |  22 +-
 .../TBToMB.java}                                   |  22 +-
 .../inlong/manager/common/enums/BizConstant.java   |  12 +
 .../manager/common/enums/BizErrorCodeEnum.java     |   9 +-
 .../manager/common/pojo/business/BusinessInfo.java |   7 +
 .../common/pojo/business/BusinessTopicVO.java      |   4 +-
 .../ConsumptionInfo.java                           |   5 +-
 .../ConsumptionListVo.java                         |   2 +-
 .../pojo/consumption/ConsumptionMqExtBase.java     |  57 ++++
 .../pojo/consumption/ConsumptionPulsarInfo.java    |  51 +++
 .../ConsumptionQuery.java                          |   2 +-
 .../ConsumptionSummary.java                        |   2 +-
 .../pojo/dataconsumption/ConsumerProgressInfo.java | 101 ------
 .../dataconsumption/ConsumerProgressInfoQuery.java |  73 ----
 .../PulsarTopicBean.java}                          |  21 +-
 .../dao/entity/ConsumptionPulsarEntity.java}       |  25 +-
 .../manager/dao/entity/DataStreamEntity.java       |   6 +
 .../dao/mapper/ConsumptionEntityMapper.java        |   5 +-
 ...per.java => ConsumptionPulsarEntityMapper.java} |  27 +-
 .../manager/dao/mapper/DataStreamEntityMapper.java |   5 +
 .../resources/mappers/ConsumptionEntityMapper.xml  |  14 +-
 .../mappers/ConsumptionPulsarEntityMapper.xml      | 180 ++++++++++
 .../resources/mappers/DataStreamEntityMapper.xml   |   8 +
 .../manager/service/core/ConsumptionService.java   |  34 +-
 .../manager/service/core/DataStreamService.java    |  21 ++
 .../service/core/impl/BusinessServiceImpl.java     |   2 +-
 .../service/core/impl/ConsumptionServiceImpl.java  | 375 ++++++++++++++++-----
 .../service/core/impl/DataStreamServiceImpl.java   |  42 +++
 .../NewConsumptionWorkflowDefinition.java          |   2 +-
 .../newconsumption/NewConsumptionWorkflowForm.java |   2 +-
 .../listener/ConsumptionApproveTaskListener.java   |   2 +-
 .../ConsumptionCompleteProcessListener.java        |   2 +-
 .../web/controller/ConsumptionController.java      |  22 +-
 .../src/main/resources/application-dev.properties  |   6 +-
 .../src/main/resources/application-prod.properties |   2 +
 .../src/main/resources/application-test.properties |   2 +
 .../manager/service/core/BusinessServiceTest.java  |  11 +-
 .../service/core/ConsumptionServiceTest.java       |  78 +++++
 .../src/test/resources/application-test.properties |   2 +
 .../test/resources/sql/apache_inlong_manager.sql   |  19 ++
 inlong-manager/pom.xml                             |  11 +
 54 files changed, 1079 insertions(+), 515 deletions(-)

diff --git a/docker/docker-compose/sql/apache_inlong_manager.sql b/docker/docker-compose/sql/apache_inlong_manager.sql
index 6df8bdc..dec268e 100644
--- a/docker/docker-compose/sql/apache_inlong_manager.sql
+++ b/docker/docker-compose/sql/apache_inlong_manager.sql
@@ -259,6 +259,26 @@ CREATE TABLE `consumption`
   DEFAULT CHARSET = utf8mb4 COMMENT ='Data consumption configuration table';
 
 -- ----------------------------
+-- Table structure for consumption_pulsar
+-- ----------------------------
+DROP TABLE IF EXISTS `consumption_pulsar`;
+CREATE TABLE `consumption_pulsar`
+(
+    `id`                  int          NOT NULL AUTO_INCREMENT,
+    `consumption_id`      int          DEFAULT NULL COMMENT 'ID of the consumption information to which it belongs, guaranteed to be uniquely associated with consumption information',
+    `consumer_group_id`   varchar(255) NOT NULL COMMENT 'Consumer group ID',
+    `consumer_group_name` varchar(255) NOT NULL COMMENT 'Consumer group name',
+    `inlong_group_id`     varchar(255) NOT NULL COMMENT 'Business group ID',
+    `is_rlq`              tinyint(1)   DEFAULT '0' COMMENT 'Whether to configure the retry letter topic, 0: no configuration, 1: configuration',
+    `retry_letter_topic`  varchar(255) DEFAULT NULL COMMENT 'The name of the retry queue topic',
+    `is_dlq`              tinyint(1)   DEFAULT '0' COMMENT 'Whether to configure dead letter topic, 0: no configuration, 1: means configuration',
+    `dead_letter_topic`   varchar(255) DEFAULT NULL COMMENT 'dead letter topic name',
+    `is_deleted`          int          DEFAULT '0' COMMENT 'Whether to delete',
+    PRIMARY KEY (`id`)
+) ENGINE = InnoDB
+  DEFAULT CHARSET = utf8mb4 COMMENT ='Pulsar consumption table';
+
+-- ----------------------------
 -- Table structure for data_proxy_cluster
 -- ----------------------------
 DROP TABLE IF EXISTS `data_proxy_cluster`;
diff --git a/inlong-manager/doc/sql/apache_inlong_manager.sql b/inlong-manager/doc/sql/apache_inlong_manager.sql
index 6df8bdc..dec268e 100644
--- a/inlong-manager/doc/sql/apache_inlong_manager.sql
+++ b/inlong-manager/doc/sql/apache_inlong_manager.sql
@@ -259,6 +259,26 @@ CREATE TABLE `consumption`
   DEFAULT CHARSET = utf8mb4 COMMENT ='Data consumption configuration table';
 
 -- ----------------------------
+-- Table structure for consumption_pulsar
+-- ----------------------------
+DROP TABLE IF EXISTS `consumption_pulsar`;
+CREATE TABLE `consumption_pulsar`
+(
+    `id`                  int          NOT NULL AUTO_INCREMENT,
+    `consumption_id`      int          DEFAULT NULL COMMENT 'ID of the consumption information to which it belongs, guaranteed to be uniquely associated with consumption information',
+    `consumer_group_id`   varchar(255) NOT NULL COMMENT 'Consumer group ID',
+    `consumer_group_name` varchar(255) NOT NULL COMMENT 'Consumer group name',
+    `inlong_group_id`     varchar(255) NOT NULL COMMENT 'Business group ID',
+    `is_rlq`              tinyint(1)   DEFAULT '0' COMMENT 'Whether to configure the retry letter topic, 0: no configuration, 1: configuration',
+    `retry_letter_topic`  varchar(255) DEFAULT NULL COMMENT 'The name of the retry queue topic',
+    `is_dlq`              tinyint(1)   DEFAULT '0' COMMENT 'Whether to configure dead letter topic, 0: no configuration, 1: means configuration',
+    `dead_letter_topic`   varchar(255) DEFAULT NULL COMMENT 'dead letter topic name',
+    `is_deleted`          int          DEFAULT '0' COMMENT 'Whether to delete',
+    PRIMARY KEY (`id`)
+) ENGINE = InnoDB
+  DEFAULT CHARSET = utf8mb4 COMMENT ='Pulsar consumption table';
+
+-- ----------------------------
 -- Table structure for data_proxy_cluster
 -- ----------------------------
 DROP TABLE IF EXISTS `data_proxy_cluster`;
diff --git a/inlong-manager/manager-common/pom.xml b/inlong-manager/manager-common/pom.xml
index dd54e5c..3692efe 100644
--- a/inlong-manager/manager-common/pom.xml
+++ b/inlong-manager/manager-common/pom.xml
@@ -32,6 +32,16 @@
     <dependencies>
         <dependency>
             <groupId>org.springframework.boot</groupId>
+            <artifactId>spring-boot-starter-web</artifactId>
+            <exclusions>
+                <exclusion>
+                    <groupId>org.slf4j</groupId>
+                    <artifactId>slf4j-log4j12</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+        <dependency>
+            <groupId>org.springframework.boot</groupId>
             <artifactId>spring-boot-starter-validation</artifactId>
         </dependency>
         <dependency>
@@ -70,7 +80,6 @@
             <groupId>io.swagger</groupId>
             <artifactId>swagger-annotations</artifactId>
         </dependency>
-
         <dependency>
             <groupId>com.github.xiaoymin</groupId>
             <artifactId>knife4j-spring-boot-starter</artifactId>
@@ -118,25 +127,18 @@
             </exclusions>
         </dependency>
         <dependency>
-            <groupId>org.springframework</groupId>
-            <artifactId>spring-web</artifactId>
-        </dependency>
-        <dependency>
             <groupId>com.github.pagehelper</groupId>
             <artifactId>pagehelper-spring-boot-starter</artifactId>
         </dependency>
-
         <dependency>
             <groupId>javax.servlet</groupId>
             <artifactId>servlet-api</artifactId>
             <scope>provided</scope>
         </dependency>
-
         <dependency>
             <groupId>com.googlecode.json-simple</groupId>
             <artifactId>json-simple</artifactId>
         </dependency>
-
         <dependency>
             <groupId>com.google.code.gson</groupId>
             <artifactId>gson</artifactId>
@@ -154,6 +156,15 @@
         </dependency>
 
         <dependency>
+            <groupId>org.apache.pulsar</groupId>
+            <artifactId>pulsar-client</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.pulsar</groupId>
+            <artifactId>pulsar-client-admin</artifactId>
+        </dependency>
+
+        <dependency>
             <groupId>org.apache.inlong</groupId>
             <artifactId>inlong-common</artifactId>
             <version>${project.version}</version>
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/beans/ClusterBean.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/beans/ClusterBean.java
index 78185bd..bf47633 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/beans/ClusterBean.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/beans/ClusterBean.java
@@ -49,4 +49,7 @@ public class ClusterBean {
     @Value("${pulsar.serviceUrl}")
     private String pulsarServiceUrl;
 
+    @Value("${pulsar.defaultTenant}")
+    private String defaultTenant;
+
 }
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/conversion/ConversionHandle.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/conversion/ConversionHandle.java
new file mode 100644
index 0000000..c34e5f1
--- /dev/null
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/conversion/ConversionHandle.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.common.conversion;
+
+import java.util.HashMap;
+import java.util.Map;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.stereotype.Component;
+
+@Slf4j
+@Component
+public class ConversionHandle {
+
+    private static Map<String, ConversionStrategy> unitMap;
+
+    private void loadStrategy() {
+        unitMap = new HashMap<>(16);
+        unitMap.put("hours_minutes", new HoursToMinute());
+        unitMap.put("hours_seconds", new HoursToSeconds());
+        unitMap.put("gb_mb", new GBToMB());
+        unitMap.put("tb_mb", new TBToMB());
+        unitMap.put("days_seconds", new DaysToSeconds());
+        unitMap.put("days_minutes", new DaysToMinute());
+        unitMap.put("seconds_seconds", new SecondsToSeconds());
+        unitMap.put("mb_mb", new MBToMB());
+    }
+
+    public Integer handleConversion(Integer value, String type) {
+        if (unitMap == null) {
+            this.loadStrategy();
+        }
+        ConversionStrategy conversionStrategy = unitMap.get(type);
+        ConversionStatusContext conversionStatusContext = new ConversionStatusContext(conversionStrategy);
+        return conversionStatusContext.executeConversion(value);
+    }
+
+}
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/dataconsumption/ConsumptionUpdateInfo.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/conversion/ConversionStatusContext.java
similarity index 66%
copy from inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/dataconsumption/ConsumptionUpdateInfo.java
copy to inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/conversion/ConversionStatusContext.java
index 613973b..f502c5e 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/dataconsumption/ConsumptionUpdateInfo.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/conversion/ConversionStatusContext.java
@@ -15,22 +15,18 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.manager.common.pojo.dataconsumption;
+package org.apache.inlong.manager.common.conversion;
 
-import io.swagger.annotations.ApiModel;
-import io.swagger.annotations.ApiModelProperty;
-import lombok.Data;
+public class ConversionStatusContext {
 
-/**
- * Data consumption update info
- */
-@Data
-@ApiModel("Data consumption update info")
-public class ConsumptionUpdateInfo {
+    private final ConversionStrategy conversionStrategy;
 
-    private Integer id;
+    public ConversionStatusContext(ConversionStrategy conversionStrategy) {
+        this.conversionStrategy = conversionStrategy;
+    }
 
-    @ApiModelProperty(value = "consumption in charges")
-    private String inCharges;
+    public Integer executeConversion(Integer value) {
+        return conversionStrategy.unitConversion(value);
+    }
 
 }
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/dataconsumption/ConsumptionUpdateInfo.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/conversion/ConversionStrategy.java
similarity index 66%
copy from inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/dataconsumption/ConsumptionUpdateInfo.java
copy to inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/conversion/ConversionStrategy.java
index 613973b..fdd263d 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/dataconsumption/ConsumptionUpdateInfo.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/conversion/ConversionStrategy.java
@@ -15,22 +15,9 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.manager.common.pojo.dataconsumption;
+package org.apache.inlong.manager.common.conversion;
 
-import io.swagger.annotations.ApiModel;
-import io.swagger.annotations.ApiModelProperty;
-import lombok.Data;
-
-/**
- * Data consumption update info
- */
-@Data
-@ApiModel("Data consumption update info")
-public class ConsumptionUpdateInfo {
-
-    private Integer id;
-
-    @ApiModelProperty(value = "consumption in charges")
-    private String inCharges;
+public interface ConversionStrategy {
 
+    Integer unitConversion(Integer value);
 }
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/dataconsumption/ConsumptionUpdateInfo.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/conversion/DaysToMinute.java
similarity index 66%
copy from inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/dataconsumption/ConsumptionUpdateInfo.java
copy to inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/conversion/DaysToMinute.java
index 613973b..658bb1c 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/dataconsumption/ConsumptionUpdateInfo.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/conversion/DaysToMinute.java
@@ -15,22 +15,12 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.manager.common.pojo.dataconsumption;
+package org.apache.inlong.manager.common.conversion;
 
-import io.swagger.annotations.ApiModel;
-import io.swagger.annotations.ApiModelProperty;
-import lombok.Data;
-
-/**
- * Data consumption update info
- */
-@Data
-@ApiModel("Data consumption update info")
-public class ConsumptionUpdateInfo {
-
-    private Integer id;
-
-    @ApiModelProperty(value = "consumption in charges")
-    private String inCharges;
+public class DaysToMinute implements ConversionStrategy {
 
+    @Override
+    public Integer unitConversion(Integer value) {
+        return value * 24 * 60;
+    }
 }
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/dataconsumption/ConsumptionUpdateInfo.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/conversion/DaysToSeconds.java
similarity index 66%
copy from inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/dataconsumption/ConsumptionUpdateInfo.java
copy to inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/conversion/DaysToSeconds.java
index 613973b..a234474 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/dataconsumption/ConsumptionUpdateInfo.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/conversion/DaysToSeconds.java
@@ -15,22 +15,12 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.manager.common.pojo.dataconsumption;
+package org.apache.inlong.manager.common.conversion;
 
-import io.swagger.annotations.ApiModel;
-import io.swagger.annotations.ApiModelProperty;
-import lombok.Data;
-
-/**
- * Data consumption update info
- */
-@Data
-@ApiModel("Data consumption update info")
-public class ConsumptionUpdateInfo {
-
-    private Integer id;
-
-    @ApiModelProperty(value = "consumption in charges")
-    private String inCharges;
+public class DaysToSeconds implements ConversionStrategy {
 
+    @Override
+    public Integer unitConversion(Integer value) {
+        return value * 24 * 60 * 60;
+    }
 }
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/dataconsumption/ConsumptionUpdateInfo.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/conversion/GBToMB.java
similarity index 66%
copy from inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/dataconsumption/ConsumptionUpdateInfo.java
copy to inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/conversion/GBToMB.java
index 613973b..c513e54 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/dataconsumption/ConsumptionUpdateInfo.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/conversion/GBToMB.java
@@ -15,22 +15,15 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.manager.common.pojo.dataconsumption;
+package org.apache.inlong.manager.common.conversion;
 
-import io.swagger.annotations.ApiModel;
-import io.swagger.annotations.ApiModelProperty;
-import lombok.Data;
+import org.springframework.stereotype.Component;
 
-/**
- * Data consumption update info
- */
-@Data
-@ApiModel("Data consumption update info")
-public class ConsumptionUpdateInfo {
-
-    private Integer id;
-
-    @ApiModelProperty(value = "consumption in charges")
-    private String inCharges;
+@Component
+public class GBToMB implements ConversionStrategy {
 
+    @Override
+    public Integer unitConversion(Integer value) {
+        return value << 10;
+    }
 }
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/dataconsumption/ConsumptionUpdateInfo.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/conversion/HoursToMinute.java
similarity index 66%
copy from inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/dataconsumption/ConsumptionUpdateInfo.java
copy to inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/conversion/HoursToMinute.java
index 613973b..4d76e4e 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/dataconsumption/ConsumptionUpdateInfo.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/conversion/HoursToMinute.java
@@ -15,22 +15,16 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.manager.common.pojo.dataconsumption;
+package org.apache.inlong.manager.common.conversion;
 
-import io.swagger.annotations.ApiModel;
-import io.swagger.annotations.ApiModelProperty;
-import lombok.Data;
+import org.springframework.stereotype.Component;
 
-/**
- * Data consumption update info
- */
-@Data
-@ApiModel("Data consumption update info")
-public class ConsumptionUpdateInfo {
-
-    private Integer id;
-
-    @ApiModelProperty(value = "consumption in charges")
-    private String inCharges;
+@Component
+public class HoursToMinute implements ConversionStrategy {
 
+    @Override
+    public Integer unitConversion(Integer value) {
+        return value * 60;
+    }
 }
+
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/dataconsumption/ConsumptionUpdateInfo.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/conversion/HoursToSeconds.java
similarity index 66%
copy from inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/dataconsumption/ConsumptionUpdateInfo.java
copy to inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/conversion/HoursToSeconds.java
index 613973b..781d069 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/dataconsumption/ConsumptionUpdateInfo.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/conversion/HoursToSeconds.java
@@ -15,22 +15,15 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.manager.common.pojo.dataconsumption;
+package org.apache.inlong.manager.common.conversion;
 
-import io.swagger.annotations.ApiModel;
-import io.swagger.annotations.ApiModelProperty;
-import lombok.Data;
+import org.springframework.stereotype.Component;
 
-/**
- * Data consumption update info
- */
-@Data
-@ApiModel("Data consumption update info")
-public class ConsumptionUpdateInfo {
-
-    private Integer id;
-
-    @ApiModelProperty(value = "consumption in charges")
-    private String inCharges;
+@Component
+public class HoursToSeconds implements ConversionStrategy {
 
+    @Override
+    public Integer unitConversion(Integer value) {
+        return value * 60 * 60;
+    }
 }
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/dataconsumption/ConsumptionUpdateInfo.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/conversion/MBToMB.java
similarity index 66%
copy from inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/dataconsumption/ConsumptionUpdateInfo.java
copy to inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/conversion/MBToMB.java
index 613973b..4a5c3f9 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/dataconsumption/ConsumptionUpdateInfo.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/conversion/MBToMB.java
@@ -15,22 +15,12 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.manager.common.pojo.dataconsumption;
+package org.apache.inlong.manager.common.conversion;
 
-import io.swagger.annotations.ApiModel;
-import io.swagger.annotations.ApiModelProperty;
-import lombok.Data;
-
-/**
- * Data consumption update info
- */
-@Data
-@ApiModel("Data consumption update info")
-public class ConsumptionUpdateInfo {
-
-    private Integer id;
-
-    @ApiModelProperty(value = "consumption in charges")
-    private String inCharges;
+public class MBToMB implements ConversionStrategy {
 
+    @Override
+    public Integer unitConversion(Integer value) {
+        return value;
+    }
 }
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/dataconsumption/ConsumptionUpdateInfo.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/conversion/SecondsToSeconds.java
similarity index 66%
copy from inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/dataconsumption/ConsumptionUpdateInfo.java
copy to inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/conversion/SecondsToSeconds.java
index 613973b..75c95da 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/dataconsumption/ConsumptionUpdateInfo.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/conversion/SecondsToSeconds.java
@@ -15,22 +15,12 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.manager.common.pojo.dataconsumption;
+package org.apache.inlong.manager.common.conversion;
 
-import io.swagger.annotations.ApiModel;
-import io.swagger.annotations.ApiModelProperty;
-import lombok.Data;
-
-/**
- * Data consumption update info
- */
-@Data
-@ApiModel("Data consumption update info")
-public class ConsumptionUpdateInfo {
-
-    private Integer id;
-
-    @ApiModelProperty(value = "consumption in charges")
-    private String inCharges;
+public class SecondsToSeconds implements ConversionStrategy {
 
+    @Override
+    public Integer unitConversion(Integer value) {
+        return value;
+    }
 }
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/dataconsumption/ConsumptionUpdateInfo.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/conversion/TBToMB.java
similarity index 66%
copy from inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/dataconsumption/ConsumptionUpdateInfo.java
copy to inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/conversion/TBToMB.java
index 613973b..c7f527e 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/dataconsumption/ConsumptionUpdateInfo.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/conversion/TBToMB.java
@@ -15,22 +15,12 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.manager.common.pojo.dataconsumption;
+package org.apache.inlong.manager.common.conversion;
 
-import io.swagger.annotations.ApiModel;
-import io.swagger.annotations.ApiModelProperty;
-import lombok.Data;
-
-/**
- * Data consumption update info
- */
-@Data
-@ApiModel("Data consumption update info")
-public class ConsumptionUpdateInfo {
-
-    private Integer id;
-
-    @ApiModelProperty(value = "consumption in charges")
-    private String inCharges;
+public class TBToMB implements ConversionStrategy {
 
+    @Override
+    public Integer unitConversion(Integer value) {
+        return value << 20;
+    }
 }
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/BizConstant.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/BizConstant.java
index a5589b0..c2c322d 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/BizConstant.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/BizConstant.java
@@ -40,8 +40,20 @@ public class BizConstant {
 
     public static final String SCHEMA_M0_DAY = "m0_day";
 
+    public static final String CLUSTER_HIVE_TOPO = "HIVE_TOPO";
+
     public static final String GROUP_ID_IS_EMPTY = "business group id is empty";
 
     public static final String STREAM_ID_IS_EMPTY = "data stream id is empty";
 
+    public static final String PULSAR_TOPIC_TYPE_SERIAL = "SERIAL";
+
+    public static final String PULSAR_TOPIC_TYPE_PARALLEL = "PARALLEL";
+
+    public static final String SYSTEM_USER = "SYSTEM"; // system user
+
+    public static final String PREFIX_DLQ = "dlq"; // prefix of the Topic of the dead letter queue
+
+    public static final String PREFIX_RLQ = "rlq"; // prefix of the Topic of the retry letter queue
+
 }
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/BizErrorCodeEnum.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/BizErrorCodeEnum.java
index 32624a4..b7a2290 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/BizErrorCodeEnum.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/BizErrorCodeEnum.java
@@ -80,7 +80,14 @@ public enum BizErrorCodeEnum {
     WORKFLOW_EXE_FAILED(4000, "Workflow execution exception"),
 
     CONSUMER_GROUP_NAME_DUPLICATED(2600, "The consumer group already exists in the cluster"),
-    CONSUMER_GROUP_CREATE_FAILED(5001, "Failed to create tube consumer group"),
+    CONSUMER_GROUP_CREATE_FAILED(2601, "Failed to create tube consumer group"),
+    TUBE_GROUP_CREATE_FAILED(2602, "Create Tube consumer group failed"),
+    PULSAR_GROUP_CREATE_FAILED(2603, "Create Pulsar consumer group failed"),
+    TUBE_TOPIC_CREATE_FAILED(2604, "CreateTube Topic failed"),
+    PULSAR_TOPIC_CREATE_FAILED(2605, "Create Pulsar Topic failed"),
+    PULSAR_DLQ_RLQ_ERROR(2606, "Wrong config for the RLQ and DLQ: RLQ was enabled, but the DLQ was disabled"),
+    PULSAR_DLQ_DUPLICATED(2607, "DLQ topic already exists under the business"),
+    PULSAR_RLQ_DUPLICATED(2608, "RLQ topic already exists under the business"),
 
     COMMON_FILE_DOWNLOAD_FAIL(6001, "File download failed"),
     COMMON_FILE_UPLOAD_FAIL(6002, "File upload failed"),
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/business/BusinessInfo.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/business/BusinessInfo.java
index db87278..eb2906b 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/business/BusinessInfo.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/business/BusinessInfo.java
@@ -67,6 +67,13 @@ public class BusinessInfo {
     @ApiModelProperty(value = "Pulsar service URL")
     private String pulsarServiceUrl;
 
+    @ApiModelProperty(value = "Queue model of Pulsar, parallel: multiple partitions, high throughput, out-of-order "
+            + "messages; serial: single partition, low throughput, and orderly messages")
+    private String queueModule = "parallel";
+
+    @ApiModelProperty(value = "The number of partitions of Pulsar Topic, 1-20")
+    private Integer topicPartitionNum = 3;
+
     @ApiModelProperty(value = "Data type name")
     private String schemaName;
 
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/business/BusinessTopicVO.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/business/BusinessTopicVO.java
index 94553a9..6ed1d26 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/business/BusinessTopicVO.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/business/BusinessTopicVO.java
@@ -36,8 +36,8 @@ public class BusinessTopicVO {
     @ApiModelProperty(value = "Middleware type, high throughput: TUBE, high consistency: PULSAR")
     private String middlewareType;
 
-    @ApiModelProperty(value = "Tube topic name")
-    private String topicName;
+    @ApiModelProperty(value = "Tube topic name, or Pulsar namespace name")
+    private String mqResourceObj;
 
     @ApiModelProperty(value = "Topic list, Tube corresponds to business group, there is only 1 topic, "
             + "Pulsar corresponds to data stream, there are multiple topics")
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/dataconsumption/ConsumptionInfo.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/consumption/ConsumptionInfo.java
similarity index 95%
rename from inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/dataconsumption/ConsumptionInfo.java
rename to inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/consumption/ConsumptionInfo.java
index 2834050..cebdcfb 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/dataconsumption/ConsumptionInfo.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/consumption/ConsumptionInfo.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.manager.common.pojo.dataconsumption;
+package org.apache.inlong.manager.common.pojo.consumption;
 
 import com.fasterxml.jackson.annotation.JsonIgnore;
 import io.swagger.annotations.ApiModel;
@@ -87,6 +87,9 @@ public class ConsumptionInfo {
 
     private Date modifyTime;
 
+    @ApiModelProperty(value = "Extended information for MQ")
+    private ConsumptionMqExtBase mqExtInfo;
+
     @JsonIgnore
     @AssertTrue(message = "when filter enabled, data stream id cannot be null")
     public boolean isValidateFilter() {
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/dataconsumption/ConsumptionListVo.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/consumption/ConsumptionListVo.java
similarity index 97%
rename from inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/dataconsumption/ConsumptionListVo.java
rename to inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/consumption/ConsumptionListVo.java
index 9ac259c..02854a7 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/dataconsumption/ConsumptionListVo.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/consumption/ConsumptionListVo.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.manager.common.pojo.dataconsumption;
+package org.apache.inlong.manager.common.pojo.consumption;
 
 import io.swagger.annotations.ApiModel;
 import io.swagger.annotations.ApiModelProperty;
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/consumption/ConsumptionMqExtBase.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/consumption/ConsumptionMqExtBase.java
new file mode 100644
index 0000000..287bcd6
--- /dev/null
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/consumption/ConsumptionMqExtBase.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.common.pojo.consumption;
+
+import com.fasterxml.jackson.annotation.JsonSubTypes;
+import com.fasterxml.jackson.annotation.JsonTypeInfo;
+import com.fasterxml.jackson.annotation.JsonTypeInfo.Id;
+import io.swagger.annotations.ApiModel;
+import io.swagger.annotations.ApiModelProperty;
+import lombok.Data;
+import org.apache.inlong.manager.common.enums.BizConstant;
+
+/**
+ * Extended consumption information of different MQs
+ */
+@Data
+@ApiModel("Extended consumption information of different MQs")
+@JsonTypeInfo(use = Id.NAME, visible = true, property = "middlewareType", defaultImpl = ConsumptionMqExtBase.class)
+@JsonSubTypes({
+        @JsonSubTypes.Type(value = ConsumptionPulsarInfo.class, name = BizConstant.MIDDLEWARE_PULSAR)
+})
+public class ConsumptionMqExtBase {
+
+    @ApiModelProperty(value = "Self-incrementing primary key")
+    private Integer id;
+
+    @ApiModelProperty(value = "Consumer information ID")
+    private Integer consumptionId;
+
+    @ApiModelProperty(value = "Consumer group")
+    private String consumerGroup;
+
+    @ApiModelProperty(value = "Consumption target inlong group id")
+    private String inlongGroupId;
+
+    @ApiModelProperty("Whether to delete, 0: not deleted, 1: deleted")
+    private Integer isDeleted = 0;
+
+    @ApiModelProperty(value = "Data storage middleware type, high throughput: TUBE, high consistency: PULSAR")
+    private String middlewareType;
+
+}
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/consumption/ConsumptionPulsarInfo.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/consumption/ConsumptionPulsarInfo.java
new file mode 100644
index 0000000..350fef4
--- /dev/null
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/consumption/ConsumptionPulsarInfo.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.common.pojo.consumption;
+
+import io.swagger.annotations.ApiModel;
+import io.swagger.annotations.ApiModelProperty;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.ToString;
+import org.apache.inlong.manager.common.enums.BizConstant;
+
+/**
+ * Pulsar consumer information
+ */
+@Data
+@EqualsAndHashCode(callSuper = true)
+@ToString(callSuper = true)
+@ApiModel("Pulsar consumer information")
+public class ConsumptionPulsarInfo extends ConsumptionMqExtBase {
+
+    @ApiModelProperty("The middleware type of MQ")
+    private String middlewareType = BizConstant.MIDDLEWARE_PULSAR;
+
+    @ApiModelProperty("Whether to configure the dead letter queue, 0: do not configure, 1: configure")
+    private Integer isDlq;
+
+    @ApiModelProperty("The name of the dead letter queue Topic")
+    private String deadLetterTopic;
+
+    @ApiModelProperty("Whether to configure the retry letter queue, 0: do not configure, 1: configure")
+    private Integer isRlq;
+
+    @ApiModelProperty("The name of the retry letter queue topic")
+    private String retryLetterTopic;
+
+}
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/dataconsumption/ConsumptionQuery.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/consumption/ConsumptionQuery.java
similarity index 97%
rename from inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/dataconsumption/ConsumptionQuery.java
rename to inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/consumption/ConsumptionQuery.java
index b786125..e3670c3 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/dataconsumption/ConsumptionQuery.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/consumption/ConsumptionQuery.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.manager.common.pojo.dataconsumption;
+package org.apache.inlong.manager.common.pojo.consumption;
 
 import io.swagger.annotations.ApiModel;
 import io.swagger.annotations.ApiModelProperty;
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/dataconsumption/ConsumptionSummary.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/consumption/ConsumptionSummary.java
similarity index 96%
rename from inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/dataconsumption/ConsumptionSummary.java
rename to inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/consumption/ConsumptionSummary.java
index 9be2820..41f4038 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/dataconsumption/ConsumptionSummary.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/consumption/ConsumptionSummary.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.manager.common.pojo.dataconsumption;
+package org.apache.inlong.manager.common.pojo.consumption;
 
 import io.swagger.annotations.ApiModel;
 import io.swagger.annotations.ApiModelProperty;
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/dataconsumption/ConsumerProgressInfo.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/dataconsumption/ConsumerProgressInfo.java
deleted file mode 100644
index 9aaf3b6..0000000
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/dataconsumption/ConsumerProgressInfo.java
+++ /dev/null
@@ -1,101 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.manager.common.pojo.dataconsumption;
-
-import io.swagger.annotations.ApiModel;
-import io.swagger.annotations.ApiModelProperty;
-import java.util.Date;
-import lombok.AllArgsConstructor;
-import lombok.Builder;
-import lombok.Data;
-import lombok.NoArgsConstructor;
-
-/**
- * Data consumption progress
- */
-@Data
-@Builder
-@NoArgsConstructor
-@AllArgsConstructor
-@ApiModel("Data consumption progress")
-public class ConsumerProgressInfo {
-
-    @ApiModelProperty("Report time")
-    private Date repTime;
-
-    @ApiModelProperty("Report date")
-    private Long repDate;
-
-    @ApiModelProperty("Consumer group")
-    private String consumerGroup;
-
-    @ApiModelProperty("Consumption TOPIC")
-    private String topic;
-
-    @ApiModelProperty("Broker IP")
-    private String brokerIp;
-
-    @ApiModelProperty("Broker Id")
-    private Long brokerId;
-
-    @ApiModelProperty("Partition ID")
-    private Long part;
-
-    @ApiModelProperty("Consumption ID")
-    private String consumerId;
-
-    @ApiModelProperty("Consumer IP")
-    private String consumerIp;
-
-    @ApiModelProperty("Consumer process ID")
-    private String consumerPid;
-
-    @ApiModelProperty("Is the heartbeat normal")
-    private String isHeartbeatOk;
-
-    @ApiModelProperty("heartbeat time")
-    private String hartbeat;
-
-    @ApiModelProperty("Is the storage normal")
-    private String isStoreOk;
-
-    @ApiModelProperty("Consumption difference")
-    private Double fileConSize;
-
-    @ApiModelProperty("Production speed")
-    private Double fileSpeed;
-
-    @ApiModelProperty("Consumption speed")
-    private Double consumeSpeed;
-
-    @ApiModelProperty("Consumption difference speed")
-    private Double fileConSpeed;
-
-    @ApiModelProperty("Status: 0 normal; 1 abnormal; 2 shielded; 3 not yet available")
-    private String state;
-
-    @ApiModelProperty("Middleware type")
-    private String middleware;
-
-    private Double tmpOffset;
-    private Double minOffset;
-    private Double fileOffset;
-    private Double consumerOffset;
-    private Double saveFileSize;
-    private Integer intervalSec;
-}
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/dataconsumption/ConsumerProgressInfoQuery.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/dataconsumption/ConsumerProgressInfoQuery.java
deleted file mode 100644
index 5707233..0000000
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/dataconsumption/ConsumerProgressInfoQuery.java
+++ /dev/null
@@ -1,73 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.manager.common.pojo.dataconsumption;
-
-import io.swagger.annotations.ApiModel;
-import io.swagger.annotations.ApiModelProperty;
-import java.util.Date;
-import javax.validation.constraints.NotBlank;
-import javax.validation.constraints.NotNull;
-import lombok.Data;
-import lombok.EqualsAndHashCode;
-import org.apache.inlong.manager.common.beans.PageRequest;
-import org.springframework.format.annotation.DateTimeFormat;
-
-/**
- * Consumption progress query
- */
-@EqualsAndHashCode(callSuper = false)
-@Data
-@ApiModel("Consumption progress query")
-public class ConsumerProgressInfoQuery extends PageRequest {
-
-    @ApiModelProperty(hidden = true, value = "Consumption ID")
-    private Integer consumerId;
-
-    @ApiModelProperty(value = "Consumer Group ID", required = true)
-    @NotBlank(message = "consumerGroupId can't be null")
-    private String consumerGroupId;
-
-    @ApiModelProperty(value = "Reporting time 5 minutes interval: yyyy-MM-dd HH:mm", required = true)
-    @DateTimeFormat(pattern = "yyyy-MM-dd HH:mm")
-    @NotNull(message = "repTime can't be null")
-    private Date repTime;
-
-    @ApiModelProperty(value = "Report date: yyyyMMdd")
-    private Long repDate;
-
-    @ApiModelProperty(value = "Consumption TOPIC")
-    private String topic;
-
-    @ApiModelProperty(value = "Broker Ip")
-    private String brokerIp;
-
-    @ApiModelProperty(value = "Consumer Group IP")
-    private String consumerIp;
-
-    @ApiModelProperty(value = "Is the heartbeat normal")
-    private String isHeartbeatOk;
-
-    @ApiModelProperty(value = "Is the storage normal")
-    private String isStoreOk;
-
-    @ApiModelProperty(value = "state")
-    private String state;
-
-    @ApiModelProperty(value = "Middleware Type")
-    private String middleware;
-}
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/dataconsumption/ConsumptionSaveResponse.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/pulsar/PulsarTopicBean.java
similarity index 69%
rename from inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/dataconsumption/ConsumptionSaveResponse.java
rename to inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/pulsar/PulsarTopicBean.java
index 90e7811..15e8407 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/dataconsumption/ConsumptionSaveResponse.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/pulsar/PulsarTopicBean.java
@@ -15,26 +15,27 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.manager.common.pojo.dataconsumption;
+package org.apache.inlong.manager.common.pojo.pulsar;
 
-import io.swagger.annotations.ApiModel;
-import io.swagger.annotations.ApiModelProperty;
 import lombok.AllArgsConstructor;
 import lombok.Builder;
 import lombok.Data;
 import lombok.NoArgsConstructor;
 
-/**
- * Response after data consumption information is successfully saved
- */
 @Data
 @Builder
 @NoArgsConstructor
 @AllArgsConstructor
-@ApiModel("Response after data consumption information is successfully saved")
-public class ConsumptionSaveResponse {
+public class PulsarTopicBean {
+
+    private String tenant;
+
+    private String namespace;
+
+    private String topicName;
+
+    private String queueModule;
 
-    @ApiModelProperty(value = "Data consumption ID")
-    private Integer id;
+    private Integer numPartitions = 0;
 
 }
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/dataconsumption/ConsumptionUpdateInfo.java b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/ConsumptionPulsarEntity.java
similarity index 65%
rename from inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/dataconsumption/ConsumptionUpdateInfo.java
rename to inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/ConsumptionPulsarEntity.java
index 613973b..675126b 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/dataconsumption/ConsumptionUpdateInfo.java
+++ b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/ConsumptionPulsarEntity.java
@@ -15,22 +15,23 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.manager.common.pojo.dataconsumption;
+package org.apache.inlong.manager.dao.entity;
 
-import io.swagger.annotations.ApiModel;
-import io.swagger.annotations.ApiModelProperty;
+import java.io.Serializable;
 import lombok.Data;
 
-/**
- * Data consumption update info
- */
 @Data
-@ApiModel("Data consumption update info")
-public class ConsumptionUpdateInfo {
+public class ConsumptionPulsarEntity implements Serializable {
 
+    private static final long serialVersionUID = 1L;
     private Integer id;
+    private Integer consumptionId;
+    private String consumerGroupId;
+    private String inlongGroupId;
+    private Integer isDlq;
+    private String deadLetterTopic;
+    private Integer isRlq;
+    private String retryLetterTopic;
+    private Integer isDeleted;
 
-    @ApiModelProperty(value = "consumption in charges")
-    private String inCharges;
-
-}
+}
\ No newline at end of file
diff --git a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/DataStreamEntity.java b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/DataStreamEntity.java
index 5b9d90b..15cd115 100644
--- a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/DataStreamEntity.java
+++ b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/DataStreamEntity.java
@@ -38,6 +38,12 @@ public class DataStreamEntity implements Serializable {
     private String fileDelimiter;
     private Integer havePredefinedFields;
     private String inCharges;
+
+    private Integer dailyRecords;
+    private Integer dailyStorage;
+    private Integer peakRecords;
+    private Integer maxLength;
+
     private Integer status;
     private Integer previousStatus;
     private Integer isDeleted;
diff --git a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/ConsumptionEntityMapper.java b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/ConsumptionEntityMapper.java
index 634548b..95990fa 100644
--- a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/ConsumptionEntityMapper.java
+++ b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/ConsumptionEntityMapper.java
@@ -18,7 +18,7 @@
 package org.apache.inlong.manager.dao.mapper;
 
 import java.util.List;
-import org.apache.inlong.manager.common.pojo.dataconsumption.ConsumptionQuery;
+import org.apache.inlong.manager.common.pojo.consumption.ConsumptionQuery;
 import org.apache.inlong.manager.dao.entity.ConsumptionEntity;
 import org.apache.inlong.manager.workflow.model.view.CountByKey;
 import org.springframework.stereotype.Repository;
@@ -34,6 +34,8 @@ public interface ConsumptionEntityMapper {
 
     ConsumptionEntity selectByPrimaryKey(Integer id);
 
+    ConsumptionEntity selectConsumptionExists(String groupId, String topic, String consumerGroup);
+
     int updateByPrimaryKeySelective(ConsumptionEntity record);
 
     int updateByPrimaryKey(ConsumptionEntity record);
@@ -41,4 +43,5 @@ public interface ConsumptionEntityMapper {
     List<ConsumptionEntity> listByQuery(ConsumptionQuery consumptionQuery);
 
     List<CountByKey> countByStatus(ConsumptionQuery consumptionQuery);
+
 }
diff --git a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/ConsumptionEntityMapper.java b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/ConsumptionPulsarEntityMapper.java
similarity index 55%
copy from inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/ConsumptionEntityMapper.java
copy to inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/ConsumptionPulsarEntityMapper.java
index 634548b..99f65d2 100644
--- a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/ConsumptionEntityMapper.java
+++ b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/ConsumptionPulsarEntityMapper.java
@@ -17,28 +17,27 @@
 
 package org.apache.inlong.manager.dao.mapper;
 
-import java.util.List;
-import org.apache.inlong.manager.common.pojo.dataconsumption.ConsumptionQuery;
-import org.apache.inlong.manager.dao.entity.ConsumptionEntity;
-import org.apache.inlong.manager.workflow.model.view.CountByKey;
+import org.apache.ibatis.annotations.Param;
+import org.apache.inlong.manager.dao.entity.ConsumptionPulsarEntity;
 import org.springframework.stereotype.Repository;
 
 @Repository
-public interface ConsumptionEntityMapper {
+public interface ConsumptionPulsarEntityMapper {
 
-    int deleteByPrimaryKey(Integer id);
+    int deleteByConsumptionId(@Param("consumptionId") Integer consumptionId);
 
-    int insert(ConsumptionEntity record);
+    int insert(ConsumptionPulsarEntity record);
 
-    int insertSelective(ConsumptionEntity record);
+    int insertSelective(ConsumptionPulsarEntity record);
 
-    ConsumptionEntity selectByPrimaryKey(Integer id);
+    ConsumptionPulsarEntity selectByPrimaryKey(@Param("id") Integer id);
 
-    int updateByPrimaryKeySelective(ConsumptionEntity record);
+    ConsumptionPulsarEntity selectByConsumptionId(@Param("consumptionId") Integer consumptionId);
 
-    int updateByPrimaryKey(ConsumptionEntity record);
+    int updateByPrimaryKeySelective(ConsumptionPulsarEntity record);
 
-    List<ConsumptionEntity> listByQuery(ConsumptionQuery consumptionQuery);
+    int updateByConsumptionId(ConsumptionPulsarEntity record);
 
-    List<CountByKey> countByStatus(ConsumptionQuery consumptionQuery);
-}
+    int updateByPrimaryKey(ConsumptionPulsarEntity record);
+
+}
\ No newline at end of file
diff --git a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/DataStreamEntityMapper.java b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/DataStreamEntityMapper.java
index 257b085..c1c988d 100644
--- a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/DataStreamEntityMapper.java
+++ b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/DataStreamEntityMapper.java
@@ -85,4 +85,9 @@ public interface DataStreamEntityMapper {
     int updateStatusByIdentifier(@Param("groupId") String groupId, @Param("streamId") String streamId,
             @Param("status") Integer status, @Param("modifier") String modifier);
 
+    /**
+     * Logic delete dlq or rlq topic by bid
+     */
+    void logicDeleteDlqOrRlq(String groupId, String streamId, String operator);
+
 }
\ No newline at end of file
diff --git a/inlong-manager/manager-dao/src/main/resources/mappers/ConsumptionEntityMapper.xml b/inlong-manager/manager-dao/src/main/resources/mappers/ConsumptionEntityMapper.xml
index 51f150c..7c77590 100644
--- a/inlong-manager/manager-dao/src/main/resources/mappers/ConsumptionEntityMapper.xml
+++ b/inlong-manager/manager-dao/src/main/resources/mappers/ConsumptionEntityMapper.xml
@@ -50,6 +50,16 @@
         from consumption
         where id = #{id,jdbcType=INTEGER} and is_deleted = 0
     </select>
+    <select id="selectConsumptionExists" resultType="org.apache.inlong.manager.dao.entity.ConsumptionEntity">
+        select
+        <include refid="Base_Column_List"/>
+        from consumption
+        where inlong_group_id = #{groupId, jdbcType=VARCHAR}
+        and topic = #{topic, jdbcType=VARCHAR}
+        and consumer_group_id = #{consumerGroup, jdbcType=VARCHAR}
+        and is_deleted = 0
+    </select>
+
     <delete id="deleteByPrimaryKey" parameterType="java.lang.Integer">
         update consumption
         set is_deleted=id
@@ -237,7 +247,7 @@
     </update>
 
     <select id="listByQuery"
-            parameterType="org.apache.inlong.manager.common.pojo.dataconsumption.ConsumptionQuery"
+            parameterType="org.apache.inlong.manager.common.pojo.consumption.ConsumptionQuery"
             resultMap="BaseResultMap">
         select
         c.*
@@ -309,7 +319,7 @@
     </select>
 
     <select id="countByStatus"
-            parameterType="org.apache.inlong.manager.common.pojo.dataconsumption.ConsumptionQuery"
+            parameterType="org.apache.inlong.manager.common.pojo.consumption.ConsumptionQuery"
             resultType="org.apache.inlong.manager.workflow.model.view.CountByKey">
         select
         status as `key`,count(1) as value
diff --git a/inlong-manager/manager-dao/src/main/resources/mappers/ConsumptionPulsarEntityMapper.xml b/inlong-manager/manager-dao/src/main/resources/mappers/ConsumptionPulsarEntityMapper.xml
new file mode 100644
index 0000000..ed44894
--- /dev/null
+++ b/inlong-manager/manager-dao/src/main/resources/mappers/ConsumptionPulsarEntityMapper.xml
@@ -0,0 +1,180 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+-->
+<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
+<mapper namespace="org.apache.inlong.manager.dao.mapper.ConsumptionPulsarEntityMapper">
+    <resultMap id="BaseResultMap" type="org.apache.inlong.manager.dao.entity.ConsumptionPulsarEntity">
+        <id column="id" jdbcType="INTEGER" property="id"/>
+        <result column="consumption_id" jdbcType="VARCHAR" property="consumptionId"/>
+        <result column="consumer_group_id" jdbcType="VARCHAR" property="consumerGroupId"/>
+        <result column="inlong_group_id" jdbcType="VARCHAR" property="inlongGroupId"/>
+        <result column="is_rlq" jdbcType="INTEGER" property="isRlq"/>
+        <result column="retry_letter_topic" jdbcType="VARCHAR" property="retryLetterTopic"/>
+        <result column="is_dlq" jdbcType="INTEGER" property="isDlq"/>
+        <result column="dead_letter_topic" jdbcType="VARCHAR" property="deadLetterTopic"/>
+        <result column="is_deleted" jdbcType="INTEGER" property="isDeleted"/>
+    </resultMap>
+    <sql id="Base_Column_List">
+        id, consumption_id, consumer_group_id, inlong_group_id, is_rlq, retry_letter_topic,
+        is_dlq, dead_letter_topic, is_deleted
+    </sql>
+    <select id="selectByPrimaryKey" parameterType="java.lang.Integer" resultMap="BaseResultMap">
+        select
+        <include refid="Base_Column_List"/>
+        from consumption_pulsar
+        where id = #{id,jdbcType=INTEGER}
+    </select>
+    <select id="selectByConsumptionId" parameterType="java.lang.Integer" resultMap="BaseResultMap">
+        select
+        <include refid="Base_Column_List"/>
+        from consumption_pulsar
+        where consumption_id = #{consumptionId, jdbcType=INTEGER}
+        and is_deleted = 0
+    </select>
+
+    <delete id="deleteByConsumptionId">
+        update consumption_pulsar
+        set is_deleted = 1
+        where consumption_id = #{consumptionId, jdbcType=INTEGER}
+          and is_deleted = 0
+    </delete>
+
+    <insert id="insert" parameterType="org.apache.inlong.manager.dao.entity.ConsumptionPulsarEntity">
+        insert into consumption_pulsar (id, consumption_id, consumer_group_id,
+                                        inlong_group_id, is_rlq, retry_letter_topic,
+                                        is_dlq, dead_letter_topic, is_deleted)
+        values (#{id,jdbcType=INTEGER}, #{consumptionId,jdbcType=INTEGER}, #{consumerGroupId,jdbcType=VARCHAR},
+                #{inlongGroupId,jdbcType=VARCHAR}, #{isRlq,jdbcType=INTEGER}, #{retryLetterTopic,jdbcType=VARCHAR},
+                #{isDlq,jdbcType=INTEGER}, #{deadLetterTopic,jdbcType=VARCHAR}, #{isDeleted,jdbcType=INTEGER})
+    </insert>
+    <insert id="insertSelective" parameterType="org.apache.inlong.manager.dao.entity.ConsumptionPulsarEntity">
+        insert into consumption_pulsar
+        <trim prefix="(" suffix=")" suffixOverrides=",">
+            <if test="id != null">
+                id,
+            </if>
+            <if test="consumptionId != null">
+                consumption_id,
+            </if>
+            <if test="consumerGroupId != null">
+                consumer_group_id,
+            </if>
+            <if test="inlongGroupId != null">
+                inlong_group_id,
+            </if>
+            <if test="isRlq != null">
+                is_rlq,
+            </if>
+            <if test="retryLetterTopic != null">
+                retry_letter_topic,
+            </if>
+            <if test="isDlq != null">
+                is_dlq,
+            </if>
+            <if test="deadLetterTopic != null">
+                dead_letter_topic,
+            </if>
+            <if test="isDeleted != null">
+                is_deleted,
+            </if>
+        </trim>
+        <trim prefix="values (" suffix=")" suffixOverrides=",">
+            <if test="id != null">
+                #{id,jdbcType=INTEGER},
+            </if>
+            <if test="consumerGroupId != null">
+                #{consumerGroupId,jdbcType=VARCHAR},
+            </if>
+            <if test="inlongGroupId != null">
+                #{inlongGroupId,jdbcType=VARCHAR},
+            </if>
+            <if test="isRlq != null">
+                #{isRlq,jdbcType=INTEGER},
+            </if>
+            <if test="retryLetterTopic != null">
+                #{retryLetterTopic,jdbcType=VARCHAR},
+            </if>
+            <if test="isDlq != null">
+                #{isDlq,jdbcType=INTEGER},
+            </if>
+            <if test="deadLetterTopic != null">
+                #{deadLetterTopic,jdbcType=VARCHAR},
+            </if>
+            <if test="isDeleted != null">
+                #{isDeleted,jdbcType=INTEGER},
+            </if>
+        </trim>
+    </insert>
+    <update id="updateByPrimaryKeySelective"
+            parameterType="org.apache.inlong.manager.dao.entity.ConsumptionPulsarEntity">
+        update consumption_pulsar
+        <set>
+            <if test="consumptionId != null">
+                consumption_id = #{consumptionId,jdbcType=INTEGER},
+            </if>
+            <if test="consumerGroupId != null">
+                consumer_group_id = #{consumerGroupId,jdbcType=VARCHAR},
+            </if>
+            <if test="inlongGroupId != null">
+                inlong_group_id = #{inlongGroupId,jdbcType=VARCHAR},
+            </if>
+            <if test="isRlq != null">
+                is_rlq = #{isRlq,jdbcType=INTEGER},
+            </if>
+            <if test="retryLetterTopic != null">
+                retry_letter_topic = #{retryLetterTopic,jdbcType=VARCHAR},
+            </if>
+            <if test="isDlq != null">
+                is_dlq = #{isDlq,jdbcType=INTEGER},
+            </if>
+            <if test="deadLetterTopic != null">
+                dead_letter_topic = #{deadLetterTopic,jdbcType=VARCHAR},
+            </if>
+            <if test="isDeleted != null">
+                is_deleted = #{isDeleted,jdbcType=INTEGER},
+            </if>
+        </set>
+        where id = #{id,jdbcType=INTEGER}
+    </update>
+    <update id="updateByConsumptionId">
+        update consumption_pulsar
+        <set>
+            consumer_group_id = #{consumerGroupId,jdbcType=VARCHAR},
+            inlong_group_id = #{inlongGroupId,jdbcType=VARCHAR},
+            is_rlq = #{isRlq,jdbcType=INTEGER},
+            retry_letter_topic = #{retryLetterTopic,jdbcType=VARCHAR},
+            is_dlq = #{isDlq,jdbcType=INTEGER},
+            dead_letter_topic = #{deadLetterTopic,jdbcType=VARCHAR},
+            is_deleted = #{isDeleted,jdbcType=INTEGER},
+        </set>
+        where consumption_id = #{consumptionId,jdbcType=INTEGER}
+    </update>
+    <update id="updateByPrimaryKey" parameterType="org.apache.inlong.manager.dao.entity.ConsumptionPulsarEntity">
+        update consumption_pulsar
+        set consumption_id     = #{consumptionId,jdbcType=INTEGER},
+            consumer_group_id  = #{consumerGroupId,jdbcType=VARCHAR},
+            inlong_group_id    = #{inlongGroupId,jdbcType=VARCHAR},
+            is_rlq             = #{isRlq,jdbcType=INTEGER},
+            retry_letter_topic = #{retryLetterTopic,jdbcType=VARCHAR},
+            is_dlq             = #{isDlq,jdbcType=INTEGER},
+            dead_letter_topic  = #{deadLetterTopic,jdbcType=VARCHAR},
+            is_deleted         = #{isDeleted,jdbcType=INTEGER}
+        where id = #{id,jdbcType=INTEGER}
+    </update>
+</mapper>
\ No newline at end of file
diff --git a/inlong-manager/manager-dao/src/main/resources/mappers/DataStreamEntityMapper.xml b/inlong-manager/manager-dao/src/main/resources/mappers/DataStreamEntityMapper.xml
index 759a0ac..16b9d7e 100644
--- a/inlong-manager/manager-dao/src/main/resources/mappers/DataStreamEntityMapper.xml
+++ b/inlong-manager/manager-dao/src/main/resources/mappers/DataStreamEntityMapper.xml
@@ -509,6 +509,14 @@
             and is_deleted = 0
         </where>
     </update>
+    <update id="logicDeleteDlqOrRlq">
+        update data_stream
+        set is_deleted = 1,
+            modifier   = #{operator, jdbcType=VARCHAR}
+        where inlong_group_id = #{groupId, jdbcType=VARCHAR}
+          and inlong_stream_id = #{streamId, jdbcType=VARCHAR}
+          and is_deleted = 0
+    </update>
 
     <select id="selectStreamToHiveInfo" resultMap="dataStreamFullInfo">
         SELECT h.id,
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/ConsumptionService.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/ConsumptionService.java
index a25042d..64b1af1 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/ConsumptionService.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/ConsumptionService.java
@@ -18,11 +18,11 @@
 package org.apache.inlong.manager.service.core;
 
 import com.github.pagehelper.PageInfo;
-import org.apache.inlong.manager.common.pojo.dataconsumption.ConsumptionInfo;
-import org.apache.inlong.manager.common.pojo.dataconsumption.ConsumptionListVo;
-import org.apache.inlong.manager.common.pojo.dataconsumption.ConsumptionQuery;
-import org.apache.inlong.manager.common.pojo.dataconsumption.ConsumptionSummary;
-import org.apache.inlong.manager.common.pojo.dataconsumption.ConsumptionUpdateInfo;
+import org.apache.inlong.manager.common.pojo.business.BusinessInfo;
+import org.apache.inlong.manager.common.pojo.consumption.ConsumptionInfo;
+import org.apache.inlong.manager.common.pojo.consumption.ConsumptionListVo;
+import org.apache.inlong.manager.common.pojo.consumption.ConsumptionQuery;
+import org.apache.inlong.manager.common.pojo.consumption.ConsumptionSummary;
 import org.apache.inlong.manager.service.workflow.WorkflowResult;
 
 /**
@@ -52,15 +52,7 @@ public interface ConsumptionService {
      * @param id Consumer ID
      * @return Details
      */
-    ConsumptionInfo getInfo(Integer id);
-
-    /**
-     * According to data consumption details
-     *
-     * @param consumerGroupId Consumer group ID
-     * @return Consumption info
-     */
-    ConsumptionInfo getInfo(String consumerGroupId);
+    ConsumptionInfo get(Integer id);
 
     /**
      * Determine whether the Consumer group ID already exists
@@ -69,7 +61,7 @@ public interface ConsumptionService {
      * @param excludeSelfId Exclude the ID of this record
      * @return does it exist
      */
-    boolean isConsumerGroupIdExist(String consumerGroupId, Integer excludeSelfId);
+    boolean isConsumerGroupIdExists(String consumerGroupId, Integer excludeSelfId);
 
     /**
      * Save basic data consumption information
@@ -83,11 +75,10 @@ public interface ConsumptionService {
     /**
      * Update the person in charge of data consumption, etc
      *
-     * @param consumptionUpdateInfo Update information
+     * @param consumptionInfo consumption information
      * @param operator Operator
-     * @return Updated id
      */
-    Integer update(ConsumptionUpdateInfo consumptionUpdateInfo, String operator);
+    Boolean update(ConsumptionInfo consumptionInfo, String operator);
 
     /**
      * Delete data consumption
@@ -95,7 +86,7 @@ public interface ConsumptionService {
      * @param id Consumer ID
      * @param operator Operator
      */
-    void delete(Integer id, String operator);
+    Boolean delete(Integer id, String operator);
 
     /**
      * Start the application process
@@ -106,4 +97,9 @@ public interface ConsumptionService {
      */
     WorkflowResult startProcess(Integer id, String operator);
 
+    /**
+     * Save the consumer group info for Sort to the database
+     */
+    void saveSortConsumption(BusinessInfo bizInfo, String topic, String consumerGroup);
+
 }
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/DataStreamService.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/DataStreamService.java
index f23a722..75b463b 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/DataStreamService.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/DataStreamService.java
@@ -55,6 +55,15 @@ public interface DataStreamService {
     DataStreamInfo get(String groupId, String streamId);
 
     /**
+     * Query whether the data stream ID exists
+     *
+     * @param groupId business group id
+     * @param streamId data stream id
+     * @return true: exists, false: does not exist
+     */
+    Boolean exist(String groupId, String streamId);
+
+    /**
      * Query data stream list based on conditions
      *
      * @param request Data stream paging query request
@@ -188,4 +197,16 @@ public interface DataStreamService {
      */
     boolean updateStatus(String groupId, String streamId, Integer status, String operator);
 
+    /**
+     * According to the specified DLQ / RLQ name, create the corresponding Pulsar's Topic stream
+     *
+     * @param topicName Pulsar's Topic name, which is the data stream ID
+     */
+    void insertDlqOrRlq(String bid, String topicName, String operator);
+
+    /**
+     * Logic delete dlq or rlq topic by bid
+     */
+    void logicDeleteDlqOrRlq(String bid, String topicName, String operator);
+
 }
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/BusinessServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/BusinessServiceImpl.java
index 1776843..152125a 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/BusinessServiceImpl.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/BusinessServiceImpl.java
@@ -383,7 +383,7 @@ public class BusinessServiceImpl implements BusinessService {
 
         if (BizConstant.MIDDLEWARE_TUBE.equalsIgnoreCase(middlewareType)) {
             // Tube Topic corresponds to business one-to-one
-            topicVO.setTopicName(businessInfo.getMqResourceObj());
+            topicVO.setMqResourceObj(businessInfo.getMqResourceObj());
             topicVO.setTubeMasterUrl(clusterBean.getTubeMaster());
         } else if (BizConstant.MIDDLEWARE_PULSAR.equalsIgnoreCase(middlewareType)) {
             // Pulsar's topic corresponds to the data stream one-to-one
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/ConsumptionServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/ConsumptionServiceImpl.java
index 388ea01..a37a771 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/ConsumptionServiceImpl.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/ConsumptionServiceImpl.java
@@ -20,27 +20,42 @@ package org.apache.inlong.manager.service.core.impl;
 import com.github.pagehelper.Page;
 import com.github.pagehelper.PageHelper;
 import com.github.pagehelper.PageInfo;
+import java.util.Arrays;
 import java.util.Date;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Locale;
 import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
 import java.util.stream.Collectors;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.inlong.manager.common.beans.ClusterBean;
+import org.apache.inlong.manager.common.enums.BizConstant;
+import org.apache.inlong.manager.common.enums.BizErrorCodeEnum;
 import org.apache.inlong.manager.common.enums.ConsumptionStatus;
 import org.apache.inlong.manager.common.enums.EntityStatus;
+import org.apache.inlong.manager.common.exceptions.BusinessException;
+import org.apache.inlong.manager.common.pojo.business.BusinessInfo;
 import org.apache.inlong.manager.common.pojo.business.BusinessTopicVO;
-import org.apache.inlong.manager.common.pojo.dataconsumption.ConsumptionInfo;
-import org.apache.inlong.manager.common.pojo.dataconsumption.ConsumptionListVo;
-import org.apache.inlong.manager.common.pojo.dataconsumption.ConsumptionQuery;
-import org.apache.inlong.manager.common.pojo.dataconsumption.ConsumptionSummary;
-import org.apache.inlong.manager.common.pojo.dataconsumption.ConsumptionUpdateInfo;
+import org.apache.inlong.manager.common.pojo.consumption.ConsumptionInfo;
+import org.apache.inlong.manager.common.pojo.consumption.ConsumptionListVo;
+import org.apache.inlong.manager.common.pojo.consumption.ConsumptionMqExtBase;
+import org.apache.inlong.manager.common.pojo.consumption.ConsumptionPulsarInfo;
+import org.apache.inlong.manager.common.pojo.consumption.ConsumptionQuery;
+import org.apache.inlong.manager.common.pojo.consumption.ConsumptionSummary;
+import org.apache.inlong.manager.common.pojo.datastream.DataStreamTopicVO;
 import org.apache.inlong.manager.common.util.CommonBeanUtils;
 import org.apache.inlong.manager.common.util.Preconditions;
+import org.apache.inlong.manager.dao.entity.BusinessEntity;
 import org.apache.inlong.manager.dao.entity.ConsumptionEntity;
+import org.apache.inlong.manager.dao.entity.ConsumptionPulsarEntity;
+import org.apache.inlong.manager.dao.mapper.BusinessEntityMapper;
 import org.apache.inlong.manager.dao.mapper.ConsumptionEntityMapper;
+import org.apache.inlong.manager.dao.mapper.ConsumptionPulsarEntityMapper;
 import org.apache.inlong.manager.service.core.BusinessService;
 import org.apache.inlong.manager.service.core.ConsumptionService;
+import org.apache.inlong.manager.service.core.DataStreamService;
 import org.apache.inlong.manager.service.workflow.ProcessName;
 import org.apache.inlong.manager.service.workflow.WorkflowResult;
 import org.apache.inlong.manager.service.workflow.WorkflowService;
@@ -59,13 +74,19 @@ import org.springframework.util.CollectionUtils;
 public class ConsumptionServiceImpl implements ConsumptionService {
 
     @Autowired
+    private ClusterBean clusterBean;
+    @Autowired
+    private BusinessEntityMapper businessMapper;
+    @Autowired
     private ConsumptionEntityMapper consumptionMapper;
     @Autowired
+    private ConsumptionPulsarEntityMapper consumptionPulsarMapper;
+    @Autowired
     private WorkflowService workflowService;
     @Autowired
     private BusinessService businessService;
     @Autowired
-    private ClusterBean clusterBean;
+    private DataStreamService streamService;
 
     @Override
     public ConsumptionSummary getSummary(ConsumptionQuery query) {
@@ -83,7 +104,7 @@ public class ConsumptionServiceImpl implements ConsumptionService {
     @Override
     public PageInfo<ConsumptionListVo> list(ConsumptionQuery query) {
         PageHelper.startPage(query.getPageNum(), query.getPageSize());
-        Page<ConsumptionEntity> pageResult = (Page<ConsumptionEntity>) this.consumptionMapper.listByQuery(query);
+        Page<ConsumptionEntity> pageResult = (Page<ConsumptionEntity>) consumptionMapper.listByQuery(query);
         PageInfo<ConsumptionListVo> pageInfo = pageResult
                 .toPageInfo(entity -> CommonBeanUtils.copyProperties(entity, ConsumptionListVo::new));
         pageInfo.setTotal(pageResult.getTotal());
@@ -91,92 +112,208 @@ public class ConsumptionServiceImpl implements ConsumptionService {
     }
 
     @Override
-    public ConsumptionInfo getInfo(Integer id) {
-        Preconditions.checkNotNull(id, "Consumption id can't be null");
+    public ConsumptionInfo get(Integer id) {
+        Preconditions.checkNotNull(id, "consumption id cannot be null");
         ConsumptionEntity entity = consumptionMapper.selectByPrimaryKey(id);
-        Preconditions.checkNotNull(entity, () -> "Consumption not exist with id:" + id);
+        Preconditions.checkNotNull(entity, "consumption not exist with id:" + id);
 
-        ConsumptionInfo consumptionInfo = CommonBeanUtils.copyProperties(entity, ConsumptionInfo::new);
-        consumptionInfo.setMasterUrl(clusterBean.getTubeMaster());
+        ConsumptionInfo info = CommonBeanUtils.copyProperties(entity, ConsumptionInfo::new);
 
-        return consumptionInfo;
-    }
+        if (BizConstant.MIDDLEWARE_PULSAR.equalsIgnoreCase(info.getMiddlewareType())) {
+            ConsumptionPulsarEntity pulsarEntity = consumptionPulsarMapper.selectByConsumptionId(info.getId());
+            Preconditions.checkNotNull(pulsarEntity, "Pulsar consumption cannot be empty, as the middleware is Pulsar");
+            ConsumptionPulsarInfo pulsarInfo = CommonBeanUtils.copyProperties(pulsarEntity, ConsumptionPulsarInfo::new);
+            info.setMqExtInfo(pulsarInfo);
 
-    @Override
-    public ConsumptionInfo getInfo(String consumerGroupId) {
-        Preconditions.checkNotEmpty(consumerGroupId, "ConsumerGroupId can't be null");
-        ConsumptionQuery consumptionQuery = new ConsumptionQuery();
-        consumptionQuery.setConsumerGroupId(consumerGroupId);
-        List<ConsumptionEntity> result = consumptionMapper.listByQuery(consumptionQuery);
-        if (CollectionUtils.isEmpty(result)) {
-            return null;
+            info.setTopic(this.getFullPulsarTopic(info.getInlongGroupId(), info.getTopic()));
         }
 
-        ConsumptionEntity entity = result.get(0);
-        ConsumptionInfo info = CommonBeanUtils.copyProperties(entity, ConsumptionInfo::new);
-        info.setMasterUrl(clusterBean.getTubeMaster());
-
         return info;
     }
 
     @Override
-    public boolean isConsumerGroupIdExist(String groupId, Integer excludeSelfId) {
+    public boolean isConsumerGroupIdExists(String consumerGroup, Integer excludeSelfId) {
         ConsumptionQuery consumptionQuery = new ConsumptionQuery();
-        consumptionQuery.setConsumerGroupId(groupId);
+        consumptionQuery.setConsumerGroupId(consumerGroup);
         List<ConsumptionEntity> result = consumptionMapper.listByQuery(consumptionQuery);
         if (excludeSelfId != null) {
-            result = result.stream()
-                    .filter(entity -> !excludeSelfId.equals(entity.getId()))
+            result = result.stream().filter(consumer -> !excludeSelfId.equals(consumer.getId()))
                     .collect(Collectors.toList());
         }
         return !CollectionUtils.isEmpty(result);
     }
 
     @Override
-    @Transactional(rollbackFor = Exception.class)
-    public Integer save(ConsumptionInfo consumptionInfo, String operator) {
-        checkConsumptionInfo(consumptionInfo);
-        ConsumptionEntity consumptionEntity = saveConsumption(consumptionInfo, operator);
+    @Transactional(rollbackFor = Throwable.class)
+    public Integer save(ConsumptionInfo info, String operator) {
+        fullConsumptionInfo(info);
+
+        Date now = new Date();
+        ConsumptionEntity entity = this.saveConsumption(info, operator, now);
+
+        if (BizConstant.MIDDLEWARE_PULSAR.equals(entity.getMiddlewareType())) {
+            savePulsarInfo(info.getMqExtInfo(), entity);
+        }
+
+        return entity.getId();
+    }
+
+    /**
+     * Save Pulsar consumption info
+     */
+    private void savePulsarInfo(ConsumptionMqExtBase mqExtBase, ConsumptionEntity entity) {
+        Preconditions.checkNotNull(mqExtBase, "Pulsar info cannot be empty, as the middleware is Pulsar");
+        ConsumptionPulsarInfo pulsarInfo = (ConsumptionPulsarInfo) mqExtBase;
+
+        // Prerequisite for RLQ to be turned on: DLQ must be turned on
+        boolean dlqEnable = (pulsarInfo.getIsDlq() != null && pulsarInfo.getIsDlq() == 1);
+        boolean rlqEnable = (pulsarInfo.getIsRlq() != null && pulsarInfo.getIsRlq() == 1);
+        if (rlqEnable && !dlqEnable) {
+            throw new BusinessException(BizErrorCodeEnum.PULSAR_DLQ_RLQ_ERROR);
+        }
+
+        // When saving, the DLQ / RLQ under the same groupId cannot be repeated;
+        // when closing, delete the related configuration
+        String groupId = entity.getInlongGroupId();
+        if (dlqEnable) {
+            String dlqTopic = BizConstant.PREFIX_DLQ + "_" + pulsarInfo.getDeadLetterTopic();
+            Boolean exist = streamService.exist(groupId, dlqTopic);
+            if (exist) {
+                throw new BusinessException(BizErrorCodeEnum.PULSAR_DLQ_DUPLICATED);
+            }
+        } else {
+            pulsarInfo.setIsDlq(0);
+            pulsarInfo.setDeadLetterTopic(null);
+        }
+        if (rlqEnable) {
+            String rlqTopic = BizConstant.PREFIX_RLQ + "_" + pulsarInfo.getRetryLetterTopic();
+            Boolean exist = streamService.exist(groupId, rlqTopic);
+            if (exist) {
+                throw new BusinessException(BizErrorCodeEnum.PULSAR_RLQ_DUPLICATED);
+            }
+        } else {
+            pulsarInfo.setIsRlq(0);
+            pulsarInfo.setRetryLetterTopic(null);
+        }
 
-        log.debug("success to save consumption {}", consumptionInfo);
-        return consumptionEntity.getId();
+        ConsumptionPulsarEntity pulsar = CommonBeanUtils.copyProperties(pulsarInfo, ConsumptionPulsarEntity::new);
+        Integer consumptionId = entity.getId();
+        pulsar.setConsumptionId(consumptionId);
+        pulsar.setInlongGroupId(groupId);
+        pulsar.setConsumerGroupId(entity.getConsumerGroupId());
+        pulsar.setIsDeleted(0);
+
+        // Pulsar consumer information may already exist, update if it exists, add if it does not exist
+        ConsumptionPulsarEntity exists = consumptionPulsarMapper.selectByConsumptionId(consumptionId);
+        if (exists == null) {
+            consumptionPulsarMapper.insert(pulsar);
+        } else {
+            pulsar.setId(exists.getId());
+            consumptionPulsarMapper.updateByPrimaryKey(pulsar);
+        }
     }
 
     @Override
-    @Transactional(rollbackFor = Exception.class)
-    public Integer update(ConsumptionUpdateInfo info, String operator) {
-        Preconditions.checkNotNull(info, "update info can't be null");
-        Preconditions.checkNotNull(info.getId(), "consumption id can't be null");
-        ConsumptionInfo consumptionInfo = this.getInfo(info.getId());
-        Preconditions.checkNotNull(consumptionInfo, "consumption not exist with id " + info.getId());
-        Preconditions.checkTrue(consumptionInfo.getInCharges().contains(operator),
-                "operator is not the owner for this consumption");
+    @Transactional(rollbackFor = Throwable.class)
+    public Boolean update(ConsumptionInfo info, String operator) {
+        Preconditions.checkNotNull(info, "consumption info cannot be null");
+        Integer consumptionId = info.getId();
+        Preconditions.checkNotNull(consumptionId, "consumption id cannot be null");
+
+        ConsumptionEntity exists = consumptionMapper.selectByPrimaryKey(consumptionId);
+        Preconditions.checkNotNull(exists, "consumption not exist with id " + consumptionId);
+        Preconditions.checkTrue(exists.getInCharges().contains(operator),
+                "operator" + operator + " has no privilege for the consumption");
 
         ConsumptionEntity entity = new ConsumptionEntity();
-        entity.setId(info.getId());
-        entity.setInCharges(info.getInCharges());
+        Date now = new Date();
+        CommonBeanUtils.copyProperties(info, entity, true);
         entity.setModifier(operator);
-        entity.setModifyTime(new Date());
+        entity.setModifyTime(now);
+
+        // Modify Pulsar consumption info
+        if (BizConstant.MIDDLEWARE_PULSAR.equalsIgnoreCase(info.getMiddlewareType())) {
+            ConsumptionPulsarEntity pulsarEntity = consumptionPulsarMapper.selectByConsumptionId(consumptionId);
+            Preconditions.checkNotNull(pulsarEntity, "Pulsar consumption cannot be null");
+            pulsarEntity.setConsumerGroupId(info.getConsumerGroupId());
+
+            // Whether DLQ / RLQ is turned on or off
+            ConsumptionPulsarInfo update = (ConsumptionPulsarInfo) info.getMqExtInfo();
+            boolean dlqEnable = (update.getIsDlq() != null && update.getIsDlq() == 1);
+            boolean rlqEnable = (update.getIsRlq() != null && update.getIsRlq() == 1);
+
+            // DLQ is closed, RLQ cannot exist alone and must be closed
+            if (rlqEnable && !dlqEnable) {
+                throw new BusinessException(BizErrorCodeEnum.PULSAR_TOPIC_CREATE_FAILED);
+            }
+
+            // If the consumption has been approved, then close/open DLQ or RLQ, it is necessary to
+            // add/remove data streams in the business
+            if (ConsumptionStatus.APPROVED.getStatus() == exists.getStatus()) {
+                String groupId = info.getInlongGroupId();
+                String dlqNameOld = pulsarEntity.getDeadLetterTopic();
+                String dlqNameNew = update.getDeadLetterTopic();
+                if (!dlqEnable) {
+                    pulsarEntity.setIsDlq(0);
+                    pulsarEntity.setDeadLetterTopic(null);
+                    streamService.logicDeleteDlqOrRlq(groupId, dlqNameOld, operator);
+                } else if (!Objects.equals(dlqNameNew, dlqNameOld)) {
+                    pulsarEntity.setIsDlq(1);
+                    String topic = BizConstant.PREFIX_DLQ + "_" + dlqNameNew;
+                    topic = topic.toLowerCase(Locale.ROOT);
+                    pulsarEntity.setDeadLetterTopic(topic);
+                    streamService.insertDlqOrRlq(groupId, topic, operator);
+                }
+
+                String rlqNameOld = pulsarEntity.getRetryLetterTopic();
+                String rlqNameNew = update.getRetryLetterTopic();
+                if (!rlqEnable) {
+                    pulsarEntity.setIsRlq(0);
+                    pulsarEntity.setRetryLetterTopic(null);
+                    streamService.logicDeleteDlqOrRlq(groupId, rlqNameOld, operator);
+                } else if (!Objects.equals(rlqNameNew, pulsarEntity.getRetryLetterTopic())) {
+                    pulsarEntity.setIsRlq(1);
+                    String topic = BizConstant.PREFIX_RLQ + "_" + rlqNameNew;
+                    topic = topic.toLowerCase(Locale.ROOT);
+                    pulsarEntity.setRetryLetterTopic(topic);
+                    streamService.insertDlqOrRlq(groupId, topic, operator);
+                }
+            }
+
+            consumptionPulsarMapper.updateByConsumptionId(pulsarEntity);
+        }
 
-        this.consumptionMapper.updateByPrimaryKeySelective(entity);
+        consumptionMapper.updateByPrimaryKeySelective(entity);
+        return true;
+    }
 
-        log.debug("success to update consumption {}", entity);
-        return entity.getId();
+    /**
+     * According to groupId and topic, stitch the full path of Pulsar Topic
+     * "persistent://" + tenant + "/" + namespace + "/" + topic;
+     */
+    private String getFullPulsarTopic(String groupId, String topic) {
+        BusinessEntity businessEntity = businessMapper.selectByIdentifier(groupId);
+        String tenant = clusterBean.getDefaultTenant();
+        String namespace = businessEntity.getMqResourceObj();
+        return "persistent://" + tenant + "/" + namespace + "/" + topic;
     }
 
     @Override
-    public void delete(Integer id, String operator) {
+    @Transactional(rollbackFor = Throwable.class)
+    public Boolean delete(Integer id, String operator) {
         ConsumptionEntity consumptionEntity = consumptionMapper.selectByPrimaryKey(id);
-        Preconditions.checkNotNull(consumptionEntity, () -> "consumption not exist with id:" + id);
-        int success = consumptionMapper.deleteByPrimaryKey(id);
-        Preconditions.checkTrue(success > 0, "delete failed");
+        Preconditions.checkNotNull(consumptionEntity, "consumption not exist with id: " + id);
+        consumptionMapper.deleteByPrimaryKey(id);
+
+        consumptionPulsarMapper.deleteByConsumptionId(id);
+        return true;
     }
 
     @Override
     public WorkflowResult startProcess(Integer id, String operation) {
-        ConsumptionInfo consumptionInfo = this.getInfo(id);
-        Preconditions.checkTrue(ConsumptionStatus.ALLOW_START_WORKFLOW_STATUS
-                        .contains(ConsumptionStatus.fromStatus(consumptionInfo.getStatus())),
+        ConsumptionInfo consumptionInfo = this.get(id);
+        Preconditions.checkTrue(ConsumptionStatus.ALLOW_START_WORKFLOW_STATUS.contains(
+                        ConsumptionStatus.fromStatus(consumptionInfo.getStatus())),
                 "current status not allow start workflow");
 
         ConsumptionEntity updateConsumptionEntity = new ConsumptionEntity();
@@ -190,62 +327,118 @@ public class ConsumptionServiceImpl implements ConsumptionService {
                 genNewConsumptionWorkflowForm(consumptionInfo));
     }
 
+    @Override
+    public void saveSortConsumption(BusinessInfo bizInfo, String topic, String consumerGroup) {
+        String groupId = bizInfo.getInlongGroupId();
+        ConsumptionEntity exists = consumptionMapper.selectConsumptionExists(groupId, topic, consumerGroup);
+        if (exists != null) {
+            log.warn("consumption with groupId={}, topic={}, consumer group={} already exists, skip to create",
+                    groupId, topic, consumerGroup);
+            return;
+        }
+
+        log.debug("begin to save consumption, groupId={}, topic={}, consumer group={}", groupId, topic, consumerGroup);
+        String middlewareType = bizInfo.getMiddlewareType();
+        ConsumptionEntity entity = new ConsumptionEntity();
+        entity.setInlongGroupId(groupId);
+        entity.setMiddlewareType(middlewareType);
+        entity.setTopic(topic);
+        entity.setConsumerGroupId(consumerGroup);
+        entity.setInCharges(bizInfo.getInCharges());
+        entity.setFilterEnabled(0);
+
+        entity.setStatus(ConsumptionStatus.APPROVED.getStatus());
+        entity.setIsDeleted(EntityStatus.UN_DELETED.getCode());
+        entity.setCreator(bizInfo.getCreator());
+        entity.setCreateTime(new Date());
+
+        consumptionMapper.insert(entity);
+
+        if (BizConstant.MIDDLEWARE_PULSAR.equals(middlewareType)) {
+            ConsumptionPulsarEntity pulsarEntity = new ConsumptionPulsarEntity();
+            pulsarEntity.setConsumptionId(entity.getId());
+            pulsarEntity.setConsumerGroupId(consumerGroup);
+            pulsarEntity.setInlongGroupId(groupId);
+            pulsarEntity.setIsDeleted(EntityStatus.UN_DELETED.getCode());
+            consumptionPulsarMapper.insert(pulsarEntity);
+        }
+
+        log.debug("success save consumption, groupId={}, topic={}, consumer group={}", groupId, topic, consumerGroup);
+    }
+
     private NewConsumptionWorkflowForm genNewConsumptionWorkflowForm(ConsumptionInfo consumptionInfo) {
         NewConsumptionWorkflowForm form = new NewConsumptionWorkflowForm();
+        Integer id = consumptionInfo.getId();
+        if (BizConstant.MIDDLEWARE_PULSAR.equalsIgnoreCase(consumptionInfo.getMiddlewareType())) {
+            ConsumptionPulsarEntity consumptionPulsarEntity = consumptionPulsarMapper.selectByConsumptionId(id);
+            ConsumptionPulsarInfo pulsarInfo = CommonBeanUtils.copyProperties(consumptionPulsarEntity,
+                    ConsumptionPulsarInfo::new);
+            consumptionInfo.setMqExtInfo(pulsarInfo);
+        }
         form.setConsumptionInfo(consumptionInfo);
         return form;
     }
 
-    private ConsumptionEntity saveConsumption(ConsumptionInfo consumptionInfo, String operator) {
-        Date now = new Date();
-        consumptionInfo.setCreateTime(now);
-        consumptionInfo.setModifyTime(now);
-        consumptionInfo.setModifier(operator);
-        consumptionInfo.setCreator(operator);
-
-        ConsumptionEntity entity = CommonBeanUtils.copyProperties(consumptionInfo, ConsumptionEntity::new);
+    private ConsumptionEntity saveConsumption(ConsumptionInfo info, String operator, Date now) {
+        ConsumptionEntity entity = CommonBeanUtils.copyProperties(info, ConsumptionEntity::new);
         entity.setStatus(ConsumptionStatus.WAITING_ASSIGN.getStatus());
-        entity.setIsDeleted(EntityStatus.UN_DELETED.getCode());
+        entity.setIsDeleted(0);
+        entity.setCreator(operator);
+        entity.setModifier(operator);
+        entity.setCreateTime(now);
+        entity.setModifyTime(now);
 
-        int success;
-        if (consumptionInfo.getId() != null) {
-            success = this.consumptionMapper.updateByPrimaryKey(entity);
+        if (info.getId() != null) {
+            consumptionMapper.updateByPrimaryKey(entity);
         } else {
-            success = this.consumptionMapper.insert(entity);
+            consumptionMapper.insert(entity);
         }
-        Preconditions.checkTrue(success > 0, "save failed");
-        Preconditions.checkNotNull(entity.getId(), "save failed");
+
+        Preconditions.checkNotNull(entity.getId(), "save consumption failed");
         return entity;
     }
 
     /**
-     * Check whether the consumption information is valid
+     * Fill in consumer information
      */
-    private void checkConsumptionInfo(ConsumptionInfo info) {
-        Preconditions.checkNotNull(info, "Consumption info can't be null");
-        Preconditions.checkNotNull(info.getConsumerGroupName(), "Consumer Group Name can't be null");
+    private void fullConsumptionInfo(ConsumptionInfo info) {
+        Preconditions.checkNotNull(info, "consumption info cannot be null");
+        info.setConsumerGroupId(info.getConsumerGroupName());
 
-        // Undeleted consumer group id must be unique
-        String groupId = info.getConsumerGroupName().toLowerCase(Locale.ROOT);
-        info.setConsumerGroupId(groupId);
-        Preconditions.checkTrue(!isConsumerGroupIdExist(groupId, info.getId()),
-                "Consumer Group ID " + groupId + " already exist");
+        Preconditions.checkFalse(isConsumerGroupIdExists(info.getConsumerGroupId(), info.getId()),
+                "consumer group " + info.getConsumerGroupId() + " already exist");
 
         if (info.getId() != null) {
-            ConsumptionEntity entity = consumptionMapper.selectByPrimaryKey(info.getId());
-            Preconditions.checkNotNull(entity, "Consumption not exist with id:" + info.getId());
+            ConsumptionEntity consumptionEntity = consumptionMapper.selectByPrimaryKey(info.getId());
+            Preconditions.checkNotNull(consumptionEntity, "consumption not exist with id: " + info.getId());
 
-            ConsumptionStatus consumptionStatus = ConsumptionStatus.fromStatus(entity.getStatus());
+            ConsumptionStatus consumptionStatus = ConsumptionStatus.fromStatus(consumptionEntity.getStatus());
             Preconditions.checkTrue(ConsumptionStatus.ALLOW_SAVE_UPDATE_STATUS.contains(consumptionStatus),
-                    "Consumption not allowed to update when its status is" + consumptionStatus.name());
+                    "consumption not allow update when status is " + consumptionStatus.name());
         }
 
-        BusinessTopicVO topicVO = businessService.getTopic(info.getInlongGroupId());
-        Preconditions.checkNotNull(topicVO, "Business not exist :" + info.getInlongGroupId());
-        Preconditions.checkTrue(topicVO.getTopicName() != null
-                        && topicVO.getTopicName().equals(info.getTopic()),
-                "Topic [" + info.getTopic() + "] not belong to business " + info.getInlongGroupId());
-
+        // Determine whether the consumed topic belongs to this groupId or the data stream under it
+        Preconditions.checkNotNull(info.getTopic(), "consumption topic cannot be empty");
+
+        String groupId = info.getInlongGroupId();
+        BusinessTopicVO topicVO = businessService.getTopic(groupId);
+        Preconditions.checkNotNull(topicVO, "business not exist: " + groupId);
+
+        // Tube’s topic is the business group level, one business group, one Tube topic
+        if (BizConstant.MIDDLEWARE_TUBE.equalsIgnoreCase(topicVO.getMiddlewareType())) {
+            String bizTopic = topicVO.getMqResourceObj();
+            Preconditions.checkTrue(bizTopic == null || bizTopic.equals(info.getTopic()),
+                    "topic [" + info.getTopic() + "] not belong to business " + groupId);
+        } else if (BizConstant.MIDDLEWARE_PULSAR.equalsIgnoreCase(topicVO.getMiddlewareType())) {
+            // Pulsar's topic is the data stream level.
+            // There will be multiple data streams under one business, and there will be multiple topics
+            List<DataStreamTopicVO> dsTopicList = topicVO.getDsTopicList();
+            if (dsTopicList != null && dsTopicList.size() > 0) {
+                Set<String> topicSet = new HashSet<>(Arrays.asList(info.getTopic().split(",")));
+                dsTopicList.forEach(ds -> topicSet.remove(ds.getMqResourceObj()));
+                Preconditions.checkEmpty(topicSet, "topic [" + topicSet + "] not belong to business " + groupId);
+            }
+        }
     }
 
 }
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/DataStreamServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/DataStreamServiceImpl.java
index 7754a39..32d91af 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/DataStreamServiceImpl.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/DataStreamServiceImpl.java
@@ -149,6 +149,13 @@ public class DataStreamServiceImpl implements DataStreamService {
         return streamInfo;
     }
 
+    @Override
+    public Boolean exist(String groupId, String streamId) {
+        Preconditions.checkNotNull(groupId, BizConstant.GROUP_ID_IS_EMPTY);
+        DataStreamEntity streamEntity = streamMapper.selectByIdentifier(groupId, streamId);
+        return streamEntity != null;
+    }
+
     /**
      * Query and set the extended information and data source fields of the data stream
      *
@@ -626,6 +633,41 @@ public class DataStreamServiceImpl implements DataStreamService {
         return true;
     }
 
+    @Override
+    public void insertDlqOrRlq(String groupId, String topicName, String operator) {
+        Integer count = streamMapper.selectExistByIdentifier(groupId, topicName);
+        if (count >= 1) {
+            LOGGER.error("DLQ/RLQ topic already exists with name={}", topicName);
+            throw new BusinessException(BizErrorCodeEnum.DATA_STREAM_ID_DUPLICATE, "DLQ/RLQ topic already exists");
+        }
+
+        DataStreamEntity streamEntity = new DataStreamEntity();
+        streamEntity.setInlongGroupId(groupId);
+        streamEntity.setInlongStreamId(topicName);
+        streamEntity.setMqResourceObj(topicName);
+        streamEntity.setDescription("This is DLQ / RLQ topic created by SYSTEM");
+        streamEntity.setDailyRecords(1000);
+        streamEntity.setDailyStorage(1000);
+        streamEntity.setPeakRecords(1000);
+        streamEntity.setMaxLength(1000);
+
+        streamEntity.setStatus(EntityStatus.DATA_STREAM_CONFIG_SUCCESSFUL.getCode());
+        streamEntity.setIsDeleted(EntityStatus.UN_DELETED.getCode());
+        streamEntity.setCreator(operator);
+        streamEntity.setModifier(operator);
+        Date now = new Date();
+        streamEntity.setCreateTime(now);
+        streamEntity.setModifyTime(now);
+
+        streamMapper.insert(streamEntity);
+    }
+
+    @Override
+    public void logicDeleteDlqOrRlq(String groupId, String topicName, String operator) {
+        streamMapper.logicDeleteDlqOrRlq(groupId, topicName, operator);
+        LOGGER.info("success to logic delete dlq or rlq by groupId={}, topicName={}", groupId, topicName);
+    }
+
     /**
      * Update extended information
      * <p/>First physically delete the existing extended information, and then add this batch of extended information
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/newconsumption/NewConsumptionWorkflowDefinition.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/newconsumption/NewConsumptionWorkflowDefinition.java
index 16c1058..c5315b6 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/newconsumption/NewConsumptionWorkflowDefinition.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/newconsumption/NewConsumptionWorkflowDefinition.java
@@ -22,7 +22,7 @@ import com.google.common.collect.Lists;
 import java.util.List;
 import java.util.Optional;
 import org.apache.inlong.manager.common.pojo.business.BusinessInfo;
-import org.apache.inlong.manager.common.pojo.dataconsumption.ConsumptionInfo;
+import org.apache.inlong.manager.common.pojo.consumption.ConsumptionInfo;
 import org.apache.inlong.manager.common.pojo.workflow.WorkflowApproverFilterContext;
 import org.apache.inlong.manager.service.core.BusinessService;
 import org.apache.inlong.manager.service.core.WorkflowApproverService;
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/newconsumption/NewConsumptionWorkflowForm.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/newconsumption/NewConsumptionWorkflowForm.java
index c3c3612..c9f7d18 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/newconsumption/NewConsumptionWorkflowForm.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/newconsumption/NewConsumptionWorkflowForm.java
@@ -21,7 +21,7 @@ import com.google.common.collect.Maps;
 import io.swagger.annotations.ApiModelProperty;
 import java.util.Map;
 import lombok.Data;
-import org.apache.inlong.manager.common.pojo.dataconsumption.ConsumptionInfo;
+import org.apache.inlong.manager.common.pojo.consumption.ConsumptionInfo;
 import org.apache.inlong.manager.service.workflow.BaseWorkflowFormType;
 import org.apache.inlong.manager.workflow.exception.FormValidateException;
 import org.apache.inlong.manager.common.util.Preconditions;
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/newconsumption/listener/ConsumptionApproveTaskListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/newconsumption/listener/ConsumptionApproveTaskListener.java
index 3417151..a9e1cbe 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/newconsumption/listener/ConsumptionApproveTaskListener.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/newconsumption/listener/ConsumptionApproveTaskListener.java
@@ -56,7 +56,7 @@ public class ConsumptionApproveTaskListener implements TaskEventListener {
             return ListenerResult.success("The consumer group name has not been modified");
         }
         boolean exitDuplicate = this.consumptionService
-                .isConsumerGroupIdExist(approveForm.getConsumerGroupId(), form.getConsumptionInfo().getId());
+                .isConsumerGroupIdExists(approveForm.getConsumerGroupId(), form.getConsumptionInfo().getId());
         if (exitDuplicate) {
             log.error("consumerGroupId already exist! duplicate :{}", approveForm.getConsumerGroupId());
             throw new BusinessException(BizErrorCodeEnum.CONSUMER_GROUP_NAME_DUPLICATED);
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/newconsumption/listener/ConsumptionCompleteProcessListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/newconsumption/listener/ConsumptionCompleteProcessListener.java
index 605b8c1..389c4e3 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/newconsumption/listener/ConsumptionCompleteProcessListener.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/newconsumption/listener/ConsumptionCompleteProcessListener.java
@@ -24,7 +24,7 @@ import org.apache.inlong.manager.common.enums.BizConstant;
 import org.apache.inlong.manager.common.enums.BizErrorCodeEnum;
 import org.apache.inlong.manager.common.enums.ConsumptionStatus;
 import org.apache.inlong.manager.common.exceptions.BusinessException;
-import org.apache.inlong.manager.common.pojo.dataconsumption.ConsumptionInfo;
+import org.apache.inlong.manager.common.pojo.consumption.ConsumptionInfo;
 import org.apache.inlong.manager.common.pojo.tubemq.AddTubeConsumeGroupRequest;
 import org.apache.inlong.manager.common.util.Preconditions;
 import org.apache.inlong.manager.dao.entity.ConsumptionEntity;
diff --git a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/ConsumptionController.java b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/ConsumptionController.java
index 11cd284..4b2e75f 100644
--- a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/ConsumptionController.java
+++ b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/ConsumptionController.java
@@ -24,11 +24,10 @@ import io.swagger.annotations.ApiOperation;
 import io.swagger.annotations.ApiParam;
 import org.apache.inlong.manager.common.beans.Response;
 import org.apache.inlong.manager.common.enums.OperationType;
-import org.apache.inlong.manager.common.pojo.dataconsumption.ConsumptionInfo;
-import org.apache.inlong.manager.common.pojo.dataconsumption.ConsumptionListVo;
-import org.apache.inlong.manager.common.pojo.dataconsumption.ConsumptionQuery;
-import org.apache.inlong.manager.common.pojo.dataconsumption.ConsumptionSummary;
-import org.apache.inlong.manager.common.pojo.dataconsumption.ConsumptionUpdateInfo;
+import org.apache.inlong.manager.common.pojo.consumption.ConsumptionInfo;
+import org.apache.inlong.manager.common.pojo.consumption.ConsumptionListVo;
+import org.apache.inlong.manager.common.pojo.consumption.ConsumptionQuery;
+import org.apache.inlong.manager.common.pojo.consumption.ConsumptionSummary;
 import org.apache.inlong.manager.common.util.LoginUserUtil;
 import org.apache.inlong.manager.service.core.ConsumptionService;
 import org.apache.inlong.manager.service.core.operationlog.OperationLog;
@@ -72,7 +71,7 @@ public class ConsumptionController {
     @ApiOperation(value = "Get consumption details")
     public Response<ConsumptionInfo> getDetail(
             @ApiParam(value = "Consumption ID", required = true) @PathVariable(name = "id") Integer id) {
-        return Response.success(consumptionService.getInfo(id));
+        return Response.success(consumptionService.get(id));
     }
 
     @DeleteMapping("/delete/{id}")
@@ -96,12 +95,11 @@ public class ConsumptionController {
     @PostMapping("/update/{id}")
     @OperationLog(operation = OperationType.UPDATE)
     @ApiOperation(value = "Update data consumption")
-    public Response<Integer> updateConsumptionInfo(
-            @ApiParam(value = "Consumption ID", required = true) @PathVariable(name = "id") Integer id,
-            @Validated @RequestBody ConsumptionUpdateInfo consumptionUpdateInfo) {
-        consumptionUpdateInfo.setId(id);
-        String currentUser = LoginUserUtil.getLoginUserDetail().getUserName();
-        return Response.success(consumptionService.update(consumptionUpdateInfo, currentUser));
+    public Response<String> update(@PathVariable(name = "id") Integer id,
+            @Validated @RequestBody ConsumptionInfo consumptionInfo) {
+        consumptionInfo.setId(id);
+        consumptionService.update(consumptionInfo, LoginUserUtil.getLoginUserDetail().getUserName());
+        return Response.success();
     }
 
     @PostMapping("/startProcess/{id}")
diff --git a/inlong-manager/manager-web/src/main/resources/application-dev.properties b/inlong-manager/manager-web/src/main/resources/application-dev.properties
index 16a5a30..4ca899c 100644
--- a/inlong-manager/manager-web/src/main/resources/application-dev.properties
+++ b/inlong-manager/manager-web/src/main/resources/application-dev.properties
@@ -22,8 +22,8 @@ logging.level.root=INFO
 logging.level.org.apache.inlong.manager=debug
 
 spring.datasource.druid.url=jdbc:mysql://127.0.0.1:3306/apache_inlong_manager?useSSL=false&allowPublicKeyRetrieval=true&characterEncoding=UTF-8&nullCatalogMeansCurrent=true&serverTimezone=GMT%2b8
-spring.datasource.druid.username=xxxxxx
-spring.datasource.druid.password=xxxxxx
+spring.datasource.druid.username=root
+spring.datasource.druid.password=fighting
 
 spring.datasource.druid.driver-class-name=com.mysql.cj.jdbc.Driver
 spring.datasource.druid.validationQuery=SELECT 'x'
@@ -64,3 +64,5 @@ sort.appName=inlong_app
 pulsar.adminUrl=http://127.0.0.1:8080,127.0.0.2:8080,127.0.0.3:8080
 # Pulsar master address
 pulsar.serviceUrl=pulsar://127.0.0.1:6650,127.0.0.1:6650,127.0.0.1:6650
+# Default tenant of Pulsar
+pulsar.defaultTenant=public
diff --git a/inlong-manager/manager-web/src/main/resources/application-prod.properties b/inlong-manager/manager-web/src/main/resources/application-prod.properties
index 5da7518..056e5a6 100644
--- a/inlong-manager/manager-web/src/main/resources/application-prod.properties
+++ b/inlong-manager/manager-web/src/main/resources/application-prod.properties
@@ -63,3 +63,5 @@ sort.appName=inlong_app
 pulsar.adminUrl=http://127.0.0.1:8080,127.0.0.2:8080,127.0.0.3:8080
 # Pulsar master address
 pulsar.serviceUrl=pulsar://127.0.0.1:6650,127.0.0.1:6650,127.0.0.1:6650
+# Default tenant of Pulsar
+pulsar.defaultTenant=public
diff --git a/inlong-manager/manager-web/src/main/resources/application-test.properties b/inlong-manager/manager-web/src/main/resources/application-test.properties
index 16a5a30..b4a75c9 100644
--- a/inlong-manager/manager-web/src/main/resources/application-test.properties
+++ b/inlong-manager/manager-web/src/main/resources/application-test.properties
@@ -64,3 +64,5 @@ sort.appName=inlong_app
 pulsar.adminUrl=http://127.0.0.1:8080,127.0.0.2:8080,127.0.0.3:8080
 # Pulsar master address
 pulsar.serviceUrl=pulsar://127.0.0.1:6650,127.0.0.1:6650,127.0.0.1:6650
+# Default tenant of Pulsar
+pulsar.defaultTenant=public
diff --git a/inlong-manager/manager-web/src/test/java/org/apache/inlong/manager/service/core/BusinessServiceTest.java b/inlong-manager/manager-web/src/test/java/org/apache/inlong/manager/service/core/BusinessServiceTest.java
index f085c61..8936052 100644
--- a/inlong-manager/manager-web/src/test/java/org/apache/inlong/manager/service/core/BusinessServiceTest.java
+++ b/inlong-manager/manager-web/src/test/java/org/apache/inlong/manager/service/core/BusinessServiceTest.java
@@ -24,17 +24,18 @@ import org.apache.inlong.manager.web.ServiceBaseTest;
 import org.junit.Assert;
 import org.junit.Test;
 import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.test.context.TestComponent;
 
 /**
  * Business Service Test
  */
+@TestComponent
 public class BusinessServiceTest extends ServiceBaseTest {
 
     @Autowired
     private BusinessService businessService;
 
-    public String saveBusiness(String operator) {
-        String groupName = "test_group";
+    public String saveBusiness(String groupName, String operator) {
         BusinessInfo businessInfo = new BusinessInfo();
         businessInfo.setName(groupName);
         businessInfo.setMiddlewareType(BizConstant.MIDDLEWARE_PULSAR);
@@ -53,15 +54,17 @@ public class BusinessServiceTest extends ServiceBaseTest {
 
     @Test
     public void testSave() {
+        String groupName = "test_group1";
         String operator = "test_user";
-        String groupId = this.saveBusiness(operator);
+        String groupId = this.saveBusiness(groupName, operator);
         Assert.assertNotNull(groupId);
     }
 
     @Test
     public void testDelete() {
+        String groupName = "test_group2";
         String operator = "test_user";
-        String groupId = this.saveBusiness(operator);
+        String groupId = this.saveBusiness(groupName, operator);
         boolean result = businessService.delete(groupId, operator);
         Assert.assertTrue(result);
     }
diff --git a/inlong-manager/manager-web/src/test/java/org/apache/inlong/manager/service/core/ConsumptionServiceTest.java b/inlong-manager/manager-web/src/test/java/org/apache/inlong/manager/service/core/ConsumptionServiceTest.java
new file mode 100644
index 0000000..8b74aef
--- /dev/null
+++ b/inlong-manager/manager-web/src/test/java/org/apache/inlong/manager/service/core/ConsumptionServiceTest.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.service.core;
+
+import org.apache.inlong.manager.common.enums.BizConstant;
+import org.apache.inlong.manager.common.pojo.consumption.ConsumptionInfo;
+import org.apache.inlong.manager.common.pojo.consumption.ConsumptionPulsarInfo;
+import org.apache.inlong.manager.web.ServiceBaseTest;
+import org.junit.Assert;
+import org.junit.Test;
+import org.springframework.beans.factory.annotation.Autowired;
+
+/**
+ * Consumption Service Test
+ */
+public class ConsumptionServiceTest extends ServiceBaseTest {
+
+    @Autowired
+    private ConsumptionService consumptionService;
+    @Autowired
+    private BusinessServiceTest businessServiceTest;
+
+    private Integer saveConsumption(String inlongGroup, String consumerGroup, String operator) {
+        ConsumptionInfo consumptionInfo = new ConsumptionInfo();
+        consumptionInfo.setTopic(inlongGroup);
+        consumptionInfo.setConsumerGroupName(consumerGroup);
+        consumptionInfo.setInlongGroupId("b_" + inlongGroup);
+        consumptionInfo.setMiddlewareType(BizConstant.MIDDLEWARE_PULSAR);
+        consumptionInfo.setCreator(operator);
+
+        ConsumptionPulsarInfo pulsarInfo = new ConsumptionPulsarInfo();
+        pulsarInfo.setMiddlewareType(BizConstant.MIDDLEWARE_PULSAR);
+        pulsarInfo.setIsDlq(1);
+        pulsarInfo.setDeadLetterTopic("test_dlq");
+        pulsarInfo.setIsRlq(0);
+
+        consumptionInfo.setMqExtInfo(pulsarInfo);
+
+        return consumptionService.save(consumptionInfo, operator);
+    }
+
+    @Test
+    public void testSave() {
+        String inlongGroup = "inlong_group1";
+        String consumerGroup = "test_save_consumer_group";
+        String operator = "test_user";
+        businessServiceTest.saveBusiness(inlongGroup, operator);
+        Integer id = this.saveConsumption(inlongGroup, consumerGroup, operator);
+        Assert.assertNotNull(id);
+    }
+
+    @Test
+    public void testDelete() {
+        String inlongGroup = "inlong_group2";
+        String operator = "test_user";
+        String consumerGroup = "test_delete_consumer_group";
+        businessServiceTest.saveBusiness(inlongGroup, operator);
+        Integer id = this.saveConsumption(inlongGroup, consumerGroup, operator);
+        boolean result = consumptionService.delete(id, operator);
+        Assert.assertTrue(result);
+    }
+
+}
diff --git a/inlong-manager/manager-web/src/test/resources/application-test.properties b/inlong-manager/manager-web/src/test/resources/application-test.properties
index 6333bd0..5053a8b 100644
--- a/inlong-manager/manager-web/src/test/resources/application-test.properties
+++ b/inlong-manager/manager-web/src/test/resources/application-test.properties
@@ -67,3 +67,5 @@ sort.appName=inlong_app
 pulsar.adminUrl=http://127.0.0.1:8080,127.0.0.2:8080,127.0.0.3:8080
 # Pulsar master address
 pulsar.serviceUrl=pulsar://127.0.0.1:6650,127.0.0.1:6650,127.0.0.1:6650
+# Default tenant of Pulsar
+pulsar.defaultTenant=public
diff --git a/inlong-manager/manager-web/src/test/resources/sql/apache_inlong_manager.sql b/inlong-manager/manager-web/src/test/resources/sql/apache_inlong_manager.sql
index 9b8e124..d3ae3f1 100644
--- a/inlong-manager/manager-web/src/test/resources/sql/apache_inlong_manager.sql
+++ b/inlong-manager/manager-web/src/test/resources/sql/apache_inlong_manager.sql
@@ -244,6 +244,25 @@ CREATE TABLE `consumption`
 );
 
 -- ----------------------------
+-- Table structure for consumption_pulsar
+-- ----------------------------
+DROP TABLE IF EXISTS `consumption_pulsar`;
+CREATE TABLE `consumption_pulsar`
+(
+    `id`                  int          NOT NULL AUTO_INCREMENT,
+    `consumption_id`      int          DEFAULT NULL COMMENT 'ID of the consumption information to which it belongs, guaranteed to be uniquely associated with consumption information',
+    `consumer_group_id`   varchar(255) NOT NULL COMMENT 'Consumer group ID',
+    `consumer_group_name` varchar(255) NOT NULL COMMENT 'Consumer group name',
+    `inlong_group_id`     varchar(255) NOT NULL COMMENT 'Business group ID',
+    `is_rlq`              tinyint(1)   DEFAULT '0' COMMENT 'Whether to configure the retry letter topic, 0: no configuration, 1: configuration',
+    `retry_letter_topic`  varchar(255) DEFAULT NULL COMMENT 'The name of the retry queue topic',
+    `is_dlq`              tinyint(1)   DEFAULT '0' COMMENT 'Whether to configure dead letter topic, 0: no configuration, 1: means configuration',
+    `dead_letter_topic`   varchar(255) DEFAULT NULL COMMENT 'dead letter topic name',
+    `is_deleted`          int          DEFAULT '0' COMMENT 'Whether to delete',
+    PRIMARY KEY (`id`)
+) COMMENT ='Pulsar consumption table';
+
+-- ----------------------------
 -- Table structure for data_proxy_cluster
 -- ----------------------------
 DROP TABLE IF EXISTS `data_proxy_cluster`;
diff --git a/inlong-manager/pom.xml b/inlong-manager/pom.xml
index 686f253..a934118 100644
--- a/inlong-manager/pom.xml
+++ b/inlong-manager/pom.xml
@@ -52,6 +52,7 @@
         <druid.version>1.2.5</druid.version>
         <shiro.version>1.4.1</shiro.version>
         <hive.version>3.1.2</hive.version>
+        <pulsar.version>2.8.1</pulsar.version>
         <servlet-api.version>2.5</servlet-api.version>
         <servlet.jsp-api.version>2.3.3</servlet.jsp-api.version>
         <slf4j.version>1.7.25</slf4j.version>
@@ -195,6 +196,16 @@
                 <artifactId>pagehelper-spring-boot-starter</artifactId>
                 <version>${pagehelper.springboot.version}</version>
             </dependency>
+            <dependency>
+                <groupId>org.apache.pulsar</groupId>
+                <artifactId>pulsar-client</artifactId>
+                <version>${pulsar.version}</version>
+            </dependency>
+            <dependency>
+                <groupId>org.apache.pulsar</groupId>
+                <artifactId>pulsar-client-admin</artifactId>
+                <version>${pulsar.version}</version>
+            </dependency>
 
             <dependency>
                 <groupId>com.alibaba</groupId>