You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by he...@apache.org on 2022/06/07 03:20:37 UTC

[incubator-inlong] branch master updated: [INLONG-4543][Manager] Query MQ clusters from inlong cluster table (#4546)

This is an automated email from the ASF dual-hosted git repository.

healchow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 95f9dbcc0 [INLONG-4543][Manager] Query MQ clusters from inlong cluster table (#4546)
95f9dbcc0 is described below

commit 95f9dbcc000eac6f37670fe011b851f73da59545
Author: healzhou <he...@gmail.com>
AuthorDate: Tue Jun 7 11:20:31 2022 +0800

    [INLONG-4543][Manager] Query MQ clusters from inlong cluster table (#4546)
---
 .../common/pojo/dataproxy/PulsarClusterInfo.java   |  39 -----
 .../manager/service/CommonOperateService.java      | 186 ---------------------
 .../service/cluster/InlongClusterService.java      |  11 ++
 .../service/cluster/InlongClusterServiceImpl.java  |  35 ++--
 .../service/core/impl/ConsumptionServiceImpl.java  |   1 +
 .../manager/service/group/GroupCheckService.java   |  69 ++++++++
 .../service/mq/CreatePulsarGroupTaskListener.java  |  50 +++---
 .../mq/CreatePulsarResourceTaskListener.java       |  55 +++---
 .../mq/CreatePulsarSubscriptionTaskListener.java   |  62 ++++---
 .../service/mq/CreatePulsarTopicTaskListener.java  |  61 ++++---
 .../service/mq/CreateTubeGroupTaskListener.java    |  38 +++--
 .../service/mq/CreateTubeTopicTaskListener.java    |   5 +-
 .../service/mq/util/PulsarOptServiceImpl.java      |  14 +-
 .../manager/service/mq/util/PulsarUtils.java       |  36 ++--
 .../manager/service/mq/util/TubeMqOptService.java  |  33 ++--
 .../service/sink/StreamSinkServiceImpl.java        |  14 +-
 .../service/sort/CreateSortConfigListenerV2.java   |  42 +++--
 .../sort/CreateStreamSortConfigListener.java       |  32 ++--
 .../service/sort/PushSortConfigListener.java       |   9 +-
 .../manager/service/sort/util/DataFlowUtils.java   |   7 +-
 .../service/source/StreamSourceServiceImpl.java    |  18 +-
 .../transform/StreamTransformServiceImpl.java      |  10 +-
 .../ConsumptionCompleteProcessListener.java        |  33 ++--
 23 files changed, 377 insertions(+), 483 deletions(-)

diff --git a/inlong-common/src/main/java/org/apache/inlong/common/pojo/dataproxy/PulsarClusterInfo.java b/inlong-common/src/main/java/org/apache/inlong/common/pojo/dataproxy/PulsarClusterInfo.java
deleted file mode 100644
index f03204ed0..000000000
--- a/inlong-common/src/main/java/org/apache/inlong/common/pojo/dataproxy/PulsarClusterInfo.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.common.pojo.dataproxy;
-
-import lombok.AllArgsConstructor;
-import lombok.Builder;
-import lombok.Data;
-import lombok.NoArgsConstructor;
-
-import java.util.Map;
-
-@Data
-@Builder
-@AllArgsConstructor
-@NoArgsConstructor
-public class PulsarClusterInfo {
-
-    private String type;
-    private String adminUrl;
-    private String token;
-    private String brokerServiceUrl;
-    private Map<String, String> ext;
-
-}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/CommonOperateService.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/CommonOperateService.java
deleted file mode 100644
index 6f95dbd79..000000000
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/CommonOperateService.java
+++ /dev/null
@@ -1,186 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.manager.service;
-
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.google.gson.Gson;
-import org.apache.commons.collections.CollectionUtils;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.inlong.common.pojo.dataproxy.PulsarClusterInfo;
-import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
-import org.apache.inlong.manager.common.enums.GroupStatus;
-import org.apache.inlong.manager.common.enums.MQType;
-import org.apache.inlong.manager.common.exceptions.BusinessException;
-import org.apache.inlong.manager.common.pojo.cluster.InlongClusterPageRequest;
-import org.apache.inlong.manager.common.settings.InlongGroupSettings;
-import org.apache.inlong.manager.common.util.Preconditions;
-import org.apache.inlong.manager.dao.entity.InlongClusterEntity;
-import org.apache.inlong.manager.dao.entity.InlongGroupEntity;
-import org.apache.inlong.manager.dao.mapper.InlongClusterEntityMapper;
-import org.apache.inlong.manager.dao.mapper.InlongGroupEntityMapper;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.stereotype.Service;
-
-import java.util.Arrays;
-import java.util.List;
-import java.util.Map;
-
-/**
- * Common operation service
- */
-@Service
-public class CommonOperateService {
-
-    private static final Logger LOGGER = LoggerFactory.getLogger(CommonOperateService.class);
-
-    @Autowired
-    public ObjectMapper objectMapper;
-    @Autowired
-    private InlongGroupEntityMapper groupMapper;
-    @Autowired
-    private InlongClusterEntityMapper clusterMapper;
-
-    /**
-     * query some third-party-cluster info according key, such as "pulsar_adminUrl", "cluster_tube_manager", etc.
-     *
-     * @param key Param name.
-     * @return Value of key in database.
-     */
-    public String getSpecifiedParam(String key) {
-        String result = "";
-        InlongClusterEntity clusterEntity;
-        Gson gson = new Gson();
-        Map<String, String> params;
-
-        switch (key) {
-            case InlongGroupSettings.PULSAR_SERVICE_URL: {
-                clusterEntity = getMQCluster(MQType.PULSAR);
-                if (clusterEntity != null) {
-                    result = clusterEntity.getUrl();
-                }
-                break;
-            }
-            case InlongGroupSettings.PULSAR_ADMIN_URL: {
-                clusterEntity = getMQCluster(MQType.PULSAR);
-                if (clusterEntity != null) {
-                    params = gson.fromJson(clusterEntity.getExtParams(), Map.class);
-                    result = params.get(key);
-                }
-                break;
-            }
-            case InlongGroupSettings.TUBE_MANAGER_URL:
-            case InlongGroupSettings.TUBE_CLUSTER_ID:
-            case InlongGroupSettings.TUBE_MASTER_URL: {
-                clusterEntity = getMQCluster(MQType.TUBE);
-                if (clusterEntity != null) {
-                    if (key.equals(InlongGroupSettings.TUBE_MASTER_URL)) {
-                        result = clusterEntity.getUrl();
-                    } else {
-                        params = gson.fromJson(clusterEntity.getExtParams(), Map.class);
-                        result = params.get(key);
-                    }
-                }
-                break;
-            }
-            default:
-                LOGGER.warn("case warn key {}", key);
-        }
-        return result;
-    }
-
-    /**
-     * Get third party cluster by type.
-     *
-     * TODO Add data_proxy_cluster_name for query.
-     */
-    private InlongClusterEntity getMQCluster(MQType type) {
-        List<InlongClusterEntity> clusterList = clusterMapper.selectByKey(null, null,
-                InlongGroupSettings.CLUSTER_DATA_PROXY);
-        if (CollectionUtils.isEmpty(clusterList)) {
-            LOGGER.warn("no data proxy cluster found");
-            return null;
-        }
-
-        String clusterTag = clusterList.get(0).getClusterTag();
-        InlongClusterPageRequest request = new InlongClusterPageRequest();
-        request.setClusterTag(clusterTag);
-        request.setType(type.getType());
-        List<InlongClusterEntity> mqClusterList = clusterMapper.selectByCondition(request);
-        if (CollectionUtils.isEmpty(mqClusterList)) {
-            LOGGER.warn("no mq cluster found by cluster tag={} and type={}", clusterTag, type);
-            return null;
-        }
-
-        return mqClusterList.get(0);
-    }
-
-    /**
-     * Get Pulsar cluster by the given type.
-     *
-     * @return Pulsar cluster info.
-     */
-    public PulsarClusterInfo getPulsarClusterInfo(String type) {
-        MQType mqType = MQType.forType(type);
-        InlongClusterEntity clusterEntity = getMQCluster(mqType);
-        if (clusterEntity == null || StringUtils.isBlank(clusterEntity.getExtParams())) {
-            throw new BusinessException("pulsar cluster or pulsar ext params is empty");
-        }
-
-        PulsarClusterInfo pulsarCluster = PulsarClusterInfo.builder()
-                .brokerServiceUrl(clusterEntity.getUrl())
-                .token(clusterEntity.getToken())
-                .build();
-        try {
-            Map<String, String> configParams = objectMapper.readValue(clusterEntity.getExtParams(), Map.class);
-            String adminUrl = configParams.get(InlongGroupSettings.PULSAR_ADMIN_URL);
-            pulsarCluster.setAdminUrl(adminUrl);
-        } catch (Exception e) {
-            LOGGER.error("parse pulsar cluster info error: ", e);
-        }
-
-        Preconditions.checkNotNull(pulsarCluster.getAdminUrl(), "adminUrl is empty, check third party cluster table");
-        pulsarCluster.setType(clusterEntity.getType());
-        return pulsarCluster;
-    }
-
-    /**
-     * Check whether the inlong group status is temporary
-     *
-     * @param groupId Inlong group id
-     * @return Inlong group entity, for caller reuse
-     */
-    public InlongGroupEntity checkGroupStatus(String groupId, String operator) {
-        InlongGroupEntity inlongGroupEntity = groupMapper.selectByGroupId(groupId);
-        Preconditions.checkNotNull(inlongGroupEntity, "groupId is invalid");
-
-        List<String> managers = Arrays.asList(inlongGroupEntity.getInCharges().split(","));
-        Preconditions.checkTrue(managers.contains(operator),
-                String.format(ErrorCodeEnum.USER_IS_NOT_MANAGER.getMessage(), operator, managers));
-
-        GroupStatus state = GroupStatus.forCode(inlongGroupEntity.getStatus());
-        // Add/modify/delete is not allowed under certain group state
-        if (GroupStatus.notAllowedUpdate(state)) {
-            LOGGER.error("inlong group status was not allowed to add/update/delete related info");
-            throw new BusinessException(ErrorCodeEnum.OPT_NOT_ALLOWED_BY_STATUS);
-        }
-
-        return inlongGroupEntity;
-    }
-}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/InlongClusterService.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/InlongClusterService.java
index 11645441d..49d65c39d 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/InlongClusterService.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/InlongClusterService.java
@@ -50,6 +50,17 @@ public interface InlongClusterService {
      */
     InlongClusterInfo get(Integer id);
 
+    /**
+     * Get one cluster by the cluster tag, cluster name and cluster type.
+     *
+     * @param clusterTag cluster tag
+     * @param clusterName cluster name
+     * @param clusterType cluster type
+     * @return cluster info
+     * @apiNote No matter how many clusters there are, only one cluster is returned.
+     */
+    InlongClusterInfo getOne(String clusterTag, String clusterName, String clusterType);
+
     /**
      * Paging query clusters according to conditions.
      *
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/InlongClusterServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/InlongClusterServiceImpl.java
index 712f43f8e..2adcd2b2a 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/InlongClusterServiceImpl.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/InlongClusterServiceImpl.java
@@ -77,6 +77,10 @@ public class InlongClusterServiceImpl implements InlongClusterService {
     private static final Logger LOGGER = LoggerFactory.getLogger(InlongClusterServiceImpl.class);
     private static final Gson GSON = new Gson();
 
+    @Autowired
+    private InlongGroupEntityMapper groupMapper;
+    @Autowired
+    private InlongStreamEntityMapper streamMapper;
     @Autowired
     private InlongClusterOperatorFactory clusterOperatorFactory;
     @Autowired
@@ -84,10 +88,6 @@ public class InlongClusterServiceImpl implements InlongClusterService {
     @Autowired
     private InlongClusterNodeEntityMapper clusterNodeMapper;
     @Autowired
-    private InlongGroupEntityMapper groupMapper;
-    @Autowired
-    private InlongStreamEntityMapper streamMapper;
-    @Autowired
     private DataProxyConfigRepository proxyRepository;
 
     @Override
@@ -133,19 +133,32 @@ public class InlongClusterServiceImpl implements InlongClusterService {
         PageHelper.startPage(request.getPageNum(), request.getPageSize());
         Page<InlongClusterEntity> entityPage = (Page<InlongClusterEntity>) clusterMapper.selectByCondition(request);
 
-        List<InlongClusterInfo> list = new ArrayList<>(entityPage.getResult().size());
-        for (InlongClusterEntity entity : entityPage) {
-            InlongClusterOperator instance = clusterOperatorFactory.getInstance(entity.getType());
-            InlongClusterInfo clusterInfo = instance.getFromEntity(entity);
-            list.add(clusterInfo);
-        }
-
+        List<InlongClusterInfo> list = entityPage.stream()
+                .map(entity -> {
+                    InlongClusterOperator instance = clusterOperatorFactory.getInstance(entity.getType());
+                    return instance.getFromEntity(entity);
+                }).collect(Collectors.toList());
         PageInfo<InlongClusterInfo> page = new PageInfo<>(list);
         page.setTotal(list.size());
         LOGGER.debug("success to list inlong cluster by {}", request);
         return page;
     }
 
+    @Override
+    public InlongClusterInfo getOne(String clusterTag, String name, String type) {
+        List<InlongClusterEntity> entityList = clusterMapper.selectByKey(clusterTag, name, type);
+        if (CollectionUtils.isEmpty(entityList)) {
+            throw new BusinessException(String.format("cluster not found by tag=%s, name=%s, type=%s",
+                    clusterTag, name, type));
+        }
+
+        InlongClusterEntity entity = entityList.get(0);
+        InlongClusterOperator instance = clusterOperatorFactory.getInstance(entity.getType());
+        InlongClusterInfo result = instance.getFromEntity(entity);
+        LOGGER.debug("success to get inlong cluster by tag={}, name={}, type={}", clusterTag, name, type);
+        return result;
+    }
+
     @Override
     public Boolean update(InlongClusterRequest request, String operator) {
         LOGGER.debug("begin to update inlong cluster={}", request);
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/ConsumptionServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/ConsumptionServiceImpl.java
index fd8a25137..899777a42 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/ConsumptionServiceImpl.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/ConsumptionServiceImpl.java
@@ -301,6 +301,7 @@ public class ConsumptionServiceImpl implements ConsumptionService {
 
     /**
      * According to groupId and topic, stitch the full path of Pulsar Topic
+     * TODO: save full topic of Pulsar in consumption info
      */
     private String getFullPulsarTopic(String groupId, String topic) {
         InlongGroupEntity inlongGroupEntity = groupMapper.selectByGroupId(groupId);
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/GroupCheckService.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/GroupCheckService.java
new file mode 100644
index 000000000..c3062c047
--- /dev/null
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/GroupCheckService.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.service.group;
+
+import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
+import org.apache.inlong.manager.common.enums.GroupStatus;
+import org.apache.inlong.manager.common.exceptions.BusinessException;
+import org.apache.inlong.manager.common.util.Preconditions;
+import org.apache.inlong.manager.dao.entity.InlongGroupEntity;
+import org.apache.inlong.manager.dao.mapper.InlongGroupEntityMapper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * Service of inlong group check.
+ */
+@Service
+public class GroupCheckService {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(GroupCheckService.class);
+
+    @Autowired
+    private InlongGroupEntityMapper groupMapper;
+
+    /**
+     * Check whether the inlong group status is temporary
+     *
+     * @param groupId Inlong group id
+     * @return Inlong group entity, for caller reuse
+     */
+    public InlongGroupEntity checkGroupStatus(String groupId, String operator) {
+        InlongGroupEntity inlongGroupEntity = groupMapper.selectByGroupId(groupId);
+        Preconditions.checkNotNull(inlongGroupEntity, "groupId is invalid");
+
+        List<String> managers = Arrays.asList(inlongGroupEntity.getInCharges().split(","));
+        Preconditions.checkTrue(managers.contains(operator),
+                String.format(ErrorCodeEnum.USER_IS_NOT_MANAGER.getMessage(), operator, managers));
+
+        GroupStatus state = GroupStatus.forCode(inlongGroupEntity.getStatus());
+        // Add/modify/delete is not allowed under certain group state
+        if (GroupStatus.notAllowedUpdate(state)) {
+            LOGGER.error("inlong group status was not allowed to add/update/delete related info");
+            throw new BusinessException(ErrorCodeEnum.OPT_NOT_ALLOWED_BY_STATUS);
+        }
+
+        return inlongGroupEntity;
+    }
+
+}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/mq/CreatePulsarGroupTaskListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/mq/CreatePulsarGroupTaskListener.java
index 96763a4ad..39c211ff6 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/mq/CreatePulsarGroupTaskListener.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/mq/CreatePulsarGroupTaskListener.java
@@ -18,16 +18,19 @@
 package org.apache.inlong.manager.service.mq;
 
 import lombok.extern.slf4j.Slf4j;
-import org.apache.inlong.common.pojo.dataproxy.PulsarClusterInfo;
-import org.apache.inlong.manager.common.beans.ClusterBean;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.inlong.manager.common.enums.ClusterType;
 import org.apache.inlong.manager.common.exceptions.WorkflowListenerException;
+import org.apache.inlong.manager.common.pojo.cluster.InlongClusterInfo;
+import org.apache.inlong.manager.common.pojo.cluster.pulsar.PulsarClusterInfo;
 import org.apache.inlong.manager.common.pojo.group.InlongGroupInfo;
 import org.apache.inlong.manager.common.pojo.pulsar.PulsarTopicBean;
+import org.apache.inlong.manager.common.pojo.stream.InlongStreamBriefInfo;
 import org.apache.inlong.manager.common.pojo.workflow.form.GroupResourceProcessForm;
-import org.apache.inlong.manager.dao.entity.InlongStreamEntity;
-import org.apache.inlong.manager.dao.mapper.InlongStreamEntityMapper;
-import org.apache.inlong.manager.service.CommonOperateService;
+import org.apache.inlong.manager.common.settings.InlongGroupSettings;
+import org.apache.inlong.manager.service.cluster.InlongClusterService;
 import org.apache.inlong.manager.service.core.ConsumptionService;
+import org.apache.inlong.manager.service.core.InlongStreamService;
 import org.apache.inlong.manager.service.group.InlongGroupService;
 import org.apache.inlong.manager.service.mq.util.PulsarOptService;
 import org.apache.inlong.manager.service.mq.util.PulsarUtils;
@@ -49,16 +52,14 @@ import java.util.List;
 public class CreatePulsarGroupTaskListener implements QueueOperateListener {
 
     @Autowired
-    private CommonOperateService commonOperateService;
+    private InlongGroupService groupService;
     @Autowired
-    private ClusterBean clusterBean;
+    private InlongStreamService streamService;
     @Autowired
-    private InlongGroupService groupService;
+    private InlongClusterService clusterService;
     @Autowired
     private ConsumptionService consumptionService;
     @Autowired
-    private InlongStreamEntityMapper streamMapper;
-    @Autowired
     private PulsarOptService pulsarOptService;
 
     @Override
@@ -69,8 +70,7 @@ public class CreatePulsarGroupTaskListener implements QueueOperateListener {
     @Override
     public ListenerResult listen(WorkflowContext context) throws WorkflowListenerException {
         GroupResourceProcessForm form = (GroupResourceProcessForm) context.getProcessForm();
-
-        String groupId = form.getInlongGroupId();
+        final String groupId = form.getInlongGroupId();
         InlongGroupInfo groupInfo = groupService.get(groupId);
         if (groupInfo == null) {
             log.error("inlong group not found with groupId={}", groupId);
@@ -78,21 +78,27 @@ public class CreatePulsarGroupTaskListener implements QueueOperateListener {
         }
 
         // For Pulsar, each Stream corresponds to a Topic
-        List<InlongStreamEntity> streamEntities = streamMapper.selectByGroupId(groupId);
-        if (streamEntities == null || streamEntities.isEmpty()) {
+        List<InlongStreamBriefInfo> streamInfo = streamService.getTopicList(groupId);
+        if (streamInfo == null || streamInfo.isEmpty()) {
             log.warn("inlong stream is empty for groupId={}, skip to create pulsar subscription", groupId);
             return ListenerResult.success();
         }
-        PulsarClusterInfo pulsarClusterInfo = commonOperateService.getPulsarClusterInfo(groupInfo.getMqType());
-        try (PulsarAdmin pulsarAdmin = PulsarUtils.getPulsarAdmin(pulsarClusterInfo)) {
-            String tenant = clusterBean.getDefaultTenant();
+
+        String clusterTag = groupInfo.getInlongClusterTag();
+        InlongClusterInfo clusterInfo = clusterService.getOne(clusterTag, null, ClusterType.CLS_PULSAR);
+        PulsarClusterInfo pulsarCluster = (PulsarClusterInfo) clusterInfo;
+        try (PulsarAdmin pulsarAdmin = PulsarUtils.getPulsarAdmin(pulsarCluster)) {
+            String tenant = pulsarCluster.getTenant();
+            if (StringUtils.isEmpty(tenant)) {
+                tenant = InlongGroupSettings.DEFAULT_PULSAR_TENANT;
+            }
             String namespace = groupInfo.getMqResource();
 
-            for (InlongStreamEntity streamEntity : streamEntities) {
+            for (InlongStreamBriefInfo stream : streamInfo) {
                 PulsarTopicBean topicBean = new PulsarTopicBean();
                 topicBean.setTenant(tenant);
                 topicBean.setNamespace(namespace);
-                String topic = streamEntity.getMqResource();
+                String topic = stream.getMqResource();
                 topicBean.setTopicName(topic);
 
                 // Create a subscription in the Pulsar cluster you need to ensure that the Topic exists
@@ -100,13 +106,13 @@ public class CreatePulsarGroupTaskListener implements QueueOperateListener {
                     boolean exist = pulsarOptService.topicIsExists(pulsarAdmin, tenant, namespace, topic);
                     if (!exist) {
                         String topicFull = tenant + "/" + namespace + "/" + topic;
-                        String serviceUrl = pulsarClusterInfo.getAdminUrl();
+                        String serviceUrl = pulsarCluster.getAdminUrl();
                         log.error("topic={} not exists in {}", topicFull, serviceUrl);
                         throw new WorkflowListenerException("topic=" + topicFull + " not exists in " + serviceUrl);
                     }
 
-                    // Consumer naming rules: sortAppName_topicName_consumer_group
-                    String subscription = clusterBean.getAppName() + "_" + topic + "_consumer_group";
+                    // Consumer naming rules: clusterTag_topicName_consumer_group
+                    String subscription = clusterTag + "_" + topic + "_consumer_group";
                     pulsarOptService.createSubscription(pulsarAdmin, topicBean, subscription);
 
                     // Insert the consumption data into the consumption table
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/mq/CreatePulsarResourceTaskListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/mq/CreatePulsarResourceTaskListener.java
index e2cedd393..b58e9b921 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/mq/CreatePulsarResourceTaskListener.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/mq/CreatePulsarResourceTaskListener.java
@@ -18,17 +18,20 @@
 package org.apache.inlong.manager.service.mq;
 
 import lombok.extern.slf4j.Slf4j;
-import org.apache.inlong.common.pojo.dataproxy.PulsarClusterInfo;
-import org.apache.inlong.manager.common.beans.ClusterBean;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.inlong.manager.common.enums.ClusterType;
 import org.apache.inlong.manager.common.exceptions.WorkflowListenerException;
+import org.apache.inlong.manager.common.pojo.cluster.InlongClusterInfo;
+import org.apache.inlong.manager.common.pojo.cluster.pulsar.PulsarClusterInfo;
 import org.apache.inlong.manager.common.pojo.group.InlongGroupInfo;
 import org.apache.inlong.manager.common.pojo.group.pulsar.InlongPulsarInfo;
 import org.apache.inlong.manager.common.pojo.pulsar.PulsarTopicBean;
 import org.apache.inlong.manager.common.pojo.stream.InlongStreamBriefInfo;
 import org.apache.inlong.manager.common.pojo.workflow.form.GroupResourceProcessForm;
+import org.apache.inlong.manager.common.settings.InlongGroupSettings;
 import org.apache.inlong.manager.common.util.Preconditions;
-import org.apache.inlong.manager.dao.mapper.InlongStreamEntityMapper;
-import org.apache.inlong.manager.service.CommonOperateService;
+import org.apache.inlong.manager.service.cluster.InlongClusterService;
+import org.apache.inlong.manager.service.core.InlongStreamService;
 import org.apache.inlong.manager.service.group.InlongGroupService;
 import org.apache.inlong.manager.service.mq.util.PulsarOptService;
 import org.apache.inlong.manager.service.mq.util.PulsarUtils;
@@ -50,15 +53,13 @@ import java.util.List;
 public class CreatePulsarResourceTaskListener implements QueueOperateListener {
 
     @Autowired
-    PulsarOptService pulsarOptService;
-    @Autowired
-    private ClusterBean clusterBean;
+    private InlongGroupService groupService;
     @Autowired
-    private CommonOperateService commonOperateService;
+    private InlongStreamService streamService;
     @Autowired
-    private InlongGroupService groupService;
+    private InlongClusterService clusterService;
     @Autowired
-    private InlongStreamEntityMapper streamMapper;
+    private PulsarOptService pulsarOptService;
 
     @Override
     public TaskEvent event() {
@@ -75,10 +76,12 @@ public class CreatePulsarResourceTaskListener implements QueueOperateListener {
         if (groupInfo == null) {
             throw new WorkflowListenerException("inlong group or pulsar cluster not found for groupId=" + groupId);
         }
-        InlongPulsarInfo pulsarInfo = (InlongPulsarInfo) groupInfo;
-        PulsarClusterInfo pulsarClusterInfo = commonOperateService.getPulsarClusterInfo(pulsarInfo.getMqType());
+
         try {
-            this.createPulsarProcess(pulsarInfo, pulsarClusterInfo);
+            InlongPulsarInfo pulsarInfo = (InlongPulsarInfo) groupInfo;
+            InlongClusterInfo clusterInfo = clusterService.getOne(groupInfo.getInlongClusterTag(), null,
+                    ClusterType.CLS_PULSAR);
+            this.createPulsarProcess(pulsarInfo, (PulsarClusterInfo) clusterInfo);
         } catch (Exception e) {
             log.error("create pulsar resource error for groupId={}", groupId, e);
             throw new WorkflowListenerException("create pulsar resource error for groupId=" + groupId);
@@ -91,26 +94,29 @@ public class CreatePulsarResourceTaskListener implements QueueOperateListener {
     /**
      * Create Pulsar tenant, namespace and topic
      */
-    private void createPulsarProcess(InlongPulsarInfo groupInfo, PulsarClusterInfo pulsarClusterInfo) throws Exception {
-        String groupId = groupInfo.getInlongGroupId();
-        log.info("begin to create pulsar resource for groupId={} in cluster={}", groupId, pulsarClusterInfo);
+    private void createPulsarProcess(InlongPulsarInfo pulsarInfo, PulsarClusterInfo pulsarCluster) throws Exception {
+        String groupId = pulsarInfo.getInlongGroupId();
+        log.info("begin to create pulsar resource for groupId={} in cluster={}", groupId, pulsarCluster);
 
-        String namespace = groupInfo.getMqResource();
+        String namespace = pulsarInfo.getMqResource();
         Preconditions.checkNotNull(namespace, "pulsar namespace cannot be empty for groupId=" + groupId);
-        String queueModule = groupInfo.getQueueModule();
+        String queueModule = pulsarInfo.getQueueModule();
         Preconditions.checkNotNull(queueModule, "queue module cannot be empty for groupId=" + groupId);
 
-        String tenant = clusterBean.getDefaultTenant();
-        try (PulsarAdmin pulsarAdmin = PulsarUtils.getPulsarAdmin(pulsarClusterInfo)) {
+        try (PulsarAdmin pulsarAdmin = PulsarUtils.getPulsarAdmin(pulsarCluster)) {
             // create pulsar tenant
+            String tenant = pulsarCluster.getTenant();
+            if (StringUtils.isEmpty(tenant)) {
+                tenant = InlongGroupSettings.DEFAULT_PULSAR_TENANT;
+            }
             pulsarOptService.createTenant(pulsarAdmin, tenant);
 
             // create pulsar namespace
-            pulsarOptService.createNamespace(pulsarAdmin, groupInfo, tenant, namespace);
+            pulsarOptService.createNamespace(pulsarAdmin, pulsarInfo, tenant, namespace);
 
             // create pulsar topic
-            Integer partitionNum = groupInfo.getPartitionNum();
-            List<InlongStreamBriefInfo> streamTopicList = streamMapper.selectBriefList(groupId);
+            Integer partitionNum = pulsarInfo.getPartitionNum();
+            List<InlongStreamBriefInfo> streamTopicList = streamService.getTopicList(groupId);
             PulsarTopicBean topicBean = PulsarTopicBean.builder()
                     .tenant(tenant).namespace(namespace).numPartitions(partitionNum).queueModule(queueModule).build();
 
@@ -119,8 +125,7 @@ public class CreatePulsarResourceTaskListener implements QueueOperateListener {
                 pulsarOptService.createTopic(pulsarAdmin, topicBean);
             }
         }
-        log.info("finish to create pulsar resource for groupId={}, service http url={}", groupId,
-                pulsarClusterInfo.getAdminUrl());
+        log.info("finish to create pulsar resource for groupId={}, cluster={}", groupId, pulsarCluster);
     }
 
     @Override
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/mq/CreatePulsarSubscriptionTaskListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/mq/CreatePulsarSubscriptionTaskListener.java
index f5c9c9dc7..b806aef74 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/mq/CreatePulsarSubscriptionTaskListener.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/mq/CreatePulsarSubscriptionTaskListener.java
@@ -18,19 +18,20 @@
 package org.apache.inlong.manager.service.mq;
 
 import lombok.extern.slf4j.Slf4j;
-import org.apache.inlong.common.pojo.dataproxy.PulsarClusterInfo;
-import org.apache.inlong.manager.common.beans.ClusterBean;
+import org.apache.inlong.manager.common.enums.ClusterType;
 import org.apache.inlong.manager.common.exceptions.BusinessException;
 import org.apache.inlong.manager.common.exceptions.WorkflowListenerException;
+import org.apache.inlong.manager.common.pojo.cluster.InlongClusterInfo;
+import org.apache.inlong.manager.common.pojo.cluster.pulsar.PulsarClusterInfo;
 import org.apache.inlong.manager.common.pojo.group.InlongGroupInfo;
 import org.apache.inlong.manager.common.pojo.pulsar.PulsarTopicBean;
 import org.apache.inlong.manager.common.pojo.stream.InlongStreamInfo;
 import org.apache.inlong.manager.common.pojo.workflow.form.StreamResourceProcessForm;
-import org.apache.inlong.manager.service.CommonOperateService;
+import org.apache.inlong.manager.service.cluster.InlongClusterService;
 import org.apache.inlong.manager.service.core.ConsumptionService;
 import org.apache.inlong.manager.service.mq.util.PulsarOptService;
-import org.apache.inlong.manager.service.sink.StreamSinkService;
 import org.apache.inlong.manager.service.mq.util.PulsarUtils;
+import org.apache.inlong.manager.service.sink.StreamSinkService;
 import org.apache.inlong.manager.workflow.WorkflowContext;
 import org.apache.inlong.manager.workflow.event.ListenerResult;
 import org.apache.inlong.manager.workflow.event.task.QueueOperateListener;
@@ -49,9 +50,7 @@ import java.util.List;
 public class CreatePulsarSubscriptionTaskListener implements QueueOperateListener {
 
     @Autowired
-    private CommonOperateService commonOperateService;
-    @Autowired
-    private ClusterBean clusterBean;
+    private InlongClusterService clusterService;
     @Autowired
     private PulsarOptService pulsarOptService;
     @Autowired
@@ -71,10 +70,11 @@ public class CreatePulsarSubscriptionTaskListener implements QueueOperateListene
         InlongStreamInfo streamInfo = form.getStreamInfo();
         final String groupId = streamInfo.getInlongGroupId();
         final String streamId = streamInfo.getInlongStreamId();
-        final String namespace = groupInfo.getMqResource();
-        final String topic = streamInfo.getMqResource();
-        PulsarClusterInfo pulsarClusterInfo = commonOperateService.getPulsarClusterInfo(groupInfo.getMqType());
-        try (PulsarAdmin pulsarAdmin = PulsarUtils.getPulsarAdmin(pulsarClusterInfo)) {
+
+        String clusterTag = groupInfo.getInlongClusterTag();
+        InlongClusterInfo clusterInfo = clusterService.getOne(clusterTag, null, ClusterType.CLS_PULSAR);
+        PulsarClusterInfo pulsarCluster = (PulsarClusterInfo) clusterInfo;
+        try (PulsarAdmin pulsarAdmin = PulsarUtils.getPulsarAdmin(pulsarCluster)) {
             // Query data sink info based on groupId and streamId
             List<String> sinkTypeList = sinkService.getSinkTypeList(groupId, streamId);
             if (sinkTypeList == null || sinkTypeList.size() == 0) {
@@ -82,38 +82,36 @@ public class CreatePulsarSubscriptionTaskListener implements QueueOperateListene
                         groupId, streamId);
                 return ListenerResult.success();
             }
-            String tenant = clusterBean.getDefaultTenant();
+
+            String tenant = pulsarCluster.getTenant();
+            String namespace = groupInfo.getMqResource();
+            String topic = streamInfo.getMqResource();
             PulsarTopicBean topicBean = new PulsarTopicBean();
             topicBean.setTenant(tenant);
             topicBean.setNamespace(namespace);
             topicBean.setTopicName(topic);
 
             // Create a subscription in the Pulsar cluster, you need to ensure that the Topic exists
-            try {
-                boolean exist = pulsarOptService.topicIsExists(pulsarAdmin, tenant, namespace, topic);
-                if (!exist) {
-                    String fullTopic = tenant + "/" + namespace + "/" + topic;
-                    String serviceUrl = pulsarClusterInfo.getAdminUrl();
-                    log.error("topic={} not exists in {}", fullTopic, serviceUrl);
-                    throw new BusinessException("topic=" + fullTopic + " not exists in " + serviceUrl);
-                }
+            boolean exist = pulsarOptService.topicIsExists(pulsarAdmin, tenant, namespace, topic);
+            if (!exist) {
+                String fullTopic = tenant + "/" + namespace + "/" + topic;
+                String msg = String.format("topic=%s not exists in %s", fullTopic, pulsarCluster.getAdminUrl());
+                log.error(msg);
+                throw new BusinessException(msg);
+            }
 
-                // Consumer naming rules: sortAppName_topicName_consumer_group
-                String subscription = clusterBean.getAppName() + "_" + topic + "_consumer_group";
-                pulsarOptService.createSubscription(pulsarAdmin, topicBean, subscription);
+            // Consumer naming rules: clusterTag_topicName_consumer_group
+            String subscription = clusterTag + "_" + topic + "_consumer_group";
+            pulsarOptService.createSubscription(pulsarAdmin, topicBean, subscription);
 
-                // Insert the consumption data into the consumption table
-                consumptionService.saveSortConsumption(groupInfo, topic, subscription);
-            } catch (Exception e) {
-                log.error("create pulsar subscription error for groupId={}, streamId={}", groupId, streamId, e);
-                throw new WorkflowListenerException("create pulsar subscription error, reason: " + e.getMessage());
-            }
+            // Insert the consumption data into the consumption table
+            consumptionService.saveSortConsumption(groupInfo, topic, subscription);
         } catch (Exception e) {
-            log.error("create pulsar subscription error for groupId={}, streamId={}", groupId, streamId, e);
-            throw new WorkflowListenerException("create pulsar subscription error, reason: " + e.getMessage());
+            log.error("failed to create pulsar subscription for groupId=" + groupId + " streamId=" + streamId, e);
+            throw new WorkflowListenerException("failed to create pulsar subscription: " + e.getMessage());
         }
 
-        log.info("finish to create single pulsar subscription for groupId={}, streamId={}", groupId, streamId);
+        log.info("success to create pulsar subscription for groupId={}, streamId={}", groupId, streamId);
         return ListenerResult.success();
     }
 
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/mq/CreatePulsarTopicTaskListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/mq/CreatePulsarTopicTaskListener.java
index 213127961..8e39f9b48 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/mq/CreatePulsarTopicTaskListener.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/mq/CreatePulsarTopicTaskListener.java
@@ -18,15 +18,18 @@
 package org.apache.inlong.manager.service.mq;
 
 import lombok.extern.slf4j.Slf4j;
-import org.apache.inlong.common.pojo.dataproxy.PulsarClusterInfo;
-import org.apache.inlong.manager.common.beans.ClusterBean;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.inlong.manager.common.enums.ClusterType;
 import org.apache.inlong.manager.common.exceptions.WorkflowListenerException;
+import org.apache.inlong.manager.common.pojo.cluster.InlongClusterInfo;
+import org.apache.inlong.manager.common.pojo.cluster.pulsar.PulsarClusterInfo;
 import org.apache.inlong.manager.common.pojo.group.InlongGroupInfo;
 import org.apache.inlong.manager.common.pojo.group.pulsar.InlongPulsarInfo;
 import org.apache.inlong.manager.common.pojo.pulsar.PulsarTopicBean;
 import org.apache.inlong.manager.common.pojo.stream.InlongStreamInfo;
 import org.apache.inlong.manager.common.pojo.workflow.form.StreamResourceProcessForm;
-import org.apache.inlong.manager.service.CommonOperateService;
+import org.apache.inlong.manager.common.settings.InlongGroupSettings;
+import org.apache.inlong.manager.service.cluster.InlongClusterService;
 import org.apache.inlong.manager.service.mq.util.PulsarOptService;
 import org.apache.inlong.manager.service.mq.util.PulsarUtils;
 import org.apache.inlong.manager.workflow.WorkflowContext;
@@ -45,9 +48,7 @@ import org.springframework.stereotype.Component;
 public class CreatePulsarTopicTaskListener implements QueueOperateListener {
 
     @Autowired
-    private CommonOperateService commonOperateService;
-    @Autowired
-    private ClusterBean clusterBean;
+    private InlongClusterService clusterService;
     @Autowired
     private PulsarOptService pulsarOptService;
 
@@ -65,41 +66,37 @@ public class CreatePulsarTopicTaskListener implements QueueOperateListener {
         final String streamId = streamInfo.getInlongStreamId();
         log.info("begin to create pulsar topic for groupId={}, streamId={}", groupId, streamId);
 
-        InlongPulsarInfo pulsarInfo = (InlongPulsarInfo) groupInfo;
-        PulsarClusterInfo pulsarClusterInfo = commonOperateService.getPulsarClusterInfo(pulsarInfo.getMqType());
         try {
+            InlongPulsarInfo pulsarInfo = (InlongPulsarInfo) groupInfo;
             String pulsarTopic = streamInfo.getMqResource();
-            this.createTopic(pulsarInfo, pulsarTopic, pulsarClusterInfo);
+            String clusterTag = pulsarInfo.getInlongClusterTag();
+            InlongClusterInfo clusterInfo = clusterService.getOne(clusterTag, null, ClusterType.CLS_PULSAR);
+            PulsarClusterInfo pulsarCluster = (PulsarClusterInfo) clusterInfo;
+            String tenant = pulsarCluster.getTenant();
+            if (StringUtils.isEmpty(tenant)) {
+                tenant = InlongGroupSettings.DEFAULT_PULSAR_TENANT;
+            }
+
+            try (PulsarAdmin pulsarAdmin = PulsarUtils.getPulsarAdmin(pulsarCluster)) {
+                PulsarTopicBean topicBean = PulsarTopicBean.builder()
+                        .tenant(tenant)
+                        .namespace(pulsarInfo.getMqResource())
+                        .topicName(pulsarTopic)
+                        .queueModule(pulsarInfo.getQueueModule())
+                        .numPartitions(pulsarInfo.getPartitionNum())
+                        .build();
+                pulsarOptService.createTopic(pulsarAdmin, topicBean);
+            }
         } catch (Exception e) {
-            log.error("create pulsar topic error for groupId={}, streamId={}", groupId, streamId, e);
-            throw new WorkflowListenerException(
-                    "create pulsar topic error for groupId=" + groupId + ", streamId=" + streamId);
+            String msg = String.format("failed to create pulsar topic for groupId=%s, streamId=%s", groupId, streamId);
+            log.error(msg, e);
+            throw new WorkflowListenerException(msg);
         }
 
         log.info("success to create pulsar topic for groupId={}, streamId={}", groupId, streamId);
         return ListenerResult.success();
     }
 
-    private void createTopic(InlongPulsarInfo groupInfo, String pulsarTopic, PulsarClusterInfo pulsarClusterInfo)
-            throws Exception {
-        Integer partitionNum = groupInfo.getPartitionNum();
-        int partition = 0;
-        if (partitionNum != null && partitionNum > 0) {
-            partition = partitionNum;
-        }
-
-        try (PulsarAdmin pulsarAdmin = PulsarUtils.getPulsarAdmin(pulsarClusterInfo)) {
-            PulsarTopicBean topicBean = PulsarTopicBean.builder()
-                    .tenant(clusterBean.getDefaultTenant())
-                    .namespace(groupInfo.getMqResource())
-                    .topicName(pulsarTopic)
-                    .numPartitions(partition)
-                    .queueModule(groupInfo.getQueueModule())
-                    .build();
-            pulsarOptService.createTopic(pulsarAdmin, topicBean);
-        }
-    }
-
     @Override
     public boolean async() {
         return false;
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/mq/CreateTubeGroupTaskListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/mq/CreateTubeGroupTaskListener.java
index 3e27e18f6..0acf8fac8 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/mq/CreateTubeGroupTaskListener.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/mq/CreateTubeGroupTaskListener.java
@@ -19,15 +19,16 @@ package org.apache.inlong.manager.service.mq;
 
 import lombok.extern.slf4j.Slf4j;
 import org.apache.inlong.manager.common.beans.ReTryConfigBean;
+import org.apache.inlong.manager.common.enums.ClusterType;
 import org.apache.inlong.manager.common.exceptions.WorkflowListenerException;
-import org.apache.inlong.manager.common.pojo.group.InlongGroupInfo;
+import org.apache.inlong.manager.common.pojo.cluster.InlongClusterInfo;
 import org.apache.inlong.manager.common.pojo.tubemq.AddTubeConsumeGroupRequest;
 import org.apache.inlong.manager.common.pojo.tubemq.AddTubeConsumeGroupRequest.GroupNameJsonSetBean;
 import org.apache.inlong.manager.common.pojo.tubemq.QueryTubeTopicRequest;
 import org.apache.inlong.manager.common.pojo.workflow.form.GroupResourceProcessForm;
-import org.apache.inlong.manager.common.settings.InlongGroupSettings;
-import org.apache.inlong.manager.service.CommonOperateService;
-import org.apache.inlong.manager.service.group.InlongGroupService;
+import org.apache.inlong.manager.dao.entity.InlongGroupEntity;
+import org.apache.inlong.manager.dao.mapper.InlongGroupEntityMapper;
+import org.apache.inlong.manager.service.cluster.InlongClusterService;
 import org.apache.inlong.manager.service.mq.util.TubeMqOptService;
 import org.apache.inlong.manager.workflow.WorkflowContext;
 import org.apache.inlong.manager.workflow.event.ListenerResult;
@@ -46,14 +47,13 @@ import java.util.Collections;
 public class CreateTubeGroupTaskListener implements QueueOperateListener {
 
     @Autowired
-    InlongGroupService groupService;
-
+    private InlongGroupEntityMapper groupMapper;
     @Autowired
-    TubeMqOptService tubeMqOptService;
+    private InlongClusterService clusterService;
     @Autowired
-    ReTryConfigBean reTryConfigBean;
+    private TubeMqOptService tubeMqOptService;
     @Autowired
-    private CommonOperateService commonOperateService;
+    private ReTryConfigBean reTryConfigBean;
 
     @Override
     public TaskEvent event() {
@@ -66,12 +66,18 @@ public class CreateTubeGroupTaskListener implements QueueOperateListener {
         String groupId = form.getInlongGroupId();
         log.info("try to create consumer group for groupId {}", groupId);
 
-        InlongGroupInfo groupInfo = groupService.get(groupId);
-        String topicName = groupInfo.getMqResource();
-        int clusterId = Integer.parseInt(commonOperateService.getSpecifiedParam(InlongGroupSettings.TUBE_CLUSTER_ID));
+        InlongGroupEntity groupEntity = groupMapper.selectByGroupId(groupId);
+        InlongClusterInfo tubeCluster = clusterService.getOne(groupEntity.getInlongClusterTag(),
+                null, ClusterType.CLS_TUBE);
+
+        // TODO use the original method of TubeMQ to create group
+        // TubeClusterDTO clusterDTO = TubeClusterDTO.getFromJson(clusters.get(0).getExtParams());
+        // int clusterId = clusterDTO.getClusterId();
+
+        String topicName = groupEntity.getMqResource();
         QueryTubeTopicRequest queryTubeTopicRequest = QueryTubeTopicRequest.builder()
-                .topicName(topicName).clusterId(clusterId)
-                .user(groupInfo.getCreator()).build();
+                .topicName(topicName).clusterId(1)
+                .user(groupEntity.getCreator()).build();
         // Query whether the tube topic exists
         boolean topicExist = tubeMqOptService.queryTopicIsExist(queryTubeTopicRequest);
 
@@ -89,8 +95,8 @@ public class CreateTubeGroupTaskListener implements QueueOperateListener {
         }
 
         AddTubeConsumeGroupRequest addTubeConsumeGroupRequest = new AddTubeConsumeGroupRequest();
-        addTubeConsumeGroupRequest.setClusterId(clusterId);
-        addTubeConsumeGroupRequest.setCreateUser(groupInfo.getCreator());
+        addTubeConsumeGroupRequest.setClusterId(1);
+        addTubeConsumeGroupRequest.setCreateUser(groupEntity.getCreator());
 
         GroupNameJsonSetBean groupNameJsonSetBean = new GroupNameJsonSetBean();
         groupNameJsonSetBean.setTopicName(topicName);
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/mq/CreateTubeTopicTaskListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/mq/CreateTubeTopicTaskListener.java
index b6bce4869..d0f459351 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/mq/CreateTubeTopicTaskListener.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/mq/CreateTubeTopicTaskListener.java
@@ -65,9 +65,8 @@ public class CreateTubeTopicTaskListener implements QueueOperateListener {
             AddTubeMqTopicRequest.AddTopicTasksBean tasksBean = new AddTubeMqTopicRequest.AddTopicTasksBean();
             tasksBean.setTopicName(topicName);
             request.setAddTopicTasks(Collections.singletonList(tasksBean));
-            tubeMqOptService.createNewTopic(request);
-
-            log.info("finish to create tube topic for groupId={}", groupId);
+            String result = tubeMqOptService.createNewTopic(request);
+            log.info("finish to create tube topic for groupId={}, result={}", groupId, result);
         } catch (Exception e) {
             log.error("create tube topic for groupId={} error, exception {} ", groupId, e.getMessage(), e);
         }
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/mq/util/PulsarOptServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/mq/util/PulsarOptServiceImpl.java
index e01516903..45eba4d6b 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/mq/util/PulsarOptServiceImpl.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/mq/util/PulsarOptServiceImpl.java
@@ -50,7 +50,7 @@ public class PulsarOptServiceImpl implements PulsarOptService {
 
     @Override
     public void createTenant(PulsarAdmin pulsarAdmin, String tenant) throws PulsarAdminException {
-        log.info("begin to create tenant={}", tenant);
+        log.info("begin to create pulsar tenant={}", tenant);
 
         Preconditions.checkNotEmpty(tenant, "Tenant cannot be empty");
         try {
@@ -66,7 +66,7 @@ public class PulsarOptServiceImpl implements PulsarOptService {
             pulsarAdmin.tenants().createTenant(tenant, tenantInfo);
             log.info("success to create pulsar tenant={}", tenant);
         } catch (PulsarAdminException e) {
-            log.error("create pulsar tenant={} failed", tenant, e);
+            log.error("failed to create pulsar tenant=" + tenant, e);
             throw e;
         }
     }
@@ -119,9 +119,9 @@ public class PulsarOptServiceImpl implements PulsarOptService {
             PersistencePolicies persistencePolicies = new PersistencePolicies(pulsarInfo.getEnsemble(),
                     pulsarInfo.getWriteQuorum(), pulsarInfo.getAckQuorum(), 0);
             namespaces.setPersistence(namespaceName, persistencePolicies);
-            log.info("success to create namespace={}", tenant);
+            log.info("success to create namespace={}", namespaceName);
         } catch (PulsarAdminException e) {
-            log.error("create namespace={} error", tenant, e);
+            log.error("failed to create namespace=" + namespaceName, e);
             throw e;
         }
     }
@@ -159,7 +159,7 @@ public class PulsarOptServiceImpl implements PulsarOptService {
 
             log.info("success to create topic={}", topicFullName);
         } catch (Exception e) {
-            log.error("create topic={} failed", topicFullName, e);
+            log.error("failed to create topic=" + topicFullName, e);
             throw e;
         }
     }
@@ -182,7 +182,7 @@ public class PulsarOptServiceImpl implements PulsarOptService {
                 log.warn("pulsar subscription={} already exists, skip to create", subscription);
             }
         } catch (Exception e) {
-            log.error("create pulsar subscription={} failed", subscription, e);
+            log.error("failed to create pulsar subscription=" + subscription, e);
             throw e;
         }
     }
@@ -243,7 +243,7 @@ public class PulsarOptServiceImpl implements PulsarOptService {
             List<String> subscriptionList = pulsarAdmin.topics().getSubscriptions(topic);
             return subscriptionList.contains(subscription);
         } catch (PulsarAdminException e) {
-            log.error("check if the topic={} is exists error,", topic, e);
+            log.error("failed to check the subscription=" + subscription + " exists for topic=" + topic, e);
             return false;
         }
     }
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/mq/util/PulsarUtils.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/mq/util/PulsarUtils.java
index ddb11af17..be8853837 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/mq/util/PulsarUtils.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/mq/util/PulsarUtils.java
@@ -19,8 +19,7 @@ package org.apache.inlong.manager.service.mq.util;
 
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang3.StringUtils;
-import org.apache.inlong.common.pojo.dataproxy.PulsarClusterInfo;
-import org.apache.inlong.manager.common.settings.InlongGroupSettings;
+import org.apache.inlong.manager.common.pojo.cluster.pulsar.PulsarClusterInfo;
 import org.apache.inlong.manager.common.util.Preconditions;
 import org.apache.pulsar.client.admin.PulsarAdmin;
 import org.apache.pulsar.client.admin.PulsarAdminException;
@@ -41,37 +40,36 @@ public class PulsarUtils {
     /**
      * Get pulsar admin info
      */
-    public static PulsarAdmin getPulsarAdmin(PulsarClusterInfo pulsarClusterInfo) throws PulsarClientException {
-        Preconditions.checkNotNull(pulsarClusterInfo.getAdminUrl(), "Pulsar adminUrl cannot be empty");
+    public static PulsarAdmin getPulsarAdmin(PulsarClusterInfo pulsarCluster) throws PulsarClientException {
+        Preconditions.checkNotNull(pulsarCluster.getAdminUrl(), "Pulsar adminUrl cannot be empty");
         PulsarAdmin pulsarAdmin;
-        if (StringUtils.isEmpty(pulsarClusterInfo.getToken())) {
-            pulsarAdmin = getPulsarAdmin(pulsarClusterInfo.getAdminUrl());
+        if (StringUtils.isEmpty(pulsarCluster.getToken())) {
+            pulsarAdmin = getPulsarAdmin(pulsarCluster.getAdminUrl());
         } else {
-            pulsarAdmin = getPulsarAdmin(pulsarClusterInfo.getAdminUrl(), pulsarClusterInfo.getToken(),
-                    InlongGroupSettings.DEFAULT_PULSAR_AUTHENTICATION_TYPE);
+            pulsarAdmin = getPulsarAdmin(pulsarCluster.getAdminUrl(), pulsarCluster.getToken());
         }
         return pulsarAdmin;
     }
 
     /**
-     * Obtain the PulsarAdmin client according to the service URL, and it must be closed after use
+     * Get the pulsar admin from the given service URL.
+     *
+     * @apiNote It must be closed after use.
      */
     public static PulsarAdmin getPulsarAdmin(String serviceHttpUrl) throws PulsarClientException {
         return PulsarAdmin.builder().serviceHttpUrl(serviceHttpUrl).build();
     }
 
     /**
-     * Get pulsar admin info.
+     * Get the pulsar admin from the given service URL and token.
+     * <p/>
+     * Currently only token is supported as an authentication type.
+     *
+     * @apiNote It must be closed after use.
      */
-    private static PulsarAdmin getPulsarAdmin(String serviceHttpUrl, String authentication, String authenticationType)
-            throws PulsarClientException {
-        if (InlongGroupSettings.DEFAULT_PULSAR_AUTHENTICATION_TYPE.equals(authenticationType)) {
-            return PulsarAdmin.builder().serviceHttpUrl(serviceHttpUrl)
-                    .authentication(AuthenticationFactory.token(authentication)).build();
-        } else {
-            throw new IllegalArgumentException(
-                    String.format("illegal authentication type for pulsar : %s", authenticationType));
-        }
+    private static PulsarAdmin getPulsarAdmin(String serviceHttpUrl, String token) throws PulsarClientException {
+        return PulsarAdmin.builder().serviceHttpUrl(serviceHttpUrl)
+                .authentication(AuthenticationFactory.token(token)).build();
     }
 
     /**
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/mq/util/TubeMqOptService.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/mq/util/TubeMqOptService.java
index 57d0bd053..3774aba4a 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/mq/util/TubeMqOptService.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/mq/util/TubeMqOptService.java
@@ -26,9 +26,8 @@ import org.apache.inlong.manager.common.pojo.tubemq.AddTubeConsumeGroupRequest;
 import org.apache.inlong.manager.common.pojo.tubemq.AddTubeMqTopicRequest;
 import org.apache.inlong.manager.common.pojo.tubemq.QueryTubeTopicRequest;
 import org.apache.inlong.manager.common.pojo.tubemq.TubeManagerResponse;
-import org.apache.inlong.manager.common.settings.InlongGroupSettings;
 import org.apache.inlong.manager.common.util.HttpUtils;
-import org.apache.inlong.manager.service.CommonOperateService;
+import org.apache.inlong.manager.dao.mapper.InlongClusterEntityMapper;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.http.HttpHeaders;
 import org.springframework.http.HttpMethod;
@@ -45,7 +44,7 @@ public class TubeMqOptService {
     private static final Gson GSON = new GsonBuilder().create(); // thread safe
 
     @Autowired
-    private CommonOperateService commonOperateService;
+    private InlongClusterEntityMapper clusterMapper;
     @Autowired
     private HttpUtils httpUtils;
 
@@ -59,14 +58,14 @@ public class TubeMqOptService {
             if (CollectionUtils.isEmpty(request.getAddTopicTasks())) {
                 throw new Exception("topic cannot be empty");
             }
+
+            // TODO use the original method of TubeMQ to create group
             AddTubeMqTopicRequest.AddTopicTasksBean addTopicTasksBean = request.getAddTopicTasks().get(0);
-            String clusterIdStr = commonOperateService.getSpecifiedParam(InlongGroupSettings.TUBE_CLUSTER_ID);
-            int clusterId = Integer.parseInt(clusterIdStr);
             QueryTubeTopicRequest topicRequest = QueryTubeTopicRequest.builder()
-                    .topicName(addTopicTasksBean.getTopicName()).clusterId(clusterId)
+                    .topicName(addTopicTasksBean.getTopicName()).clusterId(1)
                     .user(request.getUser()).build();
 
-            String tubeManager = commonOperateService.getSpecifiedParam(InlongGroupSettings.TUBE_MANAGER_URL);
+            String tubeManager = "InlongGroupSettings.TUBE_MANAGER_URL";
             TubeManagerResponse response = httpUtils
                     .request(tubeManager + "/v1/topic?method=queryCanWrite", HttpMethod.POST,
                             GSON.toJson(topicRequest), httpHeaders, TubeManagerResponse.class);
@@ -74,15 +73,16 @@ public class TubeMqOptService {
                 log.info(" create tube topic  {}  on {} ", GSON.toJson(request),
                         tubeManager + "/v1/task?method=addTopicTask");
 
-                request.setClusterId(clusterId);
+                request.setClusterId(1);
                 TubeManagerResponse createRsp = httpUtils
                         .request(tubeManager + "/v1/task?method=addTopicTask", HttpMethod.POST,
                                 GSON.toJson(request), httpHeaders, TubeManagerResponse.class);
+                log.info("create tube topic success, result: {}", createRsp);
             } else {
-                log.warn("topic {} exists in {} ", addTopicTasksBean.getTopicName(), tubeManager);
+                log.warn("tube topic {} exists in {} ", addTopicTasksBean.getTopicName(), tubeManager);
             }
         } catch (Exception e) {
-            log.error("fail to create tube topic " + request.getAddTopicTasks().get(0).getTopicName(), e);
+            log.error("failed to create tube topic: " + request.getAddTopicTasks().get(0).getTopicName(), e);
         }
         return "";
     }
@@ -94,16 +94,17 @@ public class TubeMqOptService {
         HttpHeaders httpHeaders = new HttpHeaders();
         httpHeaders.add("Content-Type", "application/json");
         try {
-            String tubeManager = commonOperateService.getSpecifiedParam(InlongGroupSettings.TUBE_MANAGER_URL);
+            String tubeManager = "InlongGroupSettings.TUBE_MANAGER_URL";
             log.info("create tube consumer group {}  on {} ", GSON.toJson(request),
                     tubeManager + "/v1/task?method=addTopicTask");
-            TubeManagerResponse rsp = httpUtils.request(tubeManager + "/v1/group?method=add",
+            TubeManagerResponse response = httpUtils.request(tubeManager + "/v1/group?method=add",
                     HttpMethod.POST, GSON.toJson(request), httpHeaders, TubeManagerResponse.class);
-            if (rsp.getErrCode() == -1) { // Creation failed
-                throw new BusinessException(ErrorCodeEnum.CONSUMER_GROUP_CREATE_FAILED, rsp.getErrMsg());
+            if (response.getErrCode() == -1) { // Creation failed
+                throw new BusinessException(ErrorCodeEnum.CONSUMER_GROUP_CREATE_FAILED, response.getErrMsg());
             }
+            log.info("create tube consumer group success, result: {}", response);
         } catch (BusinessException e) {
-            log.error(" fail to create tube consumer group  " + GSON.toJson(request), e);
+            log.error("failed to create tube consumer group: " + GSON.toJson(request), e);
             throw e;
         }
         return "";
@@ -116,7 +117,7 @@ public class TubeMqOptService {
         HttpHeaders httpHeaders = new HttpHeaders();
         httpHeaders.add("Content-Type", "application/json");
         try {
-            String tubeManager = commonOperateService.getSpecifiedParam(InlongGroupSettings.TUBE_MANAGER_URL);
+            String tubeManager = "InlongGroupSettings.TUBE_MANAGER_URL";
             TubeManagerResponse response = httpUtils.request(tubeManager + "/v1/topic?method=queryCanWrite",
                     HttpMethod.POST, GSON.toJson(queryTubeTopicRequest), httpHeaders, TubeManagerResponse.class);
             if (response.getErrCode() == 0) { // topic already exists
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkServiceImpl.java
index 026461985..455b87845 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkServiceImpl.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkServiceImpl.java
@@ -41,7 +41,7 @@ import org.apache.inlong.manager.dao.entity.InlongGroupEntity;
 import org.apache.inlong.manager.dao.entity.StreamSinkEntity;
 import org.apache.inlong.manager.dao.mapper.StreamSinkEntityMapper;
 import org.apache.inlong.manager.dao.mapper.StreamSinkFieldEntityMapper;
-import org.apache.inlong.manager.service.CommonOperateService;
+import org.apache.inlong.manager.service.group.GroupCheckService;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
@@ -66,7 +66,7 @@ public class StreamSinkServiceImpl implements StreamSinkService {
     @Autowired
     private SinkOperationFactory operationFactory;
     @Autowired
-    private CommonOperateService commonOperateService;
+    private GroupCheckService groupCheckService;
     @Autowired
     private StreamSinkEntityMapper sinkMapper;
     @Autowired
@@ -80,7 +80,7 @@ public class StreamSinkServiceImpl implements StreamSinkService {
 
         // Check if it can be added
         String groupId = request.getInlongGroupId();
-        commonOperateService.checkGroupStatus(groupId, operator);
+        groupCheckService.checkGroupStatus(groupId, operator);
 
         // Make sure that there is no sink info with the current groupId and streamId
         String streamId = request.getInlongStreamId();
@@ -192,7 +192,7 @@ public class StreamSinkServiceImpl implements StreamSinkService {
         String streamId = request.getInlongStreamId();
         String sinkName = request.getSinkName();
         String sinkType = request.getSinkType();
-        InlongGroupEntity groupEntity = commonOperateService.checkGroupStatus(groupId, operator);
+        InlongGroupEntity groupEntity = groupCheckService.checkGroupStatus(groupId, operator);
 
         // Check whether the sink name exists with the same groupId and streamId
         List<StreamSinkEntity> sinkList = sinkMapper.selectByRelatedId(groupId, streamId, sinkName);
@@ -239,7 +239,7 @@ public class StreamSinkServiceImpl implements StreamSinkService {
         Preconditions.checkNotNull(id, ErrorCodeEnum.ID_IS_EMPTY.getMessage());
         StreamSinkEntity entity = sinkMapper.selectByPrimaryKey(id);
         Preconditions.checkNotNull(entity, ErrorCodeEnum.SINK_INFO_NOT_FOUND.getMessage());
-        commonOperateService.checkGroupStatus(entity.getInlongGroupId(), operator);
+        groupCheckService.checkGroupStatus(entity.getInlongGroupId(), operator);
 
         entity.setPreviousStatus(entity.getStatus());
         entity.setStatus(GlobalConstants.DELETED_STATUS);
@@ -261,7 +261,7 @@ public class StreamSinkServiceImpl implements StreamSinkService {
         Preconditions.checkNotNull(streamId, ErrorCodeEnum.STREAM_ID_IS_EMPTY.getMessage());
 
         // Check if it can be deleted
-        commonOperateService.checkGroupStatus(groupId, operator);
+        groupCheckService.checkGroupStatus(groupId, operator);
 
         Date now = new Date();
         List<StreamSinkEntity> entityList = sinkMapper.selectByRelatedId(groupId, streamId, null);
@@ -290,7 +290,7 @@ public class StreamSinkServiceImpl implements StreamSinkService {
         Preconditions.checkNotNull(streamId, ErrorCodeEnum.STREAM_ID_IS_EMPTY.getMessage());
 
         // Check if it can be deleted
-        commonOperateService.checkGroupStatus(groupId, operator);
+        groupCheckService.checkGroupStatus(groupId, operator);
 
         List<StreamSinkEntity> entityList = sinkMapper.selectByRelatedId(groupId, streamId, null);
         if (CollectionUtils.isNotEmpty(entityList)) {
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/CreateSortConfigListenerV2.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/CreateSortConfigListenerV2.java
index 80678d19e..6f0f5c28a 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/CreateSortConfigListenerV2.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/CreateSortConfigListenerV2.java
@@ -23,11 +23,13 @@ import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.inlong.common.enums.DataTypeEnum;
-import org.apache.inlong.common.pojo.dataproxy.PulsarClusterInfo;
+import org.apache.inlong.manager.common.enums.ClusterType;
 import org.apache.inlong.manager.common.enums.GroupOperateType;
 import org.apache.inlong.manager.common.enums.MQType;
 import org.apache.inlong.manager.common.enums.SourceType;
 import org.apache.inlong.manager.common.exceptions.WorkflowListenerException;
+import org.apache.inlong.manager.common.pojo.cluster.InlongClusterInfo;
+import org.apache.inlong.manager.common.pojo.cluster.pulsar.PulsarClusterInfo;
 import org.apache.inlong.manager.common.pojo.group.InlongGroupExtInfo;
 import org.apache.inlong.manager.common.pojo.group.InlongGroupInfo;
 import org.apache.inlong.manager.common.pojo.sink.StreamSink;
@@ -37,7 +39,7 @@ import org.apache.inlong.manager.common.pojo.source.pulsar.PulsarSource;
 import org.apache.inlong.manager.common.pojo.stream.InlongStreamInfo;
 import org.apache.inlong.manager.common.pojo.workflow.form.GroupResourceProcessForm;
 import org.apache.inlong.manager.common.settings.InlongGroupSettings;
-import org.apache.inlong.manager.service.CommonOperateService;
+import org.apache.inlong.manager.service.cluster.InlongClusterService;
 import org.apache.inlong.manager.service.sink.StreamSinkService;
 import org.apache.inlong.manager.service.sort.util.ExtractNodeUtils;
 import org.apache.inlong.manager.service.sort.util.LoadNodeUtils;
@@ -68,12 +70,10 @@ public class CreateSortConfigListenerV2 implements SortOperateListener {
 
     @Autowired
     private StreamSourceService sourceService;
-
     @Autowired
     private StreamSinkService sinkService;
-
     @Autowired
-    private CommonOperateService commonOperateService;
+    private InlongClusterService clusterService;
 
     @Override
     public TaskEvent event() {
@@ -125,26 +125,33 @@ public class CreateSortConfigListenerV2 implements SortOperateListener {
         return new GroupInfo(groupId, streamInfos);
     }
 
-    private Map<String, List<StreamSource>> createPulsarSources(InlongGroupInfo groupInfo,
-            List<InlongStreamInfo> streamInfoList) {
-        MQType mqType = MQType.forType(groupInfo.getMqType());
-        if (mqType != MQType.PULSAR) {
-            String errMsg = String.format("Unsupported mqType={%s}", mqType);
+    private Map<String, List<StreamSource>> createPulsarSources(
+            InlongGroupInfo groupInfo, List<InlongStreamInfo> streamInfoList) {
+
+        if (!MQType.MQ_PULSAR.equals(groupInfo.getMqType())) {
+            String errMsg = String.format("Unsupported mqType={%s}", groupInfo.getMqType());
             log.error(errMsg);
             throw new WorkflowListenerException(errMsg);
         }
+
         Map<String, List<StreamSource>> sourceMap = Maps.newHashMap();
-        PulsarClusterInfo pulsarCluster = commonOperateService.getPulsarClusterInfo(groupInfo.getMqType());
+        InlongClusterInfo clusterInfo = clusterService.getOne(groupInfo.getInlongClusterTag(), null,
+                ClusterType.CLS_PULSAR);
+
+        PulsarClusterInfo pulsarCluster = (PulsarClusterInfo) clusterInfo;
+        String adminUrl = pulsarCluster.getAdminUrl();
+        String serviceUrl = pulsarCluster.getUrl();
         streamInfoList.forEach(streamInfo -> {
             PulsarSource pulsarSource = new PulsarSource();
-            pulsarSource.setSourceName(streamInfo.getInlongStreamId());
+            String streamId = streamInfo.getInlongStreamId();
+            pulsarSource.setSourceName(streamId);
             pulsarSource.setNamespace(groupInfo.getMqResource());
             pulsarSource.setTopic(streamInfo.getMqResource());
-            pulsarSource.setAdminUrl(pulsarCluster.getAdminUrl());
-            pulsarSource.setServiceUrl(pulsarCluster.getBrokerServiceUrl());
+            pulsarSource.setAdminUrl(adminUrl);
+            pulsarSource.setServiceUrl(serviceUrl);
             pulsarSource.setInlongComponent(true);
-            List<StreamSource> sourceInfos = sourceService.listSource(groupInfo.getInlongGroupId(),
-                    streamInfo.getInlongStreamId());
+
+            List<StreamSource> sourceInfos = sourceService.listSource(groupInfo.getInlongGroupId(), streamId);
             for (StreamSource sourceInfo : sourceInfos) {
                 if (StringUtils.isEmpty(pulsarSource.getSerializationType())
                         && StringUtils.isNotEmpty(sourceInfo.getSerializationType())) {
@@ -159,8 +166,7 @@ public class CreateSortConfigListenerV2 implements SortOperateListener {
             }
             pulsarSource.setScanStartupMode(PulsarScanStartupMode.EARLIEST.getValue());
             pulsarSource.setFieldList(streamInfo.getFieldList());
-            sourceMap.computeIfAbsent(streamInfo.getInlongStreamId(), key -> Lists.newArrayList())
-                    .add(pulsarSource);
+            sourceMap.computeIfAbsent(streamId, key -> Lists.newArrayList()).add(pulsarSource);
         });
         return sourceMap;
     }
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/CreateStreamSortConfigListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/CreateStreamSortConfigListener.java
index 02e848de2..daaf55050 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/CreateStreamSortConfigListener.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/CreateStreamSortConfigListener.java
@@ -22,11 +22,13 @@ import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
-import org.apache.inlong.common.pojo.dataproxy.PulsarClusterInfo;
+import org.apache.inlong.manager.common.enums.ClusterType;
 import org.apache.inlong.manager.common.enums.GroupOperateType;
 import org.apache.inlong.manager.common.enums.MQType;
 import org.apache.inlong.manager.common.enums.SourceType;
 import org.apache.inlong.manager.common.exceptions.WorkflowListenerException;
+import org.apache.inlong.manager.common.pojo.cluster.InlongClusterInfo;
+import org.apache.inlong.manager.common.pojo.cluster.pulsar.PulsarClusterInfo;
 import org.apache.inlong.manager.common.pojo.group.InlongGroupInfo;
 import org.apache.inlong.manager.common.pojo.sink.StreamSink;
 import org.apache.inlong.manager.common.pojo.source.StreamSource;
@@ -36,7 +38,7 @@ import org.apache.inlong.manager.common.pojo.stream.InlongStreamExtInfo;
 import org.apache.inlong.manager.common.pojo.stream.InlongStreamInfo;
 import org.apache.inlong.manager.common.pojo.workflow.form.StreamResourceProcessForm;
 import org.apache.inlong.manager.common.settings.InlongGroupSettings;
-import org.apache.inlong.manager.service.CommonOperateService;
+import org.apache.inlong.manager.service.cluster.InlongClusterService;
 import org.apache.inlong.manager.service.sink.StreamSinkService;
 import org.apache.inlong.manager.service.sort.util.ExtractNodeUtils;
 import org.apache.inlong.manager.service.sort.util.LoadNodeUtils;
@@ -70,7 +72,7 @@ public class CreateStreamSortConfigListener implements SortOperateListener {
     @Autowired
     private StreamSinkService streamSinkService;
     @Autowired
-    private CommonOperateService commonOperateService;
+    private InlongClusterService clusterService;
 
     @Override
     public TaskEvent event() {
@@ -119,22 +121,28 @@ public class CreateStreamSortConfigListener implements SortOperateListener {
     }
 
     private List<StreamSource> createPulsarSources(InlongGroupInfo groupInfo, InlongStreamInfo streamInfo) {
-        MQType mqType = MQType.forType(groupInfo.getMqType());
-        if (mqType != MQType.PULSAR) {
-            String errMsg = String.format("Unsupported mqType={%s}", mqType);
+        if (!MQType.MQ_PULSAR.equals(groupInfo.getMqType())) {
+            String errMsg = String.format("Unsupported mqType={%s}", groupInfo.getMqType());
             log.error(errMsg);
             throw new WorkflowListenerException(errMsg);
         }
-        PulsarClusterInfo pulsarCluster = commonOperateService.getPulsarClusterInfo(groupInfo.getMqType());
+
         PulsarSource pulsarSource = new PulsarSource();
-        pulsarSource.setSourceName(streamInfo.getInlongStreamId());
+        String streamId = streamInfo.getInlongStreamId();
+        pulsarSource.setSourceName(streamId);
         pulsarSource.setNamespace(groupInfo.getMqResource());
         pulsarSource.setTopic(streamInfo.getMqResource());
-        pulsarSource.setAdminUrl(pulsarCluster.getAdminUrl());
-        pulsarSource.setServiceUrl(pulsarCluster.getBrokerServiceUrl());
+
+        InlongClusterInfo clusterInfo = clusterService.getOne(groupInfo.getInlongClusterTag(), null,
+                ClusterType.CLS_PULSAR);
+        PulsarClusterInfo pulsarCluster = (PulsarClusterInfo) clusterInfo;
+        String adminUrl = pulsarCluster.getAdminUrl();
+        String serviceUrl = pulsarCluster.getUrl();
+
+        pulsarSource.setAdminUrl(adminUrl);
+        pulsarSource.setServiceUrl(serviceUrl);
         pulsarSource.setInlongComponent(true);
-        List<StreamSource> sources = streamSourceService.listSource(groupInfo.getInlongGroupId(),
-                streamInfo.getInlongStreamId());
+        List<StreamSource> sources = streamSourceService.listSource(groupInfo.getInlongGroupId(), streamId);
         for (StreamSource source : sources) {
             if (StringUtils.isEmpty(pulsarSource.getSerializationType())
                     && StringUtils.isNotEmpty(source.getSerializationType())) {
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/PushSortConfigListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/PushSortConfigListener.java
index b113370f7..a24a9264f 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/PushSortConfigListener.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/PushSortConfigListener.java
@@ -19,7 +19,6 @@ package org.apache.inlong.manager.service.sort;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.commons.collections.CollectionUtils;
-import org.apache.inlong.manager.common.beans.ClusterBean;
 import org.apache.inlong.manager.common.exceptions.WorkflowListenerException;
 import org.apache.inlong.manager.common.pojo.group.InlongGroupInfo;
 import org.apache.inlong.manager.common.pojo.sink.StreamSink;
@@ -47,8 +46,6 @@ public class PushSortConfigListener implements SortOperateListener {
 
     private static final Logger LOGGER = LoggerFactory.getLogger(PushSortConfigListener.class);
 
-    @Autowired
-    private ClusterBean clusterBean;
     @Autowired
     private InlongGroupService groupService;
     @Autowired
@@ -89,10 +86,10 @@ public class PushSortConfigListener implements SortOperateListener {
             Integer sinkId = streamSink.getId();
             try {
                 // DataFlowInfo dataFlowInfo = dataFlowUtils.createDataFlow(groupInfo, streamSink);
-                String zkUrl = clusterBean.getZkUrl();
-                String zkRoot = clusterBean.getZkRoot();
+                // String zkUrl = clusterBean.getZkUrl();
+                // String zkRoot = clusterBean.getZkRoot();
                 // push data flow info to zk
-                String sortClusterName = clusterBean.getAppName();
+                // String sortClusterName = clusterBean.getAppName();
                 // ZkTools.updateDataFlowInfo(dataFlowInfo, sortClusterName, sinkId, zkUrl, zkRoot);
                 // add sink id to zk
                 // ZkTools.addDataFlowToCluster(sortClusterName, sinkId, zkUrl, zkRoot);
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/DataFlowUtils.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/DataFlowUtils.java
index 2f54a94ab..51767692b 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/DataFlowUtils.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/DataFlowUtils.java
@@ -17,9 +17,8 @@
 
 package org.apache.inlong.manager.service.sort.util;
 
-import org.apache.inlong.manager.common.beans.ClusterBean;
-import org.apache.inlong.manager.service.CommonOperateService;
 import org.apache.inlong.manager.service.core.InlongStreamService;
+import org.apache.inlong.manager.service.group.GroupCheckService;
 import org.apache.inlong.manager.service.source.StreamSourceService;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Service;
@@ -32,9 +31,7 @@ import org.springframework.stereotype.Service;
 public class DataFlowUtils {
 
     @Autowired
-    private ClusterBean clusterBean;
-    @Autowired
-    private CommonOperateService commonOperateService;
+    private GroupCheckService groupCheckService;
     @Autowired
     private StreamSourceService streamSourceService;
     @Autowired
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceServiceImpl.java
index b3993d071..0a375f909 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceServiceImpl.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceServiceImpl.java
@@ -40,7 +40,7 @@ import org.apache.inlong.manager.dao.entity.InlongGroupEntity;
 import org.apache.inlong.manager.dao.entity.StreamSourceEntity;
 import org.apache.inlong.manager.dao.mapper.StreamSourceEntityMapper;
 import org.apache.inlong.manager.dao.mapper.StreamSourceFieldEntityMapper;
-import org.apache.inlong.manager.service.CommonOperateService;
+import org.apache.inlong.manager.service.group.GroupCheckService;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
@@ -70,7 +70,7 @@ public class StreamSourceServiceImpl implements StreamSourceService {
     @Autowired
     private StreamSourceFieldEntityMapper sourceFieldMapper;
     @Autowired
-    private CommonOperateService commonOperateService;
+    private GroupCheckService groupCheckService;
 
     @Override
     @Transactional(rollbackFor = Throwable.class, propagation = Propagation.REQUIRES_NEW)
@@ -80,7 +80,7 @@ public class StreamSourceServiceImpl implements StreamSourceService {
 
         // Check if it can be added
         String groupId = request.getInlongGroupId();
-        InlongGroupEntity groupEntity = commonOperateService.checkGroupStatus(groupId, operator);
+        InlongGroupEntity groupEntity = groupCheckService.checkGroupStatus(groupId, operator);
 
         // According to the source type, save source information
         String sourceType = request.getSourceType();
@@ -169,7 +169,7 @@ public class StreamSourceServiceImpl implements StreamSourceService {
 
         // Check if it can be modified
         String groupId = request.getInlongGroupId();
-        InlongGroupEntity groupEntity = commonOperateService.checkGroupStatus(groupId, operator);
+        InlongGroupEntity groupEntity = groupCheckService.checkGroupStatus(groupId, operator);
 
         String sourceType = request.getSourceType();
         StreamSourceOperation operation = operationFactory.getInstance(SourceType.forType(sourceType));
@@ -203,7 +203,7 @@ public class StreamSourceServiceImpl implements StreamSourceService {
 
         StreamSourceEntity entity = sourceMapper.selectByIdForUpdate(id);
         Preconditions.checkNotNull(entity, ErrorCodeEnum.SOURCE_INFO_NOT_FOUND.getMessage());
-        commonOperateService.checkGroupStatus(entity.getInlongGroupId(), operator);
+        groupCheckService.checkGroupStatus(entity.getInlongGroupId(), operator);
 
         SourceStatus curStatus = SourceStatus.forCode(entity.getStatus());
         SourceStatus nextStatus = SourceStatus.TO_BE_ISSUED_DELETE;
@@ -235,7 +235,7 @@ public class StreamSourceServiceImpl implements StreamSourceService {
         LOGGER.info("begin to restart source by id={}", id);
         StreamSourceEntity entity = sourceMapper.selectByIdForUpdate(id);
         Preconditions.checkNotNull(entity, ErrorCodeEnum.SOURCE_INFO_NOT_FOUND.getMessage());
-        commonOperateService.checkGroupStatus(entity.getInlongGroupId(), operator);
+        groupCheckService.checkGroupStatus(entity.getInlongGroupId(), operator);
 
         StreamSourceOperation operation = operationFactory.getInstance(SourceType.forType(entity.getSourceType()));
         SourceRequest sourceRequest = new SourceRequest();
@@ -253,7 +253,7 @@ public class StreamSourceServiceImpl implements StreamSourceService {
         LOGGER.info("begin to stop source by id={}", id);
         StreamSourceEntity entity = sourceMapper.selectByIdForUpdate(id);
         Preconditions.checkNotNull(entity, ErrorCodeEnum.SOURCE_INFO_NOT_FOUND.getMessage());
-        commonOperateService.checkGroupStatus(entity.getInlongGroupId(), operator);
+        groupCheckService.checkGroupStatus(entity.getInlongGroupId(), operator);
 
         StreamSourceOperation operation = operationFactory.getInstance(SourceType.forType(entity.getSourceType()));
         SourceRequest sourceRequest = new SourceRequest();
@@ -272,7 +272,7 @@ public class StreamSourceServiceImpl implements StreamSourceService {
         Preconditions.checkNotNull(groupId, ErrorCodeEnum.GROUP_ID_IS_EMPTY.getMessage());
 
         // Check if it can be deleted
-        InlongGroupEntity groupEntity = commonOperateService.checkGroupStatus(groupId, operator);
+        InlongGroupEntity groupEntity = groupCheckService.checkGroupStatus(groupId, operator);
         Integer nextStatus;
         if (GroupStatus.CONFIG_SUCCESSFUL.getCode().equals(groupEntity.getStatus())) {
             nextStatus = SourceStatus.TO_BE_ISSUED_DELETE.getCode();
@@ -307,7 +307,7 @@ public class StreamSourceServiceImpl implements StreamSourceService {
         Preconditions.checkNotNull(streamId, ErrorCodeEnum.STREAM_ID_IS_EMPTY.getMessage());
 
         // Check if it can be deleted
-        commonOperateService.checkGroupStatus(groupId, operator);
+        groupCheckService.checkGroupStatus(groupId, operator);
         sourceMapper.deleteByRelatedId(groupId, streamId);
         LOGGER.info("success to delete all source by groupId={}, streamId={}", groupId, streamId);
         return true;
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/transform/StreamTransformServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/transform/StreamTransformServiceImpl.java
index 7aacda685..9865d9660 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/transform/StreamTransformServiceImpl.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/transform/StreamTransformServiceImpl.java
@@ -33,7 +33,7 @@ import org.apache.inlong.manager.dao.entity.StreamTransformEntity;
 import org.apache.inlong.manager.dao.entity.StreamTransformFieldEntity;
 import org.apache.inlong.manager.dao.mapper.StreamTransformEntityMapper;
 import org.apache.inlong.manager.dao.mapper.StreamTransformFieldEntityMapper;
-import org.apache.inlong.manager.service.CommonOperateService;
+import org.apache.inlong.manager.service.group.GroupCheckService;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Service;
 import org.springframework.transaction.annotation.Propagation;
@@ -58,7 +58,7 @@ public class StreamTransformServiceImpl implements StreamTransformService {
     @Autowired
     protected StreamTransformFieldEntityMapper transformFieldMapper;
     @Autowired
-    protected CommonOperateService commonOperateService;
+    protected GroupCheckService groupCheckService;
 
     @Override
     @Transactional(rollbackFor = Throwable.class, propagation = Propagation.REQUIRES_NEW)
@@ -70,7 +70,7 @@ public class StreamTransformServiceImpl implements StreamTransformService {
         final String groupId = transformRequest.getInlongGroupId();
         final String streamId = transformRequest.getInlongStreamId();
         final String transformName = transformRequest.getTransformName();
-        commonOperateService.checkGroupStatus(groupId, operator);
+        groupCheckService.checkGroupStatus(groupId, operator);
 
         List<StreamTransformEntity> transformEntities = transformMapper.selectByRelatedId(groupId,
                 streamId, transformName);
@@ -131,7 +131,7 @@ public class StreamTransformServiceImpl implements StreamTransformService {
         this.checkParams(transformRequest);
         // Check whether the transform can be modified
         String groupId = transformRequest.getInlongGroupId();
-        commonOperateService.checkGroupStatus(groupId, operator);
+        groupCheckService.checkGroupStatus(groupId, operator);
         Preconditions.checkNotNull(transformRequest.getId(), ErrorCodeEnum.ID_IS_EMPTY.getMessage());
         StreamTransformEntity transformEntity = CommonBeanUtils.copyProperties(transformRequest,
                 StreamTransformEntity::new);
@@ -151,7 +151,7 @@ public class StreamTransformServiceImpl implements StreamTransformService {
                 transformName);
         Preconditions.checkNotNull(groupId, ErrorCodeEnum.GROUP_ID_IS_EMPTY.getMessage());
         Preconditions.checkNotNull(streamId, ErrorCodeEnum.STREAM_ID_IS_EMPTY.getMessage());
-        commonOperateService.checkGroupStatus(groupId, operator);
+        groupCheckService.checkGroupStatus(groupId, operator);
         Date now = new Date();
         List<StreamTransformEntity> entityList = transformMapper.selectByRelatedId(groupId, streamId, transformName);
         if (CollectionUtils.isNotEmpty(entityList)) {
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/consumption/listener/ConsumptionCompleteProcessListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/consumption/listener/ConsumptionCompleteProcessListener.java
index 3ae77c41e..df8ddad08 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/consumption/listener/ConsumptionCompleteProcessListener.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/consumption/listener/ConsumptionCompleteProcessListener.java
@@ -18,23 +18,26 @@
 package org.apache.inlong.manager.service.workflow.consumption.listener;
 
 import lombok.extern.slf4j.Slf4j;
-import org.apache.inlong.common.pojo.dataproxy.PulsarClusterInfo;
-import org.apache.inlong.manager.common.beans.ClusterBean;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.inlong.manager.common.enums.ClusterType;
 import org.apache.inlong.manager.common.enums.ConsumptionStatus;
 import org.apache.inlong.manager.common.enums.MQType;
 import org.apache.inlong.manager.common.exceptions.WorkflowListenerException;
+import org.apache.inlong.manager.common.pojo.cluster.InlongClusterInfo;
+import org.apache.inlong.manager.common.pojo.cluster.pulsar.PulsarClusterInfo;
 import org.apache.inlong.manager.common.pojo.group.InlongGroupInfo;
 import org.apache.inlong.manager.common.pojo.pulsar.PulsarTopicBean;
 import org.apache.inlong.manager.common.pojo.tubemq.AddTubeConsumeGroupRequest;
 import org.apache.inlong.manager.common.pojo.workflow.form.NewConsumptionProcessForm;
+import org.apache.inlong.manager.common.settings.InlongGroupSettings;
 import org.apache.inlong.manager.common.util.Preconditions;
 import org.apache.inlong.manager.dao.entity.ConsumptionEntity;
 import org.apache.inlong.manager.dao.mapper.ConsumptionEntityMapper;
-import org.apache.inlong.manager.service.CommonOperateService;
+import org.apache.inlong.manager.service.cluster.InlongClusterService;
 import org.apache.inlong.manager.service.group.InlongGroupService;
 import org.apache.inlong.manager.service.mq.util.PulsarOptService;
-import org.apache.inlong.manager.service.mq.util.TubeMqOptService;
 import org.apache.inlong.manager.service.mq.util.PulsarUtils;
+import org.apache.inlong.manager.service.mq.util.TubeMqOptService;
 import org.apache.inlong.manager.workflow.WorkflowContext;
 import org.apache.inlong.manager.workflow.event.ListenerResult;
 import org.apache.inlong.manager.workflow.event.process.ProcessEvent;
@@ -58,9 +61,7 @@ public class ConsumptionCompleteProcessListener implements ProcessEventListener
     @Autowired
     private PulsarOptService pulsarMqOptService;
     @Autowired
-    private ClusterBean clusterBean;
-    @Autowired
-    private CommonOperateService commonOperateService;
+    private InlongClusterService clusterService;
     @Autowired
     private InlongGroupService groupService;
     @Autowired
@@ -89,7 +90,7 @@ public class ConsumptionCompleteProcessListener implements ProcessEventListener
             this.createTubeConsumerGroup(entity);
             return ListenerResult.success("Create Tube consumer group successful");
         } else if (mqType == MQType.PULSAR || mqType == MQType.TDMQ_PULSAR) {
-            this.createPulsarTopicMessage(entity);
+            this.createPulsarSubscription(entity);
         } else {
             throw new WorkflowListenerException("Unsupported MQ type [" + mqType + "]");
         }
@@ -111,18 +112,24 @@ public class ConsumptionCompleteProcessListener implements ProcessEventListener
     }
 
     /**
-     * Create Pulsar consumption information
+     * Create Pulsar subscription
      */
-    private void createPulsarTopicMessage(ConsumptionEntity entity) {
+    private void createPulsarSubscription(ConsumptionEntity entity) {
         String groupId = entity.getInlongGroupId();
         InlongGroupInfo groupInfo = groupService.get(groupId);
         Preconditions.checkNotNull(groupInfo, "inlong group not found for groupId=" + groupId);
         String mqResource = groupInfo.getMqResource();
         Preconditions.checkNotNull(mqResource, "mq resource cannot empty for groupId=" + groupId);
-        PulsarClusterInfo pulsarClusterInfo = commonOperateService.getPulsarClusterInfo(entity.getMqType());
-        try (PulsarAdmin pulsarAdmin = PulsarUtils.getPulsarAdmin(pulsarClusterInfo)) {
+
+        String clusterTag = groupInfo.getInlongClusterTag();
+        InlongClusterInfo clusterInfo = clusterService.getOne(clusterTag, null, ClusterType.CLS_PULSAR);
+        PulsarClusterInfo pulsarCluster = (PulsarClusterInfo) clusterInfo;
+        try (PulsarAdmin pulsarAdmin = PulsarUtils.getPulsarAdmin(pulsarCluster)) {
             PulsarTopicBean topicMessage = new PulsarTopicBean();
-            String tenant = clusterBean.getDefaultTenant();
+            String tenant = pulsarCluster.getTenant();
+            if (StringUtils.isEmpty(tenant)) {
+                tenant = InlongGroupSettings.DEFAULT_PULSAR_TENANT;
+            }
             topicMessage.setTenant(tenant);
             topicMessage.setNamespace(mqResource);