You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2023/01/08 14:29:03 UTC

[inlong] branch master updated: [INLONG-5776][Manager] Add tenant param to InlongGroup that uses Pulsar (#7171)

This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 3f367031d [INLONG-5776][Manager] Add tenant param to InlongGroup that uses Pulsar (#7171)
3f367031d is described below

commit 3f367031d64a32924648d931de4ddea8b47bde1d
Author: healchow <he...@gmail.com>
AuthorDate: Sun Jan 8 22:28:56 2023 +0800

    [INLONG-5776][Manager] Add tenant param to InlongGroup that uses Pulsar (#7171)
---
 .../apache/inlong/manager/client/BaseExample.java  |   2 -
 .../apache/inlong/manager/client/ut/BaseTest.java  |   2 -
 .../client/api/inner/ClientFactoryTest.java        |   1 -
 .../resources/mappers/InlongGroupEntityMapper.xml  |   7 +-
 .../pojo/cluster/kafka/KafkaClusterDTO.java        |  15 ++-
 .../pojo/cluster/pulsar/PulsarClusterDTO.java      |  17 +++-
 .../pojo/cluster/pulsar/PulsarClusterInfo.java     |   4 +-
 .../pojo/cluster/pulsar/PulsarClusterRequest.java  |   3 +-
 .../manager/pojo/group/InlongGroupBriefInfo.java   |   5 +-
 .../manager/pojo/group/pulsar/InlongPulsarDTO.java |   5 +
 .../pojo/group/pulsar/InlongPulsarInfo.java        |   6 --
 .../pojo/group/pulsar/InlongPulsarRequest.java     |   7 ++
 .../service/cluster/InlongClusterServiceImpl.java  |  25 ++---
 .../service/cluster/KafkaClusterOperator.java      |   2 -
 .../service/cluster/PulsarClusterOperator.java     |   2 -
 .../service/consume/ConsumePulsarOperator.java     |  14 ++-
 .../service/core/impl/AgentServiceImpl.java        | 105 ++++++++++-----------
 .../service/group/InlongGroupOperator4Pulsar.java  |   9 +-
 .../apply/ApproveConsumeProcessListener.java       |  18 ++--
 .../queue/pulsar/PulsarResourceOperator.java       |  42 +++++----
 .../source/pulsar/PulsarSourceOperator.java        |  15 +--
 .../service/cluster/InlongClusterServiceTest.java  |   1 -
 .../service/consume/InlongConsumeServiceTest.java  |   2 -
 23 files changed, 170 insertions(+), 139 deletions(-)

diff --git a/inlong-manager/manager-client-examples/src/test/java/org/apache/inlong/manager/client/BaseExample.java b/inlong-manager/manager-client-examples/src/test/java/org/apache/inlong/manager/client/BaseExample.java
index 9e1ddfaf2..dfa4cc6ee 100644
--- a/inlong-manager/manager-client-examples/src/test/java/org/apache/inlong/manager/client/BaseExample.java
+++ b/inlong-manager/manager-client-examples/src/test/java/org/apache/inlong/manager/client/BaseExample.java
@@ -71,8 +71,6 @@ public class BaseExample {
         pulsarInfo.setInCharges("admin");
 
         // pulsar conf
-        pulsarInfo.setServiceUrl(pulsarServiceUrl);
-        pulsarInfo.setAdminUrl(pulsarAdminUrl);
         pulsarInfo.setTenant(tenant);
         pulsarInfo.setMqResource(namespace);
 
diff --git a/inlong-manager/manager-client-examples/src/test/java/org/apache/inlong/manager/client/ut/BaseTest.java b/inlong-manager/manager-client-examples/src/test/java/org/apache/inlong/manager/client/ut/BaseTest.java
index 1818f657d..0a46f939c 100644
--- a/inlong-manager/manager-client-examples/src/test/java/org/apache/inlong/manager/client/ut/BaseTest.java
+++ b/inlong-manager/manager-client-examples/src/test/java/org/apache/inlong/manager/client/ut/BaseTest.java
@@ -103,8 +103,6 @@ public class BaseTest {
         pulsarInfo.setInCharges(IN_CHARGES);
 
         // pulsar conf
-        pulsarInfo.setServiceUrl(PULSAR_SERVICE_URL);
-        pulsarInfo.setAdminUrl(PULSAR_ADMIN_URL);
         pulsarInfo.setTenant(TENANT);
         pulsarInfo.setMqResource(NAMESPACE);
 
diff --git a/inlong-manager/manager-client/src/test/java/org/apache/inlong/manager/client/api/inner/ClientFactoryTest.java b/inlong-manager/manager-client/src/test/java/org/apache/inlong/manager/client/api/inner/ClientFactoryTest.java
index 03eb5cfee..bb1984936 100644
--- a/inlong-manager/manager-client/src/test/java/org/apache/inlong/manager/client/api/inner/ClientFactoryTest.java
+++ b/inlong-manager/manager-client/src/test/java/org/apache/inlong/manager/client/api/inner/ClientFactoryTest.java
@@ -717,7 +717,6 @@ class ClientFactoryTest {
                 .clusterTags("test_cluster_tag")
                 .type(ClusterType.PULSAR)
                 .adminUrl("http://127.0.0.1:8080")
-                .tenant("public")
                 .build();
 
         stubFor(
diff --git a/inlong-manager/manager-dao/src/main/resources/mappers/InlongGroupEntityMapper.xml b/inlong-manager/manager-dao/src/main/resources/mappers/InlongGroupEntityMapper.xml
index c99ed7595..77cbd6246 100644
--- a/inlong-manager/manager-dao/src/main/resources/mappers/InlongGroupEntityMapper.xml
+++ b/inlong-manager/manager-dao/src/main/resources/mappers/InlongGroupEntityMapper.xml
@@ -166,7 +166,8 @@
     </select>
     <select id="selectBriefList" parameterType="org.apache.inlong.manager.pojo.group.InlongGroupPageRequest"
             resultType="org.apache.inlong.manager.pojo.group.InlongGroupBriefInfo">
-        select id, inlong_group_id, name, mq_type, mq_resource, inlong_cluster_tag, modify_time
+        select id, inlong_group_id, name, mq_type, mq_resource, dataReportType, inlong_cluster_tag, ext_params,
+               in_charges, status, creator, modifier, create_time, modify_time
         from inlong_group
         <where>
             is_deleted = 0
@@ -189,10 +190,8 @@
                 </foreach>
             </if>
         </where>
-        order by modify_time desc
-        limit 100
     </select>
-    <select id="selectByTopicRequest" resultType="org.apache.inlong.manager.pojo.group.InlongGroupTopicRequest">
+    <select id="selectByTopicRequest" resultType="org.apache.inlong.manager.dao.entity.InlongGroupEntity">
         select
         <include refid="Base_Column_List"/>
         from inlong_group
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/kafka/KafkaClusterDTO.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/kafka/KafkaClusterDTO.java
index 9f5e25ca4..d6110671d 100644
--- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/kafka/KafkaClusterDTO.java
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/kafka/KafkaClusterDTO.java
@@ -19,6 +19,7 @@ package org.apache.inlong.manager.pojo.cluster.kafka;
 
 import com.fasterxml.jackson.annotation.JsonProperty;
 import io.swagger.annotations.ApiModel;
+import io.swagger.annotations.ApiModelProperty;
 import lombok.AllArgsConstructor;
 import lombok.Builder;
 import lombok.Data;
@@ -39,17 +40,25 @@ import javax.validation.constraints.NotNull;
 @ApiModel("Kafka cluster info")
 public class KafkaClusterDTO {
 
-    @Builder.Default
-    private String messageQueueHandler = "org.apache.inlong.dataproxy.sink.mq.kafka.KafkaHandler";
-
+    /**
+     * Repeated save to ext_params field, it is convenient for DataProxy to obtain.
+     */
     @JsonProperty("bootstrap.servers")
+    @ApiModelProperty(value = "Kafka bootstrap servers' URL, is the 'url' field of the cluster")
     private String bootstrapServers;
 
+    /**
+     * Saved to ext_params field, it is convenient for DataProxy to obtain.
+     */
+    @Builder.Default
+    private String messageQueueHandler = "org.apache.inlong.dataproxy.sink.mq.kafka.KafkaHandler";
+
     /**
      * Get the dto instance from the request
      */
     public static KafkaClusterDTO getFromRequest(KafkaClusterRequest request) {
         return KafkaClusterDTO.builder()
+                .bootstrapServers(request.getUrl())
                 .build();
     }
 
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/pulsar/PulsarClusterDTO.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/pulsar/PulsarClusterDTO.java
index d80d3da2b..3fa402d15 100644
--- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/pulsar/PulsarClusterDTO.java
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/pulsar/PulsarClusterDTO.java
@@ -39,24 +39,31 @@ import javax.validation.constraints.NotNull;
 @ApiModel("Pulsar cluster info")
 public class PulsarClusterDTO {
 
-    @ApiModelProperty(value = "Pulsar admin URL, such as: http://127.0.0.1:8080", notes = "Pulsar service URL is the 'url' field of the cluster")
+    @ApiModelProperty(value = "Pulsar admin URL, such as: http://127.0.0.1:8080")
     private String adminUrl;
 
+    /**
+     * Repeated save to ext_params field, it is convenient for DataProxy to obtain.
+     */
+    @ApiModelProperty(value = "Pulsar service URL, is the 'url' field of the cluster")
+    private String serviceUrl;
+
     @ApiModelProperty(value = "Pulsar tenant, default is 'public'")
-    @Builder.Default
-    private String tenant = "public";
+    private String tenant;
 
+    /**
+     * Saved to ext_params field, it is convenient for DataProxy to obtain.
+     */
     @Builder.Default
     private String messageQueueHandler = "org.apache.inlong.dataproxy.sink.mq.pulsar.PulsarHandler";
 
-    private String serviceUrl;
-
     /**
      * Get the dto instance from the request
      */
     public static PulsarClusterDTO getFromRequest(PulsarClusterRequest request) {
         return PulsarClusterDTO.builder()
                 .adminUrl(request.getAdminUrl())
+                .serviceUrl(request.getUrl())
                 .tenant(request.getTenant())
                 .build();
     }
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/pulsar/PulsarClusterInfo.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/pulsar/PulsarClusterInfo.java
index 58049ab66..e6b4a3d7f 100644
--- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/pulsar/PulsarClusterInfo.java
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/pulsar/PulsarClusterInfo.java
@@ -19,7 +19,6 @@ package org.apache.inlong.manager.pojo.cluster.pulsar;
 
 import io.swagger.annotations.ApiModel;
 import io.swagger.annotations.ApiModelProperty;
-import lombok.Builder;
 import lombok.Data;
 import lombok.EqualsAndHashCode;
 import lombok.ToString;
@@ -44,8 +43,7 @@ public class PulsarClusterInfo extends ClusterInfo {
     private String adminUrl;
 
     @ApiModelProperty(value = "Pulsar tenant, default is 'public'")
-    @Builder.Default
-    private String tenant = "public";
+    private String tenant;
 
     public PulsarClusterInfo() {
         this.setType(ClusterType.PULSAR);
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/pulsar/PulsarClusterRequest.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/pulsar/PulsarClusterRequest.java
index c3d913620..170fe2036 100644
--- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/pulsar/PulsarClusterRequest.java
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/pulsar/PulsarClusterRequest.java
@@ -22,6 +22,7 @@ import io.swagger.annotations.ApiModelProperty;
 import lombok.Data;
 import lombok.EqualsAndHashCode;
 import lombok.ToString;
+import org.apache.inlong.manager.common.consts.InlongConstants;
 import org.apache.inlong.manager.common.enums.ClusterType;
 import org.apache.inlong.manager.common.util.JsonTypeDefine;
 import org.apache.inlong.manager.pojo.cluster.ClusterRequest;
@@ -40,7 +41,7 @@ public class PulsarClusterRequest extends ClusterRequest {
     private String adminUrl;
 
     @ApiModelProperty(value = "Pulsar tenant, default is 'public'")
-    private String tenant = "public";
+    private String tenant = InlongConstants.DEFAULT_PULSAR_TENANT;
 
     public PulsarClusterRequest() {
         this.setType(ClusterType.PULSAR);
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/InlongGroupBriefInfo.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/InlongGroupBriefInfo.java
index c7d196b25..29c7418f2 100644
--- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/InlongGroupBriefInfo.java
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/InlongGroupBriefInfo.java
@@ -54,9 +54,12 @@ public class InlongGroupBriefInfo {
     @ApiModelProperty(value = "MQ resource")
     private String mqResource;
 
-    @ApiModelProperty(value = "Inlong cluster tag")
+    @ApiModelProperty(value = "Inlong cluster tag, which links to inlong_cluster table")
     private String inlongClusterTag;
 
+    @ApiModelProperty(value = "Inlong cluster tag, which links to inlong_cluster table")
+    private String extParams;
+
     @ApiModelProperty(value = "Name of responsible person, separated by commas")
     private String inCharges;
 
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/pulsar/InlongPulsarDTO.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/pulsar/InlongPulsarDTO.java
index e42ae8837..7f0deddd1 100644
--- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/pulsar/InlongPulsarDTO.java
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/pulsar/InlongPulsarDTO.java
@@ -22,6 +22,7 @@ import io.swagger.annotations.ApiModelProperty;
 import lombok.AllArgsConstructor;
 import lombok.Builder;
 import lombok.Data;
+import lombok.EqualsAndHashCode;
 import lombok.NoArgsConstructor;
 import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
 import org.apache.inlong.manager.common.exceptions.BusinessException;
@@ -38,9 +39,13 @@ import javax.validation.constraints.NotNull;
 @Builder
 @NoArgsConstructor
 @AllArgsConstructor
+@EqualsAndHashCode(callSuper = true)
 @ApiModel("Inlong group info for Pulsar")
 public class InlongPulsarDTO extends BaseInlongGroup {
 
+    @ApiModelProperty(value = "Pulsar tenant")
+    private String tenant;
+
     @ApiModelProperty(value = "Queue model, parallel: multiple partitions, high throughput, out-of-order messages;"
             + "serial: single partition, low throughput, and orderly messages")
     private String queueModule;
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/pulsar/InlongPulsarInfo.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/pulsar/InlongPulsarInfo.java
index 36e07e5a6..451cccbfc 100644
--- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/pulsar/InlongPulsarInfo.java
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/pulsar/InlongPulsarInfo.java
@@ -42,12 +42,6 @@ public class InlongPulsarInfo extends InlongGroupInfo {
     @ApiModelProperty(value = "Pulsar tenant")
     private String tenant;
 
-    @ApiModelProperty(value = "Pulsar admin URL")
-    private String adminUrl;
-
-    @ApiModelProperty(value = "Pulsar service URL")
-    private String serviceUrl;
-
     @ApiModelProperty(value = "Queue model, parallel: multiple partitions, high throughput, out-of-order messages;"
             + "serial: single partition, low throughput, and orderly messages")
     private String queueModule = "PARALLEL";
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/pulsar/InlongPulsarRequest.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/pulsar/InlongPulsarRequest.java
index 986306665..7da1b625e 100644
--- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/pulsar/InlongPulsarRequest.java
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/pulsar/InlongPulsarRequest.java
@@ -36,6 +36,13 @@ import org.apache.inlong.manager.pojo.group.InlongGroupRequest;
 @JsonTypeDefine(value = MQType.PULSAR)
 public class InlongPulsarRequest extends InlongGroupRequest {
 
+    /**
+     * TODO Add default value InlongConstants.DEFAULT_PULSAR_TENANT when you remove the 'tenant'
+     *  from {@link org.apache.inlong.manager.pojo.cluster.pulsar.PulsarClusterRequest}
+     */
+    @ApiModelProperty(value = "Pulsar tenant")
+    private String tenant;
+
     @ApiModelProperty(value = "Queue model, parallel: multiple partitions, high throughput, out-of-order messages;"
             + "serial: single partition, low throughput, and orderly messages")
     private String queueModule = "PARALLEL";
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/InlongClusterServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/InlongClusterServiceImpl.java
index 598dd10a0..6ddd96575 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/InlongClusterServiceImpl.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/InlongClusterServiceImpl.java
@@ -66,6 +66,7 @@ import org.apache.inlong.manager.pojo.common.PageResult;
 import org.apache.inlong.manager.pojo.common.UpdateResult;
 import org.apache.inlong.manager.pojo.group.InlongGroupBriefInfo;
 import org.apache.inlong.manager.pojo.group.InlongGroupPageRequest;
+import org.apache.inlong.manager.pojo.group.pulsar.InlongPulsarDTO;
 import org.apache.inlong.manager.pojo.stream.InlongStreamBriefInfo;
 import org.apache.inlong.manager.pojo.user.UserInfo;
 import org.apache.inlong.manager.service.repository.DataProxyConfigRepository;
@@ -1482,7 +1483,7 @@ public class InlongClusterServiceImpl implements InlongClusterService {
             return result;
         }
 
-        LOGGER.debug("GetDPConfig: begin to get config for cluster tags={}, associated group num={}",
+        LOGGER.debug("GetDPConfig: begin to get config for cluster tags={}, associated InlongGroup num={}",
                 clusterTagList, groupList.size());
         List<DataProxyTopicInfo> topicList = new ArrayList<>();
         for (InlongGroupBriefInfo groupInfo : groupList) {
@@ -1492,27 +1493,29 @@ public class InlongClusterServiceImpl implements InlongClusterService {
 
             String mqType = groupInfo.getMqType();
             if (MQType.PULSAR.equals(mqType) || MQType.TDMQ_PULSAR.equals(mqType)) {
-                List<InlongStreamBriefInfo> streamList = streamMapper.selectBriefList(groupId);
-                for (InlongStreamBriefInfo streamInfo : streamList) {
+                InlongPulsarDTO pulsarDTO = InlongPulsarDTO.getFromJson(groupInfo.getExtParams());
+                // First get the tenant from the InlongGroup, and then get it from the PulsarCluster.
+                String tenant = pulsarDTO.getTenant();
+                if (StringUtils.isBlank(tenant)) {
+                    // If there are multiple Pulsar clusters, take the first one.
+                    // Note that the tenants in multiple Pulsar clusters must be identical.
                     List<InlongClusterEntity> pulsarClusters = clusterMapper.selectByKey(realClusterTag, null,
                             ClusterType.PULSAR);
                     if (CollectionUtils.isEmpty(pulsarClusters)) {
                         LOGGER.error("GetDPConfig: not found pulsar cluster by cluster tag={}", realClusterTag);
                         continue;
                     }
+                    PulsarClusterDTO cluster = PulsarClusterDTO.getFromJson(pulsarClusters.get(0).getExtParams());
+                    tenant = cluster.getTenant();
+                }
 
-                    // if there are multiple Pulsar clusters, take the first one
-                    InlongClusterEntity cluster = pulsarClusters.get(0);
-                    PulsarClusterDTO pulsarCluster = PulsarClusterDTO.getFromJson(cluster.getExtParams());
-                    String tenant = pulsarCluster.getTenant();
-                    if (StringUtils.isBlank(tenant)) {
-                        tenant = InlongConstants.DEFAULT_PULSAR_TENANT;
-                    }
-
+                List<InlongStreamBriefInfo> streamList = streamMapper.selectBriefList(groupId);
+                for (InlongStreamBriefInfo streamInfo : streamList) {
                     String streamId = streamInfo.getInlongStreamId();
                     String topic = String.format(InlongConstants.PULSAR_TOPIC_FORMAT,
                             tenant, mqResource, streamInfo.getMqResource());
                     DataProxyTopicInfo topicConfig = new DataProxyTopicInfo();
+                    // must format to groupId/streamId, needed by DataProxy
                     topicConfig.setInlongGroupId(groupId + "/" + streamId);
                     topicConfig.setTopic(topic);
                     topicList.add(topicConfig);
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/KafkaClusterOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/KafkaClusterOperator.java
index dceac1864..b3eb9346c 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/KafkaClusterOperator.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/KafkaClusterOperator.java
@@ -75,7 +75,6 @@ public class KafkaClusterOperator extends AbstractClusterOperator {
             CommonBeanUtils.copyProperties(dto, kafkaClusterInfo);
         }
 
-        LOGGER.info("success to get kafka cluster info from entity");
         return kafkaClusterInfo;
     }
 
@@ -85,7 +84,6 @@ public class KafkaClusterOperator extends AbstractClusterOperator {
         CommonBeanUtils.copyProperties(kafkaRequest, targetEntity, true);
         try {
             KafkaClusterDTO dto = KafkaClusterDTO.getFromRequest(kafkaRequest);
-            dto.setBootstrapServers(kafkaRequest.getUrl());
             targetEntity.setExtParams(objectMapper.writeValueAsString(dto));
             LOGGER.info("success to set entity for kafka cluster");
         } catch (Exception e) {
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/PulsarClusterOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/PulsarClusterOperator.java
index a4c98bb11..2a724dc2b 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/PulsarClusterOperator.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/PulsarClusterOperator.java
@@ -70,7 +70,6 @@ public class PulsarClusterOperator extends AbstractClusterOperator {
             CommonBeanUtils.copyProperties(dto, pulsarInfo);
         }
 
-        LOGGER.info("success to get pulsar cluster info from entity");
         return pulsarInfo;
     }
 
@@ -80,7 +79,6 @@ public class PulsarClusterOperator extends AbstractClusterOperator {
         CommonBeanUtils.copyProperties(pulsarRequest, targetEntity, true);
         try {
             PulsarClusterDTO dto = PulsarClusterDTO.getFromRequest(pulsarRequest);
-            dto.setServiceUrl(request.getUrl());
             targetEntity.setExtParams(objectMapper.writeValueAsString(dto));
             LOGGER.info("success to set entity for pulsar cluster");
         } catch (Exception e) {
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/consume/ConsumePulsarOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/consume/ConsumePulsarOperator.java
index 6edc3a243..cf1f20d61 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/consume/ConsumePulsarOperator.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/consume/ConsumePulsarOperator.java
@@ -37,6 +37,7 @@ import org.apache.inlong.manager.pojo.consume.pulsar.ConsumePulsarInfo;
 import org.apache.inlong.manager.pojo.consume.pulsar.ConsumePulsarRequest;
 import org.apache.inlong.manager.pojo.group.InlongGroupInfo;
 import org.apache.inlong.manager.pojo.group.InlongGroupTopicInfo;
+import org.apache.inlong.manager.pojo.group.pulsar.InlongPulsarInfo;
 import org.apache.inlong.manager.pojo.group.pulsar.InlongPulsarTopicInfo;
 import org.apache.inlong.manager.service.cluster.InlongClusterService;
 import org.apache.inlong.manager.service.group.InlongGroupService;
@@ -115,8 +116,17 @@ public class ConsumePulsarOperator extends AbstractConsumeOperator {
         List<ClusterInfo> clusterInfos = clusterService.listByTagAndType(clusterTag, ClusterType.PULSAR);
         Preconditions.checkNotEmpty(clusterInfos, "pulsar cluster not exist for groupId=" + groupId);
         consumeInfo.setClusterInfos(clusterInfos);
-        PulsarClusterInfo pulsarCluster = (PulsarClusterInfo) clusterInfos.get(0);
-        consumeInfo.setTopic(getFullPulsarTopic(groupInfo, pulsarCluster.getTenant(), entity.getTopic()));
+
+        // First get the tenant from the InlongGroup, and then get it from the PulsarCluster.
+        String tenant = ((InlongPulsarInfo) groupInfo).getTenant();
+        if (StringUtils.isBlank(tenant)) {
+            // If there are multiple Pulsar clusters, take the first one.
+            // Note that the tenants in multiple Pulsar clusters must be identical.
+            PulsarClusterInfo pulsarCluster = (PulsarClusterInfo) clusterInfos.get(0);
+            tenant = pulsarCluster.getTenant();
+        }
+
+        consumeInfo.setTopic(getFullPulsarTopic(groupInfo, tenant, entity.getTopic()));
         return consumeInfo;
     }
 
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentServiceImpl.java
index 0efd42c32..13918600c 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentServiceImpl.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentServiceImpl.java
@@ -24,6 +24,7 @@ import org.apache.commons.lang3.RandomStringUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.inlong.common.constant.Constants;
+import org.apache.inlong.common.constant.MQType;
 import org.apache.inlong.common.db.CommandEntity;
 import org.apache.inlong.common.enums.PullJobTypeEnum;
 import org.apache.inlong.common.enums.TaskTypeEnum;
@@ -36,7 +37,6 @@ import org.apache.inlong.common.pojo.dataproxy.DataProxyTopicInfo;
 import org.apache.inlong.common.pojo.dataproxy.MQClusterInfo;
 import org.apache.inlong.manager.common.consts.AgentConstants;
 import org.apache.inlong.manager.common.consts.InlongConstants;
-import org.apache.inlong.common.constant.MQType;
 import org.apache.inlong.manager.common.consts.SourceType;
 import org.apache.inlong.manager.common.enums.ClusterType;
 import org.apache.inlong.manager.common.enums.SourceStatus;
@@ -59,6 +59,7 @@ import org.apache.inlong.manager.dao.mapper.StreamSourceEntityMapper;
 import org.apache.inlong.manager.pojo.cluster.agent.AgentClusterNodeBindGroupRequest;
 import org.apache.inlong.manager.pojo.cluster.ClusterPageRequest;
 import org.apache.inlong.manager.pojo.cluster.pulsar.PulsarClusterDTO;
+import org.apache.inlong.manager.pojo.group.pulsar.InlongPulsarDTO;
 import org.apache.inlong.manager.pojo.source.file.FileSourceDTO;
 import org.apache.inlong.manager.service.core.AgentService;
 import org.apache.inlong.manager.service.source.SourceSnapshotOperator;
@@ -259,9 +260,6 @@ public class AgentServiceImpl implements AgentService {
 
     /**
      * Query the tasks that source is waited to be operated.(only clusterName and ip matched it can be operated)
-     *
-     * @param request
-     * @return
      */
     private List<DataConfig> processQueuedTasks(TaskRequest request) {
         HashSet<SourceStatus> needAddStatusSet = Sets.newHashSet(SourceStatus.TOBE_ISSUED_SET);
@@ -307,7 +305,7 @@ public class AgentServiceImpl implements AgentService {
         List<StreamSourceEntity> sourceEntities = sourceMapper.selectByStatusAndType(needAddStatusList, sourceTypes,
                 TASK_FETCH_SIZE);
         for (StreamSourceEntity sourceEntity : sourceEntities) {
-            // refresh agentip and uuid to make it can be processed in queued task
+            // refresh agent ip and uuid to make it can be processed in queued task
             sourceEntity.setAgentIp(taskRequest.getAgentIp());
             sourceEntity.setUuid(taskRequest.getUuid());
             sourceMapper.updateByPrimaryKeySelective(sourceEntity);
@@ -322,8 +320,6 @@ public class AgentServiceImpl implements AgentService {
     /**
      * Add subtasks to template tasks.
      * (Template task are agent_ip is null and template_id is null)
-     *
-     * @param taskRequest
      */
     private void preProcessTemplateFileTask(TaskRequest taskRequest) {
         List<Integer> needCopiedStatusList = Arrays.asList(SourceStatus.TO_BE_ISSUED_ADD.getCode(),
@@ -368,8 +364,6 @@ public class AgentServiceImpl implements AgentService {
      *  1.agent ip match
      *  2.cluster name match
      *  Send the corresponding task action request according to the matching state of the tag and the current state
-     *
-     * @param taskRequest
      */
     private void preProcessLabelFileTasks(TaskRequest taskRequest) {
         List<Integer> needProcessedStatusList = Arrays.asList(
@@ -388,42 +382,41 @@ public class AgentServiceImpl implements AgentService {
         List<StreamSourceEntity> sourceEntities = sourceMapper.selectByAgentIpAndCluster(needProcessedStatusList,
                 Lists.newArrayList(SourceType.FILE), agentIp, agentClusterName);
 
-        sourceEntities.stream()
-                .forEach(sourceEntity -> {
-                    // case: agent tag unbind and mismatch source task
-                    Set<SourceStatus> exceptedUnmatchedStatus = Sets.newHashSet(
-                            SourceStatus.SOURCE_FROZEN,
-                            SourceStatus.TO_BE_ISSUED_FROZEN);
-                    if (!matchLabel(sourceEntity, clusterNodeEntity)
-                            && !exceptedUnmatchedStatus.contains(SourceStatus.forCode(sourceEntity.getStatus()))) {
-                        LOGGER.info("Transform task({}) from {} to {} because tag mismatch "
-                                + "for agent({}) in cluster({})", sourceEntity.getAgentIp(),
-                                sourceEntity.getStatus(), SourceStatus.TO_BE_ISSUED_FROZEN.getCode(),
-                                agentIp, agentClusterName);
-                        sourceMapper.updateStatus(
-                                sourceEntity.getId(), SourceStatus.TO_BE_ISSUED_FROZEN.getCode(), false);
-                    }
-
-                    // case: agent tag rebind and match source task again and stream is not in 'SUSPENDED' status
-                    InlongStreamEntity streamEntity = streamMapper.selectByIdentifier(
-                            sourceEntity.getInlongGroupId(), sourceEntity.getInlongStreamId());
-                    Set<SourceStatus> exceptedMatchedSourceStatus = Sets.newHashSet(
-                            SourceStatus.SOURCE_NORMAL,
-                            SourceStatus.TO_BE_ISSUED_ADD,
-                            SourceStatus.TO_BE_ISSUED_ACTIVE);
-                    Set<StreamStatus> exceptedMatchedStreamStatus = Sets.newHashSet(
-                            StreamStatus.SUSPENDED, StreamStatus.SUSPENDED);
-                    if (matchLabel(sourceEntity, clusterNodeEntity)
-                            && !exceptedMatchedSourceStatus.contains(SourceStatus.forCode(sourceEntity.getStatus()))
-                            && !exceptedMatchedStreamStatus.contains(StreamStatus.forCode(streamEntity.getStatus()))) {
-                        LOGGER.info("Transform task({}) from {} to {} because tag rematch "
-                                + "for agent({}) in cluster({})", sourceEntity.getAgentIp(),
-                                sourceEntity.getStatus(), SourceStatus.TO_BE_ISSUED_ACTIVE.getCode(),
-                                agentIp, agentClusterName);
-                        sourceMapper.updateStatus(
-                                sourceEntity.getId(), SourceStatus.TO_BE_ISSUED_ACTIVE.getCode(), false);
-                    }
-                });
+        sourceEntities.forEach(sourceEntity -> {
+            // case: agent tag unbind and mismatch source task
+            Set<SourceStatus> exceptedUnmatchedStatus = Sets.newHashSet(
+                    SourceStatus.SOURCE_FROZEN,
+                    SourceStatus.TO_BE_ISSUED_FROZEN);
+            if (!matchLabel(sourceEntity, clusterNodeEntity)
+                    && !exceptedUnmatchedStatus.contains(SourceStatus.forCode(sourceEntity.getStatus()))) {
+                LOGGER.info("Transform task({}) from {} to {} because tag mismatch "
+                        + "for agent({}) in cluster({})", sourceEntity.getAgentIp(),
+                        sourceEntity.getStatus(), SourceStatus.TO_BE_ISSUED_FROZEN.getCode(),
+                        agentIp, agentClusterName);
+                sourceMapper.updateStatus(
+                        sourceEntity.getId(), SourceStatus.TO_BE_ISSUED_FROZEN.getCode(), false);
+            }
+
+            // case: agent tag rebind and match source task again and stream is not in 'SUSPENDED' status
+            InlongStreamEntity streamEntity = streamMapper.selectByIdentifier(
+                    sourceEntity.getInlongGroupId(), sourceEntity.getInlongStreamId());
+            Set<SourceStatus> exceptedMatchedSourceStatus = Sets.newHashSet(
+                    SourceStatus.SOURCE_NORMAL,
+                    SourceStatus.TO_BE_ISSUED_ADD,
+                    SourceStatus.TO_BE_ISSUED_ACTIVE);
+            Set<StreamStatus> exceptedMatchedStreamStatus = Sets.newHashSet(
+                    StreamStatus.SUSPENDED, StreamStatus.SUSPENDED);
+            if (matchLabel(sourceEntity, clusterNodeEntity)
+                    && !exceptedMatchedSourceStatus.contains(SourceStatus.forCode(sourceEntity.getStatus()))
+                    && !exceptedMatchedStreamStatus.contains(StreamStatus.forCode(streamEntity.getStatus()))) {
+                LOGGER.info("Transform task({}) from {} to {} because tag rematch "
+                        + "for agent({}) in cluster({})", sourceEntity.getAgentIp(),
+                        sourceEntity.getStatus(), SourceStatus.TO_BE_ISSUED_ACTIVE.getCode(),
+                        agentIp, agentClusterName);
+                sourceMapper.updateStatus(
+                        sourceEntity.getId(), SourceStatus.TO_BE_ISSUED_ACTIVE.getCode(), false);
+            }
+        });
     }
 
     private InlongClusterNodeEntity selectByIpAndCluster(String clusterName, String ip) {
@@ -436,9 +429,7 @@ public class AgentServiceImpl implements AgentService {
         nodeRequest.setKeyword(ip);
         nodeRequest.setParentId(clusterEntity.getId());
         nodeRequest.setType(ClusterType.AGENT);
-        InlongClusterNodeEntity clusterNodeEntity =
-                clusterNodeMapper.selectByCondition(nodeRequest).stream().findFirst().orElse(null);
-        return clusterNodeEntity;
+        return clusterNodeMapper.selectByCondition(nodeRequest).stream().findFirst().orElse(null);
     }
 
     private int getOp(int status) {
@@ -496,7 +487,7 @@ public class AgentServiceImpl implements AgentService {
         if (InlongConstants.REPORT_TO_MQ_RECEIVED == dataReportType) {
             // add mq cluster setting
             List<MQClusterInfo> mqSet = new ArrayList<>();
-            List<String> clusterTagList = Arrays.asList(groupEntity.getInlongClusterTag());
+            List<String> clusterTagList = Collections.singletonList(groupEntity.getInlongClusterTag());
             List<String> typeList = Arrays.asList(ClusterType.TUBEMQ, ClusterType.PULSAR);
             ClusterPageRequest pageRequest = ClusterPageRequest.builder()
                     .typeList(typeList)
@@ -512,16 +503,21 @@ public class AgentServiceImpl implements AgentService {
                 mqSet.add(clusterInfo);
             }
             dataConfig.setMqClusters(mqSet);
+
             // add topic setting
-            InlongClusterEntity cluster = mqClusterList.get(0);
             String mqResource = groupEntity.getMqResource();
             String mqType = groupEntity.getMqType();
             if (MQType.PULSAR.equals(mqType) || MQType.TDMQ_PULSAR.equals(mqType)) {
-                PulsarClusterDTO pulsarCluster = PulsarClusterDTO.getFromJson(cluster.getExtParams());
-                String tenant = pulsarCluster.getTenant();
+                // first get the tenant from the InlongGroup, and then get it from the PulsarCluster.
+                InlongPulsarDTO pulsarDTO = InlongPulsarDTO.getFromJson(groupEntity.getExtParams());
+                String tenant = pulsarDTO.getTenant();
                 if (StringUtils.isBlank(tenant)) {
-                    tenant = InlongConstants.DEFAULT_PULSAR_TENANT;
+                    // If there are multiple Pulsar clusters, take the first one.
+                    // Note that the tenants in multiple Pulsar clusters must be identical.
+                    PulsarClusterDTO pulsarCluster = PulsarClusterDTO.getFromJson(mqClusterList.get(0).getExtParams());
+                    tenant = pulsarCluster.getTenant();
                 }
+
                 String topic = String.format(InlongConstants.PULSAR_TOPIC_FORMAT,
                         tenant, mqResource, streamEntity.getMqResource());
                 DataProxyTopicInfo topicConfig = new DataProxyTopicInfo();
@@ -594,6 +590,7 @@ public class AgentServiceImpl implements AgentService {
                 : Sets.newHashSet(extParams.get(AgentConstants.AGENT_GROUP_KEY).split(InlongConstants.COMMA));
         Set<String> sourceLabels = Stream.of(
                 sourceEntity.getInlongClusterNodeGroup().split(InlongConstants.COMMA)).collect(Collectors.toSet());
-        return sourceLabels.stream().anyMatch(sourceLabel -> clusterNodeLabels.contains(sourceLabel));
+        return sourceLabels.stream().anyMatch(clusterNodeLabels::contains);
     }
+
 }
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupOperator4Pulsar.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupOperator4Pulsar.java
index 810241f1a..f1cc29013 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupOperator4Pulsar.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupOperator4Pulsar.java
@@ -18,7 +18,6 @@
 package org.apache.inlong.manager.service.group;
 
 import org.apache.commons.lang3.StringUtils;
-import org.apache.inlong.manager.common.consts.InlongConstants;
 import org.apache.inlong.common.constant.MQType;
 import org.apache.inlong.manager.common.enums.ClusterType;
 import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
@@ -122,10 +121,12 @@ public class InlongGroupOperator4Pulsar extends AbstractGroupOperator {
     public InlongGroupTopicInfo getTopic(InlongGroupInfo groupInfo) {
         PulsarClusterInfo pulsarCluster = (PulsarClusterInfo) clusterService.getOne(
                 groupInfo.getInlongClusterTag(), null, ClusterType.PULSAR);
-        String tenant = StringUtils.isEmpty(pulsarCluster.getTenant())
-                ? InlongConstants.DEFAULT_PULSAR_TENANT
-                : pulsarCluster.getTenant();
 
+        // First get the tenant from the InlongGroup, and then get it from the PulsarCluster.
+        String tenant = ((InlongPulsarInfo) groupInfo).getTenant();
+        if (StringUtils.isBlank(tenant)) {
+            tenant = pulsarCluster.getTenant();
+        }
         InlongPulsarTopicInfo topicInfo = new InlongPulsarTopicInfo();
         topicInfo.setTenant(tenant);
         topicInfo.setNamespace(groupInfo.getMqResource());
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/consume/apply/ApproveConsumeProcessListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/consume/apply/ApproveConsumeProcessListener.java
index ed702cff5..4652c39d6 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/consume/apply/ApproveConsumeProcessListener.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/consume/apply/ApproveConsumeProcessListener.java
@@ -19,8 +19,8 @@ package org.apache.inlong.manager.service.listener.consume.apply;
 
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang3.StringUtils;
-import org.apache.inlong.manager.common.consts.InlongConstants;
 import org.apache.inlong.common.constant.MQType;
+import org.apache.inlong.manager.common.consts.InlongConstants;
 import org.apache.inlong.manager.common.enums.ClusterType;
 import org.apache.inlong.manager.common.enums.ConsumeStatus;
 import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
@@ -36,6 +36,7 @@ import org.apache.inlong.manager.pojo.cluster.ClusterInfo;
 import org.apache.inlong.manager.pojo.cluster.pulsar.PulsarClusterInfo;
 import org.apache.inlong.manager.pojo.cluster.tubemq.TubeClusterInfo;
 import org.apache.inlong.manager.pojo.queue.pulsar.PulsarTopicInfo;
+import org.apache.inlong.manager.pojo.group.pulsar.InlongPulsarDTO;
 import org.apache.inlong.manager.pojo.workflow.form.process.ApplyConsumeProcessForm;
 import org.apache.inlong.manager.service.cluster.InlongClusterService;
 import org.apache.inlong.manager.service.resource.queue.pulsar.PulsarOperator;
@@ -87,11 +88,10 @@ public class ApproveConsumeProcessListener implements ProcessEventListener {
         String mqType = entity.getMqType();
         if (MQType.TUBEMQ.equals(mqType)) {
             this.createTubeConsumerGroup(entity, context.getOperator());
-            return ListenerResult.success("Create TubeMQ consumer group successful");
         } else if (MQType.PULSAR.equals(mqType) || MQType.TDMQ_PULSAR.equals(mqType)) {
             this.createPulsarSubscription(entity);
         } else if (MQType.KAFKA.equals(mqType)) {
-            // TODO add Kafka
+            // Kafka consumers do not need to register in advance
         } else {
             throw new WorkflowListenerException("Unsupported MQ type " + mqType);
         }
@@ -130,17 +130,17 @@ public class ApproveConsumeProcessListener implements ProcessEventListener {
         ClusterInfo clusterInfo = clusterService.getOne(clusterTag, null, ClusterType.PULSAR);
         PulsarClusterInfo pulsarCluster = (PulsarClusterInfo) clusterInfo;
         try (PulsarAdmin pulsarAdmin = PulsarUtils.getPulsarAdmin(pulsarCluster)) {
-            PulsarTopicInfo topicMessage = new PulsarTopicInfo();
-            String tenant = pulsarCluster.getTenant();
-            if (StringUtils.isEmpty(tenant)) {
-                tenant = InlongConstants.DEFAULT_PULSAR_TENANT;
+            InlongPulsarDTO pulsarDTO = InlongPulsarDTO.getFromJson(groupEntity.getExtParams());
+            String tenant = pulsarDTO.getTenant();
+            if (StringUtils.isBlank(tenant)) {
+                tenant = pulsarCluster.getTenant();
             }
+            PulsarTopicInfo topicMessage = new PulsarTopicInfo();
             topicMessage.setTenant(tenant);
             topicMessage.setNamespace(mqResource);
 
-            String consumerGroup = entity.getConsumerGroup();
             List<String> topics = Arrays.asList(entity.getTopic().split(InlongConstants.COMMA));
-            this.createPulsarSubscription(pulsarAdmin, consumerGroup, topicMessage, topics);
+            this.createPulsarSubscription(pulsarAdmin, entity.getConsumerGroup(), topicMessage, topics);
         } catch (Exception e) {
             log.error("create pulsar topic failed", e);
             throw new WorkflowListenerException("failed to create pulsar topic for groupId=" + groupId + ", reason: "
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/pulsar/PulsarResourceOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/pulsar/PulsarResourceOperator.java
index 7f3b69a8a..4b81123d4 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/pulsar/PulsarResourceOperator.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/pulsar/PulsarResourceOperator.java
@@ -21,10 +21,11 @@ import com.google.common.base.Objects;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.lang3.StringUtils;
-import org.apache.inlong.manager.common.consts.InlongConstants;
 import org.apache.inlong.common.constant.MQType;
+import org.apache.inlong.manager.common.consts.InlongConstants;
 import org.apache.inlong.manager.common.enums.ClusterType;
 import org.apache.inlong.manager.common.enums.GroupStatus;
+import org.apache.inlong.manager.common.exceptions.BusinessException;
 import org.apache.inlong.manager.common.exceptions.WorkflowListenerException;
 import org.apache.inlong.manager.common.util.Preconditions;
 import org.apache.inlong.manager.pojo.cluster.ClusterInfo;
@@ -83,22 +84,27 @@ public class PulsarResourceOperator implements QueueResourceOperator {
         String clusterTag = groupInfo.getInlongClusterTag();
         log.info("begin to create pulsar resource for groupId={}, clusterTag={}", groupId, clusterTag);
 
+        if (!(groupInfo instanceof InlongPulsarInfo)) {
+            throw new BusinessException("the mqType must be PULSAR for inlongGroupId=" + groupId);
+        }
+
+        InlongPulsarInfo pulsarInfo = (InlongPulsarInfo) groupInfo;
+        String tenant = pulsarInfo.getTenant();
         // get pulsar cluster via the inlong cluster tag from the inlong group
         List<ClusterInfo> clusterInfos = clusterService.listByTagAndType(clusterTag, ClusterType.PULSAR);
         for (ClusterInfo clusterInfo : clusterInfos) {
             PulsarClusterInfo pulsarCluster = (PulsarClusterInfo) clusterInfo;
             try (PulsarAdmin pulsarAdmin = PulsarUtils.getPulsarAdmin(pulsarCluster)) {
                 // create pulsar tenant and namespace
-                String tenant = pulsarCluster.getTenant();
-                if (StringUtils.isEmpty(tenant)) {
-                    tenant = InlongConstants.DEFAULT_PULSAR_TENANT;
+                if (StringUtils.isBlank(tenant)) {
+                    tenant = pulsarCluster.getTenant();
                 }
 
                 // if the group was not successful, need create tenant and namespace
                 if (!Objects.equal(GroupStatus.CONFIG_SUCCESSFUL.getCode(), groupInfo.getStatus())) {
                     pulsarOperator.createTenant(pulsarAdmin, tenant);
                     String namespace = groupInfo.getMqResource();
-                    pulsarOperator.createNamespace(pulsarAdmin, (InlongPulsarInfo) groupInfo, tenant, namespace);
+                    pulsarOperator.createNamespace(pulsarAdmin, pulsarInfo, tenant, namespace);
 
                     log.info("success to create pulsar resource for groupId={}, tenant={}, namespace={}, cluster={}",
                             groupId, tenant, namespace, pulsarCluster);
@@ -131,7 +137,7 @@ public class PulsarResourceOperator implements QueueResourceOperator {
             PulsarClusterInfo pulsarCluster = (PulsarClusterInfo) clusterInfo;
             try {
                 for (InlongStreamBriefInfo streamInfo : streamInfos) {
-                    this.deletePulsarTopic(groupInfo, pulsarCluster, streamInfo.getMqResource());
+                    this.deletePulsarTopic((InlongPulsarInfo) groupInfo, pulsarCluster, streamInfo.getMqResource());
                 }
             } catch (Exception e) {
                 String msg = "failed to delete pulsar resource for groupId=" + groupId;
@@ -192,7 +198,7 @@ public class PulsarResourceOperator implements QueueResourceOperator {
         for (ClusterInfo clusterInfo : clusterInfos) {
             PulsarClusterInfo pulsarCluster = (PulsarClusterInfo) clusterInfo;
             try {
-                this.deletePulsarTopic(groupInfo, pulsarCluster, streamInfo.getMqResource());
+                this.deletePulsarTopic((InlongPulsarInfo) groupInfo, pulsarCluster, streamInfo.getMqResource());
                 log.info("success to delete pulsar topic for groupId={}, streamId={}, topic={}, cluster={}",
                         groupId, streamId, streamInfo.getMqResource(), pulsarCluster);
             } catch (Exception e) {
@@ -211,9 +217,9 @@ public class PulsarResourceOperator implements QueueResourceOperator {
     private void createTopic(InlongPulsarInfo pulsarInfo, PulsarClusterInfo pulsarCluster, String topicName)
             throws Exception {
         try (PulsarAdmin pulsarAdmin = PulsarUtils.getPulsarAdmin(pulsarCluster)) {
-            String tenant = pulsarCluster.getTenant();
-            if (StringUtils.isEmpty(tenant)) {
-                tenant = InlongConstants.DEFAULT_PULSAR_TENANT;
+            String tenant = pulsarInfo.getTenant();
+            if (StringUtils.isBlank(tenant)) {
+                tenant = pulsarCluster.getTenant();
             }
             String namespace = pulsarInfo.getMqResource();
             PulsarTopicInfo topicInfo = PulsarTopicInfo.builder()
@@ -233,9 +239,9 @@ public class PulsarResourceOperator implements QueueResourceOperator {
     private void createSubscription(InlongPulsarInfo pulsarInfo, PulsarClusterInfo pulsarCluster, String topicName,
             String streamId) throws Exception {
         try (PulsarAdmin pulsarAdmin = PulsarUtils.getPulsarAdmin(pulsarCluster)) {
-            String tenant = pulsarCluster.getTenant();
-            if (StringUtils.isEmpty(tenant)) {
-                tenant = InlongConstants.DEFAULT_PULSAR_TENANT;
+            String tenant = pulsarInfo.getTenant();
+            if (StringUtils.isBlank(tenant)) {
+                tenant = pulsarCluster.getTenant();
             }
             String namespace = pulsarInfo.getMqResource();
             String fullTopicName = tenant + "/" + namespace + "/" + topicName;
@@ -274,14 +280,14 @@ public class PulsarResourceOperator implements QueueResourceOperator {
      * Delete Pulsar Topic and Subscription, and delete the consumer group info.
      * TODO delete Subscription and InlongConsume info
      */
-    private void deletePulsarTopic(InlongGroupInfo groupInfo, PulsarClusterInfo pulsarCluster, String topicName)
+    private void deletePulsarTopic(InlongPulsarInfo pulsarInfo, PulsarClusterInfo pulsarCluster, String topicName)
             throws Exception {
         try (PulsarAdmin pulsarAdmin = PulsarUtils.getPulsarAdmin(pulsarCluster)) {
-            String tenant = pulsarCluster.getTenant();
-            if (StringUtils.isEmpty(tenant)) {
-                tenant = InlongConstants.DEFAULT_PULSAR_TENANT;
+            String tenant = pulsarInfo.getTenant();
+            if (StringUtils.isBlank(tenant)) {
+                tenant = pulsarCluster.getTenant();
             }
-            String namespace = groupInfo.getMqResource();
+            String namespace = pulsarInfo.getMqResource();
             PulsarTopicInfo topicInfo = PulsarTopicInfo.builder()
                     .tenant(tenant)
                     .namespace(namespace)
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/pulsar/PulsarSourceOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/pulsar/PulsarSourceOperator.java
index a29020a0e..a20629e4e 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/pulsar/PulsarSourceOperator.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/pulsar/PulsarSourceOperator.java
@@ -23,7 +23,6 @@ import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.inlong.common.enums.DataTypeEnum;
-import org.apache.inlong.manager.common.consts.InlongConstants;
 import org.apache.inlong.manager.common.consts.SourceType;
 import org.apache.inlong.manager.common.enums.ClusterType;
 import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
@@ -33,6 +32,7 @@ import org.apache.inlong.manager.dao.entity.StreamSourceEntity;
 import org.apache.inlong.manager.pojo.cluster.ClusterInfo;
 import org.apache.inlong.manager.pojo.cluster.pulsar.PulsarClusterInfo;
 import org.apache.inlong.manager.pojo.group.InlongGroupInfo;
+import org.apache.inlong.manager.pojo.group.pulsar.InlongPulsarInfo;
 import org.apache.inlong.manager.pojo.source.SourceRequest;
 import org.apache.inlong.manager.pojo.source.StreamSource;
 import org.apache.inlong.manager.pojo.source.kafka.KafkaSource;
@@ -112,12 +112,15 @@ public class PulsarSourceOperator extends AbstractSourceOperator {
         PulsarClusterInfo pulsarCluster = (PulsarClusterInfo) clusterInfo;
         String adminUrl = pulsarCluster.getAdminUrl();
         String serviceUrl = pulsarCluster.getUrl();
-        String tenant = StringUtils.isEmpty(pulsarCluster.getTenant())
-                ? InlongConstants.DEFAULT_PULSAR_TENANT
-                : pulsarCluster.getTenant();
+
+        // First get the tenant from the InlongGroup, and then get it from the PulsarCluster.
+        String tenant = ((InlongPulsarInfo) groupInfo).getTenant();
+        if (StringUtils.isBlank(tenant)) {
+            tenant = pulsarCluster.getTenant();
+        }
 
         Map<String, List<StreamSource>> sourceMap = Maps.newHashMap();
-        streamInfos.forEach(streamInfo -> {
+        for (InlongStreamInfo streamInfo : streamInfos) {
             PulsarSource pulsarSource = new PulsarSource();
             String streamId = streamInfo.getInlongStreamId();
             pulsarSource.setSourceName(streamId);
@@ -166,7 +169,7 @@ public class PulsarSourceOperator extends AbstractSourceOperator {
             pulsarSource.setScanStartupMode(PulsarScanStartupMode.EARLIEST.getValue());
             pulsarSource.setFieldList(streamInfo.getFieldList());
             sourceMap.computeIfAbsent(streamId, key -> Lists.newArrayList()).add(pulsarSource);
-        });
+        }
 
         return sourceMap;
     }
diff --git a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/cluster/InlongClusterServiceTest.java b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/cluster/InlongClusterServiceTest.java
index 4e659a055..41afde323 100644
--- a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/cluster/InlongClusterServiceTest.java
+++ b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/cluster/InlongClusterServiceTest.java
@@ -76,7 +76,6 @@ public class InlongClusterServiceTest extends ServiceBaseTest {
         request.setName(clusterName);
         request.setType(ClusterType.PULSAR);
         request.setAdminUrl(adminUrl);
-        request.setTenant("public");
         request.setInCharges(GLOBAL_OPERATOR);
         return clusterService.save(request, GLOBAL_OPERATOR);
     }
diff --git a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/consume/InlongConsumeServiceTest.java b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/consume/InlongConsumeServiceTest.java
index 44bfe7258..3742aaaac 100644
--- a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/consume/InlongConsumeServiceTest.java
+++ b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/consume/InlongConsumeServiceTest.java
@@ -156,8 +156,6 @@ public class InlongConsumeServiceTest extends ServiceBaseTest {
         request.setType(ClusterType.PULSAR);
         String adminUrl = "http://127.0.0.1:8080";
         request.setAdminUrl(adminUrl);
-        String tenant = "public";
-        request.setTenant(tenant);
         request.setInCharges(GLOBAL_OPERATOR);
         clusterId = clusterService.save(request, GLOBAL_OPERATOR);
     }