You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by he...@apache.org on 2022/03/12 15:22:27 UTC
[incubator-inlong] branch master updated: [INLONG-3089][Manager] Create group resource failed after approving one inlong group (#3092)
This is an automated email from the ASF dual-hosted git repository.
healchow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 264912c [INLONG-3089][Manager] Create group resource failed after approving one inlong group (#3092)
264912c is described below
commit 264912ca012b4903490c7b5e30ab6840a71e5d1e
Author: healchow <he...@gmail.com>
AuthorDate: Sat Mar 12 23:21:36 2022 +0800
[INLONG-3089][Manager] Create group resource failed after approving one inlong group (#3092)
---
.../common/pojo/dataproxy/PulsarClusterInfo.java | 4 +-
.../inlong/manager/client/api/PulsarBaseConf.java | 2 +
.../manager/client/api/TdmqPulsarBaseConf.java | 9 +++
.../pojo/consumption/ConsumptionMqExtBase.java | 7 ++-
.../dao/mapper/ThirdPartyClusterEntityMapper.java | 2 +-
.../mappers/ThirdPartyClusterEntityMapper.xml | 2 +-
.../manager/service/CommonOperateService.java | 67 +++++++++-------------
.../service/core/impl/ConsumptionServiceImpl.java | 44 +++++++-------
.../service/core/impl/InlongGroupServiceImpl.java | 34 +++++------
.../core/impl/ThirdPartyClusterServiceImpl.java | 15 ++---
.../mq/CreatePulsarGroupForStreamTaskListener.java | 2 +-
.../mq/CreatePulsarGroupTaskListener.java | 10 ++--
.../mq/CreatePulsarResourceTaskListener.java | 2 +-
.../mq/CreatePulsarTopicForStreamTaskListener.java | 2 +-
.../service/thirdparty/mq/PulsarEventSelector.java | 8 +--
.../thirdparty/sort/util/SourceInfoUtils.java | 9 +--
.../ConsumptionCompleteProcessListener.java | 10 ++--
.../stream/CreateStreamWorkflowDefinition.java | 12 ++--
18 files changed, 124 insertions(+), 117 deletions(-)
diff --git a/inlong-common/src/main/java/org/apache/inlong/common/pojo/dataproxy/PulsarClusterInfo.java b/inlong-common/src/main/java/org/apache/inlong/common/pojo/dataproxy/PulsarClusterInfo.java
index a0733e2..f03204e 100644
--- a/inlong-common/src/main/java/org/apache/inlong/common/pojo/dataproxy/PulsarClusterInfo.java
+++ b/inlong-common/src/main/java/org/apache/inlong/common/pojo/dataproxy/PulsarClusterInfo.java
@@ -25,13 +25,15 @@ import lombok.NoArgsConstructor;
import java.util.Map;
@Data
+@Builder
@AllArgsConstructor
@NoArgsConstructor
-@Builder
public class PulsarClusterInfo {
+
private String type;
private String adminUrl;
private String token;
private String brokerServiceUrl;
private Map<String, String> ext;
+
}
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/PulsarBaseConf.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/PulsarBaseConf.java
index 415d639..dc05abc 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/PulsarBaseConf.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/PulsarBaseConf.java
@@ -21,11 +21,13 @@ import io.swagger.annotations.ApiModel;
import io.swagger.annotations.ApiModelProperty;
import lombok.AllArgsConstructor;
import lombok.Data;
+import lombok.EqualsAndHashCode;
import lombok.NoArgsConstructor;
import org.apache.inlong.manager.client.api.auth.Authentication;
import org.apache.inlong.manager.common.enums.MqType;
@Data
+@EqualsAndHashCode(callSuper = true)
@AllArgsConstructor
@NoArgsConstructor
@ApiModel("Base configuration for Pulsar")
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/TdmqPulsarBaseConf.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/TdmqPulsarBaseConf.java
index 3074794..ae8d231 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/TdmqPulsarBaseConf.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/TdmqPulsarBaseConf.java
@@ -18,10 +18,19 @@
package org.apache.inlong.manager.client.api;
import io.swagger.annotations.ApiModelProperty;
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.NoArgsConstructor;
import org.apache.inlong.manager.common.enums.MqType;
+@Data
+@EqualsAndHashCode(callSuper = true)
+@AllArgsConstructor
+@NoArgsConstructor
public class TdmqPulsarBaseConf extends PulsarBaseConf {
@ApiModelProperty("Message queue type")
private MqType type = MqType.TDMQ_PULSAR;
+
}
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/consumption/ConsumptionMqExtBase.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/consumption/ConsumptionMqExtBase.java
index 6bff8a7..0f0784f 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/consumption/ConsumptionMqExtBase.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/consumption/ConsumptionMqExtBase.java
@@ -32,14 +32,15 @@ import org.apache.inlong.manager.common.enums.Constant;
@ApiModel("Extended consumption information of different MQs")
@JsonTypeInfo(use = Id.NAME, visible = true, property = "middlewareType", defaultImpl = ConsumptionMqExtBase.class)
@JsonSubTypes({
- @JsonSubTypes.Type(value = ConsumptionPulsarInfo.class, name = Constant.MIDDLEWARE_PULSAR)
+ @JsonSubTypes.Type(value = ConsumptionPulsarInfo.class, name = Constant.MIDDLEWARE_PULSAR),
+ @JsonSubTypes.Type(value = ConsumptionPulsarInfo.class, name = Constant.MIDDLEWARE_TDMQ_PULSAR)
})
public class ConsumptionMqExtBase {
- @ApiModelProperty(value = "Self-incrementing primary key")
+ @ApiModelProperty(value = "Primary key")
private Integer id;
- @ApiModelProperty(value = "Consumer information ID")
+ @ApiModelProperty(value = "Consumption ID")
private Integer consumptionId;
@ApiModelProperty(value = "Consumer group")
diff --git a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/ThirdPartyClusterEntityMapper.java b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/ThirdPartyClusterEntityMapper.java
index fe13d80..962782a 100644
--- a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/ThirdPartyClusterEntityMapper.java
+++ b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/ThirdPartyClusterEntityMapper.java
@@ -39,7 +39,7 @@ public interface ThirdPartyClusterEntityMapper {
List<ThirdPartyClusterEntity> selectByType(@Param("type") String type);
- List<ThirdPartyClusterEntity> selectMqCluster(@Param("mqSetName") String mqSetName,
+ List<ThirdPartyClusterEntity> selectMQCluster(@Param("mqSetName") String mqSetName,
@Param("typeList") List<String> typeList);
ThirdPartyClusterEntity selectByName(@Param("name") String name);
diff --git a/inlong-manager/manager-dao/src/main/resources/mappers/ThirdPartyClusterEntityMapper.xml b/inlong-manager/manager-dao/src/main/resources/mappers/ThirdPartyClusterEntityMapper.xml
index f67ae62..a74ecb8 100644
--- a/inlong-manager/manager-dao/src/main/resources/mappers/ThirdPartyClusterEntityMapper.xml
+++ b/inlong-manager/manager-dao/src/main/resources/mappers/ThirdPartyClusterEntityMapper.xml
@@ -225,7 +225,7 @@
where is_deleted = 0
and name = #{name, jdbcType=VARCHAR}
</select>
- <select id="selectMqCluster" resultType="org.apache.inlong.manager.dao.entity.ThirdPartyClusterEntity">
+ <select id="selectMQCluster" resultType="org.apache.inlong.manager.dao.entity.ThirdPartyClusterEntity">
select
<include refid="Base_Column_List"/>
from third_party_cluster
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/CommonOperateService.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/CommonOperateService.java
index b171f90..baec6e7 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/CommonOperateService.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/CommonOperateService.java
@@ -20,6 +20,7 @@ package org.apache.inlong.manager.service;
import com.google.gson.Gson;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.collections.MapUtils;
+import org.apache.commons.lang3.StringUtils;
import org.apache.inlong.common.pojo.dataproxy.PulsarClusterInfo;
import org.apache.inlong.manager.common.beans.ClusterBean;
import org.apache.inlong.manager.common.enums.Constant;
@@ -28,19 +29,15 @@ import org.apache.inlong.manager.common.enums.GroupState;
import org.apache.inlong.manager.common.enums.SinkType;
import org.apache.inlong.manager.common.exceptions.BusinessException;
import org.apache.inlong.manager.common.exceptions.WorkflowListenerException;
-import org.apache.inlong.manager.common.pojo.cluster.ClusterPageRequest;
import org.apache.inlong.manager.common.pojo.group.InlongGroupInfo;
-import org.apache.inlong.manager.common.pojo.group.InlongGroupPageRequest;
import org.apache.inlong.manager.common.pojo.sink.SinkResponse;
import org.apache.inlong.manager.common.pojo.sink.hive.HiveSinkResponse;
import org.apache.inlong.manager.common.pojo.source.SourceResponse;
import org.apache.inlong.manager.common.pojo.stream.InlongStreamInfo;
import org.apache.inlong.manager.common.util.JsonUtils;
import org.apache.inlong.manager.common.util.Preconditions;
-import org.apache.inlong.manager.dao.entity.DataProxyClusterEntity;
import org.apache.inlong.manager.dao.entity.InlongGroupEntity;
import org.apache.inlong.manager.dao.entity.ThirdPartyClusterEntity;
-import org.apache.inlong.manager.dao.mapper.DataProxyClusterEntityMapper;
import org.apache.inlong.manager.dao.mapper.InlongGroupEntityMapper;
import org.apache.inlong.manager.dao.mapper.ThirdPartyClusterEntityMapper;
import org.apache.inlong.manager.service.core.InlongStreamService;
@@ -61,6 +58,7 @@ import org.springframework.stereotype.Service;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -82,8 +80,6 @@ public class CommonOperateService {
@Autowired
private InlongGroupEntityMapper groupMapper;
@Autowired
- private DataProxyClusterEntityMapper dataProxyClusterMapper;
- @Autowired
private ThirdPartyClusterEntityMapper thirdPartyClusterMapper;
/**
@@ -100,14 +96,14 @@ public class CommonOperateService {
switch (key) {
case Constant.PULSAR_SERVICEURL: {
- clusterEntity = getThirdPartyCluster(Constant.MIDDLEWARE_PULSAR);
+ clusterEntity = getMQCluster(Constant.MIDDLEWARE_PULSAR);
if (clusterEntity != null) {
result = clusterEntity.getUrl();
}
break;
}
case Constant.PULSAR_ADMINURL: {
- clusterEntity = getThirdPartyCluster(Constant.MIDDLEWARE_PULSAR);
+ clusterEntity = getMQCluster(Constant.MIDDLEWARE_PULSAR);
if (clusterEntity != null) {
params = gson.fromJson(clusterEntity.getExtParams(), Map.class);
result = params.get(key);
@@ -117,7 +113,7 @@ public class CommonOperateService {
case Constant.CLUSTER_TUBE_MANAGER:
case Constant.CLUSTER_TUBE_CLUSTER_ID:
case Constant.TUBE_MASTER_URL: {
- clusterEntity = getThirdPartyCluster(Constant.MIDDLEWARE_TUBE);
+ clusterEntity = getMQCluster(Constant.MIDDLEWARE_TUBE);
if (clusterEntity != null) {
if (key.equals(Constant.TUBE_MASTER_URL)) {
result = clusterEntity.getUrl();
@@ -129,61 +125,50 @@ public class CommonOperateService {
break;
}
}
-
return result;
}
/**
* Get third party cluster by type.
*
- * TODO Add more condition for query.
+ * TODO Add data_proxy_cluster_name for query.
*
* @param type Cluster type, such as TUBE, PULSAR, etc.
*/
- private ThirdPartyClusterEntity getThirdPartyCluster(String type) {
- InlongGroupPageRequest groupPageRequest = new InlongGroupPageRequest();
- groupPageRequest.setMiddlewareType(type);
- List<InlongGroupEntity> groupEntities = groupMapper.selectByCondition(groupPageRequest);
- if (groupEntities.isEmpty()) {
- LOGGER.warn("no inlong group found by type={}", type);
+ private ThirdPartyClusterEntity getMQCluster(String type) {
+ List<ThirdPartyClusterEntity> clusterList = thirdPartyClusterMapper.selectByType(Constant.CLUSTER_DATA_PROXY);
+ if (CollectionUtils.isEmpty(clusterList)) {
+ LOGGER.warn("no data proxy cluster found");
return null;
}
-
- Integer clusterId = groupEntities.get(0).getProxyClusterId();
- DataProxyClusterEntity dataProxyCluster = dataProxyClusterMapper.selectByPrimaryKey(clusterId);
- if (dataProxyCluster == null) {
- LOGGER.warn("no data proxy cluster found with id={}", clusterId);
+ String mqSetName = clusterList.get(0).getMqSetName();
+ List<ThirdPartyClusterEntity> mqClusterList = thirdPartyClusterMapper.selectMQCluster(mqSetName,
+ Collections.singletonList(type));
+ if (CollectionUtils.isEmpty(mqClusterList)) {
+ LOGGER.warn("no mq cluster found by type={} and mq set name={}", type, mqSetName);
return null;
}
- String mqSetName = dataProxyCluster.getMqSetName();
- ClusterPageRequest clusterRequest = new ClusterPageRequest();
- clusterRequest.setMqSetName(mqSetName);
- List<ThirdPartyClusterEntity> thirdPartyClusters = thirdPartyClusterMapper.selectByCondition(clusterRequest);
- if (CollectionUtils.isEmpty(thirdPartyClusters)) {
- LOGGER.warn("no related third-party-cluster by type={} and mq set name={}", type, mqSetName);
- return null;
- }
-
- return thirdPartyClusters.get(0);
+ return mqClusterList.get(0);
}
/**
- * Get Pulsar cluster info.
+ * Get Pulsar cluster by the given type.
*
* @return Pulsar cluster info.
*/
- public PulsarClusterInfo getPulsarClusterInfo() {
- ThirdPartyClusterEntity thirdPartyClusterEntity = getThirdPartyCluster(Constant.MIDDLEWARE_PULSAR);
- Preconditions.checkNotNull(thirdPartyClusterEntity.getExtParams(), "pulsar extParam is empty, check"
- + "third party cluster table");
- Map<String, String> configParams = JsonUtils.parse(thirdPartyClusterEntity.getExtParams(), Map.class);
+ public PulsarClusterInfo getPulsarClusterInfo(String type) {
+ ThirdPartyClusterEntity clusterEntity = getMQCluster(type);
+ if (clusterEntity == null || StringUtils.isBlank(clusterEntity.getExtParams())) {
+ throw new BusinessException("pulsar cluster or pulsar ext params is empty");
+ }
+ Map<String, String> configParams = JsonUtils.parse(clusterEntity.getExtParams(), Map.class);
PulsarClusterInfo pulsarClusterInfo = PulsarClusterInfo.builder().brokerServiceUrl(
- thirdPartyClusterEntity.getUrl()).token(thirdPartyClusterEntity.getToken()).build();
+ clusterEntity.getUrl()).token(clusterEntity.getToken()).build();
String adminUrl = configParams.get(Constant.PULSAR_ADMINURL);
Preconditions.checkNotNull(adminUrl, "adminUrl is empty, check third party cluster table");
pulsarClusterInfo.setAdminUrl(adminUrl);
- pulsarClusterInfo.setType(thirdPartyClusterEntity.getType());
+ pulsarClusterInfo.setType(clusterEntity.getType());
return pulsarClusterInfo;
}
@@ -241,7 +226,7 @@ public class CommonOperateService {
// Get source info
String masterAddress = getSpecifiedParam(Constant.TUBE_MASTER_URL);
- PulsarClusterInfo pulsarCluster = getPulsarClusterInfo();
+ PulsarClusterInfo pulsarCluster = getPulsarClusterInfo(groupInfo.getMiddlewareType());
InlongStreamInfo streamInfo = streamService.get(groupId, streamId);
SourceInfo sourceInfo = SourceInfoUtils.createSourceInfo(pulsarCluster, masterAddress, clusterBean,
groupInfo, streamInfo, sourceResponse, sourceFields);
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/ConsumptionServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/ConsumptionServiceImpl.java
index 72cdf7d..fc8b1d8 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/ConsumptionServiceImpl.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/ConsumptionServiceImpl.java
@@ -20,15 +20,6 @@ package org.apache.inlong.manager.service.core.impl;
import com.github.pagehelper.Page;
import com.github.pagehelper.PageHelper;
import com.github.pagehelper.PageInfo;
-import java.util.Arrays;
-import java.util.Date;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Locale;
-import java.util.Map;
-import java.util.Objects;
-import java.util.Set;
-import java.util.stream.Collectors;
import lombok.extern.slf4j.Slf4j;
import org.apache.inlong.manager.common.beans.ClusterBean;
import org.apache.inlong.manager.common.enums.Constant;
@@ -66,6 +57,16 @@ import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.util.CollectionUtils;
+import java.util.Arrays;
+import java.util.Date;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.stream.Collectors;
+
/**
* Data consumption service
*/
@@ -119,7 +120,8 @@ public class ConsumptionServiceImpl implements ConsumptionService {
ConsumptionInfo info = CommonBeanUtils.copyProperties(entity, ConsumptionInfo::new);
- if (Constant.MIDDLEWARE_PULSAR.equalsIgnoreCase(info.getMiddlewareType())) {
+ String mqType = info.getMiddlewareType();
+ if (Constant.MIDDLEWARE_PULSAR.equals(mqType) || Constant.MIDDLEWARE_TDMQ_PULSAR.equals(mqType)) {
ConsumptionPulsarEntity pulsarEntity = consumptionPulsarMapper.selectByConsumptionId(info.getId());
Preconditions.checkNotNull(pulsarEntity, "Pulsar consumption cannot be empty, as the middleware is Pulsar");
ConsumptionPulsarInfo pulsarInfo = CommonBeanUtils.copyProperties(pulsarEntity, ConsumptionPulsarInfo::new);
@@ -147,11 +149,10 @@ public class ConsumptionServiceImpl implements ConsumptionService {
@Transactional(rollbackFor = Throwable.class)
public Integer save(ConsumptionInfo info, String operator) {
fullConsumptionInfo(info);
-
Date now = new Date();
ConsumptionEntity entity = this.saveConsumption(info, operator, now);
-
- if (Constant.MIDDLEWARE_PULSAR.equals(entity.getMiddlewareType())) {
+ String mqType = entity.getMiddlewareType();
+ if (Constant.MIDDLEWARE_PULSAR.equals(mqType) || Constant.MIDDLEWARE_TDMQ_PULSAR.equals(mqType)) {
savePulsarInfo(info.getMqExtInfo(), entity);
}
@@ -232,7 +233,8 @@ public class ConsumptionServiceImpl implements ConsumptionService {
entity.setModifyTime(now);
// Modify Pulsar consumption info
- if (Constant.MIDDLEWARE_PULSAR.equalsIgnoreCase(info.getMiddlewareType())) {
+ String mqType = info.getMiddlewareType();
+ if (Constant.MIDDLEWARE_PULSAR.equals(mqType) || Constant.MIDDLEWARE_TDMQ_PULSAR.equals(mqType)) {
ConsumptionPulsarEntity pulsarEntity = consumptionPulsarMapper.selectByConsumptionId(consumptionId);
Preconditions.checkNotNull(pulsarEntity, "Pulsar consumption cannot be null");
pulsarEntity.setConsumerGroupId(info.getConsumerGroupId());
@@ -338,10 +340,10 @@ public class ConsumptionServiceImpl implements ConsumptionService {
}
log.debug("begin to save consumption, groupId={}, topic={}, consumer group={}", groupId, topic, consumerGroup);
- String middlewareType = bizInfo.getMiddlewareType();
+ String mqType = bizInfo.getMiddlewareType();
ConsumptionEntity entity = new ConsumptionEntity();
entity.setInlongGroupId(groupId);
- entity.setMiddlewareType(middlewareType);
+ entity.setMiddlewareType(mqType);
entity.setTopic(topic);
entity.setConsumerGroupId(consumerGroup);
entity.setConsumerGroupName(consumerGroup);
@@ -355,7 +357,7 @@ public class ConsumptionServiceImpl implements ConsumptionService {
consumptionMapper.insert(entity);
- if (Constant.MIDDLEWARE_PULSAR.equals(middlewareType)) {
+ if (Constant.MIDDLEWARE_PULSAR.equals(mqType) || Constant.MIDDLEWARE_TDMQ_PULSAR.equals(mqType)) {
ConsumptionPulsarEntity pulsarEntity = new ConsumptionPulsarEntity();
pulsarEntity.setConsumptionId(entity.getId());
pulsarEntity.setConsumerGroupId(consumerGroup);
@@ -370,7 +372,8 @@ public class ConsumptionServiceImpl implements ConsumptionService {
private NewConsumptionProcessForm genNewConsumptionProcessForm(ConsumptionInfo consumptionInfo) {
NewConsumptionProcessForm form = new NewConsumptionProcessForm();
Integer id = consumptionInfo.getId();
- if (Constant.MIDDLEWARE_PULSAR.equalsIgnoreCase(consumptionInfo.getMiddlewareType())) {
+ String mqType = consumptionInfo.getMiddlewareType();
+ if (Constant.MIDDLEWARE_PULSAR.equals(mqType) || Constant.MIDDLEWARE_TDMQ_PULSAR.equals(mqType)) {
ConsumptionPulsarEntity consumptionPulsarEntity = consumptionPulsarMapper.selectByConsumptionId(id);
ConsumptionPulsarInfo pulsarInfo = CommonBeanUtils.copyProperties(consumptionPulsarEntity,
ConsumptionPulsarInfo::new);
@@ -426,11 +429,12 @@ public class ConsumptionServiceImpl implements ConsumptionService {
Preconditions.checkNotNull(topicVO, "inlong group not exist: " + groupId);
// Tube’s topic is the inlong group level, one inlong group, one Tube topic
- if (Constant.MIDDLEWARE_TUBE.equalsIgnoreCase(topicVO.getMiddlewareType())) {
+ String mqType = topicVO.getMiddlewareType();
+ if (Constant.MIDDLEWARE_TUBE.equals(mqType)) {
String bizTopic = topicVO.getMqResourceObj();
Preconditions.checkTrue(bizTopic == null || bizTopic.equals(info.getTopic()),
"topic [" + info.getTopic() + "] not belong to inlong group " + groupId);
- } else if (Constant.MIDDLEWARE_PULSAR.equalsIgnoreCase(topicVO.getMiddlewareType())) {
+ } else if (Constant.MIDDLEWARE_PULSAR.equals(mqType) || Constant.MIDDLEWARE_TDMQ_PULSAR.equals(mqType)) {
// Pulsar's topic is the inlong stream level.
// There will be multiple inlong streams under one inlong group, and there will be multiple topics
List<InlongStreamTopicResponse> dsTopicList = topicVO.getDsTopicList();
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/InlongGroupServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/InlongGroupServiceImpl.java
index d55a3c4..cff6760 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/InlongGroupServiceImpl.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/InlongGroupServiceImpl.java
@@ -117,7 +117,8 @@ public class InlongGroupServiceImpl implements InlongGroupService {
groupMapper.insertSelective(entity);
this.saveOrUpdateExt(groupId, groupInfo.getExtList());
- if (Constant.MIDDLEWARE_PULSAR.equals(groupInfo.getMiddlewareType())) {
+ String mqType = groupInfo.getMiddlewareType();
+ if (Constant.MIDDLEWARE_PULSAR.equals(mqType) || Constant.MIDDLEWARE_TDMQ_PULSAR.equals(mqType)) {
InlongGroupPulsarInfo pulsarInfo = (InlongGroupPulsarInfo) groupInfo.getMqExtInfo();
Preconditions.checkNotNull(pulsarInfo, "Pulsar info cannot be empty, as the middleware is Pulsar");
@@ -172,23 +173,23 @@ public class InlongGroupServiceImpl implements InlongGroupService {
groupInfo.setExtList(extInfoList);
// If the middleware is Pulsar, we need to encapsulate Pulsar related data
- String middlewareType = entity.getMiddlewareType();
- if (Constant.MIDDLEWARE_PULSAR.equalsIgnoreCase(middlewareType)) {
+ String mqType = entity.getMiddlewareType();
+ if (Constant.MIDDLEWARE_PULSAR.equals(mqType) || Constant.MIDDLEWARE_TDMQ_PULSAR.equals(mqType)) {
InlongGroupPulsarEntity pulsarEntity = groupPulsarMapper.selectByGroupId(groupId);
- Preconditions.checkNotNull(pulsarEntity, "Pulsar info not found under the inlong group");
+ Preconditions.checkNotNull(pulsarEntity, "Pulsar info not found by the groupId=" + groupId);
InlongGroupPulsarInfo pulsarInfo = CommonBeanUtils.copyProperties(pulsarEntity, InlongGroupPulsarInfo::new);
- pulsarInfo.setMiddlewareType(Constant.MIDDLEWARE_PULSAR);
+ pulsarInfo.setMiddlewareType(mqType);
groupInfo.setMqExtInfo(pulsarInfo);
}
// For approved inlong group, encapsulate the cluster address of the middleware
if (GroupState.CONFIG_SUCCESSFUL == GroupState.forCode(groupInfo.getStatus())) {
- if (Constant.MIDDLEWARE_TUBE.equalsIgnoreCase(middlewareType)) {
+ if (Constant.MIDDLEWARE_TUBE.equalsIgnoreCase(mqType)) {
groupInfo.setTubeMaster(commonOperateService.getSpecifiedParam(Constant.TUBE_MASTER_URL));
- } else if (Constant.MIDDLEWARE_PULSAR.equalsIgnoreCase(middlewareType)) {
- PulsarClusterInfo pulsarClusterInfo = commonOperateService.getPulsarClusterInfo();
- groupInfo.setPulsarAdminUrl(pulsarClusterInfo.getAdminUrl());
- groupInfo.setPulsarServiceUrl(pulsarClusterInfo.getBrokerServiceUrl());
+ } else if (Constant.MIDDLEWARE_PULSAR.equals(mqType) || Constant.MIDDLEWARE_TDMQ_PULSAR.equals(mqType)) {
+ PulsarClusterInfo pulsarCluster = commonOperateService.getPulsarClusterInfo(mqType);
+ groupInfo.setPulsarAdminUrl(pulsarCluster.getAdminUrl());
+ groupInfo.setPulsarServiceUrl(pulsarCluster.getBrokerServiceUrl());
}
}
@@ -240,7 +241,8 @@ public class InlongGroupServiceImpl implements InlongGroupService {
this.saveOrUpdateExt(groupId, groupRequest.getExtList());
// Update the Pulsar info
- if (Constant.MIDDLEWARE_PULSAR.equals(groupRequest.getMiddlewareType())) {
+ String mqType = groupRequest.getMiddlewareType();
+ if (Constant.MIDDLEWARE_PULSAR.equals(mqType) || Constant.MIDDLEWARE_TDMQ_PULSAR.equals(mqType)) {
InlongGroupPulsarInfo pulsarInfo = (InlongGroupPulsarInfo) groupRequest.getMqExtInfo();
Preconditions.checkNotNull(pulsarInfo, "Pulsar info cannot be empty, as the middleware is Pulsar");
Integer writeQuorum = pulsarInfo.getWriteQuorum();
@@ -401,25 +403,25 @@ public class InlongGroupServiceImpl implements InlongGroupService {
LOGGER.debug("begin to get topic by groupId={}", groupId);
InlongGroupInfo groupInfo = this.get(groupId);
- String middlewareType = groupInfo.getMiddlewareType();
+ String mqType = groupInfo.getMiddlewareType();
InlongGroupTopicResponse topicVO = new InlongGroupTopicResponse();
- if (Constant.MIDDLEWARE_TUBE.equalsIgnoreCase(middlewareType)) {
+ if (Constant.MIDDLEWARE_TUBE.equals(mqType)) {
// Tube Topic corresponds to inlong group one-to-one
topicVO.setMqResourceObj(groupInfo.getMqResourceObj());
topicVO.setTubeMasterUrl(commonOperateService.getSpecifiedParam(Constant.TUBE_MASTER_URL));
- } else if (Constant.MIDDLEWARE_PULSAR.equalsIgnoreCase(middlewareType)) {
+ } else if (Constant.MIDDLEWARE_PULSAR.equals(mqType) || Constant.MIDDLEWARE_TDMQ_PULSAR.equals(mqType)) {
// Pulsar's topic corresponds to the inlong stream one-to-one
topicVO.setDsTopicList(streamService.getTopicList(groupId));
topicVO.setPulsarAdminUrl(commonOperateService.getSpecifiedParam(Constant.PULSAR_ADMINURL));
topicVO.setPulsarServiceUrl(commonOperateService.getSpecifiedParam(Constant.PULSAR_SERVICEURL));
} else {
- LOGGER.error("middleware type={} not supported", middlewareType);
+ LOGGER.error("middleware type={} not supported", mqType);
throw new BusinessException(ErrorCodeEnum.MIDDLEWARE_TYPE_NOT_SUPPORTED);
}
topicVO.setInlongGroupId(groupId);
- topicVO.setMiddlewareType(middlewareType);
+ topicVO.setMiddlewareType(mqType);
return topicVO;
}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/ThirdPartyClusterServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/ThirdPartyClusterServiceImpl.java
index b7e5cb7..2247b68 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/ThirdPartyClusterServiceImpl.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/ThirdPartyClusterServiceImpl.java
@@ -211,10 +211,11 @@ public class ThirdPartyClusterServiceImpl implements ThirdPartyClusterService {
DataProxyConfig config = new DataProxyConfig();
config.setM(groupEntity.getSchemaName());
- if (Constant.MIDDLEWARE_TUBE.equals(groupEntity.getMiddlewareType())) {
+ String mqType = groupEntity.getMiddlewareType();
+ if (Constant.MIDDLEWARE_TUBE.equals(mqType)) {
config.setInlongGroupId(groupId);
config.setTopic(bizResource);
- } else if (Constant.MIDDLEWARE_PULSAR.equals(groupEntity.getMiddlewareType())) {
+ } else if (Constant.MIDDLEWARE_PULSAR.equals(mqType) || Constant.MIDDLEWARE_TDMQ_PULSAR.equals(mqType)) {
List<InlongStreamEntity> streamList = streamMapper.selectByGroupId(groupId);
for (InlongStreamEntity stream : streamList) {
String topic = stream.getMqResourceObj();
@@ -248,9 +249,9 @@ public class ThirdPartyClusterServiceImpl implements ThirdPartyClusterService {
}
// third-party-cluster type
- String middlewareType = "";
+ String mqType = "";
if (!groupEntityList.isEmpty()) {
- middlewareType = groupEntityList.get(0).getMiddlewareType();
+ mqType = groupEntityList.get(0).getMiddlewareType();
}
// Get topic list by group id
@@ -258,7 +259,7 @@ public class ThirdPartyClusterServiceImpl implements ThirdPartyClusterService {
for (InlongGroupEntity groupEntity : groupEntityList) {
final String groupId = groupEntity.getInlongGroupId();
final String mqResource = groupEntity.getMqResourceObj();
- if (Constant.MIDDLEWARE_PULSAR.equals(middlewareType)) {
+ if (Constant.MIDDLEWARE_PULSAR.equals(mqType) || Constant.MIDDLEWARE_TDMQ_PULSAR.equals(mqType)) {
List<InlongStreamEntity> streamList = streamMapper.selectByGroupId(groupId);
for (InlongStreamEntity stream : streamList) {
DataProxyConfig topicConfig = new DataProxyConfig();
@@ -273,7 +274,7 @@ public class ThirdPartyClusterServiceImpl implements ThirdPartyClusterService {
topicConfig.setTopic("persistent://" + tenant + "/" + mqResource + "/" + topic);
topicList.add(topicConfig);
}
- } else if (Constant.MIDDLEWARE_TUBE.equals(middlewareType)) {
+ } else if (Constant.MIDDLEWARE_TUBE.equals(mqType)) {
DataProxyConfig topicConfig = new DataProxyConfig();
topicConfig.setInlongGroupId(groupId);
topicConfig.setTopic(mqResource);
@@ -285,7 +286,7 @@ public class ThirdPartyClusterServiceImpl implements ThirdPartyClusterService {
List<ThirdPartyClusterInfo> mqSet = new ArrayList<>();
List<String> clusterType = Arrays.asList(Constant.CLUSTER_TUBE, Constant.CLUSTER_PULSAR,
Constant.CLUSTER_TDMQ_PULSAR);
- List<ThirdPartyClusterEntity> clusterList = thirdPartyClusterMapper.selectMqCluster(
+ List<ThirdPartyClusterEntity> clusterList = thirdPartyClusterMapper.selectMQCluster(
clusterEntity.getMqSetName(), clusterType);
for (ThirdPartyClusterEntity cluster : clusterList) {
ThirdPartyClusterInfo clusterInfo = new ThirdPartyClusterInfo();
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/mq/CreatePulsarGroupForStreamTaskListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/mq/CreatePulsarGroupForStreamTaskListener.java
index a814379..d4ec8fb 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/mq/CreatePulsarGroupForStreamTaskListener.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/mq/CreatePulsarGroupForStreamTaskListener.java
@@ -86,7 +86,7 @@ public class CreatePulsarGroupForStreamTaskListener implements QueueOperateListe
log.warn("inlong stream is empty for group={}, stream={}, skip to create pulsar group", groupId, streamId);
return ListenerResult.success();
}
- PulsarClusterInfo globalCluster = commonOperateService.getPulsarClusterInfo();
+ PulsarClusterInfo globalCluster = commonOperateService.getPulsarClusterInfo(groupInfo.getMiddlewareType());
try (PulsarAdmin globalPulsarAdmin = PulsarUtils.getPulsarAdmin(globalCluster)) {
// Query data sink info based on groupId and streamId
List<String> sinkTypeList = sinkService.getSinkTypeList(groupId, streamId);
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/mq/CreatePulsarGroupTaskListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/mq/CreatePulsarGroupTaskListener.java
index 304dc78..c22342d 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/mq/CreatePulsarGroupTaskListener.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/mq/CreatePulsarGroupTaskListener.java
@@ -70,8 +70,8 @@ public class CreatePulsarGroupTaskListener implements QueueOperateListener {
GroupResourceProcessForm form = (GroupResourceProcessForm) context.getProcessForm();
String groupId = form.getInlongGroupId();
- InlongGroupInfo bizInfo = groupService.get(groupId);
- if (bizInfo == null) {
+ InlongGroupInfo groupInfo = groupService.get(groupId);
+ if (groupInfo == null) {
log.error("inlong group not found with groupId={}", groupId);
throw new WorkflowListenerException("inlong group not found with groupId=" + groupId);
}
@@ -82,10 +82,10 @@ public class CreatePulsarGroupTaskListener implements QueueOperateListener {
log.warn("inlong stream is empty for groupId={}, skip to create pulsar subscription", groupId);
return ListenerResult.success();
}
- PulsarClusterInfo globalCluster = commonOperateService.getPulsarClusterInfo();
+ PulsarClusterInfo globalCluster = commonOperateService.getPulsarClusterInfo(groupInfo.getMiddlewareType());
try (PulsarAdmin globalPulsarAdmin = PulsarUtils.getPulsarAdmin(globalCluster)) {
String tenant = clusterBean.getDefaultTenant();
- String namespace = bizInfo.getMqResourceObj();
+ String namespace = groupInfo.getMqResourceObj();
for (InlongStreamEntity streamEntity : streamEntities) {
PulsarTopicBean topicBean = new PulsarTopicBean();
@@ -114,7 +114,7 @@ public class CreatePulsarGroupTaskListener implements QueueOperateListener {
pulsarOptService.createSubscription(pulsarAdmin, topicBean, subscription);
// Insert the consumption data into the consumption table
- consumptionService.saveSortConsumption(bizInfo, topic, subscription);
+ consumptionService.saveSortConsumption(groupInfo, topic, subscription);
}
}
}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/mq/CreatePulsarResourceTaskListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/mq/CreatePulsarResourceTaskListener.java
index 9cb3d88..b0b8827 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/mq/CreatePulsarResourceTaskListener.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/mq/CreatePulsarResourceTaskListener.java
@@ -77,7 +77,7 @@ public class CreatePulsarResourceTaskListener implements QueueOperateListener {
if (groupInfo == null) {
throw new WorkflowListenerException("inlong group or pulsar cluster not found for groupId=" + groupId);
}
- PulsarClusterInfo globalCluster = commonOperateService.getPulsarClusterInfo();
+ PulsarClusterInfo globalCluster = commonOperateService.getPulsarClusterInfo(groupInfo.getMiddlewareType());
try (PulsarAdmin globalPulsarAdmin = PulsarUtils.getPulsarAdmin(globalCluster)) {
List<String> pulsarClusters = PulsarUtils.getPulsarClusters(globalPulsarAdmin);
for (String cluster : pulsarClusters) {
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/mq/CreatePulsarTopicForStreamTaskListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/mq/CreatePulsarTopicForStreamTaskListener.java
index 3864ac2..2811b5f 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/mq/CreatePulsarTopicForStreamTaskListener.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/mq/CreatePulsarTopicForStreamTaskListener.java
@@ -76,7 +76,7 @@ public class CreatePulsarTopicForStreamTaskListener implements QueueOperateListe
}
log.info("begin to create pulsar topic for groupId={}, streamId={}", groupId, streamId);
- PulsarClusterInfo globalCluster = commonOperateService.getPulsarClusterInfo();
+ PulsarClusterInfo globalCluster = commonOperateService.getPulsarClusterInfo(groupInfo.getMiddlewareType());
try (PulsarAdmin globalPulsarAdmin = PulsarUtils.getPulsarAdmin(globalCluster)) {
List<String> pulsarClusters = PulsarUtils.getPulsarClusters(globalPulsarAdmin);
for (String cluster : pulsarClusters) {
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/mq/PulsarEventSelector.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/mq/PulsarEventSelector.java
index 5da60b8..0bf990c 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/mq/PulsarEventSelector.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/mq/PulsarEventSelector.java
@@ -21,8 +21,8 @@ import lombok.extern.slf4j.Slf4j;
import org.apache.inlong.manager.common.enums.Constant;
import org.apache.inlong.manager.common.pojo.group.InlongGroupPulsarInfo;
import org.apache.inlong.manager.common.pojo.workflow.form.GroupResourceProcessForm;
-import org.apache.inlong.manager.workflow.WorkflowContext;
import org.apache.inlong.manager.common.pojo.workflow.form.ProcessForm;
+import org.apache.inlong.manager.workflow.WorkflowContext;
import org.apache.inlong.manager.workflow.event.EventSelector;
@Slf4j
@@ -35,13 +35,13 @@ public class PulsarEventSelector implements EventSelector {
return false;
}
GroupResourceProcessForm form = (GroupResourceProcessForm) processForm;
- String middlewareType = form.getGroupInfo().getMiddlewareType();
- if (Constant.MIDDLEWARE_PULSAR.equalsIgnoreCase(middlewareType)) {
+ String mqType = form.getGroupInfo().getMiddlewareType();
+ if (Constant.MIDDLEWARE_PULSAR.equals(mqType) || Constant.MIDDLEWARE_TDMQ_PULSAR.equals(mqType)) {
InlongGroupPulsarInfo pulsarInfo = (InlongGroupPulsarInfo) form.getGroupInfo().getMqExtInfo();
return pulsarInfo.getEnableCreateResource() == 1;
}
log.warn("no need to create pulsar subscription group for groupId={}, as the middlewareType={}",
- form.getInlongGroupId(), middlewareType);
+ form.getInlongGroupId(), mqType);
return false;
}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/sort/util/SourceInfoUtils.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/sort/util/SourceInfoUtils.java
index 2df1936..13a74c8 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/sort/util/SourceInfoUtils.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/sort/util/SourceInfoUtils.java
@@ -64,17 +64,17 @@ public class SourceInfoUtils {
ClusterBean clusterBean, InlongGroupInfo groupInfo, InlongStreamInfo streamInfo,
SourceResponse sourceResponse, List<FieldInfo> sourceFields) {
- String middleWareType = groupInfo.getMiddlewareType();
+ String mqType = groupInfo.getMiddlewareType();
DeserializationInfo deserializationInfo = SerializationUtils.createDeserialInfo(sourceResponse, streamInfo);
SourceInfo sourceInfo;
- if (Constant.MIDDLEWARE_PULSAR.equals(middleWareType)) {
+ if (Constant.MIDDLEWARE_PULSAR.equals(mqType) || Constant.MIDDLEWARE_TDMQ_PULSAR.equals(mqType)) {
sourceInfo = createPulsarSourceInfo(pulsarCluster, clusterBean, groupInfo, streamInfo, deserializationInfo,
sourceFields);
- } else if (Constant.MIDDLEWARE_TUBE.equals(middleWareType)) {
+ } else if (Constant.MIDDLEWARE_TUBE.equals(mqType)) {
// InlongGroupInfo groupInfo, String masterAddress,
sourceInfo = createTubeSourceInfo(groupInfo, masterAddress, clusterBean, deserializationInfo, sourceFields);
} else {
- throw new WorkflowListenerException(String.format("Unsupported middleware {%s}", middleWareType));
+ throw new WorkflowListenerException(String.format("Unsupported middleware {%s}", mqType));
}
return sourceInfo;
@@ -98,6 +98,7 @@ public class SourceInfoUtils {
final String fullTopicName = "persistent://" + tenant + "/" + namespace + "/" + topicName;
final String consumerGroup = clusterBean.getAppName() + "_" + topicName + "_consumer_group";
FieldInfo[] fieldInfosArr = fieldInfos.toArray(new FieldInfo[0]);
+
String type = pulsarCluster.getType();
if (StringUtils.isNotEmpty(type) && Constant.MIDDLEWARE_TDMQ_PULSAR.equals(type)) {
return new TDMQPulsarSourceInfo(pulsarCluster.getBrokerServiceUrl(),
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/consumption/listener/ConsumptionCompleteProcessListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/consumption/listener/ConsumptionCompleteProcessListener.java
index 1e0d757..7f0b49d 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/consumption/listener/ConsumptionCompleteProcessListener.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/consumption/listener/ConsumptionCompleteProcessListener.java
@@ -84,14 +84,14 @@ public class ConsumptionCompleteProcessListener implements ProcessEventListener
throw new WorkflowListenerException("consumption not exits for id=" + consumptionId);
}
- String middlewareType = entity.getMiddlewareType();
- if (Constant.MIDDLEWARE_TUBE.equalsIgnoreCase(middlewareType)) {
+ String mqType = entity.getMiddlewareType();
+ if (Constant.MIDDLEWARE_TUBE.equals(mqType)) {
this.createTubeConsumerGroup(entity);
return ListenerResult.success("Create Tube consumer group successful");
- } else if (Constant.MIDDLEWARE_PULSAR.equalsIgnoreCase(middlewareType)) {
+ } else if (Constant.MIDDLEWARE_PULSAR.equals(mqType) || Constant.MIDDLEWARE_TDMQ_PULSAR.equals(mqType)) {
this.createPulsarTopicMessage(entity);
} else {
- throw new WorkflowListenerException("middleware type [" + middlewareType + "] not supported");
+ throw new WorkflowListenerException("middleware type [" + mqType + "] not supported");
}
this.updateConsumerInfo(consumptionId, entity.getConsumerGroupId());
@@ -119,7 +119,7 @@ public class ConsumptionCompleteProcessListener implements ProcessEventListener
Preconditions.checkNotNull(groupInfo, "inlong group not found for groupId=" + groupId);
String mqResourceObj = groupInfo.getMqResourceObj();
Preconditions.checkNotNull(mqResourceObj, "mq resource cannot empty for groupId=" + groupId);
- PulsarClusterInfo globalCluster = commonOperateService.getPulsarClusterInfo();
+ PulsarClusterInfo globalCluster = commonOperateService.getPulsarClusterInfo(entity.getMiddlewareType());
try (PulsarAdmin pulsarAdmin = PulsarUtils.getPulsarAdmin(globalCluster)) {
PulsarTopicBean topicMessage = new PulsarTopicBean();
String tenant = clusterBean.getDefaultTenant();
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/stream/CreateStreamWorkflowDefinition.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/stream/CreateStreamWorkflowDefinition.java
index c10c80d..8e879ec 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/stream/CreateStreamWorkflowDefinition.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/stream/CreateStreamWorkflowDefinition.java
@@ -89,12 +89,12 @@ public class CreateStreamWorkflowDefinition implements WorkflowDefinition {
ServiceTask createPulsarTopicTask = new ServiceTask();
createPulsarTopicTask.setSkipResolver(c -> {
GroupResourceProcessForm form = (GroupResourceProcessForm) c.getProcessForm();
- String middlewareType = form.getGroupInfo().getMiddlewareType();
- if (Constant.MIDDLEWARE_PULSAR.equalsIgnoreCase(middlewareType)) {
+ String mqType = form.getGroupInfo().getMiddlewareType();
+ if (Constant.MIDDLEWARE_PULSAR.equals(mqType) || Constant.MIDDLEWARE_TDMQ_PULSAR.equals(mqType)) {
return false;
}
log.warn("no need to create pulsar topic for groupId={}, streamId={}, as the middlewareType={}",
- form.getInlongGroupId(), form.getInlongStreamId(), middlewareType);
+ form.getInlongGroupId(), form.getInlongStreamId(), mqType);
return true;
});
createPulsarTopicTask.setName("createPulsarTopic");
@@ -105,12 +105,12 @@ public class CreateStreamWorkflowDefinition implements WorkflowDefinition {
ServiceTask createPulsarSubscriptionGroupTask = new ServiceTask();
createPulsarSubscriptionGroupTask.setSkipResolver(c -> {
GroupResourceProcessForm form = (GroupResourceProcessForm) c.getProcessForm();
- String middlewareType = form.getGroupInfo().getMiddlewareType();
- if (Constant.MIDDLEWARE_PULSAR.equalsIgnoreCase(middlewareType)) {
+ String mqType = form.getGroupInfo().getMiddlewareType();
+ if (Constant.MIDDLEWARE_PULSAR.equals(mqType) || Constant.MIDDLEWARE_TDMQ_PULSAR.equals(mqType)) {
return false;
}
log.warn("no need to create pulsar subscription for groupId={}, streamId={}, as the middlewareType={}",
- form.getInlongGroupId(), form.getInlongStreamId(), middlewareType);
+ form.getInlongGroupId(), form.getInlongStreamId(), mqType);
return true;
});
createPulsarSubscriptionGroupTask.setName("createPulsarSubscription");