You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by he...@apache.org on 2022/03/07 12:08:46 UTC

[incubator-inlong] branch master updated: [INLONG-2973][Manager] Get pulsar info from the cluster table (#2978)

This is an automated email from the ASF dual-hosted git repository.

healchow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new d208304  [INLONG-2973][Manager] Get pulsar info from the cluster table (#2978)
d208304 is described below

commit d208304c9334d1d2ad5205cd5cb815f40cbe6ad2
Author: pacino <ge...@gmail.com>
AuthorDate: Mon Mar 7 20:08:20 2022 +0800

    [INLONG-2973][Manager] Get pulsar info from the cluster table (#2978)
---
 .../common/pojo/dataproxy/PulsarClusterInfo.java   | 40 ++++++-------------
 .../inlong/manager/client/api/InlongGroupConf.java |  3 ++
 .../client/api/util/InlongGroupTransfer.java       |  9 +++--
 .../manager/common/pojo/group/InlongGroupInfo.java | 10 +++--
 .../common/pojo/group/InlongGroupRequest.java      |  3 ++
 .../common/pojo/group/InlongGroupResponse.java     | 10 +++--
 .../resources/mappers/InlongGroupEntityMapper.xml  | 15 ++++++-
 .../manager/service/CommonOperateService.java      | 20 ++++++++++
 .../service/core/impl/InlongGroupServiceImpl.java  |  6 ++-
 .../mq/CreatePulsarGroupForStreamTaskListener.java | 14 ++++---
 .../mq/CreatePulsarGroupTaskListener.java          | 17 ++++----
 .../mq/CreatePulsarResourceTaskListener.java       | 23 ++++++-----
 .../mq/CreatePulsarTopicForStreamTaskListener.java | 18 +++++----
 .../service/thirdparty/mq/util/PulsarUtils.java    | 46 +++++++---------------
 .../thirdparty/sort/CreateSortConfigListener.java  | 15 +++----
 .../thirdparty/sort/PushSortConfigListener.java    | 13 +++---
 .../thirdparty/sort/util/SourceInfoUtils.java      | 44 +++++----------------
 .../ConsumptionCompleteProcessListener.java        | 23 ++++++-----
 .../thirdparty/mq/util/PulsarUtilsTest.java        | 11 +++---
 19 files changed, 175 insertions(+), 165 deletions(-)

diff --git a/inlong-common/src/main/java/org/apache/inlong/common/pojo/dataproxy/PulsarClusterInfo.java b/inlong-common/src/main/java/org/apache/inlong/common/pojo/dataproxy/PulsarClusterInfo.java
index a942575..f72d6c7 100644
--- a/inlong-common/src/main/java/org/apache/inlong/common/pojo/dataproxy/PulsarClusterInfo.java
+++ b/inlong-common/src/main/java/org/apache/inlong/common/pojo/dataproxy/PulsarClusterInfo.java
@@ -17,35 +17,21 @@
 
 package org.apache.inlong.common.pojo.dataproxy;
 
-import java.util.HashMap;
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
 import java.util.Map;
 
+@Data
+@AllArgsConstructor
+@NoArgsConstructor
+@Builder
 public class PulsarClusterInfo {
-    private String url;
-    private String token;
-    private Map<String, String> params = new HashMap<>();
-
-    public String getUrl() {
-        return url;
-    }
-
-    public void setUrl(String url) {
-        this.url = url;
-    }
 
-    public String getToken() {
-        return token;
-    }
-
-    public void setToken(String token) {
-        this.token = token;
-    }
-
-    public Map<String, String> getParams() {
-        return params;
-    }
-
-    public void setParams(Map<String, String> params) {
-        this.params = params;
-    }
+    private String adminUrl;
+    private String token;
+    private String brokerServiceUrl;
+    private Map<String, String> ext;
 }
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/InlongGroupConf.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/InlongGroupConf.java
index 1fbecb9..4ab7ff8 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/InlongGroupConf.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/InlongGroupConf.java
@@ -58,4 +58,7 @@ public class InlongGroupConf {
 
     @ApiModelProperty("Need zookeeper support")
     private boolean zookeeperEnabled = true;
+
+    @ApiModelProperty("data proxy cluster id")
+    private Integer proxyClusterId;
 }
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongGroupTransfer.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongGroupTransfer.java
index 6bdfa6b..8e948f5 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongGroupTransfer.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongGroupTransfer.java
@@ -19,9 +19,6 @@ package org.apache.inlong.manager.client.api.util;
 
 import com.google.common.collect.Lists;
 import com.google.gson.reflect.TypeToken;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.collections.MapUtils;
 import org.apache.commons.lang3.StringUtils;
@@ -46,6 +43,10 @@ import org.apache.inlong.manager.common.pojo.group.InlongGroupResponse;
 import org.apache.inlong.manager.common.settings.InlongGroupSettings;
 import org.apache.inlong.manager.common.util.JsonUtils;
 
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
 public class InlongGroupTransfer {
 
     public static InlongGroupConf parseGroupResponse(InlongGroupResponse groupResponse) {
@@ -58,6 +59,7 @@ public class InlongGroupTransfer {
         inlongGroupConf.setPeakRecords(Long.valueOf(groupResponse.getPeakRecords()));
         inlongGroupConf.setMqBaseConf(parseMqBaseConf(groupResponse));
         inlongGroupConf.setSortBaseConf(parseSortBaseConf(groupResponse));
+        inlongGroupConf.setProxyClusterId(groupResponse.getProxyClusterId());
         return inlongGroupConf;
     }
 
@@ -187,6 +189,7 @@ public class InlongGroupTransfer {
         groupInfo.setDailyRecords(groupConf.getDailyRecords().intValue());
         groupInfo.setPeakRecords(groupConf.getPeakRecords().intValue());
         groupInfo.setMaxLength(groupConf.getMaxLength());
+        groupInfo.setProxyClusterId(groupConf.getProxyClusterId());
         MqBaseConf mqConf = groupConf.getMqBaseConf();
         MqType mqType = mqConf.getType();
         groupInfo.setMiddlewareType(mqType.name());
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/group/InlongGroupInfo.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/group/InlongGroupInfo.java
index a9da378..70e58ad 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/group/InlongGroupInfo.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/group/InlongGroupInfo.java
@@ -20,15 +20,16 @@ package org.apache.inlong.manager.common.pojo.group;
 import com.fasterxml.jackson.annotation.JsonFormat;
 import io.swagger.annotations.ApiModel;
 import io.swagger.annotations.ApiModelProperty;
-import java.util.Date;
-import java.util.List;
-import javax.validation.constraints.NotNull;
 import lombok.AllArgsConstructor;
 import lombok.Builder;
 import lombok.Data;
 import lombok.NoArgsConstructor;
 import org.apache.inlong.manager.common.util.CommonBeanUtils;
 
+import javax.validation.constraints.NotNull;
+import java.util.Date;
+import java.util.List;
+
 /**
  * Inlong group info
  */
@@ -117,6 +118,9 @@ public class InlongGroupInfo {
     @ApiModelProperty(value = "Temporary view, string in JSON format")
     private String tempView;
 
+    @ApiModelProperty(value = "data proxy cluster id")
+    private Integer proxyClusterId;
+
     @ApiModelProperty(value = "Inlong group Extension properties")
     private List<InlongGroupExtInfo> extList;
 
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/group/InlongGroupRequest.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/group/InlongGroupRequest.java
index 3ca513a..8909e36 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/group/InlongGroupRequest.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/group/InlongGroupRequest.java
@@ -117,6 +117,9 @@ public class InlongGroupRequest {
     @ApiModelProperty(value = "Temporary view, string in JSON format")
     private String tempView;
 
+    @ApiModelProperty(value = "data proxy cluster id")
+    private Integer proxyClusterId;
+
     @ApiModelProperty(value = "Inlong group Extension properties")
     private List<InlongGroupExtInfo> extList;
 
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/group/InlongGroupResponse.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/group/InlongGroupResponse.java
index d771e56..e4f4313 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/group/InlongGroupResponse.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/group/InlongGroupResponse.java
@@ -20,14 +20,15 @@ package org.apache.inlong.manager.common.pojo.group;
 import com.fasterxml.jackson.annotation.JsonFormat;
 import io.swagger.annotations.ApiModel;
 import io.swagger.annotations.ApiModelProperty;
-import java.util.Date;
-import java.util.List;
-import javax.validation.constraints.NotNull;
 import lombok.AllArgsConstructor;
 import lombok.Builder;
 import lombok.Data;
 import lombok.NoArgsConstructor;
 
+import javax.validation.constraints.NotNull;
+import java.util.Date;
+import java.util.List;
+
 @Data
 @Builder
 @NoArgsConstructor
@@ -113,6 +114,9 @@ public class InlongGroupResponse {
     @ApiModelProperty(value = "Temporary view, string in JSON format")
     private String tempView;
 
+    @ApiModelProperty(value = "data proxy cluster Id")
+    private Integer proxyClusterId;
+
     @ApiModelProperty(value = "Inlong group Extension properties")
     private List<InlongGroupExtInfo> extList;
 
diff --git a/inlong-manager/manager-dao/src/main/resources/mappers/InlongGroupEntityMapper.xml b/inlong-manager/manager-dao/src/main/resources/mappers/InlongGroupEntityMapper.xml
index 88979c0..4999d54 100644
--- a/inlong-manager/manager-dao/src/main/resources/mappers/InlongGroupEntityMapper.xml
+++ b/inlong-manager/manager-dao/src/main/resources/mappers/InlongGroupEntityMapper.xml
@@ -54,7 +54,8 @@
     </resultMap>
 
     <sql id="Base_Column_List">
-        id, inlong_group_id, name, cn_name, description, middleware_type, queue_module, topic_partition_num,
+        id
+        , inlong_group_id, name, cn_name, description, middleware_type, queue_module, topic_partition_num,
         mq_resource_obj, daily_records, daily_storage, peak_records, max_length, schema_name, in_charges, followers,
         status, is_deleted, creator, modifier, create_time, modify_time, temp_view, zookeeper_enabled, proxy_cluster_id
     </sql>
@@ -231,6 +232,9 @@
             <if test="zookeeperEnabled != null">
                 zookeeper_enabled,
             </if>
+            <if test="proxyClusterId != null">
+                proxy_cluster_id,
+            </if>
         </trim>
         <trim prefix="values (" suffix=")" suffixOverrides=",">
             <if test="id != null">
@@ -305,6 +309,9 @@
             <if test="zookeeperEnabled != null">
                 #{zookeeperEnabled,jdbcType=INTEGER},
             </if>
+            <if test="proxyClusterId != null">
+                #{proxyClusterId,jdbcType=INTEGER},
+            </if>
         </trim>
     </insert>
     <update id="updateByPrimaryKeySelective" parameterType="org.apache.inlong.manager.dao.entity.InlongGroupEntity">
@@ -379,6 +386,9 @@
             <if test="zookeeperEnabled != null">
                 zookeeper_enabled = #{zookeeperEnabled,jdbcType=INTEGER},
             </if>
+            <if test="proxyClusterId != null">
+                proxy_cluster_id = #{proxyClusterId,jdbcType=INTEGER},
+            </if>
         </set>
         where id = #{id,jdbcType=INTEGER}
     </update>
@@ -445,6 +455,9 @@
             <if test="zookeeperEnabled != null">
                 zookeeper_enabled = #{zookeeperEnabled,jdbcType=INTEGER},
             </if>
+            <if test="proxyClusterId != null">
+                proxy_cluster_id = #{proxyClusterId,jdbcType=INTEGER},
+            </if>
         </set>
         where inlong_group_id = #{inlongGroupId, jdbcType=VARCHAR}
         and is_deleted = 0
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/CommonOperateService.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/CommonOperateService.java
index d5326e5..345f9b8 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/CommonOperateService.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/CommonOperateService.java
@@ -18,12 +18,14 @@
 package org.apache.inlong.manager.service;
 
 import com.google.gson.Gson;
+import org.apache.inlong.common.pojo.dataproxy.PulsarClusterInfo;
 import org.apache.inlong.manager.common.enums.Constant;
 import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
 import org.apache.inlong.manager.common.enums.GroupState;
 import org.apache.inlong.manager.common.exceptions.BusinessException;
 import org.apache.inlong.manager.common.pojo.cluster.ClusterRequest;
 import org.apache.inlong.manager.common.pojo.group.InlongGroupPageRequest;
+import org.apache.inlong.manager.common.util.JsonUtils;
 import org.apache.inlong.manager.common.util.Preconditions;
 import org.apache.inlong.manager.dao.entity.DataProxyClusterEntity;
 import org.apache.inlong.manager.dao.entity.InlongGroupEntity;
@@ -139,6 +141,24 @@ public class CommonOperateService {
     }
 
     /**
+     * Get the Pulsar cluster info (broker service url, token, admin url) from the third party cluster table.
+     *
+     * @return Pulsar cluster info, whose admin url is read from the ext params of the cluster record
+     */
+    public PulsarClusterInfo getPulsarClusterInfo() {
+        ThirdPartyClusterEntity thirdPartyClusterEntity = getThirdPartyCluster(Constant.MIDDLEWARE_PULSAR);
+        Preconditions.checkNotNull(thirdPartyClusterEntity.getExtParams(), "pulsar extParam is empty, check "
+                + "third party cluster table");
+        Map<String, String> configParams = JsonUtils.parse(thirdPartyClusterEntity.getExtParams(), Map.class);
+        PulsarClusterInfo pulsarClusterInfo = PulsarClusterInfo.builder().brokerServiceUrl(
+                thirdPartyClusterEntity.getUrl()).token(thirdPartyClusterEntity.getToken()).build();
+        String adminUrl = configParams.get(Constant.PULSAR_ADMINURL);
+        Preconditions.checkNotNull(adminUrl, "adminUrl is empty, check third party cluster table");
+        pulsarClusterInfo.setAdminUrl(adminUrl);
+        return pulsarClusterInfo;
+    }
+
+    /**
      * Check whether the inlong group status is temporary
      *
      * @param groupId Inlong group id
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/InlongGroupServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/InlongGroupServiceImpl.java
index 3bdd112..5289333 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/InlongGroupServiceImpl.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/InlongGroupServiceImpl.java
@@ -22,6 +22,7 @@ import com.github.pagehelper.PageHelper;
 import com.github.pagehelper.PageInfo;
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.lang3.StringUtils;
+import org.apache.inlong.common.pojo.dataproxy.PulsarClusterInfo;
 import org.apache.inlong.manager.common.enums.Constant;
 import org.apache.inlong.manager.common.enums.EntityStatus;
 import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
@@ -184,8 +185,9 @@ public class InlongGroupServiceImpl implements InlongGroupService {
             if (Constant.MIDDLEWARE_TUBE.equalsIgnoreCase(middlewareType)) {
                 groupInfo.setTubeMaster(commonOperateService.getSpecifiedParam(Constant.TUBE_MASTER_URL));
             } else if (Constant.MIDDLEWARE_PULSAR.equalsIgnoreCase(middlewareType)) {
-                groupInfo.setPulsarAdminUrl(commonOperateService.getSpecifiedParam(Constant.PULSAR_ADMINURL));
-                groupInfo.setPulsarServiceUrl(commonOperateService.getSpecifiedParam(Constant.PULSAR_SERVICEURL));
+                PulsarClusterInfo pulsarClusterInfo = commonOperateService.getPulsarClusterInfo();
+                groupInfo.setPulsarAdminUrl(pulsarClusterInfo.getAdminUrl());
+                groupInfo.setPulsarServiceUrl(pulsarClusterInfo.getBrokerServiceUrl());
             }
         }
 
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/mq/CreatePulsarGroupForStreamTaskListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/mq/CreatePulsarGroupForStreamTaskListener.java
index f448f9e..a814379 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/mq/CreatePulsarGroupForStreamTaskListener.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/mq/CreatePulsarGroupForStreamTaskListener.java
@@ -17,10 +17,9 @@
 
 package org.apache.inlong.manager.service.thirdparty.mq;
 
-import java.util.List;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.inlong.common.pojo.dataproxy.PulsarClusterInfo;
 import org.apache.inlong.manager.common.beans.ClusterBean;
-import org.apache.inlong.manager.common.enums.Constant;
 import org.apache.inlong.manager.common.exceptions.BusinessException;
 import org.apache.inlong.manager.common.exceptions.WorkflowListenerException;
 import org.apache.inlong.manager.common.pojo.group.InlongGroupInfo;
@@ -41,6 +40,8 @@ import org.apache.pulsar.client.admin.PulsarAdmin;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Component;
 
+import java.util.List;
+
 /**
  * Create a subscription group for a single inlong stream
  */
@@ -85,9 +86,8 @@ public class CreatePulsarGroupForStreamTaskListener implements QueueOperateListe
             log.warn("inlong stream is empty for group={}, stream={}, skip to create pulsar group", groupId, streamId);
             return ListenerResult.success();
         }
-
-        try (PulsarAdmin globalPulsarAdmin = PulsarUtils
-                .getPulsarAdmin(groupInfo, commonOperateService.getSpecifiedParam(Constant.PULSAR_ADMINURL))) {
+        PulsarClusterInfo globalCluster = commonOperateService.getPulsarClusterInfo();
+        try (PulsarAdmin globalPulsarAdmin = PulsarUtils.getPulsarAdmin(globalCluster)) {
             // Query data sink info based on groupId and streamId
             List<String> sinkTypeList = sinkService.getSinkTypeList(groupId, streamId);
             if (sinkTypeList == null || sinkTypeList.size() == 0) {
@@ -108,7 +108,9 @@ public class CreatePulsarGroupForStreamTaskListener implements QueueOperateListe
             String namespace = groupInfo.getMqResourceObj();
             for (String cluster : pulsarClusters) {
                 String serviceUrl = PulsarUtils.getServiceUrl(globalPulsarAdmin, cluster);
-                try (PulsarAdmin pulsarAdmin = PulsarUtils.getPulsarAdmin(groupInfo, serviceUrl)) {
+                PulsarClusterInfo pulsarClusterInfo = PulsarClusterInfo.builder()
+                        .token(globalCluster.getToken()).adminUrl(serviceUrl).build();
+                try (PulsarAdmin pulsarAdmin = PulsarUtils.getPulsarAdmin(pulsarClusterInfo)) {
                     boolean exist = pulsarOptService.topicIsExists(pulsarAdmin, tenant, namespace, topic);
                     if (!exist) {
                         String fullTopic = tenant + "/" + namespace + "/" + topic;
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/mq/CreatePulsarGroupTaskListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/mq/CreatePulsarGroupTaskListener.java
index 51b7355..304dc78 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/mq/CreatePulsarGroupTaskListener.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/mq/CreatePulsarGroupTaskListener.java
@@ -18,8 +18,8 @@
 package org.apache.inlong.manager.service.thirdparty.mq;
 
 import lombok.extern.slf4j.Slf4j;
+import org.apache.inlong.common.pojo.dataproxy.PulsarClusterInfo;
 import org.apache.inlong.manager.common.beans.ClusterBean;
-import org.apache.inlong.manager.common.enums.Constant;
 import org.apache.inlong.manager.common.exceptions.WorkflowListenerException;
 import org.apache.inlong.manager.common.pojo.group.InlongGroupInfo;
 import org.apache.inlong.manager.common.pojo.pulsar.PulsarTopicBean;
@@ -82,9 +82,8 @@ public class CreatePulsarGroupTaskListener implements QueueOperateListener {
             log.warn("inlong stream is empty for groupId={}, skip to create pulsar subscription", groupId);
             return ListenerResult.success();
         }
-
-        try (PulsarAdmin globalPulsarAdmin = PulsarUtils.getPulsarAdmin(bizInfo,
-                commonOperateService.getSpecifiedParam(Constant.PULSAR_ADMINURL))) {
+        PulsarClusterInfo globalCluster = commonOperateService.getPulsarClusterInfo();
+        try (PulsarAdmin globalPulsarAdmin = PulsarUtils.getPulsarAdmin(globalCluster)) {
             String tenant = clusterBean.getDefaultTenant();
             String namespace = bizInfo.getMqResourceObj();
 
@@ -98,14 +97,16 @@ public class CreatePulsarGroupTaskListener implements QueueOperateListener {
 
                 // Create a subscription in the Pulsar cluster (cross-region), you need to ensure that the Topic exists
                 for (String cluster : pulsarClusters) {
-                    String url = PulsarUtils.getServiceUrl(globalPulsarAdmin, cluster);
-                    try (PulsarAdmin pulsarAdmin = PulsarUtils.getPulsarAdmin(bizInfo, url)) {
+                    String serviceUrl = PulsarUtils.getServiceUrl(globalPulsarAdmin, cluster);
+                    PulsarClusterInfo pulsarClusterInfo = PulsarClusterInfo.builder()
+                            .token(globalCluster.getToken()).adminUrl(serviceUrl).build();
+                    try (PulsarAdmin pulsarAdmin = PulsarUtils.getPulsarAdmin(pulsarClusterInfo)) {
                         boolean exist = pulsarOptService.topicIsExists(pulsarAdmin, tenant, namespace, topic);
 
                         if (!exist) {
                             String topicFull = tenant + "/" + namespace + "/" + topic;
-                            log.error("topic={} not exists in {}", topicFull, url);
-                            throw new WorkflowListenerException("topic=" + topicFull + " not exists in " + url);
+                            log.error("topic={} not exists in {}", topicFull, serviceUrl);
+                            throw new WorkflowListenerException("topic=" + topicFull + " not exists in " + serviceUrl);
                         }
 
                         // Consumer naming rules: sortAppName_topicName_consumer_group
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/mq/CreatePulsarResourceTaskListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/mq/CreatePulsarResourceTaskListener.java
index 1ad4b56..9cb3d88 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/mq/CreatePulsarResourceTaskListener.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/mq/CreatePulsarResourceTaskListener.java
@@ -17,10 +17,9 @@
 
 package org.apache.inlong.manager.service.thirdparty.mq;
 
-import java.util.List;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.inlong.common.pojo.dataproxy.PulsarClusterInfo;
 import org.apache.inlong.manager.common.beans.ClusterBean;
-import org.apache.inlong.manager.common.enums.Constant;
 import org.apache.inlong.manager.common.exceptions.WorkflowListenerException;
 import org.apache.inlong.manager.common.pojo.group.InlongGroupInfo;
 import org.apache.inlong.manager.common.pojo.pulsar.PulsarTopicBean;
@@ -41,6 +40,8 @@ import org.apache.pulsar.client.admin.PulsarAdmin;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Component;
 
+import java.util.List;
+
 /**
  * Create Pulsar tenant, namespace and topic
  */
@@ -76,13 +77,14 @@ public class CreatePulsarResourceTaskListener implements QueueOperateListener {
         if (groupInfo == null) {
             throw new WorkflowListenerException("inlong group or pulsar cluster not found for groupId=" + groupId);
         }
-
-        try (PulsarAdmin globalPulsarAdmin = PulsarUtils.getPulsarAdmin(groupInfo,
-                commonOperateService.getSpecifiedParam(Constant.PULSAR_ADMINURL))) {
+        PulsarClusterInfo globalCluster = commonOperateService.getPulsarClusterInfo();
+        try (PulsarAdmin globalPulsarAdmin = PulsarUtils.getPulsarAdmin(globalCluster)) {
             List<String> pulsarClusters = PulsarUtils.getPulsarClusters(globalPulsarAdmin);
             for (String cluster : pulsarClusters) {
                 String serviceUrl = PulsarUtils.getServiceUrl(globalPulsarAdmin, cluster);
-                this.createPulsarProcess(groupInfo, serviceUrl);
+                PulsarClusterInfo pulsarClusterInfo = PulsarClusterInfo.builder()
+                        .token(globalCluster.getToken()).adminUrl(serviceUrl).build();
+                this.createPulsarProcess(groupInfo, pulsarClusterInfo);
             }
         } catch (Exception e) {
             log.error("create pulsar resource error for groupId={}", groupId, e);
@@ -96,9 +98,9 @@ public class CreatePulsarResourceTaskListener implements QueueOperateListener {
     /**
      * Create Pulsar tenant, namespace and topic
      */
-    private void createPulsarProcess(InlongGroupInfo groupInfo, String serviceHttpUrl) throws Exception {
+    private void createPulsarProcess(InlongGroupInfo groupInfo, PulsarClusterInfo pulsarClusterInfo) throws Exception {
         String groupId = groupInfo.getInlongGroupId();
-        log.info("begin to create pulsar resource for groupId={} in cluster={}", groupId, serviceHttpUrl);
+        log.info("begin to create pulsar resource for groupId={} in cluster={}", groupId, pulsarClusterInfo);
 
         String namespace = groupInfo.getMqResourceObj();
         Preconditions.checkNotNull(namespace, "pulsar namespace cannot be empty for groupId=" + groupId);
@@ -106,7 +108,7 @@ public class CreatePulsarResourceTaskListener implements QueueOperateListener {
         Preconditions.checkNotNull(queueModule, "queue module cannot be empty for groupId=" + groupId);
 
         String tenant = clusterBean.getDefaultTenant();
-        try (PulsarAdmin pulsarAdmin = PulsarUtils.getPulsarAdmin(groupInfo, serviceHttpUrl)) {
+        try (PulsarAdmin pulsarAdmin = PulsarUtils.getPulsarAdmin(pulsarClusterInfo)) {
             // create pulsar tenant
             pulsarOptService.createTenant(pulsarAdmin, tenant);
 
@@ -125,7 +127,8 @@ public class CreatePulsarResourceTaskListener implements QueueOperateListener {
                 pulsarOptService.createTopic(pulsarAdmin, topicBean);
             }
         }
-        log.info("finish to create pulsar resource for groupId={}, service http url={}", groupId, serviceHttpUrl);
+        log.info("finish to create pulsar resource for groupId={}, service http url={}", groupId,
+                pulsarClusterInfo.getAdminUrl());
     }
 
     @Override
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/mq/CreatePulsarTopicForStreamTaskListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/mq/CreatePulsarTopicForStreamTaskListener.java
index 5cc4783..3864ac2 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/mq/CreatePulsarTopicForStreamTaskListener.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/mq/CreatePulsarTopicForStreamTaskListener.java
@@ -17,10 +17,9 @@
 
 package org.apache.inlong.manager.service.thirdparty.mq;
 
-import java.util.List;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.inlong.common.pojo.dataproxy.PulsarClusterInfo;
 import org.apache.inlong.manager.common.beans.ClusterBean;
-import org.apache.inlong.manager.common.enums.Constant;
 import org.apache.inlong.manager.common.exceptions.WorkflowListenerException;
 import org.apache.inlong.manager.common.pojo.group.InlongGroupInfo;
 import org.apache.inlong.manager.common.pojo.pulsar.PulsarTopicBean;
@@ -38,6 +37,8 @@ import org.apache.pulsar.client.admin.PulsarAdmin;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Component;
 
+import java.util.List;
+
 /**
  * Create task listener for Pulsar Topic
  */
@@ -75,13 +76,15 @@ public class CreatePulsarTopicForStreamTaskListener implements QueueOperateListe
         }
 
         log.info("begin to create pulsar topic for groupId={}, streamId={}", groupId, streamId);
-        try (PulsarAdmin globalPulsarAdmin = PulsarUtils.getPulsarAdmin(groupInfo,
-                commonOperateService.getSpecifiedParam(Constant.PULSAR_ADMINURL))) {
+        PulsarClusterInfo globalCluster = commonOperateService.getPulsarClusterInfo();
+        try (PulsarAdmin globalPulsarAdmin = PulsarUtils.getPulsarAdmin(globalCluster)) {
             List<String> pulsarClusters = PulsarUtils.getPulsarClusters(globalPulsarAdmin);
             for (String cluster : pulsarClusters) {
                 String serviceUrl = PulsarUtils.getServiceUrl(globalPulsarAdmin, cluster);
+                PulsarClusterInfo pulsarClusterInfo = PulsarClusterInfo.builder()
+                        .token(globalCluster.getToken()).adminUrl(serviceUrl).build();
                 String pulsarTopic = streamEntity.getMqResourceObj();
-                this.createTopic(groupInfo, pulsarTopic, serviceUrl);
+                this.createTopic(groupInfo, pulsarTopic, pulsarClusterInfo);
             }
         } catch (Exception e) {
             log.error("create pulsar topic error for groupId={}, streamId={}", groupId, streamId, e);
@@ -93,14 +96,15 @@ public class CreatePulsarTopicForStreamTaskListener implements QueueOperateListe
         return ListenerResult.success();
     }
 
-    private void createTopic(InlongGroupInfo bizInfo, String pulsarTopic, String serviceHttpUrl) throws Exception {
+    private void createTopic(InlongGroupInfo bizInfo, String pulsarTopic, PulsarClusterInfo pulsarClusterInfo)
+            throws Exception {
         Integer partitionNum = bizInfo.getTopicPartitionNum();
         int partition = 0;
         if (partitionNum != null && partitionNum > 0) {
             partition = partitionNum;
         }
 
-        try (PulsarAdmin pulsarAdmin = PulsarUtils.getPulsarAdmin(bizInfo, serviceHttpUrl)) {
+        try (PulsarAdmin pulsarAdmin = PulsarUtils.getPulsarAdmin(pulsarClusterInfo)) {
             PulsarTopicBean topicBean = PulsarTopicBean.builder()
                     .tenant(clusterBean.getDefaultTenant())
                     .namespace(bizInfo.getMqResourceObj())
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/mq/util/PulsarUtils.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/mq/util/PulsarUtils.java
index 02ee046..12ab78a 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/mq/util/PulsarUtils.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/mq/util/PulsarUtils.java
@@ -17,18 +17,18 @@
 
 package org.apache.inlong.manager.service.thirdparty.mq.util;
 
-import java.util.List;
 import lombok.extern.slf4j.Slf4j;
-import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.lang3.StringUtils;
-import org.apache.inlong.manager.common.pojo.group.InlongGroupExtInfo;
-import org.apache.inlong.manager.common.pojo.group.InlongGroupInfo;
+import org.apache.inlong.common.pojo.dataproxy.PulsarClusterInfo;
 import org.apache.inlong.manager.common.settings.InlongGroupSettings;
+import org.apache.inlong.manager.common.util.Preconditions;
 import org.apache.pulsar.client.admin.PulsarAdmin;
 import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.client.api.AuthenticationFactory;
 import org.apache.pulsar.client.api.PulsarClientException;
 
+import java.util.List;
+
 /**
  * Pulsar connection utils
  */
@@ -41,36 +41,18 @@ public class PulsarUtils {
     /**
      * Get pulsar admin info
      */
-    public static PulsarAdmin getPulsarAdmin(InlongGroupInfo groupInfo, String defaultServiceUrl)
+    public static PulsarAdmin getPulsarAdmin(PulsarClusterInfo pulsarClusterInfo)
             throws PulsarClientException {
-        if (CollectionUtils.isEmpty(groupInfo.getExtList())) {
-            return getPulsarAdmin(defaultServiceUrl);
-        }
-        List<InlongGroupExtInfo> groupExtInfoList = groupInfo.getExtList();
-        String pulsarServiceUrl = null;
-        String pulsarAuthentication = null;
-        String pulsarAuthenticationType = InlongGroupSettings.DEFAULT_PULSAR_AUTHENTICATION_TYPE;
-        for (InlongGroupExtInfo extInfo : groupExtInfoList) {
-            if (InlongGroupSettings.PULSAR_ADMIN_URL.equals(extInfo.getKeyName())
-                    && StringUtils.isNotEmpty(extInfo.getKeyValue())) {
-                pulsarServiceUrl = extInfo.getKeyValue();
-            }
-            if (InlongGroupSettings.PULSAR_AUTHENTICATION_TYPE.equals(extInfo.getKeyName())
-                    && StringUtils.isNotEmpty(extInfo.getKeyValue())) {
-                pulsarAuthenticationType = extInfo.getKeyValue();
-            }
-            if (InlongGroupSettings.PULSAR_AUTHENTICATION.equals(extInfo.getKeyName())
-                    && StringUtils.isNotEmpty(extInfo.getKeyValue())) {
-                pulsarAuthentication = extInfo.getKeyValue();
-            }
-        }
-        if (StringUtils.isEmpty(pulsarServiceUrl) && StringUtils.isEmpty(pulsarAuthentication)) {
-            return getPulsarAdmin(defaultServiceUrl);
-        } else if (StringUtils.isEmpty(pulsarAuthentication)) {
-            return getPulsarAdmin(pulsarServiceUrl);
+        Preconditions.checkNotNull(pulsarClusterInfo.getAdminUrl(), "pulsar adminUrl is empty, "
+                + "check third party cluster table");
+        PulsarAdmin pulsarAdmin;
+        if (StringUtils.isEmpty(pulsarClusterInfo.getToken())) {
+            pulsarAdmin = getPulsarAdmin(pulsarClusterInfo.getAdminUrl());
         } else {
-            return getPulsarAdmin(pulsarServiceUrl, pulsarAuthentication, pulsarAuthenticationType);
+            pulsarAdmin = getPulsarAdmin(pulsarClusterInfo.getAdminUrl(), pulsarClusterInfo.getToken(),
+                    InlongGroupSettings.DEFAULT_PULSAR_AUTHENTICATION_TYPE);
         }
+        return pulsarAdmin;
     }
 
     /**
@@ -80,7 +62,7 @@ public class PulsarUtils {
         return PulsarAdmin.builder().serviceHttpUrl(serviceHttpUrl).build();
     }
 
-    public static PulsarAdmin getPulsarAdmin(String serviceHttpUrl, String authentication, String authenticationType)
+    private static PulsarAdmin getPulsarAdmin(String serviceHttpUrl, String authentication, String authenticationType)
             throws PulsarClientException {
         if (InlongGroupSettings.DEFAULT_PULSAR_AUTHENTICATION_TYPE.equals(authenticationType)) {
             return PulsarAdmin.builder().serviceHttpUrl(serviceHttpUrl)
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/sort/CreateSortConfigListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/sort/CreateSortConfigListener.java
index 2e4e227..075be69 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/sort/CreateSortConfigListener.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/sort/CreateSortConfigListener.java
@@ -18,15 +18,12 @@
 package org.apache.inlong.manager.service.thirdparty.sort;
 
 import com.google.common.collect.Lists;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.stream.Collectors;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.inlong.common.pojo.dataproxy.PulsarClusterInfo;
 import org.apache.inlong.manager.common.beans.ClusterBean;
 import org.apache.inlong.manager.common.enums.Constant;
 import org.apache.inlong.manager.common.pojo.group.InlongGroupExtInfo;
@@ -67,6 +64,11 @@ import org.apache.inlong.sort.protocol.source.TubeSourceInfo;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Component;
 
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
 @Slf4j
 @Component
 public class CreateSortConfigListener implements SortOperateListener {
@@ -199,10 +201,9 @@ public class CreateSortConfigListener implements SortOperateListener {
             DeserializationInfo deserializationInfo,
             List<FieldInfo> fieldInfos) {
         String topicName = streamInfo.getMqResourceObj();
-        String pulsarAdminUrl = commonOperateService.getSpecifiedParam(Constant.PULSAR_ADMINURL);
-        String pulsarServiceUrl = commonOperateService.getSpecifiedParam(Constant.PULSAR_SERVICEURL);
+        PulsarClusterInfo pulsarClusterInfo = commonOperateService.getPulsarClusterInfo();
         return SourceInfoUtils.createPulsarSourceInfo(groupInfo, topicName, deserializationInfo,
-                fieldInfos, clusterBean.getAppName(), clusterBean.getDefaultTenant(), pulsarAdminUrl, pulsarServiceUrl);
+                fieldInfos, clusterBean.getAppName(), clusterBean.getDefaultTenant(), pulsarClusterInfo);
     }
 
     private TubeSourceInfo createTubeSourceInfo(InlongGroupInfo groupInfo,
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/sort/PushSortConfigListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/sort/PushSortConfigListener.java
index a7606aa..5c7abe0 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/sort/PushSortConfigListener.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/sort/PushSortConfigListener.java
@@ -17,9 +17,8 @@
 
 package org.apache.inlong.manager.service.thirdparty.sort;
 
-import java.util.ArrayList;
-import java.util.List;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.inlong.common.pojo.dataproxy.PulsarClusterInfo;
 import org.apache.inlong.manager.common.beans.ClusterBean;
 import org.apache.inlong.manager.common.enums.Constant;
 import org.apache.inlong.manager.common.enums.EntityStatus;
@@ -56,6 +55,9 @@ import org.apache.inlong.sort.protocol.source.TubeSourceInfo;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Component;
 
+import java.util.ArrayList;
+import java.util.List;
+
 @Slf4j
 @Component
 public class PushSortConfigListener implements SortOperateListener {
@@ -149,7 +151,7 @@ public class PushSortConfigListener implements SortOperateListener {
      * Get source info
      */
     private SourceInfo getSourceInfo(InlongGroupInfo groupInfo,
-                                     SinkResponse hiveResponse, List<StreamSinkFieldEntity> fieldList) {
+            SinkResponse hiveResponse, List<StreamSinkFieldEntity> fieldList) {
         DeserializationInfo deserializationInfo = null;
         String groupId = groupInfo.getInlongGroupId();
         String streamId = hiveResponse.getInlongStreamId();
@@ -187,11 +189,10 @@ public class PushSortConfigListener implements SortOperateListener {
             sourceInfo = new TubeSourceInfo(topic, masterAddress, consumerGroup,
                     deserializationInfo, sourceFields.toArray(new FieldInfo[0]));
         } else if (Constant.MIDDLEWARE_PULSAR.equalsIgnoreCase(middleWare)) {
-            String pulsarAdminUrl = commonOperateService.getSpecifiedParam(Constant.PULSAR_ADMINURL);
-            String pulsarServiceUrl = commonOperateService.getSpecifiedParam(Constant.PULSAR_SERVICEURL);
+            PulsarClusterInfo pulsarClusterInfo = commonOperateService.getPulsarClusterInfo();
             sourceInfo = SourceInfoUtils.createPulsarSourceInfo(groupInfo, streamInfo.getMqResourceObj(),
                     deserializationInfo, sourceFields, clusterBean.getAppName(), clusterBean.getDefaultTenant(),
-                    pulsarAdminUrl, pulsarServiceUrl);
+                    pulsarClusterInfo);
         }
         return sourceInfo;
     }
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/sort/util/SourceInfoUtils.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/sort/util/SourceInfoUtils.java
index 2027f20..9077073 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/sort/util/SourceInfoUtils.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/sort/util/SourceInfoUtils.java
@@ -17,19 +17,17 @@
 
 package org.apache.inlong.manager.service.thirdparty.sort.util;
 
-import java.util.List;
-import org.apache.commons.collections.CollectionUtils;
-import org.apache.commons.lang3.StringUtils;
+import org.apache.inlong.common.pojo.dataproxy.PulsarClusterInfo;
 import org.apache.inlong.manager.common.enums.SourceType;
-import org.apache.inlong.manager.common.pojo.group.InlongGroupExtInfo;
 import org.apache.inlong.manager.common.pojo.group.InlongGroupInfo;
 import org.apache.inlong.manager.common.pojo.source.SourceResponse;
 import org.apache.inlong.manager.common.pojo.source.binlog.BinlogSourceResponse;
-import org.apache.inlong.manager.common.settings.InlongGroupSettings;
 import org.apache.inlong.sort.protocol.FieldInfo;
 import org.apache.inlong.sort.protocol.deserialization.DeserializationInfo;
 import org.apache.inlong.sort.protocol.source.PulsarSourceInfo;
 
+import java.util.List;
+
 public class SourceInfoUtils {
 
     public static boolean isBinlogMigrationSource(SourceResponse sourceResponse) {
@@ -41,39 +39,15 @@ public class SourceInfoUtils {
     }
 
     public static PulsarSourceInfo createPulsarSourceInfo(InlongGroupInfo groupInfo, String pulsarTopic,
-                                                          DeserializationInfo deserializationInfo,
-                                                          List<FieldInfo> fieldInfos, String appName,String tenant,
-                                                          String pulsarAdminUrl, String pulsarServiceUrl) {
+            DeserializationInfo deserializationInfo,
+            List<FieldInfo> fieldInfos, String appName, String tenant,
+            PulsarClusterInfo pulsarClusterInfo) {
         final String namespace = groupInfo.getMqResourceObj();
         // Full name of Topic in Pulsar
         final String fullTopicName = "persistent://" + tenant + "/" + namespace + "/" + pulsarTopic;
         final String consumerGroup = appName + "_" + pulsarTopic + "_consumer_group";
-        String adminUrl = null;
-        String serviceUrl = null;
-        String authentication = null;
-        if (CollectionUtils.isNotEmpty(groupInfo.getExtList())) {
-            for (InlongGroupExtInfo extInfo : groupInfo.getExtList()) {
-                if (InlongGroupSettings.PULSAR_SERVICE_URL.equals(extInfo.getKeyName())
-                        && StringUtils.isNotEmpty(extInfo.getKeyValue())) {
-                    serviceUrl = extInfo.getKeyValue();
-                }
-                if (InlongGroupSettings.PULSAR_AUTHENTICATION.equals(extInfo.getKeyName())
-                        && StringUtils.isNotEmpty(extInfo.getKeyValue())) {
-                    authentication = extInfo.getKeyValue();
-                }
-                if (InlongGroupSettings.PULSAR_ADMIN_URL.equals(extInfo.getKeyName())
-                        && StringUtils.isNotEmpty(extInfo.getKeyValue())) {
-                    adminUrl = extInfo.getKeyValue();
-                }
-            }
-        }
-        if (StringUtils.isEmpty(adminUrl)) {
-            adminUrl = pulsarAdminUrl;
-        }
-        if (StringUtils.isEmpty(serviceUrl)) {
-            serviceUrl = pulsarServiceUrl;
-        }
-        return new PulsarSourceInfo(adminUrl, serviceUrl, fullTopicName, consumerGroup,
-                deserializationInfo, fieldInfos.toArray(new FieldInfo[0]), authentication);
+        return new PulsarSourceInfo(pulsarClusterInfo.getAdminUrl(), pulsarClusterInfo.getBrokerServiceUrl(),
+                fullTopicName, consumerGroup, deserializationInfo, fieldInfos.toArray(new FieldInfo[0]),
+                pulsarClusterInfo.getToken());
     }
 }
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/consumption/listener/ConsumptionCompleteProcessListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/consumption/listener/ConsumptionCompleteProcessListener.java
index 19c6f26..1e0d757 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/consumption/listener/ConsumptionCompleteProcessListener.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/consumption/listener/ConsumptionCompleteProcessListener.java
@@ -17,11 +17,8 @@
 
 package org.apache.inlong.manager.service.workflow.consumption.listener;
 
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.Date;
-import java.util.List;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.inlong.common.pojo.dataproxy.PulsarClusterInfo;
 import org.apache.inlong.manager.common.beans.ClusterBean;
 import org.apache.inlong.manager.common.enums.Constant;
 import org.apache.inlong.manager.common.enums.ConsumptionStatus;
@@ -46,6 +43,11 @@ import org.apache.pulsar.client.admin.PulsarAdmin;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Component;
 
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Date;
+import java.util.List;
+
 /**
  * Added data consumption process complete archive event listener
  */
@@ -117,9 +119,8 @@ public class ConsumptionCompleteProcessListener implements ProcessEventListener
         Preconditions.checkNotNull(groupInfo, "inlong group not found for groupId=" + groupId);
         String mqResourceObj = groupInfo.getMqResourceObj();
         Preconditions.checkNotNull(mqResourceObj, "mq resource cannot empty for groupId=" + groupId);
-
-        try (PulsarAdmin pulsarAdmin = PulsarUtils
-                .getPulsarAdmin(groupInfo, commonOperateService.getSpecifiedParam(Constant.PULSAR_ADMINURL))) {
+        PulsarClusterInfo globalCluster = commonOperateService.getPulsarClusterInfo();
+        try (PulsarAdmin pulsarAdmin = PulsarUtils.getPulsarAdmin(globalCluster)) {
             PulsarTopicBean topicMessage = new PulsarTopicBean();
             String tenant = clusterBean.getDefaultTenant();
             topicMessage.setTenant(tenant);
@@ -129,7 +130,7 @@ public class ConsumptionCompleteProcessListener implements ProcessEventListener
             String consumerGroup = entity.getConsumerGroupId();
             List<String> clusters = PulsarUtils.getPulsarClusters(pulsarAdmin);
             List<String> topics = Arrays.asList(entity.getTopic().split(","));
-            this.createPulsarSubscription(pulsarAdmin, consumerGroup, topicMessage, clusters, topics, groupInfo);
+            this.createPulsarSubscription(pulsarAdmin, consumerGroup, topicMessage, clusters, topics, globalCluster);
         } catch (Exception e) {
             log.error("create pulsar topic failed", e);
             throw new WorkflowListenerException("failed to create pulsar topic for groupId=" + groupId + ", reason: "
@@ -138,11 +139,13 @@ public class ConsumptionCompleteProcessListener implements ProcessEventListener
     }
 
     private void createPulsarSubscription(PulsarAdmin globalPulsarAdmin, String subscription, PulsarTopicBean topicBean,
-            List<String> clusters, List<String> topics, InlongGroupInfo groupInfo) {
+            List<String> clusters, List<String> topics, PulsarClusterInfo globalCluster) {
         try {
             for (String cluster : clusters) {
                 String serviceUrl = PulsarUtils.getServiceUrl(globalPulsarAdmin, cluster);
-                try (PulsarAdmin pulsarAdmin = PulsarUtils.getPulsarAdmin(groupInfo, serviceUrl)) {
+                PulsarClusterInfo pulsarClusterInfo = PulsarClusterInfo.builder()
+                        .token(globalCluster.getToken()).adminUrl(serviceUrl).build();
+                try (PulsarAdmin pulsarAdmin = PulsarUtils.getPulsarAdmin(pulsarClusterInfo)) {
                     pulsarMqOptService.createSubscriptions(pulsarAdmin, subscription, topicBean, topics);
                 }
             }
diff --git a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/thirdparty/mq/util/PulsarUtilsTest.java b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/thirdparty/mq/util/PulsarUtilsTest.java
index a9c6ae0..e516e4c 100644
--- a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/thirdparty/mq/util/PulsarUtilsTest.java
+++ b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/thirdparty/mq/util/PulsarUtilsTest.java
@@ -18,8 +18,6 @@
 package org.apache.inlong.manager.service.thirdparty.mq.util;
 
 import com.google.common.collect.Lists;
-import java.lang.reflect.Field;
-import java.util.ArrayList;
 import org.apache.inlong.manager.common.pojo.group.InlongGroupExtInfo;
 import org.apache.inlong.manager.common.pojo.group.InlongGroupInfo;
 import org.apache.inlong.manager.common.settings.InlongGroupSettings;
@@ -32,6 +30,9 @@ import org.junit.Assert;
 import org.junit.Test;
 import org.springframework.util.ReflectionUtils;
 
+import java.lang.reflect.Field;
+import java.util.ArrayList;
+
 public class PulsarUtilsTest {
 
     @Test
@@ -51,7 +52,7 @@ public class PulsarUtilsTest {
         groupInfo.setExtList(groupExtInfoList);
         final String defaultServiceUrl = "http://127.0.0.1:10080";
         try {
-            PulsarAdmin admin = PulsarUtils.getPulsarAdmin(groupInfo, defaultServiceUrl);
+            PulsarAdmin admin = PulsarUtils.getPulsarAdmin(defaultServiceUrl);
             Assert.assertEquals("http://127.0.0.1:8080", admin.getServiceUrl());
             Field auth = ReflectionUtils.findField(PulsarAdminImpl.class, "auth");
             assert auth != null;
@@ -66,7 +67,7 @@ public class PulsarUtilsTest {
             groupExtInfo3.setKeyValue("token1");
             groupExtInfoList.add(groupExtInfo3);
             try {
-                admin = PulsarUtils.getPulsarAdmin(groupInfo, defaultServiceUrl);
+                admin = PulsarUtils.getPulsarAdmin(defaultServiceUrl);
             } catch (Exception e) {
                 if (e instanceof IllegalArgumentException) {
                     Assert.assertTrue(e.getMessage().contains("illegal authentication type"));
@@ -75,7 +76,7 @@ public class PulsarUtilsTest {
 
             groupExtInfoList = new ArrayList<>();
             groupInfo.setExtList(groupExtInfoList);
-            admin = PulsarUtils.getPulsarAdmin(groupInfo, defaultServiceUrl);
+            admin = PulsarUtils.getPulsarAdmin(defaultServiceUrl);
             Assert.assertEquals("http://127.0.0.1:10080", admin.getServiceUrl());
             auth = ReflectionUtils.findField(PulsarAdminImpl.class, "auth");
             assert auth != null;