You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by he...@apache.org on 2022/09/06 03:42:19 UTC

[inlong] branch master updated: [INLONG-5781][Manager] Support creating consumer groups for all sinks (#5782)

This is an automated email from the ASF dual-hosted git repository.

healchow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 9844199f6 [INLONG-5781][Manager] Support creating consumer groups for all sinks (#5782)
9844199f6 is described below

commit 9844199f601582438daa470751a2beb37bd7f512
Author: healchow <he...@gmail.com>
AuthorDate: Tue Sep 6 11:42:13 2022 +0800

    [INLONG-5781][Manager] Support creating consumer groups for all sinks (#5782)
---
 .../dao/mapper/InlongConsumeEntityMapper.java      |   4 +
 .../mappers/InlongConsumeEntityMapper.xml          |  10 ++
 .../{PulsarTopicBean.java => PulsarTopicInfo.java} |   2 +-
 .../service/consume/InlongConsumeService.java      |   8 ++
 .../service/consume/InlongConsumeServiceImpl.java  |  32 ++++++
 .../manager/service/core/ConsumptionService.java   |   6 --
 .../service/core/impl/ConsumptionServiceImpl.java  |  40 --------
 .../ConsumptionCompleteProcessListener.java        |  12 +--
 .../resource/queue/kafka/KafkaOperator.java        |   2 -
 .../queue/kafka/KafkaResourceOperators.java        |  37 ++++---
 .../resource/queue/pulsar/PulsarOperator.java      | 110 ++++++++++-----------
 .../queue/pulsar/PulsarResourceOperator.java       | 103 ++++++++++++-------
 .../queue/tubemq/TubeMQResourceOperator.java       |  13 +--
 13 files changed, 212 insertions(+), 167 deletions(-)

diff --git a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/InlongConsumeEntityMapper.java b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/InlongConsumeEntityMapper.java
index 3b0205048..feaf28a50 100644
--- a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/InlongConsumeEntityMapper.java
+++ b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/InlongConsumeEntityMapper.java
@@ -36,6 +36,10 @@ public interface InlongConsumeEntityMapper {
 
     List<InlongConsumeEntity> selectByCondition(InlongConsumePageRequest request);
 
+    InlongConsumeEntity selectExists(@Param("groupId") String groupId,
+            @Param("topic") String topic,
+            @Param("consumerGroup") String consumerGroup);
+
     int updateById(InlongConsumeEntity record);
 
     int updateByIdSelective(InlongConsumeEntity record);
diff --git a/inlong-manager/manager-dao/src/main/resources/mappers/InlongConsumeEntityMapper.xml b/inlong-manager/manager-dao/src/main/resources/mappers/InlongConsumeEntityMapper.xml
index 1db846b23..16e1d9b52 100644
--- a/inlong-manager/manager-dao/src/main/resources/mappers/InlongConsumeEntityMapper.xml
+++ b/inlong-manager/manager-dao/src/main/resources/mappers/InlongConsumeEntityMapper.xml
@@ -118,6 +118,16 @@
             </otherwise>
         </choose>
     </select>
+    <select id="selectExists" resultType="org.apache.inlong.manager.dao.entity.InlongConsumeEntity">
+        select
+        <include refid="Base_Column_List"/>
+        from inlong_consume
+        where is_deleted = 0
+        and inlong_group_id = #{inlongGroupId, jdbcType=VARCHAR}
+        and topic = #{topic, jdbcType=VARCHAR}
+        and consumer_group = #{consumerGroup, jdbcType=VARCHAR}
+        limit 1
+    </select>
 
     <update id="updateById" parameterType="org.apache.inlong.manager.dao.entity.InlongConsumeEntity">
         update inlong_consume
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/queue/pulsar/PulsarTopicBean.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/queue/pulsar/PulsarTopicInfo.java
similarity index 97%
rename from inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/queue/pulsar/PulsarTopicBean.java
rename to inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/queue/pulsar/PulsarTopicInfo.java
index 91dc7937d..29a8f4efb 100644
--- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/queue/pulsar/PulsarTopicBean.java
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/queue/pulsar/PulsarTopicInfo.java
@@ -29,7 +29,7 @@ import lombok.NoArgsConstructor;
 @Builder
 @NoArgsConstructor
 @AllArgsConstructor
-public class PulsarTopicBean {
+public class PulsarTopicInfo {
 
     private String tenant;
 
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/consume/InlongConsumeService.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/consume/InlongConsumeService.java
index ce421e337..c31fab822 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/consume/InlongConsumeService.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/consume/InlongConsumeService.java
@@ -23,6 +23,7 @@ import org.apache.inlong.manager.pojo.consume.InlongConsumeCountInfo;
 import org.apache.inlong.manager.pojo.consume.InlongConsumeInfo;
 import org.apache.inlong.manager.pojo.consume.InlongConsumePageRequest;
 import org.apache.inlong.manager.pojo.consume.InlongConsumeRequest;
+import org.apache.inlong.manager.pojo.group.InlongGroupInfo;
 
 import javax.validation.Valid;
 import javax.validation.constraints.NotNull;
@@ -41,6 +42,13 @@ public interface InlongConsumeService {
      */
     Integer save(InlongConsumeRequest request, String operator);
 
+    /**
+     * Save the consumer group by InLong system, and not start the workflow process.
+     *
+     * @return inlong consume id after saving
+     */
+    Integer saveBySystem(InlongGroupInfo groupInfo, String topic, String consumerGroup);
+
     /**
      * Get inlong consume info based on ID
      *
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/consume/InlongConsumeServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/consume/InlongConsumeServiceImpl.java
index c7977b500..f0c5cec8f 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/consume/InlongConsumeServiceImpl.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/consume/InlongConsumeServiceImpl.java
@@ -36,6 +36,7 @@ import org.apache.inlong.manager.pojo.consume.InlongConsumeCountInfo;
 import org.apache.inlong.manager.pojo.consume.InlongConsumeInfo;
 import org.apache.inlong.manager.pojo.consume.InlongConsumePageRequest;
 import org.apache.inlong.manager.pojo.consume.InlongConsumeRequest;
+import org.apache.inlong.manager.pojo.group.InlongGroupInfo;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
@@ -58,6 +59,7 @@ import static org.apache.inlong.manager.pojo.common.PageRequest.MAX_PAGE_SIZE;
 public class InlongConsumeServiceImpl implements InlongConsumeService {
 
     private static final Logger LOGGER = LoggerFactory.getLogger(InlongConsumeServiceImpl.class);
+    private static final String AUTO_CREATE_MSG = "auto create by inlong";
 
     @Autowired
     private InlongConsumeEntityMapper consumeMapper;
@@ -82,6 +84,36 @@ public class InlongConsumeServiceImpl implements InlongConsumeService {
         return request.getId();
     }
 
+    @Override
+    public Integer saveBySystem(InlongGroupInfo groupInfo, String topic, String consumerGroup) {
+        String groupId = groupInfo.getInlongGroupId();
+        InlongConsumeEntity existEntity = consumeMapper.selectExists(groupId, topic, consumerGroup);
+        if (existEntity != null) {
+            LOGGER.warn("inlong consume already exists for groupId={} topic={} consumerGroup={}, skip to create",
+                    groupId, topic, consumerGroup);
+            return existEntity.getId();
+        }
+
+        LOGGER.debug("begin to save inlong consume for groupId={} topic={} group={}", groupId, topic, consumerGroup);
+        InlongConsumeEntity entity = new InlongConsumeEntity();
+        entity.setConsumerGroup(consumerGroup);
+        entity.setDescription(AUTO_CREATE_MSG);
+        entity.setMqType(groupInfo.getMqType());
+        entity.setTopic(topic);
+        entity.setInlongGroupId(groupId);
+        entity.setFilterEnabled(0);
+
+        entity.setInCharges(groupInfo.getInCharges());
+        entity.setStatus(ConsumeStatus.APPROVED.getCode());
+        String operator = groupInfo.getCreator();
+        entity.setCreator(operator);
+        entity.setModifier(operator);
+
+        consumeMapper.insert(entity);
+        LOGGER.debug("success save inlong consume for groupId={} topic={} group={}", groupId, topic, consumerGroup);
+        return entity.getId();
+    }
+
     @Override
     public boolean consumerGroupExists(String consumerGroup, Integer excludeSelfId) {
         InlongConsumePageRequest request = InlongConsumePageRequest.builder()
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/ConsumptionService.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/ConsumptionService.java
index 489c8d1c0..5a181fd97 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/ConsumptionService.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/ConsumptionService.java
@@ -22,7 +22,6 @@ import org.apache.inlong.manager.pojo.consumption.ConsumptionInfo;
 import org.apache.inlong.manager.pojo.consumption.ConsumptionListVo;
 import org.apache.inlong.manager.pojo.consumption.ConsumptionQuery;
 import org.apache.inlong.manager.pojo.consumption.ConsumptionSummary;
-import org.apache.inlong.manager.pojo.group.InlongGroupInfo;
 
 /**
  * Data consumption interface
@@ -87,9 +86,4 @@ public interface ConsumptionService {
      */
     Boolean delete(Integer id, String operator);
 
-    /**
-     * Save the consumer group info for Sort to the database
-     */
-    void saveSortConsumption(InlongGroupInfo bizInfo, String topic, String consumerGroup);
-
 }
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/ConsumptionServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/ConsumptionServiceImpl.java
index 5641a293c..0de16b5c6 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/ConsumptionServiceImpl.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/ConsumptionServiceImpl.java
@@ -43,7 +43,6 @@ import org.apache.inlong.manager.pojo.consumption.ConsumptionMqExtBase;
 import org.apache.inlong.manager.pojo.consumption.ConsumptionPulsarInfo;
 import org.apache.inlong.manager.pojo.consumption.ConsumptionQuery;
 import org.apache.inlong.manager.pojo.consumption.ConsumptionSummary;
-import org.apache.inlong.manager.pojo.group.InlongGroupInfo;
 import org.apache.inlong.manager.pojo.group.InlongGroupTopicInfo;
 import org.apache.inlong.manager.pojo.stream.InlongStreamBriefInfo;
 import org.apache.inlong.manager.pojo.user.UserRoleCode;
@@ -336,45 +335,6 @@ public class ConsumptionServiceImpl implements ConsumptionService {
         return true;
     }
 
-    @Override
-    public void saveSortConsumption(InlongGroupInfo groupInfo, String topic, String consumerGroup) {
-        String groupId = groupInfo.getInlongGroupId();
-        ConsumptionEntity exists = consumptionMapper.selectConsumptionExists(groupId, topic, consumerGroup);
-        if (exists != null) {
-            log.warn("consumption with groupId={}, topic={}, consumer group={} already exists, skip to create",
-                    groupId, topic, consumerGroup);
-            return;
-        }
-
-        log.debug("begin to save consumption, groupId={}, topic={}, consumer group={}", groupId, topic, consumerGroup);
-        String mqType = groupInfo.getMqType();
-        ConsumptionEntity entity = new ConsumptionEntity();
-        entity.setInlongGroupId(groupId);
-        entity.setMqType(mqType);
-        entity.setTopic(topic);
-        entity.setConsumerGroup(consumerGroup);
-        entity.setInCharges(groupInfo.getInCharges());
-        entity.setFilterEnabled(0);
-
-        entity.setStatus(ConsumeStatus.APPROVED.getCode());
-        String operator = groupInfo.getCreator();
-        entity.setCreator(operator);
-        entity.setModifier(operator);
-
-        consumptionMapper.insert(entity);
-
-        if (MQType.PULSAR.equals(mqType) || MQType.TDMQ_PULSAR.equals(mqType)) {
-            ConsumptionPulsarEntity pulsarEntity = new ConsumptionPulsarEntity();
-            pulsarEntity.setConsumptionId(entity.getId());
-            pulsarEntity.setConsumerGroup(consumerGroup);
-            pulsarEntity.setInlongGroupId(groupId);
-            pulsarEntity.setIsDeleted(InlongConstants.UN_DELETED);
-            consumptionPulsarMapper.insert(pulsarEntity);
-        }
-
-        log.debug("success save consumption, groupId={}, topic={}, consumer group={}", groupId, topic, consumerGroup);
-    }
-
     private ConsumptionEntity saveConsumption(ConsumptionInfo info, String operator) {
         ConsumptionEntity entity = CommonBeanUtils.copyProperties(info, ConsumptionEntity::new);
         entity.setStatus(ConsumeStatus.WAIT_ASSIGN.getCode());
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/consumption/ConsumptionCompleteProcessListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/consumption/ConsumptionCompleteProcessListener.java
index 3b7ac58ad..7f5478445 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/consumption/ConsumptionCompleteProcessListener.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/consumption/ConsumptionCompleteProcessListener.java
@@ -20,10 +20,10 @@ package org.apache.inlong.manager.service.listener.consumption;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.inlong.manager.common.consts.InlongConstants;
+import org.apache.inlong.manager.common.consts.MQType;
 import org.apache.inlong.manager.common.enums.ClusterType;
 import org.apache.inlong.manager.common.enums.ConsumeStatus;
 import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
-import org.apache.inlong.manager.common.consts.MQType;
 import org.apache.inlong.manager.common.exceptions.BusinessException;
 import org.apache.inlong.manager.common.exceptions.WorkflowListenerException;
 import org.apache.inlong.manager.common.util.Preconditions;
@@ -34,7 +34,7 @@ import org.apache.inlong.manager.dao.mapper.InlongGroupEntityMapper;
 import org.apache.inlong.manager.pojo.cluster.ClusterInfo;
 import org.apache.inlong.manager.pojo.cluster.pulsar.PulsarClusterInfo;
 import org.apache.inlong.manager.pojo.cluster.tubemq.TubeClusterInfo;
-import org.apache.inlong.manager.pojo.queue.pulsar.PulsarTopicBean;
+import org.apache.inlong.manager.pojo.queue.pulsar.PulsarTopicInfo;
 import org.apache.inlong.manager.pojo.workflow.form.process.ApplyConsumptionProcessForm;
 import org.apache.inlong.manager.service.cluster.InlongClusterService;
 import org.apache.inlong.manager.service.resource.queue.pulsar.PulsarOperator;
@@ -131,7 +131,7 @@ public class ConsumptionCompleteProcessListener implements ProcessEventListener
         ClusterInfo clusterInfo = clusterService.getOne(clusterTag, null, ClusterType.PULSAR);
         PulsarClusterInfo pulsarCluster = (PulsarClusterInfo) clusterInfo;
         try (PulsarAdmin pulsarAdmin = PulsarUtils.getPulsarAdmin(pulsarCluster)) {
-            PulsarTopicBean topicMessage = new PulsarTopicBean();
+            PulsarTopicInfo topicMessage = new PulsarTopicInfo();
             String tenant = pulsarCluster.getTenant();
             if (StringUtils.isEmpty(tenant)) {
                 tenant = InlongConstants.DEFAULT_PULSAR_TENANT;
@@ -140,7 +140,7 @@ public class ConsumptionCompleteProcessListener implements ProcessEventListener
             topicMessage.setNamespace(mqResource);
 
             String consumerGroup = entity.getConsumerGroup();
-            List<String> topics = Arrays.asList(entity.getTopic().split(","));
+            List<String> topics = Arrays.asList(entity.getTopic().split(InlongConstants.COMMA));
             this.createPulsarSubscription(pulsarAdmin, consumerGroup, topicMessage, topics);
         } catch (Exception e) {
             log.error("create pulsar topic failed", e);
@@ -149,10 +149,10 @@ public class ConsumptionCompleteProcessListener implements ProcessEventListener
         }
     }
 
-    private void createPulsarSubscription(PulsarAdmin pulsarAdmin, String subscription, PulsarTopicBean topicBean,
+    private void createPulsarSubscription(PulsarAdmin pulsarAdmin, String subscription, PulsarTopicInfo topicInfo,
             List<String> topics) {
         try {
-            pulsarOperator.createSubscriptions(pulsarAdmin, subscription, topicBean, topics);
+            pulsarOperator.createSubscriptions(pulsarAdmin, subscription, topicInfo, topics);
         } catch (Exception e) {
             log.error("create pulsar consumer group failed", e);
             throw new WorkflowListenerException("failed to create pulsar consumer group");
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/kafka/KafkaOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/kafka/KafkaOperator.java
index 3d3a7e063..7e886cf95 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/kafka/KafkaOperator.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/kafka/KafkaOperator.java
@@ -79,9 +79,7 @@ public class KafkaOperator {
 
     public void createSubscription(InlongKafkaInfo inlongKafkaInfo, KafkaClusterInfo kafkaClusterInfo,
             String subscription) {
-
         KafkaConsumer kafkaConsumer = KafkaUtils.createKafkaConsumer(inlongKafkaInfo, kafkaClusterInfo);
-        // subscription
         kafkaConsumer.subscribe(Collections.singletonList(subscription));
     }
 
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/kafka/KafkaResourceOperators.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/kafka/KafkaResourceOperators.java
index 7f06633bd..37a1ee66d 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/kafka/KafkaResourceOperators.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/kafka/KafkaResourceOperators.java
@@ -17,9 +17,6 @@
 
 package org.apache.inlong.manager.service.resource.queue.kafka;
 
-import javax.validation.constraints.NotBlank;
-import javax.validation.constraints.NotNull;
-
 import lombok.extern.slf4j.Slf4j;
 import org.apache.inlong.manager.common.consts.MQType;
 import org.apache.inlong.manager.common.enums.ClusterType;
@@ -32,12 +29,14 @@ import org.apache.inlong.manager.pojo.group.kafka.InlongKafkaInfo;
 import org.apache.inlong.manager.pojo.stream.InlongStreamBriefInfo;
 import org.apache.inlong.manager.pojo.stream.InlongStreamInfo;
 import org.apache.inlong.manager.service.cluster.InlongClusterService;
-import org.apache.inlong.manager.service.core.ConsumptionService;
+import org.apache.inlong.manager.service.consume.InlongConsumeService;
 import org.apache.inlong.manager.service.resource.queue.QueueResourceOperator;
 import org.apache.inlong.manager.service.stream.InlongStreamService;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Service;
 
+import javax.validation.constraints.NotBlank;
+import javax.validation.constraints.NotNull;
 import java.util.List;
 
 /**
@@ -47,6 +46,11 @@ import java.util.List;
 @Service
 public class KafkaResourceOperators implements QueueResourceOperator {
 
+    /**
+     * The name rule for Kafka consumer group: clusterTag_topicName_consumer_group
+     */
+    private static final String KAFKA_CONSUMER_GROUP = "%s_%s_consumer_group";
+
     @Autowired
     private InlongClusterService clusterService;
     @Autowired
@@ -54,7 +58,7 @@ public class KafkaResourceOperators implements QueueResourceOperator {
     @Autowired
     private KafkaOperator kafkaOperator;
     @Autowired
-    private ConsumptionService consumptionService;
+    private InlongConsumeService consumeService;
 
     @Override
     public boolean accept(String mqType) {
@@ -155,12 +159,11 @@ public class KafkaResourceOperators implements QueueResourceOperator {
     /**
      * Create Kafka Topic and Subscription, and save the consumer group info.
      */
-    private void createKafkaTopic(InlongKafkaInfo inlongKafkaInfo, String streamId)
-            throws Exception {
+    private void createKafkaTopic(InlongKafkaInfo kafkaInfo, String streamId) throws Exception {
         // 1. create kafka topic
-        ClusterInfo clusterInfo = clusterService.getOne(inlongKafkaInfo.getInlongClusterTag(), null, ClusterType.KAFKA);
-        String topicName = inlongKafkaInfo.getInlongGroupId() + "_" + streamId;
-        kafkaOperator.createTopic(inlongKafkaInfo, (KafkaClusterInfo) clusterInfo, topicName);
+        ClusterInfo clusterInfo = clusterService.getOne(kafkaInfo.getInlongClusterTag(), null, ClusterType.KAFKA);
+        String topicName = kafkaInfo.getInlongGroupId() + "_" + streamId;
+        kafkaOperator.createTopic(kafkaInfo, (KafkaClusterInfo) clusterInfo, topicName);
 
         boolean exist = kafkaOperator.topicIsExists((KafkaClusterInfo) clusterInfo, topicName);
         if (!exist) {
@@ -170,14 +173,16 @@ public class KafkaResourceOperators implements QueueResourceOperator {
         }
 
         // 2. create a subscription for the kafka topic
-        kafkaOperator.createSubscription(inlongKafkaInfo, (KafkaClusterInfo) clusterInfo, topicName);
-        String groupId = inlongKafkaInfo.getInlongGroupId();
-        log.info("success to create pulsar subscription for groupId={}, topic={}, subs={}",
+        kafkaOperator.createSubscription(kafkaInfo, (KafkaClusterInfo) clusterInfo, topicName);
+        String groupId = kafkaInfo.getInlongGroupId();
+        log.info("success to create kafka subscription for groupId={}, topic={}, consumeGroup={}",
                 groupId, topicName, topicName);
 
-        // 3. insert the consumer group info into the consumption table
-        consumptionService.saveSortConsumption(inlongKafkaInfo, topicName, topicName);
-        log.info("success to save consume for groupId={}, topic={}, subs={}", groupId, topicName, topicName);
+        // 3. insert the consumer group info
+        String consumeGroup = String.format(KAFKA_CONSUMER_GROUP, kafkaInfo.getInlongClusterTag(), topicName);
+        Integer id = consumeService.saveBySystem(kafkaInfo, topicName, consumeGroup);
+        log.info("success to save inlong consume [{}] for consumerGroup={}, groupId={}, topic={}",
+                id, consumeGroup, groupId, topicName);
     }
 
     /**
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/pulsar/PulsarOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/pulsar/PulsarOperator.java
index 9bfe2605b..3ebeeca37 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/pulsar/PulsarOperator.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/pulsar/PulsarOperator.java
@@ -24,7 +24,7 @@ import org.apache.inlong.manager.common.conversion.ConversionHandle;
 import org.apache.inlong.manager.common.exceptions.BusinessException;
 import org.apache.inlong.manager.common.util.Preconditions;
 import org.apache.inlong.manager.pojo.group.pulsar.InlongPulsarInfo;
-import org.apache.inlong.manager.pojo.queue.pulsar.PulsarTopicBean;
+import org.apache.inlong.manager.pojo.queue.pulsar.PulsarTopicInfo;
 import org.apache.inlong.manager.service.cluster.InlongClusterServiceImpl;
 import org.apache.pulsar.client.admin.Namespaces;
 import org.apache.pulsar.client.admin.PulsarAdmin;
@@ -92,7 +92,7 @@ public class PulsarOperator {
         LOGGER.info("begin to create namespace={}", namespaceName);
         try {
             // Check whether the namespace exists, and create it if it does not exist
-            boolean isExists = this.namespacesIsExists(pulsarAdmin, tenant, namespace);
+            boolean isExists = this.namespaceExists(pulsarAdmin, tenant, namespace);
             if (isExists) {
                 LOGGER.warn("namespace={} already exists, skip to create", namespaceName);
                 return;
@@ -138,41 +138,41 @@ public class PulsarOperator {
     /**
      * Create Pulsar topic
      */
-    public void createTopic(PulsarAdmin pulsarAdmin, PulsarTopicBean topicBean) throws PulsarAdminException {
-        Preconditions.checkNotNull(topicBean, "pulsar topic info cannot be empty");
-        String tenant = topicBean.getTenant();
-        String namespace = topicBean.getNamespace();
-        String topic = topicBean.getTopicName();
-        String topicFullName = tenant + "/" + namespace + "/" + topic;
+    public void createTopic(PulsarAdmin pulsarAdmin, PulsarTopicInfo topicInfo) throws PulsarAdminException {
+        Preconditions.checkNotNull(topicInfo, "pulsar topic info cannot be empty");
+        String tenant = topicInfo.getTenant();
+        String namespace = topicInfo.getNamespace();
+        String topicName = topicInfo.getTopicName();
+        String fullTopicName = tenant + "/" + namespace + "/" + topicName;
 
         // Topic will be returned if it exists, and created if it does not exist
-        if (topicIsExists(pulsarAdmin, tenant, namespace, topic,
-                InlongConstants.PULSAR_QUEUE_TYPE_PARALLEL.equals(topicBean.getQueueModule()))) {
-            LOGGER.warn("pulsar topic={} already exists in {}", topicFullName, pulsarAdmin.getServiceUrl());
+        if (topicExists(pulsarAdmin, tenant, namespace, topicName,
+                InlongConstants.PULSAR_QUEUE_TYPE_PARALLEL.equals(topicInfo.getQueueModule()))) {
+            LOGGER.warn("pulsar topic={} already exists in {}", fullTopicName, pulsarAdmin.getServiceUrl());
             return;
         }
 
         try {
-            if (InlongConstants.PULSAR_QUEUE_TYPE_SERIAL.equals(topicBean.getQueueModule())) {
-                pulsarAdmin.topics().createNonPartitionedTopic(topicFullName);
-                String res = pulsarAdmin.lookups().lookupTopic(topicFullName);
-                LOGGER.info("success to create topic={}, lookup result is {}", topicFullName, res);
+            if (InlongConstants.PULSAR_QUEUE_TYPE_SERIAL.equals(topicInfo.getQueueModule())) {
+                pulsarAdmin.topics().createNonPartitionedTopic(fullTopicName);
+                String res = pulsarAdmin.lookups().lookupTopic(fullTopicName);
+                LOGGER.info("success to create topic={}, lookup result is {}", fullTopicName, res);
             } else {
                 // The number of brokers as the default value of topic partition
                 List<String> clusters = PulsarUtils.getPulsarClusters(pulsarAdmin);
-                Integer numPartitions = topicBean.getNumPartitions();
+                Integer numPartitions = topicInfo.getNumPartitions();
                 if (numPartitions < 0 || numPartitions <= MAX_PARTITION) {
                     List<String> brokers = pulsarAdmin.brokers().getActiveBrokers(clusters.get(0));
                     numPartitions = brokers.size();
                 }
 
-                pulsarAdmin.topics().createPartitionedTopic(topicFullName, numPartitions);
-                Map<String, String> res = pulsarAdmin.lookups().lookupPartitionedTopic(topicFullName);
+                pulsarAdmin.topics().createPartitionedTopic(fullTopicName, numPartitions);
+                Map<String, String> res = pulsarAdmin.lookups().lookupPartitionedTopic(fullTopicName);
                 // if lookup failed (res.size not equals the partition number)
                 if (res.keySet().size() != numPartitions) {
                     // look up partition failed, retry to get partition numbers
                     for (int i = 0; (i < RETRY_TIMES && res.keySet().size() != numPartitions); i++) {
-                        res = pulsarAdmin.lookups().lookupPartitionedTopic(topicFullName);
+                        res = pulsarAdmin.lookups().lookupPartitionedTopic(fullTopicName);
                         try {
                             Thread.sleep(500);
                         } catch (InterruptedException e) {
@@ -183,10 +183,10 @@ public class PulsarOperator {
                 if (numPartitions != res.keySet().size()) {
                     throw new PulsarAdminException("The number of partitions not equal to lookupPartitionedTopic");
                 }
-                LOGGER.info("success to create topic={}", topicFullName);
+                LOGGER.info("success to create topic={}", fullTopicName);
             }
         } catch (PulsarAdminException e) {
-            LOGGER.error("failed to create topic=" + topicFullName, e);
+            LOGGER.error("failed to create topic=" + fullTopicName, e);
             throw e;
         }
     }
@@ -194,26 +194,26 @@ public class PulsarOperator {
     /**
      * Force delete Pulsar topic
      */
-    public void forceDeleteTopic(PulsarAdmin pulsarAdmin, PulsarTopicBean topicBean) throws PulsarAdminException {
-        Preconditions.checkNotNull(topicBean, "pulsar topic info cannot be empty");
+    public void forceDeleteTopic(PulsarAdmin pulsarAdmin, PulsarTopicInfo topicInfo) throws PulsarAdminException {
+        Preconditions.checkNotNull(topicInfo, "pulsar topic info cannot be empty");
 
-        String tenant = topicBean.getTenant();
-        String namespace = topicBean.getNamespace();
-        String topic = topicBean.getTopicName();
-        String topicFullName = tenant + "/" + namespace + "/" + topic;
+        String tenant = topicInfo.getTenant();
+        String namespace = topicInfo.getNamespace();
+        String topic = topicInfo.getTopicName();
+        String fullTopicName = tenant + "/" + namespace + "/" + topic;
 
         // Topic will be returned if it not exists
-        if (topicIsExists(pulsarAdmin, tenant, namespace, topic,
-                InlongConstants.PULSAR_QUEUE_TYPE_PARALLEL.equals(topicBean.getQueueModule()))) {
-            LOGGER.warn("pulsar topic={} already delete", topicFullName);
+        if (topicExists(pulsarAdmin, tenant, namespace, topic,
+                InlongConstants.PULSAR_QUEUE_TYPE_PARALLEL.equals(topicInfo.getQueueModule()))) {
+            LOGGER.warn("pulsar topic={} already delete", fullTopicName);
             return;
         }
 
         try {
-            pulsarAdmin.topics().delete(topicFullName, true);
-            LOGGER.info("success to delete topic={}", topicFullName);
+            pulsarAdmin.topics().delete(fullTopicName, true);
+            LOGGER.info("success to delete topic={}", fullTopicName);
         } catch (PulsarAdminException e) {
-            LOGGER.error("failed to delete topic=" + topicFullName, e);
+            LOGGER.error("failed to delete topic=" + fullTopicName, e);
             throw e;
         }
     }
@@ -221,21 +221,17 @@ public class PulsarOperator {
     /**
      * Create a Pulsar subscription for the given topic
      */
-    public void createSubscription(PulsarAdmin pulsarAdmin, PulsarTopicBean topicBean, String subscription)
-            throws PulsarAdminException {
-        Preconditions.checkNotNull(topicBean, "can not find tenant information to create subscription");
-        Preconditions.checkNotNull(subscription, "subscription cannot be empty during creating subscription");
-
-        String topicName = topicBean.getTenant() + "/" + topicBean.getNamespace() + "/" + topicBean.getTopicName();
-        LOGGER.info("begin to create pulsar subscription={} for topic={}", subscription, topicName);
+    public void createSubscription(PulsarAdmin pulsarAdmin, String fullTopicName, String queueModule,
+            String subscription) throws PulsarAdminException {
+        LOGGER.info("begin to create pulsar subscription={} for topic={}", subscription, fullTopicName);
         try {
-            boolean isExists = this.subscriptionIsExists(pulsarAdmin, topicName, subscription,
-                    InlongConstants.PULSAR_QUEUE_TYPE_PARALLEL.equals(topicBean.getQueueModule()));
+            boolean isExists = this.subscriptionExists(pulsarAdmin, fullTopicName, subscription,
+                    InlongConstants.PULSAR_QUEUE_TYPE_PARALLEL.equals(queueModule));
             if (isExists) {
                 LOGGER.warn("pulsar subscription={} already exists, skip to create", subscription);
                 return;
             }
-            pulsarAdmin.topics().createSubscription(topicName, subscription, MessageId.latest);
+            pulsarAdmin.topics().createSubscription(fullTopicName, subscription, MessageId.latest);
             LOGGER.info("success to create subscription={}", subscription);
         } catch (PulsarAdminException e) {
             LOGGER.error("failed to create pulsar subscription=" + subscription, e);
@@ -246,11 +242,12 @@ public class PulsarOperator {
     /**
      * Create a Pulsar subscription for the specified topic list
      */
-    public void createSubscriptions(PulsarAdmin pulsarAdmin, String subscription, PulsarTopicBean topicBean,
+    public void createSubscriptions(PulsarAdmin pulsarAdmin, String subscription, PulsarTopicInfo topicInfo,
             List<String> topicList) throws PulsarAdminException {
         for (String topic : topicList) {
-            topicBean.setTopicName(topic);
-            this.createSubscription(pulsarAdmin, topicBean, subscription);
+            topicInfo.setTopicName(topic);
+            String fullTopicName = topicInfo.getTenant() + "/" + topicInfo.getNamespace() + "/" + topic;
+            this.createSubscription(pulsarAdmin, fullTopicName, topicInfo.getQueueModule(), subscription);
         }
         LOGGER.info("success to create subscription={} for multiple topics={}", subscription, topicList);
     }
@@ -266,7 +263,7 @@ public class PulsarOperator {
     /**
      * Check whether the Pulsar namespace exists under the specified tenant
      */
-    private boolean namespacesIsExists(PulsarAdmin pulsarAdmin, String tenant, String namespace)
+    private boolean namespaceExists(PulsarAdmin pulsarAdmin, String tenant, String namespace)
             throws PulsarAdminException {
         List<String> namespaceList = pulsarAdmin.namespaces().getNamespaces(tenant);
         return namespaceList.contains(tenant + "/" + namespace);
@@ -278,9 +275,9 @@ public class PulsarOperator {
      * @apiNote cannot compare whether the string contains, otherwise it may be misjudged, such as:
      *         Topic "ab" does not exist, but if "abc" exists, "ab" will be mistakenly judged to exist
      */
-    public boolean topicIsExists(PulsarAdmin pulsarAdmin, String tenant, String namespace, String topic,
+    public boolean topicExists(PulsarAdmin pulsarAdmin, String tenant, String namespace, String topicName,
             boolean isPartitioned) {
-        if (StringUtils.isBlank(topic)) {
+        if (StringUtils.isBlank(topicName)) {
             return true;
         }
 
@@ -295,36 +292,39 @@ public class PulsarOperator {
             }
             for (String t : topicList) {
                 t = t.substring(t.lastIndexOf("/") + 1); // not contains /
-                if (topic.equals(t)) {
+                if (topicName.equals(t)) {
                     topicExists = true;
                     break;
                 }
             }
         } catch (PulsarAdminException pe) {
-            LOGGER.error("check if the pulsar topic={} exists error, begin retry", topic, pe);
+            LOGGER.error("check if the pulsar topic={} exists error, begin retry", topicName, pe);
             int count = 0;
             try {
                 while (!topicExists && ++count <= RETRY_TIMES) {
-                    LOGGER.info("check whether the pulsar topic={} exists error, try count={}", topic, count);
+                    LOGGER.info("check whether the pulsar topic={} exists error, try count={}", topicName, count);
                     Thread.sleep(DELAY_SECONDS);
 
                     topicList = pulsarAdmin.topics().getPartitionedTopicList(tenant + "/" + namespace);
                     for (String t : topicList) {
                         t = t.substring(t.lastIndexOf("/") + 1);
-                        if (topic.equals(t)) {
+                        if (topicName.equals(t)) {
                             topicExists = true;
                             break;
                         }
                     }
                 }
             } catch (Exception e) {
-                LOGGER.error("after retry, check if the pulsar topic={} exists still error", topic, pe);
+                LOGGER.error("after retry, check if the pulsar topic={} exists still error", topicName, pe);
             }
         }
         return topicExists;
     }
 
-    private boolean subscriptionIsExists(PulsarAdmin pulsarAdmin, String topic, String subscription,
+    /**
+     * Check whether the Pulsar topic exists.
+     */
+    private boolean subscriptionExists(PulsarAdmin pulsarAdmin, String topic, String subscription,
             boolean isPartitioned) {
         int count = 0;
         while (++count <= RETRY_TIMES) {
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/pulsar/PulsarResourceOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/pulsar/PulsarResourceOperator.java
index 1b179c597..f66b81955 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/pulsar/PulsarResourceOperator.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/pulsar/PulsarResourceOperator.java
@@ -19,6 +19,7 @@ package org.apache.inlong.manager.service.resource.queue.pulsar;
 
 import com.google.common.base.Objects;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.inlong.manager.common.consts.InlongConstants;
 import org.apache.inlong.manager.common.consts.MQType;
@@ -30,12 +31,14 @@ import org.apache.inlong.manager.pojo.cluster.ClusterInfo;
 import org.apache.inlong.manager.pojo.cluster.pulsar.PulsarClusterInfo;
 import org.apache.inlong.manager.pojo.group.InlongGroupInfo;
 import org.apache.inlong.manager.pojo.group.pulsar.InlongPulsarInfo;
-import org.apache.inlong.manager.pojo.queue.pulsar.PulsarTopicBean;
+import org.apache.inlong.manager.pojo.queue.pulsar.PulsarTopicInfo;
+import org.apache.inlong.manager.pojo.sink.StreamSink;
 import org.apache.inlong.manager.pojo.stream.InlongStreamBriefInfo;
 import org.apache.inlong.manager.pojo.stream.InlongStreamInfo;
 import org.apache.inlong.manager.service.cluster.InlongClusterService;
-import org.apache.inlong.manager.service.core.ConsumptionService;
+import org.apache.inlong.manager.service.consume.InlongConsumeService;
 import org.apache.inlong.manager.service.resource.queue.QueueResourceOperator;
+import org.apache.inlong.manager.service.sink.StreamSinkService;
 import org.apache.inlong.manager.service.stream.InlongStreamService;
 import org.apache.pulsar.client.admin.PulsarAdmin;
 import org.springframework.beans.factory.annotation.Autowired;
@@ -50,12 +53,19 @@ import java.util.List;
 @Service
 public class PulsarResourceOperator implements QueueResourceOperator {
 
+    /**
+     * The name rule for Pulsar subscription: clusterTag_topicName_sinkId_consumer_group
+     */
+    private static final String PULSAR_SUBSCRIPTION = "%s_%s_%s_consumer_group";
+
+    @Autowired
+    private InlongClusterService clusterService;
     @Autowired
     private InlongStreamService streamService;
     @Autowired
-    private InlongClusterService clusterService;
+    private StreamSinkService sinkService;
     @Autowired
-    private ConsumptionService consumptionService;
+    private InlongConsumeService consumeService;
     @Autowired
     private PulsarOperator pulsarOperator;
 
@@ -77,7 +87,7 @@ public class PulsarResourceOperator implements QueueResourceOperator {
         PulsarClusterInfo pulsarCluster = (PulsarClusterInfo) clusterService.getOne(clusterTag, null,
                 ClusterType.PULSAR);
         try (PulsarAdmin pulsarAdmin = PulsarUtils.getPulsarAdmin(pulsarCluster)) {
-            // 1. create pulsar tenant and namespace
+            // create pulsar tenant and namespace
             String tenant = pulsarCluster.getTenant();
             if (StringUtils.isEmpty(tenant)) {
                 tenant = InlongConstants.DEFAULT_PULSAR_TENANT;
@@ -92,14 +102,16 @@ public class PulsarResourceOperator implements QueueResourceOperator {
                 log.info("success to create pulsar namespace for groupId={}, namespace={}", groupId, namespace);
             }
 
-            // 2. create Pulsar Topic - each Inlong Stream corresponds to a Pulsar Topic
+            // create pulsar topic - each Inlong Stream corresponds to a Pulsar topic
             List<InlongStreamBriefInfo> streamInfoList = streamService.getTopicList(groupId);
             if (streamInfoList == null || streamInfoList.isEmpty()) {
                 log.warn("skip to create pulsar topic and subscription as no streams for groupId={}", groupId);
                 return;
             }
-            for (InlongStreamBriefInfo streamInfo : streamInfoList) {
-                this.createPulsarTopic(groupInfo, pulsarCluster, streamInfo.getMqResource());
+            // create pulsar topic and subscription
+            for (InlongStreamBriefInfo stream : streamInfoList) {
+                this.createTopic(pulsarInfo, pulsarCluster, stream.getMqResource());
+                this.createSubscription(pulsarInfo, pulsarCluster, stream.getMqResource(), stream.getInlongStreamId());
             }
         } catch (Exception e) {
             String msg = String.format("failed to create pulsar resource for groupId=%s", groupId);
@@ -147,10 +159,11 @@ public class PulsarResourceOperator implements QueueResourceOperator {
 
         try {
             // get pulsar cluster via the inlong cluster tag from the inlong group
-            String clusterTag = groupInfo.getInlongClusterTag();
-            ClusterInfo clusterInfo = clusterService.getOne(clusterTag, null, ClusterType.PULSAR);
-            // create pulsar topic
-            this.createPulsarTopic(groupInfo, (PulsarClusterInfo) clusterInfo, streamInfo.getMqResource());
+            PulsarClusterInfo pulsarCluster = (PulsarClusterInfo) clusterService.getOne(
+                    groupInfo.getInlongClusterTag(), null, ClusterType.PULSAR);
+            // create pulsar topic and subscription
+            this.createTopic((InlongPulsarInfo) groupInfo, pulsarCluster, streamInfo.getMqResource());
+            this.createSubscription((InlongPulsarInfo) groupInfo, pulsarCluster, streamInfo.getMqResource(), streamId);
         } catch (Exception e) {
             String msg = String.format("failed to create pulsar topic for groupId=%s, streamId=%s", groupId, streamId);
             log.error(msg, e);
@@ -185,66 +198,86 @@ public class PulsarResourceOperator implements QueueResourceOperator {
     /**
      * Create Pulsar Topic and Subscription, and save the consumer group info.
      */
-    private void createPulsarTopic(InlongGroupInfo groupInfo, PulsarClusterInfo pulsarCluster, String topicName)
+    private void createTopic(InlongPulsarInfo pulsarInfo, PulsarClusterInfo pulsarCluster, String topicName)
             throws Exception {
         try (PulsarAdmin pulsarAdmin = PulsarUtils.getPulsarAdmin(pulsarCluster)) {
-            // 1. create pulsar topic
-            InlongPulsarInfo pulsarInfo = (InlongPulsarInfo) groupInfo;
             String tenant = pulsarCluster.getTenant();
             if (StringUtils.isEmpty(tenant)) {
                 tenant = InlongConstants.DEFAULT_PULSAR_TENANT;
             }
             String namespace = pulsarInfo.getMqResource();
-            PulsarTopicBean topicBean = PulsarTopicBean.builder()
+            PulsarTopicInfo topicInfo = PulsarTopicInfo.builder()
                     .tenant(tenant)
                     .namespace(namespace)
                     .topicName(topicName)
                     .queueModule(pulsarInfo.getQueueModule())
                     .numPartitions(pulsarInfo.getPartitionNum())
                     .build();
-            pulsarOperator.createTopic(pulsarAdmin, topicBean);
+            pulsarOperator.createTopic(pulsarAdmin, topicInfo);
+        }
+    }
 
-            // 2. create a subscription for the pulsar topic
-            boolean exist = pulsarOperator.topicIsExists(pulsarAdmin, tenant, namespace, topicName,
-                    InlongConstants.PULSAR_QUEUE_TYPE_PARALLEL.equals(topicBean.getQueueModule()));
+    /**
+     * Create Pulsar Subscription, and save the consumer group info.
+     */
+    private void createSubscription(InlongPulsarInfo pulsarInfo, PulsarClusterInfo pulsarCluster, String topicName,
+            String streamId) throws Exception {
+        try (PulsarAdmin pulsarAdmin = PulsarUtils.getPulsarAdmin(pulsarCluster)) {
+            String tenant = pulsarCluster.getTenant();
+            if (StringUtils.isEmpty(tenant)) {
+                tenant = InlongConstants.DEFAULT_PULSAR_TENANT;
+            }
+            String namespace = pulsarInfo.getMqResource();
+            String fullTopicName = tenant + "/" + namespace + "/" + topicName;
+            boolean exist = pulsarOperator.topicExists(pulsarAdmin, tenant, namespace, topicName,
+                    InlongConstants.PULSAR_QUEUE_TYPE_PARALLEL.equals(pulsarInfo.getQueueModule()));
             if (!exist) {
-                String topicFullName = tenant + "/" + namespace + "/" + topicName;
                 String serviceUrl = pulsarCluster.getAdminUrl();
-                log.error("topic={} not exists in {}", topicFullName, serviceUrl);
-                throw new WorkflowListenerException("topic=" + topicFullName + " not exists in " + serviceUrl);
+                log.error("topic={} not exists in {}", fullTopicName, serviceUrl);
+                throw new WorkflowListenerException("topic=" + fullTopicName + " not exists in " + serviceUrl);
             }
 
-            // subscription naming rules: clusterTag_topicName_consumer_group
-            String subscription = groupInfo.getInlongClusterTag() + "_" + topicName + "_consumer_group";
-            pulsarOperator.createSubscription(pulsarAdmin, topicBean, subscription);
-            String groupId = groupInfo.getInlongGroupId();
-            log.info("success to create pulsar subscription for groupId={}, topic={}, subs={}",
-                    groupId, topicName, subscription);
+            // create subscription for all sinks
+            String groupId = pulsarInfo.getInlongGroupId();
+            List<StreamSink> streamSinks = sinkService.listSink(groupId, streamId);
+            if (CollectionUtils.isEmpty(streamSinks)) {
+                log.warn("no need to create subs, as no sink exists for groupId={}, streamId={}", groupId, streamId);
+                return;
+            }
 
-            // 3. insert the consumer group info into the consumption table
-            consumptionService.saveSortConsumption(groupInfo, topicName, subscription);
-            log.info("success to save consume for groupId={}, topic={}, subs={}", groupId, topicName, subscription);
+            // subscription naming rules: clusterTag_topicName_sinkId_consumer_group
+            String clusterTag = pulsarInfo.getInlongClusterTag();
+            for (StreamSink sink : streamSinks) {
+                String subs = String.format(PULSAR_SUBSCRIPTION, clusterTag, topicName, sink.getId());
+                pulsarOperator.createSubscription(pulsarAdmin, fullTopicName, pulsarInfo.getQueueModule(), subs);
+                log.info("success to create subs={} for groupId={}, topic={}", subs, groupId, fullTopicName);
+
+                // insert the consumer group info into the consumption table
+                Integer id = consumeService.saveBySystem(pulsarInfo, topicName, subs);
+                log.info("success to save inlong consume [{}] for subs={}, groupId={}, topic={}",
+                        id, subs, groupId, topicName);
+            }
         }
     }
 
     /**
      * Delete Pulsar Topic and Subscription, and delete the consumer group info.
+     * TODO delete Subscription and InlongConsume info
      */
     private void deletePulsarTopic(InlongGroupInfo groupInfo, PulsarClusterInfo pulsarCluster, String topicName)
             throws Exception {
         try (PulsarAdmin pulsarAdmin = PulsarUtils.getPulsarAdmin(pulsarCluster)) {
-            // 1. delete pulsar topic
             String tenant = pulsarCluster.getTenant();
             if (StringUtils.isEmpty(tenant)) {
                 tenant = InlongConstants.DEFAULT_PULSAR_TENANT;
             }
             String namespace = groupInfo.getMqResource();
-            PulsarTopicBean topicBean = PulsarTopicBean.builder()
+            PulsarTopicInfo topicInfo = PulsarTopicInfo.builder()
                     .tenant(tenant)
                     .namespace(namespace)
                     .topicName(topicName)
                     .build();
-            pulsarOperator.forceDeleteTopic(pulsarAdmin, topicBean);
+            pulsarOperator.forceDeleteTopic(pulsarAdmin, topicInfo);
         }
     }
 
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/tubemq/TubeMQResourceOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/tubemq/TubeMQResourceOperator.java
index 1d57b447f..2f05800c5 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/tubemq/TubeMQResourceOperator.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/tubemq/TubeMQResourceOperator.java
@@ -19,16 +19,16 @@ package org.apache.inlong.manager.service.resource.queue.tubemq;
 
 import com.google.common.base.Objects;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.inlong.manager.common.consts.MQType;
 import org.apache.inlong.manager.common.enums.ClusterType;
 import org.apache.inlong.manager.common.enums.GroupStatus;
-import org.apache.inlong.manager.common.consts.MQType;
 import org.apache.inlong.manager.common.exceptions.WorkflowListenerException;
 import org.apache.inlong.manager.common.util.Preconditions;
 import org.apache.inlong.manager.pojo.cluster.tubemq.TubeClusterInfo;
 import org.apache.inlong.manager.pojo.group.InlongGroupInfo;
 import org.apache.inlong.manager.pojo.stream.InlongStreamInfo;
 import org.apache.inlong.manager.service.cluster.InlongClusterService;
-import org.apache.inlong.manager.service.core.ConsumptionService;
+import org.apache.inlong.manager.service.consume.InlongConsumeService;
 import org.apache.inlong.manager.service.resource.queue.QueueResourceOperator;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Service;
@@ -43,7 +43,7 @@ public class TubeMQResourceOperator implements QueueResourceOperator {
     @Autowired
     private InlongClusterService clusterService;
     @Autowired
-    private ConsumptionService consumptionService;
+    private InlongConsumeService consumeService;
     @Autowired
     private TubeMQOperator tubeMQOperator;
 
@@ -79,9 +79,10 @@ public class TubeMQResourceOperator implements QueueResourceOperator {
             tubeMQOperator.createConsumerGroup(tubeCluster, topicName, consumeGroup, operator);
             log.info("success to create tubemq consumer group for groupId={}", groupId);
 
-            // insert the consumer group info into the consumption table
-            consumptionService.saveSortConsumption(groupInfo, topicName, consumeGroup);
-            log.info("success to save consume for groupId={}, topic={}, consumer={}", groupId, topicName, consumeGroup);
+            // insert the consumer group info
+            Integer id = consumeService.saveBySystem(groupInfo, topicName, consumeGroup);
+            log.info("success to save inlong consume [{}] for consumerGroup={}, groupId={}, topic={}",
+                    id, consumeGroup, groupId, topicName);
 
             log.info("success to create tubemq resource for groupId={}, cluster={}", groupId, tubeCluster);
         } catch (Exception e) {