You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2022/10/29 11:58:45 UTC

[inlong] branch master updated: [INLONG-6085][Manager] Remove the unused data consumption related classes (#6314)

This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 32e017abc [INLONG-6085][Manager] Remove the unused data consumption related classes (#6314)
32e017abc is described below

commit 32e017abcc556407ce3314711656edf5ab78efa0
Author: healchow <he...@gmail.com>
AuthorDate: Sat Oct 29 19:58:41 2022 +0800

    [INLONG-6085][Manager] Remove the unused data consumption related classes (#6314)
---
 .../manager/dao/entity/ConsumptionEntity.java      |  48 ---
 .../dao/entity/ConsumptionPulsarEntity.java        |  41 ---
 .../dao/mapper/ConsumptionEntityMapper.java        |  50 ---
 .../dao/mapper/ConsumptionPulsarEntityMapper.java  |  43 ---
 .../dao/mapper/InlongConsumeEntityMapper.java      |   7 +-
 .../resources/mappers/ConsumptionEntityMapper.xml  | 311 ----------------
 .../mappers/ConsumptionPulsarEntityMapper.xml      | 181 ----------
 .../mappers/InlongConsumeEntityMapper.xml          |  62 +++-
 .../manager/pojo/consumption/ConsumptionInfo.java  | 101 ------
 .../pojo/consumption/ConsumptionListVo.java        |  66 ----
 .../pojo/consumption/ConsumptionMqExtBase.java     |  56 ---
 .../pojo/consumption/ConsumptionPulsarInfo.java    |  52 ---
 .../manager/pojo/consumption/ConsumptionQuery.java |  74 ----
 .../pojo/consumption/ConsumptionSummary.java       |  48 ---
 .../form/process/ApplyConsumeProcessForm.java      |   2 +-
 .../service/consume/ConsumePulsarOperator.java     |  36 +-
 .../consume/InlongConsumeOperatorFactory.java      |   7 +-
 .../service/consume/InlongConsumeServiceImpl.java  |   9 +-
 .../manager/service/core/ConsumptionService.java   |  89 -----
 .../service/core/impl/ConsumptionServiceImpl.java  | 399 ---------------------
 .../service/group/InlongGroupOperator4Pulsar.java  |   9 +
 .../{core => }/heartbeat/HeartbeatManager.java     |   4 +-
 .../impl => heartbeat}/HeartbeatServiceImpl.java   |   3 +-
 .../apply/ApproveConsumeProcessListener.java       |   6 +-
 .../queue/pulsar/PulsarResourceOperator.java       |   2 +-
 .../service/cluster/InlongClusterServiceTest.java  |   2 +-
 .../core/heartbeat/HeartbeatManagerTest.java       |   1 +
 .../service/core/impl/ConsumptionServiceTest.java  |  72 ----
 .../main/resources/h2/apache_inlong_manager.sql    |  41 +--
 .../manager-web/sql/apache_inlong_manager.sql      |  44 +--
 .../web/controller/ConsumptionController.java      | 115 ------
 .../inlong/manager/workflow/WorkflowConfig.java    | 100 ------
 32 files changed, 103 insertions(+), 1978 deletions(-)

diff --git a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/ConsumptionEntity.java b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/ConsumptionEntity.java
deleted file mode 100644
index 329fd167b..000000000
--- a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/ConsumptionEntity.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.manager.dao.entity;
-
-import lombok.Data;
-
-import java.io.Serializable;
-import java.util.Date;
-
-/**
- * Data consumption table
- */
-@Data
-public class ConsumptionEntity implements Serializable {
-
-    private static final long serialVersionUID = 1L;
-    private Integer id;
-    private String consumerGroup;
-    private String inCharges;
-    private String inlongGroupId;
-    private String mqType;
-    private String topic;
-    private Integer filterEnabled;
-    private String inlongStreamId;
-    private Integer status;
-    private String creator;
-    private String modifier;
-    private Date createTime;
-    private Date modifyTime;
-    private Integer isDeleted;
-    private Integer version;
-
-}
diff --git a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/ConsumptionPulsarEntity.java b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/ConsumptionPulsarEntity.java
deleted file mode 100644
index 32e9ed129..000000000
--- a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/entity/ConsumptionPulsarEntity.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.manager.dao.entity;
-
-import lombok.Data;
-
-import java.io.Serializable;
-
-/**
- * Data consumption table, including group name, group id, topic, etc
- */
-@Data
-public class ConsumptionPulsarEntity implements Serializable {
-
-    private static final long serialVersionUID = 1L;
-    private Integer id;
-    private Integer consumptionId;
-    private String consumerGroup;
-    private String inlongGroupId;
-    private Integer isDlq;
-    private String deadLetterTopic;
-    private Integer isRlq;
-    private String retryLetterTopic;
-    private Integer isDeleted;
-
-}
\ No newline at end of file
diff --git a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/ConsumptionEntityMapper.java b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/ConsumptionEntityMapper.java
deleted file mode 100644
index 838488f96..000000000
--- a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/ConsumptionEntityMapper.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.manager.dao.mapper;
-
-import org.apache.ibatis.annotations.Param;
-import org.apache.inlong.manager.dao.entity.ConsumptionEntity;
-import org.apache.inlong.manager.pojo.common.CountInfo;
-import org.apache.inlong.manager.pojo.consumption.ConsumptionQuery;
-import org.springframework.stereotype.Repository;
-
-import java.util.List;
-
-@Repository
-public interface ConsumptionEntityMapper {
-
-    int deleteByPrimaryKey(Integer id);
-
-    int insert(ConsumptionEntity record);
-
-    int insertSelective(ConsumptionEntity record);
-
-    ConsumptionEntity selectByPrimaryKey(Integer id);
-
-    ConsumptionEntity selectConsumptionExists(@Param("groupId") String groupId, @Param("topic") String topic,
-            @Param("consumerGroup") String consumerGroup);
-
-    int updateByPrimaryKeySelective(ConsumptionEntity record);
-
-    int updateByPrimaryKey(ConsumptionEntity record);
-
-    List<ConsumptionEntity> listByQuery(ConsumptionQuery consumptionQuery);
-
-    List<CountInfo> countByQuery(ConsumptionQuery consumptionQuery);
-
-}
diff --git a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/ConsumptionPulsarEntityMapper.java b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/ConsumptionPulsarEntityMapper.java
deleted file mode 100644
index 99f65d22c..000000000
--- a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/ConsumptionPulsarEntityMapper.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.manager.dao.mapper;
-
-import org.apache.ibatis.annotations.Param;
-import org.apache.inlong.manager.dao.entity.ConsumptionPulsarEntity;
-import org.springframework.stereotype.Repository;
-
-@Repository
-public interface ConsumptionPulsarEntityMapper {
-
-    int deleteByConsumptionId(@Param("consumptionId") Integer consumptionId);
-
-    int insert(ConsumptionPulsarEntity record);
-
-    int insertSelective(ConsumptionPulsarEntity record);
-
-    ConsumptionPulsarEntity selectByPrimaryKey(@Param("id") Integer id);
-
-    ConsumptionPulsarEntity selectByConsumptionId(@Param("consumptionId") Integer consumptionId);
-
-    int updateByPrimaryKeySelective(ConsumptionPulsarEntity record);
-
-    int updateByConsumptionId(ConsumptionPulsarEntity record);
-
-    int updateByPrimaryKey(ConsumptionPulsarEntity record);
-
-}
\ No newline at end of file
diff --git a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/InlongConsumeEntityMapper.java b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/InlongConsumeEntityMapper.java
index 6b0db6617..a6b6090a0 100644
--- a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/InlongConsumeEntityMapper.java
+++ b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/InlongConsumeEntityMapper.java
@@ -20,6 +20,7 @@ package org.apache.inlong.manager.dao.mapper;
 import org.apache.ibatis.annotations.Param;
 import org.apache.inlong.manager.dao.entity.InlongConsumeEntity;
 import org.apache.inlong.manager.pojo.common.CountInfo;
+import org.apache.inlong.manager.pojo.consume.InlongConsumeBriefInfo;
 import org.apache.inlong.manager.pojo.consume.InlongConsumePageRequest;
 import org.springframework.stereotype.Repository;
 
@@ -34,11 +35,13 @@ public interface InlongConsumeEntityMapper {
 
     List<CountInfo> countByUser(@Param(value = "username") String username);
 
-    List<InlongConsumeEntity> selectByCondition(InlongConsumePageRequest request);
-
     InlongConsumeEntity selectExists(@Param("consumerGroup") String consumerGroup, @Param("topic") String topic,
             @Param("inlongGroupId") String inlongGroupId);
 
+    List<InlongConsumeEntity> selectByCondition(InlongConsumePageRequest request);
+
+    List<InlongConsumeBriefInfo> selectBriefList(InlongConsumePageRequest request);
+
     int updateById(InlongConsumeEntity record);
 
     int updateByIdSelective(InlongConsumeEntity record);
diff --git a/inlong-manager/manager-dao/src/main/resources/mappers/ConsumptionEntityMapper.xml b/inlong-manager/manager-dao/src/main/resources/mappers/ConsumptionEntityMapper.xml
deleted file mode 100644
index 9865ca0a1..000000000
--- a/inlong-manager/manager-dao/src/main/resources/mappers/ConsumptionEntityMapper.xml
+++ /dev/null
@@ -1,311 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
-    Licensed to the Apache Software Foundation (ASF) under one
-    or more contributor license agreements. See the NOTICE file
-    distributed with this work for additional information
-    regarding copyright ownership. The ASF licenses this file
-    to you under the Apache License, Version 2.0 (the
-    "License"); you may not use this file except in compliance
-    with the License. You may obtain a copy of the License at
-
-      http://www.apache.org/licenses/LICENSE-2.0
-
-    Unless required by applicable law or agreed to in writing,
-    software distributed under the License is distributed on an
-    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-    KIND, either express or implied.  See the License for the
-    specific language governing permissions and limitations
-    under the License.
--->
-
-<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
-<mapper namespace="org.apache.inlong.manager.dao.mapper.ConsumptionEntityMapper">
-    <resultMap id="BaseResultMap" type="org.apache.inlong.manager.dao.entity.ConsumptionEntity">
-        <id column="id" jdbcType="INTEGER" property="id"/>
-        <result column="consumer_group" jdbcType="VARCHAR" property="consumerGroup"/>
-        <result column="in_charges" jdbcType="VARCHAR" property="inCharges"/>
-        <result column="inlong_group_id" jdbcType="VARCHAR" property="inlongGroupId"/>
-        <result column="mq_type" jdbcType="VARCHAR" property="mqType"/>
-        <result column="topic" jdbcType="VARCHAR" property="topic"/>
-        <result column="filter_enabled" jdbcType="INTEGER" property="filterEnabled"/>
-        <result column="inlong_stream_id" jdbcType="VARCHAR" property="inlongStreamId"/>
-        <result column="status" jdbcType="INTEGER" property="status"/>
-        <result column="creator" jdbcType="VARCHAR" property="creator"/>
-        <result column="modifier" jdbcType="VARCHAR" property="modifier"/>
-        <result column="create_time" jdbcType="TIMESTAMP" property="createTime"/>
-        <result column="modify_time" jdbcType="TIMESTAMP" property="modifyTime"/>
-        <result column="is_deleted" jdbcType="INTEGER" property="isDeleted"/>
-        <result column="version" jdbcType="INTEGER" property="version"/>
-    </resultMap>
-    <sql id="Base_Column_List">
-        id, consumer_group, in_charges, inlong_group_id,
-        mq_type, topic, filter_enabled, inlong_stream_id,
-        status, is_deleted, creator, modifier, create_time, modify_time, version
-    </sql>
-    <select id="selectByPrimaryKey" parameterType="java.lang.Integer"
-            resultMap="BaseResultMap">
-        select
-        <include refid="Base_Column_List"/>
-        from consumption
-        where id = #{id, jdbcType=INTEGER} and is_deleted = 0
-    </select>
-    <select id="selectConsumptionExists" resultType="org.apache.inlong.manager.dao.entity.ConsumptionEntity">
-        select
-        <include refid="Base_Column_List"/>
-        from consumption
-        where inlong_group_id = #{groupId, jdbcType=VARCHAR}
-        and topic = #{topic, jdbcType=VARCHAR}
-        and consumer_group = #{consumerGroup, jdbcType=VARCHAR}
-        and is_deleted = 0
-    </select>
-
-    <delete id="deleteByPrimaryKey" parameterType="java.lang.Integer">
-        update consumption
-        set is_deleted = id
-        where id = #{id, jdbcType=INTEGER}
-    </delete>
-    <insert id="insert" useGeneratedKeys="true" keyProperty="id"
-            parameterType="org.apache.inlong.manager.dao.entity.ConsumptionEntity">
-        insert into consumption (id, consumer_group, in_charges,
-                                 inlong_group_id, mq_type, topic,
-                                 filter_enabled, inlong_stream_id,
-                                 status, creator, modifier)
-        values (#{id, jdbcType=INTEGER}, #{consumerGroup, jdbcType=VARCHAR}, #{inCharges, jdbcType=VARCHAR},
-                #{inlongGroupId, jdbcType=VARCHAR}, #{mqType, jdbcType=VARCHAR}, #{topic, jdbcType=VARCHAR},
-                #{filterEnabled, jdbcType=INTEGER}, #{inlongStreamId, jdbcType=VARCHAR},
-                #{status, jdbcType=INTEGER}, #{creator, jdbcType=VARCHAR}, #{modifier, jdbcType=VARCHAR})
-    </insert>
-    <insert id="insertSelective" useGeneratedKeys="true" keyProperty="id"
-            parameterType="org.apache.inlong.manager.dao.entity.ConsumptionEntity">
-        insert into consumption
-        <trim prefix="(" suffix=")" suffixOverrides=",">
-            <if test="id != null">
-                id,
-            </if>
-            <if test="consumerGroup != null">
-                consumer_group,
-            </if>
-            <if test="inCharges != null">
-                in_charges,
-            </if>
-            <if test="inlongGroupId != null">
-                inlong_group_id,
-            </if>
-            <if test="mqType != null">
-                mq_type,
-            </if>
-            <if test="topic != null">
-                topic,
-            </if>
-            <if test="filterEnabled != null">
-                filter_enabled,
-            </if>
-            <if test="inlongStreamId != null">
-                inlong_stream_id,
-            </if>
-            <if test="status != null">
-                status,
-            </if>
-            <if test="creator != null">
-                creator,
-            </if>
-            <if test="modifier != null">
-                modifier,
-            </if>
-        </trim>
-        <trim prefix="values (" suffix=")" suffixOverrides=",">
-            <if test="id != null">
-                #{id, jdbcType=INTEGER},
-            </if>
-            <if test="consumerGroup != null">
-                #{consumerGroup, jdbcType=VARCHAR},
-            </if>
-            <if test="inCharges != null">
-                #{inCharges, jdbcType=VARCHAR},
-            </if>
-            <if test="inlongGroupId != null">
-                #{inlongGroupId, jdbcType=VARCHAR},
-            </if>
-            <if test="mqType != null">
-                #{mqType, jdbcType=VARCHAR},
-            </if>
-            <if test="topic != null">
-                #{topic, jdbcType=VARCHAR},
-            </if>
-            <if test="filterEnabled != null">
-                #{filterEnabled, jdbcType=INTEGER},
-            </if>
-            <if test="inlongStreamId != null">
-                #{inlongStreamId, jdbcType=VARCHAR},
-            </if>
-            <if test="status != null">
-                #{status, jdbcType=INTEGER},
-            </if>
-            <if test="creator != null">
-                #{creator, jdbcType=VARCHAR},
-            </if>
-            <if test="modifier != null">
-                #{modifier, jdbcType=VARCHAR},
-            </if>
-        </trim>
-    </insert>
-    <update id="updateByPrimaryKeySelective"
-            parameterType="org.apache.inlong.manager.dao.entity.ConsumptionEntity">
-        update consumption
-        <set>
-            <if test="consumerGroup != null">
-                consumer_group = #{consumerGroup, jdbcType=VARCHAR},
-            </if>
-            <if test="inCharges != null">
-                in_charges = #{inCharges, jdbcType=VARCHAR},
-            </if>
-            <if test="inlongGroupId != null">
-                inlong_group_id = #{inlongGroupId, jdbcType=VARCHAR},
-            </if>
-            <if test="mqType != null">
-                mq_type = #{mqType, jdbcType=VARCHAR},
-            </if>
-            <if test="topic != null">
-                topic = #{topic, jdbcType=VARCHAR},
-            </if>
-            <if test="filterEnabled != null">
-                filter_enabled = #{filterEnabled, jdbcType=INTEGER},
-            </if>
-            <if test="inlongStreamId != null">
-                inlong_stream_id = #{inlongStreamId, jdbcType=VARCHAR},
-            </if>
-            <if test="status != null">
-                status = #{status, jdbcType=INTEGER},
-            </if>
-            <if test="creator != null">
-                creator = #{creator, jdbcType=VARCHAR},
-            </if>
-            <if test="modifier != null">
-                modifier = #{modifier, jdbcType=VARCHAR},
-            </if>
-            version = #{version, jdbcType=INTEGER} + 1
-        </set>
-        where id = #{id, jdbcType=INTEGER}
-        and is_deleted = 0
-        and version = #{version, jdbcType=INTEGER}
-    </update>
-    <update id="updateByPrimaryKey" parameterType="org.apache.inlong.manager.dao.entity.ConsumptionEntity">
-        update consumption
-        set consumer_group   = #{consumerGroup, jdbcType=VARCHAR},
-            in_charges       = #{inCharges, jdbcType=VARCHAR},
-            inlong_group_id  = #{inlongGroupId, jdbcType=VARCHAR},
-            mq_type          = #{mqType, jdbcType=VARCHAR},
-            topic            = #{topic, jdbcType=VARCHAR},
-            filter_enabled   = #{filterEnabled, jdbcType=INTEGER},
-            inlong_stream_id = #{inlongStreamId, jdbcType=VARCHAR},
-            status           = #{status, jdbcType=INTEGER},
-            creator          = #{creator, jdbcType=VARCHAR},
-            modifier         = #{modifier, jdbcType=VARCHAR},
-            is_deleted       = #{isDeleted, jdbcType=INTEGER},
-            version          = #{version, jdbcType=INTEGER} + 1
-        where id = #{id, jdbcType=INTEGER}
-          and is_deleted = 0
-          and version = #{version, jdbcType=INTEGER}
-    </update>
-
-    <select id="listByQuery"
-            parameterType="org.apache.inlong.manager.pojo.consumption.ConsumptionQuery"
-            resultMap="BaseResultMap">
-        select
-        c.*
-        from consumption c
-        where c.is_deleted=0
-        <if test="consumerGroup != null and consumerGroup != ''">
-            and c.consumer_group = #{consumerGroup, jdbcType=VARCHAR}
-        </if>
-        <if test="inlongGroupId != null and inlongGroupId != ''">
-            and c.inlong_group_id = #{inlongGroupId, jdbcType=VARCHAR}
-        </if>
-        <if test="mqType != null and mqType != ''">
-            and c.mq_type = #{mqType, jdbcType=VARCHAR}
-        </if>
-        <if test="topic != null and topic != ''">
-            and c.topic like CONCAT('%', #{topic}, '%')
-        </if>
-        <if test="filterEnabled != null">
-            and c.filter_enabled = #{filterEnabled, jdbcType=INTEGER}
-        </if>
-        <if test="inlongStreamId != null and inlongStreamId != ''">
-            and c.inlong_stream_id = #{inlongStreamId, jdbcType=VARCHAR}
-        </if>
-        <if test="status != null">
-            and c.status = #{status, jdbcType=INTEGER}
-        </if>
-        <if test="creator != null and creator != ''">
-            and c.creator = #{creator, jdbcType=VARCHAR}
-        </if>
-        <if test="modifier != null and modifier != ''">
-            and c.modifier = #{modifier, jdbcType=VARCHAR}
-        </if>
-        <if test="isAdminRole == false">
-            and (
-            FIND_IN_SET(#{username, jdbcType=VARCHAR},c.in_charges)
-            or c.creator = #{username, jdbcType=VARCHAR}
-            )
-        </if>
-        <if test="keyword != null and keyword !=''">
-            and (c.topic like CONCAT('%', #{keyword}, '%') or c.consumer_group like CONCAT('%', #{keyword}, '%'))
-        </if>
-        <if test="lastConsumptionStatus != null and lastConsumptionStatus != 3">
-            and cas.latest_record_state = #{lastConsumptionStatus, jdbcType=INTEGER}
-        </if>
-        <if test="lastConsumptionStatus != null and lastConsumptionStatus == 3">
-            and (cas.latest_record_state = #{lastConsumptionStatus, jdbcType=INTEGER}
-            or cas.latest_record_state is null)
-        </if>
-        order by id desc
-    </select>
-
-    <select id="countByQuery"
-            parameterType="org.apache.inlong.manager.pojo.consumption.ConsumptionQuery"
-            resultType="org.apache.inlong.manager.pojo.common.CountInfo">
-        select status as `key`, count(1) as `value`
-        from consumption
-        where is_deleted=0
-        <if test="consumerGroup != null and consumerGroup != ''">
-            and consumer_group = #{consumerGroup, jdbcType=VARCHAR}
-        </if>
-        <if test="inCharges != null and inCharges != ''">
-            and FIND_IN_SET(#{inCharges, jdbcType=VARCHAR},in_charges)
-        </if>
-        <if test="inlongGroupId != null and inlongGroupId != ''">
-            and inlong_group_id = #{inlongGroupId, jdbcType=VARCHAR}
-        </if>
-        <if test="mqType != null and mqType != ''">
-            and mq_type = #{mqType, jdbcType=VARCHAR}
-        </if>
-        <if test="topic != null and topic != ''">
-            and topic = #{topic, jdbcType=VARCHAR}
-        </if>
-        <if test="filterEnabled != null">
-            and filter_enabled = #{filterEnabled, jdbcType=INTEGER}
-        </if>
-        <if test="inlongStreamId != null and inlongStreamId != ''">
-            and inlong_stream_id = #{inlongStreamId, jdbcType=VARCHAR}
-        </if>
-        <if test="status != null">
-            and status = #{status, jdbcType=INTEGER}
-        </if>
-        <if test="creator != null and creator != ''">
-            and creator = #{creator, jdbcType=VARCHAR}
-        </if>
-        <if test="modifier != null and modifier != ''">
-            and modifier = #{modifier, jdbcType=VARCHAR}
-        </if>
-        <if test="username != null and username !=''">
-            and (
-            FIND_IN_SET(#{username, jdbcType=VARCHAR},in_charges)
-            or creator = #{username, jdbcType=VARCHAR}
-            )
-        </if>
-        <if test="keyword != null and keyword !=''">
-            and ( topic like CONCAT('%', #{keyword}, '%') or consumer_group like CONCAT('%', #{keyword}, '%') )
-        </if>
-        group by status
-    </select>
-</mapper>
diff --git a/inlong-manager/manager-dao/src/main/resources/mappers/ConsumptionPulsarEntityMapper.xml b/inlong-manager/manager-dao/src/main/resources/mappers/ConsumptionPulsarEntityMapper.xml
deleted file mode 100644
index f5eb73453..000000000
--- a/inlong-manager/manager-dao/src/main/resources/mappers/ConsumptionPulsarEntityMapper.xml
+++ /dev/null
@@ -1,181 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
-    Licensed to the Apache Software Foundation (ASF) under one
-    or more contributor license agreements. See the NOTICE file
-    distributed with this work for additional information
-    regarding copyright ownership. The ASF licenses this file
-    to you under the Apache License, Version 2.0 (the
-    "License"); you may not use this file except in compliance
-    with the License. You may obtain a copy of the License at
-
-      http://www.apache.org/licenses/LICENSE-2.0
-
-    Unless required by applicable law or agreed to in writing,
-    software distributed under the License is distributed on an
-    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-    KIND, either express or implied.  See the License for the
-    specific language governing permissions and limitations
-    under the License.
--->
-
-<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
-<mapper namespace="org.apache.inlong.manager.dao.mapper.ConsumptionPulsarEntityMapper">
-    <resultMap id="BaseResultMap" type="org.apache.inlong.manager.dao.entity.ConsumptionPulsarEntity">
-        <id column="id" jdbcType="INTEGER" property="id"/>
-        <result column="consumption_id" jdbcType="VARCHAR" property="consumptionId"/>
-        <result column="consumer_group" jdbcType="VARCHAR" property="consumerGroup"/>
-        <result column="inlong_group_id" jdbcType="VARCHAR" property="inlongGroupId"/>
-        <result column="is_rlq" jdbcType="INTEGER" property="isRlq"/>
-        <result column="retry_letter_topic" jdbcType="VARCHAR" property="retryLetterTopic"/>
-        <result column="is_dlq" jdbcType="INTEGER" property="isDlq"/>
-        <result column="dead_letter_topic" jdbcType="VARCHAR" property="deadLetterTopic"/>
-        <result column="is_deleted" jdbcType="INTEGER" property="isDeleted"/>
-    </resultMap>
-    <sql id="Base_Column_List">
-        id, consumption_id, consumer_group, inlong_group_id, is_rlq, retry_letter_topic,
-        is_dlq, dead_letter_topic, is_deleted
-    </sql>
-    <select id="selectByPrimaryKey" parameterType="java.lang.Integer" resultMap="BaseResultMap">
-        select
-        <include refid="Base_Column_List"/>
-        from consumption_pulsar
-        where id = #{id,jdbcType=INTEGER}
-    </select>
-    <select id="selectByConsumptionId" parameterType="java.lang.Integer" resultMap="BaseResultMap">
-        select
-        <include refid="Base_Column_List"/>
-        from consumption_pulsar
-        where consumption_id = #{consumptionId, jdbcType=INTEGER}
-        and is_deleted = 0
-    </select>
-
-    <delete id="deleteByConsumptionId">
-        update consumption_pulsar
-        set is_deleted = id
-        where consumption_id = #{consumptionId, jdbcType=INTEGER}
-          and is_deleted = 0
-    </delete>
-
-    <insert id="insert" parameterType="org.apache.inlong.manager.dao.entity.ConsumptionPulsarEntity">
-        insert into consumption_pulsar (id, consumption_id, consumer_group,
-                                        inlong_group_id, is_rlq, retry_letter_topic,
-                                        is_dlq, dead_letter_topic, is_deleted)
-        values (#{id,jdbcType=INTEGER}, #{consumptionId,jdbcType=INTEGER}, #{consumerGroup,jdbcType=VARCHAR},
-                #{inlongGroupId,jdbcType=VARCHAR}, #{isRlq,jdbcType=INTEGER}, #{retryLetterTopic,jdbcType=VARCHAR},
-                #{isDlq,jdbcType=INTEGER}, #{deadLetterTopic,jdbcType=VARCHAR}, #{isDeleted,jdbcType=INTEGER})
-    </insert>
-    <insert id="insertSelective" parameterType="org.apache.inlong.manager.dao.entity.ConsumptionPulsarEntity">
-        insert into consumption_pulsar
-        <trim prefix="(" suffix=")" suffixOverrides=",">
-            <if test="id != null">
-                id,
-            </if>
-            <if test="consumptionId != null">
-                consumption_id,
-            </if>
-            <if test="consumerGroup != null">
-                consumer_group,
-            </if>
-            <if test="inlongGroupId != null">
-                inlong_group_id,
-            </if>
-            <if test="isRlq != null">
-                is_rlq,
-            </if>
-            <if test="retryLetterTopic != null">
-                retry_letter_topic,
-            </if>
-            <if test="isDlq != null">
-                is_dlq,
-            </if>
-            <if test="deadLetterTopic != null">
-                dead_letter_topic,
-            </if>
-            <if test="isDeleted != null">
-                is_deleted,
-            </if>
-        </trim>
-        <trim prefix="values (" suffix=")" suffixOverrides=",">
-            <if test="id != null">
-                #{id,jdbcType=INTEGER},
-            </if>
-            <if test="consumerGroup != null">
-                #{consumerGroup,jdbcType=VARCHAR},
-            </if>
-            <if test="inlongGroupId != null">
-                #{inlongGroupId,jdbcType=VARCHAR},
-            </if>
-            <if test="isRlq != null">
-                #{isRlq,jdbcType=INTEGER},
-            </if>
-            <if test="retryLetterTopic != null">
-                #{retryLetterTopic,jdbcType=VARCHAR},
-            </if>
-            <if test="isDlq != null">
-                #{isDlq,jdbcType=INTEGER},
-            </if>
-            <if test="deadLetterTopic != null">
-                #{deadLetterTopic,jdbcType=VARCHAR},
-            </if>
-            <if test="isDeleted != null">
-                #{isDeleted,jdbcType=INTEGER},
-            </if>
-        </trim>
-    </insert>
-    <update id="updateByPrimaryKeySelective"
-            parameterType="org.apache.inlong.manager.dao.entity.ConsumptionPulsarEntity">
-        update consumption_pulsar
-        <set>
-            <if test="consumptionId != null">
-                consumption_id = #{consumptionId,jdbcType=INTEGER},
-            </if>
-            <if test="consumerGroup != null">
-                consumer_group = #{consumerGroup,jdbcType=VARCHAR},
-            </if>
-            <if test="inlongGroupId != null">
-                inlong_group_id = #{inlongGroupId,jdbcType=VARCHAR},
-            </if>
-            <if test="isRlq != null">
-                is_rlq = #{isRlq,jdbcType=INTEGER},
-            </if>
-            <if test="retryLetterTopic != null">
-                retry_letter_topic = #{retryLetterTopic,jdbcType=VARCHAR},
-            </if>
-            <if test="isDlq != null">
-                is_dlq = #{isDlq,jdbcType=INTEGER},
-            </if>
-            <if test="deadLetterTopic != null">
-                dead_letter_topic = #{deadLetterTopic,jdbcType=VARCHAR},
-            </if>
-            <if test="isDeleted != null">
-                is_deleted = #{isDeleted,jdbcType=INTEGER},
-            </if>
-        </set>
-        where id = #{id,jdbcType=INTEGER}
-    </update>
-    <update id="updateByConsumptionId">
-        update consumption_pulsar
-        <set>
-            consumer_group = #{consumerGroup,jdbcType=VARCHAR},
-            inlong_group_id = #{inlongGroupId,jdbcType=VARCHAR},
-            is_rlq = #{isRlq,jdbcType=INTEGER},
-            retry_letter_topic = #{retryLetterTopic,jdbcType=VARCHAR},
-            is_dlq = #{isDlq,jdbcType=INTEGER},
-            dead_letter_topic = #{deadLetterTopic,jdbcType=VARCHAR},
-            is_deleted = #{isDeleted,jdbcType=INTEGER},
-        </set>
-        where consumption_id = #{consumptionId,jdbcType=INTEGER}
-    </update>
-    <update id="updateByPrimaryKey" parameterType="org.apache.inlong.manager.dao.entity.ConsumptionPulsarEntity">
-        update consumption_pulsar
-        set consumption_id     = #{consumptionId,jdbcType=INTEGER},
-            consumer_group     = #{consumerGroup,jdbcType=VARCHAR},
-            inlong_group_id    = #{inlongGroupId,jdbcType=VARCHAR},
-            is_rlq             = #{isRlq,jdbcType=INTEGER},
-            retry_letter_topic = #{retryLetterTopic,jdbcType=VARCHAR},
-            is_dlq             = #{isDlq,jdbcType=INTEGER},
-            dead_letter_topic  = #{deadLetterTopic,jdbcType=VARCHAR},
-            is_deleted         = #{isDeleted,jdbcType=INTEGER}
-        where id = #{id,jdbcType=INTEGER}
-    </update>
-</mapper>
\ No newline at end of file
diff --git a/inlong-manager/manager-dao/src/main/resources/mappers/InlongConsumeEntityMapper.xml b/inlong-manager/manager-dao/src/main/resources/mappers/InlongConsumeEntityMapper.xml
index c38c53028..031d96c4c 100644
--- a/inlong-manager/manager-dao/src/main/resources/mappers/InlongConsumeEntityMapper.xml
+++ b/inlong-manager/manager-dao/src/main/resources/mappers/InlongConsumeEntityMapper.xml
@@ -74,6 +74,16 @@
           and (creator = #{username, jdbcType=VARCHAR} or FIND_IN_SET(#{username, jdbcType=VARCHAR}, in_charges))
         group by status
     </select>
+    <select id="selectExists" resultType="org.apache.inlong.manager.dao.entity.InlongConsumeEntity">
+        select
+        <include refid="Base_Column_List"/>
+        from inlong_consume
+        where is_deleted = 0
+        and consumer_group = #{consumerGroup, jdbcType=VARCHAR}
+        and topic = #{topic, jdbcType=VARCHAR}
+        and inlong_group_id = #{inlongGroupId, jdbcType=VARCHAR}
+        limit 1
+    </select>
     <select id="selectByCondition" resultMap="BaseResultMap"
             parameterType="org.apache.inlong.manager.pojo.consume.InlongConsumePageRequest">
         select
@@ -120,15 +130,51 @@
             </otherwise>
         </choose>
     </select>
-    <select id="selectExists" resultType="org.apache.inlong.manager.dao.entity.InlongConsumeEntity">
-        select
-        <include refid="Base_Column_List"/>
+    <select id="selectBriefList" parameterType="org.apache.inlong.manager.pojo.consume.InlongConsumePageRequest"
+            resultType="org.apache.inlong.manager.pojo.consume.InlongConsumeBriefInfo">
+        select id, consumer_group, mq_type, topic, inlong_group_id,
+        in_charges, status, creator, modifier, create_time, modify_time
         from inlong_consume
-        where is_deleted = 0
-        and consumer_group = #{consumerGroup, jdbcType=VARCHAR}
-        and topic = #{topic, jdbcType=VARCHAR}
-        and inlong_group_id = #{inlongGroupId, jdbcType=VARCHAR}
-        limit 1
+        <where>
+            is_deleted = 0
+            <if test="isAdminRole == false">
+                and (
+                creator = #{currentUser, jdbcType=VARCHAR} or FIND_IN_SET(#{currentUser, jdbcType=VARCHAR}, in_charges)
+                )
+            </if>
+            <if test="keyword != null and keyword !=''">
+                and (consumer_group like CONCAT('%', #{keyword}, '%') or topic like CONCAT('%', #{keyword}, '%'))
+            </if>
+            <if test="consumerGroup != null and consumerGroup != ''">
+                and consumer_group = #{consumerGroup, jdbcType=VARCHAR}
+            </if>
+            <if test="mqType != null and mqType != ''">
+                and mq_type = #{mqType, jdbcType=VARCHAR}
+            </if>
+            <if test="topic != null and topic != ''">
+                and topic = #{topic}
+            </if>
+            <if test="inlongGroupId != null and inlongGroupId != ''">
+                and inlong_group_id = #{inlongGroupId, jdbcType=VARCHAR}
+            </if>
+            <if test="status != null">
+                and status = #{status, jdbcType=INTEGER}
+            </if>
+            <if test="statusList != null and statusList.size() > 0">
+                and status in
+                <foreach collection="statusList" item="status" index="index" open="(" close=")" separator=",">
+                    #{status}
+                </foreach>
+            </if>
+        </where>
+        <choose>
+            <when test="orderField != null and orderField != '' and orderType != null and orderType != ''">
+                order by ${orderField} ${orderType}
+            </when>
+            <otherwise>
+                order by create_time desc
+            </otherwise>
+        </choose>
     </select>
 
     <update id="updateById" parameterType="org.apache.inlong.manager.dao.entity.InlongConsumeEntity">
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/consumption/ConsumptionInfo.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/consumption/ConsumptionInfo.java
deleted file mode 100644
index 7e2abccfc..000000000
--- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/consumption/ConsumptionInfo.java
+++ /dev/null
@@ -1,101 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.manager.pojo.consumption;
-
-import com.fasterxml.jackson.annotation.JsonIgnore;
-import io.swagger.annotations.ApiModel;
-import io.swagger.annotations.ApiModelProperty;
-import lombok.AllArgsConstructor;
-import lombok.Builder;
-import lombok.Data;
-import lombok.NoArgsConstructor;
-import org.apache.commons.lang3.StringUtils;
-
-import javax.validation.constraints.AssertTrue;
-import javax.validation.constraints.NotBlank;
-import java.util.Date;
-
-/**
- * Data consumption info
- */
-@Data
-@Builder
-@NoArgsConstructor
-@AllArgsConstructor
-@ApiModel("Data consumption info")
-public class ConsumptionInfo {
-
-    @ApiModelProperty(value = "key id")
-    private Integer id;
-
-    @ApiModelProperty(value = "consumer group: only support [a-zA-Z0-9_]")
-    @NotBlank(message = "consumerGroup cannot be null")
-    private String consumerGroup;
-
-    @ApiModelProperty(value = "consumption in charge")
-    @NotBlank(message = "inCharges cannot be null")
-    private String inCharges;
-
-    @ApiModelProperty(value = "consumption target inlong group id")
-    @NotBlank(message = "inlong group id cannot be null")
-    private String inlongGroupId;
-
-    @ApiModelProperty(value = "MQ type, high throughput: TUBEMQ, high consistency: PULSAR")
-    private String mqType;
-
-    @ApiModelProperty(value = "consumption target topic")
-    private String topic;
-
-    @ApiModelProperty(value = "middleware cluster url")
-    private String masterUrl;
-
-    @ApiModelProperty(value = "whether to filter consumption, 0: not filter, 1: filter")
-    @Builder.Default
-    private Integer filterEnabled = 0;
-
-    @ApiModelProperty(value = "consumption target inlong stream id")
-    private String inlongStreamId;
-
-    @ApiModelProperty(value = "status, 10: pending assigned, 11: pending approval, "
-            + "20: approval rejected, 20: approved")
-    private Integer status;
-
-    private String creator;
-
-    private String modifier;
-
-    private Date createTime;
-
-    private Date modifyTime;
-
-    @ApiModelProperty(value = "Extended information for MQ")
-    private ConsumptionMqExtBase mqExtInfo;
-
-    @ApiModelProperty(value = "Version number")
-    private Integer version;
-
-    @JsonIgnore
-    @AssertTrue(message = "when filter enabled, inlong stream id cannot be null")
-    public boolean isValidateFilter() {
-        if (filterEnabled == 0) {
-            return true;
-        }
-        return StringUtils.isNotBlank(inlongStreamId);
-    }
-
-}
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/consumption/ConsumptionListVo.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/consumption/ConsumptionListVo.java
deleted file mode 100644
index 0f7affd43..000000000
--- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/consumption/ConsumptionListVo.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.manager.pojo.consumption;
-
-import io.swagger.annotations.ApiModel;
-import io.swagger.annotations.ApiModelProperty;
-import lombok.AllArgsConstructor;
-import lombok.Builder;
-import lombok.Data;
-import lombok.NoArgsConstructor;
-
-import java.util.Date;
-
-/**
- * Data consumption list
- */
-@Data
-@Builder
-@NoArgsConstructor
-@AllArgsConstructor
-@ApiModel("Data consumption list")
-public class ConsumptionListVo {
-
-    @ApiModelProperty(value = "Primary key")
-    private Integer id;
-
-    @ApiModelProperty(value = "Consumer Group")
-    private String consumerGroup;
-
-    @ApiModelProperty(value = "Person in charge of consumption")
-    private String inCharges;
-
-    @ApiModelProperty(value = "Consumption target inlong group id")
-    private String inlongGroupId;
-
-    @ApiModelProperty(value = "MQ type, high throughput: TUBEMQ, high consistency: PULSAR")
-    private String mqType;
-
-    @ApiModelProperty(value = "Consumption target TOPIC")
-    private String topic;
-
-    @ApiModelProperty(value = "Status: Pending distribution: 10,"
-            + " Pending approval: 11, Approval rejected: 20, Approval passed: 21")
-    private Integer status;
-
-    @ApiModelProperty(value = "Recent consumption time")
-    private Date lastConsumptionTime;
-
-    @ApiModelProperty(value = "Consumption status: normal: 0, abnormal: 1, shielded: 2, no: 3,")
-    private Integer lastConsumptionStatus;
-}
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/consumption/ConsumptionMqExtBase.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/consumption/ConsumptionMqExtBase.java
deleted file mode 100644
index b177fda17..000000000
--- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/consumption/ConsumptionMqExtBase.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.manager.pojo.consumption;
-
-import com.fasterxml.jackson.annotation.JsonSubTypes;
-import com.fasterxml.jackson.annotation.JsonTypeInfo;
-import io.swagger.annotations.ApiModel;
-import io.swagger.annotations.ApiModelProperty;
-import lombok.Data;
-
-/**
- * Extended consumption information of different MQs
- */
-@Data
-@ApiModel("Extended consumption information of different MQs")
-@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, visible = true, property = "mqType",
-        defaultImpl = ConsumptionMqExtBase.class)
-@JsonSubTypes({
-        @JsonSubTypes.Type(value = ConsumptionPulsarInfo.class, name = "PULSAR"),
-        @JsonSubTypes.Type(value = ConsumptionPulsarInfo.class, name = "TDMQ_PULSAR")
-})
-public class ConsumptionMqExtBase {
-
-    @ApiModelProperty(value = "Primary key")
-    private Integer id;
-
-    @ApiModelProperty(value = "Consumption ID")
-    private Integer consumptionId;
-
-    @ApiModelProperty(value = "Consumer group")
-    private String consumerGroup;
-
-    @ApiModelProperty(value = "Consumption target inlong group id")
-    private String inlongGroupId;
-
-    @ApiModelProperty("Whether to delete, 0: not deleted, 1: deleted")
-    private Integer isDeleted = 0;
-
-    @ApiModelProperty("The type of MQ")
-    private String mqType;
-}
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/consumption/ConsumptionPulsarInfo.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/consumption/ConsumptionPulsarInfo.java
deleted file mode 100644
index c9cbce4dd..000000000
--- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/consumption/ConsumptionPulsarInfo.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.manager.pojo.consumption;
-
-import io.swagger.annotations.ApiModel;
-import io.swagger.annotations.ApiModelProperty;
-import lombok.Data;
-import lombok.EqualsAndHashCode;
-import lombok.ToString;
-import org.apache.inlong.manager.common.consts.MQType;
-
-/**
- * Pulsar consumer information
- */
-@Data
-@EqualsAndHashCode(callSuper = true)
-@ToString(callSuper = true)
-@ApiModel("Pulsar consumer information")
-public class ConsumptionPulsarInfo extends ConsumptionMqExtBase {
-
-    public ConsumptionPulsarInfo() {
-        this.setMqType(MQType.PULSAR);
-    }
-
-    @ApiModelProperty("Whether to configure the dead letter queue, 0: do not configure, 1: configure")
-    private Integer isDlq;
-
-    @ApiModelProperty("The name of the dead letter queue Topic")
-    private String deadLetterTopic;
-
-    @ApiModelProperty("Whether to configure the retry letter queue, 0: do not configure, 1: configure")
-    private Integer isRlq;
-
-    @ApiModelProperty("The name of the retry letter queue topic")
-    private String retryLetterTopic;
-
-}
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/consumption/ConsumptionQuery.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/consumption/ConsumptionQuery.java
deleted file mode 100644
index c8c22e9d9..000000000
--- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/consumption/ConsumptionQuery.java
+++ /dev/null
@@ -1,74 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.manager.pojo.consumption;
-
-import io.swagger.annotations.ApiModel;
-import io.swagger.annotations.ApiModelProperty;
-import lombok.Data;
-import lombok.EqualsAndHashCode;
-import org.apache.inlong.manager.pojo.common.PageRequest;
-
-/**
- * Data consumption query
- */
-@Data
-@EqualsAndHashCode(callSuper = false)
-@ApiModel("Data consumption query conditions")
-public class ConsumptionQuery extends PageRequest {
-
-    @ApiModelProperty(value = "Consumer Group")
-    private String consumerGroup;
-
-    @ApiModelProperty(value = "Person in charge of consumption")
-    private String inCharges;
-
-    @ApiModelProperty(value = "Consumption target inlong group id")
-    private String inlongGroupId;
-
-    @ApiModelProperty(value = "MQ type, high throughput: TUBEMQ, high consistency: PULSAR")
-    private String mqType;
-
-    @ApiModelProperty(value = "Consumption target Topic")
-    private String topic;
-
-    @ApiModelProperty(value = "Whether to filter consumption")
-    private Boolean filterEnabled;
-
-    @ApiModelProperty(value = "Consumption target stream id")
-    private String inlongStreamId;
-
-    @ApiModelProperty(value = "Status: Draft: 0, Pending distribution: 10, "
-            + "Pending approval: 11, Approval rejected: 20, Approved: 21")
-    private Integer status;
-
-    @ApiModelProperty(value = "Consumption status: normal: 0, abnormal: 1, shielded: 2, no: 3")
-    private Integer lastConsumptionStatus;
-
-    private String creator;
-
-    private String modifier;
-
-    @ApiModelProperty(value = "Current login user")
-    private String username;
-
-    @ApiModelProperty(value = "Weather current user have admin role", hidden = true)
-    private Boolean isAdminRole;
-
-    @ApiModelProperty(value = "Fuzzy query keyword, topic, or consumer group")
-    private String keyword;
-}
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/consumption/ConsumptionSummary.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/consumption/ConsumptionSummary.java
deleted file mode 100644
index 5264f216b..000000000
--- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/consumption/ConsumptionSummary.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.manager.pojo.consumption;
-
-import io.swagger.annotations.ApiModel;
-import io.swagger.annotations.ApiModelProperty;
-import lombok.AllArgsConstructor;
-import lombok.Builder;
-import lombok.Data;
-import lombok.NoArgsConstructor;
-
-/**
- * Data consumption statistics
- */
-@Data
-@Builder
-@NoArgsConstructor
-@AllArgsConstructor
-@ApiModel("Data consumption statistics")
-public class ConsumptionSummary {
-
-    @ApiModelProperty(value = "Total consumption")
-    private Integer totalCount;
-
-    @ApiModelProperty(value = "Amount to be allocated")
-    private Integer waitingAssignCount;
-
-    @ApiModelProperty(value = "Amount to be approved")
-    private Integer waitingApproveCount;
-
-    @ApiModelProperty(value = "Quantity rejected")
-    private Integer rejectedCount;
-}
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/workflow/form/process/ApplyConsumeProcessForm.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/workflow/form/process/ApplyConsumeProcessForm.java
index 85d657c45..9e535c989 100644
--- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/workflow/form/process/ApplyConsumeProcessForm.java
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/workflow/form/process/ApplyConsumeProcessForm.java
@@ -28,7 +28,7 @@ import org.apache.inlong.manager.pojo.consume.InlongConsumeInfo;
 import java.util.Map;
 
 /**
- * New data consumption form
+ * Apply inlong consume process form
  */
 @Data
 @EqualsAndHashCode(callSuper = true)
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/consume/ConsumePulsarOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/consume/ConsumePulsarOperator.java
index 5d13d65ee..a21c36670 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/consume/ConsumePulsarOperator.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/consume/ConsumePulsarOperator.java
@@ -21,23 +21,21 @@ import com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.inlong.manager.common.consts.InlongConstants;
 import org.apache.inlong.manager.common.consts.MQType;
-import org.apache.inlong.manager.common.enums.ClusterType;
 import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
 import org.apache.inlong.manager.common.exceptions.BusinessException;
 import org.apache.inlong.manager.common.util.CommonBeanUtils;
 import org.apache.inlong.manager.common.util.Preconditions;
 import org.apache.inlong.manager.dao.entity.InlongConsumeEntity;
-import org.apache.inlong.manager.dao.entity.InlongGroupEntity;
-import org.apache.inlong.manager.dao.entity.InlongStreamEntity;
-import org.apache.inlong.manager.dao.mapper.InlongGroupEntityMapper;
 import org.apache.inlong.manager.dao.mapper.InlongStreamEntityMapper;
-import org.apache.inlong.manager.pojo.cluster.pulsar.PulsarClusterInfo;
 import org.apache.inlong.manager.pojo.consume.InlongConsumeInfo;
 import org.apache.inlong.manager.pojo.consume.InlongConsumeRequest;
 import org.apache.inlong.manager.pojo.consume.pulsar.ConsumePulsarDTO;
 import org.apache.inlong.manager.pojo.consume.pulsar.ConsumePulsarInfo;
 import org.apache.inlong.manager.pojo.consume.pulsar.ConsumePulsarRequest;
+import org.apache.inlong.manager.pojo.group.InlongGroupTopicInfo;
+import org.apache.inlong.manager.pojo.group.pulsar.InlongPulsarTopicInfo;
 import org.apache.inlong.manager.service.cluster.InlongClusterService;
+import org.apache.inlong.manager.service.group.InlongGroupService;
 import org.apache.inlong.manager.service.stream.InlongStreamService;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Service;
@@ -56,7 +54,7 @@ public class ConsumePulsarOperator extends AbstractConsumeOperator {
     private static final String PREFIX_RLQ = "rlq";
 
     @Autowired
-    private InlongGroupEntityMapper groupMapper;
+    private InlongGroupService groupService;
     @Autowired
     private InlongStreamEntityMapper streamMapper;
     @Autowired
@@ -81,20 +79,18 @@ public class ConsumePulsarOperator extends AbstractConsumeOperator {
         // one inlong stream only has one Pulsar topic,
         // one inlong group may have multiple Pulsar topics.
         String groupId = request.getInlongGroupId();
+        InlongGroupTopicInfo topicInfo = groupService.getTopic(groupId);
+        Preconditions.checkNotNull(topicInfo, "inlong group not exist for groupId=" + groupId);
+
+        // check the origin topic from request exists
+        InlongPulsarTopicInfo pulsarTopic = (InlongPulsarTopicInfo) topicInfo;
         String originTopic = request.getTopic();
-        InlongStreamEntity streamEntity = streamMapper.selectByIdentifier(groupId, originTopic);
-        Preconditions.checkNotNull(streamEntity, "not found any Pulsar topic for inlong group " + groupId);
+        Preconditions.checkTrue(pulsarTopic.getTopics().contains(originTopic),
+                "Pulsar topic not exist for " + originTopic);
 
         // format the topic to 'tenant/namespace/topic'
-        InlongGroupEntity groupEntity = groupMapper.selectByGroupId(groupId);
-        PulsarClusterInfo pulsarCluster = (PulsarClusterInfo) clusterService.getOne(
-                groupEntity.getInlongClusterTag(), null, ClusterType.PULSAR);
-        String tenant = StringUtils.isEmpty(pulsarCluster.getTenant())
-                ? InlongConstants.DEFAULT_PULSAR_TENANT
-                : pulsarCluster.getTenant();
-
-        request.setTopic(String.format(InlongConstants.PULSAR_TOPIC_FORMAT, tenant,
-                groupEntity.getMqResource(), originTopic));
+        request.setTopic(String.format(InlongConstants.PULSAR_TOPIC_FORMAT,
+                pulsarTopic.getTenant(), pulsarTopic.getNamespace(), originTopic));
     }
 
     @Override
@@ -122,8 +118,8 @@ public class ConsumePulsarOperator extends AbstractConsumeOperator {
             throw new BusinessException(ErrorCodeEnum.PULSAR_DLQ_RLQ_ERROR);
         }
 
-        // when saving, the DLQ / RLQ under the same groupId cannot be repeated.
-        // when updating, delete the related DLQ / RLQ info.
+        // TODO when saving, save the enabled DLQ / RLQ into inlong_stream, then create Pulsar topic for them
+        //  when updating, delete the related DLQ / RLQ info if they were disabled.
         String groupId = targetEntity.getInlongGroupId();
         if (dlqEnable) {
             String dlqTopic = PREFIX_DLQ + "_" + pulsarRequest.getDeadLetterTopic();
@@ -132,6 +128,7 @@ public class ConsumePulsarOperator extends AbstractConsumeOperator {
         } else {
             pulsarRequest.setIsDlq(DLQ__RLQ_DISABLE);
             pulsarRequest.setDeadLetterTopic(null);
+            // streamService.logicDeleteDlqOrRlq(groupId, dlqNameOld, operator);
         }
         if (rlqEnable) {
             String rlqTopic = PREFIX_RLQ + "_" + pulsarRequest.getRetryLetterTopic();
@@ -140,6 +137,7 @@ public class ConsumePulsarOperator extends AbstractConsumeOperator {
         } else {
             pulsarRequest.setIsRlq(DLQ__RLQ_DISABLE);
             pulsarRequest.setRetryLetterTopic(null);
+            // streamService.logicDeleteDlqOrRlq(groupId, rlqNameOld, operator);
         }
 
         try {
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/consume/InlongConsumeOperatorFactory.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/consume/InlongConsumeOperatorFactory.java
index 1a4a57482..e4a252d02 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/consume/InlongConsumeOperatorFactory.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/consume/InlongConsumeOperatorFactory.java
@@ -34,13 +34,14 @@ public class InlongConsumeOperatorFactory {
     private List<InlongConsumeOperator> consumeOperatorList;
 
     /**
-     * Get a consumption operator instance via the given mqType
+     * Get an inlong consume operator instance via the given mqType
      */
     public InlongConsumeOperator getInstance(String mqType) {
         return consumeOperatorList.stream()
                 .filter(inst -> inst.accept(mqType))
                 .findFirst()
-                .orElseThrow(() -> new
-                        BusinessException(String.format(ErrorCodeEnum.MQ_TYPE_NOT_SUPPORTED.getMessage(), mqType)));
+                .orElseThrow(() -> new BusinessException(
+                        String.format(ErrorCodeEnum.MQ_TYPE_NOT_SUPPORTED.getMessage(), mqType))
+                );
     }
 }
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/consume/InlongConsumeServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/consume/InlongConsumeServiceImpl.java
index 51b2868f5..f9ca2eb6c 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/consume/InlongConsumeServiceImpl.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/consume/InlongConsumeServiceImpl.java
@@ -24,7 +24,6 @@ import org.apache.inlong.manager.common.consts.InlongConstants;
 import org.apache.inlong.manager.common.enums.ConsumeStatus;
 import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
 import org.apache.inlong.manager.common.exceptions.BusinessException;
-import org.apache.inlong.manager.common.util.CommonBeanUtils;
 import org.apache.inlong.manager.common.util.Preconditions;
 import org.apache.inlong.manager.dao.entity.InlongConsumeEntity;
 import org.apache.inlong.manager.dao.mapper.InlongConsumeEntityMapper;
@@ -59,7 +58,7 @@ import static org.apache.inlong.manager.pojo.common.PageRequest.MAX_PAGE_SIZE;
 public class InlongConsumeServiceImpl implements InlongConsumeService {
 
     private static final Logger LOGGER = LoggerFactory.getLogger(InlongConsumeServiceImpl.class);
-    private static final String AUTO_CREATE_MSG = "auto create by inlong";
+    private static final String AUTO_CREATE_MSG = "auto_create_by_system";
 
     @Autowired
     private InlongConsumeEntityMapper consumeMapper;
@@ -175,11 +174,9 @@ public class InlongConsumeServiceImpl implements InlongConsumeService {
         PageHelper.startPage(request.getPageNum(), request.getPageSize());
         OrderFieldEnum.checkOrderField(request);
         OrderTypeEnum.checkOrderType(request);
-        Page<InlongConsumeEntity> entityPage = (Page<InlongConsumeEntity>) consumeMapper.selectByCondition(request);
-        List<InlongConsumeBriefInfo> briefInfos = CommonBeanUtils.copyListProperties(entityPage,
-                InlongConsumeBriefInfo::new);
+        Page<InlongConsumeBriefInfo> briefInfos = (Page<InlongConsumeBriefInfo>) consumeMapper.selectBriefList(request);
         PageResult<InlongConsumeBriefInfo> pageResult = new PageResult<>(briefInfos,
-                entityPage.getTotal(), entityPage.getPageNum(), entityPage.getPageSize());
+                briefInfos.getTotal(), briefInfos.getPageNum(), briefInfos.getPageSize());
 
         LOGGER.debug("success to list inlong consume for {}", request);
         return pageResult;
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/ConsumptionService.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/ConsumptionService.java
deleted file mode 100644
index 5a181fd97..000000000
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/ConsumptionService.java
+++ /dev/null
@@ -1,89 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.manager.service.core;
-
-import org.apache.inlong.manager.pojo.common.PageResult;
-import org.apache.inlong.manager.pojo.consumption.ConsumptionInfo;
-import org.apache.inlong.manager.pojo.consumption.ConsumptionListVo;
-import org.apache.inlong.manager.pojo.consumption.ConsumptionQuery;
-import org.apache.inlong.manager.pojo.consumption.ConsumptionSummary;
-
-/**
- * Data consumption interface
- */
-public interface ConsumptionService {
-
-    /**
-     * Data consumption statistics
-     *
-     * @param query Query conditions
-     * @return Statistics
-     */
-    ConsumptionSummary getSummary(ConsumptionQuery query);
-
-    /**
-     * Get data consumption list according to query conditions
-     *
-     * @param query Consumption info
-     * @return Consumption list
-     */
-    PageResult<ConsumptionListVo> list(ConsumptionQuery query);
-
-    /**
-     * Get data consumption details
-     *
-     * @param id Consumer ID
-     * @return Details
-     */
-    ConsumptionInfo get(Integer id);
-
-    /**
-     * Determine whether the Consumer group already exists
-     *
-     * @param consumerGroup Consumer group
-     * @param excludeSelfId Exclude the ID of this record
-     * @return does it exist
-     */
-    boolean isConsumerGroupExists(String consumerGroup, Integer excludeSelfId);
-
-    /**
-     * Save basic data consumption information
-     *
-     * @param consumptionInfo Data consumption information
-     * @param operator Operator
-     * @return ID after saved
-     */
-    Integer save(ConsumptionInfo consumptionInfo, String operator);
-
-    /**
-     * Update the person in charge of data consumption, etc
-     *
-     * @param consumptionInfo consumption information
-     * @param operator Operator
-     */
-    Boolean update(ConsumptionInfo consumptionInfo, String operator);
-
-    /**
-     * Delete data consumption
-     *
-     * @param id Consumer ID
-     * @param operator Operator
-     */
-    Boolean delete(Integer id, String operator);
-
-}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/ConsumptionServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/ConsumptionServiceImpl.java
deleted file mode 100644
index f64eaa192..000000000
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/ConsumptionServiceImpl.java
+++ /dev/null
@@ -1,399 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.manager.service.core.impl;
-
-import com.github.pagehelper.Page;
-import com.github.pagehelper.PageHelper;
-import lombok.extern.slf4j.Slf4j;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.inlong.manager.common.consts.InlongConstants;
-import org.apache.inlong.manager.common.consts.MQType;
-import org.apache.inlong.manager.common.enums.ClusterType;
-import org.apache.inlong.manager.common.enums.ConsumeStatus;
-import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
-import org.apache.inlong.manager.common.exceptions.BusinessException;
-import org.apache.inlong.manager.common.util.CommonBeanUtils;
-import org.apache.inlong.manager.common.util.Preconditions;
-import org.apache.inlong.manager.dao.entity.ConsumptionEntity;
-import org.apache.inlong.manager.dao.entity.ConsumptionPulsarEntity;
-import org.apache.inlong.manager.dao.entity.InlongGroupEntity;
-import org.apache.inlong.manager.dao.mapper.ConsumptionEntityMapper;
-import org.apache.inlong.manager.dao.mapper.ConsumptionPulsarEntityMapper;
-import org.apache.inlong.manager.dao.mapper.InlongGroupEntityMapper;
-import org.apache.inlong.manager.pojo.cluster.pulsar.PulsarClusterInfo;
-import org.apache.inlong.manager.pojo.common.PageResult;
-import org.apache.inlong.manager.pojo.consumption.ConsumptionInfo;
-import org.apache.inlong.manager.pojo.consumption.ConsumptionListVo;
-import org.apache.inlong.manager.pojo.consumption.ConsumptionMqExtBase;
-import org.apache.inlong.manager.pojo.consumption.ConsumptionPulsarInfo;
-import org.apache.inlong.manager.pojo.consumption.ConsumptionQuery;
-import org.apache.inlong.manager.pojo.consumption.ConsumptionSummary;
-import org.apache.inlong.manager.pojo.group.InlongGroupTopicInfo;
-import org.apache.inlong.manager.pojo.stream.InlongStreamBriefInfo;
-import org.apache.inlong.manager.pojo.user.UserRoleCode;
-import org.apache.inlong.manager.service.cluster.InlongClusterService;
-import org.apache.inlong.manager.service.core.ConsumptionService;
-import org.apache.inlong.manager.service.group.InlongGroupService;
-import org.apache.inlong.manager.service.stream.InlongStreamService;
-import org.apache.inlong.manager.service.user.LoginUserUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.stereotype.Service;
-import org.springframework.transaction.annotation.Transactional;
-import org.springframework.util.CollectionUtils;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Locale;
-import java.util.Map;
-import java.util.Objects;
-import java.util.Set;
-import java.util.stream.Collectors;
-
-/**
- * Data consumption service
- */
-@Slf4j
-@Service
-public class ConsumptionServiceImpl implements ConsumptionService {
-
-    private static final Logger LOGGER = LoggerFactory.getLogger(ConsumptionServiceImpl.class);
-
-    private static final String PREFIX_DLQ = "dlq"; // prefix of the Topic of the dead letter queue
-
-    private static final String PREFIX_RLQ = "rlq"; // prefix of the Topic of the retry letter queue
-
-    @Autowired
-    private InlongGroupEntityMapper groupMapper;
-    @Autowired
-    private ConsumptionEntityMapper consumptionMapper;
-    @Autowired
-    private ConsumptionPulsarEntityMapper consumptionPulsarMapper;
-    @Autowired
-    private InlongGroupService groupService;
-    @Autowired
-    private InlongStreamService streamService;
-    @Autowired
-    private InlongClusterService clusterService;
-
-    @Override
-    public ConsumptionSummary getSummary(ConsumptionQuery query) {
-        Map<String, Integer> countMap = new HashMap<>();
-        consumptionMapper.countByQuery(query)
-                .forEach(countInfo -> countMap.put(countInfo.getKey(), countInfo.getValue()));
-
-        return ConsumptionSummary.builder()
-                .totalCount(countMap.values().stream().mapToInt(c -> c).sum())
-                .waitingAssignCount(countMap.getOrDefault(ConsumeStatus.TO_BE_SUBMIT.getCode() + "", 0))
-                .waitingApproveCount(countMap.getOrDefault(ConsumeStatus.TO_BE_APPROVAL.getCode() + "", 0))
-                .rejectedCount(countMap.getOrDefault(ConsumeStatus.APPROVE_REJECTED.getCode() + "", 0)).build();
-    }
-
-    @Override
-    public PageResult<ConsumptionListVo> list(ConsumptionQuery query) {
-        PageHelper.startPage(query.getPageNum(), query.getPageSize());
-        query.setIsAdminRole(LoginUserUtils.getLoginUser().getRoles().contains(UserRoleCode.ADMIN));
-        Page<ConsumptionEntity> pageResult = (Page<ConsumptionEntity>) consumptionMapper.listByQuery(query);
-        List<ConsumptionListVo> consumptionListVos = CommonBeanUtils.copyListProperties(pageResult.getResult(),
-                ConsumptionListVo::new);
-
-        return new PageResult<>(
-                consumptionListVos, pageResult.getTotal(), pageResult.getPageNum(), pageResult.getPageSize()
-        );
-    }
-
-    @Override
-    public ConsumptionInfo get(Integer id) {
-        Preconditions.checkNotNull(id, "consumption id cannot be null");
-        ConsumptionEntity entity = consumptionMapper.selectByPrimaryKey(id);
-        Preconditions.checkNotNull(entity, "consumption not exist with id:" + id);
-
-        ConsumptionInfo info = CommonBeanUtils.copyProperties(entity, ConsumptionInfo::new);
-        String mqType = info.getMqType();
-        if (MQType.PULSAR.equals(mqType) || MQType.TDMQ_PULSAR.equals(mqType)) {
-            ConsumptionPulsarEntity pulsarEntity = consumptionPulsarMapper.selectByConsumptionId(info.getId());
-            Preconditions.checkNotNull(pulsarEntity, "Pulsar consumption cannot be empty, as the middleware is Pulsar");
-            ConsumptionPulsarInfo pulsarInfo = CommonBeanUtils.copyProperties(pulsarEntity, ConsumptionPulsarInfo::new);
-            info.setMqExtInfo(pulsarInfo);
-        }
-
-        return info;
-    }
-
-    @Override
-    public boolean isConsumerGroupExists(String consumerGroup, Integer excludeSelfId) {
-        ConsumptionQuery consumptionQuery = new ConsumptionQuery();
-        consumptionQuery.setConsumerGroup(consumerGroup);
-        consumptionQuery.setIsAdminRole(true);
-        List<ConsumptionEntity> result = consumptionMapper.listByQuery(consumptionQuery);
-        if (excludeSelfId != null) {
-            result = result.stream().filter(consumer -> !excludeSelfId.equals(consumer.getId()))
-                    .collect(Collectors.toList());
-        }
-        return !CollectionUtils.isEmpty(result);
-    }
-
-    @Override
-    @Transactional(rollbackFor = Throwable.class)
-    public Integer save(ConsumptionInfo info, String operator) {
-        log.debug("begin to save consumption info={}", info);
-        Preconditions.checkNotNull(info, "consumption info cannot be null");
-        Preconditions.checkNotNull(info.getTopic(), "consumption topic cannot be empty");
-        if (isConsumerGroupExists(info.getConsumerGroup(), info.getId())) {
-            throw new BusinessException(String.format("consumer group %s already exist", info.getConsumerGroup()));
-        }
-
-        if (info.getId() != null) {
-            ConsumptionEntity consumptionEntity = consumptionMapper.selectByPrimaryKey(info.getId());
-            Preconditions.checkNotNull(consumptionEntity, "consumption not exist with id: " + info.getId());
-            ConsumeStatus consumeStatus = ConsumeStatus.forCode(consumptionEntity.getStatus());
-            Preconditions.checkTrue(ConsumeStatus.allowedUpdate(consumeStatus),
-                    "consumption not allow update when status is " + consumeStatus.name());
-        }
-
-        setTopicInfo(info);
-        ConsumptionEntity entity = this.saveConsumption(info, operator);
-        String mqType = entity.getMqType();
-        if (MQType.PULSAR.equals(mqType) || MQType.TDMQ_PULSAR.equals(mqType)) {
-            savePulsarInfo(info.getMqExtInfo(), entity);
-        }
-
-        log.info("success to save consumption info by user={}", operator);
-        return entity.getId();
-    }
-
-    /**
-     * Save Pulsar consumption info
-     */
-    private void savePulsarInfo(ConsumptionMqExtBase mqExtBase, ConsumptionEntity entity) {
-        Preconditions.checkNotNull(mqExtBase, "Pulsar info cannot be empty, as the middleware is Pulsar");
-        // If it is transmitted from the web without specifying consumption pulsar info,
-        ConsumptionPulsarInfo pulsarInfo;
-        if (mqExtBase instanceof ConsumptionPulsarInfo) {
-            pulsarInfo = (ConsumptionPulsarInfo) mqExtBase;
-        } else {
-            pulsarInfo = new ConsumptionPulsarInfo();
-        }
-
-        // Prerequisite for RLQ to be turned on: DLQ must be turned on
-        boolean dlqEnable = (pulsarInfo.getIsDlq() != null && pulsarInfo.getIsDlq() == 1);
-        boolean rlqEnable = (pulsarInfo.getIsRlq() != null && pulsarInfo.getIsRlq() == 1);
-        if (rlqEnable && !dlqEnable) {
-            throw new BusinessException(ErrorCodeEnum.PULSAR_DLQ_RLQ_ERROR);
-        }
-
-        // When saving, the DLQ / RLQ under the same groupId cannot be repeated;
-        // when closing, delete the related configuration
-        String groupId = entity.getInlongGroupId();
-        if (dlqEnable) {
-            String dlqTopic = PREFIX_DLQ + "_" + pulsarInfo.getDeadLetterTopic();
-            Boolean exist = streamService.exist(groupId, dlqTopic);
-            if (exist) {
-                throw new BusinessException(ErrorCodeEnum.PULSAR_DLQ_DUPLICATED);
-            }
-        } else {
-            pulsarInfo.setIsDlq(0);
-            pulsarInfo.setDeadLetterTopic(null);
-        }
-        if (rlqEnable) {
-            String rlqTopic = PREFIX_RLQ + "_" + pulsarInfo.getRetryLetterTopic();
-            Boolean exist = streamService.exist(groupId, rlqTopic);
-            if (exist) {
-                throw new BusinessException(ErrorCodeEnum.PULSAR_RLQ_DUPLICATED);
-            }
-        } else {
-            pulsarInfo.setIsRlq(0);
-            pulsarInfo.setRetryLetterTopic(null);
-        }
-
-        ConsumptionPulsarEntity pulsar = CommonBeanUtils.copyProperties(pulsarInfo, ConsumptionPulsarEntity::new);
-        Integer consumptionId = entity.getId();
-        pulsar.setConsumptionId(consumptionId);
-        pulsar.setInlongGroupId(groupId);
-        pulsar.setConsumerGroup(entity.getConsumerGroup());
-        pulsar.setIsDeleted(0);
-
-        // Pulsar consumer information may already exist, update if it exists, add if it does not exist
-        ConsumptionPulsarEntity exists = consumptionPulsarMapper.selectByConsumptionId(consumptionId);
-        if (exists == null) {
-            consumptionPulsarMapper.insert(pulsar);
-        } else {
-            pulsar.setId(exists.getId());
-            consumptionPulsarMapper.updateByPrimaryKey(pulsar);
-        }
-    }
-
-    @Override
-    @Transactional(rollbackFor = Throwable.class)
-    public Boolean update(ConsumptionInfo info, String operator) {
-        Preconditions.checkNotNull(info, "consumption info cannot be null");
-        Integer consumptionId = info.getId();
-        Preconditions.checkNotNull(consumptionId, "consumption id cannot be null");
-
-        ConsumptionEntity exists = consumptionMapper.selectByPrimaryKey(consumptionId);
-        Preconditions.checkNotNull(exists, "consumption not exist with id " + consumptionId);
-        Preconditions.checkTrue(exists.getInCharges().contains(operator),
-                "operator" + operator + " has no privilege for the consumption");
-        String errMsg = String.format("consumption information has already updated, id=%s, curVersion=%s",
-                exists.getId(), info.getVersion());
-        if (!Objects.equals(exists.getVersion(), info.getVersion())) {
-            LOGGER.error(errMsg);
-            throw new BusinessException(ErrorCodeEnum.CONFIG_EXPIRED);
-        }
-        ConsumptionEntity entity = new ConsumptionEntity();
-        Date now = new Date();
-        CommonBeanUtils.copyProperties(info, entity, true);
-        entity.setModifier(operator);
-        entity.setModifyTime(now);
-        // Modify Pulsar consumption info
-        String mqType = info.getMqType();
-        if (MQType.PULSAR.equals(mqType) || MQType.TDMQ_PULSAR.equals(mqType)) {
-            ConsumptionPulsarEntity pulsarEntity = consumptionPulsarMapper.selectByConsumptionId(consumptionId);
-            Preconditions.checkNotNull(pulsarEntity, "Pulsar consumption cannot be null");
-            pulsarEntity.setConsumerGroup(info.getConsumerGroup());
-
-            // Whether DLQ / RLQ is turned on or off
-            ConsumptionPulsarInfo update = (ConsumptionPulsarInfo) info.getMqExtInfo();
-            boolean dlqEnable = (update.getIsDlq() != null && update.getIsDlq() == 1);
-            boolean rlqEnable = (update.getIsRlq() != null && update.getIsRlq() == 1);
-
-            // DLQ is closed, RLQ cannot exist alone and must be closed
-            if (rlqEnable && !dlqEnable) {
-                throw new BusinessException(ErrorCodeEnum.PULSAR_TOPIC_CREATE_FAILED);
-            }
-
-            // If the consumption has been approved, then close/open DLQ or RLQ, it is necessary to
-            // add/remove inlong streams in the inlong group
-            if (ConsumeStatus.APPROVE_PASSED.getCode() == exists.getStatus()) {
-                String groupId = info.getInlongGroupId();
-                String dlqNameOld = pulsarEntity.getDeadLetterTopic();
-                String dlqNameNew = update.getDeadLetterTopic();
-                if (!dlqEnable) {
-                    pulsarEntity.setIsDlq(0);
-                    pulsarEntity.setDeadLetterTopic(null);
-                    streamService.logicDeleteDlqOrRlq(groupId, dlqNameOld, operator);
-                } else if (!Objects.equals(dlqNameNew, dlqNameOld)) {
-                    pulsarEntity.setIsDlq(1);
-                    String topic = PREFIX_DLQ + "_" + dlqNameNew;
-                    topic = topic.toLowerCase(Locale.ROOT);
-                    pulsarEntity.setDeadLetterTopic(topic);
-                    streamService.insertDlqOrRlq(groupId, topic, operator);
-                }
-
-                String rlqNameOld = pulsarEntity.getRetryLetterTopic();
-                String rlqNameNew = update.getRetryLetterTopic();
-                if (!rlqEnable) {
-                    pulsarEntity.setIsRlq(0);
-                    pulsarEntity.setRetryLetterTopic(null);
-                    streamService.logicDeleteDlqOrRlq(groupId, rlqNameOld, operator);
-                } else if (!Objects.equals(rlqNameNew, pulsarEntity.getRetryLetterTopic())) {
-                    pulsarEntity.setIsRlq(1);
-                    String topic = PREFIX_RLQ + "_" + rlqNameNew;
-                    topic = topic.toLowerCase(Locale.ROOT);
-                    pulsarEntity.setRetryLetterTopic(topic);
-                    streamService.insertDlqOrRlq(groupId, topic, operator);
-                }
-            }
-            consumptionPulsarMapper.updateByConsumptionId(pulsarEntity);
-        }
-
-        int rowCount = consumptionMapper.updateByPrimaryKeySelective(entity);
-        if (rowCount != InlongConstants.AFFECTED_ONE_ROW) {
-            LOGGER.error(errMsg);
-            throw new BusinessException(ErrorCodeEnum.CONFIG_EXPIRED);
-        }
-        return true;
-    }
-
-    @Override
-    @Transactional(rollbackFor = Throwable.class)
-    public Boolean delete(Integer id, String operator) {
-        ConsumptionEntity consumptionEntity = consumptionMapper.selectByPrimaryKey(id);
-        Preconditions.checkNotNull(consumptionEntity, "consumption not exist with id: " + id);
-        consumptionMapper.deleteByPrimaryKey(id);
-
-        consumptionPulsarMapper.deleteByConsumptionId(id);
-        return true;
-    }
-
-    private ConsumptionEntity saveConsumption(ConsumptionInfo info, String operator) {
-        ConsumptionEntity entity = CommonBeanUtils.copyProperties(info, ConsumptionEntity::new);
-        entity.setStatus(ConsumeStatus.TO_BE_SUBMIT.getCode());
-        entity.setCreator(operator);
-        entity.setModifier(operator);
-
-        if (info.getId() != null) {
-            int rowCount = consumptionMapper.updateByPrimaryKey(entity);
-            if (rowCount != InlongConstants.AFFECTED_ONE_ROW) {
-                LOGGER.error("consumption information has already updated, id={}, curVersion={}",
-                        entity.getId(), entity.getVersion());
-                throw new BusinessException(ErrorCodeEnum.CONFIG_EXPIRED);
-            }
-        } else {
-            consumptionMapper.insert(entity);
-        }
-
-        Preconditions.checkNotNull(entity.getId(), "save consumption failed");
-        return entity;
-    }
-
-    /**
-     * Set the topic for the consumption information
-     */
-    private void setTopicInfo(ConsumptionInfo info) {
-        // Determine whether the consumed topic belongs to this groupId or the inlong stream under it
-        String groupId = info.getInlongGroupId();
-        InlongGroupTopicInfo topicVO = groupService.getTopic(groupId);
-        Preconditions.checkNotNull(topicVO, "inlong group not exist: " + groupId);
-
-        // Tube’s topic is the inlong group level, one inlong group, one TubeMQ topic
-        String mqType = topicVO.getMqType();
-        // this class was deprecated, just comment it out
-        if (MQType.TUBEMQ.equals(mqType)) {
-            String mqResource = /*topicVO.getMqResource()*/ null;
-            Preconditions.checkTrue(mqResource == null || mqResource.equals(info.getTopic()),
-                    "topic [" + info.getTopic() + "] not belong to inlong group " + groupId);
-        } else if (MQType.PULSAR.equals(mqType) || MQType.TDMQ_PULSAR.equals(mqType)) {
-            // Pulsar's topic is the inlong stream level.
-            // There will be multiple inlong streams under one inlong group, and there will be multiple topics
-            List<InlongStreamBriefInfo> streamTopics = /*topicVO.getStreamTopics()*/ new ArrayList<>();
-            if (streamTopics != null && streamTopics.size() > 0) {
-                Set<String> topicSet = new HashSet<>(Arrays.asList(info.getTopic().split(",")));
-                streamTopics.forEach(stream -> topicSet.remove(stream.getMqResource()));
-                Preconditions.checkEmpty(topicSet, "topic [" + topicSet + "] not belong to inlong group " + groupId);
-            }
-            InlongGroupEntity inlongGroupEntity = groupMapper.selectByGroupId(groupId);
-            if (null != inlongGroupEntity) {
-                PulsarClusterInfo pulsarCluster = (PulsarClusterInfo) clusterService.getOne(
-                        inlongGroupEntity.getInlongClusterTag(), null, ClusterType.PULSAR);
-                String tenant = StringUtils.isEmpty(pulsarCluster.getTenant())
-                        ? InlongConstants.DEFAULT_PULSAR_TENANT : pulsarCluster.getTenant();
-                info.setTopic(String.format(InlongConstants.PULSAR_TOPIC_FORMAT, tenant,
-                        inlongGroupEntity.getMqResource(), info.getTopic()));
-            }
-
-        }
-        info.setMqType(mqType);
-    }
-
-}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupOperator4Pulsar.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupOperator4Pulsar.java
index 908e61220..e34f024d6 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupOperator4Pulsar.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupOperator4Pulsar.java
@@ -18,7 +18,9 @@
 package org.apache.inlong.manager.service.group;
 
 import org.apache.commons.lang3.StringUtils;
+import org.apache.inlong.manager.common.consts.InlongConstants;
 import org.apache.inlong.manager.common.consts.MQType;
+import org.apache.inlong.manager.common.enums.ClusterType;
 import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
 import org.apache.inlong.manager.common.exceptions.BusinessException;
 import org.apache.inlong.manager.common.util.CommonBeanUtils;
@@ -26,6 +28,7 @@ import org.apache.inlong.manager.common.util.Preconditions;
 import org.apache.inlong.manager.dao.entity.InlongGroupEntity;
 import org.apache.inlong.manager.dao.entity.InlongGroupExtEntity;
 import org.apache.inlong.manager.dao.entity.InlongStreamExtEntity;
+import org.apache.inlong.manager.pojo.cluster.pulsar.PulsarClusterInfo;
 import org.apache.inlong.manager.pojo.group.InlongGroupInfo;
 import org.apache.inlong.manager.pojo.group.InlongGroupRequest;
 import org.apache.inlong.manager.pojo.group.InlongGroupTopicInfo;
@@ -110,7 +113,13 @@ public class InlongGroupOperator4Pulsar extends AbstractGroupOperator {
 
     @Override
     public InlongGroupTopicInfo getTopic(InlongGroupInfo groupInfo) {
+        PulsarClusterInfo pulsarCluster = (PulsarClusterInfo) clusterService.getOne(
+                groupInfo.getInlongClusterTag(), null, ClusterType.PULSAR);
+        String tenant = StringUtils.isEmpty(pulsarCluster.getTenant())
+                ? InlongConstants.DEFAULT_PULSAR_TENANT : pulsarCluster.getTenant();
+
         InlongPulsarTopicInfo topicInfo = new InlongPulsarTopicInfo();
+        topicInfo.setTenant(tenant);
         topicInfo.setNamespace(groupInfo.getMqResource());
         // each inlong stream is associated with a Pulsar topic
         List<String> topics = streamService.getTopicList(groupInfo.getInlongGroupId()).stream()
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/heartbeat/HeartbeatManager.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/heartbeat/HeartbeatManager.java
similarity index 98%
rename from inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/heartbeat/HeartbeatManager.java
rename to inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/heartbeat/HeartbeatManager.java
index db2f08ba5..9f3819e49 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/heartbeat/HeartbeatManager.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/heartbeat/HeartbeatManager.java
@@ -15,9 +15,8 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.manager.service.core.heartbeat;
+package org.apache.inlong.manager.service.heartbeat;
 
-import com.fasterxml.jackson.databind.ObjectMapper;
 import com.github.benmanes.caffeine.cache.Cache;
 import com.github.benmanes.caffeine.cache.Caffeine;
 import com.github.benmanes.caffeine.cache.LoadingCache;
@@ -54,7 +53,6 @@ import java.util.concurrent.TimeUnit;
 public class HeartbeatManager implements AbstractHeartbeatManager {
 
     private static final String AUTO_REGISTERED = "auto registered";
-    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
 
     @Getter
     private Cache<ComponentHeartbeat, HeartbeatMsg> heartbeatCache;
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/HeartbeatServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/heartbeat/HeartbeatServiceImpl.java
similarity index 99%
rename from inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/HeartbeatServiceImpl.java
rename to inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/heartbeat/HeartbeatServiceImpl.java
index 0d789a30d..af644a057 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/HeartbeatServiceImpl.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/heartbeat/HeartbeatServiceImpl.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.manager.service.core.impl;
+package org.apache.inlong.manager.service.heartbeat;
 
 import com.github.pagehelper.Page;
 import com.github.pagehelper.PageHelper;
@@ -44,7 +44,6 @@ import org.apache.inlong.manager.pojo.heartbeat.HeartbeatQueryRequest;
 import org.apache.inlong.manager.pojo.heartbeat.HeartbeatReportRequest;
 import org.apache.inlong.manager.pojo.heartbeat.StreamHeartbeatResponse;
 import org.apache.inlong.manager.service.core.HeartbeatService;
-import org.apache.inlong.manager.service.core.heartbeat.HeartbeatManager;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Service;
 
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/consume/apply/ApproveConsumeProcessListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/consume/apply/ApproveConsumeProcessListener.java
index 07d9ed0b3..47ec39653 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/consume/apply/ApproveConsumeProcessListener.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/consume/apply/ApproveConsumeProcessListener.java
@@ -102,10 +102,10 @@ public class ApproveConsumeProcessListener implements ProcessEventListener {
     }
 
     /**
-     * Update consumption after approve
+     * Update consume info after approval
      */
-    private void updateConsumerInfo(Integer consumptionId, String consumerGroup) {
-        InlongConsumeEntity existEntity = consumeMapper.selectById(consumptionId);
+    private void updateConsumerInfo(Integer consumeId, String consumerGroup) {
+        InlongConsumeEntity existEntity = consumeMapper.selectById(consumeId);
         existEntity.setStatus(ConsumeStatus.APPROVE_PASSED.getCode());
         existEntity.setConsumerGroup(consumerGroup);
         int rowCount = consumeMapper.updateByIdSelective(existEntity);
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/pulsar/PulsarResourceOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/pulsar/PulsarResourceOperator.java
index f66b81955..5b698b856 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/pulsar/PulsarResourceOperator.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/pulsar/PulsarResourceOperator.java
@@ -252,7 +252,7 @@ public class PulsarResourceOperator implements QueueResourceOperator {
                 pulsarOperator.createSubscription(pulsarAdmin, fullTopicName, pulsarInfo.getQueueModule(), subs);
                 log.info("success to create subs={} for groupId={}, topic={}", subs, groupId, fullTopicName);
 
-                // insert the consumer group info into the consumption table
+                // insert the consumer group info into the inlong_consume table
                 Integer id = consumeService.saveBySystem(pulsarInfo, topicName, subs);
                 log.info("success to save inlong consume [{}] for subs={}, groupId={}, topic={}",
                         id, subs, groupId, topicName);
diff --git a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/cluster/InlongClusterServiceTest.java b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/cluster/InlongClusterServiceTest.java
index fffd83506..6f011875c 100644
--- a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/cluster/InlongClusterServiceTest.java
+++ b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/cluster/InlongClusterServiceTest.java
@@ -36,7 +36,7 @@ import org.apache.inlong.manager.pojo.common.PageResult;
 import org.apache.inlong.manager.pojo.common.UpdateResult;
 import org.apache.inlong.manager.pojo.group.InlongGroupInfo;
 import org.apache.inlong.manager.service.ServiceBaseTest;
-import org.apache.inlong.manager.service.core.heartbeat.HeartbeatManager;
+import org.apache.inlong.manager.service.heartbeat.HeartbeatManager;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
 import org.springframework.beans.factory.annotation.Autowired;
diff --git a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/heartbeat/HeartbeatManagerTest.java b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/heartbeat/HeartbeatManagerTest.java
index 0cacaabb4..dad85fc21 100644
--- a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/heartbeat/HeartbeatManagerTest.java
+++ b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/heartbeat/HeartbeatManagerTest.java
@@ -29,6 +29,7 @@ import org.apache.inlong.manager.dao.mapper.InlongClusterEntityMapper;
 import org.apache.inlong.manager.dao.mapper.InlongClusterNodeEntityMapper;
 import org.apache.inlong.manager.pojo.cluster.ClusterNodeRequest;
 import org.apache.inlong.manager.service.ServiceBaseTest;
+import org.apache.inlong.manager.service.heartbeat.HeartbeatManager;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
 import org.springframework.beans.factory.annotation.Autowired;
diff --git a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/impl/ConsumptionServiceTest.java b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/impl/ConsumptionServiceTest.java
deleted file mode 100644
index d7d824b6d..000000000
--- a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/impl/ConsumptionServiceTest.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.manager.service.core.impl;
-
-import org.apache.inlong.manager.common.consts.MQType;
-import org.apache.inlong.manager.pojo.consumption.ConsumptionInfo;
-import org.apache.inlong.manager.pojo.consumption.ConsumptionPulsarInfo;
-import org.apache.inlong.manager.service.ServiceBaseTest;
-import org.apache.inlong.manager.service.core.ConsumptionService;
-import org.apache.inlong.manager.service.group.InlongGroupServiceTest;
-import org.junit.jupiter.api.Assertions;
-import org.springframework.beans.factory.annotation.Autowired;
-
-/**
- * Consumption service test
- */
-public class ConsumptionServiceTest extends ServiceBaseTest {
-
-    String inlongGroupId = "group_for_consumption_test";
-    String consumerGroup = "test_consumer_group";
-    String operator = "admin";
-
-    @Autowired
-    private ConsumptionService consumptionService;
-    @Autowired
-    private InlongGroupServiceTest groupServiceTest;
-
-    private Integer saveConsumption(String inlongGroupId, String consumerGroup, String operator) {
-        ConsumptionInfo consumptionInfo = new ConsumptionInfo();
-        consumptionInfo.setTopic(inlongGroupId);
-        consumptionInfo.setConsumerGroup(consumerGroup);
-        consumptionInfo.setInlongGroupId(inlongGroupId);
-        consumptionInfo.setMqType(MQType.PULSAR);
-        consumptionInfo.setCreator(operator);
-        consumptionInfo.setInCharges("admin");
-
-        ConsumptionPulsarInfo pulsarInfo = new ConsumptionPulsarInfo();
-        pulsarInfo.setMqType(MQType.PULSAR);
-        pulsarInfo.setIsDlq(1);
-        pulsarInfo.setDeadLetterTopic("test_dlq");
-        pulsarInfo.setIsRlq(0);
-
-        consumptionInfo.setMqExtInfo(pulsarInfo);
-
-        return consumptionService.save(consumptionInfo, operator);
-    }
-
-    // Online test will be BusinessException: Inlong group does not exist/no operation authority
-    // @Test
-    public void testSaveAndDelete() {
-        groupServiceTest.saveGroup(inlongGroupId, operator);
-        Integer id = this.saveConsumption(inlongGroupId, consumerGroup, operator);
-        Assertions.assertNotNull(id);
-        boolean result = consumptionService.delete(id, operator);
-        Assertions.assertTrue(result);
-    }
-}
diff --git a/inlong-manager/manager-test/src/main/resources/h2/apache_inlong_manager.sql b/inlong-manager/manager-test/src/main/resources/h2/apache_inlong_manager.sql
index 5fe7e6076..951d854c9 100644
--- a/inlong-manager/manager-test/src/main/resources/h2/apache_inlong_manager.sql
+++ b/inlong-manager/manager-test/src/main/resources/h2/apache_inlong_manager.sql
@@ -130,7 +130,7 @@ CREATE TABLE IF NOT EXISTS `inlong_cluster_node`
     `type`          varchar(20)  NOT NULL COMMENT 'Cluster type, such as: AGENT, DATAPROXY, etc',
     `ip`            varchar(512) NOT NULL COMMENT 'Cluster IP, separated by commas, such as: 127.0.0.1:8080,host2:8081',
     `port`          int(6)       NULL COMMENT 'Cluster port',
-    `protocol_type` varchar(20)  DEFAULT NULL COMMENT 'DATAPROXY Source listen protocol type, such as: TCP/HTTP',
+    `protocol_type` varchar(20)           DEFAULT NULL COMMENT 'DATAPROXY Source listen protocol type, such as: TCP/HTTP',
     `ext_params`    mediumtext            DEFAULT NULL COMMENT 'Another fields will be saved as JSON string',
     `description`   varchar(256)          DEFAULT '' COMMENT 'Description of cluster node',
     `status`        int(4)                DEFAULT '0' COMMENT 'Cluster status',
@@ -197,45 +197,6 @@ CREATE TABLE IF NOT EXISTS `data_node`
     UNIQUE KEY `unique_data_node` (`name`, `type`, `is_deleted`)
 );
 
--- ----------------------------
--- Deprecated: Table structure for consumption
--- ----------------------------
-CREATE TABLE IF NOT EXISTS `consumption`
-(
-    `id`               int(11)      NOT NULL AUTO_INCREMENT COMMENT 'Incremental primary key',
-    `consumer_group`   varchar(256) NOT NULL COMMENT 'Consumer group',
-    `in_charges`       varchar(512) NOT NULL COMMENT 'Person in charge of consumption',
-    `inlong_group_id`  varchar(256) NOT NULL COMMENT 'Inlong group id',
-    `mq_type`          varchar(10)           DEFAULT 'TUBE' COMMENT 'Message queue type, high throughput: TUBE, high consistency: PULSAR',
-    `topic`            varchar(256) NOT NULL COMMENT 'Consumption topic',
-    `filter_enabled`   int(2)                DEFAULT '0' COMMENT 'Whether to filter, default 0, not filter consume',
-    `inlong_stream_id` varchar(256)          DEFAULT NULL COMMENT 'Inlong stream ID for consumption, if filter_enable is 1, it cannot empty',
-    `status`           int(4)       NOT NULL COMMENT 'Status: draft, pending approval, approval rejected, approval passed',
-    `is_deleted`       int(11)               DEFAULT '0' COMMENT 'Whether to delete, 0: not deleted, > 0: deleted',
-    `creator`          varchar(64)  NOT NULL COMMENT 'creator',
-    `modifier`         varchar(64)           DEFAULT NULL COMMENT 'modifier',
-    `create_time`      timestamp    NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'Create time',
-    `modify_time`      timestamp    NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'Modify time',
-    `version`          int(11)      NOT NULL DEFAULT '1' COMMENT 'Version number, which will be incremented by 1 after modification',
-    PRIMARY KEY (`id`)
-);
-
--- ----------------------------
--- Deprecated: Table structure for consumption_pulsar
--- ----------------------------
-CREATE TABLE IF NOT EXISTS `consumption_pulsar`
-(
-    `id`                 int(11)      NOT NULL AUTO_INCREMENT,
-    `consumption_id`     int(11)      DEFAULT NULL COMMENT 'ID of the consumption information to which it belongs, guaranteed to be uniquely associated with consumption information',
-    `consumer_group`     varchar(256) NOT NULL COMMENT 'Consumer group',
-    `inlong_group_id`    varchar(256) NOT NULL COMMENT 'Inlong group ID',
-    `is_rlq`             tinyint(1)   DEFAULT '0' COMMENT 'Whether to configure the retry letter topic, 0: no configuration, 1: configuration',
-    `retry_letter_topic` varchar(256) DEFAULT NULL COMMENT 'The name of the retry queue topic',
-    `is_dlq`             tinyint(1)   DEFAULT '0' COMMENT 'Whether to configure dead letter topic, 0: no configuration, 1: means configuration',
-    `dead_letter_topic`  varchar(256) DEFAULT NULL COMMENT 'dead letter topic name',
-    `is_deleted`         int(11)      DEFAULT '0' COMMENT 'Whether to delete, 0: not deleted, > 0: deleted',
-    PRIMARY KEY (`id`)
-);
 -- ----------------------------
 -- Table structure for stream_source_cmd_config
 -- ----------------------------
diff --git a/inlong-manager/manager-web/sql/apache_inlong_manager.sql b/inlong-manager/manager-web/sql/apache_inlong_manager.sql
index 6a044065e..fbd73961f 100644
--- a/inlong-manager/manager-web/sql/apache_inlong_manager.sql
+++ b/inlong-manager/manager-web/sql/apache_inlong_manager.sql
@@ -140,7 +140,7 @@ CREATE TABLE IF NOT EXISTS `inlong_cluster_node`
     `type`          varchar(20)  NOT NULL COMMENT 'Cluster type, such as: AGENT, DATAPROXY, etc',
     `ip`            varchar(512) NOT NULL COMMENT 'Cluster IP, separated by commas, such as: 127.0.0.1:8080,host2:8081',
     `port`          int(6)       NULL COMMENT 'Cluster port',
-    `protocol_type` varchar(20)  DEFAULT NULL COMMENT 'DATAPROXY Source listen protocol type, such as: TCP/HTTP',
+    `protocol_type` varchar(20)           DEFAULT NULL COMMENT 'DATAPROXY Source listen protocol type, such as: TCP/HTTP',
     `ext_params`    mediumtext            DEFAULT NULL COMMENT 'Another fields will be saved as JSON string',
     `description`   varchar(256)          DEFAULT '' COMMENT 'Description of cluster node',
     `status`        int(4)                DEFAULT '0' COMMENT 'Cluster status',
@@ -209,48 +209,6 @@ CREATE TABLE IF NOT EXISTS `data_node`
 ) ENGINE = InnoDB
   DEFAULT CHARSET = utf8mb4 COMMENT ='Data node table';
 
--- ----------------------------
--- Deprecated: Table structure for consumption
--- ----------------------------
-CREATE TABLE IF NOT EXISTS `consumption`
-(
-    `id`               int(11)      NOT NULL AUTO_INCREMENT COMMENT 'Incremental primary key',
-    `consumer_group`   varchar(256) NOT NULL COMMENT 'Consumer group',
-    `in_charges`       varchar(512) NOT NULL COMMENT 'Person in charge of consumption',
-    `inlong_group_id`  varchar(256) NOT NULL COMMENT 'Inlong group id',
-    `mq_type`          varchar(10)           DEFAULT 'TUBE' COMMENT 'Message queue type, high throughput: TUBE, high consistency: PULSAR',
-    `topic`            varchar(256) NOT NULL COMMENT 'Consumption topic',
-    `filter_enabled`   int(2)                DEFAULT '0' COMMENT 'Whether to filter, default 0, not filter consume',
-    `inlong_stream_id` varchar(256)          DEFAULT NULL COMMENT 'Inlong stream ID for consumption, if filter_enable is 1, it cannot empty',
-    `status`           int(4)       NOT NULL COMMENT 'Status: draft, pending approval, approval rejected, approval passed',
-    `is_deleted`       int(11)               DEFAULT '0' COMMENT 'Whether to delete, 0: not deleted, > 0: deleted',
-    `creator`          varchar(64)  NOT NULL COMMENT 'creator',
-    `modifier`         varchar(64)           DEFAULT NULL COMMENT 'modifier',
-    `create_time`      timestamp    NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'Create time',
-    `modify_time`      timestamp    NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'Modify time',
-    `version`          int(11)      NOT NULL DEFAULT '1' COMMENT 'Version number, which will be incremented by 1 after modification',
-    PRIMARY KEY (`id`)
-) ENGINE = InnoDB
-  DEFAULT CHARSET = utf8mb4 COMMENT ='Data consumption configuration table';
-
--- ----------------------------
--- Deprecated: Table structure for consumption_pulsar
--- ----------------------------
-CREATE TABLE IF NOT EXISTS `consumption_pulsar`
-(
-    `id`                 int(11)      NOT NULL AUTO_INCREMENT,
-    `consumption_id`     int(11)      DEFAULT NULL COMMENT 'ID of the consumption information to which it belongs, guaranteed to be uniquely associated with consumption information',
-    `consumer_group`     varchar(256) NOT NULL COMMENT 'Consumer group',
-    `inlong_group_id`    varchar(256) NOT NULL COMMENT 'Inlong group ID',
-    `is_rlq`             tinyint(1)   DEFAULT '0' COMMENT 'Whether to configure the retry letter topic, 0: no configuration, 1: configuration',
-    `retry_letter_topic` varchar(256) DEFAULT NULL COMMENT 'The name of the retry queue topic',
-    `is_dlq`             tinyint(1)   DEFAULT '0' COMMENT 'Whether to configure dead letter topic, 0: no configuration, 1: means configuration',
-    `dead_letter_topic`  varchar(256) DEFAULT NULL COMMENT 'dead letter topic name',
-    `is_deleted`         int(11)      DEFAULT '0' COMMENT 'Whether to delete, 0: not deleted, > 0: deleted',
-    PRIMARY KEY (`id`)
-) ENGINE = InnoDB
-  DEFAULT CHARSET = utf8mb4 COMMENT ='Pulsar consumption table';
-
 -- ----------------------------
 -- Table structure for stream_source_cmd_config
 -- ----------------------------
diff --git a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/ConsumptionController.java b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/ConsumptionController.java
deleted file mode 100644
index f1c77ad51..000000000
--- a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/ConsumptionController.java
+++ /dev/null
@@ -1,115 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.manager.web.controller;
-
-import io.swagger.annotations.Api;
-import io.swagger.annotations.ApiImplicitParam;
-import io.swagger.annotations.ApiOperation;
-import org.apache.inlong.manager.common.enums.OperationType;
-import org.apache.inlong.manager.pojo.common.PageResult;
-import org.apache.inlong.manager.pojo.common.Response;
-import org.apache.inlong.manager.pojo.consumption.ConsumptionInfo;
-import org.apache.inlong.manager.pojo.consumption.ConsumptionListVo;
-import org.apache.inlong.manager.pojo.consumption.ConsumptionQuery;
-import org.apache.inlong.manager.pojo.consumption.ConsumptionSummary;
-import org.apache.inlong.manager.pojo.workflow.WorkflowResult;
-import org.apache.inlong.manager.service.core.ConsumptionService;
-import org.apache.inlong.manager.service.consume.InlongConsumeProcessService;
-import org.apache.inlong.manager.service.operationlog.OperationLog;
-import org.apache.inlong.manager.service.user.LoginUserUtils;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.validation.annotation.Validated;
-import org.springframework.web.bind.annotation.DeleteMapping;
-import org.springframework.web.bind.annotation.GetMapping;
-import org.springframework.web.bind.annotation.PathVariable;
-import org.springframework.web.bind.annotation.PostMapping;
-import org.springframework.web.bind.annotation.RequestBody;
-import org.springframework.web.bind.annotation.RequestMapping;
-import org.springframework.web.bind.annotation.RestController;
-
-/**
- * Data consumption interface
- */
-@RestController
-@RequestMapping("/api")
-@Api(tags = "Consumption-API")
-public class ConsumptionController {
-
-    @Autowired
-    private ConsumptionService consumptionService;
-    @Autowired
-    private InlongConsumeProcessService processOperation;
-
-    @GetMapping("/consumption/summary")
-    @ApiOperation(value = "Get data consumption summary")
-    public Response<ConsumptionSummary> getSummary(ConsumptionQuery query) {
-        query.setUsername(LoginUserUtils.getLoginUser().getName());
-        return Response.success(consumptionService.getSummary(query));
-    }
-
-    @GetMapping("/consumption/list")
-    @ApiOperation(value = "List data consumptions")
-    public Response<PageResult<ConsumptionListVo>> list(ConsumptionQuery query) {
-        query.setUsername(LoginUserUtils.getLoginUser().getName());
-        return Response.success(consumptionService.list(query));
-    }
-
-    @GetMapping("/consumption/get/{id}")
-    @ApiOperation(value = "Get consumption details")
-    @ApiImplicitParam(name = "id", value = "Consumption ID", dataTypeClass = Integer.class, required = true)
-    public Response<ConsumptionInfo> getDetail(@PathVariable(name = "id") Integer id) {
-        return Response.success(consumptionService.get(id));
-    }
-
-    @DeleteMapping("/consumption/delete/{id}")
-    @OperationLog(operation = OperationType.DELETE)
-    @ApiOperation(value = "Delete data consumption")
-    @ApiImplicitParam(name = "id", value = "Consumption ID", dataTypeClass = Integer.class, required = true)
-    public Response<Object> delete(@PathVariable(name = "id") Integer id) {
-        this.consumptionService.delete(id, LoginUserUtils.getLoginUser().getName());
-        return Response.success();
-    }
-
-    @PostMapping("/consumption/save")
-    @OperationLog(operation = OperationType.UPDATE)
-    @ApiOperation(value = "Save data consumption", notes = "Full coverage")
-    public Response<Integer> save(@Validated @RequestBody ConsumptionInfo consumptionInfo) {
-        String currentUser = LoginUserUtils.getLoginUser().getName();
-        return Response.success(consumptionService.save(consumptionInfo, currentUser));
-    }
-
-    @PostMapping("/consumption/update/{id}")
-    @OperationLog(operation = OperationType.UPDATE)
-    @ApiOperation(value = "Update data consumption")
-    public Response<String> update(@PathVariable(name = "id") Integer id,
-            @Validated @RequestBody ConsumptionInfo consumptionInfo) {
-        consumptionInfo.setId(id);
-        consumptionService.update(consumptionInfo, LoginUserUtils.getLoginUser().getName());
-        return Response.success();
-    }
-
-    @PostMapping("/consumption/startProcess/{id}")
-    @OperationLog(operation = OperationType.UPDATE)
-    @ApiOperation(value = "Start approval process")
-    @ApiImplicitParam(name = "id", value = "Consumption ID", dataTypeClass = Integer.class, required = true)
-    public Response<WorkflowResult> startProcess(@PathVariable(name = "id") Integer id) {
-        String username = LoginUserUtils.getLoginUser().getName();
-        return Response.success(processOperation.startProcess(id, username));
-    }
-
-}
diff --git a/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/WorkflowConfig.java b/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/WorkflowConfig.java
deleted file mode 100644
index f86d9f0d1..000000000
--- a/inlong-manager/manager-workflow/src/main/java/org/apache/inlong/manager/workflow/WorkflowConfig.java
+++ /dev/null
@@ -1,100 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.manager.workflow;
-
-import com.fasterxml.jackson.annotation.JsonIgnore;
-import org.apache.inlong.manager.dao.mapper.WorkflowEventLogEntityMapper;
-import org.apache.inlong.manager.dao.mapper.WorkflowProcessEntityMapper;
-import org.apache.inlong.manager.dao.mapper.WorkflowTaskEntityMapper;
-import org.apache.inlong.manager.workflow.core.ProcessDefinitionRepository;
-import org.apache.inlong.manager.workflow.core.WorkflowQueryService;
-import org.springframework.transaction.PlatformTransactionManager;
-
-/**
- * Workflow config
- */
-public class WorkflowConfig {
-
-    @JsonIgnore
-    private WorkflowQueryService queryService;
-    @JsonIgnore
-    private WorkflowProcessEntityMapper processEntityMapper;
-    @JsonIgnore
-    private WorkflowTaskEntityMapper taskEntityMapper;
-    @JsonIgnore
-    private WorkflowEventLogEntityMapper eventLogMapper;
-    @JsonIgnore
-    private ProcessDefinitionRepository definitionRepository;
-    @JsonIgnore
-    private PlatformTransactionManager transactionManager;
-
-    public WorkflowQueryService getQueryService() {
-        return queryService;
-    }
-
-    public WorkflowConfig setQueryService(WorkflowQueryService queryService) {
-        this.queryService = queryService;
-        return this;
-    }
-
-    public WorkflowProcessEntityMapper getProcessEntityMapper() {
-        return processEntityMapper;
-    }
-
-    public WorkflowConfig setProcessEntityMapper(WorkflowProcessEntityMapper processEntityMapper) {
-        this.processEntityMapper = processEntityMapper;
-        return this;
-    }
-
-    public WorkflowTaskEntityMapper getTaskEntityMapper() {
-        return taskEntityMapper;
-    }
-
-    public WorkflowConfig setTaskEntityMapper(WorkflowTaskEntityMapper taskEntityMapper) {
-        this.taskEntityMapper = taskEntityMapper;
-        return this;
-    }
-
-    public WorkflowEventLogEntityMapper getEventLogMapper() {
-        return eventLogMapper;
-    }
-
-    public WorkflowConfig setEventLogMapper(WorkflowEventLogEntityMapper eventLogMapper) {
-        this.eventLogMapper = eventLogMapper;
-        return this;
-    }
-
-    public ProcessDefinitionRepository getDefinitionRepository() {
-        return definitionRepository;
-    }
-
-    public WorkflowConfig setDefinitionRepository(ProcessDefinitionRepository definitionRepository) {
-        this.definitionRepository = definitionRepository;
-        return this;
-    }
-
-    public PlatformTransactionManager getTransactionManager() {
-        return transactionManager;
-    }
-
-    public WorkflowConfig setTransactionManager(PlatformTransactionManager transactionManager) {
-        this.transactionManager = transactionManager;
-        return this;
-    }
-
-}