You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2023/01/08 14:29:03 UTC
[inlong] branch master updated: [INLONG-5776][Manager] Add tenant param to InlongGroup that uses Pulsar (#7171)
This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 3f367031d [INLONG-5776][Manager] Add tenant param to InlongGroup that uses Pulsar (#7171)
3f367031d is described below
commit 3f367031d64a32924648d931de4ddea8b47bde1d
Author: healchow <he...@gmail.com>
AuthorDate: Sun Jan 8 22:28:56 2023 +0800
[INLONG-5776][Manager] Add tenant param to InlongGroup that uses Pulsar (#7171)
---
.../apache/inlong/manager/client/BaseExample.java | 2 -
.../apache/inlong/manager/client/ut/BaseTest.java | 2 -
.../client/api/inner/ClientFactoryTest.java | 1 -
.../resources/mappers/InlongGroupEntityMapper.xml | 7 +-
.../pojo/cluster/kafka/KafkaClusterDTO.java | 15 ++-
.../pojo/cluster/pulsar/PulsarClusterDTO.java | 17 +++-
.../pojo/cluster/pulsar/PulsarClusterInfo.java | 4 +-
.../pojo/cluster/pulsar/PulsarClusterRequest.java | 3 +-
.../manager/pojo/group/InlongGroupBriefInfo.java | 5 +-
.../manager/pojo/group/pulsar/InlongPulsarDTO.java | 5 +
.../pojo/group/pulsar/InlongPulsarInfo.java | 6 --
.../pojo/group/pulsar/InlongPulsarRequest.java | 7 ++
.../service/cluster/InlongClusterServiceImpl.java | 25 ++---
.../service/cluster/KafkaClusterOperator.java | 2 -
.../service/cluster/PulsarClusterOperator.java | 2 -
.../service/consume/ConsumePulsarOperator.java | 14 ++-
.../service/core/impl/AgentServiceImpl.java | 105 ++++++++++-----------
.../service/group/InlongGroupOperator4Pulsar.java | 9 +-
.../apply/ApproveConsumeProcessListener.java | 18 ++--
.../queue/pulsar/PulsarResourceOperator.java | 42 +++++----
.../source/pulsar/PulsarSourceOperator.java | 15 +--
.../service/cluster/InlongClusterServiceTest.java | 1 -
.../service/consume/InlongConsumeServiceTest.java | 2 -
23 files changed, 170 insertions(+), 139 deletions(-)
diff --git a/inlong-manager/manager-client-examples/src/test/java/org/apache/inlong/manager/client/BaseExample.java b/inlong-manager/manager-client-examples/src/test/java/org/apache/inlong/manager/client/BaseExample.java
index 9e1ddfaf2..dfa4cc6ee 100644
--- a/inlong-manager/manager-client-examples/src/test/java/org/apache/inlong/manager/client/BaseExample.java
+++ b/inlong-manager/manager-client-examples/src/test/java/org/apache/inlong/manager/client/BaseExample.java
@@ -71,8 +71,6 @@ public class BaseExample {
pulsarInfo.setInCharges("admin");
// pulsar conf
- pulsarInfo.setServiceUrl(pulsarServiceUrl);
- pulsarInfo.setAdminUrl(pulsarAdminUrl);
pulsarInfo.setTenant(tenant);
pulsarInfo.setMqResource(namespace);
diff --git a/inlong-manager/manager-client-examples/src/test/java/org/apache/inlong/manager/client/ut/BaseTest.java b/inlong-manager/manager-client-examples/src/test/java/org/apache/inlong/manager/client/ut/BaseTest.java
index 1818f657d..0a46f939c 100644
--- a/inlong-manager/manager-client-examples/src/test/java/org/apache/inlong/manager/client/ut/BaseTest.java
+++ b/inlong-manager/manager-client-examples/src/test/java/org/apache/inlong/manager/client/ut/BaseTest.java
@@ -103,8 +103,6 @@ public class BaseTest {
pulsarInfo.setInCharges(IN_CHARGES);
// pulsar conf
- pulsarInfo.setServiceUrl(PULSAR_SERVICE_URL);
- pulsarInfo.setAdminUrl(PULSAR_ADMIN_URL);
pulsarInfo.setTenant(TENANT);
pulsarInfo.setMqResource(NAMESPACE);
diff --git a/inlong-manager/manager-client/src/test/java/org/apache/inlong/manager/client/api/inner/ClientFactoryTest.java b/inlong-manager/manager-client/src/test/java/org/apache/inlong/manager/client/api/inner/ClientFactoryTest.java
index 03eb5cfee..bb1984936 100644
--- a/inlong-manager/manager-client/src/test/java/org/apache/inlong/manager/client/api/inner/ClientFactoryTest.java
+++ b/inlong-manager/manager-client/src/test/java/org/apache/inlong/manager/client/api/inner/ClientFactoryTest.java
@@ -717,7 +717,6 @@ class ClientFactoryTest {
.clusterTags("test_cluster_tag")
.type(ClusterType.PULSAR)
.adminUrl("http://127.0.0.1:8080")
- .tenant("public")
.build();
stubFor(
diff --git a/inlong-manager/manager-dao/src/main/resources/mappers/InlongGroupEntityMapper.xml b/inlong-manager/manager-dao/src/main/resources/mappers/InlongGroupEntityMapper.xml
index c99ed7595..77cbd6246 100644
--- a/inlong-manager/manager-dao/src/main/resources/mappers/InlongGroupEntityMapper.xml
+++ b/inlong-manager/manager-dao/src/main/resources/mappers/InlongGroupEntityMapper.xml
@@ -166,7 +166,8 @@
</select>
<select id="selectBriefList" parameterType="org.apache.inlong.manager.pojo.group.InlongGroupPageRequest"
resultType="org.apache.inlong.manager.pojo.group.InlongGroupBriefInfo">
- select id, inlong_group_id, name, mq_type, mq_resource, inlong_cluster_tag, modify_time
+ select id, inlong_group_id, name, mq_type, mq_resource, dataReportType, inlong_cluster_tag, ext_params,
+ in_charges, status, creator, modifier, create_time, modify_time
from inlong_group
<where>
is_deleted = 0
@@ -189,10 +190,8 @@
</foreach>
</if>
</where>
- order by modify_time desc
- limit 100
</select>
- <select id="selectByTopicRequest" resultType="org.apache.inlong.manager.pojo.group.InlongGroupTopicRequest">
+ <select id="selectByTopicRequest" resultType="org.apache.inlong.manager.dao.entity.InlongGroupEntity">
select
<include refid="Base_Column_List"/>
from inlong_group
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/kafka/KafkaClusterDTO.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/kafka/KafkaClusterDTO.java
index 9f5e25ca4..d6110671d 100644
--- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/kafka/KafkaClusterDTO.java
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/kafka/KafkaClusterDTO.java
@@ -19,6 +19,7 @@ package org.apache.inlong.manager.pojo.cluster.kafka;
import com.fasterxml.jackson.annotation.JsonProperty;
import io.swagger.annotations.ApiModel;
+import io.swagger.annotations.ApiModelProperty;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
@@ -39,17 +40,25 @@ import javax.validation.constraints.NotNull;
@ApiModel("Kafka cluster info")
public class KafkaClusterDTO {
- @Builder.Default
- private String messageQueueHandler = "org.apache.inlong.dataproxy.sink.mq.kafka.KafkaHandler";
-
+ /**
+ * Also saved (duplicated) in the ext_params field, so that DataProxy can obtain it conveniently.
+ */
@JsonProperty("bootstrap.servers")
+ @ApiModelProperty(value = "Kafka bootstrap servers' URL, is the 'url' field of the cluster")
private String bootstrapServers;
+ /**
+ * Saved in the ext_params field, so that DataProxy can obtain it conveniently.
+ */
+ @Builder.Default
+ private String messageQueueHandler = "org.apache.inlong.dataproxy.sink.mq.kafka.KafkaHandler";
+
/**
* Get the dto instance from the request
*/
public static KafkaClusterDTO getFromRequest(KafkaClusterRequest request) {
return KafkaClusterDTO.builder()
+ .bootstrapServers(request.getUrl())
.build();
}
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/pulsar/PulsarClusterDTO.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/pulsar/PulsarClusterDTO.java
index d80d3da2b..3fa402d15 100644
--- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/pulsar/PulsarClusterDTO.java
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/pulsar/PulsarClusterDTO.java
@@ -39,24 +39,31 @@ import javax.validation.constraints.NotNull;
@ApiModel("Pulsar cluster info")
public class PulsarClusterDTO {
- @ApiModelProperty(value = "Pulsar admin URL, such as: http://127.0.0.1:8080", notes = "Pulsar service URL is the 'url' field of the cluster")
+ @ApiModelProperty(value = "Pulsar admin URL, such as: http://127.0.0.1:8080")
private String adminUrl;
+ /**
+ * Also saved (duplicated) in the ext_params field, so that DataProxy can obtain it conveniently.
+ */
+ @ApiModelProperty(value = "Pulsar service URL, is the 'url' field of the cluster")
+ private String serviceUrl;
+
@ApiModelProperty(value = "Pulsar tenant, default is 'public'")
- @Builder.Default
- private String tenant = "public";
+ private String tenant;
+ /**
+ * Saved in the ext_params field, so that DataProxy can obtain it conveniently.
+ */
@Builder.Default
private String messageQueueHandler = "org.apache.inlong.dataproxy.sink.mq.pulsar.PulsarHandler";
- private String serviceUrl;
-
/**
* Get the dto instance from the request
*/
public static PulsarClusterDTO getFromRequest(PulsarClusterRequest request) {
return PulsarClusterDTO.builder()
.adminUrl(request.getAdminUrl())
+ .serviceUrl(request.getUrl())
.tenant(request.getTenant())
.build();
}
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/pulsar/PulsarClusterInfo.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/pulsar/PulsarClusterInfo.java
index 58049ab66..e6b4a3d7f 100644
--- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/pulsar/PulsarClusterInfo.java
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/pulsar/PulsarClusterInfo.java
@@ -19,7 +19,6 @@ package org.apache.inlong.manager.pojo.cluster.pulsar;
import io.swagger.annotations.ApiModel;
import io.swagger.annotations.ApiModelProperty;
-import lombok.Builder;
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.ToString;
@@ -44,8 +43,7 @@ public class PulsarClusterInfo extends ClusterInfo {
private String adminUrl;
@ApiModelProperty(value = "Pulsar tenant, default is 'public'")
- @Builder.Default
- private String tenant = "public";
+ private String tenant;
public PulsarClusterInfo() {
this.setType(ClusterType.PULSAR);
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/pulsar/PulsarClusterRequest.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/pulsar/PulsarClusterRequest.java
index c3d913620..170fe2036 100644
--- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/pulsar/PulsarClusterRequest.java
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/pulsar/PulsarClusterRequest.java
@@ -22,6 +22,7 @@ import io.swagger.annotations.ApiModelProperty;
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.ToString;
+import org.apache.inlong.manager.common.consts.InlongConstants;
import org.apache.inlong.manager.common.enums.ClusterType;
import org.apache.inlong.manager.common.util.JsonTypeDefine;
import org.apache.inlong.manager.pojo.cluster.ClusterRequest;
@@ -40,7 +41,7 @@ public class PulsarClusterRequest extends ClusterRequest {
private String adminUrl;
@ApiModelProperty(value = "Pulsar tenant, default is 'public'")
- private String tenant = "public";
+ private String tenant = InlongConstants.DEFAULT_PULSAR_TENANT;
public PulsarClusterRequest() {
this.setType(ClusterType.PULSAR);
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/InlongGroupBriefInfo.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/InlongGroupBriefInfo.java
index c7d196b25..29c7418f2 100644
--- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/InlongGroupBriefInfo.java
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/InlongGroupBriefInfo.java
@@ -54,9 +54,12 @@ public class InlongGroupBriefInfo {
@ApiModelProperty(value = "MQ resource")
private String mqResource;
- @ApiModelProperty(value = "Inlong cluster tag")
+ @ApiModelProperty(value = "Inlong cluster tag, which links to inlong_cluster table")
private String inlongClusterTag;
+ @ApiModelProperty(value = "Extended params, stored as a JSON string")
+ private String extParams;
+
@ApiModelProperty(value = "Name of responsible person, separated by commas")
private String inCharges;
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/pulsar/InlongPulsarDTO.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/pulsar/InlongPulsarDTO.java
index e42ae8837..7f0deddd1 100644
--- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/pulsar/InlongPulsarDTO.java
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/pulsar/InlongPulsarDTO.java
@@ -22,6 +22,7 @@ import io.swagger.annotations.ApiModelProperty;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
+import lombok.EqualsAndHashCode;
import lombok.NoArgsConstructor;
import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
import org.apache.inlong.manager.common.exceptions.BusinessException;
@@ -38,9 +39,13 @@ import javax.validation.constraints.NotNull;
@Builder
@NoArgsConstructor
@AllArgsConstructor
+@EqualsAndHashCode(callSuper = true)
@ApiModel("Inlong group info for Pulsar")
public class InlongPulsarDTO extends BaseInlongGroup {
+ @ApiModelProperty(value = "Pulsar tenant")
+ private String tenant;
+
@ApiModelProperty(value = "Queue model, parallel: multiple partitions, high throughput, out-of-order messages;"
+ "serial: single partition, low throughput, and orderly messages")
private String queueModule;
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/pulsar/InlongPulsarInfo.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/pulsar/InlongPulsarInfo.java
index 36e07e5a6..451cccbfc 100644
--- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/pulsar/InlongPulsarInfo.java
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/pulsar/InlongPulsarInfo.java
@@ -42,12 +42,6 @@ public class InlongPulsarInfo extends InlongGroupInfo {
@ApiModelProperty(value = "Pulsar tenant")
private String tenant;
- @ApiModelProperty(value = "Pulsar admin URL")
- private String adminUrl;
-
- @ApiModelProperty(value = "Pulsar service URL")
- private String serviceUrl;
-
@ApiModelProperty(value = "Queue model, parallel: multiple partitions, high throughput, out-of-order messages;"
+ "serial: single partition, low throughput, and orderly messages")
private String queueModule = "PARALLEL";
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/pulsar/InlongPulsarRequest.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/pulsar/InlongPulsarRequest.java
index 986306665..7da1b625e 100644
--- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/pulsar/InlongPulsarRequest.java
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/pulsar/InlongPulsarRequest.java
@@ -36,6 +36,13 @@ import org.apache.inlong.manager.pojo.group.InlongGroupRequest;
@JsonTypeDefine(value = MQType.PULSAR)
public class InlongPulsarRequest extends InlongGroupRequest {
+ /**
+ * TODO Add default value InlongConstants.DEFAULT_PULSAR_TENANT when you remove the 'tenant'
+ * from {@link org.apache.inlong.manager.pojo.cluster.pulsar.PulsarClusterRequest}
+ */
+ @ApiModelProperty(value = "Pulsar tenant")
+ private String tenant;
+
@ApiModelProperty(value = "Queue model, parallel: multiple partitions, high throughput, out-of-order messages;"
+ "serial: single partition, low throughput, and orderly messages")
private String queueModule = "PARALLEL";
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/InlongClusterServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/InlongClusterServiceImpl.java
index 598dd10a0..6ddd96575 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/InlongClusterServiceImpl.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/InlongClusterServiceImpl.java
@@ -66,6 +66,7 @@ import org.apache.inlong.manager.pojo.common.PageResult;
import org.apache.inlong.manager.pojo.common.UpdateResult;
import org.apache.inlong.manager.pojo.group.InlongGroupBriefInfo;
import org.apache.inlong.manager.pojo.group.InlongGroupPageRequest;
+import org.apache.inlong.manager.pojo.group.pulsar.InlongPulsarDTO;
import org.apache.inlong.manager.pojo.stream.InlongStreamBriefInfo;
import org.apache.inlong.manager.pojo.user.UserInfo;
import org.apache.inlong.manager.service.repository.DataProxyConfigRepository;
@@ -1482,7 +1483,7 @@ public class InlongClusterServiceImpl implements InlongClusterService {
return result;
}
- LOGGER.debug("GetDPConfig: begin to get config for cluster tags={}, associated group num={}",
+ LOGGER.debug("GetDPConfig: begin to get config for cluster tags={}, associated InlongGroup num={}",
clusterTagList, groupList.size());
List<DataProxyTopicInfo> topicList = new ArrayList<>();
for (InlongGroupBriefInfo groupInfo : groupList) {
@@ -1492,27 +1493,29 @@ public class InlongClusterServiceImpl implements InlongClusterService {
String mqType = groupInfo.getMqType();
if (MQType.PULSAR.equals(mqType) || MQType.TDMQ_PULSAR.equals(mqType)) {
- List<InlongStreamBriefInfo> streamList = streamMapper.selectBriefList(groupId);
- for (InlongStreamBriefInfo streamInfo : streamList) {
+ InlongPulsarDTO pulsarDTO = InlongPulsarDTO.getFromJson(groupInfo.getExtParams());
+ // First get the tenant from the InlongGroup, and then get it from the PulsarCluster.
+ String tenant = pulsarDTO.getTenant();
+ if (StringUtils.isBlank(tenant)) {
+ // If there are multiple Pulsar clusters, take the first one.
+ // Note that the tenants in multiple Pulsar clusters must be identical.
List<InlongClusterEntity> pulsarClusters = clusterMapper.selectByKey(realClusterTag, null,
ClusterType.PULSAR);
if (CollectionUtils.isEmpty(pulsarClusters)) {
LOGGER.error("GetDPConfig: not found pulsar cluster by cluster tag={}", realClusterTag);
continue;
}
+ PulsarClusterDTO cluster = PulsarClusterDTO.getFromJson(pulsarClusters.get(0).getExtParams());
+ tenant = cluster.getTenant();
+ }
- // if there are multiple Pulsar clusters, take the first one
- InlongClusterEntity cluster = pulsarClusters.get(0);
- PulsarClusterDTO pulsarCluster = PulsarClusterDTO.getFromJson(cluster.getExtParams());
- String tenant = pulsarCluster.getTenant();
- if (StringUtils.isBlank(tenant)) {
- tenant = InlongConstants.DEFAULT_PULSAR_TENANT;
- }
-
+ List<InlongStreamBriefInfo> streamList = streamMapper.selectBriefList(groupId);
+ for (InlongStreamBriefInfo streamInfo : streamList) {
String streamId = streamInfo.getInlongStreamId();
String topic = String.format(InlongConstants.PULSAR_TOPIC_FORMAT,
tenant, mqResource, streamInfo.getMqResource());
DataProxyTopicInfo topicConfig = new DataProxyTopicInfo();
+ // must format to groupId/streamId, needed by DataProxy
topicConfig.setInlongGroupId(groupId + "/" + streamId);
topicConfig.setTopic(topic);
topicList.add(topicConfig);
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/KafkaClusterOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/KafkaClusterOperator.java
index dceac1864..b3eb9346c 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/KafkaClusterOperator.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/KafkaClusterOperator.java
@@ -75,7 +75,6 @@ public class KafkaClusterOperator extends AbstractClusterOperator {
CommonBeanUtils.copyProperties(dto, kafkaClusterInfo);
}
- LOGGER.info("success to get kafka cluster info from entity");
return kafkaClusterInfo;
}
@@ -85,7 +84,6 @@ public class KafkaClusterOperator extends AbstractClusterOperator {
CommonBeanUtils.copyProperties(kafkaRequest, targetEntity, true);
try {
KafkaClusterDTO dto = KafkaClusterDTO.getFromRequest(kafkaRequest);
- dto.setBootstrapServers(kafkaRequest.getUrl());
targetEntity.setExtParams(objectMapper.writeValueAsString(dto));
LOGGER.info("success to set entity for kafka cluster");
} catch (Exception e) {
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/PulsarClusterOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/PulsarClusterOperator.java
index a4c98bb11..2a724dc2b 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/PulsarClusterOperator.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/PulsarClusterOperator.java
@@ -70,7 +70,6 @@ public class PulsarClusterOperator extends AbstractClusterOperator {
CommonBeanUtils.copyProperties(dto, pulsarInfo);
}
- LOGGER.info("success to get pulsar cluster info from entity");
return pulsarInfo;
}
@@ -80,7 +79,6 @@ public class PulsarClusterOperator extends AbstractClusterOperator {
CommonBeanUtils.copyProperties(pulsarRequest, targetEntity, true);
try {
PulsarClusterDTO dto = PulsarClusterDTO.getFromRequest(pulsarRequest);
- dto.setServiceUrl(request.getUrl());
targetEntity.setExtParams(objectMapper.writeValueAsString(dto));
LOGGER.info("success to set entity for pulsar cluster");
} catch (Exception e) {
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/consume/ConsumePulsarOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/consume/ConsumePulsarOperator.java
index 6edc3a243..cf1f20d61 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/consume/ConsumePulsarOperator.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/consume/ConsumePulsarOperator.java
@@ -37,6 +37,7 @@ import org.apache.inlong.manager.pojo.consume.pulsar.ConsumePulsarInfo;
import org.apache.inlong.manager.pojo.consume.pulsar.ConsumePulsarRequest;
import org.apache.inlong.manager.pojo.group.InlongGroupInfo;
import org.apache.inlong.manager.pojo.group.InlongGroupTopicInfo;
+import org.apache.inlong.manager.pojo.group.pulsar.InlongPulsarInfo;
import org.apache.inlong.manager.pojo.group.pulsar.InlongPulsarTopicInfo;
import org.apache.inlong.manager.service.cluster.InlongClusterService;
import org.apache.inlong.manager.service.group.InlongGroupService;
@@ -115,8 +116,17 @@ public class ConsumePulsarOperator extends AbstractConsumeOperator {
List<ClusterInfo> clusterInfos = clusterService.listByTagAndType(clusterTag, ClusterType.PULSAR);
Preconditions.checkNotEmpty(clusterInfos, "pulsar cluster not exist for groupId=" + groupId);
consumeInfo.setClusterInfos(clusterInfos);
- PulsarClusterInfo pulsarCluster = (PulsarClusterInfo) clusterInfos.get(0);
- consumeInfo.setTopic(getFullPulsarTopic(groupInfo, pulsarCluster.getTenant(), entity.getTopic()));
+
+ // First get the tenant from the InlongGroup, and then get it from the PulsarCluster.
+ String tenant = ((InlongPulsarInfo) groupInfo).getTenant();
+ if (StringUtils.isBlank(tenant)) {
+ // If there are multiple Pulsar clusters, take the first one.
+ // Note that the tenants in multiple Pulsar clusters must be identical.
+ PulsarClusterInfo pulsarCluster = (PulsarClusterInfo) clusterInfos.get(0);
+ tenant = pulsarCluster.getTenant();
+ }
+
+ consumeInfo.setTopic(getFullPulsarTopic(groupInfo, tenant, entity.getTopic()));
return consumeInfo;
}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentServiceImpl.java
index 0efd42c32..13918600c 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentServiceImpl.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentServiceImpl.java
@@ -24,6 +24,7 @@ import org.apache.commons.lang3.RandomStringUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.inlong.common.constant.Constants;
+import org.apache.inlong.common.constant.MQType;
import org.apache.inlong.common.db.CommandEntity;
import org.apache.inlong.common.enums.PullJobTypeEnum;
import org.apache.inlong.common.enums.TaskTypeEnum;
@@ -36,7 +37,6 @@ import org.apache.inlong.common.pojo.dataproxy.DataProxyTopicInfo;
import org.apache.inlong.common.pojo.dataproxy.MQClusterInfo;
import org.apache.inlong.manager.common.consts.AgentConstants;
import org.apache.inlong.manager.common.consts.InlongConstants;
-import org.apache.inlong.common.constant.MQType;
import org.apache.inlong.manager.common.consts.SourceType;
import org.apache.inlong.manager.common.enums.ClusterType;
import org.apache.inlong.manager.common.enums.SourceStatus;
@@ -59,6 +59,7 @@ import org.apache.inlong.manager.dao.mapper.StreamSourceEntityMapper;
import org.apache.inlong.manager.pojo.cluster.agent.AgentClusterNodeBindGroupRequest;
import org.apache.inlong.manager.pojo.cluster.ClusterPageRequest;
import org.apache.inlong.manager.pojo.cluster.pulsar.PulsarClusterDTO;
+import org.apache.inlong.manager.pojo.group.pulsar.InlongPulsarDTO;
import org.apache.inlong.manager.pojo.source.file.FileSourceDTO;
import org.apache.inlong.manager.service.core.AgentService;
import org.apache.inlong.manager.service.source.SourceSnapshotOperator;
@@ -259,9 +260,6 @@ public class AgentServiceImpl implements AgentService {
/**
* Query the tasks that source is waited to be operated.(only clusterName and ip matched it can be operated)
- *
- * @param request
- * @return
*/
private List<DataConfig> processQueuedTasks(TaskRequest request) {
HashSet<SourceStatus> needAddStatusSet = Sets.newHashSet(SourceStatus.TOBE_ISSUED_SET);
@@ -307,7 +305,7 @@ public class AgentServiceImpl implements AgentService {
List<StreamSourceEntity> sourceEntities = sourceMapper.selectByStatusAndType(needAddStatusList, sourceTypes,
TASK_FETCH_SIZE);
for (StreamSourceEntity sourceEntity : sourceEntities) {
- // refresh agentip and uuid to make it can be processed in queued task
+ // refresh the agent ip and uuid so that the source can be processed as a queued task
sourceEntity.setAgentIp(taskRequest.getAgentIp());
sourceEntity.setUuid(taskRequest.getUuid());
sourceMapper.updateByPrimaryKeySelective(sourceEntity);
@@ -322,8 +320,6 @@ public class AgentServiceImpl implements AgentService {
/**
* Add subtasks to template tasks.
* (Template task are agent_ip is null and template_id is null)
- *
- * @param taskRequest
*/
private void preProcessTemplateFileTask(TaskRequest taskRequest) {
List<Integer> needCopiedStatusList = Arrays.asList(SourceStatus.TO_BE_ISSUED_ADD.getCode(),
@@ -368,8 +364,6 @@ public class AgentServiceImpl implements AgentService {
* 1.agent ip match
* 2.cluster name match
* Send the corresponding task action request according to the matching state of the tag and the current state
- *
- * @param taskRequest
*/
private void preProcessLabelFileTasks(TaskRequest taskRequest) {
List<Integer> needProcessedStatusList = Arrays.asList(
@@ -388,42 +382,41 @@ public class AgentServiceImpl implements AgentService {
List<StreamSourceEntity> sourceEntities = sourceMapper.selectByAgentIpAndCluster(needProcessedStatusList,
Lists.newArrayList(SourceType.FILE), agentIp, agentClusterName);
- sourceEntities.stream()
- .forEach(sourceEntity -> {
- // case: agent tag unbind and mismatch source task
- Set<SourceStatus> exceptedUnmatchedStatus = Sets.newHashSet(
- SourceStatus.SOURCE_FROZEN,
- SourceStatus.TO_BE_ISSUED_FROZEN);
- if (!matchLabel(sourceEntity, clusterNodeEntity)
- && !exceptedUnmatchedStatus.contains(SourceStatus.forCode(sourceEntity.getStatus()))) {
- LOGGER.info("Transform task({}) from {} to {} because tag mismatch "
- + "for agent({}) in cluster({})", sourceEntity.getAgentIp(),
- sourceEntity.getStatus(), SourceStatus.TO_BE_ISSUED_FROZEN.getCode(),
- agentIp, agentClusterName);
- sourceMapper.updateStatus(
- sourceEntity.getId(), SourceStatus.TO_BE_ISSUED_FROZEN.getCode(), false);
- }
-
- // case: agent tag rebind and match source task again and stream is not in 'SUSPENDED' status
- InlongStreamEntity streamEntity = streamMapper.selectByIdentifier(
- sourceEntity.getInlongGroupId(), sourceEntity.getInlongStreamId());
- Set<SourceStatus> exceptedMatchedSourceStatus = Sets.newHashSet(
- SourceStatus.SOURCE_NORMAL,
- SourceStatus.TO_BE_ISSUED_ADD,
- SourceStatus.TO_BE_ISSUED_ACTIVE);
- Set<StreamStatus> exceptedMatchedStreamStatus = Sets.newHashSet(
- StreamStatus.SUSPENDED, StreamStatus.SUSPENDED);
- if (matchLabel(sourceEntity, clusterNodeEntity)
- && !exceptedMatchedSourceStatus.contains(SourceStatus.forCode(sourceEntity.getStatus()))
- && !exceptedMatchedStreamStatus.contains(StreamStatus.forCode(streamEntity.getStatus()))) {
- LOGGER.info("Transform task({}) from {} to {} because tag rematch "
- + "for agent({}) in cluster({})", sourceEntity.getAgentIp(),
- sourceEntity.getStatus(), SourceStatus.TO_BE_ISSUED_ACTIVE.getCode(),
- agentIp, agentClusterName);
- sourceMapper.updateStatus(
- sourceEntity.getId(), SourceStatus.TO_BE_ISSUED_ACTIVE.getCode(), false);
- }
- });
+ sourceEntities.forEach(sourceEntity -> {
+ // case: agent tag unbind and mismatch source task
+ Set<SourceStatus> exceptedUnmatchedStatus = Sets.newHashSet(
+ SourceStatus.SOURCE_FROZEN,
+ SourceStatus.TO_BE_ISSUED_FROZEN);
+ if (!matchLabel(sourceEntity, clusterNodeEntity)
+ && !exceptedUnmatchedStatus.contains(SourceStatus.forCode(sourceEntity.getStatus()))) {
+ LOGGER.info("Transform task({}) from {} to {} because tag mismatch "
+ + "for agent({}) in cluster({})", sourceEntity.getAgentIp(),
+ sourceEntity.getStatus(), SourceStatus.TO_BE_ISSUED_FROZEN.getCode(),
+ agentIp, agentClusterName);
+ sourceMapper.updateStatus(
+ sourceEntity.getId(), SourceStatus.TO_BE_ISSUED_FROZEN.getCode(), false);
+ }
+
+ // case: agent tag rebind and match source task again and stream is not in 'SUSPENDED' status
+ InlongStreamEntity streamEntity = streamMapper.selectByIdentifier(
+ sourceEntity.getInlongGroupId(), sourceEntity.getInlongStreamId());
+ Set<SourceStatus> exceptedMatchedSourceStatus = Sets.newHashSet(
+ SourceStatus.SOURCE_NORMAL,
+ SourceStatus.TO_BE_ISSUED_ADD,
+ SourceStatus.TO_BE_ISSUED_ACTIVE);
+ Set<StreamStatus> exceptedMatchedStreamStatus = Sets.newHashSet(
+ StreamStatus.SUSPENDED, StreamStatus.SUSPENDED);
+ if (matchLabel(sourceEntity, clusterNodeEntity)
+ && !exceptedMatchedSourceStatus.contains(SourceStatus.forCode(sourceEntity.getStatus()))
+ && !exceptedMatchedStreamStatus.contains(StreamStatus.forCode(streamEntity.getStatus()))) {
+ LOGGER.info("Transform task({}) from {} to {} because tag rematch "
+ + "for agent({}) in cluster({})", sourceEntity.getAgentIp(),
+ sourceEntity.getStatus(), SourceStatus.TO_BE_ISSUED_ACTIVE.getCode(),
+ agentIp, agentClusterName);
+ sourceMapper.updateStatus(
+ sourceEntity.getId(), SourceStatus.TO_BE_ISSUED_ACTIVE.getCode(), false);
+ }
+ });
}
private InlongClusterNodeEntity selectByIpAndCluster(String clusterName, String ip) {
@@ -436,9 +429,7 @@ public class AgentServiceImpl implements AgentService {
nodeRequest.setKeyword(ip);
nodeRequest.setParentId(clusterEntity.getId());
nodeRequest.setType(ClusterType.AGENT);
- InlongClusterNodeEntity clusterNodeEntity =
- clusterNodeMapper.selectByCondition(nodeRequest).stream().findFirst().orElse(null);
- return clusterNodeEntity;
+ return clusterNodeMapper.selectByCondition(nodeRequest).stream().findFirst().orElse(null);
}
private int getOp(int status) {
@@ -496,7 +487,7 @@ public class AgentServiceImpl implements AgentService {
if (InlongConstants.REPORT_TO_MQ_RECEIVED == dataReportType) {
// add mq cluster setting
List<MQClusterInfo> mqSet = new ArrayList<>();
- List<String> clusterTagList = Arrays.asList(groupEntity.getInlongClusterTag());
+ List<String> clusterTagList = Collections.singletonList(groupEntity.getInlongClusterTag());
List<String> typeList = Arrays.asList(ClusterType.TUBEMQ, ClusterType.PULSAR);
ClusterPageRequest pageRequest = ClusterPageRequest.builder()
.typeList(typeList)
@@ -512,16 +503,21 @@ public class AgentServiceImpl implements AgentService {
mqSet.add(clusterInfo);
}
dataConfig.setMqClusters(mqSet);
+
// add topic setting
- InlongClusterEntity cluster = mqClusterList.get(0);
String mqResource = groupEntity.getMqResource();
String mqType = groupEntity.getMqType();
if (MQType.PULSAR.equals(mqType) || MQType.TDMQ_PULSAR.equals(mqType)) {
- PulsarClusterDTO pulsarCluster = PulsarClusterDTO.getFromJson(cluster.getExtParams());
- String tenant = pulsarCluster.getTenant();
+ // Prefer the tenant set on the InlongGroup; fall back to the PulsarCluster tenant only when it is blank.
+ InlongPulsarDTO pulsarDTO = InlongPulsarDTO.getFromJson(groupEntity.getExtParams());
+ String tenant = pulsarDTO.getTenant();
if (StringUtils.isBlank(tenant)) {
- tenant = InlongConstants.DEFAULT_PULSAR_TENANT;
+ // If there are multiple Pulsar clusters, take the first one.
+ // Note that the tenants in multiple Pulsar clusters must be identical.
+ PulsarClusterDTO pulsarCluster = PulsarClusterDTO.getFromJson(mqClusterList.get(0).getExtParams());
+ tenant = pulsarCluster.getTenant();
}
+
String topic = String.format(InlongConstants.PULSAR_TOPIC_FORMAT,
tenant, mqResource, streamEntity.getMqResource());
DataProxyTopicInfo topicConfig = new DataProxyTopicInfo();
@@ -594,6 +590,7 @@ public class AgentServiceImpl implements AgentService {
: Sets.newHashSet(extParams.get(AgentConstants.AGENT_GROUP_KEY).split(InlongConstants.COMMA));
Set<String> sourceLabels = Stream.of(
sourceEntity.getInlongClusterNodeGroup().split(InlongConstants.COMMA)).collect(Collectors.toSet());
- return sourceLabels.stream().anyMatch(sourceLabel -> clusterNodeLabels.contains(sourceLabel));
+ return sourceLabels.stream().anyMatch(clusterNodeLabels::contains);
}
+
}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupOperator4Pulsar.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupOperator4Pulsar.java
index 810241f1a..f1cc29013 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupOperator4Pulsar.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongGroupOperator4Pulsar.java
@@ -18,7 +18,6 @@
package org.apache.inlong.manager.service.group;
import org.apache.commons.lang3.StringUtils;
-import org.apache.inlong.manager.common.consts.InlongConstants;
import org.apache.inlong.common.constant.MQType;
import org.apache.inlong.manager.common.enums.ClusterType;
import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
@@ -122,10 +121,12 @@ public class InlongGroupOperator4Pulsar extends AbstractGroupOperator {
public InlongGroupTopicInfo getTopic(InlongGroupInfo groupInfo) {
PulsarClusterInfo pulsarCluster = (PulsarClusterInfo) clusterService.getOne(
groupInfo.getInlongClusterTag(), null, ClusterType.PULSAR);
- String tenant = StringUtils.isEmpty(pulsarCluster.getTenant())
- ? InlongConstants.DEFAULT_PULSAR_TENANT
- : pulsarCluster.getTenant();
+ // Prefer the tenant set on the InlongGroup; fall back to the PulsarCluster tenant only when it is blank.
+ String tenant = ((InlongPulsarInfo) groupInfo).getTenant();
+ if (StringUtils.isBlank(tenant)) {
+ tenant = pulsarCluster.getTenant();
+ }
InlongPulsarTopicInfo topicInfo = new InlongPulsarTopicInfo();
topicInfo.setTenant(tenant);
topicInfo.setNamespace(groupInfo.getMqResource());
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/consume/apply/ApproveConsumeProcessListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/consume/apply/ApproveConsumeProcessListener.java
index ed702cff5..4652c39d6 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/consume/apply/ApproveConsumeProcessListener.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/consume/apply/ApproveConsumeProcessListener.java
@@ -19,8 +19,8 @@ package org.apache.inlong.manager.service.listener.consume.apply;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
-import org.apache.inlong.manager.common.consts.InlongConstants;
import org.apache.inlong.common.constant.MQType;
+import org.apache.inlong.manager.common.consts.InlongConstants;
import org.apache.inlong.manager.common.enums.ClusterType;
import org.apache.inlong.manager.common.enums.ConsumeStatus;
import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
@@ -36,6 +36,7 @@ import org.apache.inlong.manager.pojo.cluster.ClusterInfo;
import org.apache.inlong.manager.pojo.cluster.pulsar.PulsarClusterInfo;
import org.apache.inlong.manager.pojo.cluster.tubemq.TubeClusterInfo;
import org.apache.inlong.manager.pojo.queue.pulsar.PulsarTopicInfo;
+import org.apache.inlong.manager.pojo.group.pulsar.InlongPulsarDTO;
import org.apache.inlong.manager.pojo.workflow.form.process.ApplyConsumeProcessForm;
import org.apache.inlong.manager.service.cluster.InlongClusterService;
import org.apache.inlong.manager.service.resource.queue.pulsar.PulsarOperator;
@@ -87,11 +88,10 @@ public class ApproveConsumeProcessListener implements ProcessEventListener {
String mqType = entity.getMqType();
if (MQType.TUBEMQ.equals(mqType)) {
this.createTubeConsumerGroup(entity, context.getOperator());
- return ListenerResult.success("Create TubeMQ consumer group successful");
} else if (MQType.PULSAR.equals(mqType) || MQType.TDMQ_PULSAR.equals(mqType)) {
this.createPulsarSubscription(entity);
} else if (MQType.KAFKA.equals(mqType)) {
- // TODO add Kafka
+ // Kafka consumers do not need to register in advance
} else {
throw new WorkflowListenerException("Unsupported MQ type " + mqType);
}
@@ -130,17 +130,17 @@ public class ApproveConsumeProcessListener implements ProcessEventListener {
ClusterInfo clusterInfo = clusterService.getOne(clusterTag, null, ClusterType.PULSAR);
PulsarClusterInfo pulsarCluster = (PulsarClusterInfo) clusterInfo;
try (PulsarAdmin pulsarAdmin = PulsarUtils.getPulsarAdmin(pulsarCluster)) {
- PulsarTopicInfo topicMessage = new PulsarTopicInfo();
- String tenant = pulsarCluster.getTenant();
- if (StringUtils.isEmpty(tenant)) {
- tenant = InlongConstants.DEFAULT_PULSAR_TENANT;
+ InlongPulsarDTO pulsarDTO = InlongPulsarDTO.getFromJson(groupEntity.getExtParams());
+ String tenant = pulsarDTO.getTenant();
+ if (StringUtils.isBlank(tenant)) {
+ tenant = pulsarCluster.getTenant();
}
+ PulsarTopicInfo topicMessage = new PulsarTopicInfo();
topicMessage.setTenant(tenant);
topicMessage.setNamespace(mqResource);
- String consumerGroup = entity.getConsumerGroup();
List<String> topics = Arrays.asList(entity.getTopic().split(InlongConstants.COMMA));
- this.createPulsarSubscription(pulsarAdmin, consumerGroup, topicMessage, topics);
+ this.createPulsarSubscription(pulsarAdmin, entity.getConsumerGroup(), topicMessage, topics);
} catch (Exception e) {
log.error("create pulsar topic failed", e);
throw new WorkflowListenerException("failed to create pulsar topic for groupId=" + groupId + ", reason: "
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/pulsar/PulsarResourceOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/pulsar/PulsarResourceOperator.java
index 7f3b69a8a..4b81123d4 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/pulsar/PulsarResourceOperator.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/pulsar/PulsarResourceOperator.java
@@ -21,10 +21,11 @@ import com.google.common.base.Objects;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
-import org.apache.inlong.manager.common.consts.InlongConstants;
import org.apache.inlong.common.constant.MQType;
+import org.apache.inlong.manager.common.consts.InlongConstants;
import org.apache.inlong.manager.common.enums.ClusterType;
import org.apache.inlong.manager.common.enums.GroupStatus;
+import org.apache.inlong.manager.common.exceptions.BusinessException;
import org.apache.inlong.manager.common.exceptions.WorkflowListenerException;
import org.apache.inlong.manager.common.util.Preconditions;
import org.apache.inlong.manager.pojo.cluster.ClusterInfo;
@@ -83,22 +84,27 @@ public class PulsarResourceOperator implements QueueResourceOperator {
String clusterTag = groupInfo.getInlongClusterTag();
log.info("begin to create pulsar resource for groupId={}, clusterTag={}", groupId, clusterTag);
+ if (!(groupInfo instanceof InlongPulsarInfo)) {
+ throw new BusinessException("the mqType must be PULSAR for inlongGroupId=" + groupId);
+ }
+
+ InlongPulsarInfo pulsarInfo = (InlongPulsarInfo) groupInfo;
+ String tenant = pulsarInfo.getTenant();
// get pulsar cluster via the inlong cluster tag from the inlong group
List<ClusterInfo> clusterInfos = clusterService.listByTagAndType(clusterTag, ClusterType.PULSAR);
for (ClusterInfo clusterInfo : clusterInfos) {
PulsarClusterInfo pulsarCluster = (PulsarClusterInfo) clusterInfo;
try (PulsarAdmin pulsarAdmin = PulsarUtils.getPulsarAdmin(pulsarCluster)) {
// create pulsar tenant and namespace
- String tenant = pulsarCluster.getTenant();
- if (StringUtils.isEmpty(tenant)) {
- tenant = InlongConstants.DEFAULT_PULSAR_TENANT;
+ if (StringUtils.isBlank(tenant)) {
+ tenant = pulsarCluster.getTenant();
}
// if the group has not been configured successfully yet, the tenant and namespace need to be created
if (!Objects.equal(GroupStatus.CONFIG_SUCCESSFUL.getCode(), groupInfo.getStatus())) {
pulsarOperator.createTenant(pulsarAdmin, tenant);
String namespace = groupInfo.getMqResource();
- pulsarOperator.createNamespace(pulsarAdmin, (InlongPulsarInfo) groupInfo, tenant, namespace);
+ pulsarOperator.createNamespace(pulsarAdmin, pulsarInfo, tenant, namespace);
log.info("success to create pulsar resource for groupId={}, tenant={}, namespace={}, cluster={}",
groupId, tenant, namespace, pulsarCluster);
@@ -131,7 +137,7 @@ public class PulsarResourceOperator implements QueueResourceOperator {
PulsarClusterInfo pulsarCluster = (PulsarClusterInfo) clusterInfo;
try {
for (InlongStreamBriefInfo streamInfo : streamInfos) {
- this.deletePulsarTopic(groupInfo, pulsarCluster, streamInfo.getMqResource());
+ this.deletePulsarTopic((InlongPulsarInfo) groupInfo, pulsarCluster, streamInfo.getMqResource());
}
} catch (Exception e) {
String msg = "failed to delete pulsar resource for groupId=" + groupId;
@@ -192,7 +198,7 @@ public class PulsarResourceOperator implements QueueResourceOperator {
for (ClusterInfo clusterInfo : clusterInfos) {
PulsarClusterInfo pulsarCluster = (PulsarClusterInfo) clusterInfo;
try {
- this.deletePulsarTopic(groupInfo, pulsarCluster, streamInfo.getMqResource());
+ this.deletePulsarTopic((InlongPulsarInfo) groupInfo, pulsarCluster, streamInfo.getMqResource());
log.info("success to delete pulsar topic for groupId={}, streamId={}, topic={}, cluster={}",
groupId, streamId, streamInfo.getMqResource(), pulsarCluster);
} catch (Exception e) {
@@ -211,9 +217,9 @@ public class PulsarResourceOperator implements QueueResourceOperator {
private void createTopic(InlongPulsarInfo pulsarInfo, PulsarClusterInfo pulsarCluster, String topicName)
throws Exception {
try (PulsarAdmin pulsarAdmin = PulsarUtils.getPulsarAdmin(pulsarCluster)) {
- String tenant = pulsarCluster.getTenant();
- if (StringUtils.isEmpty(tenant)) {
- tenant = InlongConstants.DEFAULT_PULSAR_TENANT;
+ String tenant = pulsarInfo.getTenant();
+ if (StringUtils.isBlank(tenant)) {
+ tenant = pulsarCluster.getTenant();
}
String namespace = pulsarInfo.getMqResource();
PulsarTopicInfo topicInfo = PulsarTopicInfo.builder()
@@ -233,9 +239,9 @@ public class PulsarResourceOperator implements QueueResourceOperator {
private void createSubscription(InlongPulsarInfo pulsarInfo, PulsarClusterInfo pulsarCluster, String topicName,
String streamId) throws Exception {
try (PulsarAdmin pulsarAdmin = PulsarUtils.getPulsarAdmin(pulsarCluster)) {
- String tenant = pulsarCluster.getTenant();
- if (StringUtils.isEmpty(tenant)) {
- tenant = InlongConstants.DEFAULT_PULSAR_TENANT;
+ String tenant = pulsarInfo.getTenant();
+ if (StringUtils.isBlank(tenant)) {
+ tenant = pulsarCluster.getTenant();
}
String namespace = pulsarInfo.getMqResource();
String fullTopicName = tenant + "/" + namespace + "/" + topicName;
@@ -274,14 +280,14 @@ public class PulsarResourceOperator implements QueueResourceOperator {
* Delete Pulsar Topic and Subscription, and delete the consumer group info.
* TODO delete Subscription and InlongConsume info
*/
- private void deletePulsarTopic(InlongGroupInfo groupInfo, PulsarClusterInfo pulsarCluster, String topicName)
+ private void deletePulsarTopic(InlongPulsarInfo pulsarInfo, PulsarClusterInfo pulsarCluster, String topicName)
throws Exception {
try (PulsarAdmin pulsarAdmin = PulsarUtils.getPulsarAdmin(pulsarCluster)) {
- String tenant = pulsarCluster.getTenant();
- if (StringUtils.isEmpty(tenant)) {
- tenant = InlongConstants.DEFAULT_PULSAR_TENANT;
+ String tenant = pulsarInfo.getTenant();
+ if (StringUtils.isBlank(tenant)) {
+ tenant = pulsarCluster.getTenant();
}
- String namespace = groupInfo.getMqResource();
+ String namespace = pulsarInfo.getMqResource();
PulsarTopicInfo topicInfo = PulsarTopicInfo.builder()
.tenant(tenant)
.namespace(namespace)
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/pulsar/PulsarSourceOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/pulsar/PulsarSourceOperator.java
index a29020a0e..a20629e4e 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/pulsar/PulsarSourceOperator.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/pulsar/PulsarSourceOperator.java
@@ -23,7 +23,6 @@ import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import org.apache.commons.lang3.StringUtils;
import org.apache.inlong.common.enums.DataTypeEnum;
-import org.apache.inlong.manager.common.consts.InlongConstants;
import org.apache.inlong.manager.common.consts.SourceType;
import org.apache.inlong.manager.common.enums.ClusterType;
import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
@@ -33,6 +32,7 @@ import org.apache.inlong.manager.dao.entity.StreamSourceEntity;
import org.apache.inlong.manager.pojo.cluster.ClusterInfo;
import org.apache.inlong.manager.pojo.cluster.pulsar.PulsarClusterInfo;
import org.apache.inlong.manager.pojo.group.InlongGroupInfo;
+import org.apache.inlong.manager.pojo.group.pulsar.InlongPulsarInfo;
import org.apache.inlong.manager.pojo.source.SourceRequest;
import org.apache.inlong.manager.pojo.source.StreamSource;
import org.apache.inlong.manager.pojo.source.kafka.KafkaSource;
@@ -112,12 +112,15 @@ public class PulsarSourceOperator extends AbstractSourceOperator {
PulsarClusterInfo pulsarCluster = (PulsarClusterInfo) clusterInfo;
String adminUrl = pulsarCluster.getAdminUrl();
String serviceUrl = pulsarCluster.getUrl();
- String tenant = StringUtils.isEmpty(pulsarCluster.getTenant())
- ? InlongConstants.DEFAULT_PULSAR_TENANT
- : pulsarCluster.getTenant();
+
+ // Prefer the tenant set on the InlongGroup; fall back to the PulsarCluster tenant only when it is blank.
+ String tenant = ((InlongPulsarInfo) groupInfo).getTenant();
+ if (StringUtils.isBlank(tenant)) {
+ tenant = pulsarCluster.getTenant();
+ }
Map<String, List<StreamSource>> sourceMap = Maps.newHashMap();
- streamInfos.forEach(streamInfo -> {
+ for (InlongStreamInfo streamInfo : streamInfos) {
PulsarSource pulsarSource = new PulsarSource();
String streamId = streamInfo.getInlongStreamId();
pulsarSource.setSourceName(streamId);
@@ -166,7 +169,7 @@ public class PulsarSourceOperator extends AbstractSourceOperator {
pulsarSource.setScanStartupMode(PulsarScanStartupMode.EARLIEST.getValue());
pulsarSource.setFieldList(streamInfo.getFieldList());
sourceMap.computeIfAbsent(streamId, key -> Lists.newArrayList()).add(pulsarSource);
- });
+ }
return sourceMap;
}
diff --git a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/cluster/InlongClusterServiceTest.java b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/cluster/InlongClusterServiceTest.java
index 4e659a055..41afde323 100644
--- a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/cluster/InlongClusterServiceTest.java
+++ b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/cluster/InlongClusterServiceTest.java
@@ -76,7 +76,6 @@ public class InlongClusterServiceTest extends ServiceBaseTest {
request.setName(clusterName);
request.setType(ClusterType.PULSAR);
request.setAdminUrl(adminUrl);
- request.setTenant("public");
request.setInCharges(GLOBAL_OPERATOR);
return clusterService.save(request, GLOBAL_OPERATOR);
}
diff --git a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/consume/InlongConsumeServiceTest.java b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/consume/InlongConsumeServiceTest.java
index 44bfe7258..3742aaaac 100644
--- a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/consume/InlongConsumeServiceTest.java
+++ b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/consume/InlongConsumeServiceTest.java
@@ -156,8 +156,6 @@ public class InlongConsumeServiceTest extends ServiceBaseTest {
request.setType(ClusterType.PULSAR);
String adminUrl = "http://127.0.0.1:8080";
request.setAdminUrl(adminUrl);
- String tenant = "public";
- request.setTenant(tenant);
request.setInCharges(GLOBAL_OPERATOR);
clusterId = clusterService.save(request, GLOBAL_OPERATOR);
}