You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2022/09/03 10:07:34 UTC

[inlong] branch master updated: [INLONG-5302][Manager] Supports the extension of data consumption for different MQs (#5542)

This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 473f67b24 [INLONG-5302][Manager] Supports the extension of data consumption for different MQs (#5542)
473f67b24 is described below

commit 473f67b248b9c8c309878e4e5200fa5c1a645ccc
Author: ciscozhou <45...@users.noreply.github.com>
AuthorDate: Sat Sep 3 18:07:28 2022 +0800

    [INLONG-5302][Manager] Supports the extension of data consumption for different MQs (#5542)
---
 .../{ConsumptionStatus.java => ConsumeStatus.java} |  49 +++--
 .../inlong/manager/common/enums/ErrorCodeEnum.java |   8 +
 .../manager/dao/entity/InlongConsumeEntity.java}   |  34 +++-
 .../dao/mapper/InlongConsumeEntityMapper.java}     |  35 ++--
 .../mappers/InlongConsumeEntityMapper.xml          | 190 +++++++++++++++++++
 .../inlong/manager/pojo/common/PageRequest.java    |   2 +
 .../pojo/consume/InlongConsumeBriefInfo.java       |  73 ++++++++
 .../InlongConsumeCountInfo.java}                   |  23 ++-
 .../manager/pojo/consume/InlongConsumeInfo.java    |  89 +++++++++
 .../pojo/consume/InlongConsumePageRequest.java     |  69 +++++++
 .../manager/pojo/consume/InlongConsumeRequest.java |  72 +++++++
 .../pojo/consume/pulsar/ConsumePulsarDTO.java      |  81 ++++++++
 .../pojo/consume/pulsar/ConsumePulsarInfo.java     |  57 ++++++
 .../pojo/consume/pulsar/ConsumePulsarRequest.java  |  51 +++++
 .../pojo/consume/tubemq/ConsumeTubeMQDTO.java      |  63 +++++++
 .../tubemq/ConsumeTubeMQInfo.java}                 |  26 ++-
 .../tubemq/ConsumeTubeMQRequest.java}              |  20 +-
 .../manager/pojo/group/InlongGroupRequest.java     |   3 -
 .../manager/pojo/group/tubemq/InlongTubeMQDTO.java |   2 +-
 .../service/consume/AbstractConsumeOperator.java   |  86 +++++++++
 .../service/consume/ConsumePulsarOperator.java     | 152 +++++++++++++++
 .../service/consume/ConsumeTubeMQOperator.java     |  91 +++++++++
 .../service/consume/InlongConsumeOperator.java     |  73 ++++++++
 .../consume/InlongConsumeOperatorFactory.java      |  46 +++++
 .../InlongConsumeProcessService.java}              |  25 ++-
 .../service/consume/InlongConsumeService.java      |  96 ++++++++++
 .../service/consume/InlongConsumeServiceImpl.java  | 208 +++++++++++++++++++++
 .../service/core/impl/ConsumptionServiceImpl.java  |  20 +-
 .../service/group/InlongGroupProcessService.java   |   2 +-
 .../service/group/InlongGroupServiceImpl.java      |   7 +-
 .../ConsumptionCancelProcessListener.java          |   4 +-
 .../ConsumptionCompleteProcessListener.java        |   4 +-
 .../ConsumptionRejectProcessListener.java          |   4 +-
 .../main/resources/h2/apache_inlong_manager.sql    |  31 ++-
 .../manager-web/sql/apache_inlong_manager.sql      |  31 ++-
 .../web/controller/ConsumptionController.java      |   4 +-
 .../web/controller/InlongConsumeController.java    | 116 ++++++++++++
 37 files changed, 1849 insertions(+), 98 deletions(-)

diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/ConsumptionStatus.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/ConsumeStatus.java
similarity index 62%
rename from inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/ConsumptionStatus.java
rename to inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/ConsumeStatus.java
index 05e45fe3b..0b9013df2 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/ConsumptionStatus.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/ConsumeStatus.java
@@ -21,17 +21,18 @@ import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Lists;
 import io.swagger.annotations.ApiModel;
 import io.swagger.annotations.ApiModelProperty;
+import org.apache.inlong.manager.common.util.InlongCollectionUtils;
+import org.apache.inlong.manager.common.util.Preconditions;
+
 import java.util.Map;
 import java.util.Set;
 import java.util.function.Function;
-import org.apache.inlong.manager.common.util.InlongCollectionUtils;
-import org.apache.inlong.manager.common.util.Preconditions;
 
 /**
- * Data consumption status
+ * Inlong consume status
  */
-@ApiModel("Data consumption status")
-public enum ConsumptionStatus {
+@ApiModel("Inlong consume status")
+public enum ConsumeStatus {
 
     @ApiModelProperty(value = "To be allocated: 10")
     WAIT_ASSIGN(10),
@@ -46,33 +47,41 @@ public enum ConsumptionStatus {
     APPROVED(21),
 
     @ApiModelProperty(value = "Cancel application: 22")
-    CANCELED(22);
+    CANCELED(22),
+
+    @ApiModelProperty(value = "Deleting: 41")
+    DELETING(41),
+
+    @ApiModelProperty(value = "Deleted: 40")
+    DELETED(40),
+
+    ;
 
-    public static final Set<ConsumptionStatus> ALLOW_SAVE_UPDATE_STATUS = ImmutableSet
+    public static final Set<ConsumeStatus> ALLOW_SAVE_UPDATE_STATUS = ImmutableSet
             .of(WAIT_ASSIGN, REJECTED, CANCELED);
 
-    public static final Set<ConsumptionStatus> ALLOW_START_WORKFLOW_STATUS = ImmutableSet.of(WAIT_ASSIGN);
+    public static final Set<ConsumeStatus> ALLOW_START_WORKFLOW_STATUS = ImmutableSet.of(WAIT_ASSIGN);
 
-    private static final Map<Integer, ConsumptionStatus> STATUS_MAP = InlongCollectionUtils.transformToImmutableMap(
-            Lists.newArrayList(ConsumptionStatus.values()),
-            ConsumptionStatus::getStatus,
+    private static final Map<Integer, ConsumeStatus> STATUS_MAP = InlongCollectionUtils.transformToImmutableMap(
+            Lists.newArrayList(ConsumeStatus.values()),
+            ConsumeStatus::getCode,
             Function.identity()
     );
 
-    private final int status;
+    private final int code;
 
-    ConsumptionStatus(int status) {
-        this.status = status;
+    ConsumeStatus(int code) {
+        this.code = code;
     }
 
-    public static ConsumptionStatus fromStatus(int status) {
-        ConsumptionStatus consumptionStatus = STATUS_MAP.get(status);
-        Preconditions.checkNotNull(consumptionStatus, "status is unavailable :" + status);
-        return consumptionStatus;
+    public static ConsumeStatus fromStatus(int status) {
+        ConsumeStatus consumeStatus = STATUS_MAP.get(status);
+        Preconditions.checkNotNull(consumeStatus, "consume status is invalid for " + status);
+        return consumeStatus;
     }
 
-    public int getStatus() {
-        return status;
+    public int getCode() {
+        return code;
     }
 
 }
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/ErrorCodeEnum.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/ErrorCodeEnum.java
index d6cc20776..87a6e706c 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/ErrorCodeEnum.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/ErrorCodeEnum.java
@@ -125,6 +125,14 @@ public enum ErrorCodeEnum {
     PULSAR_DLQ_RLQ_ERROR(2606, "Wrong config for the RLQ and DLQ: RLQ was enabled, but the DLQ was disabled"),
     PULSAR_DLQ_DUPLICATED(2607, "DLQ topic already exists under the inlong group"),
     PULSAR_RLQ_DUPLICATED(2608, "RLQ topic already exists under the inlong group"),
+    CONSUMER_INFO_INCORRECT(2609, "Consumer info was incorrect"),
+    CONSUMER_NOR_FOUND(2609, "Consumer not found"),
+
+    CONSUME_NOT_FOUND(3001, "Inlong consume does not exist/no operation authority"),
+    CONSUME_DUPLICATE(3002, "Inlong consume already exists"),
+    CONSUME_INFO_INCORRECT(3003, "Inlong consume info was incorrect"),
+    CONSUME_SAVE_FAILED(3004, "Failed to save/update inlong consume"),
+    CONSUME_PERMISSION_DENIED(3005, "No permission to access this inlong consume"),
 
     ;
 
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/tubemq/InlongTubeMQDTO.java b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/InlongConsumeEntity.java
similarity index 52%
copy from inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/tubemq/InlongTubeMQDTO.java
copy to inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/InlongConsumeEntity.java
index 4b7bdff63..55b1fc645 100644
--- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/tubemq/InlongTubeMQDTO.java
+++ b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/InlongConsumeEntity.java
@@ -15,20 +15,38 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.manager.pojo.group.tubemq;
+package org.apache.inlong.manager.dao.entity;
 
-import io.swagger.annotations.ApiModel;
 import lombok.Data;
-import lombok.NoArgsConstructor;
+
+import java.io.Serializable;
+import java.util.Date;
 
 /**
- * Inlong group info for TubeMQ
+ * Inlong consume entity.
  */
 @Data
-@NoArgsConstructor
-@ApiModel("Inlong group info for TubeMQ")
-public class InlongTubeMQDTO {
+public class InlongConsumeEntity implements Serializable {
+
+    private static final long serialVersionUID = 1L;
+    private Integer id;
+    private String consumerGroup;
+    private String description;
+    private String mqType;
+    private String topic;
+
+    private String inlongGroupId;
+    private Integer filterEnabled;
+    private String inlongStreamId;
+    private String extParams;
 
-    // no field
+    private String inCharges;
+    private Integer status;
+    private Integer isDeleted;
+    private String creator;
+    private String modifier;
+    private Date createTime;
+    private Date modifyTime;
+    private Integer version;
 
 }
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/tubemq/InlongTubeMQDTO.java b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/InlongConsumeEntityMapper.java
similarity index 50%
copy from inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/tubemq/InlongTubeMQDTO.java
copy to inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/InlongConsumeEntityMapper.java
index 4b7bdff63..3b0205048 100644
--- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/tubemq/InlongTubeMQDTO.java
+++ b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/InlongConsumeEntityMapper.java
@@ -15,20 +15,31 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.manager.pojo.group.tubemq;
+package org.apache.inlong.manager.dao.mapper;
 
-import io.swagger.annotations.ApiModel;
-import lombok.Data;
-import lombok.NoArgsConstructor;
+import org.apache.ibatis.annotations.Param;
+import org.apache.inlong.manager.dao.entity.InlongConsumeEntity;
+import org.apache.inlong.manager.pojo.consume.InlongConsumePageRequest;
+import org.springframework.stereotype.Repository;
 
-/**
- * Inlong group info for TubeMQ
- */
-@Data
-@NoArgsConstructor
-@ApiModel("Inlong group info for TubeMQ")
-public class InlongTubeMQDTO {
+import java.util.List;
+import java.util.Map;
+
+@Repository
+public interface InlongConsumeEntityMapper {
+
+    int insert(InlongConsumeEntity record);
+
+    InlongConsumeEntity selectById(Integer id);
+
+    List<Map<String, Object>> countByUser(@Param(value = "username") String username);
+
+    List<InlongConsumeEntity> selectByCondition(InlongConsumePageRequest request);
+
+    int updateById(InlongConsumeEntity record);
+
+    int updateByIdSelective(InlongConsumeEntity record);
 
-    // no field
+    int deleteById(Integer id);
 
 }
diff --git a/inlong-manager/manager-dao/src/main/resources/mappers/InlongConsumeEntityMapper.xml b/inlong-manager/manager-dao/src/main/resources/mappers/InlongConsumeEntityMapper.xml
new file mode 100644
index 000000000..1db846b23
--- /dev/null
+++ b/inlong-manager/manager-dao/src/main/resources/mappers/InlongConsumeEntityMapper.xml
@@ -0,0 +1,190 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements. See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership. The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License. You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+-->
+
+<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
+<mapper namespace="org.apache.inlong.manager.dao.mapper.InlongConsumeEntityMapper">
+    <resultMap id="BaseResultMap" type="org.apache.inlong.manager.dao.entity.InlongConsumeEntity">
+        <id column="id" jdbcType="INTEGER" property="id"/>
+        <result column="consumer_group" jdbcType="VARCHAR" property="consumerGroup"/>
+        <result column="description" jdbcType="VARCHAR" property="description"/>
+        <result column="mq_type" jdbcType="VARCHAR" property="mqType"/>
+        <result column="topic" jdbcType="VARCHAR" property="topic"/>
+        <result column="inlong_group_id" jdbcType="VARCHAR" property="inlongGroupId"/>
+        <result column="filter_enabled" jdbcType="INTEGER" property="filterEnabled"/>
+        <result column="inlong_stream_id" jdbcType="VARCHAR" property="inlongStreamId"/>
+        <result column="ext_params" jdbcType="LONGVARCHAR" property="extParams"/>
+        <result column="in_charges" jdbcType="VARCHAR" property="inCharges"/>
+        <result column="status" jdbcType="INTEGER" property="status"/>
+        <result column="is_deleted" jdbcType="INTEGER" property="isDeleted"/>
+        <result column="creator" jdbcType="VARCHAR" property="creator"/>
+        <result column="modifier" jdbcType="VARCHAR" property="modifier"/>
+        <result column="create_time" jdbcType="TIMESTAMP" property="createTime"/>
+        <result column="modify_time" jdbcType="TIMESTAMP" property="modifyTime"/>
+        <result column="version" jdbcType="INTEGER" property="version"/>
+    </resultMap>
+
+    <sql id="Base_Column_List">
+        id, consumer_group, description, mq_type, topic, inlong_group_id, filter_enabled, inlong_stream_id,
+        ext_params, in_charges, status, is_deleted, creator, modifier, create_time, modify_time, version
+    </sql>
+
+    <insert id="insert" useGeneratedKeys="true" keyProperty="id"
+            parameterType="org.apache.inlong.manager.dao.entity.InlongConsumeEntity">
+        insert into inlong_consume (id, consumer_group, description,
+                                    mq_type, topic, inlong_group_id,
+                                    filter_enabled, inlong_stream_id,
+                                    ext_params, in_charges, status,
+                                    is_deleted, creator, modifier)
+        values (#{id, jdbcType=INTEGER}, #{consumerGroup, jdbcType=VARCHAR}, #{description, jdbcType=VARCHAR},
+                #{mqType, jdbcType=VARCHAR}, #{topic, jdbcType=VARCHAR}, #{inlongGroupId, jdbcType=VARCHAR},
+                #{filterEnabled, jdbcType=INTEGER}, #{inlongStreamId, jdbcType=VARCHAR},
+                #{extParams, jdbcType=LONGVARCHAR}, #{inCharges, jdbcType=VARCHAR}, #{status, jdbcType=INTEGER},
+                #{isDeleted, jdbcType=INTEGER}, #{creator, jdbcType=VARCHAR}, #{modifier, jdbcType=VARCHAR})
+    </insert>
+
+    <select id="selectById" parameterType="java.lang.Integer" resultMap="BaseResultMap">
+        select
+        <include refid="Base_Column_List"/>
+        from inlong_consume
+        where id = #{id, jdbcType=INTEGER}
+    </select>
+    <select id="countByUser" resultType="java.util.Map">
+        select status, count(1) as total
+        from inlong_consume
+        where is_deleted = 0
+          and (creator = #{username, jdbcType=VARCHAR} or FIND_IN_SET(#{username, jdbcType=VARCHAR}, in_charges))
+        group by status
+    </select>
+    <select id="selectByCondition" resultMap="BaseResultMap"
+            parameterType="org.apache.inlong.manager.pojo.consume.InlongConsumePageRequest">
+        select
+        <include refid="Base_Column_List"/>
+        from inlong_consume
+        <where>
+            is_deleted = 0
+            <if test="isAdminRole == false">
+                and (
+                creator = #{currentUser, jdbcType=VARCHAR} or FIND_IN_SET(#{currentUser, jdbcType=VARCHAR}, in_charges)
+                )
+            </if>
+            <if test="keyword != null and keyword !=''">
+                and (consumer_group like CONCAT('%', #{keyword}, '%') or topic like CONCAT('%', #{keyword}, '%'))
+            </if>
+            <if test="consumerGroup != null and consumerGroup != ''">
+                and consumer_group = #{consumerGroup, jdbcType=VARCHAR}
+            </if>
+            <if test="mqType != null and mqType != ''">
+                and mq_type = #{mqType, jdbcType=VARCHAR}
+            </if>
+            <if test="topic != null and topic != ''">
+                and topic = #{topic}
+            </if>
+            <if test="inlongGroupId != null and inlongGroupId != ''">
+                and inlong_group_id = #{inlongGroupId, jdbcType=VARCHAR}
+            </if>
+            <if test="status != null">
+                and status = #{status, jdbcType=INTEGER}
+            </if>
+            <if test="statusList != null and statusList.size() > 0">
+                and status in
+                <foreach collection="statusList" item="status" index="index" open="(" close=")" separator=",">
+                    #{status}
+                </foreach>
+            </if>
+        </where>
+        <choose>
+            <when test="orderField != null and orderField != '' and orderType != null and orderType != ''">
+                order by ${orderField} ${orderType}
+            </when>
+            <otherwise>
+                order by create_time desc
+            </otherwise>
+        </choose>
+    </select>
+
+    <update id="updateById" parameterType="org.apache.inlong.manager.dao.entity.InlongConsumeEntity">
+        update inlong_consume
+        set consumer_group   = #{consumerGroup, jdbcType=VARCHAR},
+            in_charges       = #{inCharges, jdbcType=VARCHAR},
+            inlong_group_id  = #{inlongGroupId, jdbcType=VARCHAR},
+            mq_type          = #{mqType, jdbcType=VARCHAR},
+            topic            = #{topic, jdbcType=VARCHAR},
+            filter_enabled   = #{filterEnabled, jdbcType=INTEGER},
+            inlong_stream_id = #{inlongStreamId, jdbcType=VARCHAR},
+            ext_params       = #{extParams, jdbcType = LONGVARCHAR},
+            status           = #{status, jdbcType=INTEGER},
+            modifier         = #{modifier, jdbcType=VARCHAR},
+            is_deleted       = #{isDeleted, jdbcType=INTEGER},
+            version          = #{version, jdbcType=INTEGER} + 1
+        where id = #{id, jdbcType=INTEGER}
+          and is_deleted = 0
+          and version = #{version, jdbcType=INTEGER}
+    </update>
+
+    <update id="updateByIdSelective"
+            parameterType="org.apache.inlong.manager.dao.entity.InlongConsumeEntity">
+        update inlong_consume
+        <set>
+            <if test="consumerGroup != null">
+                consumer_group = #{consumerGroup, jdbcType=VARCHAR},
+            </if>
+            <if test="description != null">
+                description = #{description,jdbcType=VARCHAR},
+            </if>
+            <if test="mqType != null">
+                mq_type = #{mqType, jdbcType=VARCHAR},
+            </if>
+            <if test="topic != null">
+                topic = #{topic, jdbcType=VARCHAR},
+            </if>
+            <if test="inlongGroupId != null">
+                inlong_group_id = #{inlongGroupId, jdbcType=VARCHAR},
+            </if>
+            <if test="filterEnabled != null">
+                filter_enabled = #{filterEnabled, jdbcType=INTEGER},
+            </if>
+            <if test="inlongStreamId != null">
+                inlong_stream_id = #{inlongStreamId, jdbcType=VARCHAR},
+            </if>
+            <if test="extParams != null">
+                ext_params = #{extParams, jdbcType=LONGVARCHAR},
+            </if>
+            <if test="inCharges != null">
+                in_charges = #{inCharges, jdbcType=VARCHAR},
+            </if>
+            <if test="status != null">
+                status = #{status, jdbcType=INTEGER},
+            </if>
+            <if test="modifier != null">
+                modifier = #{modifier, jdbcType=VARCHAR},
+            </if>
+            version = #{version, jdbcType=INTEGER} + 1
+        </set>
+        where id = #{id, jdbcType=INTEGER}
+        and is_deleted = 0
+        and version = #{version, jdbcType=INTEGER}
+    </update>
+
+    <delete id="deleteById" parameterType="java.lang.Integer">
+        delete
+        from inlong_consume
+        where id = #{id,jdbcType=INTEGER}
+    </delete>
+</mapper>
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/common/PageRequest.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/common/PageRequest.java
index 8c1a105c8..fb9af69f4 100644
--- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/common/PageRequest.java
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/common/PageRequest.java
@@ -26,6 +26,8 @@ import io.swagger.annotations.ApiModelProperty;
 @ApiModel(value = "Pagination request")
 public class PageRequest {
 
+    public static final Integer MAX_PAGE_SIZE = 100;
+
     @ApiModelProperty(value = "Current page number, default is 1")
     private int pageNum = 1;
 
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/consume/InlongConsumeBriefInfo.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/consume/InlongConsumeBriefInfo.java
new file mode 100644
index 000000000..70ae5f26c
--- /dev/null
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/consume/InlongConsumeBriefInfo.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.pojo.consume;
+
+import com.fasterxml.jackson.annotation.JsonFormat;
+import io.swagger.annotations.ApiModel;
+import io.swagger.annotations.ApiModelProperty;
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+import java.util.Date;
+
+/**
+ * Inlong consume brief info
+ */
+@Data
+@Builder
+@NoArgsConstructor
+@AllArgsConstructor
+@ApiModel("Inlong consume brief info")
+public class InlongConsumeBriefInfo {
+
+    @ApiModelProperty(value = "Primary key")
+    private Integer id;
+
+    @ApiModelProperty(value = "Consumer group, only support [a-zA-Z0-9_]")
+    private String consumerGroup;
+
+    @ApiModelProperty(value = "MQ type, high throughput: TUBEMQ, high consistency: PULSAR")
+    private String mqType;
+
+    @ApiModelProperty(value = "The target topic of inlong consume")
+    private String topic;
+
+    @ApiModelProperty(value = "The target inlongGroupId of inlong consume")
+    private String inlongGroupId;
+
+    @ApiModelProperty(value = "Name of responsible person, separated by commas")
+    private String inCharges;
+
+    @ApiModelProperty(value = "Consume status")
+    private Integer status;
+
+    @ApiModelProperty(value = "Name of creator")
+    private String creator;
+
+    @ApiModelProperty(value = "Name of modifier")
+    private String modifier;
+
+    @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss")
+    private Date createTime;
+
+    @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss")
+    private Date modifyTime;
+
+}
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/tubemq/InlongTubeMQDTO.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/consume/InlongConsumeCountInfo.java
similarity index 58%
copy from inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/tubemq/InlongTubeMQDTO.java
copy to inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/consume/InlongConsumeCountInfo.java
index 4b7bdff63..e48d97e0d 100644
--- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/tubemq/InlongTubeMQDTO.java
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/consume/InlongConsumeCountInfo.java
@@ -15,20 +15,29 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.manager.pojo.group.tubemq;
+package org.apache.inlong.manager.pojo.consume;
 
 import io.swagger.annotations.ApiModel;
+import io.swagger.annotations.ApiModelProperty;
 import lombok.Data;
-import lombok.NoArgsConstructor;
 
 /**
- * Inlong group info for TubeMQ
+ * Count info of inlong consume status.
  */
 @Data
-@NoArgsConstructor
-@ApiModel("Inlong group info for TubeMQ")
-public class InlongTubeMQDTO {
+@ApiModel("Count info of inlong consume status")
+public class InlongConsumeCountInfo {
 
-    // no field
+    @ApiModelProperty(value = "Total consume number")
+    private long totalCount;
+
+    @ApiModelProperty(value = "Total number of to be allocated (the number of configuring consumes)")
+    private long waitAssignCount;
+
+    @ApiModelProperty(value = "Total number of to be approved")
+    private long waitApproveCount;
+
+    @ApiModelProperty(value = "Total number of rejections")
+    private long rejectCount;
 
 }
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/consume/InlongConsumeInfo.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/consume/InlongConsumeInfo.java
new file mode 100644
index 000000000..a1ecf3620
--- /dev/null
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/consume/InlongConsumeInfo.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.pojo.consume;
+
+import com.fasterxml.jackson.annotation.JsonFormat;
+import com.fasterxml.jackson.annotation.JsonTypeInfo;
+import io.swagger.annotations.ApiModel;
+import io.swagger.annotations.ApiModelProperty;
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+import lombok.experimental.SuperBuilder;
+
+import java.util.Date;
+
+/**
+ * Base inlong consume info
+ */
+@Data
+@SuperBuilder
+@NoArgsConstructor
+@AllArgsConstructor
+@ApiModel("Base inlong consume info")
+@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, visible = true, property = "mqType")
+public abstract class InlongConsumeInfo {
+
+    @ApiModelProperty(value = "Primary key")
+    private Integer id;
+
+    @ApiModelProperty(value = "Consumer group, only support [a-zA-Z0-9_]")
+    private String consumerGroup;
+
+    @ApiModelProperty(value = "MQ type, high throughput: TUBEMQ, high consistency: PULSAR")
+    private String mqType;
+
+    @ApiModelProperty(value = "The target topic of this consume")
+    private String topic;
+
+    @ApiModelProperty(value = "The target inlong group id of this consume")
+    private String inlongGroupId;
+
+    @ApiModelProperty(value = "Whether to filter consumption, 0-not filter, 1-filter")
+    private Integer filterEnabled = 0;
+
+    @ApiModelProperty(value = "The target inlong stream id of this consume, needed if the filterEnabled=1")
+    private String inlongStreamId;
+
+    @ApiModelProperty(value = "Cluster URL of message queue")
+    private String clusterUrl;
+
+    @ApiModelProperty(value = "Name of responsible person, separated by commas")
+    private String inCharges;
+
+    @ApiModelProperty(value = "Consume status")
+    private Integer status;
+
+    @ApiModelProperty(value = "Name of creator")
+    private String creator;
+
+    @ApiModelProperty(value = "Name of modifier")
+    private String modifier;
+
+    @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss")
+    private Date createTime;
+
+    @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss")
+    private Date modifyTime;
+
+    @ApiModelProperty(value = "Version number")
+    private Integer version;
+
+    public abstract InlongConsumeRequest genRequest();
+
+}
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/consume/InlongConsumePageRequest.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/consume/InlongConsumePageRequest.java
new file mode 100644
index 000000000..a8f73552e
--- /dev/null
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/consume/InlongConsumePageRequest.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.pojo.consume;
+
+import io.swagger.annotations.ApiModel;
+import io.swagger.annotations.ApiModelProperty;
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.NoArgsConstructor;
+import org.apache.inlong.manager.pojo.common.PageRequest;
+
+import java.util.List;
+
+/**
+ * Inlong consume query request
+ */
+@Data
+@EqualsAndHashCode(callSuper = false)
+@Builder
+@NoArgsConstructor
+@AllArgsConstructor
+@ApiModel("Inlong consume query request")
+public class InlongConsumePageRequest extends PageRequest {
+
+    @ApiModelProperty(value = "Keyword, can be consumer group")
+    private String keyword;
+
+    @ApiModelProperty(value = "Consumer group name")
+    private String consumerGroup;
+
+    @ApiModelProperty(value = "MQ type, high throughput: TUBEMQ, high consistency: PULSAR")
+    private String mqType;
+
+    @ApiModelProperty(value = "The target topic of inlong consume")
+    private String topic;
+
+    @ApiModelProperty(value = "The target inlongGroupId of inlong consume")
+    private String inlongGroupId;
+
+    @ApiModelProperty(value = "Consume status")
+    private Integer status;
+
+    @ApiModelProperty(value = "Consume status list")
+    private List<Integer> statusList;
+
+    @ApiModelProperty(value = "Current user", hidden = true)
+    private String currentUser;
+
+    @ApiModelProperty(value = "Whether the current user is in the administrator role", hidden = true)
+    private Boolean isAdminRole;
+
+}
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/consume/InlongConsumeRequest.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/consume/InlongConsumeRequest.java
new file mode 100644
index 000000000..5dfa0bcc1
--- /dev/null
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/consume/InlongConsumeRequest.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.pojo.consume;
+
+import com.fasterxml.jackson.annotation.JsonTypeInfo;
+import io.swagger.annotations.ApiModel;
+import io.swagger.annotations.ApiModelProperty;
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+import org.apache.inlong.manager.common.validation.UpdateValidation;
+
+import javax.validation.constraints.NotBlank;
+import javax.validation.constraints.NotNull;
+
+/**
+ * Base inlong consume request
+ */
+@Data
+@NoArgsConstructor
+@AllArgsConstructor
+@ApiModel("Base inlong consume request")
+@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, visible = true, property = "mqType")
+public abstract class InlongConsumeRequest {
+
+    @NotNull(groups = UpdateValidation.class)
+    @ApiModelProperty(value = "Primary key")
+    private Integer id;
+
+    @NotBlank(message = "consumerGroup cannot be null")
+    @ApiModelProperty(value = "Consumer group, only support [a-zA-Z0-9_]")
+    private String consumerGroup;
+
+    @ApiModelProperty(value = "MQ type, high throughput: TUBEMQ, high consistency: PULSAR")
+    private String mqType;
+
+    @ApiModelProperty(value = "The target topic of this consume")
+    private String topic;
+
+    @NotBlank(message = "inlong group id cannot be null")
+    @ApiModelProperty(value = "The target inlong group id of this consume")
+    private String inlongGroupId;
+
+    @ApiModelProperty(value = "Whether to filter consumption, 0-not filter, 1-filter")
+    private Integer filterEnabled = 0;
+
+    @ApiModelProperty(value = "The target inlong stream id of this consume, needed if the filterEnabled=1")
+    private String inlongStreamId;
+
+    @NotBlank(message = "inCharges cannot be null")
+    @ApiModelProperty(value = "Name of responsible person, separated by commas")
+    private String inCharges;
+
+    @ApiModelProperty(value = "Version number")
+    private Integer version;
+
+}
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/consume/pulsar/ConsumePulsarDTO.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/consume/pulsar/ConsumePulsarDTO.java
new file mode 100644
index 000000000..5e6388922
--- /dev/null
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/consume/pulsar/ConsumePulsarDTO.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.pojo.consume.pulsar;
+
+import com.fasterxml.jackson.databind.DeserializationFeature;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import io.swagger.annotations.ApiModel;
+import io.swagger.annotations.ApiModelProperty;
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
+import org.apache.inlong.manager.common.exceptions.BusinessException;
+
+import javax.validation.constraints.NotNull;
+
+/**
+ * Inlong group dto of Pulsar
+ */
+@Data
+@Builder
+@NoArgsConstructor
+@AllArgsConstructor
+@ApiModel("Inlong group dto of Pulsar")
+public class ConsumePulsarDTO {
+
+    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper()
+            .configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
+
+    @ApiModelProperty("Whether to configure the dead letter queue, 0: not configure, 1: configure")
+    private Integer isDlq;
+
+    @ApiModelProperty("The name of the dead letter queue Topic")
+    private String deadLetterTopic;
+
+    @ApiModelProperty("Whether to configure the retry letter queue, 0: not configure, 1: configure")
+    private Integer isRlq;
+
+    @ApiModelProperty("The name of the retry letter queue topic")
+    private String retryLetterTopic;
+
+    /**
+     * Get the dto instance from the request
+     */
+    public static ConsumePulsarDTO getFromRequest(ConsumePulsarRequest request) {
+        return ConsumePulsarDTO.builder()
+                .isDlq(request.getIsDlq())
+                .deadLetterTopic(request.getDeadLetterTopic())
+                .isRlq(request.getIsRlq())
+                .retryLetterTopic(request.getRetryLetterTopic())
+                .build();
+    }
+
+    /**
+     * Get the dto instance from the JSON string.
+     */
+    public static ConsumePulsarDTO getFromJson(@NotNull String extParams) {
+        try {
+            return OBJECT_MAPPER.readValue(extParams, ConsumePulsarDTO.class);
+        } catch (Exception e) {
+            throw new BusinessException(ErrorCodeEnum.CONSUMER_INFO_INCORRECT.getMessage() + ": " + e.getMessage());
+        }
+    }
+
+}
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/consume/pulsar/ConsumePulsarInfo.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/consume/pulsar/ConsumePulsarInfo.java
new file mode 100644
index 000000000..9a1f07324
--- /dev/null
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/consume/pulsar/ConsumePulsarInfo.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.pojo.consume.pulsar;
+
+import io.swagger.annotations.ApiModel;
+import io.swagger.annotations.ApiModelProperty;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.ToString;
+import org.apache.inlong.manager.common.consts.MQType;
+import org.apache.inlong.manager.common.util.CommonBeanUtils;
+import org.apache.inlong.manager.common.util.JsonTypeDefine;
+import org.apache.inlong.manager.pojo.consume.InlongConsumeInfo;
+
+/**
+ * Inlong consume info of Pulsar
+ */
+@Data
+@ToString(callSuper = true)
+@EqualsAndHashCode(callSuper = true)
+@JsonTypeDefine(value = MQType.PULSAR)
+@ApiModel("Inlong consume info of Pulsar")
+public class ConsumePulsarInfo extends InlongConsumeInfo {
+
+    @ApiModelProperty("Whether to configure the dead letter queue, 0: not configure, 1: configure")
+    private Integer isDlq;
+
+    @ApiModelProperty("The name of the dead letter queue Topic")
+    private String deadLetterTopic;
+
+    @ApiModelProperty("Whether to configure the retry letter queue, 0: not configure, 1: configure")
+    private Integer isRlq;
+
+    @ApiModelProperty("The name of the retry letter queue topic")
+    private String retryLetterTopic;
+
+    @Override
+    public ConsumePulsarRequest genRequest() {
+        return CommonBeanUtils.copyProperties(this, ConsumePulsarRequest::new);
+    }
+
+}
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/consume/pulsar/ConsumePulsarRequest.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/consume/pulsar/ConsumePulsarRequest.java
new file mode 100644
index 000000000..f125fd461
--- /dev/null
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/consume/pulsar/ConsumePulsarRequest.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.pojo.consume.pulsar;
+
+import io.swagger.annotations.ApiModel;
+import io.swagger.annotations.ApiModelProperty;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.ToString;
+import org.apache.inlong.manager.common.consts.MQType;
+import org.apache.inlong.manager.common.util.JsonTypeDefine;
+import org.apache.inlong.manager.pojo.consume.InlongConsumeRequest;
+
+/**
+ * Inlong consume request of Pulsar
+ */
+@Data
+@ToString(callSuper = true)
+@EqualsAndHashCode(callSuper = true)
+@ApiModel("Inlong consume request of Pulsar")
+@JsonTypeDefine(value = MQType.PULSAR)
+public class ConsumePulsarRequest extends InlongConsumeRequest {
+
+    @ApiModelProperty("Whether to configure the dead letter queue, 0: not configure, 1: configure")
+    private Integer isDlq;
+
+    @ApiModelProperty("The name of the dead letter queue Topic")
+    private String deadLetterTopic;
+
+    @ApiModelProperty("Whether to configure the retry letter queue, 0: not configure, 1: configure")
+    private Integer isRlq;
+
+    @ApiModelProperty("The name of the retry letter queue topic")
+    private String retryLetterTopic;
+
+}
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/consume/tubemq/ConsumeTubeMQDTO.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/consume/tubemq/ConsumeTubeMQDTO.java
new file mode 100644
index 000000000..6b0b313d5
--- /dev/null
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/consume/tubemq/ConsumeTubeMQDTO.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.pojo.consume.tubemq;
+
+import com.fasterxml.jackson.databind.DeserializationFeature;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import io.swagger.annotations.ApiModel;
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
+import org.apache.inlong.manager.common.exceptions.BusinessException;
+
+import javax.validation.constraints.NotNull;
+
+/**
+ * Inlong group info of TubeMQ
+ */
+@Data
+@Builder
+@AllArgsConstructor
+@ApiModel("Inlong group info of TubeMQ")
+public class ConsumeTubeMQDTO {
+
+    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper()
+            .configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
+
+    // no fields
+
+    /**
+     * Get the dto instance from the request
+     */
+    public static ConsumeTubeMQDTO getFromRequest(ConsumeTubeMQRequest request) {
+        return ConsumeTubeMQDTO.builder().build();
+    }
+
+    /**
+     * Get the dto instance from the JSON string.
+     */
+    public static ConsumeTubeMQDTO getFromJson(@NotNull String extParams) {
+        try {
+            return OBJECT_MAPPER.readValue(extParams, ConsumeTubeMQDTO.class);
+        } catch (Exception e) {
+            throw new BusinessException(ErrorCodeEnum.CONSUMER_INFO_INCORRECT.getMessage() + ": " + e.getMessage());
+        }
+    }
+
+}
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/tubemq/InlongTubeMQDTO.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/consume/tubemq/ConsumeTubeMQInfo.java
similarity index 53%
copy from inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/tubemq/InlongTubeMQDTO.java
copy to inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/consume/tubemq/ConsumeTubeMQInfo.java
index 4b7bdff63..3c2d4ec52 100644
--- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/tubemq/InlongTubeMQDTO.java
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/consume/tubemq/ConsumeTubeMQInfo.java
@@ -15,20 +15,32 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.manager.pojo.group.tubemq;
+package org.apache.inlong.manager.pojo.consume.tubemq;
 
 import io.swagger.annotations.ApiModel;
 import lombok.Data;
-import lombok.NoArgsConstructor;
+import lombok.EqualsAndHashCode;
+import lombok.ToString;
+import org.apache.inlong.manager.common.consts.MQType;
+import org.apache.inlong.manager.common.util.CommonBeanUtils;
+import org.apache.inlong.manager.common.util.JsonTypeDefine;
+import org.apache.inlong.manager.pojo.consume.InlongConsumeInfo;
 
 /**
- * Inlong group info for TubeMQ
+ * Inlong consume info of TubeMQ
  */
 @Data
-@NoArgsConstructor
-@ApiModel("Inlong group info for TubeMQ")
-public class InlongTubeMQDTO {
+@ToString(callSuper = true)
+@EqualsAndHashCode(callSuper = true)
+@JsonTypeDefine(value = MQType.TUBEMQ)
+@ApiModel("Inlong consume info of TubeMQ")
+public class ConsumeTubeMQInfo extends InlongConsumeInfo {
 
-    // no field
+    // no fields
+
+    @Override
+    public ConsumeTubeMQRequest genRequest() {
+        return CommonBeanUtils.copyProperties(this, ConsumeTubeMQRequest::new);
+    }
 
 }
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/tubemq/InlongTubeMQDTO.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/consume/tubemq/ConsumeTubeMQRequest.java
similarity index 60%
copy from inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/tubemq/InlongTubeMQDTO.java
copy to inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/consume/tubemq/ConsumeTubeMQRequest.java
index 4b7bdff63..5e051a593 100644
--- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/tubemq/InlongTubeMQDTO.java
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/consume/tubemq/ConsumeTubeMQRequest.java
@@ -15,20 +15,26 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.manager.pojo.group.tubemq;
+package org.apache.inlong.manager.pojo.consume.tubemq;
 
 import io.swagger.annotations.ApiModel;
 import lombok.Data;
-import lombok.NoArgsConstructor;
+import lombok.EqualsAndHashCode;
+import lombok.ToString;
+import org.apache.inlong.manager.common.consts.MQType;
+import org.apache.inlong.manager.common.util.JsonTypeDefine;
+import org.apache.inlong.manager.pojo.consume.InlongConsumeRequest;
 
 /**
- * Inlong group info for TubeMQ
+ * Inlong consume request of TubeMQ
  */
 @Data
-@NoArgsConstructor
-@ApiModel("Inlong group info for TubeMQ")
-public class InlongTubeMQDTO {
+@ToString(callSuper = true)
+@EqualsAndHashCode(callSuper = true)
+@JsonTypeDefine(value = MQType.TUBEMQ)
+@ApiModel("Inlong consume request of TubeMQ")
+public class ConsumeTubeMQRequest extends InlongConsumeRequest {
 
-    // no field
+    // no fields
 
 }
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/InlongGroupRequest.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/InlongGroupRequest.java
index 3c07a9c79..b01595500 100644
--- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/InlongGroupRequest.java
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/InlongGroupRequest.java
@@ -98,9 +98,6 @@ public abstract class InlongGroupRequest {
     @ApiModelProperty(value = "Name of followers, separated by commas")
     private String followers;
 
-    @ApiModelProperty(value = "Name of creator")
-    private String creator;
-
     @ApiModelProperty(value = "Inlong group Extension properties")
     private List<InlongGroupExtInfo> extList;
 
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/tubemq/InlongTubeMQDTO.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/tubemq/InlongTubeMQDTO.java
index 4b7bdff63..01f9ad348 100644
--- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/tubemq/InlongTubeMQDTO.java
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/tubemq/InlongTubeMQDTO.java
@@ -29,6 +29,6 @@ import lombok.NoArgsConstructor;
 @ApiModel("Inlong group info for TubeMQ")
 public class InlongTubeMQDTO {
 
-    // no field
+    // no fields
 
 }
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/consume/AbstractConsumeOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/consume/AbstractConsumeOperator.java
new file mode 100644
index 000000000..e9f886909
--- /dev/null
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/consume/AbstractConsumeOperator.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.service.consume;
+
+import org.apache.inlong.manager.common.consts.InlongConstants;
+import org.apache.inlong.manager.common.enums.ConsumeStatus;
+import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
+import org.apache.inlong.manager.common.exceptions.BusinessException;
+import org.apache.inlong.manager.common.util.CommonBeanUtils;
+import org.apache.inlong.manager.dao.entity.InlongConsumeEntity;
+import org.apache.inlong.manager.dao.mapper.InlongConsumeEntityMapper;
+import org.apache.inlong.manager.pojo.consume.InlongConsumeRequest;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.transaction.annotation.Isolation;
+import org.springframework.transaction.annotation.Transactional;
+
+/**
+ * Default operator of inlong consume.
+ */
+public abstract class AbstractConsumeOperator implements InlongConsumeOperator {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(AbstractConsumeOperator.class);
+
+    @Autowired
+    private InlongConsumeEntityMapper consumeMapper;
+
+    @Override
+    @Transactional(rollbackFor = Throwable.class)
+    public Integer saveOpt(InlongConsumeRequest request, String operator) {
+        // firstly check the topic info
+        this.checkTopicInfo(request);
+
+        // set the ext params, init status, and other info
+        InlongConsumeEntity entity = CommonBeanUtils.copyProperties(request, InlongConsumeEntity::new);
+        this.setTargetEntity(request, entity);
+        entity.setStatus(ConsumeStatus.WAIT_ASSIGN.getCode());
+        entity.setCreator(operator);
+        entity.setModifier(operator);
+
+        consumeMapper.insert(entity);
+        return entity.getId();
+    }
+
+    /**
+     * Set the parameters of the target entity.
+     *
+     * @param request inlong consume request
+     * @param targetEntity targetEntity which will set the new parameters
+     */
+    protected abstract void setTargetEntity(InlongConsumeRequest request, InlongConsumeEntity targetEntity);
+
+    @Override
+    @Transactional(rollbackFor = Throwable.class, isolation = Isolation.REPEATABLE_READ)
+    public void updateOpt(InlongConsumeRequest request, String operator) {
+        // get the entity from request
+        InlongConsumeEntity entity = CommonBeanUtils.copyProperties(request, InlongConsumeEntity::new);
+        // set the ext params
+        this.setTargetEntity(request, entity);
+        entity.setModifier(operator);
+
+        int rowCount = consumeMapper.updateByIdSelective(entity);
+        if (rowCount != InlongConstants.AFFECTED_ONE_ROW) {
+            LOGGER.error("inlong consume has already updated with id={}, expire version={}",
+                    request.getId(), request.getVersion());
+            throw new BusinessException(ErrorCodeEnum.CONFIG_EXPIRED);
+        }
+    }
+
+}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/consume/ConsumePulsarOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/consume/ConsumePulsarOperator.java
new file mode 100644
index 000000000..20d697fe0
--- /dev/null
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/consume/ConsumePulsarOperator.java
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.service.consume;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.inlong.manager.common.consts.InlongConstants;
+import org.apache.inlong.manager.common.consts.MQType;
+import org.apache.inlong.manager.common.enums.ClusterType;
+import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
+import org.apache.inlong.manager.common.exceptions.BusinessException;
+import org.apache.inlong.manager.common.util.CommonBeanUtils;
+import org.apache.inlong.manager.common.util.Preconditions;
+import org.apache.inlong.manager.dao.entity.InlongConsumeEntity;
+import org.apache.inlong.manager.dao.entity.InlongGroupEntity;
+import org.apache.inlong.manager.dao.entity.InlongStreamEntity;
+import org.apache.inlong.manager.dao.mapper.InlongGroupEntityMapper;
+import org.apache.inlong.manager.dao.mapper.InlongStreamEntityMapper;
+import org.apache.inlong.manager.pojo.cluster.pulsar.PulsarClusterInfo;
+import org.apache.inlong.manager.pojo.consume.InlongConsumeInfo;
+import org.apache.inlong.manager.pojo.consume.InlongConsumeRequest;
+import org.apache.inlong.manager.pojo.consume.pulsar.ConsumePulsarDTO;
+import org.apache.inlong.manager.pojo.consume.pulsar.ConsumePulsarInfo;
+import org.apache.inlong.manager.pojo.consume.pulsar.ConsumePulsarRequest;
+import org.apache.inlong.manager.service.cluster.InlongClusterService;
+import org.apache.inlong.manager.service.stream.InlongStreamService;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+
+/**
+ * Inlong consume operator for Pulsar.
+ */
+@Service
+public class ConsumePulsarOperator extends AbstractConsumeOperator {
+
+    private static final int DLQ_RLQ_ENABLE = 1;
+    private static final int DLQ__RLQ_DISABLE = 0;
+    // Topic prefix for the dead letter queue
+    private static final String PREFIX_DLQ = "dlq";
+    // Topic prefix for the retry letter queue
+    private static final String PREFIX_RLQ = "rlq";
+
+    @Autowired
+    private InlongGroupEntityMapper groupMapper;
+    @Autowired
+    private InlongStreamEntityMapper streamMapper;
+    @Autowired
+    private InlongClusterService clusterService;
+    @Autowired
+    private InlongStreamService streamService;
+    @Autowired
+    private ObjectMapper objectMapper;
+
+    @Override
+    public Boolean accept(String mqType) {
+        return getMQType().equals(mqType) || MQType.TDMQ_PULSAR.equals(mqType);
+    }
+
+    @Override
+    public String getMQType() {
+        return MQType.PULSAR;
+    }
+
+    @Override
+    public void checkTopicInfo(InlongConsumeRequest request) {
+        // one inlong stream only has one Pulsar topic,
+        // one inlong group may have multiple Pulsar topics.
+        String groupId = request.getInlongGroupId();
+        String originTopic = request.getTopic();
+        InlongStreamEntity streamEntity = streamMapper.selectByIdentifier(groupId, originTopic);
+        Preconditions.checkNotNull(streamEntity, "not found any Pulsar topic for inlong group " + groupId);
+
+        // format the topic to 'tenant/namespace/topic'
+        InlongGroupEntity groupEntity = groupMapper.selectByGroupId(groupId);
+        PulsarClusterInfo pulsarCluster = (PulsarClusterInfo) clusterService.getOne(
+                groupEntity.getInlongClusterTag(), null, ClusterType.PULSAR);
+        String tenant = StringUtils.isEmpty(pulsarCluster.getTenant())
+                ? InlongConstants.DEFAULT_PULSAR_TENANT
+                : pulsarCluster.getTenant();
+
+        request.setTopic(String.format(InlongConstants.PULSAR_TOPIC_FORMAT, tenant,
+                groupEntity.getMqResource(), originTopic));
+    }
+
+    @Override
+    public InlongConsumeInfo getFromEntity(InlongConsumeEntity entity) {
+        Preconditions.checkNotNull(entity, ErrorCodeEnum.CONSUME_NOT_FOUND.getMessage());
+
+        ConsumePulsarInfo consumeInfo = new ConsumePulsarInfo();
+        CommonBeanUtils.copyProperties(entity, consumeInfo);
+        if (StringUtils.isNotBlank(entity.getExtParams())) {
+            ConsumePulsarDTO dto = ConsumePulsarDTO.getFromJson(entity.getExtParams());
+            CommonBeanUtils.copyProperties(dto, consumeInfo);
+        }
+
+        return consumeInfo;
+    }
+
+    @Override
+    protected void setTargetEntity(InlongConsumeRequest request, InlongConsumeEntity targetEntity) {
+        // prerequisite for RLQ to be turned on: DLQ must be turned on.
+        // it means, if DLQ is closed, RLQ cannot exist alone and must be closed.
+        ConsumePulsarRequest pulsarRequest = (ConsumePulsarRequest) request;
+        boolean dlqEnable = (DLQ_RLQ_ENABLE == pulsarRequest.getIsDlq());
+        boolean rlqEnable = (DLQ_RLQ_ENABLE == pulsarRequest.getIsRlq());
+        if (rlqEnable && !dlqEnable) {
+            throw new BusinessException(ErrorCodeEnum.PULSAR_DLQ_RLQ_ERROR);
+        }
+
+        // when saving, the DLQ / RLQ under the same groupId cannot be repeated.
+        // when updating, delete the related DLQ / RLQ info.
+        String groupId = targetEntity.getInlongGroupId();
+        if (dlqEnable) {
+            String dlqTopic = PREFIX_DLQ + "_" + pulsarRequest.getDeadLetterTopic();
+            Preconditions.checkTrue(!streamService.exist(groupId, dlqTopic),
+                    ErrorCodeEnum.PULSAR_DLQ_DUPLICATED.getMessage());
+        } else {
+            pulsarRequest.setIsDlq(DLQ__RLQ_DISABLE);
+            pulsarRequest.setDeadLetterTopic(null);
+        }
+        if (rlqEnable) {
+            String rlqTopic = PREFIX_RLQ + "_" + pulsarRequest.getRetryLetterTopic();
+            Preconditions.checkTrue(!streamService.exist(groupId, rlqTopic),
+                    ErrorCodeEnum.PULSAR_RLQ_DUPLICATED.getMessage());
+        } else {
+            pulsarRequest.setIsRlq(DLQ__RLQ_DISABLE);
+            pulsarRequest.setRetryLetterTopic(null);
+        }
+
+        try {
+            targetEntity.setExtParams(objectMapper.writeValueAsString(ConsumePulsarDTO.getFromRequest(pulsarRequest)));
+        } catch (Exception e) {
+            throw new BusinessException(ErrorCodeEnum.CONSUME_INFO_INCORRECT.getMessage() + ": " + e.getMessage());
+        }
+    }
+
+}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/consume/ConsumeTubeMQOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/consume/ConsumeTubeMQOperator.java
new file mode 100644
index 000000000..d507c2d2c
--- /dev/null
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/consume/ConsumeTubeMQOperator.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.service.consume;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.inlong.manager.common.consts.MQType;
+import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
+import org.apache.inlong.manager.common.util.CommonBeanUtils;
+import org.apache.inlong.manager.common.util.Preconditions;
+import org.apache.inlong.manager.dao.entity.InlongConsumeEntity;
+import org.apache.inlong.manager.pojo.consume.InlongConsumeInfo;
+import org.apache.inlong.manager.pojo.consume.InlongConsumeRequest;
+import org.apache.inlong.manager.pojo.consume.pulsar.ConsumePulsarInfo;
+import org.apache.inlong.manager.pojo.consume.tubemq.ConsumeTubeMQDTO;
+import org.apache.inlong.manager.pojo.group.InlongGroupTopicInfo;
+import org.apache.inlong.manager.service.group.InlongGroupService;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+
+import java.util.Objects;
+
+/**
+ * Inlong consume operator for TubeMQ.
+ */
+@Service
+public class ConsumeTubeMQOperator extends AbstractConsumeOperator {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(ConsumeTubeMQOperator.class);
+
+    @Autowired
+    private InlongGroupService groupService;
+
+    @Override
+    public Boolean accept(String mqType) {
+        return getMQType().equals(mqType);
+    }
+
+    @Override
+    public String getMQType() {
+        return MQType.TUBEMQ;
+    }
+
+    @Override
+    public void checkTopicInfo(InlongConsumeRequest request) {
+        String groupId = request.getInlongGroupId();
+        InlongGroupTopicInfo topicInfo = groupService.getTopic(groupId);
+        Preconditions.checkNotNull(topicInfo, "inlong group not exist: " + groupId);
+
+        // one inlong group only has one TubeMQ topic
+        String mqResource = topicInfo.getMqResource();
+        Preconditions.checkTrue(Objects.equals(mqResource, request.getTopic()),
+                String.format("inlong consume topic %s not belongs to inlong group %s", request.getTopic(), groupId));
+    }
+
+    @Override
+    public InlongConsumeInfo getFromEntity(InlongConsumeEntity entity) {
+        Preconditions.checkNotNull(entity, ErrorCodeEnum.CONSUME_NOT_FOUND.getMessage());
+
+        ConsumePulsarInfo consumeInfo = new ConsumePulsarInfo();
+        CommonBeanUtils.copyProperties(entity, consumeInfo);
+        if (StringUtils.isNotBlank(entity.getExtParams())) {
+            ConsumeTubeMQDTO dto = ConsumeTubeMQDTO.getFromJson(entity.getExtParams());
+            CommonBeanUtils.copyProperties(dto, consumeInfo);
+        }
+
+        return consumeInfo;
+    }
+
+    @Override
+    protected void setTargetEntity(InlongConsumeRequest request, InlongConsumeEntity targetEntity) {
+        LOGGER.info("do nothing for inlong consume with TubeMQ");
+    }
+
+}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/consume/InlongConsumeOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/consume/InlongConsumeOperator.java
new file mode 100644
index 000000000..52b51c892
--- /dev/null
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/consume/InlongConsumeOperator.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.service.consume;
+
+import org.apache.inlong.manager.dao.entity.InlongConsumeEntity;
+import org.apache.inlong.manager.pojo.consume.InlongConsumeInfo;
+import org.apache.inlong.manager.pojo.consume.InlongConsumeRequest;
+
+/**
+ * Interface of the inlong consume operator.
+ */
+public interface InlongConsumeOperator {
+
+    /**
+     * Determines whether the current instance matches the specified type.
+     */
+    Boolean accept(String mqType);
+
+    /**
+     * Get the MQ type.
+     *
+     * @return MQ type string
+     */
+    String getMQType();
+
+    /**
+     * Check whether the topic in inlong consume belongs to its inlong group id.
+     *
+     * @param request inlong consume request
+     */
+    void checkTopicInfo(InlongConsumeRequest request);
+
+    /**
+     * Save the inlong consume info.
+     *
+     * @param request request of the group
+     * @param operator name of the operator
+     * @return inlong consume id after saving
+     */
+    Integer saveOpt(InlongConsumeRequest request, String operator);
+
+    /**
+     * Get the inlong consume info from the given entity.
+     *
+     * @param entity get field value from the entity
+     * @return inlong consume info after encapsulating
+     */
+    InlongConsumeInfo getFromEntity(InlongConsumeEntity entity);
+
+    /**
+     * Update the inlong consume info.
+     *
+     * @param request request of update
+     * @param operator name of operator
+     */
+    void updateOpt(InlongConsumeRequest request, String operator);
+
+}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/consume/InlongConsumeOperatorFactory.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/consume/InlongConsumeOperatorFactory.java
new file mode 100644
index 000000000..1a4a57482
--- /dev/null
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/consume/InlongConsumeOperatorFactory.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.service.consume;
+
+import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
+import org.apache.inlong.manager.common.exceptions.BusinessException;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+
+import java.util.List;
+
+/**
+ * Factory for {@link InlongConsumeOperator}.
+ */
+@Service
+public class InlongConsumeOperatorFactory {
+
+    @Autowired
+    private List<InlongConsumeOperator> consumeOperatorList;
+
+    /**
+     * Get a consumption operator instance via the given mqType
+     */
+    public InlongConsumeOperator getInstance(String mqType) {
+        return consumeOperatorList.stream()
+                .filter(inst -> inst.accept(mqType))
+                .findFirst()
+                .orElseThrow(() -> new
+                        BusinessException(String.format(ErrorCodeEnum.MQ_TYPE_NOT_SUPPORTED.getMessage(), mqType)));
+    }
+}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/ConsumptionProcessService.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/consume/InlongConsumeProcessService.java
similarity index 82%
rename from inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/ConsumptionProcessService.java
rename to inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/consume/InlongConsumeProcessService.java
index 0e413701f..d4be887e4 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/ConsumptionProcessService.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/consume/InlongConsumeProcessService.java
@@ -15,11 +15,11 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.manager.service.core.impl;
+package org.apache.inlong.manager.service.consume;
 
 import lombok.extern.slf4j.Slf4j;
-import org.apache.inlong.manager.common.enums.ConsumptionStatus;
 import org.apache.inlong.manager.common.consts.MQType;
+import org.apache.inlong.manager.common.enums.ConsumeStatus;
 import org.apache.inlong.manager.common.enums.ProcessName;
 import org.apache.inlong.manager.common.util.CommonBeanUtils;
 import org.apache.inlong.manager.common.util.Preconditions;
@@ -34,9 +34,12 @@ import org.apache.inlong.manager.service.workflow.WorkflowService;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Service;
 
+/**
+ * Operation to the inlong consume process.
+ */
 @Slf4j
 @Service
-public class ConsumptionProcessService {
+public class InlongConsumeProcessService {
 
     @Autowired
     private ConsumptionService consumptionService;
@@ -45,13 +48,20 @@ public class ConsumptionProcessService {
     @Autowired
     private ConsumptionPulsarEntityMapper consumptionPulsarMapper;
 
+    /**
+     * Start the process for the specified ID.
+     *
+     * @param id inlong consume id
+     * @param operator name of operator
+     * @return workflow result
+     */
     public WorkflowResult startProcess(Integer id, String operator) {
         ConsumptionInfo consumptionInfo = consumptionService.get(id);
-        Preconditions.checkTrue(ConsumptionStatus.ALLOW_START_WORKFLOW_STATUS.contains(
-                        ConsumptionStatus.fromStatus(consumptionInfo.getStatus())),
-                "current status not allow start workflow");
+        Preconditions.checkTrue(ConsumeStatus.ALLOW_START_WORKFLOW_STATUS.contains(
+                        ConsumeStatus.fromStatus(consumptionInfo.getStatus())),
+                "current status not allowed to start workflow");
 
-        consumptionInfo.setStatus(ConsumptionStatus.WAIT_APPROVE.getStatus());
+        consumptionInfo.setStatus(ConsumeStatus.WAIT_APPROVE.getCode());
         boolean rowCount = consumptionService.update(consumptionInfo, operator);
         Preconditions.checkTrue(rowCount, "update consumption failed");
 
@@ -72,4 +82,5 @@ public class ConsumptionProcessService {
         form.setConsumptionInfo(consumptionInfo);
         return form;
     }
+
 }
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/consume/InlongConsumeService.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/consume/InlongConsumeService.java
new file mode 100644
index 000000000..ce421e337
--- /dev/null
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/consume/InlongConsumeService.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.service.consume;
+
+import org.apache.inlong.manager.pojo.common.PageResult;
+import org.apache.inlong.manager.pojo.consume.InlongConsumeBriefInfo;
+import org.apache.inlong.manager.pojo.consume.InlongConsumeCountInfo;
+import org.apache.inlong.manager.pojo.consume.InlongConsumeInfo;
+import org.apache.inlong.manager.pojo.consume.InlongConsumePageRequest;
+import org.apache.inlong.manager.pojo.consume.InlongConsumeRequest;
+
+import javax.validation.Valid;
+import javax.validation.constraints.NotNull;
+
+/**
+ * Inlong consume service layer interface
+ */
+public interface InlongConsumeService {
+
+    /**
+     * Save inlong consume info.
+     *
+     * @param request consume request need to save
+     * @param operator name of operator
+     * @return inlong consume id after saving
+     */
+    Integer save(InlongConsumeRequest request, String operator);
+
+    /**
+     * Get inlong consume info based on ID
+     *
+     * @param id inlong consume id
+     * @return detail of inlong group
+     */
+    InlongConsumeInfo get(Integer id);
+
+    /**
+     * Check whether the consumer group exists or not
+     *
+     * @param consumerGroup consumer group
+     * @param excludeSelfId exclude the ID of this record
+     * @return true if exists, false if not exists
+     */
+    boolean consumerGroupExists(String consumerGroup, Integer excludeSelfId);
+
+    /**
+     * Paging query inlong consume info list
+     *
+     * @param request pagination query request
+     * @return inlong consume list
+     */
+    PageResult<InlongConsumeBriefInfo> list(InlongConsumePageRequest request);
+
+    /**
+     * Query the inlong consume statistics info via the username
+     *
+     * @param username username
+     * @return inlong consume status statistics
+     */
+    InlongConsumeCountInfo countStatus(String username);
+
+    /**
+     * Update the inlong consume
+     *
+     * @param request inlong consume request that needs to be updated
+     * @param operator name of operator
+     * @return whether succeed
+     */
+    Boolean update(@Valid @NotNull(message = "inlong consume request cannot be null") InlongConsumeRequest request,
+            String operator);
+
+    /**
+     * Delete the inlong consume by the id
+     *
+     * @param id inlong consume id that needs to be deleted
+     * @param operator name of operator
+     * @return whether succeed
+     */
+    Boolean delete(Integer id, String operator);
+
+}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/consume/InlongConsumeServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/consume/InlongConsumeServiceImpl.java
new file mode 100644
index 000000000..c7977b500
--- /dev/null
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/consume/InlongConsumeServiceImpl.java
@@ -0,0 +1,208 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.service.consume;
+
+import com.github.pagehelper.Page;
+import com.github.pagehelper.PageHelper;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.inlong.manager.common.consts.InlongConstants;
+import org.apache.inlong.manager.common.enums.ConsumeStatus;
+import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
+import org.apache.inlong.manager.common.exceptions.BusinessException;
+import org.apache.inlong.manager.common.util.CommonBeanUtils;
+import org.apache.inlong.manager.common.util.Preconditions;
+import org.apache.inlong.manager.dao.entity.InlongConsumeEntity;
+import org.apache.inlong.manager.dao.mapper.InlongConsumeEntityMapper;
+import org.apache.inlong.manager.pojo.common.OrderFieldEnum;
+import org.apache.inlong.manager.pojo.common.OrderTypeEnum;
+import org.apache.inlong.manager.pojo.common.PageResult;
+import org.apache.inlong.manager.pojo.consume.InlongConsumeBriefInfo;
+import org.apache.inlong.manager.pojo.consume.InlongConsumeCountInfo;
+import org.apache.inlong.manager.pojo.consume.InlongConsumeInfo;
+import org.apache.inlong.manager.pojo.consume.InlongConsumePageRequest;
+import org.apache.inlong.manager.pojo.consume.InlongConsumeRequest;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+import org.springframework.transaction.annotation.Isolation;
+import org.springframework.transaction.annotation.Propagation;
+import org.springframework.transaction.annotation.Transactional;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.stream.Collectors;
+
+import static org.apache.inlong.manager.pojo.common.PageRequest.MAX_PAGE_SIZE;
+
+/**
+ * Inlong consume service layer implementation
+ */
+@Service
+public class InlongConsumeServiceImpl implements InlongConsumeService {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(InlongConsumeServiceImpl.class);
+
+    @Autowired
+    private InlongConsumeEntityMapper consumeMapper;
+    @Autowired
+    private InlongConsumeOperatorFactory consumeOperatorFactory;
+
+    @Override
+    public Integer save(InlongConsumeRequest request, String operator) {
+        LOGGER.debug("begin to save inlong consume={} by user={}", request, operator);
+        Preconditions.checkNotNull(request, "inlong consume request cannot be null");
+        Preconditions.checkNotNull(request.getTopic(), "inlong consume topic cannot be null");
+        String consumerGroup = request.getConsumerGroup();
+        Preconditions.checkNotNull(consumerGroup, "inlong consume topic cannot be null");
+        if (consumerGroupExists(consumerGroup, request.getId())) {
+            throw new BusinessException(String.format("consumer group %s already exist", consumerGroup));
+        }
+
+        InlongConsumeOperator consumeOperator = consumeOperatorFactory.getInstance(request.getMqType());
+        consumeOperator.saveOpt(request, operator);
+
+        LOGGER.info("success to save inlong consume for consumer group={} by user={}", consumerGroup, operator);
+        return request.getId();
+    }
+
+    @Override
+    public boolean consumerGroupExists(String consumerGroup, Integer excludeSelfId) {
+        InlongConsumePageRequest request = InlongConsumePageRequest.builder()
+                .consumerGroup(consumerGroup)
+                .isAdminRole(true)
+                .build();
+        List<InlongConsumeEntity> result = consumeMapper.selectByCondition(request);
+        if (excludeSelfId != null) {
+            result = result.stream()
+                    .filter(consumer -> !excludeSelfId.equals(consumer.getId()))
+                    .collect(Collectors.toList());
+        }
+        return CollectionUtils.isNotEmpty(result);
+    }
+
+    @Override
+    public InlongConsumeInfo get(Integer id) {
+        Preconditions.checkNotNull(id, "inlong consume id cannot be null");
+        InlongConsumeEntity entity = consumeMapper.selectById(id);
+        if (entity == null) {
+            LOGGER.error("inlong consume not found with id={}", id);
+            throw new BusinessException(ErrorCodeEnum.CONSUME_NOT_FOUND);
+        }
+
+        InlongConsumeOperator consumeOperator = consumeOperatorFactory.getInstance(entity.getMqType());
+        InlongConsumeInfo consumeInfo = consumeOperator.getFromEntity(entity);
+
+        LOGGER.debug("success to get inlong consume for id={}", id);
+        return consumeInfo;
+    }
+
+    @Override
+    public InlongConsumeCountInfo countStatus(String username) {
+        List<Map<String, Object>> statusCount = consumeMapper.countByUser(username);
+        InlongConsumeCountInfo countInfo = new InlongConsumeCountInfo();
+        for (Map<String, Object> map : statusCount) {
+            int status = (Integer) map.get("status");
+            long count = (Long) map.get("count");
+            countInfo.setTotalCount(countInfo.getTotalCount() + count);
+            if (status == ConsumeStatus.WAIT_ASSIGN.getCode()) {
+                countInfo.setWaitAssignCount(countInfo.getWaitAssignCount() + count);
+            } else if (status == ConsumeStatus.WAIT_APPROVE.getCode()) {
+                countInfo.setWaitApproveCount(countInfo.getWaitApproveCount() + count);
+            } else if (status == ConsumeStatus.REJECTED.getCode()) {
+                countInfo.setRejectCount(countInfo.getRejectCount() + count);
+            }
+        }
+
+        LOGGER.debug("success to count inlong consume for user={}", username);
+        return countInfo;
+    }
+
+    @Override
+    public PageResult<InlongConsumeBriefInfo> list(InlongConsumePageRequest request) {
+        if (request.getPageSize() > MAX_PAGE_SIZE) {
+            LOGGER.warn("list inlong consumes, change page size from {} to {}", request.getPageSize(), MAX_PAGE_SIZE);
+            request.setPageSize(MAX_PAGE_SIZE);
+        }
+        PageHelper.startPage(request.getPageNum(), request.getPageSize());
+        OrderFieldEnum.checkOrderField(request);
+        OrderTypeEnum.checkOrderType(request);
+        Page<InlongConsumeEntity> entityPage = (Page<InlongConsumeEntity>) consumeMapper.selectByCondition(request);
+        List<InlongConsumeBriefInfo> briefInfos = CommonBeanUtils.copyListProperties(entityPage,
+                InlongConsumeBriefInfo::new);
+        PageResult<InlongConsumeBriefInfo> pageResult = new PageResult<>(briefInfos,
+                entityPage.getTotal(), entityPage.getPageNum(), entityPage.getPageSize());
+
+        LOGGER.debug("success to list inlong consume for {}", request);
+        return pageResult;
+    }
+
+    @Override
+    @Transactional(rollbackFor = Throwable.class, isolation = Isolation.REPEATABLE_READ,
+            propagation = Propagation.REQUIRES_NEW)
+    public Boolean update(InlongConsumeRequest request, String operator) {
+        LOGGER.debug("begin to update inlong consume={} by user={}", request, operator);
+        Preconditions.checkNotNull(request, "inlong consume request cannot be null");
+
+        // check if it can be modified
+        Integer consumeId = request.getId();
+        InlongConsumeEntity existEntity = consumeMapper.selectById(consumeId);
+        Preconditions.checkNotNull(existEntity, "inlong consume not exist with id " + consumeId);
+        Preconditions.checkTrue(existEntity.getInCharges().contains(operator),
+                "operator" + operator + " has no privilege for the inlong consume");
+
+        if (!Objects.equals(existEntity.getVersion(), request.getVersion())) {
+            LOGGER.error(String.format("inlong consume has already updated, id=%s, curVersion=%s",
+                    existEntity.getId(), request.getVersion()));
+            throw new BusinessException(ErrorCodeEnum.CONFIG_EXPIRED);
+        }
+
+        ConsumeStatus consumeStatus = ConsumeStatus.fromStatus(existEntity.getStatus());
+        Preconditions.checkTrue(ConsumeStatus.ALLOW_SAVE_UPDATE_STATUS.contains(consumeStatus),
+                "inlong consume not allowed update when status is " + consumeStatus.name());
+
+        InlongConsumeOperator consumeOperator = consumeOperatorFactory.getInstance(request.getMqType());
+        consumeOperator.updateOpt(request, operator);
+
+        LOGGER.info("success to update inlong consume={} by user={}", request, operator);
+        return true;
+    }
+
+    @Override
+    public Boolean delete(Integer id, String operator) {
+        LOGGER.info("begin to delete inlong consume for id={} by user={}", id, operator);
+        Preconditions.checkNotNull(id, "inlong consume id cannot be null");
+        InlongConsumeEntity entity = consumeMapper.selectById(id);
+        Preconditions.checkNotNull(entity, "inlong consume not exist with id " + id);
+
+        entity.setIsDeleted(id);
+        entity.setStatus(ConsumeStatus.DELETED.getCode());
+        entity.setModifier(operator);
+
+        int rowCount = consumeMapper.updateByIdSelective(entity);
+        if (rowCount != InlongConstants.AFFECTED_ONE_ROW) {
+            LOGGER.error("inlong consume has already updated with id={}, curVersion={}", id, entity.getVersion());
+            throw new BusinessException(ErrorCodeEnum.CONFIG_EXPIRED);
+        }
+
+        LOGGER.info("success to delete inlong consume for id={} by user={}", id, operator);
+        return true;
+    }
+
+}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/ConsumptionServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/ConsumptionServiceImpl.java
index 8a899a390..5641a293c 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/ConsumptionServiceImpl.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/ConsumptionServiceImpl.java
@@ -24,7 +24,7 @@ import org.apache.commons.lang3.StringUtils;
 import org.apache.inlong.manager.common.consts.InlongConstants;
 import org.apache.inlong.manager.common.consts.MQType;
 import org.apache.inlong.manager.common.enums.ClusterType;
-import org.apache.inlong.manager.common.enums.ConsumptionStatus;
+import org.apache.inlong.manager.common.enums.ConsumeStatus;
 import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
 import org.apache.inlong.manager.common.exceptions.BusinessException;
 import org.apache.inlong.manager.common.util.CommonBeanUtils;
@@ -104,9 +104,9 @@ public class ConsumptionServiceImpl implements ConsumptionService {
 
         return ConsumptionSummary.builder()
                 .totalCount(countMap.values().stream().mapToInt(c -> c).sum())
-                .waitingAssignCount(countMap.getOrDefault(ConsumptionStatus.WAIT_ASSIGN.getStatus() + "", 0))
-                .waitingApproveCount(countMap.getOrDefault(ConsumptionStatus.WAIT_APPROVE.getStatus() + "", 0))
-                .rejectedCount(countMap.getOrDefault(ConsumptionStatus.REJECTED.getStatus() + "", 0)).build();
+                .waitingAssignCount(countMap.getOrDefault(ConsumeStatus.WAIT_ASSIGN.getCode() + "", 0))
+                .waitingApproveCount(countMap.getOrDefault(ConsumeStatus.WAIT_APPROVE.getCode() + "", 0))
+                .rejectedCount(countMap.getOrDefault(ConsumeStatus.REJECTED.getCode() + "", 0)).build();
     }
 
     @Override
@@ -166,9 +166,9 @@ public class ConsumptionServiceImpl implements ConsumptionService {
         if (info.getId() != null) {
             ConsumptionEntity consumptionEntity = consumptionMapper.selectByPrimaryKey(info.getId());
             Preconditions.checkNotNull(consumptionEntity, "consumption not exist with id: " + info.getId());
-            ConsumptionStatus consumptionStatus = ConsumptionStatus.fromStatus(consumptionEntity.getStatus());
-            Preconditions.checkTrue(ConsumptionStatus.ALLOW_SAVE_UPDATE_STATUS.contains(consumptionStatus),
-                    "consumption not allow update when status is " + consumptionStatus.name());
+            ConsumeStatus consumeStatus = ConsumeStatus.fromStatus(consumptionEntity.getStatus());
+            Preconditions.checkTrue(ConsumeStatus.ALLOW_SAVE_UPDATE_STATUS.contains(consumeStatus),
+                    "consumption not allow update when status is " + consumeStatus.name());
         }
 
         setTopicInfo(info);
@@ -284,7 +284,7 @@ public class ConsumptionServiceImpl implements ConsumptionService {
 
             // If the consumption has been approved, then close/open DLQ or RLQ, it is necessary to
             // add/remove inlong streams in the inlong group
-            if (ConsumptionStatus.APPROVED.getStatus() == exists.getStatus()) {
+            if (ConsumeStatus.APPROVED.getCode() == exists.getStatus()) {
                 String groupId = info.getInlongGroupId();
                 String dlqNameOld = pulsarEntity.getDeadLetterTopic();
                 String dlqNameNew = update.getDeadLetterTopic();
@@ -356,7 +356,7 @@ public class ConsumptionServiceImpl implements ConsumptionService {
         entity.setInCharges(groupInfo.getInCharges());
         entity.setFilterEnabled(0);
 
-        entity.setStatus(ConsumptionStatus.APPROVED.getStatus());
+        entity.setStatus(ConsumeStatus.APPROVED.getCode());
         String operator = groupInfo.getCreator();
         entity.setCreator(operator);
         entity.setModifier(operator);
@@ -377,7 +377,7 @@ public class ConsumptionServiceImpl implements ConsumptionService {
 
     private ConsumptionEntity saveConsumption(ConsumptionInfo info, String operator) {
         ConsumptionEntity entity = CommonBeanUtils.copyProperties(info, ConsumptionEntity::new);
-        entity.setStatus(ConsumptionStatus.WAIT_ASSIGN.getStatus());
+        entity.setStatus(ConsumeStatus.WAIT_ASSIGN.getCode());
         entity.setCreator(operator);
         entity.setModifier(operator);
 
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupProcessService.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupProcessService.java
index 0132b6259..8d950b706 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupProcessService.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupProcessService.java
@@ -49,7 +49,7 @@ import java.util.concurrent.ThreadPoolExecutor.CallerRunsPolicy;
 import java.util.concurrent.TimeUnit;
 
 /**
- * Operation related to inlong group process
+ * Operation to the inlong group process
  */
 @Service
 public class InlongGroupProcessService {
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupServiceImpl.java
index d9c5f1c63..150c8e317 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupServiceImpl.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupServiceImpl.java
@@ -77,6 +77,8 @@ import java.util.Objects;
 import java.util.Set;
 import java.util.stream.Collectors;
 
+import static org.apache.inlong.manager.pojo.common.PageRequest.MAX_PAGE_SIZE;
+
 /**
  * Inlong group service layer implementation
  */
@@ -85,7 +87,6 @@ import java.util.stream.Collectors;
 public class InlongGroupServiceImpl implements InlongGroupService {
 
     private static final Logger LOGGER = LoggerFactory.getLogger(InlongGroupServiceImpl.class);
-    private static final Integer MAX_PAGE_SIZE = 100;
 
     @Autowired
     private InlongGroupOperatorFactory groupOperatorFactory;
@@ -184,7 +185,7 @@ public class InlongGroupServiceImpl implements InlongGroupService {
     @Override
     public PageResult<InlongGroupBriefInfo> listBrief(InlongGroupPageRequest request) {
         if (request.getPageSize() > MAX_PAGE_SIZE) {
-            LOGGER.warn("list group info, but page size is {}, change to {}", request.getPageSize(), MAX_PAGE_SIZE);
+            LOGGER.warn("list inlong groups, change page size from {} to {}", request.getPageSize(), MAX_PAGE_SIZE);
             request.setPageSize(MAX_PAGE_SIZE);
         }
         PageHelper.startPage(request.getPageNum(), request.getPageSize());
@@ -275,8 +276,8 @@ public class InlongGroupServiceImpl implements InlongGroupService {
         return true;
     }
 
-    @Transactional(rollbackFor = Throwable.class)
     @Override
+    @Transactional(rollbackFor = Throwable.class)
     public boolean delete(String groupId, String operator) {
         LOGGER.info("begin to delete inlong group for groupId={} by user={}", groupId, operator);
         Preconditions.checkNotNull(groupId, ErrorCodeEnum.GROUP_ID_IS_EMPTY.getMessage());
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/consumption/ConsumptionCancelProcessListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/consumption/ConsumptionCancelProcessListener.java
index caded9717..c06c3d5a2 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/consumption/ConsumptionCancelProcessListener.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/consumption/ConsumptionCancelProcessListener.java
@@ -19,7 +19,7 @@ package org.apache.inlong.manager.service.listener.consumption;
 
 import lombok.extern.slf4j.Slf4j;
 import org.apache.inlong.manager.common.consts.InlongConstants;
-import org.apache.inlong.manager.common.enums.ConsumptionStatus;
+import org.apache.inlong.manager.common.enums.ConsumeStatus;
 import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
 import org.apache.inlong.manager.common.exceptions.BusinessException;
 import org.apache.inlong.manager.common.exceptions.WorkflowListenerException;
@@ -58,7 +58,7 @@ public class ConsumptionCancelProcessListener implements ProcessEventListener {
         ApplyConsumptionProcessForm processForm = (ApplyConsumptionProcessForm) context.getProcessForm();
 
         ConsumptionEntity update = consumptionEntityMapper.selectByPrimaryKey(processForm.getConsumptionInfo().getId());
-        update.setStatus(ConsumptionStatus.CANCELED.getStatus());
+        update.setStatus(ConsumeStatus.CANCELED.getCode());
         int rowCount = consumptionEntityMapper.updateByPrimaryKeySelective(update);
         if (rowCount != InlongConstants.AFFECTED_ONE_ROW) {
             log.error("consumption information has already updated, id={}, curVersion={}",
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/consumption/ConsumptionCompleteProcessListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/consumption/ConsumptionCompleteProcessListener.java
index c42930487..e9d0795b0 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/consumption/ConsumptionCompleteProcessListener.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/consumption/ConsumptionCompleteProcessListener.java
@@ -21,7 +21,7 @@ import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.inlong.manager.common.consts.InlongConstants;
 import org.apache.inlong.manager.common.enums.ClusterType;
-import org.apache.inlong.manager.common.enums.ConsumptionStatus;
+import org.apache.inlong.manager.common.enums.ConsumeStatus;
 import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
 import org.apache.inlong.manager.common.consts.MQType;
 import org.apache.inlong.manager.common.exceptions.BusinessException;
@@ -104,7 +104,7 @@ public class ConsumptionCompleteProcessListener implements ProcessEventListener
      */
     private void updateConsumerInfo(Integer consumptionId, String consumerGroup) {
         ConsumptionEntity update = consumptionMapper.selectByPrimaryKey(consumptionId);
-        update.setStatus(ConsumptionStatus.APPROVED.getStatus());
+        update.setStatus(ConsumeStatus.APPROVED.getCode());
         update.setConsumerGroup(consumerGroup);
         int rowCount = consumptionMapper.updateByPrimaryKeySelective(update);
         if (rowCount != InlongConstants.AFFECTED_ONE_ROW) {
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/consumption/ConsumptionRejectProcessListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/consumption/ConsumptionRejectProcessListener.java
index 72af56894..b1e5030e7 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/consumption/ConsumptionRejectProcessListener.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/consumption/ConsumptionRejectProcessListener.java
@@ -18,7 +18,7 @@
 package org.apache.inlong.manager.service.listener.consumption;
 
 import org.apache.inlong.manager.common.consts.InlongConstants;
-import org.apache.inlong.manager.common.enums.ConsumptionStatus;
+import org.apache.inlong.manager.common.enums.ConsumeStatus;
 import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
 import org.apache.inlong.manager.common.exceptions.BusinessException;
 import org.apache.inlong.manager.common.exceptions.WorkflowListenerException;
@@ -60,7 +60,7 @@ public class ConsumptionRejectProcessListener implements ProcessEventListener {
         ApplyConsumptionProcessForm processForm = (ApplyConsumptionProcessForm) context.getProcessForm();
 
         ConsumptionEntity update = consumptionEntityMapper.selectByPrimaryKey(processForm.getConsumptionInfo().getId());
-        update.setStatus(ConsumptionStatus.REJECTED.getStatus());
+        update.setStatus(ConsumeStatus.REJECTED.getCode());
 
         int rowCount = consumptionEntityMapper.updateByPrimaryKeySelective(update);
         if (rowCount != InlongConstants.AFFECTED_ONE_ROW) {
diff --git a/inlong-manager/manager-test/src/main/resources/h2/apache_inlong_manager.sql b/inlong-manager/manager-test/src/main/resources/h2/apache_inlong_manager.sql
index ea5fdca22..d844285b7 100644
--- a/inlong-manager/manager-test/src/main/resources/h2/apache_inlong_manager.sql
+++ b/inlong-manager/manager-test/src/main/resources/h2/apache_inlong_manager.sql
@@ -140,6 +140,33 @@ CREATE TABLE IF NOT EXISTS `inlong_cluster_node`
     UNIQUE KEY `unique_inlong_cluster_node` (`parent_id`, `type`, `ip`, `port`, `is_deleted`)
 );
 
+-- ----------------------------
+-- Table structure for inlong_consume
+-- ----------------------------
+CREATE TABLE IF NOT EXISTS `inlong_consume`
+(
+    `id`               int(11)      NOT NULL AUTO_INCREMENT COMMENT 'Incremental primary key',
+    `consumer_group`   varchar(256) NOT NULL COMMENT 'Consumer group name, filled in by the user, undeleted ones cannot be repeated',
+    `description`      varchar(256)          DEFAULT '' COMMENT 'Inlong consume description',
+    `mq_type`          varchar(10)           DEFAULT 'TUBEMQ' COMMENT 'Message queue type, high throughput: TUBEMQ, high consistency: PULSAR',
+    `topic`            varchar(256) NOT NULL COMMENT 'The target topic of this consume',
+    `inlong_group_id`  varchar(256) NOT NULL COMMENT 'The target inlong group id of this consume',
+    `filter_enabled`   int(2)                DEFAULT '0' COMMENT 'Whether to filter consume, 0: not filter, 1: filter',
+    `inlong_stream_id` varchar(256)          DEFAULT NULL COMMENT 'The target inlong stream id of this consume, needed if the filter_enabled=1',
+    `ext_params`       mediumtext            DEFAULT NULL COMMENT 'Extended params, will be saved as JSON string',
+    `in_charges`       varchar(512) NOT NULL COMMENT 'Name of responsible person, separated by commas',
+    `status`           int(4)                DEFAULT '100' COMMENT 'Inlong consume status',
+    `is_deleted`       int(11)               DEFAULT '0' COMMENT 'Whether to delete, 0: not deleted, > 0: deleted',
+    `creator`          varchar(64)  NOT NULL COMMENT 'Creator name',
+    `modifier`         varchar(64)           DEFAULT NULL COMMENT 'Modifier name',
+    `create_time`      timestamp    NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'Create time',
+    `modify_time`      timestamp    NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'Modify time',
+    `version`          int(11)      NOT NULL DEFAULT '1' COMMENT 'Version number, which will be incremented by 1 after modification',
+    PRIMARY KEY (`id`),
+    UNIQUE KEY `unique_inlong_consume` (`consumer_group`, `is_deleted`)
+);
+
+
 -- ----------------------------
 -- Table structure for data_node
 -- ----------------------------
@@ -166,7 +193,7 @@ CREATE TABLE IF NOT EXISTS `data_node`
 );
 
 -- ----------------------------
--- Table structure for consumption
+-- Deprecated: Table structure for consumption
 -- ----------------------------
 CREATE TABLE IF NOT EXISTS `consumption`
 (
@@ -189,7 +216,7 @@ CREATE TABLE IF NOT EXISTS `consumption`
 );
 
 -- ----------------------------
--- Table structure for consumption_pulsar
+-- Deprecated: Table structure for consumption_pulsar
 -- ----------------------------
 CREATE TABLE IF NOT EXISTS `consumption_pulsar`
 (
diff --git a/inlong-manager/manager-web/sql/apache_inlong_manager.sql b/inlong-manager/manager-web/sql/apache_inlong_manager.sql
index 057af951f..cfa64e471 100644
--- a/inlong-manager/manager-web/sql/apache_inlong_manager.sql
+++ b/inlong-manager/manager-web/sql/apache_inlong_manager.sql
@@ -151,6 +151,33 @@ CREATE TABLE IF NOT EXISTS `inlong_cluster_node`
 ) ENGINE = InnoDB
   DEFAULT CHARSET = utf8mb4 COMMENT ='Inlong cluster node table';
 
+-- ----------------------------
+-- Table structure for inlong_consume
+-- ----------------------------
+CREATE TABLE IF NOT EXISTS `inlong_consume`
+(
+    `id`               int(11)      NOT NULL AUTO_INCREMENT COMMENT 'Incremental primary key',
+    `consumer_group`   varchar(256) NOT NULL COMMENT 'Consumer group name, filled in by the user, undeleted ones cannot be repeated',
+    `description`      varchar(256)          DEFAULT '' COMMENT 'Inlong consume description',
+    `mq_type`          varchar(10)           DEFAULT 'TUBEMQ' COMMENT 'Message queue type, high throughput: TUBEMQ, high consistency: PULSAR',
+    `topic`            varchar(256) NOT NULL COMMENT 'The target topic of this consume',
+    `inlong_group_id`  varchar(256) NOT NULL COMMENT 'The target inlong group id of this consume',
+    `filter_enabled`   int(2)                DEFAULT '0' COMMENT 'Whether to filter consume, 0: not filter, 1: filter',
+    `inlong_stream_id` varchar(256)          DEFAULT NULL COMMENT 'The target inlong stream id of this consume, needed if the filter_enabled=1',
+    `ext_params`       mediumtext            DEFAULT NULL COMMENT 'Extended params, will be saved as JSON string',
+    `in_charges`       varchar(512) NOT NULL COMMENT 'Name of responsible person, separated by commas',
+    `status`           int(4)                DEFAULT '100' COMMENT 'Inlong consume status',
+    `is_deleted`       int(11)               DEFAULT '0' COMMENT 'Whether to delete, 0: not deleted, > 0: deleted',
+    `creator`          varchar(64)  NOT NULL COMMENT 'Creator name',
+    `modifier`         varchar(64)           DEFAULT NULL COMMENT 'Modifier name',
+    `create_time`      timestamp    NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'Create time',
+    `modify_time`      timestamp    NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'Modify time',
+    `version`          int(11)      NOT NULL DEFAULT '1' COMMENT 'Version number, which will be incremented by 1 after modification',
+    PRIMARY KEY (`id`),
+    UNIQUE KEY `unique_inlong_consume` (`consumer_group`, `is_deleted`)
+) ENGINE = InnoDB
+  DEFAULT CHARSET = utf8mb4 COMMENT ='Inlong consume table';
+
 -- ----------------------------
 -- Table structure for data_node
 -- ----------------------------
@@ -178,7 +205,7 @@ CREATE TABLE IF NOT EXISTS `data_node`
   DEFAULT CHARSET = utf8mb4 COMMENT ='Data node table';
 
 -- ----------------------------
--- Table structure for consumption
+-- Deprecated: Table structure for consumption
 -- ----------------------------
 CREATE TABLE IF NOT EXISTS `consumption`
 (
@@ -202,7 +229,7 @@ CREATE TABLE IF NOT EXISTS `consumption`
   DEFAULT CHARSET = utf8mb4 COMMENT ='Data consumption configuration table';
 
 -- ----------------------------
--- Table structure for consumption_pulsar
+-- Deprecated: Table structure for consumption_pulsar
 -- ----------------------------
 CREATE TABLE IF NOT EXISTS `consumption_pulsar`
 (
diff --git a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/ConsumptionController.java b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/ConsumptionController.java
index 7207ce726..f1c77ad51 100644
--- a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/ConsumptionController.java
+++ b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/ConsumptionController.java
@@ -29,7 +29,7 @@ import org.apache.inlong.manager.pojo.consumption.ConsumptionQuery;
 import org.apache.inlong.manager.pojo.consumption.ConsumptionSummary;
 import org.apache.inlong.manager.pojo.workflow.WorkflowResult;
 import org.apache.inlong.manager.service.core.ConsumptionService;
-import org.apache.inlong.manager.service.core.impl.ConsumptionProcessService;
+import org.apache.inlong.manager.service.consume.InlongConsumeProcessService;
 import org.apache.inlong.manager.service.operationlog.OperationLog;
 import org.apache.inlong.manager.service.user.LoginUserUtils;
 import org.springframework.beans.factory.annotation.Autowired;
@@ -53,7 +53,7 @@ public class ConsumptionController {
     @Autowired
     private ConsumptionService consumptionService;
     @Autowired
-    private ConsumptionProcessService processOperation;
+    private InlongConsumeProcessService processOperation;
 
     @GetMapping("/consumption/summary")
     @ApiOperation(value = "Get data consumption summary")
diff --git a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/InlongConsumeController.java b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/InlongConsumeController.java
new file mode 100644
index 000000000..4697cbe57
--- /dev/null
+++ b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/InlongConsumeController.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.web.controller;
+
+import io.swagger.annotations.Api;
+import io.swagger.annotations.ApiImplicitParam;
+import io.swagger.annotations.ApiOperation;
+import org.apache.inlong.manager.common.enums.OperationType;
+import org.apache.inlong.manager.common.enums.UserTypeEnum;
+import org.apache.inlong.manager.common.validation.UpdateValidation;
+import org.apache.inlong.manager.pojo.common.PageResult;
+import org.apache.inlong.manager.pojo.common.Response;
+import org.apache.inlong.manager.pojo.consume.InlongConsumeBriefInfo;
+import org.apache.inlong.manager.pojo.consume.InlongConsumeCountInfo;
+import org.apache.inlong.manager.pojo.consume.InlongConsumeInfo;
+import org.apache.inlong.manager.pojo.consume.InlongConsumePageRequest;
+import org.apache.inlong.manager.pojo.consume.InlongConsumeRequest;
+import org.apache.inlong.manager.pojo.workflow.WorkflowResult;
+import org.apache.inlong.manager.service.consume.InlongConsumeProcessService;
+import org.apache.inlong.manager.service.consume.InlongConsumeService;
+import org.apache.inlong.manager.service.operationlog.OperationLog;
+import org.apache.inlong.manager.service.user.LoginUserUtils;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.validation.annotation.Validated;
+import org.springframework.web.bind.annotation.DeleteMapping;
+import org.springframework.web.bind.annotation.GetMapping;
+import org.springframework.web.bind.annotation.PathVariable;
+import org.springframework.web.bind.annotation.PostMapping;
+import org.springframework.web.bind.annotation.RequestBody;
+import org.springframework.web.bind.annotation.RequestMapping;
+import org.springframework.web.bind.annotation.RequestMethod;
+import org.springframework.web.bind.annotation.RestController;
+
+/**
+ * Inlong consume control layer
+ */
+@RestController
+@RequestMapping("/api")
+@Api(tags = "Inlong-Consume-API")
+public class InlongConsumeController {
+
+    @Autowired
+    private InlongConsumeService consumeService;
+    @Autowired
+    private InlongConsumeProcessService consumeProcessService;
+
+    @RequestMapping(value = "/consume/save", method = RequestMethod.POST)
+    @OperationLog(operation = OperationType.CREATE)
+    @ApiOperation(value = "Save inlong consume")
+    public Response<Integer> save(@Validated(UpdateValidation.class) @RequestBody InlongConsumeRequest request) {
+        String operator = LoginUserUtils.getLoginUser().getName();
+        return Response.success(consumeService.save(request, operator));
+    }
+
+    @GetMapping("/consume/get/{id}")
+    @ApiOperation(value = "Get inlong consume")
+    @ApiImplicitParam(name = "id", value = "Inlong consume ID", dataTypeClass = Integer.class, required = true)
+    public Response<InlongConsumeInfo> get(@PathVariable(name = "id") Integer id) {
+        return Response.success(consumeService.get(id));
+    }
+
+    @GetMapping(value = "/consume/countStatus")
+    @ApiOperation(value = "Count inlong consume status by current user")
+    public Response<InlongConsumeCountInfo> countStatusByUser() {
+        return Response.success(consumeService.countStatus(LoginUserUtils.getLoginUser().getName()));
+    }
+
+    @GetMapping("/consume/list")
+    @ApiOperation(value = "List inlong consume by pagination")
+    public Response<PageResult<InlongConsumeBriefInfo>> list(InlongConsumePageRequest request) {
+        request.setCurrentUser(LoginUserUtils.getLoginUser().getName());
+        request.setIsAdminRole(LoginUserUtils.getLoginUser().getRoles().contains(UserTypeEnum.ADMIN.name()));
+        return Response.success(consumeService.list(request));
+    }
+
+    @PostMapping("/consume/update/{id}")
+    @OperationLog(operation = OperationType.UPDATE)
+    @ApiOperation(value = "Update inlong consume")
+    public Response<Boolean> update(@Validated(UpdateValidation.class) @RequestBody InlongConsumeRequest request) {
+        return Response.success(consumeService.update(request, LoginUserUtils.getLoginUser().getName()));
+    }
+
+    @DeleteMapping("/consume/delete/{id}")
+    @OperationLog(operation = OperationType.DELETE)
+    @ApiOperation(value = "Delete inlong consume by ID")
+    @ApiImplicitParam(name = "id", value = "Inlong consume ID", dataTypeClass = Integer.class, required = true)
+    public Response<Object> delete(@PathVariable(name = "id") Integer id) {
+        this.consumeService.delete(id, LoginUserUtils.getLoginUser().getName());
+        return Response.success();
+    }
+
+    @PostMapping("/consume/startProcess/{id}")
+    @OperationLog(operation = OperationType.UPDATE)
+    @ApiOperation(value = "Start inlong consume process")
+    @ApiImplicitParam(name = "id", value = "Inlong consume ID", dataTypeClass = Integer.class, required = true)
+    public Response<WorkflowResult> startProcess(@PathVariable(name = "id") Integer id) {
+        String username = LoginUserUtils.getLoginUser().getName();
+        return Response.success(consumeProcessService.startProcess(id, username));
+    }
+
+}