You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by he...@apache.org on 2022/09/06 03:42:19 UTC
[inlong] branch master updated: [INLONG-5781][Manager] Support creating consumer groups for all sinks (#5782)
This is an automated email from the ASF dual-hosted git repository.
healchow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 9844199f6 [INLONG-5781][Manager] Support creating consumer groups for all sinks (#5782)
9844199f6 is described below
commit 9844199f601582438daa470751a2beb37bd7f512
Author: healchow <he...@gmail.com>
AuthorDate: Tue Sep 6 11:42:13 2022 +0800
[INLONG-5781][Manager] Support creating consumer groups for all sinks (#5782)
---
.../dao/mapper/InlongConsumeEntityMapper.java | 4 +
.../mappers/InlongConsumeEntityMapper.xml | 10 ++
.../{PulsarTopicBean.java => PulsarTopicInfo.java} | 2 +-
.../service/consume/InlongConsumeService.java | 8 ++
.../service/consume/InlongConsumeServiceImpl.java | 32 ++++++
.../manager/service/core/ConsumptionService.java | 6 --
.../service/core/impl/ConsumptionServiceImpl.java | 40 --------
.../ConsumptionCompleteProcessListener.java | 12 +--
.../resource/queue/kafka/KafkaOperator.java | 2 -
.../queue/kafka/KafkaResourceOperators.java | 37 ++++---
.../resource/queue/pulsar/PulsarOperator.java | 110 ++++++++++-----------
.../queue/pulsar/PulsarResourceOperator.java | 103 ++++++++++++-------
.../queue/tubemq/TubeMQResourceOperator.java | 13 +--
13 files changed, 212 insertions(+), 167 deletions(-)
diff --git a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/InlongConsumeEntityMapper.java b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/InlongConsumeEntityMapper.java
index 3b0205048..feaf28a50 100644
--- a/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/InlongConsumeEntityMapper.java
+++ b/inlong-manager/manager-dao/src/main/java/org/apache/inlong/manager/dao/mapper/InlongConsumeEntityMapper.java
@@ -36,6 +36,10 @@ public interface InlongConsumeEntityMapper {
List<InlongConsumeEntity> selectByCondition(InlongConsumePageRequest request);
+ InlongConsumeEntity selectExists(@Param("groupId") String groupId,
+ @Param("topic") String topic,
+ @Param("consumerGroup") String consumerGroup);
+
int updateById(InlongConsumeEntity record);
int updateByIdSelective(InlongConsumeEntity record);
diff --git a/inlong-manager/manager-dao/src/main/resources/mappers/InlongConsumeEntityMapper.xml b/inlong-manager/manager-dao/src/main/resources/mappers/InlongConsumeEntityMapper.xml
index 1db846b23..16e1d9b52 100644
--- a/inlong-manager/manager-dao/src/main/resources/mappers/InlongConsumeEntityMapper.xml
+++ b/inlong-manager/manager-dao/src/main/resources/mappers/InlongConsumeEntityMapper.xml
@@ -118,6 +118,16 @@
</otherwise>
</choose>
</select>
+ <select id="selectExists" resultType="org.apache.inlong.manager.dao.entity.InlongConsumeEntity">
+ select
+ <include refid="Base_Column_List"/>
+ from inlong_consume
+ where is_deleted = 0
+ and inlong_group_id = #{inlongGroupId, jdbcType=VARCHAR}
+ and topic = #{topic, jdbcType=VARCHAR}
+ and consumer_group = #{consumerGroup, jdbcType=VARCHAR}
+ limit 1
+ </select>
<update id="updateById" parameterType="org.apache.inlong.manager.dao.entity.InlongConsumeEntity">
update inlong_consume
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/queue/pulsar/PulsarTopicBean.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/queue/pulsar/PulsarTopicInfo.java
similarity index 97%
rename from inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/queue/pulsar/PulsarTopicBean.java
rename to inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/queue/pulsar/PulsarTopicInfo.java
index 91dc7937d..29a8f4efb 100644
--- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/queue/pulsar/PulsarTopicBean.java
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/queue/pulsar/PulsarTopicInfo.java
@@ -29,7 +29,7 @@ import lombok.NoArgsConstructor;
@Builder
@NoArgsConstructor
@AllArgsConstructor
-public class PulsarTopicBean {
+public class PulsarTopicInfo {
private String tenant;
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/consume/InlongConsumeService.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/consume/InlongConsumeService.java
index ce421e337..c31fab822 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/consume/InlongConsumeService.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/consume/InlongConsumeService.java
@@ -23,6 +23,7 @@ import org.apache.inlong.manager.pojo.consume.InlongConsumeCountInfo;
import org.apache.inlong.manager.pojo.consume.InlongConsumeInfo;
import org.apache.inlong.manager.pojo.consume.InlongConsumePageRequest;
import org.apache.inlong.manager.pojo.consume.InlongConsumeRequest;
+import org.apache.inlong.manager.pojo.group.InlongGroupInfo;
import javax.validation.Valid;
import javax.validation.constraints.NotNull;
@@ -41,6 +42,13 @@ public interface InlongConsumeService {
*/
Integer save(InlongConsumeRequest request, String operator);
+ /**
+ * Save the consumer group by InLong system, and not start the workflow process.
+ *
+ * @return inlong consume id after saving
+ */
+ Integer saveBySystem(InlongGroupInfo groupInfo, String topic, String consumerGroup);
+
/**
* Get inlong consume info based on ID
*
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/consume/InlongConsumeServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/consume/InlongConsumeServiceImpl.java
index c7977b500..f0c5cec8f 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/consume/InlongConsumeServiceImpl.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/consume/InlongConsumeServiceImpl.java
@@ -36,6 +36,7 @@ import org.apache.inlong.manager.pojo.consume.InlongConsumeCountInfo;
import org.apache.inlong.manager.pojo.consume.InlongConsumeInfo;
import org.apache.inlong.manager.pojo.consume.InlongConsumePageRequest;
import org.apache.inlong.manager.pojo.consume.InlongConsumeRequest;
+import org.apache.inlong.manager.pojo.group.InlongGroupInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
@@ -58,6 +59,7 @@ import static org.apache.inlong.manager.pojo.common.PageRequest.MAX_PAGE_SIZE;
public class InlongConsumeServiceImpl implements InlongConsumeService {
private static final Logger LOGGER = LoggerFactory.getLogger(InlongConsumeServiceImpl.class);
+ private static final String AUTO_CREATE_MSG = "auto create by inlong";
@Autowired
private InlongConsumeEntityMapper consumeMapper;
@@ -82,6 +84,36 @@ public class InlongConsumeServiceImpl implements InlongConsumeService {
return request.getId();
}
+ @Override
+ public Integer saveBySystem(InlongGroupInfo groupInfo, String topic, String consumerGroup) {
+ String groupId = groupInfo.getInlongGroupId();
+ InlongConsumeEntity existEntity = consumeMapper.selectExists(groupId, topic, consumerGroup);
+ if (existEntity != null) {
+ LOGGER.warn("inlong consume already exists for groupId={} topic={} consumerGroup={}, skip to create",
+ groupId, topic, consumerGroup);
+ return existEntity.getId();
+ }
+
+ LOGGER.debug("begin to save inlong consume for groupId={} topic={} group={}", groupId, topic, consumerGroup);
+ InlongConsumeEntity entity = new InlongConsumeEntity();
+ entity.setConsumerGroup(consumerGroup);
+ entity.setDescription(AUTO_CREATE_MSG);
+ entity.setMqType(groupInfo.getMqType());
+ entity.setTopic(topic);
+ entity.setInlongGroupId(groupId);
+ entity.setFilterEnabled(0);
+
+ entity.setInCharges(groupInfo.getInCharges());
+ entity.setStatus(ConsumeStatus.APPROVED.getCode());
+ String operator = groupInfo.getCreator();
+ entity.setCreator(operator);
+ entity.setModifier(operator);
+
+ consumeMapper.insert(entity);
+ LOGGER.debug("success save inlong consume for groupId={} topic={} group={}", groupId, topic, consumerGroup);
+ return entity.getId();
+ }
+
@Override
public boolean consumerGroupExists(String consumerGroup, Integer excludeSelfId) {
InlongConsumePageRequest request = InlongConsumePageRequest.builder()
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/ConsumptionService.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/ConsumptionService.java
index 489c8d1c0..5a181fd97 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/ConsumptionService.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/ConsumptionService.java
@@ -22,7 +22,6 @@ import org.apache.inlong.manager.pojo.consumption.ConsumptionInfo;
import org.apache.inlong.manager.pojo.consumption.ConsumptionListVo;
import org.apache.inlong.manager.pojo.consumption.ConsumptionQuery;
import org.apache.inlong.manager.pojo.consumption.ConsumptionSummary;
-import org.apache.inlong.manager.pojo.group.InlongGroupInfo;
/**
* Data consumption interface
@@ -87,9 +86,4 @@ public interface ConsumptionService {
*/
Boolean delete(Integer id, String operator);
- /**
- * Save the consumer group info for Sort to the database
- */
- void saveSortConsumption(InlongGroupInfo bizInfo, String topic, String consumerGroup);
-
}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/ConsumptionServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/ConsumptionServiceImpl.java
index 5641a293c..0de16b5c6 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/ConsumptionServiceImpl.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/ConsumptionServiceImpl.java
@@ -43,7 +43,6 @@ import org.apache.inlong.manager.pojo.consumption.ConsumptionMqExtBase;
import org.apache.inlong.manager.pojo.consumption.ConsumptionPulsarInfo;
import org.apache.inlong.manager.pojo.consumption.ConsumptionQuery;
import org.apache.inlong.manager.pojo.consumption.ConsumptionSummary;
-import org.apache.inlong.manager.pojo.group.InlongGroupInfo;
import org.apache.inlong.manager.pojo.group.InlongGroupTopicInfo;
import org.apache.inlong.manager.pojo.stream.InlongStreamBriefInfo;
import org.apache.inlong.manager.pojo.user.UserRoleCode;
@@ -336,45 +335,6 @@ public class ConsumptionServiceImpl implements ConsumptionService {
return true;
}
- @Override
- public void saveSortConsumption(InlongGroupInfo groupInfo, String topic, String consumerGroup) {
- String groupId = groupInfo.getInlongGroupId();
- ConsumptionEntity exists = consumptionMapper.selectConsumptionExists(groupId, topic, consumerGroup);
- if (exists != null) {
- log.warn("consumption with groupId={}, topic={}, consumer group={} already exists, skip to create",
- groupId, topic, consumerGroup);
- return;
- }
-
- log.debug("begin to save consumption, groupId={}, topic={}, consumer group={}", groupId, topic, consumerGroup);
- String mqType = groupInfo.getMqType();
- ConsumptionEntity entity = new ConsumptionEntity();
- entity.setInlongGroupId(groupId);
- entity.setMqType(mqType);
- entity.setTopic(topic);
- entity.setConsumerGroup(consumerGroup);
- entity.setInCharges(groupInfo.getInCharges());
- entity.setFilterEnabled(0);
-
- entity.setStatus(ConsumeStatus.APPROVED.getCode());
- String operator = groupInfo.getCreator();
- entity.setCreator(operator);
- entity.setModifier(operator);
-
- consumptionMapper.insert(entity);
-
- if (MQType.PULSAR.equals(mqType) || MQType.TDMQ_PULSAR.equals(mqType)) {
- ConsumptionPulsarEntity pulsarEntity = new ConsumptionPulsarEntity();
- pulsarEntity.setConsumptionId(entity.getId());
- pulsarEntity.setConsumerGroup(consumerGroup);
- pulsarEntity.setInlongGroupId(groupId);
- pulsarEntity.setIsDeleted(InlongConstants.UN_DELETED);
- consumptionPulsarMapper.insert(pulsarEntity);
- }
-
- log.debug("success save consumption, groupId={}, topic={}, consumer group={}", groupId, topic, consumerGroup);
- }
-
private ConsumptionEntity saveConsumption(ConsumptionInfo info, String operator) {
ConsumptionEntity entity = CommonBeanUtils.copyProperties(info, ConsumptionEntity::new);
entity.setStatus(ConsumeStatus.WAIT_ASSIGN.getCode());
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/consumption/ConsumptionCompleteProcessListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/consumption/ConsumptionCompleteProcessListener.java
index 3b7ac58ad..7f5478445 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/consumption/ConsumptionCompleteProcessListener.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/consumption/ConsumptionCompleteProcessListener.java
@@ -20,10 +20,10 @@ package org.apache.inlong.manager.service.listener.consumption;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.inlong.manager.common.consts.InlongConstants;
+import org.apache.inlong.manager.common.consts.MQType;
import org.apache.inlong.manager.common.enums.ClusterType;
import org.apache.inlong.manager.common.enums.ConsumeStatus;
import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
-import org.apache.inlong.manager.common.consts.MQType;
import org.apache.inlong.manager.common.exceptions.BusinessException;
import org.apache.inlong.manager.common.exceptions.WorkflowListenerException;
import org.apache.inlong.manager.common.util.Preconditions;
@@ -34,7 +34,7 @@ import org.apache.inlong.manager.dao.mapper.InlongGroupEntityMapper;
import org.apache.inlong.manager.pojo.cluster.ClusterInfo;
import org.apache.inlong.manager.pojo.cluster.pulsar.PulsarClusterInfo;
import org.apache.inlong.manager.pojo.cluster.tubemq.TubeClusterInfo;
-import org.apache.inlong.manager.pojo.queue.pulsar.PulsarTopicBean;
+import org.apache.inlong.manager.pojo.queue.pulsar.PulsarTopicInfo;
import org.apache.inlong.manager.pojo.workflow.form.process.ApplyConsumptionProcessForm;
import org.apache.inlong.manager.service.cluster.InlongClusterService;
import org.apache.inlong.manager.service.resource.queue.pulsar.PulsarOperator;
@@ -131,7 +131,7 @@ public class ConsumptionCompleteProcessListener implements ProcessEventListener
ClusterInfo clusterInfo = clusterService.getOne(clusterTag, null, ClusterType.PULSAR);
PulsarClusterInfo pulsarCluster = (PulsarClusterInfo) clusterInfo;
try (PulsarAdmin pulsarAdmin = PulsarUtils.getPulsarAdmin(pulsarCluster)) {
- PulsarTopicBean topicMessage = new PulsarTopicBean();
+ PulsarTopicInfo topicMessage = new PulsarTopicInfo();
String tenant = pulsarCluster.getTenant();
if (StringUtils.isEmpty(tenant)) {
tenant = InlongConstants.DEFAULT_PULSAR_TENANT;
@@ -140,7 +140,7 @@ public class ConsumptionCompleteProcessListener implements ProcessEventListener
topicMessage.setNamespace(mqResource);
String consumerGroup = entity.getConsumerGroup();
- List<String> topics = Arrays.asList(entity.getTopic().split(","));
+ List<String> topics = Arrays.asList(entity.getTopic().split(InlongConstants.COMMA));
this.createPulsarSubscription(pulsarAdmin, consumerGroup, topicMessage, topics);
} catch (Exception e) {
log.error("create pulsar topic failed", e);
@@ -149,10 +149,10 @@ public class ConsumptionCompleteProcessListener implements ProcessEventListener
}
}
- private void createPulsarSubscription(PulsarAdmin pulsarAdmin, String subscription, PulsarTopicBean topicBean,
+ private void createPulsarSubscription(PulsarAdmin pulsarAdmin, String subscription, PulsarTopicInfo topicInfo,
List<String> topics) {
try {
- pulsarOperator.createSubscriptions(pulsarAdmin, subscription, topicBean, topics);
+ pulsarOperator.createSubscriptions(pulsarAdmin, subscription, topicInfo, topics);
} catch (Exception e) {
log.error("create pulsar consumer group failed", e);
throw new WorkflowListenerException("failed to create pulsar consumer group");
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/kafka/KafkaOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/kafka/KafkaOperator.java
index 3d3a7e063..7e886cf95 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/kafka/KafkaOperator.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/kafka/KafkaOperator.java
@@ -79,9 +79,7 @@ public class KafkaOperator {
public void createSubscription(InlongKafkaInfo inlongKafkaInfo, KafkaClusterInfo kafkaClusterInfo,
String subscription) {
-
KafkaConsumer kafkaConsumer = KafkaUtils.createKafkaConsumer(inlongKafkaInfo, kafkaClusterInfo);
- // subscription
kafkaConsumer.subscribe(Collections.singletonList(subscription));
}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/kafka/KafkaResourceOperators.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/kafka/KafkaResourceOperators.java
index 7f06633bd..37a1ee66d 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/kafka/KafkaResourceOperators.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/kafka/KafkaResourceOperators.java
@@ -17,9 +17,6 @@
package org.apache.inlong.manager.service.resource.queue.kafka;
-import javax.validation.constraints.NotBlank;
-import javax.validation.constraints.NotNull;
-
import lombok.extern.slf4j.Slf4j;
import org.apache.inlong.manager.common.consts.MQType;
import org.apache.inlong.manager.common.enums.ClusterType;
@@ -32,12 +29,14 @@ import org.apache.inlong.manager.pojo.group.kafka.InlongKafkaInfo;
import org.apache.inlong.manager.pojo.stream.InlongStreamBriefInfo;
import org.apache.inlong.manager.pojo.stream.InlongStreamInfo;
import org.apache.inlong.manager.service.cluster.InlongClusterService;
-import org.apache.inlong.manager.service.core.ConsumptionService;
+import org.apache.inlong.manager.service.consume.InlongConsumeService;
import org.apache.inlong.manager.service.resource.queue.QueueResourceOperator;
import org.apache.inlong.manager.service.stream.InlongStreamService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
+import javax.validation.constraints.NotBlank;
+import javax.validation.constraints.NotNull;
import java.util.List;
/**
@@ -47,6 +46,11 @@ import java.util.List;
@Service
public class KafkaResourceOperators implements QueueResourceOperator {
+ /**
+ * The name rule for Kafka consumer group: clusterTag_topicName_consumer_group
+ */
+ private static final String KAFKA_CONSUMER_GROUP = "%s_%s_consumer_group";
+
@Autowired
private InlongClusterService clusterService;
@Autowired
@@ -54,7 +58,7 @@ public class KafkaResourceOperators implements QueueResourceOperator {
@Autowired
private KafkaOperator kafkaOperator;
@Autowired
- private ConsumptionService consumptionService;
+ private InlongConsumeService consumeService;
@Override
public boolean accept(String mqType) {
@@ -155,12 +159,11 @@ public class KafkaResourceOperators implements QueueResourceOperator {
/**
* Create Kafka Topic and Subscription, and save the consumer group info.
*/
- private void createKafkaTopic(InlongKafkaInfo inlongKafkaInfo, String streamId)
- throws Exception {
+ private void createKafkaTopic(InlongKafkaInfo kafkaInfo, String streamId) throws Exception {
// 1. create kafka topic
- ClusterInfo clusterInfo = clusterService.getOne(inlongKafkaInfo.getInlongClusterTag(), null, ClusterType.KAFKA);
- String topicName = inlongKafkaInfo.getInlongGroupId() + "_" + streamId;
- kafkaOperator.createTopic(inlongKafkaInfo, (KafkaClusterInfo) clusterInfo, topicName);
+ ClusterInfo clusterInfo = clusterService.getOne(kafkaInfo.getInlongClusterTag(), null, ClusterType.KAFKA);
+ String topicName = kafkaInfo.getInlongGroupId() + "_" + streamId;
+ kafkaOperator.createTopic(kafkaInfo, (KafkaClusterInfo) clusterInfo, topicName);
boolean exist = kafkaOperator.topicIsExists((KafkaClusterInfo) clusterInfo, topicName);
if (!exist) {
@@ -170,14 +173,16 @@ public class KafkaResourceOperators implements QueueResourceOperator {
}
// 2. create a subscription for the kafka topic
- kafkaOperator.createSubscription(inlongKafkaInfo, (KafkaClusterInfo) clusterInfo, topicName);
- String groupId = inlongKafkaInfo.getInlongGroupId();
- log.info("success to create pulsar subscription for groupId={}, topic={}, subs={}",
+ kafkaOperator.createSubscription(kafkaInfo, (KafkaClusterInfo) clusterInfo, topicName);
+ String groupId = kafkaInfo.getInlongGroupId();
+ log.info("success to create kafka subscription for groupId={}, topic={}, consumeGroup={}",
groupId, topicName, topicName);
- // 3. insert the consumer group info into the consumption table
- consumptionService.saveSortConsumption(inlongKafkaInfo, topicName, topicName);
- log.info("success to save consume for groupId={}, topic={}, subs={}", groupId, topicName, topicName);
+ // 3. insert the consumer group info
+ String consumeGroup = String.format(KAFKA_CONSUMER_GROUP, kafkaInfo.getInlongClusterTag(), topicName);
+ Integer id = consumeService.saveBySystem(kafkaInfo, topicName, consumeGroup);
+ log.info("success to save inlong consume [{}] for consumerGroup={}, groupId={}, topic={}",
+ id, consumeGroup, groupId, topicName);
}
/**
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/pulsar/PulsarOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/pulsar/PulsarOperator.java
index 9bfe2605b..3ebeeca37 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/pulsar/PulsarOperator.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/pulsar/PulsarOperator.java
@@ -24,7 +24,7 @@ import org.apache.inlong.manager.common.conversion.ConversionHandle;
import org.apache.inlong.manager.common.exceptions.BusinessException;
import org.apache.inlong.manager.common.util.Preconditions;
import org.apache.inlong.manager.pojo.group.pulsar.InlongPulsarInfo;
-import org.apache.inlong.manager.pojo.queue.pulsar.PulsarTopicBean;
+import org.apache.inlong.manager.pojo.queue.pulsar.PulsarTopicInfo;
import org.apache.inlong.manager.service.cluster.InlongClusterServiceImpl;
import org.apache.pulsar.client.admin.Namespaces;
import org.apache.pulsar.client.admin.PulsarAdmin;
@@ -92,7 +92,7 @@ public class PulsarOperator {
LOGGER.info("begin to create namespace={}", namespaceName);
try {
// Check whether the namespace exists, and create it if it does not exist
- boolean isExists = this.namespacesIsExists(pulsarAdmin, tenant, namespace);
+ boolean isExists = this.namespaceExists(pulsarAdmin, tenant, namespace);
if (isExists) {
LOGGER.warn("namespace={} already exists, skip to create", namespaceName);
return;
@@ -138,41 +138,41 @@ public class PulsarOperator {
/**
* Create Pulsar topic
*/
- public void createTopic(PulsarAdmin pulsarAdmin, PulsarTopicBean topicBean) throws PulsarAdminException {
- Preconditions.checkNotNull(topicBean, "pulsar topic info cannot be empty");
- String tenant = topicBean.getTenant();
- String namespace = topicBean.getNamespace();
- String topic = topicBean.getTopicName();
- String topicFullName = tenant + "/" + namespace + "/" + topic;
+ public void createTopic(PulsarAdmin pulsarAdmin, PulsarTopicInfo topicInfo) throws PulsarAdminException {
+ Preconditions.checkNotNull(topicInfo, "pulsar topic info cannot be empty");
+ String tenant = topicInfo.getTenant();
+ String namespace = topicInfo.getNamespace();
+ String topicName = topicInfo.getTopicName();
+ String fullTopicName = tenant + "/" + namespace + "/" + topicName;
// Topic will be returned if it exists, and created if it does not exist
- if (topicIsExists(pulsarAdmin, tenant, namespace, topic,
- InlongConstants.PULSAR_QUEUE_TYPE_PARALLEL.equals(topicBean.getQueueModule()))) {
- LOGGER.warn("pulsar topic={} already exists in {}", topicFullName, pulsarAdmin.getServiceUrl());
+ if (topicExists(pulsarAdmin, tenant, namespace, topicName,
+ InlongConstants.PULSAR_QUEUE_TYPE_PARALLEL.equals(topicInfo.getQueueModule()))) {
+ LOGGER.warn("pulsar topic={} already exists in {}", fullTopicName, pulsarAdmin.getServiceUrl());
return;
}
try {
- if (InlongConstants.PULSAR_QUEUE_TYPE_SERIAL.equals(topicBean.getQueueModule())) {
- pulsarAdmin.topics().createNonPartitionedTopic(topicFullName);
- String res = pulsarAdmin.lookups().lookupTopic(topicFullName);
- LOGGER.info("success to create topic={}, lookup result is {}", topicFullName, res);
+ if (InlongConstants.PULSAR_QUEUE_TYPE_SERIAL.equals(topicInfo.getQueueModule())) {
+ pulsarAdmin.topics().createNonPartitionedTopic(fullTopicName);
+ String res = pulsarAdmin.lookups().lookupTopic(fullTopicName);
+ LOGGER.info("success to create topic={}, lookup result is {}", fullTopicName, res);
} else {
// The number of brokers as the default value of topic partition
List<String> clusters = PulsarUtils.getPulsarClusters(pulsarAdmin);
- Integer numPartitions = topicBean.getNumPartitions();
+ Integer numPartitions = topicInfo.getNumPartitions();
if (numPartitions < 0 || numPartitions <= MAX_PARTITION) {
List<String> brokers = pulsarAdmin.brokers().getActiveBrokers(clusters.get(0));
numPartitions = brokers.size();
}
- pulsarAdmin.topics().createPartitionedTopic(topicFullName, numPartitions);
- Map<String, String> res = pulsarAdmin.lookups().lookupPartitionedTopic(topicFullName);
+ pulsarAdmin.topics().createPartitionedTopic(fullTopicName, numPartitions);
+ Map<String, String> res = pulsarAdmin.lookups().lookupPartitionedTopic(fullTopicName);
// if lookup failed (res.size not equals the partition number)
if (res.keySet().size() != numPartitions) {
// look up partition failed, retry to get partition numbers
for (int i = 0; (i < RETRY_TIMES && res.keySet().size() != numPartitions); i++) {
- res = pulsarAdmin.lookups().lookupPartitionedTopic(topicFullName);
+ res = pulsarAdmin.lookups().lookupPartitionedTopic(fullTopicName);
try {
Thread.sleep(500);
} catch (InterruptedException e) {
@@ -183,10 +183,10 @@ public class PulsarOperator {
if (numPartitions != res.keySet().size()) {
throw new PulsarAdminException("The number of partitions not equal to lookupPartitionedTopic");
}
- LOGGER.info("success to create topic={}", topicFullName);
+ LOGGER.info("success to create topic={}", fullTopicName);
}
} catch (PulsarAdminException e) {
- LOGGER.error("failed to create topic=" + topicFullName, e);
+ LOGGER.error("failed to create topic=" + fullTopicName, e);
throw e;
}
}
@@ -194,26 +194,26 @@ public class PulsarOperator {
/**
* Force delete Pulsar topic
*/
- public void forceDeleteTopic(PulsarAdmin pulsarAdmin, PulsarTopicBean topicBean) throws PulsarAdminException {
- Preconditions.checkNotNull(topicBean, "pulsar topic info cannot be empty");
+ public void forceDeleteTopic(PulsarAdmin pulsarAdmin, PulsarTopicInfo topicInfo) throws PulsarAdminException {
+ Preconditions.checkNotNull(topicInfo, "pulsar topic info cannot be empty");
- String tenant = topicBean.getTenant();
- String namespace = topicBean.getNamespace();
- String topic = topicBean.getTopicName();
- String topicFullName = tenant + "/" + namespace + "/" + topic;
+ String tenant = topicInfo.getTenant();
+ String namespace = topicInfo.getNamespace();
+ String topic = topicInfo.getTopicName();
+ String fullTopicName = tenant + "/" + namespace + "/" + topic;
// Topic will be returned if it not exists
- if (topicIsExists(pulsarAdmin, tenant, namespace, topic,
- InlongConstants.PULSAR_QUEUE_TYPE_PARALLEL.equals(topicBean.getQueueModule()))) {
- LOGGER.warn("pulsar topic={} already delete", topicFullName);
+ if (topicExists(pulsarAdmin, tenant, namespace, topic,
+ InlongConstants.PULSAR_QUEUE_TYPE_PARALLEL.equals(topicInfo.getQueueModule()))) {
+ LOGGER.warn("pulsar topic={} already delete", fullTopicName);
return;
}
try {
- pulsarAdmin.topics().delete(topicFullName, true);
- LOGGER.info("success to delete topic={}", topicFullName);
+ pulsarAdmin.topics().delete(fullTopicName, true);
+ LOGGER.info("success to delete topic={}", fullTopicName);
} catch (PulsarAdminException e) {
- LOGGER.error("failed to delete topic=" + topicFullName, e);
+ LOGGER.error("failed to delete topic=" + fullTopicName, e);
throw e;
}
}
@@ -221,21 +221,17 @@ public class PulsarOperator {
/**
* Create a Pulsar subscription for the given topic
*/
- public void createSubscription(PulsarAdmin pulsarAdmin, PulsarTopicBean topicBean, String subscription)
- throws PulsarAdminException {
- Preconditions.checkNotNull(topicBean, "can not find tenant information to create subscription");
- Preconditions.checkNotNull(subscription, "subscription cannot be empty during creating subscription");
-
- String topicName = topicBean.getTenant() + "/" + topicBean.getNamespace() + "/" + topicBean.getTopicName();
- LOGGER.info("begin to create pulsar subscription={} for topic={}", subscription, topicName);
+ public void createSubscription(PulsarAdmin pulsarAdmin, String fullTopicName, String queueModule,
+ String subscription) throws PulsarAdminException {
+ LOGGER.info("begin to create pulsar subscription={} for topic={}", subscription, fullTopicName);
try {
- boolean isExists = this.subscriptionIsExists(pulsarAdmin, topicName, subscription,
- InlongConstants.PULSAR_QUEUE_TYPE_PARALLEL.equals(topicBean.getQueueModule()));
+ boolean isExists = this.subscriptionExists(pulsarAdmin, fullTopicName, subscription,
+ InlongConstants.PULSAR_QUEUE_TYPE_PARALLEL.equals(queueModule));
if (isExists) {
LOGGER.warn("pulsar subscription={} already exists, skip to create", subscription);
return;
}
- pulsarAdmin.topics().createSubscription(topicName, subscription, MessageId.latest);
+ pulsarAdmin.topics().createSubscription(fullTopicName, subscription, MessageId.latest);
LOGGER.info("success to create subscription={}", subscription);
} catch (PulsarAdminException e) {
LOGGER.error("failed to create pulsar subscription=" + subscription, e);
@@ -246,11 +242,12 @@ public class PulsarOperator {
/**
* Create a Pulsar subscription for the specified topic list
*/
- public void createSubscriptions(PulsarAdmin pulsarAdmin, String subscription, PulsarTopicBean topicBean,
+ public void createSubscriptions(PulsarAdmin pulsarAdmin, String subscription, PulsarTopicInfo topicInfo,
List<String> topicList) throws PulsarAdminException {
for (String topic : topicList) {
- topicBean.setTopicName(topic);
- this.createSubscription(pulsarAdmin, topicBean, subscription);
+ topicInfo.setTopicName(topic);
+ String fullTopicName = topicInfo.getTenant() + "/" + topicInfo.getNamespace() + "/" + topic;
+ this.createSubscription(pulsarAdmin, fullTopicName, topicInfo.getQueueModule(), subscription);
}
LOGGER.info("success to create subscription={} for multiple topics={}", subscription, topicList);
}
@@ -266,7 +263,7 @@ public class PulsarOperator {
/**
* Check whether the Pulsar namespace exists under the specified tenant
*/
- private boolean namespacesIsExists(PulsarAdmin pulsarAdmin, String tenant, String namespace)
+ private boolean namespaceExists(PulsarAdmin pulsarAdmin, String tenant, String namespace)
throws PulsarAdminException {
List<String> namespaceList = pulsarAdmin.namespaces().getNamespaces(tenant);
return namespaceList.contains(tenant + "/" + namespace);
@@ -278,9 +275,9 @@ public class PulsarOperator {
* @apiNote cannot compare whether the string contains, otherwise it may be misjudged, such as:
* Topic "ab" does not exist, but if "abc" exists, "ab" will be mistakenly judged to exist
*/
- public boolean topicIsExists(PulsarAdmin pulsarAdmin, String tenant, String namespace, String topic,
+ public boolean topicExists(PulsarAdmin pulsarAdmin, String tenant, String namespace, String topicName,
boolean isPartitioned) {
- if (StringUtils.isBlank(topic)) {
+ if (StringUtils.isBlank(topicName)) {
return true;
}
@@ -295,36 +292,39 @@ public class PulsarOperator {
}
for (String t : topicList) {
t = t.substring(t.lastIndexOf("/") + 1); // not contains /
- if (topic.equals(t)) {
+ if (topicName.equals(t)) {
topicExists = true;
break;
}
}
} catch (PulsarAdminException pe) {
- LOGGER.error("check if the pulsar topic={} exists error, begin retry", topic, pe);
+ LOGGER.error("check if the pulsar topic={} exists error, begin retry", topicName, pe);
int count = 0;
try {
while (!topicExists && ++count <= RETRY_TIMES) {
- LOGGER.info("check whether the pulsar topic={} exists error, try count={}", topic, count);
+ LOGGER.info("check whether the pulsar topic={} exists error, try count={}", topicName, count);
Thread.sleep(DELAY_SECONDS);
topicList = pulsarAdmin.topics().getPartitionedTopicList(tenant + "/" + namespace);
for (String t : topicList) {
t = t.substring(t.lastIndexOf("/") + 1);
- if (topic.equals(t)) {
+ if (topicName.equals(t)) {
topicExists = true;
break;
}
}
}
} catch (Exception e) {
- LOGGER.error("after retry, check if the pulsar topic={} exists still error", topic, pe);
+ LOGGER.error("after retry, check if the pulsar topic={} exists still error", topicName, pe);
}
}
return topicExists;
}
- private boolean subscriptionIsExists(PulsarAdmin pulsarAdmin, String topic, String subscription,
+ /**
+ * Check whether the Pulsar topic exists.
+ */
+ private boolean subscriptionExists(PulsarAdmin pulsarAdmin, String topic, String subscription,
boolean isPartitioned) {
int count = 0;
while (++count <= RETRY_TIMES) {
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/pulsar/PulsarResourceOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/pulsar/PulsarResourceOperator.java
index 1b179c597..f66b81955 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/pulsar/PulsarResourceOperator.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/pulsar/PulsarResourceOperator.java
@@ -19,6 +19,7 @@ package org.apache.inlong.manager.service.resource.queue.pulsar;
import com.google.common.base.Objects;
import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.inlong.manager.common.consts.InlongConstants;
import org.apache.inlong.manager.common.consts.MQType;
@@ -30,12 +31,14 @@ import org.apache.inlong.manager.pojo.cluster.ClusterInfo;
import org.apache.inlong.manager.pojo.cluster.pulsar.PulsarClusterInfo;
import org.apache.inlong.manager.pojo.group.InlongGroupInfo;
import org.apache.inlong.manager.pojo.group.pulsar.InlongPulsarInfo;
-import org.apache.inlong.manager.pojo.queue.pulsar.PulsarTopicBean;
+import org.apache.inlong.manager.pojo.queue.pulsar.PulsarTopicInfo;
+import org.apache.inlong.manager.pojo.sink.StreamSink;
import org.apache.inlong.manager.pojo.stream.InlongStreamBriefInfo;
import org.apache.inlong.manager.pojo.stream.InlongStreamInfo;
import org.apache.inlong.manager.service.cluster.InlongClusterService;
-import org.apache.inlong.manager.service.core.ConsumptionService;
+import org.apache.inlong.manager.service.consume.InlongConsumeService;
import org.apache.inlong.manager.service.resource.queue.QueueResourceOperator;
+import org.apache.inlong.manager.service.sink.StreamSinkService;
import org.apache.inlong.manager.service.stream.InlongStreamService;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.springframework.beans.factory.annotation.Autowired;
@@ -50,12 +53,19 @@ import java.util.List;
@Service
public class PulsarResourceOperator implements QueueResourceOperator {
+ /**
+ * The name rule for Pulsar subscription: clusterTag_topicName_sinkId_consumer_group
+ */
+ private static final String PULSAR_SUBSCRIPTION = "%s_%s_%s_consumer_group";
+
+ @Autowired
+ private InlongClusterService clusterService;
@Autowired
private InlongStreamService streamService;
@Autowired
- private InlongClusterService clusterService;
+ private StreamSinkService sinkService;
@Autowired
- private ConsumptionService consumptionService;
+ private InlongConsumeService consumeService;
@Autowired
private PulsarOperator pulsarOperator;
@@ -77,7 +87,7 @@ public class PulsarResourceOperator implements QueueResourceOperator {
PulsarClusterInfo pulsarCluster = (PulsarClusterInfo) clusterService.getOne(clusterTag, null,
ClusterType.PULSAR);
try (PulsarAdmin pulsarAdmin = PulsarUtils.getPulsarAdmin(pulsarCluster)) {
- // 1. create pulsar tenant and namespace
+ // create pulsar tenant and namespace
String tenant = pulsarCluster.getTenant();
if (StringUtils.isEmpty(tenant)) {
tenant = InlongConstants.DEFAULT_PULSAR_TENANT;
@@ -92,14 +102,16 @@ public class PulsarResourceOperator implements QueueResourceOperator {
log.info("success to create pulsar namespace for groupId={}, namespace={}", groupId, namespace);
}
- // 2. create Pulsar Topic - each Inlong Stream corresponds to a Pulsar Topic
+ // create pulsar topic - each Inlong Stream corresponds to a Pulsar topic
List<InlongStreamBriefInfo> streamInfoList = streamService.getTopicList(groupId);
if (streamInfoList == null || streamInfoList.isEmpty()) {
log.warn("skip to create pulsar topic and subscription as no streams for groupId={}", groupId);
return;
}
- for (InlongStreamBriefInfo streamInfo : streamInfoList) {
- this.createPulsarTopic(groupInfo, pulsarCluster, streamInfo.getMqResource());
+ // create pulsar topic and subscription
+ for (InlongStreamBriefInfo stream : streamInfoList) {
+ this.createTopic(pulsarInfo, pulsarCluster, stream.getMqResource());
+ this.createSubscription(pulsarInfo, pulsarCluster, stream.getMqResource(), stream.getInlongStreamId());
}
} catch (Exception e) {
String msg = String.format("failed to create pulsar resource for groupId=%s", groupId);
@@ -147,10 +159,11 @@ public class PulsarResourceOperator implements QueueResourceOperator {
try {
// get pulsar cluster via the inlong cluster tag from the inlong group
- String clusterTag = groupInfo.getInlongClusterTag();
- ClusterInfo clusterInfo = clusterService.getOne(clusterTag, null, ClusterType.PULSAR);
- // create pulsar topic
- this.createPulsarTopic(groupInfo, (PulsarClusterInfo) clusterInfo, streamInfo.getMqResource());
+ PulsarClusterInfo pulsarCluster = (PulsarClusterInfo) clusterService.getOne(
+ groupInfo.getInlongClusterTag(), null, ClusterType.PULSAR);
+ // create pulsar topic and subscription
+ this.createTopic((InlongPulsarInfo) groupInfo, pulsarCluster, streamInfo.getMqResource());
+ this.createSubscription((InlongPulsarInfo) groupInfo, pulsarCluster, streamInfo.getMqResource(), streamId);
} catch (Exception e) {
String msg = String.format("failed to create pulsar topic for groupId=%s, streamId=%s", groupId, streamId);
log.error(msg, e);
@@ -185,66 +198,86 @@ public class PulsarResourceOperator implements QueueResourceOperator {
/**
* Create Pulsar Topic and Subscription, and save the consumer group info.
*/
- private void createPulsarTopic(InlongGroupInfo groupInfo, PulsarClusterInfo pulsarCluster, String topicName)
+ private void createTopic(InlongPulsarInfo pulsarInfo, PulsarClusterInfo pulsarCluster, String topicName)
throws Exception {
try (PulsarAdmin pulsarAdmin = PulsarUtils.getPulsarAdmin(pulsarCluster)) {
- // 1. create pulsar topic
- InlongPulsarInfo pulsarInfo = (InlongPulsarInfo) groupInfo;
String tenant = pulsarCluster.getTenant();
if (StringUtils.isEmpty(tenant)) {
tenant = InlongConstants.DEFAULT_PULSAR_TENANT;
}
String namespace = pulsarInfo.getMqResource();
- PulsarTopicBean topicBean = PulsarTopicBean.builder()
+ PulsarTopicInfo topicInfo = PulsarTopicInfo.builder()
.tenant(tenant)
.namespace(namespace)
.topicName(topicName)
.queueModule(pulsarInfo.getQueueModule())
.numPartitions(pulsarInfo.getPartitionNum())
.build();
- pulsarOperator.createTopic(pulsarAdmin, topicBean);
+ pulsarOperator.createTopic(pulsarAdmin, topicInfo);
+ }
+ }
- // 2. create a subscription for the pulsar topic
- boolean exist = pulsarOperator.topicIsExists(pulsarAdmin, tenant, namespace, topicName,
- InlongConstants.PULSAR_QUEUE_TYPE_PARALLEL.equals(topicBean.getQueueModule()));
+ /**
+ * Create Pulsar Subscription, and save the consumer group info.
+ */
+ private void createSubscription(InlongPulsarInfo pulsarInfo, PulsarClusterInfo pulsarCluster, String topicName,
+ String streamId) throws Exception {
+ try (PulsarAdmin pulsarAdmin = PulsarUtils.getPulsarAdmin(pulsarCluster)) {
+ String tenant = pulsarCluster.getTenant();
+ if (StringUtils.isEmpty(tenant)) {
+ tenant = InlongConstants.DEFAULT_PULSAR_TENANT;
+ }
+ String namespace = pulsarInfo.getMqResource();
+ String fullTopicName = tenant + "/" + namespace + "/" + topicName;
+ boolean exist = pulsarOperator.topicExists(pulsarAdmin, tenant, namespace, topicName,
+ InlongConstants.PULSAR_QUEUE_TYPE_PARALLEL.equals(pulsarInfo.getQueueModule()));
if (!exist) {
- String topicFullName = tenant + "/" + namespace + "/" + topicName;
String serviceUrl = pulsarCluster.getAdminUrl();
- log.error("topic={} not exists in {}", topicFullName, serviceUrl);
- throw new WorkflowListenerException("topic=" + topicFullName + " not exists in " + serviceUrl);
+ log.error("topic={} not exists in {}", fullTopicName, serviceUrl);
+ throw new WorkflowListenerException("topic=" + fullTopicName + " not exists in " + serviceUrl);
}
- // subscription naming rules: clusterTag_topicName_consumer_group
- String subscription = groupInfo.getInlongClusterTag() + "_" + topicName + "_consumer_group";
- pulsarOperator.createSubscription(pulsarAdmin, topicBean, subscription);
- String groupId = groupInfo.getInlongGroupId();
- log.info("success to create pulsar subscription for groupId={}, topic={}, subs={}",
- groupId, topicName, subscription);
+ // create subscription for all sinks
+ String groupId = pulsarInfo.getInlongGroupId();
+ List<StreamSink> streamSinks = sinkService.listSink(groupId, streamId);
+ if (CollectionUtils.isEmpty(streamSinks)) {
+ log.warn("no need to create subs, as no sink exists for groupId={}, streamId={}", groupId, streamId);
+ return;
+ }
- // 3. insert the consumer group info into the consumption table
- consumptionService.saveSortConsumption(groupInfo, topicName, subscription);
- log.info("success to save consume for groupId={}, topic={}, subs={}", groupId, topicName, subscription);
+ // subscription naming rules: clusterTag_topicName_sinkId_consumer_group
+ String clusterTag = pulsarInfo.getInlongClusterTag();
+ for (StreamSink sink : streamSinks) {
+ String subs = String.format(PULSAR_SUBSCRIPTION, clusterTag, topicName, sink.getId());
+ pulsarOperator.createSubscription(pulsarAdmin, fullTopicName, pulsarInfo.getQueueModule(), subs);
+ log.info("success to create subs={} for groupId={}, topic={}", subs, groupId, fullTopicName);
+
+ // insert the consumer group info into the consumption table
+ Integer id = consumeService.saveBySystem(pulsarInfo, topicName, subs);
+ log.info("success to save inlong consume [{}] for subs={}, groupId={}, topic={}",
+ id, subs, groupId, topicName);
+ }
}
}
/**
* Delete Pulsar Topic and Subscription, and delete the consumer group info.
+ * TODO delete Subscription and InlongConsume info
*/
private void deletePulsarTopic(InlongGroupInfo groupInfo, PulsarClusterInfo pulsarCluster, String topicName)
throws Exception {
try (PulsarAdmin pulsarAdmin = PulsarUtils.getPulsarAdmin(pulsarCluster)) {
- // 1. delete pulsar topic
String tenant = pulsarCluster.getTenant();
if (StringUtils.isEmpty(tenant)) {
tenant = InlongConstants.DEFAULT_PULSAR_TENANT;
}
String namespace = groupInfo.getMqResource();
- PulsarTopicBean topicBean = PulsarTopicBean.builder()
+ PulsarTopicInfo topicInfo = PulsarTopicInfo.builder()
.tenant(tenant)
.namespace(namespace)
.topicName(topicName)
.build();
- pulsarOperator.forceDeleteTopic(pulsarAdmin, topicBean);
+ pulsarOperator.forceDeleteTopic(pulsarAdmin, topicInfo);
}
}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/tubemq/TubeMQResourceOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/tubemq/TubeMQResourceOperator.java
index 1d57b447f..2f05800c5 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/tubemq/TubeMQResourceOperator.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/tubemq/TubeMQResourceOperator.java
@@ -19,16 +19,16 @@ package org.apache.inlong.manager.service.resource.queue.tubemq;
import com.google.common.base.Objects;
import lombok.extern.slf4j.Slf4j;
+import org.apache.inlong.manager.common.consts.MQType;
import org.apache.inlong.manager.common.enums.ClusterType;
import org.apache.inlong.manager.common.enums.GroupStatus;
-import org.apache.inlong.manager.common.consts.MQType;
import org.apache.inlong.manager.common.exceptions.WorkflowListenerException;
import org.apache.inlong.manager.common.util.Preconditions;
import org.apache.inlong.manager.pojo.cluster.tubemq.TubeClusterInfo;
import org.apache.inlong.manager.pojo.group.InlongGroupInfo;
import org.apache.inlong.manager.pojo.stream.InlongStreamInfo;
import org.apache.inlong.manager.service.cluster.InlongClusterService;
-import org.apache.inlong.manager.service.core.ConsumptionService;
+import org.apache.inlong.manager.service.consume.InlongConsumeService;
import org.apache.inlong.manager.service.resource.queue.QueueResourceOperator;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
@@ -43,7 +43,7 @@ public class TubeMQResourceOperator implements QueueResourceOperator {
@Autowired
private InlongClusterService clusterService;
@Autowired
- private ConsumptionService consumptionService;
+ private InlongConsumeService consumeService;
@Autowired
private TubeMQOperator tubeMQOperator;
@@ -79,9 +79,10 @@ public class TubeMQResourceOperator implements QueueResourceOperator {
tubeMQOperator.createConsumerGroup(tubeCluster, topicName, consumeGroup, operator);
log.info("success to create tubemq consumer group for groupId={}", groupId);
- // insert the consumer group info into the consumption table
- consumptionService.saveSortConsumption(groupInfo, topicName, consumeGroup);
- log.info("success to save consume for groupId={}, topic={}, consumer={}", groupId, topicName, consumeGroup);
+ // insert the consumer group info
+ Integer id = consumeService.saveBySystem(groupInfo, topicName, consumeGroup);
+ log.info("success to save inlong consume [{}] for consumerGroup={}, groupId={}, topic={}",
+ id, consumeGroup, groupId, topicName);
log.info("success to create tubemq resource for groupId={}, cluster={}", groupId, tubeCluster);
} catch (Exception e) {