You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by he...@apache.org on 2022/03/12 15:22:27 UTC

[incubator-inlong] branch master updated: [INLONG-3089][Manager] Create group resource failed after approving one inlong group (#3092)

This is an automated email from the ASF dual-hosted git repository.

healchow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 264912c  [INLONG-3089][Manager] Create group resource failed after approving one inlong group (#3092)
264912c is described below

commit 264912ca012b4903490c7b5e30ab6840a71e5d1e
Author: healchow <he...@gmail.com>
AuthorDate: Sat Mar 12 23:21:36 2022 +0800

    [INLONG-3089][Manager] Create group resource failed after approving one inlong group (#3092)
---
 .../common/pojo/dataproxy/PulsarClusterInfo.java   |  4 +-
 .../inlong/manager/client/api/PulsarBaseConf.java  |  2 +
 .../manager/client/api/TdmqPulsarBaseConf.java     |  9 +++
 .../pojo/consumption/ConsumptionMqExtBase.java     |  7 ++-
 .../dao/mapper/ThirdPartyClusterEntityMapper.java  |  2 +-
 .../mappers/ThirdPartyClusterEntityMapper.xml      |  2 +-
 .../manager/service/CommonOperateService.java      | 67 +++++++++-------------
 .../service/core/impl/ConsumptionServiceImpl.java  | 44 +++++++-------
 .../service/core/impl/InlongGroupServiceImpl.java  | 34 +++++------
 .../core/impl/ThirdPartyClusterServiceImpl.java    | 15 ++---
 .../mq/CreatePulsarGroupForStreamTaskListener.java |  2 +-
 .../mq/CreatePulsarGroupTaskListener.java          | 10 ++--
 .../mq/CreatePulsarResourceTaskListener.java       |  2 +-
 .../mq/CreatePulsarTopicForStreamTaskListener.java |  2 +-
 .../service/thirdparty/mq/PulsarEventSelector.java |  8 +--
 .../thirdparty/sort/util/SourceInfoUtils.java      |  9 +--
 .../ConsumptionCompleteProcessListener.java        | 10 ++--
 .../stream/CreateStreamWorkflowDefinition.java     | 12 ++--
 18 files changed, 124 insertions(+), 117 deletions(-)

diff --git a/inlong-common/src/main/java/org/apache/inlong/common/pojo/dataproxy/PulsarClusterInfo.java b/inlong-common/src/main/java/org/apache/inlong/common/pojo/dataproxy/PulsarClusterInfo.java
index a0733e2..f03204e 100644
--- a/inlong-common/src/main/java/org/apache/inlong/common/pojo/dataproxy/PulsarClusterInfo.java
+++ b/inlong-common/src/main/java/org/apache/inlong/common/pojo/dataproxy/PulsarClusterInfo.java
@@ -25,13 +25,15 @@ import lombok.NoArgsConstructor;
 import java.util.Map;
 
 @Data
+@Builder
 @AllArgsConstructor
 @NoArgsConstructor
-@Builder
 public class PulsarClusterInfo {
+
     private String type;
     private String adminUrl;
     private String token;
     private String brokerServiceUrl;
     private Map<String, String> ext;
+
 }
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/PulsarBaseConf.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/PulsarBaseConf.java
index 415d639..dc05abc 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/PulsarBaseConf.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/PulsarBaseConf.java
@@ -21,11 +21,13 @@ import io.swagger.annotations.ApiModel;
 import io.swagger.annotations.ApiModelProperty;
 import lombok.AllArgsConstructor;
 import lombok.Data;
+import lombok.EqualsAndHashCode;
 import lombok.NoArgsConstructor;
 import org.apache.inlong.manager.client.api.auth.Authentication;
 import org.apache.inlong.manager.common.enums.MqType;
 
 @Data
+@EqualsAndHashCode(callSuper = true)
 @AllArgsConstructor
 @NoArgsConstructor
 @ApiModel("Base configuration for Pulsar")
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/TdmqPulsarBaseConf.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/TdmqPulsarBaseConf.java
index 3074794..ae8d231 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/TdmqPulsarBaseConf.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/TdmqPulsarBaseConf.java
@@ -18,10 +18,19 @@
 package org.apache.inlong.manager.client.api;
 
 import io.swagger.annotations.ApiModelProperty;
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.NoArgsConstructor;
 import org.apache.inlong.manager.common.enums.MqType;
 
+@Data
+@EqualsAndHashCode(callSuper = true)
+@AllArgsConstructor
+@NoArgsConstructor
 public class TdmqPulsarBaseConf extends PulsarBaseConf {
 
     @ApiModelProperty("Message queue type")
     private MqType type = MqType.TDMQ_PULSAR;
+
 }
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/consumption/ConsumptionMqExtBase.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/consumption/ConsumptionMqExtBase.java
index 6bff8a7..0f0784f 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/consumption/ConsumptionMqExtBase.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/consumption/ConsumptionMqExtBase.java
@@ -32,14 +32,15 @@ import org.apache.inlong.manager.common.enums.Constant;
 @ApiModel("Extended consumption information of different MQs")
 @JsonTypeInfo(use = Id.NAME, visible = true, property = "middlewareType", defaultImpl = ConsumptionMqExtBase.class)
 @JsonSubTypes({
-        @JsonSubTypes.Type(value = ConsumptionPulsarInfo.class, name = Constant.MIDDLEWARE_PULSAR)
+        @JsonSubTypes.Type(value = ConsumptionPulsarInfo.class, name = Constant.MIDDLEWARE_PULSAR),
+        @JsonSubTypes.Type(value = ConsumptionPulsarInfo.class, name = Constant.MIDDLEWARE_TDMQ_PULSAR)
 })
 public class ConsumptionMqExtBase {
 
-    @ApiModelProperty(value = "Self-incrementing primary key")
+    @ApiModelProperty(value = "Primary key")
     private Integer id;
 
-    @ApiModelProperty(value = "Consumer information ID")
+    @ApiModelProperty(value = "Consumption ID")
     private Integer consumptionId;
 
     @ApiModelProperty(value = "Consumer group")
diff --git a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/ThirdPartyClusterEntityMapper.java b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/ThirdPartyClusterEntityMapper.java
index fe13d80..962782a 100644
--- a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/ThirdPartyClusterEntityMapper.java
+++ b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/ThirdPartyClusterEntityMapper.java
@@ -39,7 +39,7 @@ public interface ThirdPartyClusterEntityMapper {
 
     List<ThirdPartyClusterEntity> selectByType(@Param("type") String type);
 
-    List<ThirdPartyClusterEntity> selectMqCluster(@Param("mqSetName") String mqSetName,
+    List<ThirdPartyClusterEntity> selectMQCluster(@Param("mqSetName") String mqSetName,
             @Param("typeList") List<String> typeList);
 
     ThirdPartyClusterEntity selectByName(@Param("name") String name);
diff --git a/inlong-manager/manager-dao/src/main/resources/mappers/ThirdPartyClusterEntityMapper.xml b/inlong-manager/manager-dao/src/main/resources/mappers/ThirdPartyClusterEntityMapper.xml
index f67ae62..a74ecb8 100644
--- a/inlong-manager/manager-dao/src/main/resources/mappers/ThirdPartyClusterEntityMapper.xml
+++ b/inlong-manager/manager-dao/src/main/resources/mappers/ThirdPartyClusterEntityMapper.xml
@@ -225,7 +225,7 @@
         where is_deleted = 0
         and name = #{name, jdbcType=VARCHAR}
     </select>
-    <select id="selectMqCluster" resultType="org.apache.inlong.manager.dao.entity.ThirdPartyClusterEntity">
+    <select id="selectMQCluster" resultType="org.apache.inlong.manager.dao.entity.ThirdPartyClusterEntity">
         select
         <include refid="Base_Column_List"/>
         from third_party_cluster
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/CommonOperateService.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/CommonOperateService.java
index b171f90..baec6e7 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/CommonOperateService.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/CommonOperateService.java
@@ -20,6 +20,7 @@ package org.apache.inlong.manager.service;
 import com.google.gson.Gson;
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.collections.MapUtils;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.inlong.common.pojo.dataproxy.PulsarClusterInfo;
 import org.apache.inlong.manager.common.beans.ClusterBean;
 import org.apache.inlong.manager.common.enums.Constant;
@@ -28,19 +29,15 @@ import org.apache.inlong.manager.common.enums.GroupState;
 import org.apache.inlong.manager.common.enums.SinkType;
 import org.apache.inlong.manager.common.exceptions.BusinessException;
 import org.apache.inlong.manager.common.exceptions.WorkflowListenerException;
-import org.apache.inlong.manager.common.pojo.cluster.ClusterPageRequest;
 import org.apache.inlong.manager.common.pojo.group.InlongGroupInfo;
-import org.apache.inlong.manager.common.pojo.group.InlongGroupPageRequest;
 import org.apache.inlong.manager.common.pojo.sink.SinkResponse;
 import org.apache.inlong.manager.common.pojo.sink.hive.HiveSinkResponse;
 import org.apache.inlong.manager.common.pojo.source.SourceResponse;
 import org.apache.inlong.manager.common.pojo.stream.InlongStreamInfo;
 import org.apache.inlong.manager.common.util.JsonUtils;
 import org.apache.inlong.manager.common.util.Preconditions;
-import org.apache.inlong.manager.dao.entity.DataProxyClusterEntity;
 import org.apache.inlong.manager.dao.entity.InlongGroupEntity;
 import org.apache.inlong.manager.dao.entity.ThirdPartyClusterEntity;
-import org.apache.inlong.manager.dao.mapper.DataProxyClusterEntityMapper;
 import org.apache.inlong.manager.dao.mapper.InlongGroupEntityMapper;
 import org.apache.inlong.manager.dao.mapper.ThirdPartyClusterEntityMapper;
 import org.apache.inlong.manager.service.core.InlongStreamService;
@@ -61,6 +58,7 @@ import org.springframework.stereotype.Service;
 
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -82,8 +80,6 @@ public class CommonOperateService {
     @Autowired
     private InlongGroupEntityMapper groupMapper;
     @Autowired
-    private DataProxyClusterEntityMapper dataProxyClusterMapper;
-    @Autowired
     private ThirdPartyClusterEntityMapper thirdPartyClusterMapper;
 
     /**
@@ -100,14 +96,14 @@ public class CommonOperateService {
 
         switch (key) {
             case Constant.PULSAR_SERVICEURL: {
-                clusterEntity = getThirdPartyCluster(Constant.MIDDLEWARE_PULSAR);
+                clusterEntity = getMQCluster(Constant.MIDDLEWARE_PULSAR);
                 if (clusterEntity != null) {
                     result = clusterEntity.getUrl();
                 }
                 break;
             }
             case Constant.PULSAR_ADMINURL: {
-                clusterEntity = getThirdPartyCluster(Constant.MIDDLEWARE_PULSAR);
+                clusterEntity = getMQCluster(Constant.MIDDLEWARE_PULSAR);
                 if (clusterEntity != null) {
                     params = gson.fromJson(clusterEntity.getExtParams(), Map.class);
                     result = params.get(key);
@@ -117,7 +113,7 @@ public class CommonOperateService {
             case Constant.CLUSTER_TUBE_MANAGER:
             case Constant.CLUSTER_TUBE_CLUSTER_ID:
             case Constant.TUBE_MASTER_URL: {
-                clusterEntity = getThirdPartyCluster(Constant.MIDDLEWARE_TUBE);
+                clusterEntity = getMQCluster(Constant.MIDDLEWARE_TUBE);
                 if (clusterEntity != null) {
                     if (key.equals(Constant.TUBE_MASTER_URL)) {
                         result = clusterEntity.getUrl();
@@ -129,61 +125,50 @@ public class CommonOperateService {
                 break;
             }
         }
-
         return result;
     }
 
     /**
      * Get third party cluster by type.
      *
-     * TODO Add more condition for query.
+     * TODO Add data_proxy_cluster_name for query.
      *
      * @param type Cluster type, such as TUBE, PULSAR, etc.
      */
-    private ThirdPartyClusterEntity getThirdPartyCluster(String type) {
-        InlongGroupPageRequest groupPageRequest = new InlongGroupPageRequest();
-        groupPageRequest.setMiddlewareType(type);
-        List<InlongGroupEntity> groupEntities = groupMapper.selectByCondition(groupPageRequest);
-        if (groupEntities.isEmpty()) {
-            LOGGER.warn("no inlong group found by type={}", type);
+    private ThirdPartyClusterEntity getMQCluster(String type) {
+        List<ThirdPartyClusterEntity> clusterList = thirdPartyClusterMapper.selectByType(Constant.CLUSTER_DATA_PROXY);
+        if (CollectionUtils.isEmpty(clusterList)) {
+            LOGGER.warn("no data proxy cluster found");
             return null;
         }
-
-        Integer clusterId = groupEntities.get(0).getProxyClusterId();
-        DataProxyClusterEntity dataProxyCluster = dataProxyClusterMapper.selectByPrimaryKey(clusterId);
-        if (dataProxyCluster == null) {
-            LOGGER.warn("no data proxy cluster found with id={}", clusterId);
+        String mqSetName = clusterList.get(0).getMqSetName();
+        List<ThirdPartyClusterEntity> mqClusterList = thirdPartyClusterMapper.selectMQCluster(mqSetName,
+                Collections.singletonList(type));
+        if (CollectionUtils.isEmpty(mqClusterList)) {
+            LOGGER.warn("no mq cluster found by type={} and mq set name={}", type, mqSetName);
             return null;
         }
 
-        String mqSetName = dataProxyCluster.getMqSetName();
-        ClusterPageRequest clusterRequest = new ClusterPageRequest();
-        clusterRequest.setMqSetName(mqSetName);
-        List<ThirdPartyClusterEntity> thirdPartyClusters = thirdPartyClusterMapper.selectByCondition(clusterRequest);
-        if (CollectionUtils.isEmpty(thirdPartyClusters)) {
-            LOGGER.warn("no related third-party-cluster by type={} and mq set name={}", type, mqSetName);
-            return null;
-        }
-
-        return thirdPartyClusters.get(0);
+        return mqClusterList.get(0);
     }
 
     /**
-     * Get Pulsar cluster info.
+     * Get Pulsar cluster by the given type.
      *
      * @return Pulsar cluster info.
      */
-    public PulsarClusterInfo getPulsarClusterInfo() {
-        ThirdPartyClusterEntity thirdPartyClusterEntity = getThirdPartyCluster(Constant.MIDDLEWARE_PULSAR);
-        Preconditions.checkNotNull(thirdPartyClusterEntity.getExtParams(), "pulsar extParam is empty, check"
-                + "third party cluster table");
-        Map<String, String> configParams = JsonUtils.parse(thirdPartyClusterEntity.getExtParams(), Map.class);
+    public PulsarClusterInfo getPulsarClusterInfo(String type) {
+        ThirdPartyClusterEntity clusterEntity = getMQCluster(type);
+        if (clusterEntity == null || StringUtils.isBlank(clusterEntity.getExtParams())) {
+            throw new BusinessException("pulsar cluster or pulsar ext params is empty");
+        }
+        Map<String, String> configParams = JsonUtils.parse(clusterEntity.getExtParams(), Map.class);
         PulsarClusterInfo pulsarClusterInfo = PulsarClusterInfo.builder().brokerServiceUrl(
-                thirdPartyClusterEntity.getUrl()).token(thirdPartyClusterEntity.getToken()).build();
+                clusterEntity.getUrl()).token(clusterEntity.getToken()).build();
         String adminUrl = configParams.get(Constant.PULSAR_ADMINURL);
         Preconditions.checkNotNull(adminUrl, "adminUrl is empty, check third party cluster table");
         pulsarClusterInfo.setAdminUrl(adminUrl);
-        pulsarClusterInfo.setType(thirdPartyClusterEntity.getType());
+        pulsarClusterInfo.setType(clusterEntity.getType());
         return pulsarClusterInfo;
     }
 
@@ -241,7 +226,7 @@ public class CommonOperateService {
 
         // Get source info
         String masterAddress = getSpecifiedParam(Constant.TUBE_MASTER_URL);
-        PulsarClusterInfo pulsarCluster = getPulsarClusterInfo();
+        PulsarClusterInfo pulsarCluster = getPulsarClusterInfo(groupInfo.getMiddlewareType());
         InlongStreamInfo streamInfo = streamService.get(groupId, streamId);
         SourceInfo sourceInfo = SourceInfoUtils.createSourceInfo(pulsarCluster, masterAddress, clusterBean,
                 groupInfo, streamInfo, sourceResponse, sourceFields);
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/ConsumptionServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/ConsumptionServiceImpl.java
index 72cdf7d..fc8b1d8 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/ConsumptionServiceImpl.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/ConsumptionServiceImpl.java
@@ -20,15 +20,6 @@ package org.apache.inlong.manager.service.core.impl;
 import com.github.pagehelper.Page;
 import com.github.pagehelper.PageHelper;
 import com.github.pagehelper.PageInfo;
-import java.util.Arrays;
-import java.util.Date;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Locale;
-import java.util.Map;
-import java.util.Objects;
-import java.util.Set;
-import java.util.stream.Collectors;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.inlong.manager.common.beans.ClusterBean;
 import org.apache.inlong.manager.common.enums.Constant;
@@ -66,6 +57,16 @@ import org.springframework.stereotype.Service;
 import org.springframework.transaction.annotation.Transactional;
 import org.springframework.util.CollectionUtils;
 
+import java.util.Arrays;
+import java.util.Date;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.stream.Collectors;
+
 /**
  * Data consumption service
  */
@@ -119,7 +120,8 @@ public class ConsumptionServiceImpl implements ConsumptionService {
 
         ConsumptionInfo info = CommonBeanUtils.copyProperties(entity, ConsumptionInfo::new);
 
-        if (Constant.MIDDLEWARE_PULSAR.equalsIgnoreCase(info.getMiddlewareType())) {
+        String mqType = info.getMiddlewareType();
+        if (Constant.MIDDLEWARE_PULSAR.equals(mqType) || Constant.MIDDLEWARE_TDMQ_PULSAR.equals(mqType)) {
             ConsumptionPulsarEntity pulsarEntity = consumptionPulsarMapper.selectByConsumptionId(info.getId());
             Preconditions.checkNotNull(pulsarEntity, "Pulsar consumption cannot be empty, as the middleware is Pulsar");
             ConsumptionPulsarInfo pulsarInfo = CommonBeanUtils.copyProperties(pulsarEntity, ConsumptionPulsarInfo::new);
@@ -147,11 +149,10 @@ public class ConsumptionServiceImpl implements ConsumptionService {
     @Transactional(rollbackFor = Throwable.class)
     public Integer save(ConsumptionInfo info, String operator) {
         fullConsumptionInfo(info);
-
         Date now = new Date();
         ConsumptionEntity entity = this.saveConsumption(info, operator, now);
-
-        if (Constant.MIDDLEWARE_PULSAR.equals(entity.getMiddlewareType())) {
+        String mqType = entity.getMiddlewareType();
+        if (Constant.MIDDLEWARE_PULSAR.equals(mqType) || Constant.MIDDLEWARE_TDMQ_PULSAR.equals(mqType)) {
             savePulsarInfo(info.getMqExtInfo(), entity);
         }
 
@@ -232,7 +233,8 @@ public class ConsumptionServiceImpl implements ConsumptionService {
         entity.setModifyTime(now);
 
         // Modify Pulsar consumption info
-        if (Constant.MIDDLEWARE_PULSAR.equalsIgnoreCase(info.getMiddlewareType())) {
+        String mqType = info.getMiddlewareType();
+        if (Constant.MIDDLEWARE_PULSAR.equals(mqType) || Constant.MIDDLEWARE_TDMQ_PULSAR.equals(mqType)) {
             ConsumptionPulsarEntity pulsarEntity = consumptionPulsarMapper.selectByConsumptionId(consumptionId);
             Preconditions.checkNotNull(pulsarEntity, "Pulsar consumption cannot be null");
             pulsarEntity.setConsumerGroupId(info.getConsumerGroupId());
@@ -338,10 +340,10 @@ public class ConsumptionServiceImpl implements ConsumptionService {
         }
 
         log.debug("begin to save consumption, groupId={}, topic={}, consumer group={}", groupId, topic, consumerGroup);
-        String middlewareType = bizInfo.getMiddlewareType();
+        String mqType = bizInfo.getMiddlewareType();
         ConsumptionEntity entity = new ConsumptionEntity();
         entity.setInlongGroupId(groupId);
-        entity.setMiddlewareType(middlewareType);
+        entity.setMiddlewareType(mqType);
         entity.setTopic(topic);
         entity.setConsumerGroupId(consumerGroup);
         entity.setConsumerGroupName(consumerGroup);
@@ -355,7 +357,7 @@ public class ConsumptionServiceImpl implements ConsumptionService {
 
         consumptionMapper.insert(entity);
 
-        if (Constant.MIDDLEWARE_PULSAR.equals(middlewareType)) {
+        if (Constant.MIDDLEWARE_PULSAR.equals(mqType) || Constant.MIDDLEWARE_TDMQ_PULSAR.equals(mqType)) {
             ConsumptionPulsarEntity pulsarEntity = new ConsumptionPulsarEntity();
             pulsarEntity.setConsumptionId(entity.getId());
             pulsarEntity.setConsumerGroupId(consumerGroup);
@@ -370,7 +372,8 @@ public class ConsumptionServiceImpl implements ConsumptionService {
     private NewConsumptionProcessForm genNewConsumptionProcessForm(ConsumptionInfo consumptionInfo) {
         NewConsumptionProcessForm form = new NewConsumptionProcessForm();
         Integer id = consumptionInfo.getId();
-        if (Constant.MIDDLEWARE_PULSAR.equalsIgnoreCase(consumptionInfo.getMiddlewareType())) {
+        String mqType = consumptionInfo.getMiddlewareType();
+        if (Constant.MIDDLEWARE_PULSAR.equals(mqType) || Constant.MIDDLEWARE_TDMQ_PULSAR.equals(mqType)) {
             ConsumptionPulsarEntity consumptionPulsarEntity = consumptionPulsarMapper.selectByConsumptionId(id);
             ConsumptionPulsarInfo pulsarInfo = CommonBeanUtils.copyProperties(consumptionPulsarEntity,
                     ConsumptionPulsarInfo::new);
@@ -426,11 +429,12 @@ public class ConsumptionServiceImpl implements ConsumptionService {
         Preconditions.checkNotNull(topicVO, "inlong group not exist: " + groupId);
 
         // Tube’s topic is the inlong group level, one inlong group, one Tube topic
-        if (Constant.MIDDLEWARE_TUBE.equalsIgnoreCase(topicVO.getMiddlewareType())) {
+        String mqType = topicVO.getMiddlewareType();
+        if (Constant.MIDDLEWARE_TUBE.equals(mqType)) {
             String bizTopic = topicVO.getMqResourceObj();
             Preconditions.checkTrue(bizTopic == null || bizTopic.equals(info.getTopic()),
                     "topic [" + info.getTopic() + "] not belong to inlong group " + groupId);
-        } else if (Constant.MIDDLEWARE_PULSAR.equalsIgnoreCase(topicVO.getMiddlewareType())) {
+        } else if (Constant.MIDDLEWARE_PULSAR.equals(mqType) || Constant.MIDDLEWARE_TDMQ_PULSAR.equals(mqType)) {
             // Pulsar's topic is the inlong stream level.
             // There will be multiple inlong streams under one inlong group, and there will be multiple topics
             List<InlongStreamTopicResponse> dsTopicList = topicVO.getDsTopicList();
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/InlongGroupServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/InlongGroupServiceImpl.java
index d55a3c4..cff6760 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/InlongGroupServiceImpl.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/InlongGroupServiceImpl.java
@@ -117,7 +117,8 @@ public class InlongGroupServiceImpl implements InlongGroupService {
         groupMapper.insertSelective(entity);
         this.saveOrUpdateExt(groupId, groupInfo.getExtList());
 
-        if (Constant.MIDDLEWARE_PULSAR.equals(groupInfo.getMiddlewareType())) {
+        String mqType = groupInfo.getMiddlewareType();
+        if (Constant.MIDDLEWARE_PULSAR.equals(mqType) || Constant.MIDDLEWARE_TDMQ_PULSAR.equals(mqType)) {
             InlongGroupPulsarInfo pulsarInfo = (InlongGroupPulsarInfo) groupInfo.getMqExtInfo();
             Preconditions.checkNotNull(pulsarInfo, "Pulsar info cannot be empty, as the middleware is Pulsar");
 
@@ -172,23 +173,23 @@ public class InlongGroupServiceImpl implements InlongGroupService {
         groupInfo.setExtList(extInfoList);
 
         // If the middleware is Pulsar, we need to encapsulate Pulsar related data
-        String middlewareType = entity.getMiddlewareType();
-        if (Constant.MIDDLEWARE_PULSAR.equalsIgnoreCase(middlewareType)) {
+        String mqType = entity.getMiddlewareType();
+        if (Constant.MIDDLEWARE_PULSAR.equals(mqType) || Constant.MIDDLEWARE_TDMQ_PULSAR.equals(mqType)) {
             InlongGroupPulsarEntity pulsarEntity = groupPulsarMapper.selectByGroupId(groupId);
-            Preconditions.checkNotNull(pulsarEntity, "Pulsar info not found under the inlong group");
+            Preconditions.checkNotNull(pulsarEntity, "Pulsar info not found by the groupId=" + groupId);
             InlongGroupPulsarInfo pulsarInfo = CommonBeanUtils.copyProperties(pulsarEntity, InlongGroupPulsarInfo::new);
-            pulsarInfo.setMiddlewareType(Constant.MIDDLEWARE_PULSAR);
+            pulsarInfo.setMiddlewareType(mqType);
             groupInfo.setMqExtInfo(pulsarInfo);
         }
 
         // For approved inlong group, encapsulate the cluster address of the middleware
         if (GroupState.CONFIG_SUCCESSFUL == GroupState.forCode(groupInfo.getStatus())) {
-            if (Constant.MIDDLEWARE_TUBE.equalsIgnoreCase(middlewareType)) {
+            if (Constant.MIDDLEWARE_TUBE.equalsIgnoreCase(mqType)) {
                 groupInfo.setTubeMaster(commonOperateService.getSpecifiedParam(Constant.TUBE_MASTER_URL));
-            } else if (Constant.MIDDLEWARE_PULSAR.equalsIgnoreCase(middlewareType)) {
-                PulsarClusterInfo pulsarClusterInfo = commonOperateService.getPulsarClusterInfo();
-                groupInfo.setPulsarAdminUrl(pulsarClusterInfo.getAdminUrl());
-                groupInfo.setPulsarServiceUrl(pulsarClusterInfo.getBrokerServiceUrl());
+            } else if (Constant.MIDDLEWARE_PULSAR.equals(mqType) || Constant.MIDDLEWARE_TDMQ_PULSAR.equals(mqType)) {
+                PulsarClusterInfo pulsarCluster = commonOperateService.getPulsarClusterInfo(mqType);
+                groupInfo.setPulsarAdminUrl(pulsarCluster.getAdminUrl());
+                groupInfo.setPulsarServiceUrl(pulsarCluster.getBrokerServiceUrl());
             }
         }
 
@@ -240,7 +241,8 @@ public class InlongGroupServiceImpl implements InlongGroupService {
         this.saveOrUpdateExt(groupId, groupRequest.getExtList());
 
         // Update the Pulsar info
-        if (Constant.MIDDLEWARE_PULSAR.equals(groupRequest.getMiddlewareType())) {
+        String mqType = groupRequest.getMiddlewareType();
+        if (Constant.MIDDLEWARE_PULSAR.equals(mqType) || Constant.MIDDLEWARE_TDMQ_PULSAR.equals(mqType)) {
             InlongGroupPulsarInfo pulsarInfo = (InlongGroupPulsarInfo) groupRequest.getMqExtInfo();
             Preconditions.checkNotNull(pulsarInfo, "Pulsar info cannot be empty, as the middleware is Pulsar");
             Integer writeQuorum = pulsarInfo.getWriteQuorum();
@@ -401,25 +403,25 @@ public class InlongGroupServiceImpl implements InlongGroupService {
         LOGGER.debug("begin to get topic by groupId={}", groupId);
         InlongGroupInfo groupInfo = this.get(groupId);
 
-        String middlewareType = groupInfo.getMiddlewareType();
+        String mqType = groupInfo.getMiddlewareType();
         InlongGroupTopicResponse topicVO = new InlongGroupTopicResponse();
 
-        if (Constant.MIDDLEWARE_TUBE.equalsIgnoreCase(middlewareType)) {
+        if (Constant.MIDDLEWARE_TUBE.equals(mqType)) {
             // Tube Topic corresponds to inlong group one-to-one
             topicVO.setMqResourceObj(groupInfo.getMqResourceObj());
             topicVO.setTubeMasterUrl(commonOperateService.getSpecifiedParam(Constant.TUBE_MASTER_URL));
-        } else if (Constant.MIDDLEWARE_PULSAR.equalsIgnoreCase(middlewareType)) {
+        } else if (Constant.MIDDLEWARE_PULSAR.equals(mqType) || Constant.MIDDLEWARE_TDMQ_PULSAR.equals(mqType)) {
             // Pulsar's topic corresponds to the inlong stream one-to-one
             topicVO.setDsTopicList(streamService.getTopicList(groupId));
             topicVO.setPulsarAdminUrl(commonOperateService.getSpecifiedParam(Constant.PULSAR_ADMINURL));
             topicVO.setPulsarServiceUrl(commonOperateService.getSpecifiedParam(Constant.PULSAR_SERVICEURL));
         } else {
-            LOGGER.error("middleware type={} not supported", middlewareType);
+            LOGGER.error("middleware type={} not supported", mqType);
             throw new BusinessException(ErrorCodeEnum.MIDDLEWARE_TYPE_NOT_SUPPORTED);
         }
 
         topicVO.setInlongGroupId(groupId);
-        topicVO.setMiddlewareType(middlewareType);
+        topicVO.setMiddlewareType(mqType);
         return topicVO;
     }
 
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/ThirdPartyClusterServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/ThirdPartyClusterServiceImpl.java
index b7e5cb7..2247b68 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/ThirdPartyClusterServiceImpl.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/ThirdPartyClusterServiceImpl.java
@@ -211,10 +211,11 @@ public class ThirdPartyClusterServiceImpl implements ThirdPartyClusterService {
 
             DataProxyConfig config = new DataProxyConfig();
             config.setM(groupEntity.getSchemaName());
-            if (Constant.MIDDLEWARE_TUBE.equals(groupEntity.getMiddlewareType())) {
+            String mqType = groupEntity.getMiddlewareType();
+            if (Constant.MIDDLEWARE_TUBE.equals(mqType)) {
                 config.setInlongGroupId(groupId);
                 config.setTopic(bizResource);
-            } else if (Constant.MIDDLEWARE_PULSAR.equals(groupEntity.getMiddlewareType())) {
+            } else if (Constant.MIDDLEWARE_PULSAR.equals(mqType) || Constant.MIDDLEWARE_TDMQ_PULSAR.equals(mqType)) {
                 List<InlongStreamEntity> streamList = streamMapper.selectByGroupId(groupId);
                 for (InlongStreamEntity stream : streamList) {
                     String topic = stream.getMqResourceObj();
@@ -248,9 +249,9 @@ public class ThirdPartyClusterServiceImpl implements ThirdPartyClusterService {
         }
 
         // third-party-cluster type
-        String middlewareType = "";
+        String mqType = "";
         if (!groupEntityList.isEmpty()) {
-            middlewareType = groupEntityList.get(0).getMiddlewareType();
+            mqType = groupEntityList.get(0).getMiddlewareType();
         }
 
         // Get topic list by group id
@@ -258,7 +259,7 @@ public class ThirdPartyClusterServiceImpl implements ThirdPartyClusterService {
         for (InlongGroupEntity groupEntity : groupEntityList) {
             final String groupId = groupEntity.getInlongGroupId();
             final String mqResource = groupEntity.getMqResourceObj();
-            if (Constant.MIDDLEWARE_PULSAR.equals(middlewareType)) {
+            if (Constant.MIDDLEWARE_PULSAR.equals(mqType) || Constant.MIDDLEWARE_TDMQ_PULSAR.equals(mqType)) {
                 List<InlongStreamEntity> streamList = streamMapper.selectByGroupId(groupId);
                 for (InlongStreamEntity stream : streamList) {
                     DataProxyConfig topicConfig = new DataProxyConfig();
@@ -273,7 +274,7 @@ public class ThirdPartyClusterServiceImpl implements ThirdPartyClusterService {
                     topicConfig.setTopic("persistent://" + tenant + "/" + mqResource + "/" + topic);
                     topicList.add(topicConfig);
                 }
-            } else if (Constant.MIDDLEWARE_TUBE.equals(middlewareType)) {
+            } else if (Constant.MIDDLEWARE_TUBE.equals(mqType)) {
                 DataProxyConfig topicConfig = new DataProxyConfig();
                 topicConfig.setInlongGroupId(groupId);
                 topicConfig.setTopic(mqResource);
@@ -285,7 +286,7 @@ public class ThirdPartyClusterServiceImpl implements ThirdPartyClusterService {
         List<ThirdPartyClusterInfo> mqSet = new ArrayList<>();
         List<String> clusterType = Arrays.asList(Constant.CLUSTER_TUBE, Constant.CLUSTER_PULSAR,
                 Constant.CLUSTER_TDMQ_PULSAR);
-        List<ThirdPartyClusterEntity> clusterList = thirdPartyClusterMapper.selectMqCluster(
+        List<ThirdPartyClusterEntity> clusterList = thirdPartyClusterMapper.selectMQCluster(
                 clusterEntity.getMqSetName(), clusterType);
         for (ThirdPartyClusterEntity cluster : clusterList) {
             ThirdPartyClusterInfo clusterInfo = new ThirdPartyClusterInfo();
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/mq/CreatePulsarGroupForStreamTaskListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/mq/CreatePulsarGroupForStreamTaskListener.java
index a814379..d4ec8fb 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/mq/CreatePulsarGroupForStreamTaskListener.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/mq/CreatePulsarGroupForStreamTaskListener.java
@@ -86,7 +86,7 @@ public class CreatePulsarGroupForStreamTaskListener implements QueueOperateListe
             log.warn("inlong stream is empty for group={}, stream={}, skip to create pulsar group", groupId, streamId);
             return ListenerResult.success();
         }
-        PulsarClusterInfo globalCluster = commonOperateService.getPulsarClusterInfo();
+        PulsarClusterInfo globalCluster = commonOperateService.getPulsarClusterInfo(groupInfo.getMiddlewareType());
         try (PulsarAdmin globalPulsarAdmin = PulsarUtils.getPulsarAdmin(globalCluster)) {
             // Query data sink info based on groupId and streamId
             List<String> sinkTypeList = sinkService.getSinkTypeList(groupId, streamId);
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/mq/CreatePulsarGroupTaskListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/mq/CreatePulsarGroupTaskListener.java
index 304dc78..c22342d 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/mq/CreatePulsarGroupTaskListener.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/mq/CreatePulsarGroupTaskListener.java
@@ -70,8 +70,8 @@ public class CreatePulsarGroupTaskListener implements QueueOperateListener {
         GroupResourceProcessForm form = (GroupResourceProcessForm) context.getProcessForm();
 
         String groupId = form.getInlongGroupId();
-        InlongGroupInfo bizInfo = groupService.get(groupId);
-        if (bizInfo == null) {
+        InlongGroupInfo groupInfo = groupService.get(groupId);
+        if (groupInfo == null) {
             log.error("inlong group not found with groupId={}", groupId);
             throw new WorkflowListenerException("inlong group not found with groupId=" + groupId);
         }
@@ -82,10 +82,10 @@ public class CreatePulsarGroupTaskListener implements QueueOperateListener {
             log.warn("inlong stream is empty for groupId={}, skip to create pulsar subscription", groupId);
             return ListenerResult.success();
         }
-        PulsarClusterInfo globalCluster = commonOperateService.getPulsarClusterInfo();
+        PulsarClusterInfo globalCluster = commonOperateService.getPulsarClusterInfo(groupInfo.getMiddlewareType());
         try (PulsarAdmin globalPulsarAdmin = PulsarUtils.getPulsarAdmin(globalCluster)) {
             String tenant = clusterBean.getDefaultTenant();
-            String namespace = bizInfo.getMqResourceObj();
+            String namespace = groupInfo.getMqResourceObj();
 
             for (InlongStreamEntity streamEntity : streamEntities) {
                 PulsarTopicBean topicBean = new PulsarTopicBean();
@@ -114,7 +114,7 @@ public class CreatePulsarGroupTaskListener implements QueueOperateListener {
                         pulsarOptService.createSubscription(pulsarAdmin, topicBean, subscription);
 
                         // Insert the consumption data into the consumption table
-                        consumptionService.saveSortConsumption(bizInfo, topic, subscription);
+                        consumptionService.saveSortConsumption(groupInfo, topic, subscription);
                     }
                 }
             }
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/mq/CreatePulsarResourceTaskListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/mq/CreatePulsarResourceTaskListener.java
index 9cb3d88..b0b8827 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/mq/CreatePulsarResourceTaskListener.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/mq/CreatePulsarResourceTaskListener.java
@@ -77,7 +77,7 @@ public class CreatePulsarResourceTaskListener implements QueueOperateListener {
         if (groupInfo == null) {
             throw new WorkflowListenerException("inlong group or pulsar cluster not found for groupId=" + groupId);
         }
-        PulsarClusterInfo globalCluster = commonOperateService.getPulsarClusterInfo();
+        PulsarClusterInfo globalCluster = commonOperateService.getPulsarClusterInfo(groupInfo.getMiddlewareType());
         try (PulsarAdmin globalPulsarAdmin = PulsarUtils.getPulsarAdmin(globalCluster)) {
             List<String> pulsarClusters = PulsarUtils.getPulsarClusters(globalPulsarAdmin);
             for (String cluster : pulsarClusters) {
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/mq/CreatePulsarTopicForStreamTaskListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/mq/CreatePulsarTopicForStreamTaskListener.java
index 3864ac2..2811b5f 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/mq/CreatePulsarTopicForStreamTaskListener.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/mq/CreatePulsarTopicForStreamTaskListener.java
@@ -76,7 +76,7 @@ public class CreatePulsarTopicForStreamTaskListener implements QueueOperateListe
         }
 
         log.info("begin to create pulsar topic for groupId={}, streamId={}", groupId, streamId);
-        PulsarClusterInfo globalCluster = commonOperateService.getPulsarClusterInfo();
+        PulsarClusterInfo globalCluster = commonOperateService.getPulsarClusterInfo(groupInfo.getMiddlewareType());
         try (PulsarAdmin globalPulsarAdmin = PulsarUtils.getPulsarAdmin(globalCluster)) {
             List<String> pulsarClusters = PulsarUtils.getPulsarClusters(globalPulsarAdmin);
             for (String cluster : pulsarClusters) {
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/mq/PulsarEventSelector.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/mq/PulsarEventSelector.java
index 5da60b8..0bf990c 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/mq/PulsarEventSelector.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/mq/PulsarEventSelector.java
@@ -21,8 +21,8 @@ import lombok.extern.slf4j.Slf4j;
 import org.apache.inlong.manager.common.enums.Constant;
 import org.apache.inlong.manager.common.pojo.group.InlongGroupPulsarInfo;
 import org.apache.inlong.manager.common.pojo.workflow.form.GroupResourceProcessForm;
-import org.apache.inlong.manager.workflow.WorkflowContext;
 import org.apache.inlong.manager.common.pojo.workflow.form.ProcessForm;
+import org.apache.inlong.manager.workflow.WorkflowContext;
 import org.apache.inlong.manager.workflow.event.EventSelector;
 
 @Slf4j
@@ -35,13 +35,13 @@ public class PulsarEventSelector implements EventSelector {
             return false;
         }
         GroupResourceProcessForm form = (GroupResourceProcessForm) processForm;
-        String middlewareType = form.getGroupInfo().getMiddlewareType();
-        if (Constant.MIDDLEWARE_PULSAR.equalsIgnoreCase(middlewareType)) {
+        String mqType = form.getGroupInfo().getMiddlewareType();
+        if (Constant.MIDDLEWARE_PULSAR.equals(mqType) || Constant.MIDDLEWARE_TDMQ_PULSAR.equals(mqType)) {
             InlongGroupPulsarInfo pulsarInfo = (InlongGroupPulsarInfo) form.getGroupInfo().getMqExtInfo();
             return pulsarInfo.getEnableCreateResource() == 1;
         }
         log.warn("no need to create pulsar subscription group for groupId={}, as the middlewareType={}",
-                form.getInlongGroupId(), middlewareType);
+                form.getInlongGroupId(), mqType);
         return false;
     }
 
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/sort/util/SourceInfoUtils.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/sort/util/SourceInfoUtils.java
index 2df1936..13a74c8 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/sort/util/SourceInfoUtils.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/sort/util/SourceInfoUtils.java
@@ -64,17 +64,17 @@ public class SourceInfoUtils {
             ClusterBean clusterBean, InlongGroupInfo groupInfo, InlongStreamInfo streamInfo,
             SourceResponse sourceResponse, List<FieldInfo> sourceFields) {
 
-        String middleWareType = groupInfo.getMiddlewareType();
+        String mqType = groupInfo.getMiddlewareType();
         DeserializationInfo deserializationInfo = SerializationUtils.createDeserialInfo(sourceResponse, streamInfo);
         SourceInfo sourceInfo;
-        if (Constant.MIDDLEWARE_PULSAR.equals(middleWareType)) {
+        if (Constant.MIDDLEWARE_PULSAR.equals(mqType) || Constant.MIDDLEWARE_TDMQ_PULSAR.equals(mqType)) {
             sourceInfo = createPulsarSourceInfo(pulsarCluster, clusterBean, groupInfo, streamInfo, deserializationInfo,
                     sourceFields);
-        } else if (Constant.MIDDLEWARE_TUBE.equals(middleWareType)) {
+        } else if (Constant.MIDDLEWARE_TUBE.equals(mqType)) {
             // InlongGroupInfo groupInfo, String masterAddress,
             sourceInfo = createTubeSourceInfo(groupInfo, masterAddress, clusterBean, deserializationInfo, sourceFields);
         } else {
-            throw new WorkflowListenerException(String.format("Unsupported middleware {%s}", middleWareType));
+            throw new WorkflowListenerException(String.format("Unsupported middleware {%s}", mqType));
         }
 
         return sourceInfo;
@@ -98,6 +98,7 @@ public class SourceInfoUtils {
         final String fullTopicName = "persistent://" + tenant + "/" + namespace + "/" + topicName;
         final String consumerGroup = clusterBean.getAppName() + "_" + topicName + "_consumer_group";
         FieldInfo[] fieldInfosArr = fieldInfos.toArray(new FieldInfo[0]);
+
         String type = pulsarCluster.getType();
         if (StringUtils.isNotEmpty(type) && Constant.MIDDLEWARE_TDMQ_PULSAR.equals(type)) {
             return new TDMQPulsarSourceInfo(pulsarCluster.getBrokerServiceUrl(),
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/consumption/listener/ConsumptionCompleteProcessListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/consumption/listener/ConsumptionCompleteProcessListener.java
index 1e0d757..7f0b49d 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/consumption/listener/ConsumptionCompleteProcessListener.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/consumption/listener/ConsumptionCompleteProcessListener.java
@@ -84,14 +84,14 @@ public class ConsumptionCompleteProcessListener implements ProcessEventListener
             throw new WorkflowListenerException("consumption not exits for id=" + consumptionId);
         }
 
-        String middlewareType = entity.getMiddlewareType();
-        if (Constant.MIDDLEWARE_TUBE.equalsIgnoreCase(middlewareType)) {
+        String mqType = entity.getMiddlewareType();
+        if (Constant.MIDDLEWARE_TUBE.equals(mqType)) {
             this.createTubeConsumerGroup(entity);
             return ListenerResult.success("Create Tube consumer group successful");
-        } else if (Constant.MIDDLEWARE_PULSAR.equalsIgnoreCase(middlewareType)) {
+        } else if (Constant.MIDDLEWARE_PULSAR.equals(mqType) || Constant.MIDDLEWARE_TDMQ_PULSAR.equals(mqType)) {
             this.createPulsarTopicMessage(entity);
         } else {
-            throw new WorkflowListenerException("middleware type [" + middlewareType + "] not supported");
+            throw new WorkflowListenerException("middleware type [" + mqType + "] not supported");
         }
 
         this.updateConsumerInfo(consumptionId, entity.getConsumerGroupId());
@@ -119,7 +119,7 @@ public class ConsumptionCompleteProcessListener implements ProcessEventListener
         Preconditions.checkNotNull(groupInfo, "inlong group not found for groupId=" + groupId);
         String mqResourceObj = groupInfo.getMqResourceObj();
         Preconditions.checkNotNull(mqResourceObj, "mq resource cannot empty for groupId=" + groupId);
-        PulsarClusterInfo globalCluster = commonOperateService.getPulsarClusterInfo();
+        PulsarClusterInfo globalCluster = commonOperateService.getPulsarClusterInfo(entity.getMiddlewareType());
         try (PulsarAdmin pulsarAdmin = PulsarUtils.getPulsarAdmin(globalCluster)) {
             PulsarTopicBean topicMessage = new PulsarTopicBean();
             String tenant = clusterBean.getDefaultTenant();
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/stream/CreateStreamWorkflowDefinition.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/stream/CreateStreamWorkflowDefinition.java
index c10c80d..8e879ec 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/stream/CreateStreamWorkflowDefinition.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/stream/CreateStreamWorkflowDefinition.java
@@ -89,12 +89,12 @@ public class CreateStreamWorkflowDefinition implements WorkflowDefinition {
         ServiceTask createPulsarTopicTask = new ServiceTask();
         createPulsarTopicTask.setSkipResolver(c -> {
             GroupResourceProcessForm form = (GroupResourceProcessForm) c.getProcessForm();
-            String middlewareType = form.getGroupInfo().getMiddlewareType();
-            if (Constant.MIDDLEWARE_PULSAR.equalsIgnoreCase(middlewareType)) {
+            String mqType = form.getGroupInfo().getMiddlewareType();
+            if (Constant.MIDDLEWARE_PULSAR.equals(mqType) || Constant.MIDDLEWARE_TDMQ_PULSAR.equals(mqType)) {
                 return false;
             }
             log.warn("no need to create pulsar topic for groupId={}, streamId={}, as the middlewareType={}",
-                    form.getInlongGroupId(), form.getInlongStreamId(), middlewareType);
+                    form.getInlongGroupId(), form.getInlongStreamId(), mqType);
             return true;
         });
         createPulsarTopicTask.setName("createPulsarTopic");
@@ -105,12 +105,12 @@ public class CreateStreamWorkflowDefinition implements WorkflowDefinition {
         ServiceTask createPulsarSubscriptionGroupTask = new ServiceTask();
         createPulsarSubscriptionGroupTask.setSkipResolver(c -> {
             GroupResourceProcessForm form = (GroupResourceProcessForm) c.getProcessForm();
-            String middlewareType = form.getGroupInfo().getMiddlewareType();
-            if (Constant.MIDDLEWARE_PULSAR.equalsIgnoreCase(middlewareType)) {
+            String mqType = form.getGroupInfo().getMiddlewareType();
+            if (Constant.MIDDLEWARE_PULSAR.equals(mqType) || Constant.MIDDLEWARE_TDMQ_PULSAR.equals(mqType)) {
                 return false;
             }
             log.warn("no need to create pulsar subscription for groupId={}, streamId={}, as the middlewareType={}",
-                    form.getInlongGroupId(), form.getInlongStreamId(), middlewareType);
+                    form.getInlongGroupId(), form.getInlongStreamId(), mqType);
             return true;
         });
         createPulsarSubscriptionGroupTask.setName("createPulsarSubscription");